at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] implement LIFO prioritization and adaptive concurrency

ptr.pet 33aa27be a366d89f

verified
+141 -36
+9 -2
src/backfill/manager.rs
··· 28 28 // move back to pending 29 29 let mut batch = state.db.inner.batch(); 30 30 batch.remove(&state.db.resync, key.clone()); 31 - batch.insert(&state.db.pending, key, Vec::new()); 31 + batch.insert( 32 + &state.db.pending, 33 + crate::db::keys::pending_key(&did), 34 + Vec::new(), 35 + ); 32 36 33 37 // update repo state back to backfilling 34 38 let repo_key = crate::db::keys::repo_key(&did); ··· 87 91 88 92 // move back to pending 89 93 state.db.update_count("pending", 1); 90 - if let Err(e) = db.pending.insert(key, Vec::new()) { 94 + if let Err(e) = db 95 + .pending 96 + .insert(crate::db::keys::pending_key(&did), Vec::new()) 97 + { 91 98 error!("failed to move {did} to pending: {e}"); 92 99 db::check_poisoned(&e); 93 100 continue;
+117 -33
src/backfill/mod.rs
··· 4 4 use crate::state::AppState; 5 5 use crate::types::{AccountEvt, BroadcastEvent, RepoState, RepoStatus, ResyncState, StoredEvent}; 6 6 7 + use fjall::Slice; 7 8 use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 8 9 use jacquard::error::{ClientError, ClientErrorKind}; 9 10 use jacquard::types::cid::Cid; ··· 27 28 28 29 use crate::ingest::{BufferTx, IngestMessage}; 29 30 31 + struct AdaptiveLimiter { 32 + current_limit: usize, 33 + max_limit: usize, 34 + min_limit: usize, 35 + } 36 + 37 + impl AdaptiveLimiter { 38 + fn new(start: usize, max: usize) -> Self { 39 + Self { 40 + current_limit: start, 41 + max_limit: max, 42 + min_limit: 1, 43 + } 44 + } 45 + 46 + fn on_success(&mut self) { 47 + if self.current_limit < self.max_limit { 48 + self.current_limit += 1; 49 + debug!("adaptive limiter increased to {}", self.current_limit); 50 + } 51 + } 52 + 53 + fn on_failure(&mut self) { 54 + if self.current_limit > self.min_limit { 55 + self.current_limit = (self.current_limit / 2).max(self.min_limit); 56 + debug!("adaptive limiter decreased to {}", self.current_limit); 57 + } 58 + } 59 + } 60 + 30 61 pub struct BackfillWorker { 31 62 state: Arc<AppState>, 32 63 buffer_tx: BufferTx, ··· 75 106 impl BackfillWorker { 76 107 pub async fn run(self) { 77 108 info!("backfill worker started"); 109 + 110 + let (feedback_tx, mut feedback_rx) = tokio::sync::mpsc::channel(100); 111 + let mut limiter = AdaptiveLimiter::new( 112 + self.semaphore.available_permits(), 113 + self.semaphore.available_permits(), 114 + ); // assume start at max 115 + 78 116 loop { 117 + // apply feedback from finished tasks 118 + while let Ok(was_ratelimited) = feedback_rx.try_recv() { 119 + if was_ratelimited { 120 + limiter.on_failure(); 121 + } else { 122 + limiter.on_success(); 123 + } 124 + } 125 + 79 126 let mut spawned = 0; 80 127 81 - for guard in self.state.db.pending.iter() { 128 + // limit the number of active tasks based on adaptive limit 129 + // we iterate in reverse to prioritize newer items (LIFO) 130 + // effective key comparison: {timestamp}|{did} 131 + // older timestamps are smaller, newer are larger. 132 + // rev() starts from largest (newest). 133 + for guard in self.state.db.pending.iter().rev() { 134 + if self.in_flight.len() >= limiter.current_limit { 135 + break; 136 + } 137 + 82 138 let key = match guard.key() { 83 139 Ok(k) => k, 84 140 Err(e) => { ··· 88 144 } 89 145 }; 90 146 91 - let did = match TrimmedDid::try_from(key.as_ref()) { 92 - Ok(d) => d.to_did(), 93 - Err(e) => { 94 - error!("invalid did in pending: {e}"); 95 - continue; 147 + let did = if key.len() > 9 && key[8] == keys::SEP { 148 + match TrimmedDid::try_from(&key[9..]) { 149 + Ok(d) => d.to_did(), 150 + Err(e) => { 151 + error!("invalid did '{key:?}' in pending: {e}"); 152 + continue; 153 + } 96 154 } 155 + } else { 156 + error!("invalid did '{key:?}' in pending"); 157 + continue; 97 158 }; 98 159 99 160 if self.in_flight.contains_sync(&did) { ··· 113 174 114 175 let state = self.state.clone(); 115 176 let http = self.http.clone(); 116 - let buffer_tx_clone = self.buffer_tx.clone(); 117 - let did_clone = did.clone(); 177 + let did = did.clone(); 178 + let buffer_tx = self.buffer_tx.clone(); 118 179 let verify = self.verify_signatures; 180 + let feedback_tx = feedback_tx.clone(); 119 181 120 182 tokio::spawn(async move { 121 183 let _guard = guard; 122 - did_task( 123 - state, 124 - http, 125 - buffer_tx_clone, 126 - did_clone.clone(), 127 - permit, 128 - verify, 129 - ) 130 - .await 131 - .inspect_err(move |e| { 132 - error!("backfill process failed for {did_clone}: {e}"); 133 - db::check_poisoned_report(e); 134 - }) 184 + let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await; 185 + 186 + let was_ratelimited = match &res { 187 + Err(e) if matches!(e, BackfillError::Ratelimited) => true, 188 + _ => false, 189 + }; 190 + 191 + if let Err(e) = res { 192 + error!("backfill process failed for {did}: {e}"); 193 + if let BackfillError::Generic(report) = &e { 194 + db::check_poisoned_report(report); 195 + } 196 + } 197 + 198 + // wake worker to pick up more (in case we were sleeping at limit) 199 + state.backfill_notify.notify_one(); 200 + 201 + let _ = feedback_tx.try_send(was_ratelimited); 135 202 }); 136 203 137 204 spawned += 1; 138 205 } 139 206 140 207 if spawned == 0 { 141 - self.state.backfill_notify.notified().await; 208 + // if we didn't spawn anything, wait for notification OR feedback 209 + tokio::select! { 210 + _ = self.state.backfill_notify.notified() => {}, 211 + _ = tokio::time::sleep(Duration::from_secs(1)) => {}, // poll for feedback if idle 212 + } 213 + } else { 214 + // if we spawned tasks, yield briefly to let them start and avoid tight loop 215 + tokio::time::sleep(Duration::from_millis(10)).await; 142 216 } 143 217 } 144 218 } 145 219 } 146 220 147 221 async fn did_task( 148 - state: Arc<AppState>, 222 + state: &Arc<AppState>, 149 223 http: reqwest::Client, 150 224 buffer_tx: BufferTx, 151 - did: Did<'static>, 225 + did: &Did<'static>, 226 + pending_key: Slice, 152 227 _permit: tokio::sync::OwnedSemaphorePermit, 153 228 verify_signatures: bool, 154 - ) -> Result<()> { 229 + ) -> Result<(), BackfillError> { 155 230 let db = &state.db; 156 231 157 232 match process_did(&state, &http, &did, verify_signatures).await { ··· 170 245 let mut batch = db.inner.batch(); 171 246 // remove from pending 172 247 if was_pending { 173 - batch.remove(&db.pending, &did_key); 248 + batch.remove(&db.pending, pending_key); 174 249 } 175 250 // remove from resync 176 251 if was_resync { ··· 201 276 if let Err(e) = buffer_tx.send(IngestMessage::BackfillFinished(did.clone())) { 202 277 error!("failed to send BackfillFinished for {did}: {e}"); 203 278 } 279 + Ok(()) 204 280 } 205 281 Err(e) => { 206 282 let mut was_ratelimited = false; ··· 247 323 }; 248 324 249 325 // 2. calculate backoff and update the other fields 250 - *retry_count += was_ratelimited.then_some(3).unwrap_or(1); 326 + *retry_count += was_ratelimited.then_some(1).unwrap_or(1); 251 327 *next_retry = ResyncState::next_backoff(*retry_count); 252 328 *message = e.to_smolstr(); 329 + 330 + let error_string = e.to_string(); 253 331 254 332 tokio::task::spawn_blocking({ 255 333 let state = state.clone(); ··· 265 343 { 266 344 let mut state: RepoState = 267 345 rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 268 - state.status = RepoStatus::Error(e.to_string().into()); 346 + state.status = RepoStatus::Error(error_string.into()); 269 347 Some(rmp_serde::to_vec(&state).into_diagnostic()?) 270 348 } else { 271 349 None ··· 273 351 274 352 let mut batch = state.db.inner.batch(); 275 353 batch.insert(&state.db.resync, &did_key, serialized_resync_state); 276 - batch.remove(&state.db.pending, &did_key); 354 + batch.remove(&state.db.pending, pending_key); 277 355 if let Some(state_bytes) = serialized_repo_state { 278 356 batch.insert(&state.db.repos, &did_key, state_bytes); 279 357 } ··· 285 363 286 364 state.db.update_count_async("resync", 1).await; 287 365 state.db.update_count_async("pending", -1).await; 366 + 367 + // add error stats 368 + if was_ratelimited { 369 + state.db.update_count_async("error_ratelimited", 1).await; 370 + } else if let BackfillError::Transport(_) = &e { 371 + state.db.update_count_async("error_transport", 1).await; 372 + } else { 373 + state.db.update_count_async("error_generic", 1).await; 374 + } 375 + Err(e) 288 376 } 289 377 } 290 - 291 - // wake worker to pick up more 292 - state.backfill_notify.notify_one(); 293 - Ok(()) 294 378 } 295 379 296 380 #[derive(Debug, Diagnostic, Error)]
+12
src/db/keys.rs
··· 114 114 prefix.push(SEP); 115 115 prefix 116 116 } 117 + 118 + // key format: {timestamp}|{DID} (DID trimmed) 119 + // timestamp is big-endian u64 micros 120 + pub fn pending_key(did: &Did) -> Vec<u8> { 121 + let repo = TrimmedDid::from(did); 122 + let mut key = Vec::with_capacity(8 + 1 + repo.len()); 123 + let ts = chrono::Utc::now().timestamp_micros() as u64; 124 + key.extend_from_slice(&ts.to_be_bytes()); 125 + key.push(SEP); 126 + repo.write_to_vec(&mut key); 127 + key 128 + }
+3 -1
src/ingest/worker.rs
··· 357 357 RepoStatus::Backfilling, 358 358 )?; 359 359 ctx.state.db.update_count("pending", 1); 360 + batch.insert(&ctx.state.db.pending, keys::pending_key(did), &[]); 360 361 batch.commit().into_diagnostic()?; 361 362 ctx.state.notify_backfill(); 362 363 return Ok(RepoProcessResult::Ok(repo_state)); ··· 502 503 RepoStatus::Backfilling, 503 504 )?; 504 505 ctx.state.db.update_count("pending", 1); 506 + batch.insert(&ctx.state.db.pending, keys::pending_key(did), &[]); 505 507 batch.commit().into_diagnostic()?; 506 508 ctx.repo_cache 507 509 .insert(did.clone().into_static(), repo_state.clone().into_static()); ··· 562 564 &repo_key, 563 565 crate::db::ser_repo_state(&new_state)?, 564 566 ); 565 - batch.insert(&ctx.state.db.pending, &repo_key, &[]); 567 + batch.insert(&ctx.state.db.pending, keys::pending_key(did), &[]); 566 568 ctx.state.db.update_count("repos", 1); 567 569 ctx.state.db.update_count("pending", 1); 568 570 batch.commit().into_diagnostic()?;