//! Server tools to backfill, tail, mirror, and verify PLC logs
use crate::{CLIENT, Dt, ExportPage, Op, OpKey, SeqOp, SeqPage};
use reqwest::Url;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc;

/// Errors produced while fetching or parsing one PLC export page.
#[derive(Debug, Error)]
pub enum GetPageError {
    /// HTTP transport or status error from the underlying client.
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// Error surfaced by the reqwest middleware stack.
    #[error(transparent)]
    ReqwestMiddleware(#[from] reqwest_middleware::Error),
    /// JSON (de)serialization failure.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}

/// ops are primary-keyed by (did, cid)
/// plc orders by `created_at` but does not guarantee distinct times per op
/// we assume that the order will at least be deterministic: this may be unsound
#[derive(Debug, PartialEq)]
pub struct LastOp {
    pub created_at: Dt,   // any op greater is definitely not duplicated
    pk: (String, String), // did, cid
}

impl From<Op> for LastOp {
    fn from(op: Op) -> Self {
        Self {
            created_at: op.created_at,
            // consumes the op, so the strings move without cloning
            pk: (op.did, op.cid),
        }
    }
}

impl From<&Op> for LastOp {
    fn from(op: &Op) -> Self {
        Self {
            created_at: op.created_at,
            pk: (op.did.clone(), op.cid.clone()),
        }
    }
}

// bit of a hack: lets a bare timestamp act as a resume cursor, with an empty
// (did, cid) primary key that will never match a real op
impl From<Dt> for LastOp {
    fn from(dt: Dt) -> Self {
        Self {
            created_at: dt,
            pk: ("".to_string(), "".to_string()),
        }
    }
}

/// State for removing duplicate ops between PLC export page boundaries
#[derive(Debug, PartialEq)]
pub struct PageBoundaryState {
    /// The previous page's last timestamp
    ///
    /// Duplicate ops from /export only occur for the same exact timestamp
    pub last_at: Dt,
    /// The previous page's ops at its last timestamp
    keys_at: Vec<OpKey>, // expected to ~always be length one
}

impl PageBoundaryState {
    /// Initialize the boundary state with a PLC page
    ///
    /// Returns `None` for an empty page (no boundary to track yet).
    pub fn new(page: &ExportPage) -> Option<Self> {
        // grab the very last op
        let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;

        // set initial state
        let mut me = Self {
            last_at,
            keys_at: vec![last_key],
        };

        // and make sure all keys at this time are captured from the back
        me.capture_nth_last_at(page, last_at, 1);

        Some(me)
    }
    /// Apply the deduplication and update state
    ///
    /// The beginning of the page will be modified to remove duplicates from the
    /// previous page.
    ///
    /// The end of the page is inspected to update the deduplicator state for
    /// the next page.
    fn apply_to_next(&mut self, page: &mut ExportPage) {
        // walk ops forward, kicking previously-seen ops until created_at advances
        let to_remove: Vec<usize> = page
            .ops
            .iter()
            .enumerate()
            .take_while(|(_, op)| op.created_at == self.last_at)
            .filter(|(_, op)| self.keys_at.contains(&(*op).into()))
            .map(|(i, _)| i)
            .collect();

        // actually remove them. last to first so indices don't shift
        for dup_idx in to_remove.into_iter().rev() {
            page.ops.remove(dup_idx);
        }

        // grab the very last op
        let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
            // there are no ops left? oop. bail.
            // last_at and existing keys remain intact
            return;
        };

        // reset state (as long as time actually moved forward on this page)
        if last_at > self.last_at {
            self.last_at = last_at;
            self.keys_at = vec![last_key];
        } else {
            // weird cases: either time didn't move (fine...) or went backwards (not fine)
            assert_eq!(last_at, self.last_at, "time moved backwards on a page");
            self.keys_at.push(last_key);
        }
        // and make sure all keys at this time are captured from the back
        self.capture_nth_last_at(page, last_at, 1);
    }

    /// walk backwards from 2nd last and collect keys until created_at changes
    fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) {
        page.ops
            .iter()
            .rev()
            .skip(skips)
            .take_while(|op| op.created_at == last_at)
            .for_each(|op| {
                self.keys_at.push(op.into());
            });
    }
}

