AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 385 lines 13 kB view raw
//! Commit application and repo-state management for the indexer.
//!
//! Everything here writes through fjall write batches (plain `OwnedWriteBatch`
//! or the project's `RefcountedBatch`, which additionally tracks block
//! reference counts), so mutations only become visible once the caller
//! commits the batch.

use fjall::OwnedWriteBatch;
use fjall::Slice;

use crate::db::refcount::RefcountedBatch;
use jacquard_common::CowStr;
use jacquard_common::IntoStatic;
use jacquard_common::types::cid::Cid;
use jacquard_common::types::crypto::PublicKey;
use jacquard_common::types::did::Did;
use jacquard_repo::car::reader::parse_car_bytes;
use miette::{Context, IntoDiagnostic, Result};
use rand::{Rng, rng};
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::time::Instant;
use tracing::{debug, trace};

use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
use crate::db::{self, Db, keys, ser_repo_state};
use crate::filter::FilterConfig;
use crate::ingest::stream::Commit;
use crate::types::{
    AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState,
    StoredEvent,
};

/// Buffers a live-stream commit for a repo that is currently being resynced.
///
/// The commit is MessagePack-serialized and stored under a `(did, rev-TID)`
/// key, so buffered commits for one DID sort by revision and can later be
/// replayed in order via a prefix scan. Note this writes directly to the
/// `resync_buffer` partition (not through a batch), so it is durable on its own.
pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> {
    let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev));
    let value = rmp_serde::to_vec(commit).into_diagnostic()?;
    db.resync_buffer.insert(key, value).into_diagnostic()?;
    debug!(
        did = %did,
        seq = commit.seq,
        "buffered commit to resync_buffer"
    );
    Ok(())
}

/// Returns `true` if at least one buffered commit exists for `did`
/// (cheap prefix scan that stops at the first hit).
pub fn has_buffered_commits(db: &Db, did: &Did) -> bool {
    let prefix = keys::resync_buffer_prefix(did);
    db.resync_buffer.prefix(&prefix).next().is_some()
}

// emitting identity is ephemeral
// we dont replay these, consumers can just fetch identity themselves if they need it
/// Builds an ephemeral "identity" broadcast event. Allocates the next event id
/// from the shared atomic counter but persists nothing — the event only goes
/// out over the live broadcast channel.
pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent {
    let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
    let marshallable = MarshallableEvt {
        id: event_id,
        event_type: "identity".into(),
        record: None,
        identity: Some(evt),
        account: None,
    };
    BroadcastEvent::Ephemeral(Box::new(marshallable))
}

/// Builds an ephemeral "account" broadcast event. Like identity events, this
/// is never written to the events partition — broadcast only.
pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent {
    let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
    let marshallable = MarshallableEvt {
        id: event_id,
        event_type: "account".into(),
        record: None,
        identity: None,
        account: Some(evt),
    };
    BroadcastEvent::Ephemeral(Box::new(marshallable))
}

