at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::{Db, keys};
2
3use fjall::{OwnedWriteBatch, Slice};
4use miette::{IntoDiagnostic, Result};
5use std::sync::atomic::Ordering;
6
7/// a write batch that tracks block refcount deltas and applies them
8/// automatically on commit. this prevents the bug class where callers
9/// forget to call `apply_block_refcount_deltas` after committing.
10pub struct RefcountedBatch<'db> {
11 batch: OwnedWriteBatch,
12 db: &'db Db,
13 pending_deltas: Vec<(Slice, i8)>,
14}
15
16impl<'db> RefcountedBatch<'db> {
17 pub fn new(db: &'db Db) -> Self {
18 Self {
19 batch: db.inner.batch(),
20 db,
21 pending_deltas: Vec::new(),
22 }
23 }
24
25 /// records a refcount delta for the given CID. writes to the reflog
26 /// in the batch and accumulates the delta for in-memory application on commit.
27 pub fn update_block_refcount(&mut self, cid_bytes: Slice, delta: i8) -> Result<()> {
28 #[cfg(debug_assertions)]
29 if let Ok(cid) = cid::Cid::read_bytes(cid_bytes.as_ref()) {
30 tracing::debug!(delta, %cid, "update_block_refcount");
31 }
32
33 let value = rmp_serde::to_vec(&(cid_bytes.as_ref(), delta)).into_diagnostic()?;
34 let seq = self.db.next_reflog_seq.fetch_add(1, Ordering::SeqCst);
35 self.batch
36 .insert(&self.db.block_reflog, keys::reflog_key(seq), value);
37 self.pending_deltas.push((cid_bytes, delta));
38 Ok(())
39 }
40
41 /// commits the batch and applies all accumulated refcount deltas to the in-memory map.
42 pub fn commit(self) -> Result<(), fjall::Error> {
43 self.batch.commit()?;
44 apply_deltas(self.db, &self.pending_deltas);
45 Ok(())
46 }
47
48 pub fn batch_mut(&mut self) -> &mut OwnedWriteBatch {
49 &mut self.batch
50 }
51}
52
53fn apply_deltas(db: &Db, deltas: &[(Slice, i8)]) {
54 for (cid_bytes, delta) in deltas {
55 let mut entry = db
56 .block_refcounts
57 .entry_sync(cid_bytes.clone())
58 .or_insert(0);
59 *entry += *delta as i64;
60 }
61}