at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] invalidate cached doc on account, sync events and on backfill

ptr.pet 606f3c23 9835bcb1

verified
+28 -16
+3
src/backfill/mod.rs
··· 361 361 ) -> Result<Option<RepoState<'static>>, BackfillError> { 362 362 debug!(did = %did, "backfilling"); 363 363 364 + // always invalidate doc before backfilling 365 + app_state.resolver.invalidate(did).await; 366 + 364 367 let db = &app_state.db; 365 368 let did_key = keys::repo_key(did); 366 369 let state_bytes = Db::get(db.repos.clone(), did_key)
+11 -12
src/ingest/worker.rs
··· 88 88 } 89 89 90 90 // starts the worker threads and the main dispatch loop 91 - // the dispatch loop reads from the firehose channel and distributes messages to shards 92 - // based on the consistent hash of the DID 91 + // the dispatch loop reads from the firehose channel and 92 + // distributes messages to shards based on the hash of the DID 93 93 pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> { 94 94 let mut shards = Vec::with_capacity(self.num_shards); 95 95 ··· 102 102 let handle = handle.clone(); 103 103 104 104 std::thread::Builder::new() 105 - .name(format!("ingest-shard-{}", i)) 105 + .name(format!("ingest-shard-{i}")) 106 106 .spawn(move || { 107 - Self::worker_thread(i, rx, state, verify, handle); 107 + Self::shard(i, rx, state, verify, handle); 108 108 }) 109 109 .into_diagnostic()?; 110 110 } ··· 143 143 Ok(()) 144 144 } 145 145 146 - // synchronous worker loop running on a dedicated thread 147 - // pulls messages from the channel, builds batches, and processes them 148 - // enters the tokio runtime only when necessary (key resolution) 149 - fn worker_thread( 146 + #[inline(always)] 147 + fn shard( 150 148 id: usize, 151 149 mut rx: mpsc::UnboundedReceiver<IngestMessage>, 152 150 state: Arc<AppState>, ··· 313 311 SubscribeReposMessage::Sync(sync) => { 314 312 debug!(did = %did, "processing buffered sync"); 315 313 314 + ctx.state.resolver.invalidate_sync(did); 315 + 316 316 match ops::verify_sync_event( 317 317 sync.blocks.as_ref(), 318 318 Self::fetch_key(ctx, did)?.as_ref(), ··· 355 355 } 356 356 SubscribeReposMessage::Identity(identity) => { 357 357 debug!(did = %did, "processing buffered identity"); 358 - let handle = identity 359 - .handle 360 - .as_ref() 361 - .map(|h| h.to_cowstr().into_static()); 358 + let handle = identity.handle.as_ref().map(|h| h.clone().into_static()); 362 359 363 360 let evt = IdentityEvt { 364 361 did: did.clone().into_static(), ··· 374 371 active: account.active, 375 372 status: account.status.as_ref().map(|s| s.to_cowstr().into_static()), 376 373 }; 374 + 375 + ctx.state.resolver.invalidate_sync(did); 377 376 378 377 if !account.active { 379 378 use crate::ingest::stream::AccountStatus;
+13 -3
src/resolver.rs
··· 48 48 } 49 49 50 50 #[derive(Clone)] 51 - struct MiniDoc { 51 + pub struct MiniDoc { 52 52 pds: Url, 53 53 handle: Option<Handle<'static>>, 54 54 key: Option<PublicKey<'static>>, ··· 94 94 } 95 95 } 96 96 97 + pub async fn invalidate(&self, did: &Did<'_>) { 98 + self.inner 99 + .cache 100 + .remove_async(&did.clone().into_static()) 101 + .await; 102 + } 103 + 104 + pub fn invalidate_sync(&self, did: &Did<'_>) { 105 + self.inner.cache.remove_sync(&did.clone().into_static()); 106 + } 107 + 97 108 async fn req<'r, T, Fut>( 98 109 &'r self, 99 110 is_plc: bool, ··· 138 149 } 139 150 } 140 151 141 - #[inline] 142 - async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> { 152 + pub async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> { 143 153 let did_static = did.clone().into_static(); 144 154 if let Some(entry) = self.inner.cache.get_async(&did_static).await { 145 155 return Ok(entry.get().clone());
+1 -1
src/types.rs
··· 171 171 #[serde(borrow)] 172 172 pub did: Did<'i>, 173 173 #[serde(skip_serializing_if = "Option::is_none")] 174 - pub handle: Option<CowStr<'i>>, 174 + pub handle: Option<Handle<'i>>, 175 175 } 176 176 177 177 #[derive(Debug, Serialize, Clone)]