/// Stages the complete removal of a repo into `batch`: its repo-state entry,
/// any pending/resync queue entries, buffered resync commits, all indexed
/// records (decrementing each record's block refcount by 1), and its
/// per-collection record counts. Nothing is applied until the batch commits.
pub fn delete_repo(
    batch: &mut RefcountedBatch<'_>,
    db: &Db,
    did: &Did,
    repo_state: &RepoState,
) -> Result<()> {
    debug!(did = %did, "deleting repo");

    let repo_key = keys::repo_key(did);
    let pending_key = keys::pending_key(repo_state.index_id);

    // 1. delete from repos, pending, resync
    batch.batch_mut().remove(&db.repos, &repo_key);
    match repo_state.status {
        // Synced repos are in neither queue.
        RepoStatus::Synced => {}
        // Backfilling repos live in `pending`, keyed by index_id.
        RepoStatus::Backfilling => {
            batch.batch_mut().remove(&db.pending, &pending_key);
        }
        // All other states (Error / Deactivated / Takendown / Suspended)
        // have an entry in `resync`, keyed by the repo key.
        _ => {
            batch.batch_mut().remove(&db.resync, &repo_key);
        }
    }

    // 2. delete from resync buffer
    let resync_prefix = keys::resync_buffer_prefix(did);
    for guard in db.resync_buffer.prefix(&resync_prefix) {
        let k = guard.key().into_diagnostic()?;
        batch.batch_mut().remove(&db.resync_buffer, k);
    }

    // 3. delete from records
    // Each record entry stores its CID as the value; drop one refcount per
    // record so unreferenced blocks can be garbage-collected.
    let records_prefix = keys::record_prefix_did(did);
    for guard in db.records.prefix(&records_prefix) {
        let (k, cid_bytes) = guard.into_inner().into_diagnostic()?;
        batch.update_block_refcount(cid_bytes, -1)?;
        batch.batch_mut().remove(&db.records, k);
    }

    // 4. reset collection counts
    // Hand-built prefix: b'r' SEP <trimmed-did> SEP — presumably the record-count
    // namespace in the counts partition; keep in sync with db::keys. TODO confirm
    let mut count_prefix = Vec::new();
    count_prefix.push(b'r');
    count_prefix.push(keys::SEP);
    TrimmedDid::from(did).write_to_vec(&mut count_prefix);
    count_prefix.push(keys::SEP);

    for guard in db.counts.prefix(&count_prefix) {
        let k = guard.key().into_diagnostic()?;
        batch.batch_mut().remove(&db.counts, k);
    }

    Ok(())
}

/// Transitions a repo to `new_status`, staging the matching queue moves into
/// `batch`, then persists the updated `RepoState` (with a refreshed
/// `last_updated_at`) and returns it.
///
/// Queue invariants maintained here:
/// - `Synced`: removed from `pending`.
/// - `Backfilling`: removed from `resync` (when not coming from `Synced`),
///   old `pending` entry dropped, and re-enqueued under a fresh random
///   `index_id` (randomizing spreads work across the pending keyspace —
///   TODO confirm intent).
/// - `Error`: removed from `pending`, enqueued into `resync` with a generic
///   retry state.
/// - `Deactivated`/`Takendown`/`Suspended`: recorded in `resync` as `Gone`.
pub fn update_repo_status<'batch, 's>(
    batch: &'batch mut OwnedWriteBatch,
    db: &Db,
    did: &Did,
    mut repo_state: RepoState<'s>,
    new_status: RepoStatus,
) -> Result<RepoState<'s>> {
    debug!(did = %did, status = ?new_status, "updating repo status");

    let repo_key = keys::repo_key(did);
    let pending_key = keys::pending_key(repo_state.index_id);

    // manage queues
    match &new_status {
        RepoStatus::Synced => {
            batch.remove(&db.pending, &pending_key);
            // we dont have to remove from resync here because it has to transition resync -> pending first
        }
        RepoStatus::Backfilling => {
            // if we are coming from an error state, remove from resync
            if !matches!(repo_state.status, RepoStatus::Synced) {
                batch.remove(&db.resync, &repo_key);
            }
            // remove the old entry
            batch.remove(&db.pending, &pending_key);
            // add as new entry
            // NOTE(review): `next_u64` is defined on rand's `RngCore`; only `Rng`
            // is imported here — verify this resolves on the project's rand version.
            repo_state.index_id = rng().next_u64();
            batch.insert(
                &db.pending,
                keys::pending_key(repo_state.index_id),
                &repo_key,
            );
        }
        RepoStatus::Error(_msg) => {
            batch.remove(&db.pending, &pending_key);
            // TODO: we need to make errors have kind instead of "message" in repo status
            // and then pass it to resync error kind
            let resync_state = crate::types::ResyncState::Error {
                kind: crate::types::ResyncErrorKind::Generic,
                retry_count: 0,
                next_retry: chrono::Utc::now().timestamp(),
            };
            batch.insert(
                &db.resync,
                &repo_key,
                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
            );
        }
        RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
            // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states
            // batch.remove(&db.pending, &pending_key);
            let resync_state = ResyncState::Gone {
                status: new_status.clone(),
            };
            batch.insert(
                &db.resync,
                &repo_key,
                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
            );
        }
    }

    repo_state.status = new_status;
    repo_state.last_updated_at = chrono::Utc::now().timestamp();

    batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?);

    Ok(repo_state)
}

