AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] fix pending keys

ptr.pet 3b5d62cf 4ef5c943

verified
+14 -44
+5 -13
src/backfill/manager.rs
··· 1 1 use crate::db::types::TrimmedDid; 2 - use crate::db::{self, deser_repo_state, ser_repo_state}; 2 + use crate::db::{self, deser_repo_state, keys, ser_repo_state}; 3 3 use crate::state::AppState; 4 4 use crate::types::{RepoStatus, ResyncState}; 5 5 use miette::{IntoDiagnostic, Result}; ··· 28 28 // move back to pending 29 29 let mut batch = state.db.inner.batch(); 30 30 batch.remove(&state.db.resync, key.clone()); 31 - batch.insert( 32 - &state.db.pending, 33 - crate::db::keys::pending_key(&did), 34 - Vec::new(), 35 - ); 31 + batch.insert(&state.db.pending, key.clone(), Vec::new()); 36 32 37 33 // update repo state back to backfilling 38 - let repo_key = crate::db::keys::repo_key(&did); 39 - if let Some(state_bytes) = state.db.repos.get(&repo_key).into_diagnostic()? { 34 + if let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? { 40 35 let mut repo_state = deser_repo_state(&state_bytes)?; 41 36 repo_state.status = RepoStatus::Backfilling; 42 - batch.insert(&state.db.repos, &repo_key, ser_repo_state(&repo_state)?); 37 + batch.insert(&state.db.repos, key, ser_repo_state(&repo_state)?); 43 38 } 44 39 45 40 state.db.update_count("resync", -1); ··· 91 86 92 87 // move back to pending 93 88 state.db.update_count("pending", 1); 94 - if let Err(e) = db 95 - .pending 96 - .insert(crate::db::keys::pending_key(&did), Vec::new()) 97 - { 89 + if let Err(e) = db.pending.insert(keys::repo_key(&did), Vec::new()) { 98 90 error!("failed to move {did} to pending: {e}"); 99 91 db::check_poisoned(&e); 100 92 continue;
+6 -16
src/backfill/mod.rs
··· 126 126 127 127 let mut spawned = 0; 128 128 129 - // limit the number of active tasks based on adaptive limit 130 - // we iterate in reverse to prioritize newer items (LIFO) 131 - // effective key comparison: {timestamp}|{did} 132 - // older timestamps are smaller, newer are larger. 133 - // rev() starts from largest (newest). 134 - for guard in self.state.db.pending.iter().rev() { 129 + for guard in self.state.db.pending.iter() { 135 130 if self.in_flight.len() >= limiter.current_limit { 136 131 break; 137 132 } ··· 145 140 } 146 141 }; 147 142 148 - let did = if key.len() > 9 && key[8] == keys::SEP { 149 - match TrimmedDid::try_from(&key[9..]) { 150 - Ok(d) => d.to_did(), 151 - Err(e) => { 152 - error!("invalid did '{key:?}' in pending: {e}"); 153 - continue; 154 - } 143 + let did = match TrimmedDid::try_from(key.as_ref()) { 144 + Ok(d) => d.to_did(), 145 + Err(e) => { 146 + error!("invalid did '{key:?}' in pending: {e}"); 147 + continue; 155 148 } 156 - } else { 157 - error!("invalid did '{key:?}' in pending"); 158 - continue; 159 149 }; 160 150 161 151 if self.in_flight.contains_sync(&did) {
-12
src/db/keys.rs
··· 114 114 prefix.push(SEP); 115 115 prefix 116 116 } 117 - 118 - // key format: {timestamp}|{DID} (DID trimmed) 119 - // timestamp is big-endian u64 micros 120 - pub fn pending_key(did: &Did) -> Vec<u8> { 121 - let repo = TrimmedDid::from(did); 122 - let mut key = Vec::with_capacity(8 + 1 + repo.len()); 123 - let ts = chrono::Utc::now().timestamp_micros() as u64; 124 - key.extend_from_slice(&ts.to_be_bytes()); 125 - key.push(SEP); 126 - repo.write_to_vec(&mut key); 127 - key 128 - }
+3 -3
src/ingest/worker.rs
··· 360 360 RepoStatus::Backfilling, 361 361 )?; 362 362 ctx.state.db.update_count("pending", 1); 363 - batch.insert(&ctx.state.db.pending, keys::pending_key(did), &[]); 363 + batch.insert(&ctx.state.db.pending, keys::repo_key(did), &[]); 364 364 batch.commit().into_diagnostic()?; 365 365 ctx.state.notify_backfill(); 366 366 return Ok(RepoProcessResult::Ok(repo_state)); ··· 506 506 RepoStatus::Backfilling, 507 507 )?; 508 508 ctx.state.db.update_count("pending", 1); 509 - batch.insert(&ctx.state.db.pending, keys::pending_key(did), &[]); 509 + batch.insert(&ctx.state.db.pending, keys::repo_key(did), &[]); 510 510 batch.commit().into_diagnostic()?; 511 511 ctx.repo_cache 512 512 .insert(did.clone().into_static(), repo_state.clone().into_static()); ··· 567 567 &repo_key, 568 568 crate::db::ser_repo_state(&new_state)?, 569 569 ); 570 - batch.insert(&ctx.state.db.pending, keys::pending_key(did), &[]); 570 + batch.insert(&ctx.state.db.pending, repo_key, &[]); 571 571 ctx.state.db.update_count("repos", 1); 572 572 ctx.state.db.update_count("pending", 1); 573 573 batch.commit().into_diagnostic()?;