at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] add startup retry for deactivated repositories

ptr.pet b055ce25 868967ec

verified
+100 -80
src/backfill/manager.rs  +60 -42
···
  use crate::db::keys::reconstruct_did;
- use crate::db::Db;
+ use crate::db::{deser_repo_state, ser_repo_state, Db};
  use crate::state::AppState;
- use crate::types::ErrorState;
- use fjall::Slice;
+ use crate::types::{ErrorState, RepoStatus};
  use miette::{IntoDiagnostic, Result};
  use std::sync::Arc;
  use std::time::Duration;
  use tracing::{debug, error, info};

- pub async fn queue_pending_backfills(state: &AppState) -> Result<()> {
+ pub fn queue_pending_backfills(state: &AppState) -> Result<()> {
      info!("scanning for pending backfills...");
      let mut count = 0;

-     let ks = state.db.pending.clone();
-     let items = tokio::task::spawn_blocking(move || {
-         let mut collected: Vec<Slice> = Vec::new();
-         for item in ks.iter() {
-             let k = item.key().into_diagnostic()?;
-             collected.push(k);
-         }
-         Ok::<Vec<Slice>, miette::Report>(collected)
-     })
-     .await
-     .into_diagnostic()??;
-
-     for key in items {
+     for guard in state.db.pending.iter() {
+         let key = guard.key().into_diagnostic()?;
          let did_str = String::from_utf8_lossy(&key);
          let Ok(did) = reconstruct_did(&did_str) else {
              error!("invalid did in db, skipping: did:{did_str}");
···
      Ok(())
  }

- pub async fn retry_worker(state: Arc<AppState>) {
+ pub fn queue_gone_backfills(state: &Arc<AppState>) -> Result<()> {
+     info!("scanning for deactivated/takendown repos to retry...");
+     let mut count = 0;
+
+     for guard in state.db.repos.iter() {
+         let (key, val) = guard.into_inner().into_diagnostic()?;
+         let did_str = String::from_utf8_lossy(&key);
+         let Ok(did) = reconstruct_did(&did_str) else {
+             error!("invalid did in db, skipping: did:{did_str}");
+             continue;
+         };
+         if let Ok(repo_state) = deser_repo_state(&val) {
+             if matches!(
+                 repo_state.status,
+                 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
+             ) {
+                 info!("queuing retry for gone repo: {did}");
+
+                 let mut new_state = repo_state.clone();
+                 new_state.status = RepoStatus::Backfilling;
+                 let bytes = ser_repo_state(&new_state)?;
+
+                 let mut batch = state.db.inner.batch();
+                 batch.insert(&state.db.repos, key.clone(), bytes);
+                 batch.insert(&state.db.pending, key, Vec::new());
+                 batch.commit().into_diagnostic()?;
+
+                 if let Err(e) = state.backfill_tx.send(did.clone()) {
+                     error!("failed to queue retry for {did}: {e}");
+                 } else {
+                     count += 1;
+                 }
+             }
+         }
+     }
+
+     info!("queued {count} gone backfills");
+     Ok(())
+ }
+
+ pub fn retry_worker(state: Arc<AppState>) {
      let db = &state.db;
      info!("retry worker started");
      loop {
          // sleep first (e.g., check every minute)
-         tokio::time::sleep(Duration::from_secs(60)).await;
+         std::thread::sleep(Duration::from_secs(60));

          let now = chrono::Utc::now().timestamp();
          let mut count = 0;

-         let ks = db.errors.clone();
-         let items = tokio::task::spawn_blocking(move || {
-             let mut collected: Vec<(Slice, Slice)> = Vec::new();
-             for item in ks.iter() {
-                 let (k, v) = item.into_inner().into_diagnostic()?;
-                 collected.push((k, v));
-             }
-             Ok::<_, miette::Report>(collected)
-         })
-         .await
-         .into_diagnostic()
-         .flatten()
-         .unwrap_or_else(|e| {
-             error!("failed to scan errors: {e}");
-             Db::check_poisoned_report(&e);
-             Vec::new()
-         });
-
-         for (key, value) in items {
+         for guard in db.errors.iter() {
+             let (key, value) = match guard.into_inner() {
+                 Ok(t) => t,
+                 Err(e) => {
+                     error!("failed to get error: {e}");
+                     Db::check_poisoned(&e);
+                     continue;
+                 }
+             };
              let did_str = String::from_utf8_lossy(&key);
              let Ok(did) = reconstruct_did(&did_str) else {
-                 error!("invalid did in db, skipping: {did_str}");
+                 error!("invalid did in db, skipping: did:{did_str}");
                  continue;
              };
              if let Ok(err_state) = rmp_serde::from_slice::<ErrorState>(&value) {
···
                      info!("retrying backfill for {did}");

                      // move back to pending
-                     if let Err(e) = Db::insert(db.pending.clone(), key, Vec::new()).await {
+                     if let Err(e) = db.pending.insert(key, Vec::new()) {
                          error!("failed to move {did} to pending: {e}");
-                         Db::check_poisoned_report(&e);
+                         Db::check_poisoned(&e);
                          continue;
                      }

                      // queue
-                     if let Err(e) = state.backfill_tx.send(did.to_owned()) {
+                     if let Err(e) = state.backfill_tx.send(did.clone()) {
                          error!("failed to queue retry for {did}: {e}");
                      } else {
                          count += 1;
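the new queue_gone_backfills pass depends on repo-state helpers (deser_repo_state, ser_repo_state) and a RepoStatus enum that are not part of this diff. a minimal sketch of what those definitions could look like, assuming MessagePack encoding via rmp_serde like the ErrorState handling above; the actual fields and variants in crate::types / crate::db may differ:

// Hypothetical sketch only: the real RepoState/RepoStatus definitions and the
// ser_/deser_ helpers live in this repo's types and db modules and may differ.
use miette::{IntoDiagnostic, Result};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RepoStatus {
    Active, // assumed variant, not shown in the diff
    Backfilling,
    Deactivated,
    Takendown,
    Suspended,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RepoState {
    pub status: RepoStatus,
    pub rev: Option<String>, // placeholder field for illustration
}

// Encode/decode with MessagePack, mirroring the rmp_serde usage for ErrorState.
pub fn ser_repo_state(state: &RepoState) -> Result<Vec<u8>> {
    rmp_serde::to_vec(state).into_diagnostic()
}

pub fn deser_repo_state(bytes: &[u8]) -> Result<RepoState> {
    rmp_serde::from_slice(bytes).into_diagnostic()
}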
src/main.rs  +40 -38
···
  use crate::db::Db;
  use crate::ingest::Ingestor;
  use crate::state::AppState;
+ use futures::TryFutureExt;
+ use miette::IntoDiagnostic;
  use mimalloc::MiMalloc;
  use std::sync::atomic::Ordering;
  use std::sync::Arc;
+ use tokio::task::spawn_blocking;
  use tracing::{error, info};

  #[global_allocator]
···
      tokio::spawn({
          let state = state.clone();
          let timeout = cfg.repo_fetch_timeout;
-         async move {
-             let worker = Worker::new(state, backfill_rx, timeout, cfg.backfill_concurrency_limit);
-             worker.run().await;
-         }
+         Worker::new(state, backfill_rx, timeout, cfg.backfill_concurrency_limit).run()
      });

-     if let Err(e) = crate::backfill::manager::queue_pending_backfills(&state).await {
+     if let Err(e) = spawn_blocking({
+         let state = state.clone();
+         move || crate::backfill::manager::queue_pending_backfills(&state)
+     })
+     .await
+     .into_diagnostic()?
+     {
          error!("failed to queue pending backfills: {e}");
          Db::check_poisoned_report(&e);
      }

-     tokio::spawn({
+     if let Err(e) = spawn_blocking({
+         let state = state.clone();
+         move || crate::backfill::manager::queue_gone_backfills(&state)
+     })
+     .await
+     .into_diagnostic()?
+     {
+         error!("failed to queue gone backfills: {e}");
+         Db::check_poisoned_report(&e);
+     }
+
+     std::thread::spawn({
          let state = state.clone();
-         async move {
-             crate::backfill::manager::retry_worker(state).await;
-         }
+         move || crate::backfill::manager::retry_worker(state)
      });

      tokio::spawn({
···
          }
      });

-     tokio::spawn({
+     std::thread::spawn({
          let state = state.clone();
          let persist_interval = cfg.cursor_save_interval;

-         async move {
+         move || {
              info!("persistence worker started");
              loop {
-                 tokio::time::sleep(persist_interval).await;
+                 std::thread::sleep(persist_interval);

                  let seq = state.cur_firehose.load(Ordering::SeqCst);
                  const CURSOR_KEY: &[u8] = b"firehose_cursor";
-                 if let Err(e) = Db::insert(
-                     state.db.cursors.clone(),
-                     CURSOR_KEY,
-                     seq.to_string().into_bytes(),
-                 )
-                 .await
+                 if let Err(e) = state
+                     .db
+                     .cursors
+                     .insert(CURSOR_KEY, seq.to_string().into_bytes())
                  {
                      error!("failed to save cursor: {e}");
-                     Db::check_poisoned_report(&e);
+                     Db::check_poisoned(&e);
                  }

-                 let state = state.clone();
-                 let res = tokio::task::spawn_blocking(move || state.db.persist()).await;
-
-                 match res {
-                     Ok(Err(e)) => {
-                         error!("db persist failed: {e}");
-                         Db::check_poisoned_report(&e);
-                     }
-                     Err(e) => {
-                         error!("persistence task join failed: {e}");
-                     }
-                     _ => {}
+                 if let Err(e) = state.db.persist() {
+                     error!("db persist failed: {e}");
+                     Db::check_poisoned_report(&e);
                  }
              }
          }
···
      tokio::spawn({
          let state = state.clone();
          let crawler_host = cfg.relay_host.clone();
-         async move {
-             let crawler = Crawler::new(state, crawler_host);
-             if let Err(e) = crawler.run().await {
-                 error!("crawler died: {e}");
-                 Db::check_poisoned_report(&e);
-             }
-         }
+
+         Crawler::new(state, crawler_host).run().inspect_err(|e| {
+             error!("crawler died: {e}");
+             Db::check_poisoned_report(&e);
+         })
      });
  }
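the main.rs side moves the blocking fjall scans off the async runtime: the one-shot queue_* scans go through tokio::task::spawn_blocking, and the long-running retry and persistence loops become plain OS threads. a stripped-down sketch of that pattern, with placeholder functions standing in for the real manager code:

// Illustrative sketch of the spawn pattern; do_scan/run_loop are placeholders,
// not functions from this codebase.
use std::sync::Arc;
use std::time::Duration;
use tokio::task::spawn_blocking;

fn do_scan(_state: &Arc<()>) -> Result<(), String> {
    // stands in for a synchronous fjall keyspace scan
    Ok(())
}

fn run_loop(_state: Arc<()>) {
    loop {
        // blocking periodic work, so it lives on its own OS thread
        std::thread::sleep(Duration::from_secs(60));
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(());

    // one-shot blocking scan: run on tokio's blocking pool so the async
    // runtime's worker threads are never stalled by disk iteration
    let s = state.clone();
    match spawn_blocking(move || do_scan(&s)).await {
        Ok(Err(e)) => eprintln!("scan failed: {e}"),
        Err(e) => eprintln!("scan task panicked: {e}"),
        Ok(Ok(())) => {}
    }

    // long-running blocking worker: a dedicated std thread outside the runtime
    std::thread::spawn({
        let state = state.clone();
        move || run_loop(state)
    });
}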