/// Parses a #sync event's CAR payload, optionally verifies the signed commit
/// against `key`, and returns the commit's MST data root CID plus its `rev`.
///
/// # Errors
/// Fails if the CAR is malformed, the root block is absent, the commit CBOR
/// is invalid, or signature verification fails.
///
/// NOTE(review): `block_in_place` requires a multi-threaded tokio runtime —
/// this will panic on a current-thread runtime.
pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> {
    let parsed = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current()
            .block_on(parse_car_bytes(blocks))
            .into_diagnostic()
    })?;

    let root_bytes = parsed
        .blocks
        .get(&parsed.root)
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;

    if let Some(key) = key {
        repo_commit
            .verify(key)
            .map_err(|e| miette::miette!("signature verification failed: {e}"))?;
    }

    Ok((
        Cid::ipld(repo_commit.data).into_static(),
        repo_commit.rev.to_string(),
    ))
}

/// Summary of what [`apply_commit`] staged into the batch.
pub struct ApplyCommitResults<'s> {
    /// Repo state with `rev`, `data` root, and `last_updated_at` refreshed.
    pub repo_state: RepoState<'s>,
    /// Net change in indexed record count (creates minus deletes).
    pub records_delta: i64,
    /// Number of blocks written to the block store.
    pub blocks_count: i64,
}

