AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use fjall::Slice;
2use fjall::compaction::filter::{CompactionFilter, Context, Factory, ItemAccessor, Verdict};
3use miette::{IntoDiagnostic, WrapErr};
4use scc::HashMap;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::time::Duration;
8use tracing::{error, info};
9
// How long events are retained in ephemeral mode before the TTL worker
// prunes them (one hour).
const EVENT_TTL_SECS: u64 = 3600;
11
12use crate::db::{Db, keys};
13use crate::types::StoredEvent;
14
/// Factory registered with fjall; each compaction run gets a `BlocksGcFilter`
/// that shares this readiness flag and refcount map.
pub struct BlocksGcFilterFactory {
    // Set to true once startup has finished loading refcounts; the filter
    // keeps everything until then, since deleting earlier would be unsound.
    pub gc_ready: Arc<AtomicBool>,
    // Live reference count per block, keyed by raw CID bytes.
    pub refcounts: Arc<HashMap<Slice, i64>>,
}
19
/// Per-compaction filter state: clones of the shared readiness flag and
/// refcount map handed out by `BlocksGcFilterFactory`.
struct BlocksGcFilter {
    // Mirrors the factory's flag; checked on every item (see filter_item).
    gc_ready: Arc<AtomicBool>,
    // Shared refcount map; a missing or <= 0 entry marks a block as dead.
    refcounts: Arc<HashMap<Slice, i64>>,
}
24
25impl Factory for BlocksGcFilterFactory {
26 fn make_filter(&self, _ctx: &Context) -> Box<dyn CompactionFilter> {
27 Box::new(BlocksGcFilter {
28 gc_ready: self.gc_ready.clone(),
29 refcounts: self.refcounts.clone(),
30 })
31 }
32
33 fn name(&self) -> &str {
34 "blocks_gc"
35 }
36}
37
38impl CompactionFilter for BlocksGcFilter {
39 fn filter_item(&mut self, item: ItemAccessor<'_>, _ctx: &Context) -> lsm_tree::Result<Verdict> {
40 if !self.gc_ready.load(Ordering::SeqCst) {
41 return Ok(Verdict::Keep);
42 }
43
44 let count = self
45 .refcounts
46 .read_sync(item.key().as_ref(), |_, v| *v)
47 .unwrap_or(0);
48
49 #[cfg(debug_assertions)]
50 if let Ok(cid) = cid::Cid::read_bytes(item.key().as_ref()) {
51 tracing::debug!(cid = %cid, count, "BlocksGcFilter checking block");
52 }
53
54 Ok((count <= 0)
55 .then_some(Verdict::Destroy)
56 .unwrap_or(Verdict::Keep))
57 }
58}
59
60pub fn startup_load_refcounts(db: &Db) -> miette::Result<()> {
61 let checkpoint_seq = db
62 .cursors
63 .get(keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY)
64 .into_diagnostic()?
65 .map(|v| {
66 u64::from_be_bytes(
67 v.as_ref()
68 .try_into()
69 .expect("checkpoint seq should be 8 bytes"),
70 )
71 })
72 .unwrap_or(0);
73
74 // check if we need to run the one-time migration
75 let needs_migration = db
76 .counts
77 .get(keys::count_keyspace_key("gc_schema_version"))
78 .into_diagnostic()?
79 .is_none();
80
81 if needs_migration {
82 migrate_build_refcounts(db)?;
83 }
84
85 // load snapshot
86 for guard in db.block_refs.iter() {
87 let (k, v) = guard.into_inner().into_diagnostic()?;
88 let count = i64::from_be_bytes(
89 v.as_ref()
90 .try_into()
91 .into_diagnostic()
92 .wrap_err("invalid block_refs count bytes")?,
93 );
94 let _ = db.block_refcounts.insert_sync(k, count);
95 }
96
97 // replay WAL since checkpoint
98 let start_key = keys::reflog_key(checkpoint_seq.saturating_add(1));
99 for guard in db.block_reflog.range(start_key..) {
100 let (_, v) = guard.into_inner().into_diagnostic()?;
101 let (cid, delta): (Vec<u8>, i8) = rmp_serde::from_slice(&v).into_diagnostic()?;
102 let cid = Slice::from(cid);
103 let mut entry = db.block_refcounts.entry_sync(cid).or_insert(0);
104 *entry += delta as i64;
105 }
106
107 db.gc_ready.store(true, Ordering::SeqCst);
108 info!("block refcounts loaded, gc ready");
109 Ok(())
110}
111
112fn migrate_build_refcounts(db: &Db) -> miette::Result<()> {
113 info!("building initial block refcounts from existing records (one-time migration)");
114 let mut batch = db.inner.batch();
115
116 // scan records
117 for guard in db.records.iter() {
118 let cid_bytes = guard.value().into_diagnostic()?;
119 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0);
120 *entry += 1i64;
121 }
122
123 // events with cids
124 for guard in db.events.iter() {
125 let v = guard.value().into_diagnostic()?;
126 let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?;
127 let Some(cid) = evt.cid else {
128 continue;
129 };
130 let cid_bytes = Slice::from(cid.to_bytes());
131 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0);
132 *entry += 1i64;
133 }
134
135 // persist as initial checkpoint
136 db.block_refcounts.iter_sync(|k, v| {
137 batch.insert(&db.block_refs, k.as_ref(), v.to_be_bytes());
138 true
139 });
140
141 let seq = db.next_reflog_seq.load(Ordering::SeqCst);
142 batch.insert(
143 &db.cursors,
144 keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY,
145 seq.to_be_bytes(),
146 );
147 // mark migration done
148 let one: u64 = 1;
149 batch.insert(
150 &db.counts,
151 keys::count_keyspace_key("gc_schema_version"),
152 one.to_be_bytes(),
153 );
154 batch.commit().into_diagnostic()?;
155
156 info!("block refcount migration complete");
157 Ok(())
158}
159
160pub fn checkpoint_worker(state: Arc<crate::state::AppState>) {
161 info!("block refs checkpoint worker started");
162 loop {
163 std::thread::sleep(Duration::from_secs(300));
164 if let Err(e) = checkpoint(&state.db) {
165 error!(err = %e, "block refs checkpoint failed");
166 }
167 }
168}
169
170fn checkpoint(db: &Db) -> miette::Result<()> {
171 let checkpoint_seq = db.next_reflog_seq.load(Ordering::SeqCst).saturating_sub(1);
172
173 let mut batch = db.inner.batch();
174
175 db.block_refcounts.iter_sync(|k, v| {
176 batch.insert(&db.block_refs, k.as_ref(), v.to_be_bytes());
177 true
178 });
179
180 batch.insert(
181 &db.cursors,
182 keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY,
183 checkpoint_seq.to_be_bytes(),
184 );
185
186 // truncate reflog up to and including checkpoint_seq
187 for guard in db.block_reflog.range(..=keys::reflog_key(checkpoint_seq)) {
188 let k = guard.key().into_diagnostic()?;
189 batch.remove(&db.block_reflog, k);
190 }
191
192 batch.commit().into_diagnostic()?;
193 info!(seq = checkpoint_seq, "block refs checkpoint complete");
194 Ok(())
195}
196
197pub fn ephemeral_startup_load_refcounts(db: &Db) -> miette::Result<()> {
198 info!("rebuilding block refcounts from events (ephemeral mode)");
199
200 for guard in db.events.iter() {
201 let v = guard.value().into_diagnostic()?;
202 let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?;
203 let Some(cid) = evt.cid else {
204 continue;
205 };
206 let cid_bytes = Slice::from(cid.to_bytes());
207 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0);
208 *entry += 1;
209 }
210
211 db.gc_ready.store(true, Ordering::SeqCst);
212 info!("ephemeral block refcounts ready");
213 Ok(())
214}
215
216pub fn ephemeral_ttl_worker(state: Arc<crate::state::AppState>) {
217 info!("ephemeral TTL worker started");
218 loop {
219 std::thread::sleep(Duration::from_secs(60));
220 if let Err(e) = ephemeral_ttl_tick(&state.db) {
221 error!(err = %e, "ephemeral TTL tick failed");
222 }
223 }
224}
225
226fn ephemeral_ttl_tick(db: &Db) -> miette::Result<()> {
227 let now = chrono::Utc::now().timestamp() as u64;
228 let cutoff_ts = now.saturating_sub(EVENT_TTL_SECS);
229
230 // write current watermark
231 let current_event_id = db.next_event_id.load(Ordering::SeqCst);
232 db.cursors
233 .insert(
234 keys::event_watermark_key(now),
235 current_event_id.to_be_bytes(),
236 )
237 .into_diagnostic()?;
238
239 // find the watermark entry closest to and <= cutoff_ts
240 let cutoff_key = keys::event_watermark_key(cutoff_ts);
241 let cutoff_event_id = db
242 .cursors
243 .range(..=cutoff_key.clone())
244 .next_back()
245 .map(|g| g.into_inner().into_diagnostic())
246 .transpose()?
247 .filter(|(k, _)| k.starts_with(keys::EVENT_WATERMARK_PREFIX))
248 .map(|(_, v)| {
249 v.as_ref()
250 .try_into()
251 .into_diagnostic()
252 .wrap_err("expected cutoff event id to be u64")
253 })
254 .transpose()?
255 .map(u64::from_be_bytes);
256
257 let Some(cutoff_event_id) = cutoff_event_id else {
258 // no watermark old enough yet, nothing to prune
259 return Ok(());
260 };
261
262 let cutoff_key_events = keys::event_key(cutoff_event_id);
263 let mut batch = db.inner.batch();
264 let mut pruned = 0usize;
265
266 for guard in db.events.range(..cutoff_key_events) {
267 let (k, v) = guard.into_inner().into_diagnostic()?;
268 let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?;
269 let Some(cid) = evt.cid else {
270 continue;
271 };
272 let cid_bytes = Slice::from(cid.to_bytes());
273 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0);
274 *entry -= 1;
275 batch.remove(&db.events, k);
276 pruned += 1;
277 }
278
279 // clean up consumed watermark entries (everything up to and including cutoff_ts)
280 for guard in db.cursors.range(..=cutoff_key) {
281 let k = guard.key().into_diagnostic()?;
282 if k.starts_with(keys::EVENT_WATERMARK_PREFIX) {
283 batch.remove(&db.cursors, k);
284 }
285 }
286
287 batch.commit().into_diagnostic()?;
288
289 if pruned > 0 {
290 info!(pruned, "pruned old events");
291 }
292
293 Ok(())
294}