at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[api] parse directly to Value from the CBOR blocks, handle errors better

ptr.pet f38f450f ccc6a954

verified
+32 -15
+32 -15
src/api/stream.rs
··· 9 9 response::IntoResponse, 10 10 }; 11 11 use jacquard_common::CowStr; 12 - use jacquard_common::types::value::RawData; 13 12 use miette::{Context, IntoDiagnostic}; 14 13 use serde::Deserialize; 15 14 use std::sync::Arc; 16 15 use tokio::sync::{broadcast, mpsc}; 17 - use tracing::error; 16 + use tracing::{error, info_span, warn}; 18 17 19 18 #[derive(Deserialize)] 20 19 pub struct StreamQuery { ··· 34 33 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>(); 35 34 36 35 let runtime = tokio::runtime::Handle::current(); 36 + let id = std::time::SystemTime::UNIX_EPOCH 37 + .elapsed() 38 + .unwrap() 39 + .as_secs(); 37 40 38 41 let thread = std::thread::Builder::new() 39 42 .name(format!( 40 - "stream-handler-{}", 41 - std::time::SystemTime::UNIX_EPOCH 42 - .elapsed() 43 - .unwrap() 44 - .as_secs() 43 + "stream-handler-{id}", 44 + 45 45 )) 46 46 .spawn(move || { 47 47 let mut cancel_rx = cancel_rx; ··· 56 56 } 57 57 }; 58 58 let _runtime_guard = runtime.enter(); 59 + let span = info_span!("stream", id); 60 + let _entered_span = span.enter(); 59 61 60 62 loop { 61 63 // 1. catch up from DB ··· 100 102 } 101 103 }; 102 104 105 + let _entered = info_span!("record", cid = ?cid.map(|c| c.to_string())).entered(); 106 + 103 107 let marshallable = { 104 - let mut record_val = None; 105 - if let Some(cid) = &cid { 106 - if let Ok(Some(block_bytes)) = db.blocks.get(&cid.to_bytes()) { 107 - if let Ok(raw_data) = 108 - serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) 109 - { 110 - record_val = serde_json::to_value(raw_data).ok(); 108 + let record_val; 109 + let block_bytes = cid 110 + .map(|cid| db.blocks.get(&cid.to_bytes())) 111 + .transpose() 112 + .map(Option::flatten); 113 + match block_bytes { 114 + Ok(Some(block_bytes)) => match serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block_bytes) { 115 + Ok(val) => record_val = val, 116 + Err(e) => { 117 + error!(err = %e, "cant parse block, must be corrupted?"); 118 + return; 111 119 } 112 120 } 121 + Ok(None) => { 122 + warn!("block not found, possibly repo deleted but events not evicted yet?"); 123 + continue; 124 + } 125 + Err(e) => { 126 + error!(err = %e, "can't get block"); 127 + crate::db::check_poisoned(&e); 128 + return; 129 + } 113 130 } 114 131 115 132 MarshallableEvt { ··· 122 139 collection, 123 140 rkey: CowStr::Owned(rkey.to_smolstr().into()), 124 141 action: CowStr::Borrowed(action.as_str()), 125 - record: record_val, 142 + record: Some(record_val), 126 143 cid: cid 127 144 .map(|c| jacquard_common::types::cid::Cid::ipld(c).into()), 128 145 }),