Server tools to backfill, tail, mirror, and verify PLC logs

fjall: export streams the ops instead of collecting them

ptr.pet 74af90cd f6072bfc

verified
+13 -5
+13 -5
src/mirror/fjall.rs
··· 276 276 let limit = 1000; 277 277 let db = fjall.clone(); 278 278 279 - let ops = spawn_blocking(move || { 279 + let (tx, rx) = tokio::sync::mpsc::channel::<anyhow::Result<_>>(64); 280 + 281 + tokio::task::spawn_blocking(move || { 280 282 let iter = db.export_ops((after + 1)..)?; 281 - iter.take(limit).collect::<anyhow::Result<Vec<_>>>() 282 - }) 283 - .await?; 283 + for op in iter.take(limit) { 284 + if tx.blocking_send(op).is_err() { 285 + break; 286 + } 287 + } 288 + anyhow::Ok(()) 289 + }); 284 290 285 - let stream = futures::stream::iter(ops).map(|op| { 291 + // todo: its a bit annoying that errors just cut it off here... 292 + let stream = tokio_stream::wrappers::ReceiverStream::new(rx).map(|result| { 293 + let op = result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; 286 294 let mut json = serde_json::to_string(&op.to_sequenced_json()).unwrap(); 287 295 json.push('\n'); 288 296 Ok::<_, std::io::Error>(json)