at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[stream,ingest] fix event send order, fix delete events failing to deserialize due to a missing CID, consistency fixes

ptr.pet d66238cc 5dc179d5

verified
+166 -117
+4 -2
src/api/stats.rs
··· 1 1 use crate::api::AppState; 2 - use axum::{extract::State, response::Result, Json}; 2 + use axum::{Json, extract::State, response::Result}; 3 3 use serde::Serialize; 4 4 use std::{collections::HashMap, sync::Arc}; 5 5 ··· 11 11 pub async fn get_stats(State(state): State<Arc<AppState>>) -> Result<Json<StatsResponse>> { 12 12 let db = &state.db; 13 13 14 - let counts = futures::future::join_all( 14 + let mut counts: HashMap<&'static str, u64> = futures::future::join_all( 15 15 ["repos", "records", "blocks", "pending", "resync"] 16 16 .into_iter() 17 17 .map(|name| async move { (name, db.get_count(name).await) }), ··· 19 19 .await 20 20 .into_iter() 21 21 .collect(); 22 + // this should be accurate since we dont remove events 23 + counts.insert("events", db.events.approximate_len() as u64); 22 24 23 25 Ok(Json(StatsResponse { counts })) 24 26 }
+84 -57
src/api/stream.rs
··· 9 9 response::IntoResponse, 10 10 }; 11 11 use jacquard_common::types::value::RawData; 12 + use miette::{Context, IntoDiagnostic}; 12 13 use serde::Deserialize; 13 14 use std::sync::Arc; 14 15 use tokio::sync::{broadcast, mpsc}; 16 + use tracing::error; 15 17 16 18 #[derive(Deserialize)] 17 19 pub struct StreamQuery { ··· 54 56 loop { 55 57 let mut found = false; 56 58 for item in ks.range(keys::event_key(current_id + 1)..) { 57 - if let Ok((k, v)) = item.into_inner() { 58 - let mut buf = [0u8; 8]; 59 - buf.copy_from_slice(&k); 60 - let id = u64::from_be_bytes(buf); 61 - current_id = id; 59 + let (k, v) = match item.into_inner() { 60 + Ok((k, v)) => (k, v), 61 + Err(e) => { 62 + error!("failed to read event from db: {e}"); 63 + break; 64 + } 65 + }; 66 + let id = match k 67 + .as_ref() 68 + .try_into() 69 + .into_diagnostic() 70 + .wrap_err("expected event id to be 8 bytes") 71 + .map(u64::from_be_bytes) 72 + { 73 + Ok(id) => id, 74 + Err(e) => { 75 + error!("failed to parse event id: {e}"); 76 + continue; 77 + } 78 + }; 79 + current_id = id; 62 80 63 - let StoredEvent { 64 - did, 65 - rev, 66 - collection, 67 - rkey, 68 - action, 69 - cid, 70 - } = match rmp_serde::from_slice(&v) { 71 - Ok(e) => e, 72 - Err(_) => continue, 73 - }; 81 + let StoredEvent { 82 + did, 83 + rev, 84 + collection, 85 + rkey, 86 + action, 87 + cid, 88 + } = match rmp_serde::from_slice(&v) { 89 + Ok(e) => e, 90 + Err(e) => { 91 + error!("failed to deserialize stored event: {e}"); 92 + continue; 93 + } 94 + }; 74 95 75 - let marshallable = { 76 - let mut record_val = None; 77 - if let Some(cid_str) = &cid { 78 - if let Ok(Some(block_bytes)) = 79 - db.blocks.get(keys::block_key(cid_str)) 96 + let marshallable = { 97 + let mut record_val = None; 98 + if let Some(cid_str) = &cid { 99 + if let Ok(Some(block_bytes)) = 100 + db.blocks.get(keys::block_key(cid_str)) 101 + { 102 + if let Ok(raw_data) = 103 + serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) 80 104 { 81 - if let Ok(raw_data) 
= 82 - serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) 83 - { 84 - record_val = serde_json::to_value(raw_data).ok(); 85 - } 105 + record_val = serde_json::to_value(raw_data).ok(); 86 106 } 87 107 } 108 + } 88 109 89 - MarshallableEvt { 90 - id, 91 - event_type: "record".into(), 92 - record: Some(RecordEvt { 93 - live: true, 94 - did: did.to_did(), 95 - rev, 96 - collection, 97 - rkey, 98 - action, 99 - record: record_val, 100 - cid: cid.map(|c| match c { 101 - jacquard::types::cid::Cid::Ipld { s, .. } => s, 102 - jacquard::types::cid::Cid::Str(s) => s, 103 - }), 110 + MarshallableEvt { 111 + id, 112 + event_type: "record".into(), 113 + record: Some(RecordEvt { 114 + live: true, 115 + did: did.to_did(), 116 + rev, 117 + collection, 118 + rkey, 119 + action, 120 + record: record_val, 121 + cid: cid.map(|c| match c { 122 + jacquard::types::cid::Cid::Ipld { s, .. } => s, 123 + jacquard::types::cid::Cid::Str(s) => s, 104 124 }), 105 - identity: None, 106 - account: None, 107 - } 108 - }; 125 + }), 126 + identity: None, 127 + account: None, 128 + } 129 + }; 109 130 110 - let json_str = match serde_json::to_string(&marshallable) { 111 - Ok(s) => s, 112 - Err(_) => continue, 113 - }; 131 + let json_str = match serde_json::to_string(&marshallable) { 132 + Ok(s) => s, 133 + Err(e) => { 134 + error!("failed to serialize ws event: {e}"); 135 + continue; 136 + } 137 + }; 114 138 115 - if tx.blocking_send(Message::Text(json_str.into())).is_err() { 116 - return; 117 - } 118 - found = true; 119 - } else { 120 - break; 139 + if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) { 140 + error!("failed to send ws message: {e}"); 141 + return; 121 142 } 143 + 144 + found = true; 122 145 } 123 146 if !found { 124 147 break; ··· 134 157 // send ephemeral event directly 135 158 let json_str = match serde_json::to_string(&evt) { 136 159 Ok(s) => s, 137 - Err(_) => continue, 160 + Err(e) => { 161 + error!("failed to serialize ws event: {e}"); 162 + continue; 163 + } 138 
164 }; 139 - if tx.blocking_send(Message::Text(json_str.into())).is_err() { 165 + if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) { 166 + error!("failed to send ws message: {e}"); 140 167 return; 141 168 } 142 169 }
+9 -16
src/backfill/mod.rs
··· 11 11 use jacquard_common::xrpc::XrpcError; 12 12 use jacquard_repo::mst::Mst; 13 13 use jacquard_repo::{BlockStore, MemoryBlockStore}; 14 - use miette::{IntoDiagnostic, Result}; 14 + use miette::{Context, IntoDiagnostic, Result}; 15 15 use smol_str::{SmolStr, ToSmolStr}; 16 16 use std::collections::HashMap; 17 17 use std::sync::Arc; ··· 246 246 .into(), 247 247 ), 248 248 }; 249 - ops::emit_account_event(db, evt); 249 + let _ = app_state.db.event_tx.send(ops::make_account_event(db, evt)); 250 250 }; 251 251 252 252 // 2. fetch repo (car) ··· 377 377 let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 378 378 // extract path (collection/rkey) from key by skipping the DID prefix 379 379 let mut path_split = key[prefix_len..].split(|b| *b == keys::SEP); 380 - let collection = std::str::from_utf8( 381 - path_split 382 - .next() 383 - .ok_or_else(|| miette::miette!("collection not found"))?, 384 - ) 385 - .into_diagnostic()? 386 - .to_smolstr(); 387 - let rkey = std::str::from_utf8( 388 - path_split 389 - .next() 390 - .ok_or_else(|| miette::miette!("record key not found"))?, 391 - ) 392 - .into_diagnostic()? 393 - .to_smolstr(); 380 + let collection = 381 + std::str::from_utf8(path_split.next().wrap_err("collection not found")?) 382 + .into_diagnostic()? 383 + .to_smolstr(); 384 + let rkey = std::str::from_utf8(path_split.next().wrap_err("rkey not found")?) 385 + .into_diagnostic()? 386 + .to_smolstr(); 394 387 let cid = std::str::from_utf8(&cid_bytes) 395 388 .into_diagnostic()? 396 389 .to_smolstr();
+42 -8
src/ingest/worker.rs
··· 3 3 use crate::ops::{self, send_backfill_req}; 4 4 use crate::resolver::NoSigningKeyError; 5 5 use crate::state::AppState; 6 - use crate::types::{AccountEvt, IdentityEvt, RepoState, RepoStatus}; 6 + use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, RepoState, RepoStatus}; 7 7 use jacquard::api::com_atproto::sync::subscribe_repos::SubscribeReposMessage; 8 8 9 9 use fjall::OwnedWriteBatch; ··· 108 108 let _g = handle.enter(); 109 109 let mut repo_cache = HashMap::new(); 110 110 let mut deleted = HashSet::new(); 111 + let mut broadcast_events = Vec::<BroadcastEvent>::with_capacity(BUF_SIZE); 111 112 112 113 loop { 113 114 let mut batch = self.state.db.inner.batch(); 114 115 repo_cache.clear(); 115 116 deleted.clear(); 117 + broadcast_events.clear(); 116 118 117 119 // resolve signing keys for commits and syncs if verification is enabled 118 120 let keys = if self.verify_signatures { ··· 135 137 HashMap::new() 136 138 }; 137 139 140 + let mut added_blocks = 0; 141 + let mut records_delta = 0; 138 142 for msg in buf.drain(..) { 139 143 let (did, seq) = match &msg { 140 144 SubscribeReposMessage::Commit(c) => (&c.repo, c.seq), ··· 152 156 continue; 153 157 } 154 158 155 - match self.process_message(&mut repo_cache, &mut batch, &msg, did, &keys) { 159 + match self.process_message( 160 + &mut repo_cache, 161 + &mut batch, 162 + &mut added_blocks, 163 + &mut records_delta, 164 + &mut broadcast_events, 165 + &msg, 166 + did, 167 + &keys, 168 + ) { 156 169 Ok(ProcessResult::Ok) => {} 157 170 Ok(ProcessResult::Deleted) => { 158 171 deleted.insert(did.clone()); ··· 178 191 179 192 // commit all changes to db 180 193 batch.commit().into_diagnostic()?; 194 + 195 + if added_blocks > 0 { 196 + self.state.db.update_count("blocks", added_blocks); 197 + } 198 + if records_delta != 0 { 199 + self.state.db.update_count("records", records_delta); 200 + } 201 + for evt in broadcast_events.drain(..) 
{ 202 + let _ = self.state.db.event_tx.send(evt); 203 + } 204 + 181 205 self.state 182 206 .db 183 207 .inner ··· 215 239 &self, 216 240 repo_cache: &mut HashMap<Did<'static>, RepoState<'static>>, 217 241 batch: &mut OwnedWriteBatch, 242 + added_blocks: &mut i64, 243 + records_delta: &mut i64, 244 + broadcast_events: &mut Vec<BroadcastEvent>, 218 245 msg: &BufferedMessage, 219 246 did: &Did, 220 247 keys: &HashMap<Did<'static>, Result<PublicKey<'static>>>, ··· 284 311 return Ok(ProcessResult::Ok); 285 312 } 286 313 287 - let (new_state, cb) = 288 - ops::apply_commit(batch, &state.db, repo_state, &commit, get_key()?)?; 289 - cb(); 290 - repo_cache.insert(did.clone().into_static(), new_state); 314 + let res = ops::apply_commit(batch, &state.db, repo_state, &commit, get_key()?)?; 315 + repo_cache.insert(did.clone().into_static(), res.repo_state); 316 + *added_blocks += res.blocks_count; 317 + *records_delta += res.records_delta; 318 + broadcast_events.push(BroadcastEvent::Persisted( 319 + self.state 320 + .db 321 + .next_event_id 322 + .load(std::sync::atomic::Ordering::SeqCst) 323 + - 1, 324 + )); 291 325 } 292 326 SubscribeReposMessage::Sync(sync) => { 293 327 debug!("processing buffered sync for {did}"); ··· 338 372 did: did.clone().into_static(), 339 373 handle, 340 374 }; 341 - ops::emit_identity_event(&state.db, evt); 375 + broadcast_events.push(ops::make_identity_event(&state.db, evt)); 342 376 } 343 377 SubscribeReposMessage::Account(account) => { 344 378 debug!("processing buffered account for {did}"); ··· 406 440 // 2. initiating backfilling is also handled there 407 441 } 408 442 409 - ops::emit_account_event(&state.db, evt); 443 + broadcast_events.push(ops::make_account_event(&state.db, evt)); 410 444 } 411 445 _ => { 412 446 warn!("unknown message type in buffer for {did}");
+22 -30
src/ops.rs
··· 17 17 use std::collections::HashMap; 18 18 use std::sync::atomic::Ordering; 19 19 use std::time::Instant; 20 - use tracing::{debug, trace}; 20 + use tracing::{debug, trace, warn}; 21 21 22 22 pub fn send_backfill_req(state: &AppState, did: jacquard::types::did::Did<'static>) -> Result<()> { 23 23 state ··· 30 30 31 31 // emitting identity is ephemeral 32 32 // we dont replay these, consumers can just fetch identity themselves if they need it 33 - pub fn emit_identity_event(db: &Db, evt: IdentityEvt<'static>) { 33 + pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent { 34 34 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 35 35 let marshallable = MarshallableEvt { 36 36 id: event_id, ··· 39 39 identity: Some(evt), 40 40 account: None, 41 41 }; 42 - let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 42 + BroadcastEvent::Ephemeral(marshallable) 43 43 } 44 44 45 - pub fn emit_account_event(db: &Db, evt: AccountEvt<'static>) { 45 + pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent { 46 46 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 47 47 let marshallable = MarshallableEvt { 48 48 id: event_id, ··· 51 51 identity: None, 52 52 account: Some(evt), 53 53 }; 54 - let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 54 + BroadcastEvent::Ephemeral(marshallable) 55 55 } 56 56 57 57 pub fn delete_repo<'batch>( ··· 170 170 )) 171 171 } 172 172 173 - pub fn apply_commit<'batch, 'db, 's>( 173 + pub struct ApplyCommitResults<'s> { 174 + pub repo_state: RepoState<'s>, 175 + pub records_delta: i64, 176 + pub blocks_count: i64, 177 + } 178 + 179 + pub fn apply_commit<'batch, 'db, 'commit, 's>( 174 180 batch: &'batch mut OwnedWriteBatch, 175 181 db: &'db Db, 176 182 mut repo_state: RepoState<'s>, 177 - commit: &Commit<'_>, 183 + commit: &'commit Commit<'commit>, 178 184 signing_key: Option<&PublicKey>, 179 - ) -> Result<(RepoState<'s>, impl FnOnce() + 
use<'db>)> { 185 + ) -> Result<ApplyCommitResults<'s>> { 180 186 let did = &commit.repo; 181 187 debug!("applying commit {} for {did}", &commit.commit); 182 188 ··· 221 227 222 228 // 2. iterate ops and update records index 223 229 let mut records_delta = 0; 224 - let mut events_count = 0; 225 230 let mut collection_deltas: HashMap<&str, i64> = HashMap::new(); 226 231 227 232 for op in &commit.ops { ··· 251 256 records_delta -= 1; 252 257 *collection_deltas.entry(collection).or_default() -= 1; 253 258 } 254 - _ => {} 259 + _ => { 260 + warn!("{did}/{}: unknown op action '{}'", op.path, op.action); 261 + } 255 262 } 256 263 257 264 let evt = StoredEvent { ··· 265 272 266 273 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 267 274 batch.insert(&db.events, keys::event_key(event_id), bytes); 268 - events_count += 1; 269 275 } 270 276 271 - let start = Instant::now(); 272 - 273 - trace!("committed sync batch for {did} in {:?}", start.elapsed()); 274 - 275 277 // update counts 276 278 let blocks_count = parsed.blocks.len() as i64; 277 279 for (col, delta) in collection_deltas { 278 280 db::update_record_count(batch, db, did, col, delta)?; 279 281 } 280 282 281 - let _ = db.event_tx.send(BroadcastEvent::Persisted( 282 - db.next_event_id.load(Ordering::SeqCst) - 1, 283 - )); 284 - 285 - Ok((repo_state, move || { 286 - if blocks_count > 0 { 287 - db.update_count("blocks", blocks_count); 288 - } 289 - if records_delta != 0 { 290 - db.update_count("records", records_delta); 291 - } 292 - if events_count > 0 { 293 - db.update_count("events", events_count); 294 - } 295 - })) 283 + Ok(ApplyCommitResults { 284 + repo_state, 285 + records_delta, 286 + blocks_count, 287 + }) 296 288 } 297 289 298 290 pub fn parse_path(path: &str) -> Result<(&str, &str)> {
+5 -4
src/types.rs
··· 105 105 106 106 // from src/api/event.rs 107 107 108 - #[derive(Debug, Serialize, Deserialize, Clone)] 108 + #[derive(Debug, Serialize, Clone)] 109 109 pub struct MarshallableEvt<'i> { 110 110 pub id: u64, 111 111 #[serde(rename = "type")] ··· 128 128 Ephemeral(MarshallableEvt<'static>), 129 129 } 130 130 131 - #[derive(Debug, Serialize, Deserialize, Clone)] 131 + #[derive(Debug, Serialize, Clone)] 132 132 pub struct RecordEvt<'i> { 133 133 pub live: bool, 134 134 #[serde(borrow)] ··· 143 143 pub cid: Option<CowStr<'i>>, 144 144 } 145 145 146 - #[derive(Debug, Serialize, Deserialize, Clone)] 146 + #[derive(Debug, Serialize, Clone)] 147 147 pub struct IdentityEvt<'i> { 148 148 #[serde(borrow)] 149 149 pub did: Did<'i>, ··· 151 151 pub handle: Option<CowStr<'i>>, 152 152 } 153 153 154 - #[derive(Debug, Serialize, Deserialize, Clone)] 154 + #[derive(Debug, Serialize, Clone)] 155 155 pub struct AccountEvt<'i> { 156 156 #[serde(borrow)] 157 157 pub did: Did<'i>, ··· 174 174 #[serde(borrow)] 175 175 pub action: CowStr<'i>, 176 176 #[serde(borrow)] 177 + #[serde(default)] 177 178 #[serde(skip_serializing_if = "Option::is_none")] 178 179 pub cid: Option<Cid<'i>>, 179 180 }