AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 163 lines 5.6 kB view raw
1use crate::db::types::TrimmedDid; 2use crate::db::{self, deser_repo_state}; 3use crate::ops; 4use crate::state::AppState; 5use crate::types::{GaugeState, RepoStatus, ResyncState}; 6use miette::{IntoDiagnostic, Result}; 7use std::sync::Arc; 8use std::time::Duration; 9use tracing::{debug, error, info, warn}; 10 11pub fn queue_gone_backfills(state: &Arc<AppState>) -> Result<()> { 12 debug!("scanning for deactivated/takendown repos to retry..."); 13 let mut transitions = Vec::new(); 14 15 let mut batch = state.db.inner.batch(); 16 17 for guard in state.db.resync.iter() { 18 let (key, val) = guard.into_inner().into_diagnostic()?; 19 let did = match TrimmedDid::try_from(key.as_ref()) { 20 Ok(did) => did.to_did(), 21 Err(e) => { 22 error!(err = %e, "invalid did in db, skipping"); 23 continue; 24 } 25 }; 26 27 if let Ok(resync_state) = rmp_serde::from_slice::<ResyncState>(&val) { 28 if matches!(resync_state, ResyncState::Gone { .. }) { 29 debug!(did = %did, "queuing retry for gone repo"); 30 31 let Some(state_bytes) = state.db.repos.get(&key).into_diagnostic()? 
else { 32 warn!(did = %did, "repo state not found"); 33 continue; 34 }; 35 36 // update repo state back to backfilling 37 let repo_state = deser_repo_state(&state_bytes)?; 38 ops::update_repo_status( 39 &mut batch, 40 &state.db, 41 &did, 42 repo_state, 43 RepoStatus::Backfilling, 44 )?; 45 46 transitions.push((GaugeState::Resync(None), GaugeState::Pending)); 47 } 48 } 49 } 50 51 if transitions.is_empty() { 52 return Ok(()); 53 } 54 55 batch.commit().into_diagnostic()?; 56 57 for (old_gauge, new_gauge) in &transitions { 58 state.db.update_gauge_diff(old_gauge, new_gauge); 59 } 60 61 state.notify_backfill(); 62 63 info!(count = transitions.len(), "queued gone backfills"); 64 Ok(()) 65} 66 67pub fn retry_worker(state: Arc<AppState>) { 68 let db = &state.db; 69 info!("retry worker started"); 70 loop { 71 // sleep first (e.g., check every minute) 72 std::thread::sleep(Duration::from_secs(60)); 73 74 let now = chrono::Utc::now().timestamp(); 75 let mut transitions = Vec::new(); 76 77 let mut batch = state.db.inner.batch(); 78 79 for guard in db.resync.iter() { 80 let (key, value) = match guard.into_inner() { 81 Ok(t) => t, 82 Err(e) => { 83 error!(err = %e, "failed to get resync state"); 84 db::check_poisoned(&e); 85 continue; 86 } 87 }; 88 let did = match TrimmedDid::try_from(key.as_ref()) { 89 Ok(did) => did.to_did(), 90 Err(e) => { 91 error!(err = %e, "invalid did in db, skipping"); 92 continue; 93 } 94 }; 95 96 match rmp_serde::from_slice::<ResyncState>(&value) { 97 Ok(ResyncState::Error { 98 kind, next_retry, .. 
99 }) => { 100 if next_retry <= now { 101 debug!(did = %did, "retrying backfill"); 102 103 let state_bytes = match state.db.repos.get(&key).into_diagnostic() { 104 Ok(b) => b, 105 Err(err) => { 106 error!(did = %did, err = %err, "failed to get repo state"); 107 continue; 108 } 109 }; 110 let Some(state_bytes) = state_bytes else { 111 error!(did = %did, "repo state not found"); 112 continue; 113 }; 114 115 let repo_state = match deser_repo_state(&state_bytes) { 116 Ok(s) => s, 117 Err(e) => { 118 error!(did = %did, err = %e, "failed to deserialize repo state"); 119 continue; 120 } 121 }; 122 let res = ops::update_repo_status( 123 &mut batch, 124 &state.db, 125 &did, 126 repo_state, 127 RepoStatus::Backfilling, 128 ); 129 if let Err(e) = res { 130 error!(did = %did, err = %e, "failed to update repo status"); 131 continue; 132 } 133 134 transitions.push((GaugeState::Resync(Some(kind)), GaugeState::Pending)); 135 } 136 } 137 Ok(_) => { 138 // not an error state, do nothing 139 } 140 Err(e) => { 141 error!(did = %did, err = %e, "failed to deserialize resync state"); 142 continue; 143 } 144 } 145 } 146 147 if transitions.is_empty() { 148 continue; 149 } 150 151 if let Err(e) = batch.commit() { 152 error!(err = %e, "failed to commit batch"); 153 db::check_poisoned(&e); 154 continue; 155 } 156 157 for (old_gauge, new_gauge) in &transitions { 158 state.db.update_gauge_diff(old_gauge, new_gauge); 159 } 160 state.notify_backfill(); 161 info!(count = transitions.len(), "queued retries"); 162 } 163}