Server tools to backfill, tail, mirror, and verify PLC logs

tests for the horrible boundary dedup

+399 -10
+399 -10
src/poll.rs
··· 43 } 44 45 /// PLC 46 struct PageBoundaryState { 47 last_at: Dt, 48 keys_at: Vec<OpKey>, // expected to ~always be length one ··· 64 // should unrefactor to make Op own its data again, parse (and deal with errors) 65 // upfront, and probably greatly simplify everything downstream. simple. 66 impl PageBoundaryState { 67 - fn new(page: &mut ExportPage) -> Option<Self> { 68 // grab the very last op 69 let (last_at, last_key) = loop { 70 - let Some(s) = page.ops.last().cloned() else { 71 // there are no ops left? oop. bail. 72 // last_at and existing keys remain in tact if there was no later op 73 return None; 74 }; 75 if s.is_empty() { 76 - // annoying: trim off any trailing blank lines 77 - page.ops.pop(); 78 continue; 79 } 80 let Ok(op) = serde_json::from_str::<Op>(&s) 81 .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 82 else { 83 // doubly annoying: skip over trailing garbage?? 84 continue; 85 }; 86 break (op.created_at, Into::<OpKey>::into(&op)); ··· 93 }; 94 95 // and make sure all keys at this time are captured from the back 96 - me.capture_nth_last_at(page, last_at); 97 98 Some(me) 99 } ··· 119 } 120 121 // grab the very last op 122 let (last_at, last_key) = loop { 123 - let Some(s) = page.ops.last().cloned() else { 124 // there are no ops left? oop. bail. 125 // last_at and existing keys remain in tact if there was no later op 126 return; 127 }; 128 if s.is_empty() { 129 // annoying: trim off any trailing blank lines 130 - page.ops.pop(); 131 continue; 132 } 133 let Ok(op) = serde_json::from_str::<Op>(&s) 134 .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 135 else { 136 // doubly annoying: skip over trailing garbage?? 137 continue; 138 }; 139 break (op.created_at, Into::<OpKey>::into(&op)); ··· 146 } else { 147 // weird cases: either time didn't move (fine...) or went backwards (not fine) 148 assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 149 } 150 // and make sure all keys at this time are captured from the back 151 - self.capture_nth_last_at(page, last_at); 152 } 153 154 /// walk backwards from 2nd last and collect keys until created_at changes 155 - fn capture_nth_last_at(&mut self, page: &mut ExportPage, last_at: Dt) { 156 page.ops 157 .iter() 158 .rev() 159 .skip(1) // we alredy added the very last one 160 .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 161 log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) ··· 218 if let Some(ref mut state) = boundary_state { 219 state.apply_to_next(&mut page); 220 } else { 221 - boundary_state = PageBoundaryState::new(&mut page); 222 } 223 if !page.is_empty() { 224 match dest.try_send(page) { ··· 234 prev_last = next_last.or(prev_last); 235 } 236 }
··· 43 } 44 45 /// PLC 46 + #[derive(Debug, PartialEq)] 47 struct PageBoundaryState { 48 last_at: Dt, 49 keys_at: Vec<OpKey>, // expected to ~always be length one ··· 65 // should unrefactor to make Op own its data again, parse (and deal with errors) 66 // upfront, and probably greatly simplify everything downstream. simple. 67 impl PageBoundaryState { 68 + fn new(page: &ExportPage) -> Option<Self> { 69 + let mut skips = 0; 70 + 71 // grab the very last op 72 let (last_at, last_key) = loop { 73 + let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 74 // there are no ops left? oop. bail. 75 // last_at and existing keys remain in tact if there was no later op 76 return None; 77 }; 78 if s.is_empty() { 79 + // annoying: ignore any trailing blank lines 80 + skips += 1; 81 continue; 82 } 83 let Ok(op) = serde_json::from_str::<Op>(&s) 84 .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 85 else { 86 // doubly annoying: skip over trailing garbage?? 87 + skips += 1; 88 continue; 89 }; 90 break (op.created_at, Into::<OpKey>::into(&op)); ··· 97 }; 98 99 // and make sure all keys at this time are captured from the back 100 + me.capture_nth_last_at(page, last_at, skips); 101 102 Some(me) 103 } ··· 123 } 124 125 // grab the very last op 126 + let mut skips = 0; 127 let (last_at, last_key) = loop { 128 + let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 129 // there are no ops left? oop. bail. 130 // last_at and existing keys remain in tact if there was no later op 131 return; 132 }; 133 if s.is_empty() { 134 // annoying: trim off any trailing blank lines 135 + skips += 1; 136 continue; 137 } 138 let Ok(op) = serde_json::from_str::<Op>(&s) 139 .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 140 else { 141 // doubly annoying: skip over trailing garbage?? 142 + skips += 1; 143 continue; 144 }; 145 break (op.created_at, Into::<OpKey>::into(&op)); ··· 152 } else { 153 // weird cases: either time didn't move (fine...) or went backwards (not fine) 154 assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 155 + self.keys_at.push(last_key); 156 } 157 // and make sure all keys at this time are captured from the back 158 + self.capture_nth_last_at(page, last_at, skips); 159 } 160 161 /// walk backwards from 2nd last and collect keys until created_at changes 162 + fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) { 163 page.ops 164 .iter() 165 .rev() 166 + .skip(skips) 167 .skip(1) // we alredy added the very last one 168 .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 169 log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) ··· 226 if let Some(ref mut state) = boundary_state { 227 state.apply_to_next(&mut page); 228 } else { 229 + boundary_state = PageBoundaryState::new(&page); 230 } 231 if !page.is_empty() { 232 match dest.try_send(page) { ··· 242 prev_last = next_last.or(prev_last); 243 } 244 } 245 + 246 + #[cfg(test)] 247 + mod test { 248 + use super::*; 249 + 250 + const FIVES_TS: i64 = 1431648000; 251 + const NEXT_TS: i64 = 1431648001; 252 + 253 + fn valid_op() -> serde_json::Value { 254 + serde_json::json!({ 255 + "did": "did", 256 + "cid": "cid", 257 + "createdAt": "2015-05-15T00:00:00Z", 258 + "nullified": false, 259 + "operation": {}, 260 + }) 261 + } 262 + 263 + fn next_op() -> serde_json::Value { 264 + serde_json::json!({ 265 + "did": "didnext", 266 + "cid": "cidnext", 267 + "createdAt": "2015-05-15T00:00:01Z", 268 + "nullified": false, 269 + "operation": {}, 270 + }) 271 + } 272 + 273 + fn base_state() -> PageBoundaryState { 274 + let page = ExportPage { 275 + ops: vec![valid_op().to_string()], 276 + }; 277 + PageBoundaryState::new(&page).unwrap() 278 + } 279 + 280 + #[test] 281 + fn test_boundary_new_empty() { 282 + let page = ExportPage { ops: vec![] }; 283 + let state = PageBoundaryState::new(&page); 284 + assert!(state.is_none()); 285 + } 286 + 287 + #[test] 288 + fn test_boundary_new_empty_op() { 289 + let page = ExportPage { 290 + ops: vec!["".to_string()], 291 + }; 292 + let state = PageBoundaryState::new(&page); 293 + assert!(state.is_none()); 294 + } 295 + 296 + #[test] 297 + fn test_boundary_new_ignores_bad_op() { 298 + let page = ExportPage { 299 + ops: vec!["bad".to_string()], 300 + }; 301 + let state = PageBoundaryState::new(&page); 302 + assert!(state.is_none()); 303 + } 304 + 305 + #[test] 306 + fn test_boundary_new_multiple_bad_end() { 307 + let page = ExportPage { 308 + ops: vec![ 309 + "bad".to_string(), 310 + "".to_string(), 311 + "foo".to_string(), 312 + "".to_string(), 313 + ], 314 + }; 315 + let state = PageBoundaryState::new(&page); 316 + assert!(state.is_none()); 317 + } 318 + 319 + #[test] 320 + fn test_boundary_new_one_op() { 321 + let page = ExportPage { 322 + ops: vec![valid_op().to_string()], 323 + }; 324 + let state = PageBoundaryState::new(&page).unwrap(); 325 + assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 326 + assert_eq!( 327 + state.keys_at, 328 + vec![OpKey { 329 + cid: "cid".to_string(), 330 + did: "did".to_string(), 331 + }] 332 + ); 333 + } 334 + 335 + #[test] 336 + fn test_boundary_new_one_op_with_stuff() { 337 + let expect_same_state = |m, ops| { 338 + let this_state = PageBoundaryState::new(&ExportPage { ops }).unwrap(); 339 + assert_eq!(this_state, base_state(), "{}", m); 340 + }; 341 + 342 + expect_same_state("empty before", vec!["".to_string(), valid_op().to_string()]); 343 + 344 + expect_same_state("empty after", vec![valid_op().to_string(), "".to_string()]); 345 + 346 + expect_same_state( 347 + "bad before, empty after", 348 + vec!["bad".to_string(), valid_op().to_string(), "".to_string()], 349 + ); 350 + 351 + expect_same_state( 352 + "bad and empty before and after", 353 + vec![ 354 + "".to_string(), 355 + "bad".to_string(), 356 + valid_op().to_string(), 357 + "".to_string(), 358 + "bad".to_string(), 359 + ], 360 + ); 361 + } 362 + 363 + #[test] 364 + fn test_add_new_empty() { 365 + let mut state = base_state(); 366 + state.apply_to_next(&mut ExportPage { ops: vec![] }); 367 + assert_eq!(state, base_state()); 368 + } 369 + 370 + #[test] 371 + fn test_add_new_empty_op() { 372 + let mut state = base_state(); 373 + state.apply_to_next(&mut ExportPage { 374 + ops: vec!["".to_string()], 375 + }); 376 + assert_eq!(state, base_state()); 377 + } 378 + 379 + #[test] 380 + fn test_add_new_ignores_bad_op() { 381 + let mut state = base_state(); 382 + state.apply_to_next(&mut ExportPage { 383 + ops: vec!["bad".to_string()], 384 + }); 385 + assert_eq!(state, base_state()); 386 + } 387 + 388 + #[test] 389 + fn test_add_new_multiple_bad() { 390 + let mut page = ExportPage { 391 + ops: vec![ 392 + "bad".to_string(), 393 + "".to_string(), 394 + "foo".to_string(), 395 + "".to_string(), 396 + ], 397 + }; 398 + 399 + let mut state = base_state(); 400 + state.apply_to_next(&mut page); 401 + assert_eq!(state, base_state()); 402 + } 403 + 404 + #[test] 405 + fn test_add_new_same_op() { 406 + let mut page = ExportPage { 407 + ops: vec![valid_op().to_string()], 408 + }; 409 + let mut state = base_state(); 410 + state.apply_to_next(&mut page); 411 + assert_eq!(state, base_state()); 412 + } 413 + 414 + #[test] 415 + fn test_add_new_same_time() { 416 + // make an op with a different OpKey 417 + let mut op = valid_op(); 418 + op.as_object_mut() 419 + .unwrap() 420 + .insert("cid".to_string(), "cid2".into()); 421 + let mut page = ExportPage { 422 + ops: vec![op.to_string()], 423 + }; 424 + 425 + let mut state = base_state(); 426 + state.apply_to_next(&mut page); 427 + assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 428 + assert_eq!( 429 + state.keys_at, 430 + vec![ 431 + OpKey { 432 + cid: "cid".to_string(), 433 + did: "did".to_string(), 434 + }, 435 + OpKey { 436 + cid: "cid2".to_string(), 437 + did: "did".to_string(), 438 + }, 439 + ] 440 + ); 441 + } 442 + 443 + #[test] 444 + fn test_add_new_same_time_dup_before() { 445 + // make an op with a different OpKey 446 + let mut op = valid_op(); 447 + op.as_object_mut() 448 + .unwrap() 449 + .insert("cid".to_string(), "cid2".into()); 450 + let mut page = ExportPage { 451 + ops: vec![valid_op().to_string(), op.to_string()], 452 + }; 453 + 454 + let mut state = base_state(); 455 + state.apply_to_next(&mut page); 456 + assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 457 + assert_eq!( 458 + state.keys_at, 459 + vec![ 460 + OpKey { 461 + cid: "cid".to_string(), 462 + did: "did".to_string(), 463 + }, 464 + OpKey { 465 + cid: "cid2".to_string(), 466 + did: "did".to_string(), 467 + }, 468 + ] 469 + ); 470 + } 471 + 472 + #[test] 473 + fn test_add_new_same_time_dup_after() { 474 + // make an op with a different OpKey 475 + let mut op = valid_op(); 476 + op.as_object_mut() 477 + .unwrap() 478 + .insert("cid".to_string(), "cid2".into()); 479 + let mut page = ExportPage { 480 + ops: vec![op.to_string(), valid_op().to_string()], 481 + }; 482 + 483 + let mut state = base_state(); 484 + state.apply_to_next(&mut page); 485 + assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 486 + assert_eq!( 487 + state.keys_at, 488 + vec![ 489 + OpKey { 490 + cid: "cid".to_string(), 491 + did: "did".to_string(), 492 + }, 493 + OpKey { 494 + cid: "cid2".to_string(), 495 + did: "did".to_string(), 496 + }, 497 + ] 498 + ); 499 + } 500 + 501 + #[test] 502 + fn test_add_new_same_time_blank_after() { 503 + // make an op with a different OpKey 504 + let mut op = valid_op(); 505 + op.as_object_mut() 506 + .unwrap() 507 + .insert("cid".to_string(), "cid2".into()); 508 + let mut page = ExportPage { 509 + ops: vec![op.to_string(), "".to_string()], 510 + }; 511 + 512 + let mut state = base_state(); 513 + state.apply_to_next(&mut page); 514 + assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 515 + assert_eq!( 516 + state.keys_at, 517 + vec![ 518 + OpKey { 519 + cid: "cid".to_string(), 520 + did: "did".to_string(), 521 + }, 522 + OpKey { 523 + cid: "cid2".to_string(), 524 + did: "did".to_string(), 525 + }, 526 + ] 527 + ); 528 + } 529 + 530 + #[test] 531 + fn test_add_new_next_time() { 532 + let mut page = ExportPage { 533 + ops: vec![next_op().to_string()], 534 + }; 535 + let mut state = base_state(); 536 + state.apply_to_next(&mut page); 537 + assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 538 + assert_eq!( 539 + state.keys_at, 540 + vec![OpKey { 541 + cid: "cidnext".to_string(), 542 + did: "didnext".to_string(), 543 + },] 544 + ); 545 + } 546 + 547 + #[test] 548 + fn test_add_new_next_time_with_dup() { 549 + let mut page = ExportPage { 550 + ops: vec![valid_op().to_string(), next_op().to_string()], 551 + }; 552 + let mut state = base_state(); 553 + state.apply_to_next(&mut page); 554 + assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 555 + assert_eq!( 556 + state.keys_at, 557 + vec![OpKey { 558 + cid: "cidnext".to_string(), 559 + did: "didnext".to_string(), 560 + },] 561 + ); 562 + assert_eq!(page.ops.len(), 1); 563 + assert_eq!(page.ops[0], next_op().to_string()); 564 + } 565 + 566 + #[test] 567 + fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 568 + // make an op with a different OpKey 569 + let mut op = valid_op(); 570 + op.as_object_mut() 571 + .unwrap() 572 + .insert("cid".to_string(), "cid2".into()); 573 + 574 + let mut page = ExportPage { 575 + ops: vec![ 576 + valid_op().to_string(), // should get dropped 577 + op.to_string(), // should be kept 578 + next_op().to_string(), 579 + ], 580 + }; 581 + let mut state = base_state(); 582 + state.apply_to_next(&mut page); 583 + assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 584 + assert_eq!( 585 + state.keys_at, 586 + vec![OpKey { 587 + cid: "cidnext".to_string(), 588 + did: "didnext".to_string(), 589 + },] 590 + ); 591 + assert_eq!(page.ops.len(), 2); 592 + assert_eq!(page.ops[0], op.to_string()); 593 + assert_eq!(page.ops[1], next_op().to_string()); 594 + } 595 + 596 + #[test] 597 + fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 598 + // make an op with a different OpKey 599 + let mut op = valid_op(); 600 + op.as_object_mut() 601 + .unwrap() 602 + .insert("cid".to_string(), "cid2".into()); 603 + 604 + let mut page = ExportPage { 605 + ops: vec![ 606 + op.to_string(), // should be kept 607 + valid_op().to_string(), // should get dropped 608 + next_op().to_string(), 609 + ], 610 + }; 611 + let mut state = base_state(); 612 + state.apply_to_next(&mut page); 613 + assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 614 + assert_eq!( 615 + state.keys_at, 616 + vec![OpKey { 617 + cid: "cidnext".to_string(), 618 + did: "didnext".to_string(), 619 + },] 620 + ); 621 + assert_eq!(page.ops.len(), 2); 622 + assert_eq!(page.ops[0], op.to_string()); 623 + assert_eq!(page.ops[1], next_op().to_string()); 624 + } 625 + }