at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[all] implement garbage collection for blocks, expose /stream/ack

ptr.pet 41096cfe 0081e8af

verified
+929 -154
-5
.agent/rules/read-agents.md
··· 1 - --- 2 - trigger: always_on 3 - --- 4 - 5 - ALWAYS read AGENTS.md at the start of a conversation, and keep it in mind throught.
+1 -1
AGENTS.md
··· 117 117 <!-- gitnexus:start --> 118 118 # GitNexus — Code Intelligence 119 119 120 - This project is indexed by GitNexus as **hydrant** (599 symbols, 1624 relationships, 51 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. 120 + This project is indexed by GitNexus as **hydrant** (655 symbols, 1810 relationships, 55 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. 121 121 122 122 > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. 123 123
+3
Cargo.toml
··· 50 50 51 51 [dev-dependencies] 52 52 tempfile = "3.26.0" 53 + 54 + [profile.dev] 55 + opt-level = 2
+3
README.md
··· 131 131 - `GET /stream`: subscribe to the event stream. 132 132 - query parameters: 133 133 - `cursor` (optional): start streaming from a specific event ID. 134 + - `POST /stream/ack`: ack events. 135 + - body: 136 + - `ids`: list of event IDs to acknowledge. 134 137 135 138 ### stats 136 139
+81 -4
src/api/debug.rs
··· 1 1 use crate::api::AppState; 2 2 use crate::db::keys; 3 3 use crate::types::{RepoState, ResyncState, StoredEvent}; 4 + use axum::routing::{get, post}; 4 5 use axum::{ 5 6 Json, 6 7 extract::{Query, State}, ··· 26 27 27 28 pub fn router() -> axum::Router<Arc<AppState>> { 28 29 axum::Router::new() 29 - .route("/debug/count", axum::routing::get(handle_debug_count)) 30 - .route("/debug/get", axum::routing::get(handle_debug_get)) 31 - .route("/debug/iter", axum::routing::get(handle_debug_iter)) 30 + .route("/debug/count", get(handle_debug_count)) 31 + .route("/debug/get", get(handle_debug_get)) 32 + .route("/debug/iter", get(handle_debug_iter)) 33 + .route("/debug/refcount", get(handle_debug_refcount)) 34 + .route("/debug/refcount", post(handle_set_debug_refcount)) 35 + .route("/debug/compact", post(handle_debug_compact)) 32 36 } 33 37 34 38 pub async fn handle_debug_count( ··· 182 186 let end = parse_bound(req.end)?; 183 187 184 188 let items = tokio::task::spawn_blocking(move || { 185 - let limit = req.limit.unwrap_or(50).min(1000); 189 + let limit = req.limit.unwrap_or(50); 186 190 187 191 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| { 188 192 let mut items = Vec::new(); ··· 196 200 u64::from_be_bytes(arr).to_string() 197 201 } else { 198 202 "invalid_u64".to_string() 203 + } 204 + } else if partition == "blocks" { 205 + match cid::Cid::read_bytes(k.as_ref()) { 206 + Ok(cid) => cid.to_string(), 207 + Err(_) => String::from_utf8_lossy(&k).into_owned(), 199 208 } 200 209 } else { 201 210 String::from_utf8_lossy(&k).into_owned() ··· 255 264 _ => Err(StatusCode::BAD_REQUEST), 256 265 } 257 266 } 267 + 268 + #[derive(Deserialize)] 269 + pub struct DebugCompactRequest { 270 + pub partition: String, 271 + } 272 + 273 + pub async fn handle_debug_compact( 274 + State(state): State<Arc<AppState>>, 275 + Query(req): Query<DebugCompactRequest>, 276 + ) -> Result<StatusCode, StatusCode> { 277 + let ks = get_keyspace_by_name(&state.db, &req.partition)?; 278 + 
let state_clone = state.clone(); 279 + 280 + tokio::task::spawn_blocking(move || { 281 + let _ = ks.remove(b"dummy_tombstone123"); 282 + let _ = state_clone.db.persist(); 283 + let _ = ks.rotate_memtable_and_wait(); 284 + ks.major_compact() 285 + }) 286 + .await 287 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 288 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 289 + 290 + Ok(StatusCode::OK) 291 + } 292 + 293 + #[derive(Deserialize)] 294 + pub struct DebugRefcountRequest { 295 + pub cid: String, 296 + } 297 + 298 + #[derive(Serialize)] 299 + pub struct DebugRefcountResponse { 300 + pub count: Option<i64>, 301 + } 302 + 303 + pub async fn handle_debug_refcount( 304 + State(state): State<Arc<AppState>>, 305 + Query(req): Query<DebugRefcountRequest>, 306 + ) -> Result<Json<DebugRefcountResponse>, StatusCode> { 307 + let cid = cid::Cid::from_str(&req.cid).map_err(|_| StatusCode::BAD_REQUEST)?; 308 + let cid_bytes = fjall::Slice::from(cid.to_bytes()); 309 + 310 + let count = state 311 + .db 312 + .block_refcounts 313 + .read_sync(cid_bytes.as_ref(), |_, v| *v); 314 + 315 + Ok(Json(DebugRefcountResponse { count })) 316 + } 317 + 318 + #[derive(Deserialize)] 319 + pub struct DebugSetRefcountRequest { 320 + pub cid: String, 321 + pub count: i64, 322 + } 323 + 324 + pub async fn handle_set_debug_refcount( 325 + State(state): State<Arc<AppState>>, 326 + axum::extract::Json(req): axum::extract::Json<DebugSetRefcountRequest>, 327 + ) -> Result<StatusCode, StatusCode> { 328 + let cid = cid::Cid::from_str(&req.cid).map_err(|_| StatusCode::BAD_REQUEST)?; 329 + let cid_bytes = fjall::Slice::from(cid.to_bytes()); 330 + 331 + let _ = state.db.block_refcounts.insert_sync(cid_bytes, req.count); 332 + 333 + Ok(StatusCode::OK) 334 + }
+8 -5
src/api/repos.rs
··· 14 14 use serde::{Deserialize, Serialize}; 15 15 16 16 use crate::api::AppState; 17 + use crate::db::refcount::RefcountedBatch; 17 18 use crate::db::{keys, ser_repo_state}; 18 19 use crate::types::{GaugeState, RepoState}; 19 20 ··· 28 29 #[derive(Deserialize, Debug)] 29 30 pub struct RepoRequest { 30 31 pub did: String, 31 - #[serde(skip_serializing_if = "Option::is_none", rename = "deleteData")] 32 + #[serde(skip_serializing_if = "Option::is_none")] 32 33 pub delete_data: Option<bool>, 33 34 } 34 35 ··· 276 277 let state_task = state.clone(); 277 278 let (deleted_count, gauge_decrements) = tokio::task::spawn_blocking(move || { 278 279 let db = &state_task.db; 279 - let mut batch = db.inner.batch(); 280 + let mut batch = RefcountedBatch::new(db); 280 281 let mut deleted_count = 0i64; 281 282 let mut gauge_decrements = Vec::new(); 282 283 ··· 306 307 } else if repo_state.tracked { 307 308 let mut repo_state = repo_state.into_static(); 308 309 repo_state.tracked = false; 309 - batch.insert( 310 + batch.batch_mut().insert( 310 311 &db.repos, 311 312 &did_key, 312 313 ser_repo_state(&repo_state).map_err(internal)?, 313 314 ); 314 - batch.remove(&db.pending, keys::pending_key(repo_state.index_id)); 315 - batch.remove(&db.resync, &did_key); 315 + batch 316 + .batch_mut() 317 + .remove(&db.pending, keys::pending_key(repo_state.index_id)); 318 + batch.batch_mut().remove(&db.resync, &did_key); 316 319 if old_gauge != GaugeState::Synced { 317 320 gauge_decrements.push(old_gauge); 318 321 }
+23 -10
src/api/stream.rs
··· 1 1 use crate::api::AppState; 2 2 use crate::db::keys; 3 + use crate::db::refcount::RefcountedBatch; 3 4 use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent}; 4 5 use axum::Router; 5 6 use axum::http::StatusCode; 6 - use axum::routing::get; 7 + use axum::routing::{get, post}; 7 8 use axum::{ 8 9 extract::{ 9 10 Query, State, ··· 19 20 use tracing::{error, info_span, warn}; 20 21 21 22 pub fn router() -> Router<Arc<AppState>> { 22 - Router::new().route("/", get(handle_stream)) 23 - // .route("/ack", post(handle_ack)) 23 + Router::new() 24 + .route("/", get(handle_stream)) 25 + .route("/ack", post(handle_ack)) 24 26 } 25 27 26 - #[allow(dead_code)] 27 28 #[derive(Deserialize)] 28 29 pub struct AckBody { 29 30 pub ids: Vec<u64>, 30 31 } 31 32 32 - #[allow(dead_code)] 33 33 pub async fn handle_ack( 34 34 State(state): State<Arc<AppState>>, 35 35 axum::Json(body): axum::Json<AckBody>, ··· 41 41 let state = state.clone(); 42 42 let ids = body.ids; 43 43 tokio::task::spawn_blocking(move || { 44 - let mut batch = state.db.inner.batch(); 44 + let mut batch = RefcountedBatch::new(&state.db); 45 45 for &id in &ids { 46 - batch.remove(&state.db.events, keys::event_key(id)); 46 + let _entered = tracing::info_span!("ack", id).entered(); 47 + let key = keys::event_key(id); 48 + let Some(event_bytes) = state.db.events.get(&key).into_diagnostic()? 
else { 49 + tracing::warn!("event bytes not found"); 50 + continue; 51 + }; 52 + let evt = rmp_serde::from_slice::<StoredEvent>(&event_bytes).into_diagnostic()?; 53 + if let Some(cid) = evt.cid { 54 + tracing::debug!(cid = %cid, "acking event"); 55 + batch.update_block_refcount(fjall::Slice::from(cid.to_bytes()), -1)?; 56 + } else { 57 + tracing::debug!("acking event with NO cid"); 58 + } 59 + batch.batch_mut().remove(&state.db.events, key); 47 60 } 48 61 batch 49 62 .commit() 50 63 .into_diagnostic() 51 - .wrap_err("failed to delete events") 52 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 64 + .wrap_err("failed to delete events")?; 53 65 Ok(StatusCode::OK) 54 66 }) 55 67 .await 56 68 .into_diagnostic() 57 69 .wrap_err("panicked while deleting events") 58 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 70 + .flatten() 71 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 59 72 } 60 73 61 74 #[derive(Deserialize)]
+82 -35
src/backfill/mod.rs
··· 22 22 use reqwest::StatusCode; 23 23 use smol_str::{SmolStr, ToSmolStr}; 24 24 use std::collections::HashMap; 25 + use std::str::FromStr; 25 26 use std::sync::Arc; 26 27 use std::sync::atomic::Ordering; 27 28 use std::time::{Duration, Instant}; ··· 39 40 http: reqwest::Client, 40 41 semaphore: Arc<Semaphore>, 41 42 verify_signatures: bool, 43 + ephemeral: bool, 42 44 in_flight: Arc<scc::HashSet<Did<'static>>>, 43 45 } 44 46 ··· 49 51 timeout: Duration, 50 52 concurrency_limit: usize, 51 53 verify_signatures: bool, 54 + ephemeral: bool, 52 55 ) -> Self { 53 56 Self { 54 57 state, ··· 62 65 .expect("failed to build http client"), 63 66 semaphore: Arc::new(Semaphore::new(concurrency_limit)), 64 67 verify_signatures, 68 + ephemeral, 65 69 in_flight: Arc::new(scc::HashSet::new()), 66 70 } 67 71 } ··· 123 127 let did = did.clone(); 124 128 let buffer_tx = self.buffer_tx.clone(); 125 129 let verify = self.verify_signatures; 130 + let ephemeral = self.ephemeral; 126 131 127 132 tokio::spawn(async move { 128 133 let _guard = guard; 129 - let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await; 134 + let res = did_task( 135 + &state, http, buffer_tx, &did, key, permit, verify, ephemeral, 136 + ) 137 + .await; 130 138 131 139 if let Err(e) = res { 132 140 error!(did = %did, err = %e, "backfill process failed"); ··· 161 169 pending_key: Slice, 162 170 _permit: tokio::sync::OwnedSemaphorePermit, 163 171 verify_signatures: bool, 172 + ephemeral: bool, 164 173 ) -> Result<(), BackfillError> { 165 174 let db = &state.db; 166 175 167 - match process_did(&state, &http, &did, verify_signatures).await { 176 + match process_did(&state, &http, &did, verify_signatures, ephemeral).await { 168 177 Ok(Some(repo_state)) => { 169 178 let did_key = keys::repo_key(&did); 170 179 ··· 367 376 http: &reqwest::Client, 368 377 did: &Did<'static>, 369 378 verify_signatures: bool, 379 + ephemeral: bool, 370 380 ) -> Result<Option<RepoState<'static>>, BackfillError> { 371 381 
debug!(did = %did, "backfilling"); 372 382 ··· 426 436 Err(XrpcError::Xrpc(e)) => { 427 437 if matches!(e, GetRepoError::RepoNotFound(_)) { 428 438 warn!(did = %did, "repo not found, deleting"); 429 - let mut batch = db.inner.batch(); 439 + let mut batch = db::refcount::RefcountedBatch::new(db); 430 440 if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) { 431 441 tracing::error!(err = %e, "failed to wipe repo during backfill"); 432 442 } ··· 539 549 let mut delta = 0; 540 550 let mut added_blocks = 0; 541 551 let mut collection_counts: HashMap<SmolStr, u64> = HashMap::new(); 542 - let mut batch = app_state.db.inner.batch(); 552 + let mut batch = db::refcount::RefcountedBatch::new(&app_state.db); 543 553 let store = mst.storage(); 544 554 545 555 let prefix = keys::record_prefix_did(&did); 546 556 let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new(); 547 557 548 - for guard in app_state.db.records.prefix(&prefix) { 549 - let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 550 - // key is did|collection|rkey 551 - // skip did| 552 - let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b)); 553 - let collection_raw = remaining 554 - .next() 555 - .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?; 556 - let rkey_raw = remaining 557 - .next() 558 - .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?; 558 + if !ephemeral { 559 + for guard in app_state.db.records.prefix(&prefix) { 560 + let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 561 + // key is did|collection|rkey 562 + // skip did| 563 + let mut remaining = key[prefix.len()..].splitn(2, |b| keys::SEP.eq(b)); 564 + let collection_raw = remaining 565 + .next() 566 + .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?; 567 + let rkey_raw = remaining 568 + .next() 569 + .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?; 559 570 560 - let collection = 
std::str::from_utf8(collection_raw) 561 - .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?; 571 + let collection = std::str::from_utf8(collection_raw) 572 + .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?; 562 573 563 - let rkey = keys::parse_rkey(rkey_raw) 564 - .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?; 574 + let rkey = keys::parse_rkey(rkey_raw) 575 + .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?; 565 576 566 - let cid = cid::Cid::read_bytes(cid_bytes.as_ref()) 567 - .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))? 568 - .to_smolstr(); 577 + let cid = cid::Cid::read_bytes(cid_bytes.as_ref()) 578 + .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))? 579 + .to_smolstr(); 569 580 570 - existing_cids.insert((collection.into(), rkey), cid); 581 + existing_cids.insert((collection.into(), rkey), cid); 582 + } 571 583 } 572 584 573 585 let mut signal_seen = filter.mode == FilterMode::Full || filter.signals.is_empty(); ··· 602 614 let cid_obj = Cid::ipld(cid); 603 615 604 616 // check if this record already exists with same CID 605 - let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) { 617 + let existing_cid = existing_cids.remove(&path); 618 + let action = if let Some(existing_cid) = &existing_cid { 606 619 if existing_cid == cid_obj.as_str() { 607 620 trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "skip unchanged record"); 608 621 continue; // skip unchanged record 609 622 } 610 - (DbAction::Update, false) 623 + DbAction::Update 611 624 } else { 612 - (DbAction::Create, true) 625 + DbAction::Create 613 626 }; 614 627 trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, ?action, "action record"); 615 628 616 629 // key is did|collection|rkey 617 630 let db_key = keys::record_key(&did, collection, &rkey); 618 631 619 - batch.insert(&app_state.db.blocks, 
cid.to_bytes(), val.as_ref()); 620 - batch.insert(&app_state.db.records, db_key, cid.to_bytes()); 632 + batch.batch_mut().insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); 633 + if !ephemeral { 634 + batch.batch_mut().insert(&app_state.db.records, db_key, cid.to_bytes()); 635 + } 621 636 622 637 added_blocks += 1; 623 - if is_new { 638 + if action == DbAction::Create { 624 639 delta += 1; 625 640 *collection_counts.entry(path.0.clone()).or_default() += 1; 626 641 } ··· 636 651 cid: Some(cid_obj.to_ipld().expect("valid cid")), 637 652 }; 638 653 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 639 - batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 654 + batch.batch_mut().insert(&app_state.db.events, keys::event_key(event_id), bytes); 655 + 656 + // update block refcount 657 + let cid_bytes = Slice::from(cid.to_bytes()); 658 + // if ephemeral, we only care about events, so it's 1 659 + // if not, then it's 2 since we also insert to records 660 + batch.update_block_refcount(cid_bytes, ephemeral.then_some(1).unwrap_or(2))?; 661 + // for Update, also decrement old CID refcount 662 + // event will still be there, so we only decrement for records 663 + // which means only if not ephemeral 664 + if !ephemeral && action == DbAction::Update { 665 + let existing_cid = existing_cid.expect("that cid exists since this is Update"); 666 + if existing_cid != cid_obj.as_str() { 667 + let old_cid_bytes = Slice::from( 668 + cid::Cid::from_str(&existing_cid) 669 + .expect("valid cid from existing_cids") 670 + .to_bytes() 671 + ); 672 + batch.update_block_refcount(old_cid_bytes, -1)?; 673 + } 674 + } 640 675 641 676 count += 1; 642 677 } ··· 646 681 for ((collection, rkey), cid) in existing_cids { 647 682 trace!(did = %did, collection = %collection, rkey = %rkey, cid = %cid, "remove existing record"); 648 683 649 - batch.remove( 684 + // we don't have to put if ephemeral around here since 685 + // existing_cids will be empty anyway 686 + 
batch.batch_mut().remove( 650 687 &app_state.db.records, 651 688 keys::record_key(&did, &collection, &rkey), 652 689 ); 653 690 691 + // decrement block refcount 692 + let cid_bytes = Slice::from( 693 + cid::Cid::from_str(&cid) 694 + .expect("valid cid from existing_cids") 695 + .to_bytes() 696 + ); 697 + batch.update_block_refcount(cid_bytes, -1)?; 698 + 654 699 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 655 700 let evt = StoredEvent { 656 701 live: false, ··· 662 707 cid: None, 663 708 }; 664 709 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 665 - batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 710 + batch.batch_mut().insert(&app_state.db.events, keys::event_key(event_id), bytes); 666 711 667 712 delta -= 1; 668 713 count += 1; ··· 679 724 state.data = Some(root_commit.data); 680 725 state.last_updated_at = chrono::Utc::now().timestamp(); 681 726 682 - batch.insert( 727 + batch.batch_mut().insert( 683 728 &app_state.db.repos, 684 729 keys::repo_key(&did), 685 730 ser_repo_state(&state)?, 686 731 ); 687 732 688 733 // add the counts 689 - for (col, cnt) in collection_counts { 690 - db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt); 734 + if !ephemeral { 735 + for (col, cnt) in collection_counts { 736 + db::set_record_count(batch.batch_mut(), &app_state.db, &did, &col, cnt); 737 + } 691 738 } 692 739 693 740 batch.commit().into_diagnostic()?;
-25
src/db/compaction.rs
··· 2 2 use lsm_tree::compaction::{CompactionFilter, Factory}; 3 3 use lsm_tree::compaction::{ItemAccessor, Verdict}; 4 4 5 - mod drop_all { 6 - use super::*; 7 - 8 - pub struct DropAllFilter; 9 - 10 - impl CompactionFilter for DropAllFilter { 11 - fn filter_item(&mut self, _: ItemAccessor<'_>, _: &Context) -> lsm_tree::Result<Verdict> { 12 - Ok(Verdict::Destroy) 13 - } 14 - } 15 - 16 - pub struct DropAllFilterFactory; 17 - 18 - impl Factory for DropAllFilterFactory { 19 - fn name(&self) -> &str { 20 - "drop_all" 21 - } 22 - 23 - fn make_filter(&self, _: &Context) -> Box<dyn CompactionFilter> { 24 - Box::new(DropAllFilter) 25 - } 26 - } 27 - } 28 - pub use drop_all::*; 29 - 30 5 mod drop_prefix { 31 6 use super::*; 32 7
+294
src/db/gc.rs
··· 1 + use fjall::Slice; 2 + use fjall::compaction::filter::{CompactionFilter, Context, Factory, ItemAccessor, Verdict}; 3 + use miette::{IntoDiagnostic, WrapErr}; 4 + use scc::HashMap; 5 + use std::sync::Arc; 6 + use std::sync::atomic::{AtomicBool, Ordering}; 7 + use std::time::Duration; 8 + use tracing::{error, info}; 9 + 10 + const EVENT_TTL_SECS: u64 = 3600; 11 + 12 + use crate::db::{Db, keys}; 13 + use crate::types::StoredEvent; 14 + 15 + pub struct BlocksGcFilterFactory { 16 + pub gc_ready: Arc<AtomicBool>, 17 + pub refcounts: Arc<HashMap<Slice, i64>>, 18 + } 19 + 20 + struct BlocksGcFilter { 21 + gc_ready: Arc<AtomicBool>, 22 + refcounts: Arc<HashMap<Slice, i64>>, 23 + } 24 + 25 + impl Factory for BlocksGcFilterFactory { 26 + fn make_filter(&self, _ctx: &Context) -> Box<dyn CompactionFilter> { 27 + Box::new(BlocksGcFilter { 28 + gc_ready: self.gc_ready.clone(), 29 + refcounts: self.refcounts.clone(), 30 + }) 31 + } 32 + 33 + fn name(&self) -> &str { 34 + "blocks_gc" 35 + } 36 + } 37 + 38 + impl CompactionFilter for BlocksGcFilter { 39 + fn filter_item(&mut self, item: ItemAccessor<'_>, _ctx: &Context) -> lsm_tree::Result<Verdict> { 40 + if !self.gc_ready.load(Ordering::SeqCst) { 41 + return Ok(Verdict::Keep); 42 + } 43 + 44 + let count = self 45 + .refcounts 46 + .read_sync(item.key().as_ref(), |_, v| *v) 47 + .unwrap_or(0); 48 + 49 + #[cfg(debug_assertions)] 50 + if let Ok(cid) = cid::Cid::read_bytes(item.key().as_ref()) { 51 + tracing::debug!(cid = %cid, count, "BlocksGcFilter checking block"); 52 + } 53 + 54 + Ok((count <= 0) 55 + .then_some(Verdict::Destroy) 56 + .unwrap_or(Verdict::Keep)) 57 + } 58 + } 59 + 60 + pub fn startup_load_refcounts(db: &Db) -> miette::Result<()> { 61 + let checkpoint_seq = db 62 + .cursors 63 + .get(keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY) 64 + .into_diagnostic()? 
65 + .map(|v| { 66 + u64::from_be_bytes( 67 + v.as_ref() 68 + .try_into() 69 + .expect("checkpoint seq should be 8 bytes"), 70 + ) 71 + }) 72 + .unwrap_or(0); 73 + 74 + // check if we need to run the one-time migration 75 + let needs_migration = db 76 + .counts 77 + .get(keys::count_keyspace_key("gc_schema_version")) 78 + .into_diagnostic()? 79 + .is_none(); 80 + 81 + if needs_migration { 82 + migrate_build_refcounts(db)?; 83 + } 84 + 85 + // load snapshot 86 + for guard in db.block_refs.iter() { 87 + let (k, v) = guard.into_inner().into_diagnostic()?; 88 + let count = i64::from_be_bytes( 89 + v.as_ref() 90 + .try_into() 91 + .into_diagnostic() 92 + .wrap_err("invalid block_refs count bytes")?, 93 + ); 94 + let _ = db.block_refcounts.insert_sync(k, count); 95 + } 96 + 97 + // replay WAL since checkpoint 98 + let start_key = keys::reflog_key(checkpoint_seq.saturating_add(1)); 99 + for guard in db.block_reflog.range(start_key..) { 100 + let (_, v) = guard.into_inner().into_diagnostic()?; 101 + let (cid, delta): (Vec<u8>, i8) = rmp_serde::from_slice(&v).into_diagnostic()?; 102 + let cid = Slice::from(cid); 103 + let mut entry = db.block_refcounts.entry_sync(cid).or_insert(0); 104 + *entry += delta as i64; 105 + } 106 + 107 + db.gc_ready.store(true, Ordering::SeqCst); 108 + info!("block refcounts loaded, gc ready"); 109 + Ok(()) 110 + } 111 + 112 + fn migrate_build_refcounts(db: &Db) -> miette::Result<()> { 113 + info!("building initial block refcounts from existing records (one-time migration)"); 114 + let mut batch = db.inner.batch(); 115 + 116 + // scan records 117 + for guard in db.records.iter() { 118 + let cid_bytes = guard.value().into_diagnostic()?; 119 + let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 120 + *entry += 1i64; 121 + } 122 + 123 + // events with cids 124 + for guard in db.events.iter() { 125 + let v = guard.value().into_diagnostic()?; 126 + let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 127 + let 
Some(cid) = evt.cid else { 128 + continue; 129 + }; 130 + let cid_bytes = Slice::from(cid.to_bytes()); 131 + let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 132 + *entry += 1i64; 133 + } 134 + 135 + // persist as initial checkpoint 136 + db.block_refcounts.iter_sync(|k, v| { 137 + batch.insert(&db.block_refs, k.as_ref(), v.to_be_bytes()); 138 + true 139 + }); 140 + 141 + let seq = db.next_reflog_seq.load(Ordering::SeqCst); 142 + batch.insert( 143 + &db.cursors, 144 + keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY, 145 + seq.to_be_bytes(), 146 + ); 147 + // mark migration done 148 + let one: u64 = 1; 149 + batch.insert( 150 + &db.counts, 151 + keys::count_keyspace_key("gc_schema_version"), 152 + one.to_be_bytes(), 153 + ); 154 + batch.commit().into_diagnostic()?; 155 + 156 + info!("block refcount migration complete"); 157 + Ok(()) 158 + } 159 + 160 + pub fn checkpoint_worker(state: Arc<crate::state::AppState>) { 161 + info!("block refs checkpoint worker started"); 162 + loop { 163 + std::thread::sleep(Duration::from_secs(300)); 164 + if let Err(e) = checkpoint(&state.db) { 165 + error!(err = %e, "block refs checkpoint failed"); 166 + } 167 + } 168 + } 169 + 170 + fn checkpoint(db: &Db) -> miette::Result<()> { 171 + let checkpoint_seq = db.next_reflog_seq.load(Ordering::SeqCst).saturating_sub(1); 172 + 173 + let mut batch = db.inner.batch(); 174 + 175 + db.block_refcounts.iter_sync(|k, v| { 176 + batch.insert(&db.block_refs, k.as_ref(), v.to_be_bytes()); 177 + true 178 + }); 179 + 180 + batch.insert( 181 + &db.cursors, 182 + keys::BLOCK_REFS_CHECKPOINT_SEQ_KEY, 183 + checkpoint_seq.to_be_bytes(), 184 + ); 185 + 186 + // truncate reflog up to and including checkpoint_seq 187 + for guard in db.block_reflog.range(..=keys::reflog_key(checkpoint_seq)) { 188 + let k = guard.key().into_diagnostic()?; 189 + batch.remove(&db.block_reflog, k); 190 + } 191 + 192 + batch.commit().into_diagnostic()?; 193 + info!(seq = checkpoint_seq, "block refs checkpoint complete"); 
194 + Ok(()) 195 + } 196 + 197 + pub fn ephemeral_startup_load_refcounts(db: &Db) -> miette::Result<()> { 198 + info!("rebuilding block refcounts from events (ephemeral mode)"); 199 + 200 + for guard in db.events.iter() { 201 + let v = guard.value().into_diagnostic()?; 202 + let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 203 + let Some(cid) = evt.cid else { 204 + continue; 205 + }; 206 + let cid_bytes = Slice::from(cid.to_bytes()); 207 + let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 208 + *entry += 1; 209 + } 210 + 211 + db.gc_ready.store(true, Ordering::SeqCst); 212 + info!("ephemeral block refcounts ready"); 213 + Ok(()) 214 + } 215 + 216 + pub fn ephemeral_ttl_worker(state: Arc<crate::state::AppState>) { 217 + info!("ephemeral TTL worker started"); 218 + loop { 219 + std::thread::sleep(Duration::from_secs(60)); 220 + if let Err(e) = ephemeral_ttl_tick(&state.db) { 221 + error!(err = %e, "ephemeral TTL tick failed"); 222 + } 223 + } 224 + } 225 + 226 + fn ephemeral_ttl_tick(db: &Db) -> miette::Result<()> { 227 + let now = chrono::Utc::now().timestamp() as u64; 228 + let cutoff_ts = now.saturating_sub(EVENT_TTL_SECS); 229 + 230 + // write current watermark 231 + let current_event_id = db.next_event_id.load(Ordering::SeqCst); 232 + db.cursors 233 + .insert( 234 + keys::event_watermark_key(now), 235 + current_event_id.to_be_bytes(), 236 + ) 237 + .into_diagnostic()?; 238 + 239 + // find the watermark entry closest to and <= cutoff_ts 240 + let cutoff_key = keys::event_watermark_key(cutoff_ts); 241 + let cutoff_event_id = db 242 + .cursors 243 + .range(..=cutoff_key.clone()) 244 + .next_back() 245 + .map(|g| g.into_inner().into_diagnostic()) 246 + .transpose()? 247 + .filter(|(k, _)| k.starts_with(keys::EVENT_WATERMARK_PREFIX)) 248 + .map(|(_, v)| { 249 + v.as_ref() 250 + .try_into() 251 + .into_diagnostic() 252 + .wrap_err("expected cutoff event id to be u64") 253 + }) 254 + .transpose()? 
255 + .map(u64::from_be_bytes); 256 + 257 + let Some(cutoff_event_id) = cutoff_event_id else { 258 + // no watermark old enough yet, nothing to prune 259 + return Ok(()); 260 + }; 261 + 262 + let cutoff_key_events = keys::event_key(cutoff_event_id); 263 + let mut batch = db.inner.batch(); 264 + let mut pruned = 0usize; 265 + 266 + for guard in db.events.range(..cutoff_key_events) { 267 + let (k, v) = guard.into_inner().into_diagnostic()?; 268 + let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 269 + let Some(cid) = evt.cid else { 270 + continue; 271 + }; 272 + let cid_bytes = Slice::from(cid.to_bytes()); 273 + let mut entry = db.block_refcounts.entry_sync(cid_bytes).or_insert(0); 274 + *entry -= 1; 275 + batch.remove(&db.events, k); 276 + pruned += 1; 277 + } 278 + 279 + // clean up consumed watermark entries (everything up to and including cutoff_ts) 280 + for guard in db.cursors.range(..=cutoff_key) { 281 + let k = guard.key().into_diagnostic()?; 282 + if k.starts_with(keys::EVENT_WATERMARK_PREFIX) { 283 + batch.remove(&db.cursors, k); 284 + } 285 + } 286 + 287 + batch.commit().into_diagnostic()?; 288 + 289 + if pruned > 0 { 290 + info!(pruned, "pruned old events"); 291 + } 292 + 293 + Ok(()) 294 + }
+16 -1
src/db/keys.rs
··· 8 8 9 9 pub const CURSOR_KEY: &[u8] = b"firehose_cursor"; 10 10 11 - // Key format: {DID} 11 + pub const BLOCK_REFS_CHECKPOINT_SEQ_KEY: &[u8] = b"block_refs_checkpoint_seq"; 12 + 13 + pub const EVENT_WATERMARK_PREFIX: &[u8] = b"ewm|"; 14 + 15 + // key format: {DID} 12 16 pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> { 13 17 let mut vec = Vec::with_capacity(32); 14 18 TrimmedDid::from(did).write_to_vec(&mut vec); ··· 17 21 18 22 pub fn pending_key(id: u64) -> [u8; 8] { 19 23 id.to_be_bytes() 24 + } 25 + 26 + pub fn reflog_key(seq: u64) -> [u8; 8] { 27 + seq.to_be_bytes() 28 + } 29 + 30 + pub fn event_watermark_key(timestamp_secs: u64) -> Vec<u8> { 31 + let mut key = Vec::with_capacity(EVENT_WATERMARK_PREFIX.len() + 8); 32 + key.extend_from_slice(EVENT_WATERMARK_PREFIX); 33 + key.extend_from_slice(&timestamp_secs.to_be_bytes()); 34 + key 20 35 } 21 36 22 37 // prefix format: {DID}| (DID trimmed)
+65 -18
src/db/mod.rs
··· 1 - use crate::db::compaction::{DropAllFilterFactory, DropPrefixFilterFactory}; 1 + use crate::db::compaction::DropPrefixFilterFactory; 2 2 use crate::types::{BroadcastEvent, RepoState}; 3 - use fjall::compaction::{Fifo, Levelled}; 3 + 4 4 use fjall::config::BlockSizePolicy; 5 5 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, Slice}; 6 6 use jacquard_common::IntoStatic; ··· 11 11 use smol_str::SmolStr; 12 12 13 13 use std::sync::Arc; 14 + use std::sync::atomic::{AtomicBool, AtomicU64}; 14 15 15 16 pub mod compaction; 16 17 pub mod filter; 18 + pub mod gc; 17 19 pub mod keys; 20 + pub mod refcount; 18 21 pub mod types; 19 22 20 - use std::sync::atomic::AtomicU64; 21 23 use tokio::sync::broadcast; 22 24 use tracing::error; 23 25 ··· 38 40 pub counts: Keyspace, 39 41 pub filter: Keyspace, 40 42 pub crawler: Keyspace, 43 + pub block_refs: Keyspace, 44 + pub block_reflog: Keyspace, 45 + pub block_refcounts: Arc<HashMap<Slice, i64>>, 46 + pub gc_ready: Arc<AtomicBool>, 47 + pub next_reflog_seq: Arc<AtomicU64>, 41 48 pub event_tx: broadcast::Sender<BroadcastEvent>, 42 49 pub next_event_id: Arc<AtomicU64>, 43 50 pub counts_map: HashMap<SmolStr, u64>, ··· 98 105 v * 1024 * 1024 99 106 } 100 107 108 + let gc_ready = Arc::new(AtomicBool::new(false)); 109 + let block_refcounts: Arc<HashMap<Slice, i64>> = Arc::new(HashMap::new()); 110 + 111 + let gc_ready_factory = gc_ready.clone(); 112 + let refcounts_factory = block_refcounts.clone(); 113 + 101 114 let db = Database::builder(&cfg.database_path) 102 115 .cache_size(cfg.cache_size * 2_u64.pow(20) / 2) 103 116 .manual_journal_persist(true) ··· 110 123 .max_journaling_size(mb(cfg.db_max_journaling_size_mb)) 111 124 .with_compaction_filter_factories({ 112 125 let ephemeral = cfg.ephemeral; 113 - let f = move |ks: &str| match ks { 114 - "records" => { 115 - ephemeral.then(|| -> Arc<dyn Factory> { Arc::new(DropAllFilterFactory) }) 126 + let f = move |ks: &str| { 127 + 
tracing::info!("with_compaction_filter_factories queried for keyspace: {ks}",); 128 + match ks { 129 + "counts" => ephemeral.then(|| -> Arc<dyn Factory> { 130 + Arc::new(DropPrefixFilterFactory { 131 + prefix: keys::COUNT_COLLECTION_PREFIX, 132 + }) 133 + }), 134 + "blocks" => Some(Arc::new(gc::BlocksGcFilterFactory { 135 + gc_ready: gc_ready_factory.clone(), 136 + refcounts: refcounts_factory.clone(), 137 + }) as Arc<dyn Factory>), 138 + _ => None, 116 139 } 117 - "counts" => ephemeral.then(|| -> Arc<dyn Factory> { 118 - Arc::new(DropPrefixFilterFactory { 119 - prefix: keys::COUNT_COLLECTION_PREFIX, 120 - }) 121 - }), 122 - _ => None, 123 140 }; 124 141 Arc::new(f) 125 142 }) ··· 197 214 // only iterators are used here, no point reads 198 215 .expect_point_read_hits(true) 199 216 .max_memtable_size(mb(cfg.db_events_memtable_size_mb)) 200 - .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])) 201 - .compaction_strategy(if cfg.ephemeral { 202 - Arc::new(Fifo::new(mb(512), Some(60 * 60))) 203 - } else { 204 - Arc::new(Levelled::default()) 205 - }), 217 + .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])), 206 218 )?; 207 219 let counts = open_ks( 208 220 "counts", ··· 231 243 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 232 244 )?; 233 245 246 + let block_refs = open_ks( 247 + "block_refs", 248 + opts() 249 + // only ever iterated on 250 + .expect_point_read_hits(true) 251 + .max_memtable_size(mb(cfg.db_records_memtable_size_mb / 2)) 252 + .data_block_size_policy(BlockSizePolicy::all(kb(64))), 253 + )?; 254 + 255 + let block_reflog = open_ks( 256 + "block_reflog", 257 + opts() 258 + // only ever iterated on 259 + .expect_point_read_hits(true) 260 + .max_memtable_size(mb(4)) 261 + .data_block_size_policy(BlockSizePolicy::all(kb(64))), 262 + )?; 263 + 234 264 let mut last_id = 0; 235 265 if let Some(guard) = events.iter().next_back() { 236 266 let k = guard.key().into_diagnostic()?; ··· 241 271 .wrap_err("expected to be id (8 bytes)")?, 
242 272 ); 243 273 } 274 + 275 + let mut last_reflog_seq = 0u64; 276 + if let Some(guard) = block_reflog.iter().next_back() { 277 + let k = guard.key().into_diagnostic()?; 278 + last_reflog_seq = u64::from_be_bytes( 279 + k.as_ref() 280 + .try_into() 281 + .into_diagnostic() 282 + .wrap_err("expected to be reflog seq (8 bytes)")?, 283 + ); 284 + } 285 + let next_reflog_seq = Arc::new(AtomicU64::new(last_reflog_seq.saturating_add(1))); 244 286 245 287 // load counts into memory 246 288 let counts_map = HashMap::new(); ··· 284 326 counts, 285 327 filter, 286 328 crawler, 329 + block_refs, 330 + block_reflog, 331 + block_refcounts, 332 + gc_ready, 333 + next_reflog_seq, 287 334 event_tx, 288 335 counts_map, 289 336 next_event_id: Arc::new(AtomicU64::new(last_id + 1)),
+61
src/db/refcount.rs
··· 1 + use crate::db::{Db, keys}; 2 + 3 + use fjall::{OwnedWriteBatch, Slice}; 4 + use miette::{IntoDiagnostic, Result}; 5 + use std::sync::atomic::Ordering; 6 + 7 + /// a write batch that tracks block refcount deltas and applies them 8 + /// automatically on commit. this prevents the bug class where callers 9 + /// forget to call `apply_block_refcount_deltas` after committing. 10 + pub struct RefcountedBatch<'db> { 11 + batch: OwnedWriteBatch, 12 + db: &'db Db, 13 + pending_deltas: Vec<(Slice, i8)>, 14 + } 15 + 16 + impl<'db> RefcountedBatch<'db> { 17 + pub fn new(db: &'db Db) -> Self { 18 + Self { 19 + batch: db.inner.batch(), 20 + db, 21 + pending_deltas: Vec::new(), 22 + } 23 + } 24 + 25 + /// records a refcount delta for the given CID. writes to the reflog 26 + /// in the batch and accumulates the delta for in-memory application on commit. 27 + pub fn update_block_refcount(&mut self, cid_bytes: Slice, delta: i8) -> Result<()> { 28 + #[cfg(debug_assertions)] 29 + if let Ok(cid) = cid::Cid::read_bytes(cid_bytes.as_ref()) { 30 + tracing::debug!(delta, %cid, "update_block_refcount"); 31 + } 32 + 33 + let value = rmp_serde::to_vec(&(cid_bytes.as_ref(), delta)).into_diagnostic()?; 34 + let seq = self.db.next_reflog_seq.fetch_add(1, Ordering::SeqCst); 35 + self.batch 36 + .insert(&self.db.block_reflog, keys::reflog_key(seq), value); 37 + self.pending_deltas.push((cid_bytes, delta)); 38 + Ok(()) 39 + } 40 + 41 + /// commits the batch and applies all accumulated refcount deltas to the in-memory map. 
42 + pub fn commit(self) -> Result<(), fjall::Error> { 43 + self.batch.commit()?; 44 + apply_deltas(self.db, &self.pending_deltas); 45 + Ok(()) 46 + } 47 + 48 + pub fn batch_mut(&mut self) -> &mut OwnedWriteBatch { 49 + &mut self.batch 50 + } 51 + } 52 + 53 + fn apply_deltas(db: &Db, deltas: &[(Slice, i8)]) { 54 + for (cid_bytes, delta) in deltas { 55 + let mut entry = db 56 + .block_refcounts 57 + .entry_sync(cid_bytes.clone()) 58 + .or_insert(0); 59 + *entry += *delta as i64; 60 + } 61 + }
+37 -17
src/ingest/worker.rs
··· 7 7 use crate::state::AppState; 8 8 use crate::types::{AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoState, RepoStatus}; 9 9 10 - use fjall::OwnedWriteBatch; 10 + use crate::db::refcount::RefcountedBatch; 11 11 12 12 use jacquard_common::IntoStatic; 13 13 use jacquard_common::cowstr::ToCowStr; ··· 59 59 state: Arc<AppState>, 60 60 rx: BufferRx, 61 61 verify_signatures: bool, 62 + ephemeral: bool, 62 63 num_shards: usize, 63 64 } 64 65 65 66 struct WorkerContext<'a> { 66 67 verify_signatures: bool, 68 + ephemeral: bool, 67 69 state: &'a AppState, 68 - batch: &'a mut OwnedWriteBatch, 70 + batch: RefcountedBatch<'a>, 69 71 added_blocks: &'a mut i64, 70 72 records_delta: &'a mut i64, 71 73 broadcast_events: &'a mut Vec<BroadcastEvent>, ··· 77 79 state: Arc<AppState>, 78 80 rx: BufferRx, 79 81 verify_signatures: bool, 82 + ephemeral: bool, 80 83 num_shards: usize, 81 84 ) -> Self { 82 85 Self { 83 86 state, 84 87 rx, 85 88 verify_signatures, 89 + ephemeral, 86 90 num_shards, 87 91 } 88 92 } ··· 99 103 100 104 let state = self.state.clone(); 101 105 let verify = self.verify_signatures; 106 + let ephemeral = self.ephemeral; 102 107 let handle = handle.clone(); 103 108 104 109 std::thread::Builder::new() 105 110 .name(format!("ingest-shard-{i}")) 106 111 .spawn(move || { 107 - Self::shard(i, rx, state, verify, handle); 112 + Self::shard(i, rx, state, verify, ephemeral, handle); 108 113 }) 109 114 .into_diagnostic()?; 110 115 } ··· 149 154 mut rx: mpsc::UnboundedReceiver<IngestMessage>, 150 155 state: Arc<AppState>, 151 156 verify_signatures: bool, 157 + ephemeral: bool, 152 158 handle: tokio::runtime::Handle, 153 159 ) { 154 160 let _guard = handle.enter(); ··· 157 163 let mut broadcast_events = Vec::new(); 158 164 159 165 while let Some(msg) = rx.blocking_recv() { 160 - let mut batch = state.db.inner.batch(); 166 + let batch = RefcountedBatch::new(&state.db); 161 167 broadcast_events.clear(); 162 168 163 169 let mut added_blocks = 0; ··· 165 171 166 172 let 
mut ctx = WorkerContext { 167 173 state: &state, 168 - batch: &mut batch, 174 + batch, 169 175 added_blocks: &mut added_blocks, 170 176 records_delta: &mut records_delta, 171 177 broadcast_events: &mut broadcast_events, 172 178 handle: &handle, 173 179 verify_signatures, 180 + ephemeral, 174 181 }; 175 182 176 183 match msg { ··· 191 198 // while the resync buffer is being drained, we should handle that probably 192 199 // but also it should still be fine since we'll sync eventually anyway 193 200 let res = ops::update_repo_status( 194 - &mut batch, 201 + ctx.batch.batch_mut(), 195 202 &state.db, 196 203 &did, 197 204 s, ··· 260 267 } 261 268 } 262 269 263 - if let Err(e) = batch.commit() { 270 + if let Err(e) = ctx.batch.commit() { 264 271 error!(shard = id, err = %e, "failed to commit batch"); 265 272 } 266 273 ··· 366 373 367 374 let handle = identity.handle.as_ref().map(|h| h.clone()); 368 375 repo_state.handle = handle.or(repo_state.handle); 369 - ctx.batch.insert( 376 + ctx.batch.batch_mut().insert( 370 377 &ctx.state.db.repos, 371 378 keys::repo_key(did), 372 379 crate::db::ser_repo_state(&repo_state)?, ··· 394 401 match &account.status { 395 402 Some(AccountStatus::Deleted) => { 396 403 debug!(did = %did, "account deleted, wiping data"); 397 - crate::ops::delete_repo(ctx.batch, &ctx.state.db, did, &repo_state)?; 404 + crate::ops::delete_repo( 405 + &mut ctx.batch, 406 + &ctx.state.db, 407 + did, 408 + &repo_state, 409 + )?; 398 410 return Ok(RepoProcessResult::Deleted); 399 411 } 400 412 status => { ··· 432 444 } 433 445 434 446 repo_state = ops::update_repo_status( 435 - ctx.batch, 447 + ctx.batch.batch_mut(), 436 448 &ctx.state.db, 437 449 did, 438 450 repo_state, ··· 509 521 return Ok(RepoProcessResult::Syncing(Some(commit))); 510 522 } 511 523 524 + let signing_key = Self::fetch_key(ctx, did)?; 512 525 let res = ops::apply_commit( 513 - ctx.batch, 526 + &mut ctx.batch, 514 527 &ctx.state.db, 515 528 repo_state, 516 529 &commit, 517 - 
Self::fetch_key(ctx, did)?.as_ref(), 530 + signing_key.as_ref(), 518 531 &ctx.state.filter.load(), 532 + ctx.ephemeral, 519 533 )?; 520 534 let repo_state = res.repo_state; 521 535 *ctx.added_blocks += res.blocks_count; ··· 629 643 } 630 644 } 631 645 repo_state = ops::update_repo_status( 632 - ctx.batch, 646 + ctx.batch.batch_mut(), 633 647 &ctx.state.db, 634 648 did, 635 649 repo_state, ··· 660 674 Ok(r) => r, 661 675 Err(e) => { 662 676 if !Self::check_if_retriable_failure(&e) { 663 - ctx.batch.remove(&ctx.state.db.resync_buffer, key); 677 + ctx.batch 678 + .batch_mut() 679 + .remove(&ctx.state.db.resync_buffer, key); 664 680 } 665 681 return Err(e); 666 682 } 667 683 }; 668 684 match res { 669 685 RepoProcessResult::Ok(rs) => { 670 - ctx.batch.remove(&ctx.state.db.resync_buffer, key); 686 + ctx.batch 687 + .batch_mut() 688 + .remove(&ctx.state.db.resync_buffer, key); 671 689 repo_state = rs; 672 690 } 673 691 RepoProcessResult::Syncing(_) => { 674 692 return Ok(RepoProcessResult::Syncing(None)); 675 693 } 676 694 RepoProcessResult::Deleted => { 677 - ctx.batch.remove(&ctx.state.db.resync_buffer, key); 695 + ctx.batch 696 + .batch_mut() 697 + .remove(&ctx.state.db.resync_buffer, key); 678 698 return Ok(RepoProcessResult::Deleted); 679 699 } 680 700 } ··· 692 712 ctx.state.resolver.invalidate_sync(did); 693 713 let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 694 714 repo_state.update_from_doc(doc); 695 - ctx.batch.insert( 715 + ctx.batch.batch_mut().insert( 696 716 &ctx.state.db.repos, 697 717 keys::repo_key(did), 698 718 crate::db::ser_repo_state(&repo_state)?,
+24
src/main.rs
··· 29 29 30 30 let state = AppState::new(&cfg)?; 31 31 32 + // load block refcounts for GC - must complete before any ingest workers start 33 + if cfg.ephemeral { 34 + db::gc::ephemeral_startup_load_refcounts(&state.db)?; 35 + } else { 36 + db::gc::startup_load_refcounts(&state.db)?; 37 + } 38 + 32 39 if cfg.full_network 33 40 || cfg.filter_signals.is_some() 34 41 || cfg.filter_collections.is_some() ··· 76 83 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 77 84 let state = Arc::new(state); 78 85 86 + // spawn GC workers 87 + if cfg.ephemeral { 88 + let state_ttl = state.clone(); 89 + std::thread::Builder::new() 90 + .name("ephemeral-ttl".into()) 91 + .spawn(move || db::gc::ephemeral_ttl_worker(state_ttl)) 92 + .into_diagnostic()?; 93 + } else { 94 + let state_gc = state.clone(); 95 + std::thread::Builder::new() 96 + .name("gc-checkpoint".into()) 97 + .spawn(move || db::gc::checkpoint_worker(state_gc)) 98 + .into_diagnostic()?; 99 + } 100 + 79 101 if cfg.enable_backfill { 80 102 tokio::spawn({ 81 103 let state = state.clone(); ··· 89 111 cfg.verify_signatures, 90 112 SignatureVerification::Full | SignatureVerification::BackfillOnly 91 113 ), 114 + cfg.ephemeral, 92 115 ) 93 116 .run() 94 117 }); ··· 213 236 state, 214 237 buffer_rx, 215 238 matches!(cfg.verify_signatures, SignatureVerification::Full), 239 + cfg.ephemeral, 216 240 cfg.firehose_workers, 217 241 ) 218 242 .run(handle)
+68 -29
src/ops.rs
··· 1 1 use fjall::OwnedWriteBatch; 2 + use fjall::Slice; 3 + 4 + use crate::db::refcount::RefcountedBatch; 2 5 use jacquard_common::CowStr; 3 6 use jacquard_common::IntoStatic; 4 7 use jacquard_common::types::cid::Cid; ··· 64 67 BroadcastEvent::Ephemeral(Box::new(marshallable)) 65 68 } 66 69 67 - pub fn delete_repo<'batch>( 68 - batch: &'batch mut OwnedWriteBatch, 70 + pub fn delete_repo( 71 + batch: &mut RefcountedBatch<'_>, 69 72 db: &Db, 70 73 did: &Did, 71 74 repo_state: &RepoState, ··· 76 79 let pending_key = keys::pending_key(repo_state.index_id); 77 80 78 81 // 1. delete from repos, pending, resync 79 - batch.remove(&db.repos, &repo_key); 82 + batch.batch_mut().remove(&db.repos, &repo_key); 80 83 match repo_state.status { 81 84 RepoStatus::Synced => {} 82 85 RepoStatus::Backfilling => { 83 - batch.remove(&db.pending, &pending_key); 86 + batch.batch_mut().remove(&db.pending, &pending_key); 84 87 } 85 88 _ => { 86 - batch.remove(&db.resync, &repo_key); 89 + batch.batch_mut().remove(&db.resync, &repo_key); 87 90 } 88 91 } 89 92 ··· 91 94 let resync_prefix = keys::resync_buffer_prefix(did); 92 95 for guard in db.resync_buffer.prefix(&resync_prefix) { 93 96 let k = guard.key().into_diagnostic()?; 94 - batch.remove(&db.resync_buffer, k); 97 + batch.batch_mut().remove(&db.resync_buffer, k); 95 98 } 96 99 97 100 // 3. delete from records 98 101 let records_prefix = keys::record_prefix_did(did); 99 102 for guard in db.records.prefix(&records_prefix) { 100 - let k = guard.key().into_diagnostic()?; 101 - batch.remove(&db.records, k); 103 + let (k, cid_bytes) = guard.into_inner().into_diagnostic()?; 104 + batch.update_block_refcount(cid_bytes, -1); 105 + batch.batch_mut().remove(&db.records, k); 102 106 } 103 107 104 108 // 4. 
reset collection counts ··· 110 114 111 115 for guard in db.counts.prefix(&count_prefix) { 112 116 let k = guard.key().into_diagnostic()?; 113 - batch.remove(&db.counts, k); 117 + batch.batch_mut().remove(&db.counts, k); 114 118 } 115 119 116 120 Ok(()) ··· 218 222 pub blocks_count: i64, 219 223 } 220 224 221 - pub fn apply_commit<'batch, 'db, 'commit, 's>( 222 - batch: &'batch mut OwnedWriteBatch, 225 + pub fn apply_commit<'db, 'commit, 's>( 226 + batch: &mut RefcountedBatch<'db>, 223 227 db: &'db Db, 224 228 mut repo_state: RepoState<'s>, 225 229 commit: &'commit Commit<'commit>, 226 230 signing_key: Option<&PublicKey>, 227 231 filter: &FilterConfig, 232 + ephemeral: bool, 228 233 ) -> Result<ApplyCommitResults<'s>> { 229 234 let did = &commit.repo; 230 235 debug!(did = %did, commit = %commit.commit, "applying commit"); ··· 257 262 repo_state.data = Some(repo_commit.data); 258 263 repo_state.last_updated_at = chrono::Utc::now().timestamp(); 259 264 260 - batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?); 265 + batch 266 + .batch_mut() 267 + .insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?); 261 268 262 269 // 2. 
iterate ops and update records index 263 270 let mut records_delta = 0; ··· 287 294 .into_diagnostic() 288 295 .wrap_err("expected valid cid from relay")?; 289 296 290 - if let Some(bytes) = parsed.blocks.get(&cid_ipld) { 291 - batch.insert(&db.blocks, cid_ipld.to_bytes(), bytes.to_vec()); 292 - blocks_count += 1; 293 - } 297 + let Some(bytes) = parsed.blocks.get(&cid_ipld) else { 298 + return Err(miette::miette!( 299 + "block {cid} not found in CAR for record {did}/{collection}/{rkey}" 300 + )); 301 + }; 302 + let cid_bytes = Slice::from(cid_ipld.to_bytes()); 303 + batch 304 + .batch_mut() 305 + .insert(&db.blocks, cid_bytes.clone(), bytes.to_vec()); 306 + blocks_count += 1; 307 + batch.update_block_refcount(cid_bytes.clone(), ephemeral.then_some(1).unwrap_or(2)); 294 308 295 - batch.insert(&db.records, db_key.clone(), cid_ipld.to_bytes()); 296 - 297 - // accumulate counts 298 - if action == DbAction::Create { 299 - records_delta += 1; 300 - *collection_deltas.entry(collection).or_default() += 1; 309 + if !ephemeral { 310 + batch 311 + .batch_mut() 312 + .insert(&db.records, db_key.clone(), cid_ipld.to_bytes()); 313 + // for Update, also decrement old CID refcount 314 + if action == DbAction::Update { 315 + let Some(old_cid_bytes) = db.records.get(&db_key).into_diagnostic()? else { 316 + return Err(miette::miette!( 317 + "!!! THIS IS A BUG !!! expected previous cid to be there for record being updated ({did}/{collection}/{rkey}). how did we get here?" 
318 + )); 319 + }; 320 + if old_cid_bytes != cid_bytes { 321 + batch.update_block_refcount(old_cid_bytes, -1); 322 + } 323 + } 324 + // accumulate counts 325 + if action == DbAction::Create { 326 + records_delta += 1; 327 + *collection_deltas.entry(collection).or_default() += 1; 328 + } 301 329 } 302 330 } 303 331 DbAction::Delete => { 304 - batch.remove(&db.records, db_key); 332 + if !ephemeral { 333 + // decrement block refcount 334 + let old_cid_bytes = db.records.get(&db_key).into_diagnostic()?; 335 + if let Some(cid_bytes) = old_cid_bytes { 336 + batch.update_block_refcount(cid_bytes, -1); 337 + } 338 + batch.batch_mut().remove(&db.records, db_key); 305 339 306 - // accumulate counts 307 - records_delta -= 1; 308 - *collection_deltas.entry(collection).or_default() -= 1; 340 + // accumulate counts 341 + records_delta -= 1; 342 + *collection_deltas.entry(collection).or_default() -= 1; 343 + } 309 344 } 310 345 } 311 346 ··· 320 355 }; 321 356 322 357 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 323 - batch.insert(&db.events, keys::event_key(event_id), bytes); 358 + batch 359 + .batch_mut() 360 + .insert(&db.events, keys::event_key(event_id), bytes); 324 361 } 325 362 326 363 // update counts 327 - for (col, delta) in collection_deltas { 328 - db::update_record_count(batch, db, did, col, delta)?; 364 + if !ephemeral { 365 + for (col, delta) in collection_deltas { 366 + db::update_record_count(batch.batch_mut(), db, did, col, delta)?; 367 + } 329 368 } 330 369 331 370 Ok(ApplyCommitResults {
+2 -2
tests/common.nu
# compile hydrant (debug profile) and return the relative path to the binary
export def build-hydrant [] {
    print "building hydrant..."
    cargo build
    "target/debug/hydrant"
}