/// Get one PLC export page
///
/// Extracts the final op so it can be used to fetch the following page
pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
    use futures::TryStreamExt;
    use tokio::io::{AsyncBufReadExt, BufReader};
    use tokio_util::compat::FuturesAsyncReadCompatExt;

    log::trace!("Getting page: {url}");

    let res = CLIENT.get(url).send().await?.error_for_status()?;
    // adapt the HTTP byte stream into an async reader so it can be split on lines
    // (the export body is JSON Lines: one op per line)
    let stream = Box::pin(
        res.bytes_stream()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
            .into_async_read()
            .compat(),
    );

    let mut lines = BufReader::new(stream).lines();
    let mut ops = Vec::new();

    loop {
        match lines.next_line().await {
            Ok(Some(line)) => {
                let line = line.trim();
                if line.is_empty() {
                    continue;
                }
                // unparseable ops are logged and skipped rather than failing the page
                match serde_json::from_str::<Op>(line) {
                    Ok(op) => ops.push(op),
                    Err(e) => log::warn!("failed to parse op: {e} ({line})"),
                }
            }
            Ok(None) => break,
            Err(e) => {
                // a partial page is still useful: the caller can resume from its last op
                log::warn!("transport error mid-page: {}; returning partial page", e);
                break;
            }
        }
    }

    let last_op = ops.last().map(Into::into);

    Ok((ExportPage { ops }, last_op))
}

/// Poll an upstream PLC server for new ops
///
/// Pages of operations are written to the `dest` channel.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// use allegedly::{ExportPage, Op, poll_upstream};
///
/// let after = Some(chrono::Utc::now());
/// let upstream = "https://plc.wtf/export".parse().unwrap();
/// let throttle = std::time::Duration::from_millis(300);
///
/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
///
/// while let Some(ExportPage { ops }) = rx.recv().await {
///     println!("received {} plc ops", ops.len());
///
///     for Op { did, cid, operation, .. } in ops {
///         // in this example we're alerting when changes are found for one
///         // specific identity
///         if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
///             println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
///         }
///     }
/// }
/// # }
/// ```
pub async fn poll_upstream(
    after: Option<Dt>,
    base: Url,
    throttle: Duration,
    dest: mpsc::Sender<ExportPage>,
) -> anyhow::Result<&'static str> {
    log::info!("starting upstream poller at {base} after {after:?}");
    // interval ticks immediately on first call, then every `throttle`
    let mut tick = tokio::time::interval(throttle);
    // cursor: the last op we have seen (seeded from `after` via the Dt hack)
    let mut prev_last: Option<LastOp> = after.map(Into::into);
    // dedup state across page boundaries; None until the first non-empty page
    let mut boundary_state: Option<PageBoundaryState> = None;
    loop {
        tick.tick().await;

        let mut url = base.clone();
        // cursor by timestamp: ?after=<rfc3339>
        if let Some(ref pl) = prev_last {
            url.query_pairs_mut()
                .append_pair("after", &pl.created_at.to_rfc3339());
        };

        let (mut page, next_last) = match get_page(url).await {
            Ok(res) => res,
            Err(e) => {
                // transient fetch error: skip this tick and retry with the same cursor
                log::warn!("error polling upstream: {e}");
                continue;
            }
        };

        if let Some(ref mut state) = boundary_state {
            // strip ops already seen at the previous page's trailing timestamp
            state.apply_to_next(&mut page);
        } else {
            // stays None while pages are empty
            boundary_state = PageBoundaryState::new(&page);
        }
        if !page.is_empty() {
            // fast path; if the channel is full, log and fall back to a blocking send
            match dest.try_send(page) {
                Ok(()) => {}
                Err(mpsc::error::TrySendError::Full(page)) => {
                    log::warn!("export: destination channel full, awaiting...");
                    dest.send(page).await?;
                }
                e => e?,
            };
        }

        // keep the old cursor when the page had no ops
        prev_last = next_last.or(prev_last);
    }
}

