at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] fix signal mode: pass unknown DIDs to worker; add debug logging

ptr.pet bf0d6d86 a28c3f9c

verified
+28 -6
+18 -5
src/ingest/firehose.rs
··· 10 10 use std::sync::Arc; 11 11 use std::sync::atomic::Ordering; 12 12 use std::time::Duration; 13 - use tracing::{error, info}; 13 + use tracing::{debug, error, info, trace}; 14 14 use url::Url; 15 15 16 16 pub struct FirehoseIngestor { ··· 99 99 _ => return, 100 100 }; 101 101 102 - if !self 102 + let process = self 103 103 .should_process(did) 104 104 .await 105 105 .inspect_err(|e| error!("failed to check if we should process {did}: {e}")) 106 - .unwrap_or(false) 107 - { 106 + .unwrap_or(false); 107 + if !process { 108 + trace!("skipping {did}: not in filter"); 108 109 return; 109 110 } 111 + debug!("forwarding message for {did} to ingest buffer"); 110 112 111 113 if let Err(e) = self.buffer_tx.send(IngestMessage::Firehose(msg)) { 112 114 error!("failed to send message to buffer processor: {e}"); ··· 138 140 .contains_key(&did_key) 139 141 .into_diagnostic()? 140 142 { 143 + debug!("{did} is in DID allowlist, processing"); 141 144 return Ok(true); 142 145 } 143 - Db::contains_key(self.state.db.repos.clone(), keys::repo_key(did)).await 146 + let known = 147 + Db::contains_key(self.state.db.repos.clone(), keys::repo_key(did)).await?; 148 + if known { 149 + debug!("{did} is a known repo, processing"); 150 + } else { 151 + debug!( 152 + "{did} is unknown — passing to worker for signal check (mode={:?})", 153 + filter.mode 154 + ); 155 + } 156 + Ok(known || filter.mode == FilterMode::Signal) 144 157 } 145 158 } 146 159 }
+10 -1
src/ingest/worker.rs
··· 538 538 let touches_signal = commit.ops.iter().any(|op| { 539 539 op.path 540 540 .split_once('/') 541 - .map(|(col, _)| filter.matches_signal(col)) 541 + .map(|(col, _)| { 542 + let m = filter.matches_signal(col); 543 + debug!( 544 + "signal check for {did}: op path={} col={col} signals={:?} -> {m}", 545 + op.path, filter.signals 546 + ); 547 + m 548 + }) 542 549 .unwrap_or(false) 543 550 }); 544 551 if !touches_signal { 552 + debug!("dropping {did}: commit has no signal-matching ops"); 545 553 return Ok(RepoProcessResult::Syncing(None)); 546 554 } 555 + debug!("{did}: commit touches a signal, queuing backfill"); 547 556 } 548 557 549 558 debug!("discovered new account {did} from firehose, queueing backfill");