# start hydrant in the background
+159
tests/gc_test.nu
#!/usr/bin/env nu
use common.nu *

# spin up a fresh hydrant instance over a temp database, run the given
# scenario closure against it, then tear the instance down. the whole
# script exits with status 1 on the first failing scenario.
def run-test-instance [name: string, scenario_closure: closure] {
    let port = 3004
    let debug_port = $port + 1
    let url = $"http://localhost:($port)"
    let debug_url = $"http://localhost:($debug_port)"
    # temp dir is intentionally left behind on failure so it can be inspected
    let db_path = (mktemp -d -t hydrant_gc_test.XXXXXX)

    print $"--- running scenario: ($name) ---"
    print $"database path: ($db_path)"

    let binary = build-hydrant
    let instance = start-hydrant $binary $db_path $port

    try {
        if not (wait-for-api $url) {
            error make {msg: "api failed to start"}
        }

        do $scenario_closure $url $debug_url

        print $"PASSED: ($name)\n"
    } catch { |e|
        print $"test failed: ($e.msg)"
        try { kill $instance.pid }
        exit 1
    }

    try { kill $instance.pid }
}

# poll the debug iterator until at least one block appears (up to ~60s),
# returning the number of blocks observed.
def wait-for-blocks [debug_url: string] {
    print "waiting for blocks to appear..."
    mut seen = 0
    for _attempt in 1..30 {
        let resp = (http get $"($debug_url)/debug/iter?partition=blocks&limit=1000")
        $seen = ($resp.items | length)
        if $seen > 0 {
            break
        }
        sleep 2sec
    }
    if $seen == 0 {
        error make {msg: "FAILED: no blocks found after backfill"}
    }
    $seen
}

