at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] emit delete events if we have records that dont exist anymore

ptr.pet 5dc179d5 7d1c28db

verified
+66 -34
+66 -34
src/backfill/mod.rs
··· 12 12 use jacquard_repo::mst::Mst; 13 13 use jacquard_repo::{BlockStore, MemoryBlockStore}; 14 14 use miette::{IntoDiagnostic, Result}; 15 - use smol_str::{SmolStr, ToSmolStr, format_smolstr}; 15 + use smol_str::{SmolStr, ToSmolStr}; 16 16 use std::collections::HashMap; 17 17 use std::sync::Arc; 18 18 use std::sync::atomic::Ordering; ··· 356 356 357 357 // 6. insert records into db 358 358 let start = Instant::now(); 359 - let (_state, added_records, added_blocks, count) = { 359 + let (_state, records_cnt_delta, added_blocks, count) = { 360 360 let app_state = app_state.clone(); 361 361 let did = did.clone(); 362 362 let rev = root_commit.rev; 363 363 364 364 tokio::task::spawn_blocking(move || { 365 365 let mut count = 0; 366 - let mut added_records = 0; 366 + let mut delta = 0; 367 367 let mut added_blocks = 0; 368 368 let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new(); 369 369 let mut batch = app_state.db.inner.batch(); ··· 372 372 // pre-load existing record CIDs for this DID to detect duplicates/updates 373 373 let prefix = keys::record_prefix(&did); 374 374 let prefix_len = prefix.len(); 375 - let mut existing_cids: HashMap<SmolStr, SmolStr> = HashMap::new(); 375 + let mut existing_cids: HashMap<(SmolStr, SmolStr), SmolStr> = HashMap::new(); 376 376 for guard in app_state.db.records.prefix(&prefix) { 377 377 let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 378 378 // extract path (collection/rkey) from key by skipping the DID prefix 379 - let path = String::from_utf8_lossy(&key[prefix_len..]).to_smolstr(); 380 - let cid = String::from_utf8_lossy(&cid_bytes).to_smolstr(); 381 - existing_cids.insert(path, cid); 379 + let mut path_split = key[prefix_len..].split(|b| *b == keys::SEP); 380 + let collection = std::str::from_utf8( 381 + path_split 382 + .next() 383 + .ok_or_else(|| miette::miette!("collection not found"))?, 384 + ) 385 + .into_diagnostic()? 386 + .to_smolstr(); 387 + let rkey = std::str::from_utf8( 388 + path_split 389 + .next() 390 + .ok_or_else(|| miette::miette!("record key not found"))?, 391 + ) 392 + .into_diagnostic()? 393 + .to_smolstr(); 394 + let cid = std::str::from_utf8(&cid_bytes) 395 + .into_diagnostic()? 396 + .to_smolstr(); 397 + existing_cids.insert((collection, rkey), cid); 382 398 } 383 399 384 400 for (key, cid) in leaves { ··· 388 404 389 405 if let Some(val) = val_bytes { 390 406 let (collection, rkey) = ops::parse_path(&key)?; 391 - let collection = collection.to_smolstr(); 407 + let path = (collection.to_smolstr(), rkey.to_smolstr()); 392 408 let cid = Cid::ipld(cid); 393 409 394 410 // check if this record already exists with same CID 395 - let path = format_smolstr!("{collection}{}{rkey}", keys::SEP as char); 396 - let (action, is_new) = if let Some(existing_cid) = existing_cids.get(&path) 397 - { 398 - if existing_cid == cid.as_str() { 399 - debug!("skip {did}/{collection}/{rkey} ({cid})"); 400 - continue; // skip unchanged record 401 - } 402 - ("update", false) 403 - } else { 404 - ("create", true) 405 - }; 411 + let (action, is_new) = 412 + if let Some(existing_cid) = existing_cids.remove(&path) { 413 + if existing_cid == cid.as_str() { 414 + debug!("skip {did}/{collection}/{rkey} ({cid})"); 415 + continue; // skip unchanged record 416 + } 417 + ("update", false) 418 + } else { 419 + ("create", true) 420 + }; 406 421 debug!("{action} {did}/{collection}/{rkey} ({cid})"); 407 422 408 - let db_key = keys::record_key(&did, &collection, rkey); 423 + let db_key = keys::record_key(&did, &collection, &rkey); 409 424 410 425 batch.insert( 411 426 &app_state.db.blocks, ··· 416 431 417 432 added_blocks += 1; 418 433 if is_new { 419 - added_records += 1; 420 - *collection_counts.entry(collection.clone()).or_default() += 1; 434 + delta += 1; 435 + *collection_counts.entry(path.0.clone()).or_default() += 1; 421 436 } 422 437 423 438 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 424 439 let evt = StoredEvent { 425 440 did: TrimmedDid::from(&did), 426 441 rev: CowStr::Borrowed(rev.as_str()), 427 - collection: CowStr::Borrowed(collection.as_str()), 442 + collection: CowStr::Borrowed(collection), 428 443 rkey: CowStr::Borrowed(rkey), 429 444 action: CowStr::Borrowed(action), 430 445 cid: Some(cid), 431 446 }; 432 - 433 447 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 434 448 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 435 449 ··· 437 451 } 438 452 } 439 453 454 + // remove any remaining existing records (they weren't in the new MST) 455 + for ((collection, rkey), cid) in existing_cids { 456 + debug!("remove {did}/{collection}/{rkey} ({cid})"); 457 + batch.remove( 458 + &app_state.db.records, 459 + keys::record_key(&did, &collection, &rkey), 460 + ); 461 + 462 + let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 463 + let evt = StoredEvent { 464 + did: TrimmedDid::from(&did), 465 + rev: CowStr::Borrowed(rev.as_str()), 466 + collection: CowStr::Borrowed(&collection), 467 + rkey: CowStr::Borrowed(&rkey), 468 + action: CowStr::Borrowed("delete"), 469 + cid: None, 470 + }; 471 + let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 472 + batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 473 + 474 + delta -= 1; 475 + count += 1; 476 + } 477 + 440 478 // 6. update status to synced 441 479 state.status = RepoStatus::Synced; 442 480 state.rev = Some(rev); ··· 456 494 457 495 batch.commit().into_diagnostic()?; 458 496 459 - Ok::<_, miette::Report>((state, added_records, added_blocks, count)) 497 + Ok::<_, miette::Report>((state, delta, added_blocks, count)) 460 498 }) 461 499 .await 462 500 .into_diagnostic()?? 463 501 }; 464 - 465 - trace!( 466 - "inserted {} records into db for {} in {:?}", 467 - count, 468 - did, 469 - start.elapsed() 470 - ); 502 + trace!("did {count} ops for {did} in {:?}", start.elapsed()); 471 503 472 504 // do the counts 473 - if added_records > 0 { 505 + if records_cnt_delta > 0 { 474 506 app_state 475 507 .db 476 - .update_count_async("records", added_records) 508 + .update_count_async("records", records_cnt_delta) 477 509 .await; 478 510 app_state 479 511 .db