at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] make some logs trace instead of debug

ptr.pet 4ed265ff c80f16c7

verified
+5 -5
+4 -4
src/backfill/mod.rs
··· 616 // check if this record already exists with same CID 617 let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) { 618 if existing_cid == cid_obj.as_str() { 619 - debug!("skip {did}/{collection}/{rkey} ({cid})"); 620 continue; // skip unchanged record 621 } 622 (DbAction::Update, false) 623 } else { 624 (DbAction::Create, true) 625 }; 626 - debug!("{action} {did}/{collection}/{rkey} ({cid})"); 627 628 // Key is just did|rkey 629 let db_key = keys::record_key(&did, &rkey); ··· 656 657 // remove any remaining existing records (they weren't in the new MST) 658 for ((collection, rkey), cid) in existing_cids { 659 - debug!("remove {did}/{collection}/{rkey} ({cid})"); 660 let partition = app_state.db.record_partition(collection.as_str())?; 661 662 batch.remove(&partition, keys::record_key(&did, &rkey)); ··· 724 )); 725 726 // buffer processing is handled by BufferProcessor when blocked flag is cleared 727 - debug!("backfill complete for {did}"); 728 Ok(previous_state) 729 }
··· 616 // check if this record already exists with same CID 617 let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) { 618 if existing_cid == cid_obj.as_str() { 619 + trace!("skip {did}/{collection}/{rkey} ({cid})"); 620 continue; // skip unchanged record 621 } 622 (DbAction::Update, false) 623 } else { 624 (DbAction::Create, true) 625 }; 626 + trace!("{action} {did}/{collection}/{rkey} ({cid})"); 627 628 // Key is just did|rkey 629 let db_key = keys::record_key(&did, &rkey); ··· 656 657 // remove any remaining existing records (they weren't in the new MST) 658 for ((collection, rkey), cid) in existing_cids { 659 + trace!("remove {did}/{collection}/{rkey} ({cid})"); 660 let partition = app_state.db.record_partition(collection.as_str())?; 661 662 batch.remove(&partition, keys::record_key(&did, &rkey)); ··· 724 )); 725 726 // buffer processing is handled by BufferProcessor when blocked flag is cleared 727 + trace!("backfill complete for {did}"); 728 Ok(previous_state) 729 }
+1 -1
src/main.rs
··· 162 tokio::task::spawn_blocking(move || { 163 firehose_worker 164 .join() 165 - .map_err(|e| miette::miette!("buffer processor thread died: {e:?}")) 166 }) 167 .map(|r| r.into_diagnostic().flatten().flatten()), 168 ) as BoxFuture<_>,
··· 162 tokio::task::spawn_blocking(move || { 163 firehose_worker 164 .join() 165 + .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 166 }) 167 .map(|r| r.into_diagnostic().flatten().flatten()), 168 ) as BoxFuture<_>,