at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] respect signals, excludes, collections filters properly

ptr.pet 7d3090b1 e2867db2

verified
+49 -7
+49 -7
src/backfill/mod.rs
··· 1 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 use crate::ops; 4 use crate::resolver::ResolverError; 5 use crate::state::AppState; ··· 163 let db = &state.db; 164 165 match process_did(&state, &http, &did, verify_signatures).await { 166 - Ok(previous_state) => { 167 let did_key = keys::repo_key(&did); 168 169 // determine old gauge state ··· 227 } 228 Ok(()) 229 } 230 Err(e) => { 231 match &e { 232 BackfillError::Ratelimited => { ··· 371 http: &reqwest::Client, 372 did: &Did<'static>, 373 verify_signatures: bool, 374 - ) -> Result<RepoState<'static>, BackfillError> { 375 debug!("backfilling {}", did); 376 377 let db = &app_state.db; ··· 427 let mut batch = db.inner.batch(); 428 ops::delete_repo(&mut batch, db, did, state)?; 429 batch.commit().into_diagnostic()?; 430 - return Ok(previous_state); // stop backfill 431 } 432 433 let inactive_status = match e { ··· 458 .await?; 459 460 // return success so wrapper stops retrying 461 - return Ok(previous_state); 462 } 463 464 Err(e).into_diagnostic()? ··· 520 521 // 6. insert records into db 522 let start = Instant::now(); 523 - let (_state, records_cnt_delta, added_blocks, count) = { 524 let app_state = app_state.clone(); 525 let did = did.clone(); 526 let rev = root_commit.rev; 527 528 tokio::task::spawn_blocking(move || { 529 let mut count = 0; 530 let mut delta = 0; 531 let mut added_blocks = 0; ··· 561 existing_cids.insert((collection.into(), rkey), cid); 562 } 563 564 for (key, cid) in leaves { 565 let val_bytes = tokio::runtime::Handle::current() 566 .block_on(store.get(&cid)) ··· 568 569 if let Some(val) = val_bytes { 570 let (collection, rkey) = ops::parse_path(&key)?; 571 let rkey = DbRkey::new(rkey); 572 let path = (collection.to_smolstr(), rkey.clone()); 573 let cid_obj = Cid::ipld(cid); ··· 639 count += 1; 640 } 641 642 // 6. update data, status is updated in worker shard 643 state.rev = Some((&rev).into()); 644 state.data = Some(root_commit.data); ··· 657 658 batch.commit().into_diagnostic()?; 659 660 - Ok::<_, miette::Report>((state, delta, added_blocks, count)) 661 }) 662 .await 663 .into_diagnostic()?? 664 }; 665 trace!("did {count} ops for {did} in {:?}", start.elapsed()); 666 667 // do the counts ··· 685 )); 686 687 trace!("backfill complete for {did}"); 688 - Ok(previous_state) 689 }
··· 1 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 + use crate::filter::FilterMode; 4 use crate::ops; 5 use crate::resolver::ResolverError; 6 use crate::state::AppState; ··· 164 let db = &state.db; 165 166 match process_did(&state, &http, &did, verify_signatures).await { 167 + Ok(Some(previous_state)) => { 168 let did_key = keys::repo_key(&did); 169 170 // determine old gauge state ··· 228 } 229 Ok(()) 230 } 231 + Ok(None) => { 232 + // signal mode: repo had no matching records, was cleaned up by process_did 233 + state.db.update_count_async("repos", -1).await; 234 + state.db.update_count_async("pending", -1).await; 235 + Ok(()) 236 + } 237 Err(e) => { 238 match &e { 239 BackfillError::Ratelimited => { ··· 378 http: &reqwest::Client, 379 did: &Did<'static>, 380 verify_signatures: bool, 381 + ) -> Result<Option<RepoState<'static>>, BackfillError> { 382 debug!("backfilling {}", did); 383 384 let db = &app_state.db; ··· 434 let mut batch = db.inner.batch(); 435 ops::delete_repo(&mut batch, db, did, state)?; 436 batch.commit().into_diagnostic()?; 437 + return Ok(Some(previous_state)); // stop backfill 438 } 439 440 let inactive_status = match e { ··· 465 .await?; 466 467 // return success so wrapper stops retrying 468 + return Ok(Some(previous_state)); 469 } 470 471 Err(e).into_diagnostic()? ··· 527 528 // 6. insert records into db 529 let start = Instant::now(); 530 + let result = { 531 let app_state = app_state.clone(); 532 let did = did.clone(); 533 let rev = root_commit.rev; 534 535 tokio::task::spawn_blocking(move || { 536 + let filter = app_state.filter.load(); 537 let mut count = 0; 538 let mut delta = 0; 539 let mut added_blocks = 0; ··· 569 existing_cids.insert((collection.into(), rkey), cid); 570 } 571 572 + let mut signal_seen = filter.mode != FilterMode::Signal; 573 + 574 for (key, cid) in leaves { 575 let val_bytes = tokio::runtime::Handle::current() 576 .block_on(store.get(&cid)) ··· 578 579 if let Some(val) = val_bytes { 580 let (collection, rkey) = ops::parse_path(&key)?; 581 + 582 + if !filter.matches_collection(collection) { 583 + continue; 584 + } 585 + 586 + if !signal_seen && filter.matches_signal(collection) { 587 + signal_seen = true; 588 + } 589 + 590 let rkey = DbRkey::new(rkey); 591 let path = (collection.to_smolstr(), rkey.clone()); 592 let cid_obj = Cid::ipld(cid); ··· 658 count += 1; 659 } 660 661 + if !signal_seen { 662 + return Ok::<_, miette::Report>(None); 663 + } 664 + 665 // 6. update data, status is updated in worker shard 666 state.rev = Some((&rev).into()); 667 state.data = Some(root_commit.data); ··· 680 681 batch.commit().into_diagnostic()?; 682 683 + Ok::<_, miette::Report>(Some((state, delta, added_blocks, count))) 684 }) 685 .await 686 .into_diagnostic()?? 687 }; 688 + 689 + let Some((_state, records_cnt_delta, added_blocks, count)) = result else { 690 + // signal mode: no signal-matching records found — clean up the optimistically-added repo 691 + let did_key = keys::repo_key(did); 692 + let backfill_pending_key = keys::pending_key(previous_state.index_id); 693 + let repos_ks = app_state.db.repos.clone(); 694 + let pending_ks = app_state.db.pending.clone(); 695 + let db_inner = app_state.db.inner.clone(); 696 + tokio::task::spawn_blocking(move || { 697 + let mut batch = db_inner.batch(); 698 + batch.remove(&repos_ks, &did_key); 699 + batch.remove(&pending_ks, backfill_pending_key); 700 + batch.commit().into_diagnostic() 701 + }) 702 + .await 703 + .into_diagnostic()??; 704 + return Ok(None); 705 + }; 706 + 707 trace!("did {count} ops for {did} in {:?}", start.elapsed()); 708 709 // do the counts ··· 727 )); 728 729 trace!("backfill complete for {did}"); 730 + Ok(Some(previous_state)) 731 }