# force a major compaction of the blocks partition, then assert exactly
# `expected_count` blocks survived it.
def compact-and-check-blocks [debug_url: string, expected_count: int] {
    print "triggering major compaction on blocks partition..."
    http post -H [Content-Length 0] $"($debug_url)/debug/compact?partition=blocks" ""

    let survivors = (http get $"($debug_url)/debug/iter?partition=blocks&limit=1000" | get items | length)

    if $survivors != $expected_count {
        error make {msg: $"FAILED: expected ($expected_count) blocks after compaction, found ($survivors)"}
    }
}

# wait until the events partition is non-empty, then ack every event id
# through the public /stream/ack endpoint.
def ack-all-events [debug_url: string, url: string] {
    print "acking all events..."
    mut pending = []
    for _attempt in 1..30 {
        let events = http get $"($debug_url)/debug/iter?partition=events&limit=1000"
        $pending = $events.items
        if ($pending | length) > 0 {
            break
        }
        sleep 2sec
    }

    if ($pending | length) == 0 {
        error make {msg: "FAILED: no events to ack"}
    }

    # each item is a (key, value) pair; the key is the numeric event id
    let ids = ($pending | each { |row| ($row | first | into int) })

    http post -t application/json $"($url)/stream/ack" { ids: $ids }
    print $"acked ($ids | length) events"
}

