at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] dont unconditionally insert blocks to the db

ptr.pet e2d47ab1 800449f7

verified
+13 -10
+2
src/backfill/mod.rs
··· 712 .db 713 .update_count_async("records", records_cnt_delta) 714 .await; 715 app_state 716 .db 717 .update_count_async("blocks", added_blocks)
··· 712 .db 713 .update_count_async("records", records_cnt_delta) 714 .await; 715 + } 716 + if added_blocks > 0 { 717 app_state 718 .db 719 .update_count_async("blocks", added_blocks)
+11 -10
src/ops.rs
··· 259 260 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?); 261 262 - // store all blocks in the CAS 263 - for (cid, bytes) in &parsed.blocks { 264 - batch.insert(&db.blocks, cid.to_bytes(), bytes.to_vec()); 265 - } 266 - 267 // 2. iterate ops and update records index 268 let mut records_delta = 0; 269 let mut collection_deltas: HashMap<&str, i64> = HashMap::new(); 270 271 for op in &commit.ops { ··· 286 let Some(cid) = &op.cid else { 287 continue; 288 }; 289 batch.insert( 290 &db.records, 291 db_key.clone(), 292 - cid.to_ipld() 293 - .into_diagnostic() 294 - .wrap_err("expected valid cid from relay")? 295 - .to_bytes(), 296 ); 297 298 // accumulate counts ··· 325 } 326 327 // update counts 328 - let blocks_count = parsed.blocks.len() as i64; 329 for (col, delta) in collection_deltas { 330 db::update_record_count(batch, db, did, col, delta)?; 331 }
··· 259 260 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?); 261 262 // 2. iterate ops and update records index 263 let mut records_delta = 0; 264 + let mut blocks_count = 0; 265 let mut collection_deltas: HashMap<&str, i64> = HashMap::new(); 266 267 for op in &commit.ops { ··· 282 let Some(cid) = &op.cid else { 283 continue; 284 }; 285 + let cid_ipld = cid.to_ipld() 286 + .into_diagnostic() 287 + .wrap_err("expected valid cid from relay")?; 288 + 289 + if let Some(bytes) = parsed.blocks.get(&cid_ipld) { 290 + batch.insert(&db.blocks, cid_ipld.to_bytes(), bytes.to_vec()); 291 + blocks_count += 1; 292 + } 293 + 294 batch.insert( 295 &db.records, 296 db_key.clone(), 297 + cid_ipld.to_bytes(), 298 ); 299 300 // accumulate counts ··· 327 } 328 329 // update counts 330 for (col, delta) in collection_deltas { 331 db::update_record_count(batch, db, did, col, delta)?; 332 }