AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[backfill] handle backfill resync transition from gone to errored properly; don't treat HTTP 429 (rate-limited) responses as hard errors

ptr.pet 64a0e115 5053bab7

verified
+52 -23
+52 -23
src/backfill/mod.rs
··· 13 13 use jacquard_repo::mst::Mst; 14 14 use jacquard_repo::{BlockStore, MemoryBlockStore}; 15 15 use miette::{IntoDiagnostic, Result}; 16 + use reqwest::StatusCode; 16 17 use smol_str::{SmolStr, ToSmolStr}; 17 18 use std::collections::HashMap; 18 19 use std::sync::Arc; ··· 133 134 .into_diagnostic()??; 134 135 } 135 136 Err(e) => { 136 - { 137 - if e.downcast_ref::<ClientError>() 138 - .map_or(false, |e| matches!(e.kind(), ClientErrorKind::Transport)) 139 - { 140 - let reason = e.root_cause(); 141 - error!("backfill failed for {did}: {e}: {reason}"); 142 - } else { 143 - warn!("backfill failed for {did}: {e}"); 137 + let mut was_ratelimited = false; 138 + 'err: { 139 + let Some(e) = e.downcast_ref::<ClientError>() else { 140 + error!("failed for {did}: {e}"); 141 + break 'err; 142 + }; 143 + 144 + match e.kind() { 145 + ClientErrorKind::Http { 146 + status: StatusCode::TOO_MANY_REQUESTS, 147 + } => { 148 + debug!("failed for {did}: too many requests"); 149 + was_ratelimited = true; 150 + break 'err; 151 + } 152 + ClientErrorKind::Transport => { 153 + let reason = e.source_err().unwrap(); 154 + error!("failed for {did}: {e}: {reason}"); 155 + break 'err; 156 + } 157 + _ => { 158 + error!("failed for {did}: {e}"); 159 + break 'err; 160 + } 144 161 } 145 162 } 163 + 146 164 let did_key = keys::repo_key(&did); 147 165 148 166 // 1. get current retry count 149 - let mut retry_count = 0; 150 - if let Ok(Some(bytes)) = Db::get(db.resync.clone(), &did_key).await { 151 - if let Ok(ResyncState::Error { 152 - retry_count: old_count, 153 - .. 154 - }) = rmp_serde::from_slice::<ResyncState>(&bytes) 155 - { 156 - retry_count = old_count + 1; 157 - } 158 - } 167 + let mut resync_state = Db::get(db.resync.clone(), &did_key) 168 + .await 169 + .and_then(|b| { 170 + b.map(|b| rmp_serde::from_slice::<ResyncState>(&b).into_diagnostic()) 171 + .transpose() 172 + })? 173 + .and_then(|s| { 174 + matches!(s, ResyncState::Gone { .. 
}) 175 + .then_some(None) 176 + .unwrap_or(Some(s)) 177 + }) 178 + .unwrap_or_else(|| ResyncState::Error { 179 + message: SmolStr::new_static(""), 180 + retry_count: 0, 181 + next_retry: 0, 182 + }); 159 183 160 - // 2. calculate backoff 161 - let next_retry = ResyncState::next_backoff(retry_count); 162 - 163 - let resync_state = ResyncState::Error { 164 - message: e.to_string().into(), 184 + let ResyncState::Error { 185 + message, 165 186 retry_count, 166 187 next_retry, 188 + } = &mut resync_state 189 + else { 190 + unreachable!("we handled the gone case above"); 167 191 }; 192 + 193 + // 2. calculate backoff and update the other fields 194 + *retry_count += was_ratelimited.then_some(3).unwrap_or(1); 195 + *next_retry = ResyncState::next_backoff(*retry_count); 196 + *message = e.to_smolstr(); 168 197 169 198 tokio::task::spawn_blocking({ 170 199 let state = state.clone();