AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[tests] improve the gc test

ptr.pet 88cb2848 41096cfe

verified
+183 -8
+69
src/api/debug.rs
··· 32 32 .route("/debug/iter", get(handle_debug_iter)) 33 33 .route("/debug/refcount", get(handle_debug_refcount)) 34 34 .route("/debug/refcount", post(handle_set_debug_refcount)) 35 + .route("/debug/repo_refcounts", get(handle_debug_repo_refcounts)) 35 36 .route("/debug/compact", post(handle_debug_compact)) 36 37 } 37 38 ··· 332 333 333 334 Ok(StatusCode::OK) 334 335 } 336 + 337 + #[derive(Deserialize)] 338 + pub struct DebugRepoRefcountsRequest { 339 + pub did: String, 340 + } 341 + 342 + #[derive(Serialize)] 343 + pub struct DebugRepoRefcountsResponse { 344 + pub cids: std::collections::HashMap<String, i64>, 345 + } 346 + 347 + pub async fn handle_debug_repo_refcounts( 348 + State(state): State<Arc<AppState>>, 349 + Query(req): Query<DebugRepoRefcountsRequest>, 350 + ) -> Result<Json<DebugRepoRefcountsResponse>, StatusCode> { 351 + let raw_did = jacquard_common::types::ident::AtIdentifier::new(req.did.as_str()) 352 + .map_err(|_| StatusCode::BAD_REQUEST)?; 353 + let did = state 354 + .resolver 355 + .resolve_did(&raw_did) 356 + .await 357 + .map_err(|_| StatusCode::BAD_REQUEST)?; 358 + 359 + let state_clone = state.clone(); 360 + 361 + let cids = tokio::task::spawn_blocking(move || { 362 + let mut unique_cids: std::collections::HashSet<String> = std::collections::HashSet::new(); 363 + let db = &state_clone.db; 364 + 365 + // 1. Scan records 366 + let records_prefix = crate::db::keys::record_prefix_did(&did); 367 + for guard in db.records.prefix(&records_prefix) { 368 + if let Ok((_k, v)) = guard.into_inner() { 369 + if let Ok(cid) = cid::Cid::read_bytes(v.as_ref()) { 370 + unique_cids.insert(cid.to_string()); 371 + } 372 + } 373 + } 374 + 375 + // 2. 
Scan events 376 + let trimmed_did = crate::db::types::TrimmedDid::from(&did); 377 + for guard in db.events.iter() { 378 + if let Ok((_k, v)) = guard.into_inner() { 379 + if let Ok(evt) = rmp_serde::from_slice::<crate::types::StoredEvent>(v.as_ref()) { 380 + if evt.did == trimmed_did { 381 + if let Some(cid) = evt.cid { 382 + unique_cids.insert(cid.to_string()); 383 + } 384 + } 385 + } 386 + } 387 + } 388 + 389 + let mut counts: std::collections::HashMap<String, i64> = std::collections::HashMap::new(); 390 + for cid_str in unique_cids { 391 + if let Ok(cid) = cid::Cid::from_str(&cid_str) { 392 + let cid_bytes = fjall::Slice::from(cid.to_bytes()); 393 + let count = db.block_refcounts.read_sync(cid_bytes.as_ref(), |_, v| *v).unwrap_or(0); 394 + counts.insert(cid_str, count); 395 + } 396 + } 397 + counts 398 + }) 399 + .await 400 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 401 + 402 + Ok(Json(DebugRepoRefcountsResponse { cids })) 403 + }
+7 -4
src/ops.rs
··· 101 101 let records_prefix = keys::record_prefix_did(did); 102 102 for guard in db.records.prefix(&records_prefix) { 103 103 let (k, cid_bytes) = guard.into_inner().into_diagnostic()?; 104 - batch.update_block_refcount(cid_bytes, -1); 104 + batch.update_block_refcount(cid_bytes, -1)?; 105 105 batch.batch_mut().remove(&db.records, k); 106 106 } 107 107 ··· 304 304 .batch_mut() 305 305 .insert(&db.blocks, cid_bytes.clone(), bytes.to_vec()); 306 306 blocks_count += 1; 307 - batch.update_block_refcount(cid_bytes.clone(), ephemeral.then_some(1).unwrap_or(2)); 307 + batch.update_block_refcount( 308 + cid_bytes.clone(), 309 + ephemeral.then_some(1).unwrap_or(2), 310 + )?; 308 311 309 312 if !ephemeral { 310 313 batch ··· 318 321 )); 319 322 }; 320 323 if old_cid_bytes != cid_bytes { 321 - batch.update_block_refcount(old_cid_bytes, -1); 324 + batch.update_block_refcount(old_cid_bytes, -1)?; 322 325 } 323 326 } 324 327 // accumulate counts ··· 333 336 // decrement block refcount 334 337 let old_cid_bytes = db.records.get(&db_key).into_diagnostic()?; 335 338 if let Some(cid_bytes) = old_cid_bytes { 336 - batch.update_block_refcount(cid_bytes, -1); 339 + batch.update_block_refcount(cid_bytes, -1)?; 337 340 } 338 341 batch.batch_mut().remove(&db.records, db_key); 339 342
+107 -4
tests/gc_test.nu
··· 61 61 } 62 62 } 63 63 64 + def check-repo-refcounts [debug_url: string, did: string, expected_refcount: int] { 65 + let refs = (http get $"($debug_url)/debug/repo_refcounts?did=($did)") 66 + let cids = $refs.cids 67 + let count = ($cids | columns | length) 68 + if $count == 0 { 69 + if $expected_refcount != 0 { 70 + error make {msg: $"FAILED: expected refcounts to be ($expected_refcount) but found no cids for ($did)"} 71 + } 72 + print $"blocks for ($did) completely verified as 0" 73 + return 74 + } 75 + 76 + for rcid in ($cids | transpose key value) { 77 + if $rcid.value < $expected_refcount { 78 + error make {msg: $"FAILED: expected refcount for ($rcid.key) to be >= ($expected_refcount), but found ($rcid.value)"} 79 + } 80 + if $expected_refcount == 0 and $rcid.value != 0 { 81 + error make {msg: $"FAILED: expected refcount for ($rcid.key) to be 0, but found ($rcid.value)"} 82 + } 83 + } 84 + print $"all ($count) tracked blocks for ($did) verified with refcount >= ($expected_refcount)" 85 + } 86 + 64 87 def ack-all-events [debug_url: string, url: string] { 65 88 print "acking all events..." 
89 + mut total_acked = 0 66 90 mut items = [] 91 + 92 + # wait for at least some events 67 93 for i in 1..30 { 68 94 let events = http get $"($debug_url)/debug/iter?partition=events&limit=1000" 69 95 $items = $events.items ··· 77 103 error make {msg: "FAILED: no events to ack"} 78 104 } 79 105 80 - let event_ids = ($items | each { |x| ($x | first | into int) }) 81 - 82 - http post -t application/json $"($url)/stream/ack" { ids: $event_ids } 83 - print $"acked ($event_ids | length) events" 106 + loop { 107 + let event_ids = ($items | each { |x| ($x | first | into int) }) 108 + http post -t application/json $"($url)/stream/ack" { ids: $event_ids } 109 + $total_acked += ($event_ids | length) 110 + 111 + # getting next batch 112 + let next_events = http get $"($debug_url)/debug/iter?partition=events&limit=1000" 113 + $items = $next_events.items 114 + if ($items | length) == 0 { 115 + break 116 + } 117 + } 118 + 119 + print $"acked ($total_acked) events" 84 120 } 85 121 86 122 def main [] { ··· 93 129 94 130 let before_count = (wait-for-blocks $debug_url) 95 131 print $"found ($before_count) blocks before GC" 132 + 133 + check-repo-refcounts $debug_url $repo1 2 96 134 97 135 print "deleting repo..." 
98 136 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 99 137 sleep 1sec 100 138 139 + check-repo-refcounts $debug_url $repo1 1 140 + 101 141 compact-and-check-blocks $debug_url $before_count 102 142 } 103 143 ··· 107 147 108 148 let before_count = (wait-for-blocks $debug_url) 109 149 print $"found ($before_count) blocks before GC" 150 + 151 + check-repo-refcounts $debug_url $repo1 2 110 152 111 153 ack-all-events $debug_url $url 112 154 sleep 1sec 155 + 156 + check-repo-refcounts $debug_url $repo1 1 113 157 114 158 compact-and-check-blocks $debug_url $before_count 115 159 } ··· 120 164 121 165 let before_count = (wait-for-blocks $debug_url) 122 166 print $"found ($before_count) blocks before GC" 167 + 168 + check-repo-refcounts $debug_url $repo1 2 123 169 124 170 print "deleting repo..." 125 171 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 126 172 127 173 ack-all-events $debug_url $url 128 174 sleep 1sec 175 + 176 + check-repo-refcounts $debug_url $repo1 0 177 + 178 + compact-and-check-blocks $debug_url 0 179 + } 180 + 181 + run-test-instance "delete repo, compact, ack events, compact" { |url, debug_url| 182 + print $"adding repo ($repo1) to tracking..." 183 + http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 184 + 185 + let before_count = (wait-for-blocks $debug_url) 186 + print $"found ($before_count) blocks before GC" 187 + 188 + check-repo-refcounts $debug_url $repo1 2 189 + 190 + print "deleting repo..." 
191 + http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 192 + sleep 1sec 193 + 194 + check-repo-refcounts $debug_url $repo1 1 195 + 196 + compact-and-check-blocks $debug_url $before_count 197 + 198 + ack-all-events $debug_url $url 199 + sleep 1sec 200 + 201 + check-repo-refcounts $debug_url $repo1 0 202 + 203 + compact-and-check-blocks $debug_url 0 204 + } 205 + 206 + run-test-instance "ack events, compact, delete repo, compact" { |url, debug_url| 207 + print $"adding repo ($repo1) to tracking..." 208 + http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 209 + 210 + let before_count = (wait-for-blocks $debug_url) 211 + print $"found ($before_count) blocks before GC" 212 + 213 + check-repo-refcounts $debug_url $repo1 2 214 + 215 + ack-all-events $debug_url $url 216 + sleep 1sec 217 + 218 + check-repo-refcounts $debug_url $repo1 1 219 + 220 + compact-and-check-blocks $debug_url $before_count 221 + 222 + print "deleting repo..." 223 + http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 224 + sleep 1sec 225 + 226 + check-repo-refcounts $debug_url $repo1 0 129 227 130 228 compact-and-check-blocks $debug_url 0 131 229 } ··· 151 249 # we will ack all events to be safe. Since repo2 is NOT deleted, its refcount should be fine even if events are acked. 152 250 ack-all-events $debug_url $url 153 251 sleep 1sec 252 + 253 + # repo1 should have expected refcount 0 254 + check-repo-refcounts $debug_url $repo1 0 255 + # Wait, for repo2, we didn't delete it, but events are acked, so its refcount should be 1 256 + check-repo-refcounts $debug_url $repo2 1 154 257 155 258 compact-and-check-blocks $debug_url $repo2_blocks 156 259 }