def main [] {
    let repo1 = "did:web:guestbook.gaze.systems"
    let repo2 = "did:plc:dfl62fgb7wtjj3fcbb72naae"

    # deleting the repo drops only the record-side reference on each block,
    # so blocks are expected to survive compaction.
    run-test-instance "delete repo only" { |url, debug_url|
        print $"adding repo ($repo1) to tracking..."
        http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]

        let before_count = (wait-for-blocks $debug_url)
        print $"found ($before_count) blocks before GC"

        print "deleting repo..."
        http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]
        sleep 1sec

        compact-and-check-blocks $debug_url $before_count
    }

    # acking events without deleting the repo likewise leaves blocks
    # referenced, so nothing should be collected.
    run-test-instance "ack events only" { |url, debug_url|
        print $"adding repo ($repo1) to tracking..."
        http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]

        let before_count = (wait-for-blocks $debug_url)
        print $"found ($before_count) blocks before GC"

        ack-all-events $debug_url $url
        sleep 1sec

        compact-and-check-blocks $debug_url $before_count
    }

    # with both the repo deleted and every event acked, compaction should
    # garbage-collect all blocks.
    run-test-instance "delete repo, ack events" { |url, debug_url|
        print $"adding repo ($repo1) to tracking..."
        http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]

        let before_count = (wait-for-blocks $debug_url)
        print $"found ($before_count) blocks before GC"

        print "deleting repo..."
        http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]

        ack-all-events $debug_url $url
        sleep 1sec

        compact-and-check-blocks $debug_url 0
    }

    run-test-instance "multiple repos" { |url, debug_url|
        print $"adding repo ($repo2) to tracking..."
        http put -t application/json $"($url)/repos" [ { did: ($repo2) } ]
        let repo2_blocks = (wait-for-blocks $debug_url)
        print $"found ($repo2_blocks) blocks for repo2"

        print $"adding repo ($repo1) to tracking..."
        http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]

        # wait a bit more for repo1 blocks to finish
        sleep 5sec
        let total_blocks = (http get $"($debug_url)/debug/iter?partition=blocks&limit=1000000" | get items | length)
        print $"found ($total_blocks) total blocks before GC"

        print $"deleting repo ($repo1)..."
        http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]

        # the ack endpoint is global, so ack everything: repo2 is not
        # deleted, so its blocks must stay pinned even with events acked.
        ack-all-events $debug_url $url
        sleep 1sec

        compact-and-check-blocks $debug_url $repo2_blocks
    }

    print "all gc tests passed!"
}
+2 -2
tests/repos_api_test.nu

# Helper: build hydrant (debug profile) and return the path to the binary
def build-hydrant [] {
    cargo build --quiet
    "./target/debug/hydrant"
}

# Helper to start hydrant