at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] log errors if we couldnt check in should_process

ptr.pet e89a5aa1 ddda6d79

verified
+8 -3
+1 -2
src/backfill/mod.rs
··· 584 }; 585 trace!("{action} {did}/{collection}/{rkey} ({cid})"); 586 587 - // Key is did|collection|rkey 588 let db_key = keys::record_key(&did, collection, &rkey); 589 590 batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); ··· 684 db.next_event_id.load(Ordering::SeqCst) - 1, 685 )); 686 687 - // buffer processing is handled by BufferProcessor when blocked flag is cleared 688 trace!("backfill complete for {did}"); 689 Ok(previous_state) 690 }
··· 584 }; 585 trace!("{action} {did}/{collection}/{rkey} ({cid})"); 586 587 + // key is did|collection|rkey 588 let db_key = keys::record_key(&did, collection, &rkey); 589 590 batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); ··· 684 db.next_event_id.load(Ordering::SeqCst) - 1, 685 )); 686 687 trace!("backfill complete for {did}"); 688 Ok(previous_state) 689 }
+7 -1
src/ingest/firehose.rs
··· 98 SubscribeReposMessage::Identity(identity) => &identity.did, 99 SubscribeReposMessage::Account(account) => &account.did, 100 SubscribeReposMessage::Sync(sync) => &sync.did, 101 _ => return, 102 }; 103 104 - if !self.should_process(did).await.unwrap_or(false) { 105 return; 106 } 107
··· 98 SubscribeReposMessage::Identity(identity) => &identity.did, 99 SubscribeReposMessage::Account(account) => &account.did, 100 SubscribeReposMessage::Sync(sync) => &sync.did, 101 + // todo: handle info and unknowns 102 _ => return, 103 }; 104 105 + if !self 106 + .should_process(did) 107 + .await 108 + .inspect_err(|e| error!("failed to check if we should process {did}: {e}")) 109 + .unwrap_or(false) 110 + { 111 return; 112 } 113