Server tools to backfill, tail, mirror, and verify PLC logs

deserialize op early and into owned

it's *fine*, but I don't think serde_json is guaranteed to round-trip the JSON object structure (e.g. key order) exactly — in fact an exact round-trip may be unlikely.

in general this shouldn't be a problem: logs remain exactly verifiable against upstream by canonicalizing into dag-cbor etc. — it's just annoying to have to do that.

leaving it for now but once we get there, we can:

- keep a copy of the original string beside the op (~2x memory? probably fine?) or
- use some rust trickery to keep the original string and store the parsed op as a self-referencing struct beside it (the ouroboros crate, etc.)

+106 -290
+1 -2
src/backfill.rs
··· 52 52 53 53 // wait for the big backfill to finish 54 54 while let Some(res) = workers.join_next().await { 55 - res 56 - .inspect_err(|e| log::error!("problem joining source workers: {e}"))? 55 + res.inspect_err(|e| log::error!("problem joining source workers: {e}"))? 57 56 .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?; 58 57 } 59 58 log::info!("finished fetching backfill in {:?}", t_step.elapsed());
+1 -1
src/bin/allegedly.rs
··· 125 125 let mut last_at = None; 126 126 while let Some(page) = rx.recv().await { 127 127 for op in &page.ops { 128 - println!("{op}"); 128 + println!("{}", serde_json::to_string(op)?); 129 129 } 130 130 if notify_last_at.is_some() 131 131 && let Some(s) = PageBoundaryState::new(&page)
+21 -10
src/lib.rs
··· 1 - use serde::Deserialize; 1 + use serde::{Deserialize, Serialize}; 2 2 3 3 mod backfill; 4 4 mod client; ··· 23 23 /// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 24 24 #[derive(Debug)] 25 25 pub struct ExportPage { 26 - pub ops: Vec<String>, 26 + pub ops: Vec<Op>, 27 27 } 28 28 29 29 impl ExportPage { ··· 35 35 /// A fully-deserialized plc operation 36 36 /// 37 37 /// including the plc's wrapping with timestmap and nullified state 38 - #[derive(Debug, Deserialize)] 38 + #[derive(Debug, Clone, Deserialize, Serialize)] 39 39 #[serde(rename_all = "camelCase")] 40 - pub struct Op<'a> { 41 - pub did: &'a str, 42 - pub cid: &'a str, 40 + pub struct Op { 41 + pub did: String, 42 + pub cid: String, 43 43 pub created_at: Dt, 44 44 pub nullified: bool, 45 - #[serde(borrow)] 46 - pub operation: &'a serde_json::value::RawValue, 45 + pub operation: Box<serde_json::value::RawValue>, 46 + } 47 + 48 + #[cfg(test)] 49 + impl PartialEq for Op { 50 + fn eq(&self, other: &Self) -> bool { 51 + self.did == other.did 52 + && self.cid == other.cid 53 + && self.created_at == other.created_at 54 + && self.nullified == other.nullified 55 + && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 56 + == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 57 + } 47 58 } 48 59 49 60 /// Database primary key for an op ··· 53 64 pub cid: String, 54 65 } 55 66 56 - impl From<&Op<'_>> for OpKey { 57 - fn from(Op { did, cid, .. }: &Op<'_>) -> Self { 67 + impl From<&Op> for OpKey { 68 + fn from(Op { did, cid, .. }: &Op) -> Self { 58 69 Self { 59 70 did: did.to_string(), 60 71 cid: cid.to_string(),
+4 -12
src/plc_pg.rs
··· 1 - use crate::{Dt, ExportPage, Op, PageBoundaryState}; 1 + use crate::{Dt, ExportPage, PageBoundaryState}; 2 2 use native_tls::{Certificate, TlsConnector}; 3 3 use postgres_native_tls::MakeTlsConnector; 4 4 use std::path::PathBuf; ··· 154 154 while let Some(page) = pages.recv().await { 155 155 log::trace!("writing page with {} ops", page.ops.len()); 156 156 let tx = client.transaction().await?; 157 - for s in page.ops { 158 - let Ok(op) = serde_json::from_str::<Op>(&s) else { 159 - log::warn!("ignoring unparseable op {s:?}"); 160 - continue; 161 - }; 157 + for op in page.ops { 162 158 ops_inserted += tx 163 159 .execute( 164 160 &ops_stmt, ··· 258 254 let mut writer = pin!(BinaryCopyInWriter::new(sync, types)); 259 255 let mut last_at = None; 260 256 while let Some(page) = pages.recv().await { 261 - for s in &page.ops { 262 - let Ok(op) = serde_json::from_str::<Op>(s) else { 263 - log::warn!("ignoring unparseable op: {s:?}"); 264 - continue; 265 - }; 257 + for op in &page.ops { 266 258 writer 267 259 .as_mut() 268 260 .write(&[ 269 261 &op.did, 270 - &Json(op.operation), 262 + &Json(op.operation.clone()), 271 263 &op.cid, 272 264 &op.nullified, 273 265 &op.created_at,
+63 -252
src/poll.rs
··· 26 26 pk: (String, String), // did, cid 27 27 } 28 28 29 - impl From<Op<'_>> for LastOp { 29 + impl From<Op> for LastOp { 30 30 fn from(op: Op) -> Self { 31 31 Self { 32 32 created_at: op.created_at, 33 - pk: (op.did.to_string(), op.cid.to_string()), 33 + pk: (op.did, op.cid), 34 34 } 35 35 } 36 36 } 37 37 38 + impl From<&Op> for LastOp { 39 + fn from(op: &Op) -> Self { 40 + Self { 41 + created_at: op.created_at, 42 + pk: (op.did.clone(), op.cid.clone()), 43 + } 44 + } 45 + } 46 + 47 + // bit of a hack 38 48 impl From<Dt> for LastOp { 39 49 fn from(dt: Dt) -> Self { 40 50 Self { ··· 51 61 keys_at: Vec<OpKey>, // expected to ~always be length one 52 62 } 53 63 54 - // ok so this is silly. 55 - // 56 - // i think i had some idea that deferring parsing to later steps would make it 57 - // easier to do things like sometimes not parsing at all (where the output is 58 - // also json lines), and maybe avoid some memory shuffling. 59 - // but since the input already has to be split into lines, keeping them as line 60 - // strings is probably the worst option: space-inefficient, allows garbage, and 61 - // leads to, well, this impl. 62 - // 63 - // it almost could have been slick if the *original* was just reused, and the 64 - // parsed ops were just kind of on the side referencing into it, but i'm lazy 65 - // and didn't get it there. 66 - // 67 - // should unrefactor to make Op own its data again, parse (and deal with errors) 68 - // upfront, and probably greatly simplify everything downstream. simple. 64 + /// track keys at final createdAt to deduplicate the start of the next page 69 65 impl PageBoundaryState { 70 66 pub fn new(page: &ExportPage) -> Option<Self> { 71 - let mut skips = 0; 72 - 73 67 // grab the very last op 74 - let (last_at, last_key) = loop { 75 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 76 - // there are no ops left? oop. bail. 
77 - // last_at and existing keys remain in tact if there was no later op 78 - return None; 79 - }; 80 - if s.is_empty() { 81 - // annoying: ignore any trailing blank lines 82 - skips += 1; 83 - continue; 84 - } 85 - let Ok(op) = serde_json::from_str::<Op>(&s) 86 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 87 - else { 88 - // doubly annoying: skip over trailing garbage?? 89 - skips += 1; 90 - continue; 91 - }; 92 - break (op.created_at, Into::<OpKey>::into(&op)); 93 - }; 68 + let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; 94 69 95 70 // set initial state 96 71 let mut me = Self { ··· 99 74 }; 100 75 101 76 // and make sure all keys at this time are captured from the back 102 - me.capture_nth_last_at(page, last_at, skips); 77 + me.capture_nth_last_at(page, last_at, 1); 103 78 104 79 Some(me) 105 80 } ··· 108 83 let to_remove: Vec<usize> = page 109 84 .ops 110 85 .iter() 111 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 112 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 113 86 .enumerate() 114 - .take_while(|(_, opr)| opr.as_ref().map(|op| op.created_at == self.last_at).unwrap_or(false)) 115 - .filter_map(|(i, opr)| { 116 - if self.keys_at.contains(&(&opr.expect("any Errs were filtered by take_while")).into()) { 117 - Some(i) 118 - } else { None } 119 - }) 87 + .take_while(|(_, op)| op.created_at == self.last_at) 88 + .filter(|(_, op)| self.keys_at.contains(&(*op).into())) 89 + .map(|(i, _)| i) 120 90 .collect(); 121 91 122 - // actually remove them. last to first to indices don't shift 92 + // actually remove them. 
last to first so indices don't shift 123 93 for dup_idx in to_remove.into_iter().rev() { 124 94 page.ops.remove(dup_idx); 125 95 } 126 96 127 97 // grab the very last op 128 - let mut skips = 0; 129 - let (last_at, last_key) = loop { 130 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 131 - // there are no ops left? oop. bail. 132 - // last_at and existing keys remain in tact if there was no later op 133 - return; 134 - }; 135 - if s.is_empty() { 136 - // annoying: trim off any trailing blank lines 137 - skips += 1; 138 - continue; 139 - } 140 - let Ok(op) = serde_json::from_str::<Op>(&s) 141 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 142 - else { 143 - // doubly annoying: skip over trailing garbage?? 144 - skips += 1; 145 - continue; 146 - }; 147 - break (op.created_at, Into::<OpKey>::into(&op)); 98 + let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else { 99 + // there are no ops left? oop. bail. 
100 + // last_at and existing keys remain in tact 101 + return; 148 102 }; 149 103 150 104 // reset state (as long as time actually moved forward on this page) ··· 157 111 self.keys_at.push(last_key); 158 112 } 159 113 // and make sure all keys at this time are captured from the back 160 - self.capture_nth_last_at(page, last_at, skips); 114 + self.capture_nth_last_at(page, last_at, 1); 161 115 } 162 116 163 117 /// walk backwards from 2nd last and collect keys until created_at changes ··· 166 120 .iter() 167 121 .rev() 168 122 .skip(skips) 169 - .skip(1) // we alredy added the very last one 170 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 171 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 172 - .take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false)) 173 - .for_each(|opr| { 174 - let op = &opr.expect("any Errs were filtered by take_while"); 123 + .take_while(|op| op.created_at == last_at) 124 + .for_each(|op| { 175 125 self.keys_at.push(op.into()); 176 126 }); 177 127 } ··· 180 130 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 181 131 log::trace!("Getting page: {url}"); 182 132 183 - let ops: Vec<String> = CLIENT 133 + let ops: Vec<Op> = CLIENT 184 134 .get(url) 185 135 .send() 186 136 .await? ··· 190 140 .trim() 191 141 .split('\n') 192 142 .filter_map(|s| { 193 - let s = s.trim(); 194 - if s.is_empty() { None } else { Some(s) } 143 + serde_json::from_str::<Op>(s) 144 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 145 + .ok() 195 146 }) 196 - .map(Into::into) 197 147 .collect(); 198 148 199 - let last_op = ops 200 - .last() 201 - .filter(|s| !s.is_empty()) 202 - .map(|s| serde_json::from_str::<Op>(s)) 203 - .transpose()? 
204 - .map(Into::into) 205 - .inspect(|at| log::trace!("new last op: {at:?}")); 149 + let last_op = ops.last().map(Into::into); 206 150 207 151 Ok((ExportPage { ops }, last_op)) 208 152 } ··· 252 196 const FIVES_TS: i64 = 1431648000; 253 197 const NEXT_TS: i64 = 1431648001; 254 198 255 - fn valid_op() -> serde_json::Value { 256 - serde_json::json!({ 199 + fn valid_op() -> Op { 200 + serde_json::from_value(serde_json::json!({ 257 201 "did": "did", 258 202 "cid": "cid", 259 203 "createdAt": "2015-05-15T00:00:00Z", 260 204 "nullified": false, 261 205 "operation": {}, 262 - }) 206 + })) 207 + .unwrap() 263 208 } 264 209 265 - fn next_op() -> serde_json::Value { 266 - serde_json::json!({ 210 + fn next_op() -> Op { 211 + serde_json::from_value(serde_json::json!({ 267 212 "did": "didnext", 268 213 "cid": "cidnext", 269 214 "createdAt": "2015-05-15T00:00:01Z", 270 215 "nullified": false, 271 216 "operation": {}, 272 - }) 217 + })) 218 + .unwrap() 273 219 } 274 220 275 221 fn base_state() -> PageBoundaryState { 276 222 let page = ExportPage { 277 - ops: vec![valid_op().to_string()], 223 + ops: vec![valid_op()], 278 224 }; 279 225 PageBoundaryState::new(&page).expect("to have a base page boundary state") 280 226 } ··· 287 233 } 288 234 289 235 #[test] 290 - fn test_boundary_new_empty_op() { 291 - let page = ExportPage { 292 - ops: vec!["".to_string()], 293 - }; 294 - let state = PageBoundaryState::new(&page); 295 - assert!(state.is_none()); 296 - } 297 - 298 - #[test] 299 - fn test_boundary_new_ignores_bad_op() { 300 - let page = ExportPage { 301 - ops: vec!["bad".to_string()], 302 - }; 303 - let state = PageBoundaryState::new(&page); 304 - assert!(state.is_none()); 305 - } 306 - 307 - #[test] 308 - fn test_boundary_new_multiple_bad_end() { 309 - let page = ExportPage { 310 - ops: vec![ 311 - "bad".to_string(), 312 - "".to_string(), 313 - "foo".to_string(), 314 - "".to_string(), 315 - ], 316 - }; 317 - let state = PageBoundaryState::new(&page); 318 - 
assert!(state.is_none()); 319 - } 320 - 321 - #[test] 322 236 fn test_boundary_new_one_op() { 323 237 let page = ExportPage { 324 - ops: vec![valid_op().to_string()], 238 + ops: vec![valid_op()], 325 239 }; 326 240 let state = PageBoundaryState::new(&page).unwrap(); 327 241 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); ··· 335 249 } 336 250 337 251 #[test] 338 - fn test_boundary_new_one_op_with_stuff() { 339 - let expect_same_state = |m, ops| { 340 - let this_state = PageBoundaryState::new(&ExportPage { ops }).unwrap(); 341 - assert_eq!(this_state, base_state(), "{}", m); 342 - }; 343 - 344 - expect_same_state("empty before", vec!["".to_string(), valid_op().to_string()]); 345 - 346 - expect_same_state("empty after", vec![valid_op().to_string(), "".to_string()]); 347 - 348 - expect_same_state( 349 - "bad before, empty after", 350 - vec!["bad".to_string(), valid_op().to_string(), "".to_string()], 351 - ); 352 - 353 - expect_same_state( 354 - "bad and empty before and after", 355 - vec![ 356 - "".to_string(), 357 - "bad".to_string(), 358 - valid_op().to_string(), 359 - "".to_string(), 360 - "bad".to_string(), 361 - ], 362 - ); 363 - } 364 - 365 - #[test] 366 252 fn test_add_new_empty() { 367 253 let mut state = base_state(); 368 254 state.apply_to_next(&mut ExportPage { ops: vec![] }); ··· 370 256 } 371 257 372 258 #[test] 373 - fn test_add_new_empty_op() { 374 - let mut state = base_state(); 375 - state.apply_to_next(&mut ExportPage { 376 - ops: vec!["".to_string()], 377 - }); 378 - assert_eq!(state, base_state()); 379 - } 380 - 381 - #[test] 382 - fn test_add_new_ignores_bad_op() { 383 - let mut state = base_state(); 384 - state.apply_to_next(&mut ExportPage { 385 - ops: vec!["bad".to_string()], 386 - }); 387 - assert_eq!(state, base_state()); 388 - } 389 - 390 - #[test] 391 - fn test_add_new_multiple_bad() { 392 - let mut page = ExportPage { 393 - ops: vec![ 394 - "bad".to_string(), 395 - "".to_string(), 396 - "foo".to_string(), 397 - 
"".to_string(), 398 - ], 399 - }; 400 - 401 - let mut state = base_state(); 402 - state.apply_to_next(&mut page); 403 - assert_eq!(state, base_state()); 404 - } 405 - 406 - #[test] 407 259 fn test_add_new_same_op() { 408 260 let mut page = ExportPage { 409 - ops: vec![valid_op().to_string()], 261 + ops: vec![valid_op()], 410 262 }; 411 263 let mut state = base_state(); 412 264 state.apply_to_next(&mut page); ··· 417 269 fn test_add_new_same_time() { 418 270 // make an op with a different OpKey 419 271 let mut op = valid_op(); 420 - op.as_object_mut() 421 - .unwrap() 422 - .insert("cid".to_string(), "cid2".into()); 423 - let mut page = ExportPage { 424 - ops: vec![op.to_string()], 425 - }; 272 + op.cid = "cid2".to_string(); 273 + let mut page = ExportPage { ops: vec![op] }; 426 274 427 275 let mut state = base_state(); 428 276 state.apply_to_next(&mut page); ··· 446 294 fn test_add_new_same_time_dup_before() { 447 295 // make an op with a different OpKey 448 296 let mut op = valid_op(); 449 - op.as_object_mut() 450 - .unwrap() 451 - .insert("cid".to_string(), "cid2".into()); 297 + op.cid = "cid2".to_string(); 452 298 let mut page = ExportPage { 453 - ops: vec![valid_op().to_string(), op.to_string()], 299 + ops: vec![valid_op(), op], 454 300 }; 455 301 456 302 let mut state = base_state(); ··· 475 321 fn test_add_new_same_time_dup_after() { 476 322 // make an op with a different OpKey 477 323 let mut op = valid_op(); 478 - op.as_object_mut() 479 - .unwrap() 480 - .insert("cid".to_string(), "cid2".into()); 481 - let mut page = ExportPage { 482 - ops: vec![op.to_string(), valid_op().to_string()], 483 - }; 484 - 485 - let mut state = base_state(); 486 - state.apply_to_next(&mut page); 487 - assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 488 - assert_eq!( 489 - state.keys_at, 490 - vec![ 491 - OpKey { 492 - cid: "cid".to_string(), 493 - did: "did".to_string(), 494 - }, 495 - OpKey { 496 - cid: "cid2".to_string(), 497 - did: "did".to_string(), 498 - 
}, 499 - ] 500 - ); 501 - } 502 - 503 - #[test] 504 - fn test_add_new_same_time_blank_after() { 505 - // make an op with a different OpKey 506 - let mut op = valid_op(); 507 - op.as_object_mut() 508 - .unwrap() 509 - .insert("cid".to_string(), "cid2".into()); 324 + op.cid = "cid2".to_string(); 510 325 let mut page = ExportPage { 511 - ops: vec![op.to_string(), "".to_string()], 326 + ops: vec![op, valid_op()], 512 327 }; 513 328 514 329 let mut state = base_state(); ··· 532 347 #[test] 533 348 fn test_add_new_next_time() { 534 349 let mut page = ExportPage { 535 - ops: vec![next_op().to_string()], 350 + ops: vec![next_op()], 536 351 }; 537 352 let mut state = base_state(); 538 353 state.apply_to_next(&mut page); ··· 549 364 #[test] 550 365 fn test_add_new_next_time_with_dup() { 551 366 let mut page = ExportPage { 552 - ops: vec![valid_op().to_string(), next_op().to_string()], 367 + ops: vec![valid_op(), next_op()], 553 368 }; 554 369 let mut state = base_state(); 555 370 state.apply_to_next(&mut page); ··· 562 377 },] 563 378 ); 564 379 assert_eq!(page.ops.len(), 1); 565 - assert_eq!(page.ops[0], next_op().to_string()); 380 + assert_eq!(page.ops[0], next_op()); 566 381 } 567 382 568 383 #[test] 569 384 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 570 385 // make an op with a different OpKey 571 386 let mut op = valid_op(); 572 - op.as_object_mut() 573 - .unwrap() 574 - .insert("cid".to_string(), "cid2".into()); 387 + op.cid = "cid2".to_string(); 575 388 576 389 let mut page = ExportPage { 577 390 ops: vec![ 578 - valid_op().to_string(), // should get dropped 579 - op.to_string(), // should be kept 580 - next_op().to_string(), 391 + valid_op(), // should get dropped 392 + op.clone(), // should be kept 393 + next_op(), 581 394 ], 582 395 }; 583 396 let mut state = base_state(); ··· 591 404 },] 592 405 ); 593 406 assert_eq!(page.ops.len(), 2); 594 - assert_eq!(page.ops[0], op.to_string()); 595 - assert_eq!(page.ops[1], next_op().to_string()); 407 + 
assert_eq!(page.ops[0], op); 408 + assert_eq!(page.ops[1], next_op()); 596 409 } 597 410 598 411 #[test] 599 412 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 600 413 // make an op with a different OpKey 601 414 let mut op = valid_op(); 602 - op.as_object_mut() 603 - .unwrap() 604 - .insert("cid".to_string(), "cid2".into()); 415 + op.cid = "cid2".to_string(); 605 416 606 417 let mut page = ExportPage { 607 418 ops: vec![ 608 - op.to_string(), // should be kept 609 - valid_op().to_string(), // should get dropped 610 - next_op().to_string(), 419 + op.clone(), // should be kept 420 + valid_op(), // should get dropped 421 + next_op(), 611 422 ], 612 423 }; 613 424 let mut state = base_state(); ··· 621 432 },] 622 433 ); 623 434 assert_eq!(page.ops.len(), 2); 624 - assert_eq!(page.ops[0], op.to_string()); 625 - assert_eq!(page.ops[1], next_op().to_string()); 435 + assert_eq!(page.ops[0], op); 436 + assert_eq!(page.ops[1], next_op()); 626 437 } 627 438 }
+16 -13
src/weekly.rs
··· 142 142 let mut week_t0 = total_t0; 143 143 144 144 while let Some(page) = rx.recv().await { 145 - for mut s in page.ops { 146 - let Ok(op) = serde_json::from_str::<Op>(&s) 147 - .inspect_err(|e| log::error!("failed to parse plc op, ignoring: {e}")) 148 - else { 149 - continue; 150 - }; 145 + for op in page.ops { 151 146 let op_week = op.created_at.into(); 152 147 if current_week.map(|w| w != op_week).unwrap_or(true) { 153 148 encoder.shutdown().await?; ··· 172 167 week_ops = 0; 173 168 week_t0 = now; 174 169 } 175 - s.push('\n'); // hack 176 - log::trace!("writing: {s}"); 177 - encoder.write_all(s.as_bytes()).await?; 170 + log::trace!("writing: {op:?}"); 171 + encoder 172 + .write_all(serde_json::to_string(&op)?.as_bytes()) 173 + .await?; 178 174 total_ops += 1; 179 175 week_ops += 1; 180 176 } ··· 201 197 dest: mpsc::Sender<ExportPage>, 202 198 ) -> anyhow::Result<()> { 203 199 use futures::TryStreamExt; 204 - let reader = source.reader_for(week) 200 + let reader = source 201 + .reader_for(week) 205 202 .await 206 203 .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?; 207 204 let decoder = GzipDecoder::new(BufReader::new(reader)); ··· 212 209 .await 213 210 .inspect_err(|e| log::error!("failed to get next chunk: {e}"))? 214 211 { 215 - let ops: Vec<String> = chunk.into_iter().collect(); 212 + let ops: Vec<Op> = chunk 213 + .into_iter() 214 + .filter_map(|s| { 215 + serde_json::from_str::<Op>(&s) 216 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 217 + .ok() 218 + }) 219 + .collect(); 216 220 let page = ExportPage { ops }; 217 - dest 218 - .send(page) 221 + dest.send(page) 219 222 .await 220 223 .inspect_err(|e| log::error!("failed to send page: {e}"))?; 221 224 }