at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 61 lines 2.0 kB view raw
1use crate::db::{Db, keys}; 2 3use fjall::{OwnedWriteBatch, Slice}; 4use miette::{IntoDiagnostic, Result}; 5use std::sync::atomic::Ordering; 6 7/// a write batch that tracks block refcount deltas and applies them 8/// automatically on commit. this prevents the bug class where callers 9/// forget to call `apply_block_refcount_deltas` after committing. 10pub struct RefcountedBatch<'db> { 11 batch: OwnedWriteBatch, 12 db: &'db Db, 13 pending_deltas: Vec<(Slice, i8)>, 14} 15 16impl<'db> RefcountedBatch<'db> { 17 pub fn new(db: &'db Db) -> Self { 18 Self { 19 batch: db.inner.batch(), 20 db, 21 pending_deltas: Vec::new(), 22 } 23 } 24 25 /// records a refcount delta for the given CID. writes to the reflog 26 /// in the batch and accumulates the delta for in-memory application on commit. 27 pub fn update_block_refcount(&mut self, cid_bytes: Slice, delta: i8) -> Result<()> { 28 #[cfg(debug_assertions)] 29 if let Ok(cid) = cid::Cid::read_bytes(cid_bytes.as_ref()) { 30 tracing::debug!(delta, %cid, "update_block_refcount"); 31 } 32 33 let value = rmp_serde::to_vec(&(cid_bytes.as_ref(), delta)).into_diagnostic()?; 34 let seq = self.db.next_reflog_seq.fetch_add(1, Ordering::SeqCst); 35 self.batch 36 .insert(&self.db.block_reflog, keys::reflog_key(seq), value); 37 self.pending_deltas.push((cid_bytes, delta)); 38 Ok(()) 39 } 40 41 /// commits the batch and applies all accumulated refcount deltas to the in-memory map. 42 pub fn commit(self) -> Result<(), fjall::Error> { 43 self.batch.commit()?; 44 apply_deltas(self.db, &self.pending_deltas); 45 Ok(()) 46 } 47 48 pub fn batch_mut(&mut self) -> &mut OwnedWriteBatch { 49 &mut self.batch 50 } 51} 52 53fn apply_deltas(db: &Db, deltas: &[(Slice, i8)]) { 54 for (cid_bytes, delta) in deltas { 55 let mut entry = db 56 .block_refcounts 57 .entry_sync(cid_bytes.clone()) 58 .or_insert(0); 59 *entry += *delta as i64; 60 } 61}