/// Fetch one page of seq-based export from `/export?after=<seq>`
async fn get_seq_page(url: Url) -> Result<SeqPage, GetPageError> {
    use futures::TryStreamExt;
    use tokio::io::{AsyncBufReadExt, BufReader};
    use tokio_util::compat::FuturesAsyncReadCompatExt;

    log::trace!("getting seq page: {url}");

    let res = CLIENT.get(url).send().await?.error_for_status()?;
    // adapt the HTTP byte stream into an async line reader (JSON Lines body)
    let stream = Box::pin(
        res.bytes_stream()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
            .into_async_read()
            .compat(),
    );

    let mut lines = BufReader::new(stream).lines();
    let mut ops = Vec::new();

    loop {
        match lines.next_line().await {
            Ok(Some(line)) => {
                let line = line.trim();
                if line.is_empty() {
                    continue;
                }
                // unparseable ops are logged and skipped rather than failing the page
                match serde_json::from_str::<SeqOp>(line) {
                    Ok(op) => ops.push(op),
                    Err(e) => log::warn!("failed to parse seq op: {e} ({line})"),
                }
            }
            Ok(None) => break,
            Err(e) => {
                // return whatever parsed so far; the caller's cursor only advances
                // past ops that actually arrived
                log::warn!(
                    "transport error mid-seq-page: {}; returning partial page",
                    e
                );
                break;
            }
        }
    }

    Ok(SeqPage { ops })
}

/// Poll an upstream PLC server using seq-number-based cursoring
///
/// Uses `/export?after=<seq>` — each op from the server carries a `seq` field
/// which is a globally monotonic unsigned integer. Because seq is unique per op
/// there is no need for page-boundary deduplication.
///
/// Pages are sent to `dest`. Returns when the channel closes.
312pub async fn poll_upstream_seq( 313 after: Option<u64>, 314 base: Url, 315 throttle: Duration, 316 dest: mpsc::Sender<SeqPage>, 317) -> anyhow::Result<&'static str> { 318 log::info!("starting seq upstream poller at {base} after {after:?}"); 319 let mut tick = tokio::time::interval(throttle); 320 let mut last_seq: u64 = after.unwrap_or(0); 321 322 loop { 323 tick.tick().await; 324 325 let mut url = base.clone(); 326 url.query_pairs_mut() 327 .append_pair("after", &last_seq.to_string()); 328 329 let page = match get_seq_page(url).await { 330 Ok(p) => p, 331 Err(e) => { 332 log::warn!("error polling upstream (seq): {e}"); 333 continue; 334 } 335 }; 336 337 if let Some(last) = page.ops.last() { 338 last_seq = last.seq; 339 } 340 341 if !page.is_empty() { 342 match dest.try_send(page) { 343 Ok(()) => {} 344 Err(mpsc::error::TrySendError::Full(page)) => { 345 log::warn!("seq poll: destination channel full, awaiting..."); 346 dest.send(page).await?; 347 } 348 e => e?, 349 }; 350 } 351 } 352} 353 354/// Tail the upstream PLC `/export/stream` WebSocket endpoint 355/// 356/// `cursor` is a seq number to resume from. The server only supports backfill 357/// of up to ~1 week (server-configurable), so this cannot replay from seq 0. 358/// Use `poll_upstream_seq` to catch up first, then hand off to this function. 359/// 360/// Messages arrive as single-op `SeqPage`s sent to `dest`. Returns on 361/// disconnect so the caller can reconnect or fall back to polling. 
pub async fn tail_upstream_stream(
    cursor: Option<u64>,
    base: Url,
    dest: mpsc::Sender<SeqPage>,
) -> anyhow::Result<()> {
    use futures::StreamExt;
    use tokio_tungstenite::{connect_async, tungstenite::Message};

    let mut url = base.clone();
    // convert ws(s):// scheme if needed; some callers pass http(s)://
    let ws_scheme = match url.scheme() {
        "https" => "wss",
        "http" => "ws",
        _ => "ws",
    }
    .to_owned();
    url.set_scheme(&ws_scheme)
        .map_err(|_| anyhow::anyhow!("failed to set websocket scheme"))?;
    // resume point, if the caller has one
    if let Some(seq) = cursor {
        url.query_pairs_mut()
            .append_pair("cursor", &seq.to_string());
    }

    log::info!("connecting to stream: {url}");
    let (mut ws, _) = connect_async(url.as_str()).await?;
    log::info!("stream connected");

    while let Some(msg) = ws.next().await {
        // hard transport error: bail so the caller can reconnect
        let msg = msg?;
        let text = match msg {
            Message::Text(t) => t,
            Message::Close(_) => {
                log::info!("stream closed by server");
                break;
            }
            // ignore pings/pongs/binary frames
            _ => continue,
        };

        // one JSON-encoded op per text frame; bad frames are logged and skipped
        let op: SeqOp = match serde_json::from_str(&text) {
            Ok(op) => op,
            Err(e) => {
                log::warn!("failed to parse stream event: {e} ({text})");
                continue;
            }
        };

        // forward as a single-op page; a closed channel means the consumer is gone
        let page = SeqPage { ops: vec![op] };
        if dest.send(page).await.is_err() {
            log::info!("stream dest channel closed, stopping");
            break;
        }
    }

    Ok(())
}