/// Applies a firehose commit to the index: stores its blocks, updates the
/// records index and per-collection counts, and appends a stored event per op.
///
/// When `ephemeral` is true, blocks and events are still written (with a lower
/// block refcount) but the records index and counts are left untouched —
/// presumably used while a repo is mid-backfill/resync. TODO confirm.
///
/// All writes go through `batch`; nothing is visible until the caller commits.
///
/// # Errors
/// Fails on malformed CAR data, a missing root/record block, a bad signature
/// (when `signing_key` is provided), an unknown op action, or an Update for a
/// record that is not in the index.
pub fn apply_commit<'db, 'commit, 's>(
    batch: &mut RefcountedBatch<'db>,
    db: &'db Db,
    mut repo_state: RepoState<'s>,
    commit: &'commit Commit<'commit>,
    signing_key: Option<&PublicKey>,
    filter: &FilterConfig,
    ephemeral: bool,
) -> Result<ApplyCommitResults<'s>> {
    let did = &commit.repo;
    debug!(did = %did, commit = %commit.commit, "applying commit");

    // 1. parse CAR blocks and store them in CAS
    // NOTE(review): block_in_place requires the multi-threaded tokio runtime.
    let start = Instant::now();
    let parsed = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current()
            .block_on(parse_car_bytes(commit.blocks.as_ref()))
            .into_diagnostic()
    })?;

    trace!(did = %did, elapsed = ?start.elapsed(), "parsed car");

    let root_bytes = parsed
        .blocks
        .get(&parsed.root)
        .ok_or_else(|| miette::miette!("root block missing from CAR"))?;

    let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;

    if let Some(key) = signing_key {
        repo_commit
            .verify(key)
            .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
        trace!(did = %did, "signature verified");
    }

    repo_state.rev = Some((&commit.rev).into());
    repo_state.data = Some(repo_commit.data);
    repo_state.last_updated_at = chrono::Utc::now().timestamp();

    batch
        .batch_mut()
        .insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);

    // 2. iterate ops and update records index
    let mut records_delta = 0;
    let mut blocks_count = 0;
    let mut collection_deltas: HashMap<&str, i64> = HashMap::new();

    for op in &commit.ops {
        let (collection, rkey) = parse_path(&op.path)?;

        // ops in filtered-out collections are skipped entirely (no event either)
        if !filter.matches_collection(collection) {
            continue;
        }

        let rkey = DbRkey::new(rkey);
        let db_key = keys::record_key(did, collection, &rkey);

        // one monotonically increasing event id per op
        let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);

        let action = DbAction::try_from(op.action.as_str())?;
        match action {
            DbAction::Create | DbAction::Update => {
                // create/update without a CID is skipped (nothing to index)
                let Some(cid) = &op.cid else {
                    continue;
                };
                let cid_ipld = cid
                    .to_ipld()
                    .into_diagnostic()
                    .wrap_err("expected valid cid from relay")?;

                let Some(bytes) = parsed.blocks.get(&cid_ipld) else {
                    return Err(miette::miette!(
                        "block {cid} not found in CAR for record {did}/{collection}/{rkey}"
                    ));
                };
                let cid_bytes = Slice::from(cid_ipld.to_bytes());
                batch
                    .batch_mut()
                    .insert(&db.blocks, cid_bytes.clone(), bytes.to_vec());
                blocks_count += 1;
                // +1 ref when ephemeral (block only), +2 otherwise —
                // presumably the extra ref accounts for the records-index
                // entry added below. TODO confirm
                batch.update_block_refcount(
                    cid_bytes.clone(),
                    ephemeral.then_some(1).unwrap_or(2),
                )?;

                if !ephemeral {
                    batch
                        .batch_mut()
                        .insert(&db.records, db_key.clone(), cid_ipld.to_bytes());
                    // for Update, also decrement old CID refcount
                    // (this get reads the pre-batch value — the insert above
                    // is still uncommitted, so we see the previous CID)
                    if action == DbAction::Update {
                        let Some(old_cid_bytes) = db.records.get(&db_key).into_diagnostic()? else {
                            return Err(miette::miette!(
                                "!!! THIS IS A BUG !!! expected previous cid to be there for record being updated ({did}/{collection}/{rkey}). how did we get here?"
                            ));
                        };
                        if old_cid_bytes != cid_bytes {
                            batch.update_block_refcount(old_cid_bytes, -1)?;
                        }
                    }
                    // accumulate counts
                    if action == DbAction::Create {
                        records_delta += 1;
                        *collection_deltas.entry(collection).or_default() += 1;
                    }
                }
            }
            DbAction::Delete => {
                if !ephemeral {
                    // decrement block refcount
                    let old_cid_bytes = db.records.get(&db_key).into_diagnostic()?;
                    if let Some(cid_bytes) = old_cid_bytes {
                        batch.update_block_refcount(cid_bytes, -1)?;
                    }
                    batch.batch_mut().remove(&db.records, db_key);

                    // accumulate counts
                    records_delta -= 1;
                    *collection_deltas.entry(collection).or_default() -= 1;
                }
            }
        }

        // append a replayable stored event for this op
        let evt = StoredEvent {
            live: true,
            did: TrimmedDid::from(did),
            rev: DbTid::from(&commit.rev),
            collection: CowStr::Borrowed(collection),
            rkey,
            action,
            cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")),
        };

        let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
        batch
            .batch_mut()
            .insert(&db.events, keys::event_key(event_id), bytes);
    }

    // update counts
    if !ephemeral {
        for (col, delta) in collection_deltas {
            db::update_record_count(batch.batch_mut(), db, did, col, delta)?;
        }
    }

    Ok(ApplyCommitResults {
        repo_state,
        records_delta,
        blocks_count,
    })
}

/// Splits an op path `"collection/rkey"` at the first `/` into
/// `(collection, rkey)`; the rkey may itself contain `/`.
///
/// NOTE(review): the first `next()` on `splitn` is always `Some` (even for an
/// empty path), so the "missing collection" error is effectively unreachable;
/// only "missing rkey" (no `/` present) can fire. An empty path yields
/// `("", …)` as missing-rkey — confirm whether empty components should error.
pub fn parse_path(path: &str) -> Result<(&str, &str)> {
    let mut parts = path.splitn(2, '/');
    let collection = parts.next().wrap_err("missing collection")?;
    let rkey = parts.next().wrap_err("missing rkey")?;
    Ok((collection, rkey))
}