AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 294 lines 9.0 kB view raw
1use fjall::Slice; 2use fjall::compaction::filter::{CompactionFilter, Context, Factory, ItemAccessor, Verdict}; 3use miette::{IntoDiagnostic, WrapErr}; 4use scc::HashMap; 5use std::sync::Arc; 6use std::sync::atomic::{AtomicBool, Ordering}; 7use std::time::Duration; 8use tracing::{error, info}; 9 10const EVENT_TTL_SECS: u64 = 3600; 11 12use crate::db::{Db, keys}; 13use crate::types::StoredEvent; 14 15pub struct BlocksGcFilterFactory { 16 pub gc_ready: Arc<AtomicBool>, 17 pub refcounts: Arc<HashMap<Slice, i64>>, 18} 19 20struct BlocksGcFilter { 21 gc_ready: Arc<AtomicBool>, 22 refcounts: Arc<HashMap<Slice, i64>>, 23} 24 25impl Factory for BlocksGcFilterFactory { 26 fn make_filter(&self, _ctx: &Context) -> Box<dyn CompactionFilter> { 27 Box::new(BlocksGcFilter { 28 gc_ready: self.gc_ready.clone(), 29 refcounts: self.refcounts.clone(), 30 }) 31 } 32 33 fn name(&self) -> &str { 34 "blocks_gc" 35 } 36} 37 38impl CompactionFilter for BlocksGcFilter { 39 fn filter_item(&mut self, item: ItemAccessor<'_>, _ctx: &Context) -> lsm_tree::Result<Verdict> { 40 if !self.gc_ready.load(Ordering::SeqCst) { 41 return Ok(Verdict::Keep); 42 } 43 44 let count = self 45 .refcounts 46 .read_sync(item.key().as_ref(), |_, v| *v) 47 .unwrap_or(0); 48 49 #[cfg(debug_assertions)] 50 if let Ok(cid) = cid::Cid::read_bytes(item.key().as_ref()) { 51 tracing::debug!(cid = %cid, count, "BlocksGcFilter checking block"); 52 } 53 54 Ok((count <= 0) 55 .then_some(Verdict::Destroy) 56 .unwrap_or(Verdict::Keep)) 57 } 58} 59 60pub fn startup_load_refcounts(db: &Db) -> miette::Result<()> { 61 let checkpoint_seq = db 62 .cursors 63 .get(keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY) 64 .into_diagnostic()? 65 .map(|v| { 66 u64::from_be_bytes( 67 v.as_ref() 68 .try_into() 69 .expect("checkpoint seq should be 8 bytes"), 70 ) 71 }) 72 .unwrap_or(0); 73 74 // check if we need to run the one-time migration 75 let needs_migration = db 76 .counts 77 .get(keys::count_keyspace_key("gc_schema_version")) 78 .into_diagnostic()? 
79 .is_none(); 80 81 if needs_migration { 82 migrate_build_refcounts(db)?; 83 } 84 85 // load snapshot 86 for guard in db.block_refs.iter() { 87 let (k, v) = guard.into_inner().into_diagnostic()?; 88 let count = i64::from_be_bytes( 89 v.as_ref() 90 .try_into() 91 .into_diagnostic() 92 .wrap_err("invalid block_refs count bytes")?, 93 ); 94 let _ = db.block_refcounts.insert_sync(k, count); 95 } 96 97 // replay WAL since checkpoint 98 let start_key = keys::reflog_key(checkpoint_seq.saturating_add(1)); 99 for guard in db.block_reflog.range(start_key..) { 100 let (_, v) = guard.into_inner().into_diagnostic()?; 101 let (cid, delta): (Vec<u8>, i8) = rmp_serde::from_slice(&v).into_diagnostic()?; 102 let cid = Slice::from(cid); 103 let mut entry = db.block_refcounts.entry_sync(cid).or_insert(0); 104 *entry += delta as i64; 105 } 106 107 db.gc_ready.store(true, Ordering::SeqCst); 108 info!("block refcounts loaded, gc ready"); 109 Ok(()) 110} 111 112fn migrate_build_refcounts(db: &Db) -> miette::Result<()> { 113 info!("building initial block refcounts from existing records (one-time migration)"); 114 let mut batch = db.inner.batch(); 115 116 // scan records 117 for guard in db.records.iter() { 118 let cid_bytes = guard.value().into_diagnostic()?; 119 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 120 *entry += 1i64; 121 } 122 123 // events with cids 124 for guard in db.events.iter() { 125 let v = guard.value().into_diagnostic()?; 126 let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 127 let Some(cid) = evt.cid else { 128 continue; 129 }; 130 let cid_bytes = Slice::from(cid.to_bytes()); 131 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 132 *entry += 1i64; 133 } 134 135 // persist as initial checkpoint 136 db.block_refcounts.iter_sync(|k, v| { 137 batch.insert(&db.block_refs, k.as_ref(), v.to_be_bytes()); 138 true 139 }); 140 141 let seq = db.next_reflog_seq.load(Ordering::SeqCst); 142 batch.insert( 143 
&db.cursors, 144 keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY, 145 seq.to_be_bytes(), 146 ); 147 // mark migration done 148 let one: u64 = 1; 149 batch.insert( 150 &db.counts, 151 keys::count_keyspace_key("gc_schema_version"), 152 one.to_be_bytes(), 153 ); 154 batch.commit().into_diagnostic()?; 155 156 info!("block refcount migration complete"); 157 Ok(()) 158} 159 160pub fn checkpoint_worker(state: Arc<crate::state::AppState>) { 161 info!("block refs checkpoint worker started"); 162 loop { 163 std::thread::sleep(Duration::from_secs(300)); 164 if let Err(e) = checkpoint(&state.db) { 165 error!(err = %e, "block refs checkpoint failed"); 166 } 167 } 168} 169 170fn checkpoint(db: &Db) -> miette::Result<()> { 171 let checkpoint_seq = db.next_reflog_seq.load(Ordering::SeqCst).saturating_sub(1); 172 173 let mut batch = db.inner.batch(); 174 175 db.block_refcounts.iter_sync(|k, v| { 176 batch.insert(&db.block_refs, k.as_ref(), v.to_be_bytes()); 177 true 178 }); 179 180 batch.insert( 181 &db.cursors, 182 keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY, 183 checkpoint_seq.to_be_bytes(), 184 ); 185 186 // truncate reflog up to and including checkpoint_seq 187 for guard in db.block_reflog.range(..=keys::reflog_key(checkpoint_seq)) { 188 let k = guard.key().into_diagnostic()?; 189 batch.remove(&db.block_reflog, k); 190 } 191 192 batch.commit().into_diagnostic()?; 193 info!(seq = checkpoint_seq, "block refs checkpoint complete"); 194 Ok(()) 195} 196 197pub fn ephemeral_startup_load_refcounts(db: &Db) -> miette::Result<()> { 198 info!("rebuilding block refcounts from events (ephemeral mode)"); 199 200 for guard in db.events.iter() { 201 let v = guard.value().into_diagnostic()?; 202 let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 203 let Some(cid) = evt.cid else { 204 continue; 205 }; 206 let cid_bytes = Slice::from(cid.to_bytes()); 207 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 208 *entry += 1; 209 } 210 211 db.gc_ready.store(true, 
Ordering::SeqCst); 212 info!("ephemeral block refcounts ready"); 213 Ok(()) 214} 215 216pub fn ephemeral_ttl_worker(state: Arc<crate::state::AppState>) { 217 info!("ephemeral TTL worker started"); 218 loop { 219 std::thread::sleep(Duration::from_secs(60)); 220 if let Err(e) = ephemeral_ttl_tick(&state.db) { 221 error!(err = %e, "ephemeral TTL tick failed"); 222 } 223 } 224} 225 226fn ephemeral_ttl_tick(db: &Db) -> miette::Result<()> { 227 let now = chrono::Utc::now().timestamp() as u64; 228 let cutoff_ts = now.saturating_sub(EVENT_TTL_SECS); 229 230 // write current watermark 231 let current_event_id = db.next_event_id.load(Ordering::SeqCst); 232 db.cursors 233 .insert( 234 keys::event_watermark_key(now), 235 current_event_id.to_be_bytes(), 236 ) 237 .into_diagnostic()?; 238 239 // find the watermark entry closest to and <= cutoff_ts 240 let cutoff_key = keys::event_watermark_key(cutoff_ts); 241 let cutoff_event_id = db 242 .cursors 243 .range(..=cutoff_key.clone()) 244 .next_back() 245 .map(|g| g.into_inner().into_diagnostic()) 246 .transpose()? 247 .filter(|(k, _)| k.starts_with(keys::EVENT_WATERMARK_PREFIX)) 248 .map(|(_, v)| { 249 v.as_ref() 250 .try_into() 251 .into_diagnostic() 252 .wrap_err("expected cutoff event id to be u64") 253 }) 254 .transpose()? 
255 .map(u64::from_be_bytes); 256 257 let Some(cutoff_event_id) = cutoff_event_id else { 258 // no watermark old enough yet, nothing to prune 259 return Ok(()); 260 }; 261 262 let cutoff_key_events = keys::event_key(cutoff_event_id); 263 let mut batch = db.inner.batch(); 264 let mut pruned = 0usize; 265 266 for guard in db.events.range(..cutoff_key_events) { 267 let (k, v) = guard.into_inner().into_diagnostic()?; 268 let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 269 let Some(cid) = evt.cid else { 270 continue; 271 }; 272 let cid_bytes = Slice::from(cid.to_bytes()); 273 let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 274 *entry -= 1; 275 batch.remove(&db.events, k); 276 pruned += 1; 277 } 278 279 // clean up consumed watermark entries (everything up to and including cutoff_ts) 280 for guard in db.cursors.range(..=cutoff_key) { 281 let k = guard.key().into_diagnostic()?; 282 if k.starts_with(keys::EVENT_WATERMARK_PREFIX) { 283 batch.remove(&db.cursors, k); 284 } 285 } 286 287 batch.commit().into_diagnostic()?; 288 289 if pruned > 0 { 290 info!(pruned, "pruned old events"); 291 } 292 293 Ok(()) 294}