#[cfg(test)]
mod test {
    use super::*;

    // unix timestamps for 2015-05-15T00:00:00Z and one second later,
    // matching the createdAt values in the fixture ops below
    const FIVES_TS: i64 = 1431648000;
    const NEXT_TS: i64 = 1431648001;

    // fixture: an op keyed (did, cid) at FIVES_TS
    fn valid_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "did",
            "cid": "cid",
            "createdAt": "2015-05-15T00:00:00Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    // fixture: a distinct op one second later, at NEXT_TS
    fn next_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "didnext",
            "cid": "cidnext",
            "createdAt": "2015-05-15T00:00:01Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    // boundary state primed with a single valid_op() page
    fn base_state() -> PageBoundaryState {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        PageBoundaryState::new(&page).expect("to have a base page boundary state")
    }

    #[test]
    fn test_boundary_new_empty() {
        let page = ExportPage { ops: vec![] };
        let state = PageBoundaryState::new(&page);
        assert!(state.is_none());
    }

    #[test]
    fn test_boundary_new_one_op() {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        let state = PageBoundaryState::new(&page).unwrap();
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cid".to_string(),
                did: "did".to_string(),
            }]
        );
    }

    #[test]
    fn test_add_new_empty() {
        // an empty page must leave the state untouched
        let mut state = base_state();
        state.apply_to_next(&mut ExportPage { ops: vec![] });
        assert_eq!(state, base_state());
    }

    #[test]
    fn test_add_new_same_op() {
        // the exact same op again is a duplicate: state unchanged
        let mut page = ExportPage {
            ops: vec![valid_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state, base_state());
    }

    #[test]
    fn test_add_new_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage { ops: vec![op] };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    #[test]
    fn test_add_new_same_time_dup_before() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![valid_op(), op],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    #[test]
    fn test_add_new_same_time_dup_after() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![op, valid_op()],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    #[test]
    fn test_add_new_next_time() {
        // time advanced: keys_at resets to just the new op
        let mut page = ExportPage {
            ops: vec![next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
    }

    #[test]
    fn test_add_new_next_time_with_dup() {
        // leading duplicate is dropped, trailing new op advances the state
        let mut page = ExportPage {
            ops: vec![valid_op(), next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 1);
        assert_eq!(page.ops[0], next_op());
    }

    #[test]
    fn test_add_new_next_time_with_dup_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                valid_op(), // should get dropped
                op.clone(), // should be kept
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }

    #[test]
    fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                op.clone(), // should be kept
                valid_op(), // should get dropped
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }
}