at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] introduce RetryState struct for crawler retries in the db, use chrono across the code

ptr.pet ffa704fd 676472f6

verified
+201 -179
+79 -164
src/crawler/mod.rs
··· 1 + use crate::crawler::throttle::ThrottleHandle; 1 2 use crate::db::{Db, keys, ser_repo_state}; 2 3 use crate::state::AppState; 3 4 use crate::types::RepoState; 5 + use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after}; 6 + use chrono::{DateTime, TimeDelta, Utc}; 4 7 use futures::FutureExt; 5 8 use jacquard_api::com_atproto::repo::list_records::ListRecordsOutput; 6 9 use jacquard_api::com_atproto::sync::list_repos::ListReposOutput; ··· 12 15 use reqwest::StatusCode; 13 16 use serde::{Deserialize, Serialize}; 14 17 use smol_str::SmolStr; 15 - use std::future::Future; 16 - use std::ops::Mul; 18 + use std::ops::{Add, Mul, Sub}; 17 19 use std::sync::Arc; 18 20 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 19 21 use std::time::Duration; 20 22 use tracing::{Instrument, debug, error, info, trace, warn}; 21 23 use url::Url; 22 24 23 - enum CrawlCheckResult { 24 - Signal, 25 - NoSignal, 26 - Retry { retry_after: i64, status: u16 }, 25 + #[derive(Debug, Serialize, Deserialize)] 26 + struct RetryState { 27 + after: DateTime<Utc>, // when the ban lifts 28 + duration: TimeDelta, // how long the ban lasts 29 + #[serde(serialize_with = "crate::util::ser_status_code")] 30 + #[serde(deserialize_with = "crate::util::deser_status_code")] 31 + status: Option<StatusCode>, 27 32 } 28 33 29 - /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. 30 - enum RetryOutcome<E> { 31 - /// ratelimited after exhausting all retries 32 - Ratelimited, 33 - /// non-ratelimit failure, carrying the last error 34 - Failed(E), 35 - } 36 - 37 - /// extension trait that adds `.retry()` to async `FnMut` closures. 38 - /// 39 - /// `on_ratelimit` receives the error and current attempt number. 40 - /// returning `Some(duration)` signals a transient failure and provides the backoff; 41 - /// returning `None` signals a terminal failure. 
42 - trait RetryWithBackoff<T, E, Fut>: FnMut() -> Fut 43 - where 44 - Fut: Future<Output = Result<T, E>>, 45 - { 46 - async fn retry( 47 - &mut self, 48 - max_retries: u32, 49 - on_ratelimit: impl Fn(&E, u32) -> Option<Duration>, 50 - ) -> Result<T, RetryOutcome<E>> { 51 - let mut attempt = 0u32; 52 - loop { 53 - match self().await { 54 - Ok(val) => return Ok(val), 55 - Err(e) => match on_ratelimit(&e, attempt) { 56 - Some(_) if attempt >= max_retries => return Err(RetryOutcome::Ratelimited), 57 - Some(backoff) => { 58 - let jitter = Duration::from_millis(rand::rng().random_range(0..2000)); 59 - tokio::time::sleep(backoff + jitter).await; 60 - attempt += 1; 61 - } 62 - None => return Err(RetryOutcome::Failed(e)), 63 - }, 64 - } 34 + impl RetryState { 35 + fn new(secs: i64) -> Self { 36 + let duration = TimeDelta::seconds(secs); 37 + Self { 38 + duration, 39 + after: Utc::now().add(duration), 40 + status: None, 65 41 } 66 42 } 43 + 44 + fn with_status(mut self, code: StatusCode) -> Self { 45 + self.status = Some(code); 46 + self 47 + } 67 48 } 68 49 69 - impl<T, E, F, Fut> RetryWithBackoff<T, E, Fut> for F 70 - where 71 - F: FnMut() -> Fut, 72 - Fut: Future<Output = Result<T, E>>, 73 - { 50 + trait ToRetryState { 51 + fn to_retry_state(&self) -> RetryState; 74 52 } 75 53 76 - /// extension trait that adds `.error_for_status()` to futures returning a reqwest `Response`. 
77 - trait ErrorForStatus: Future<Output = Result<reqwest::Response, reqwest::Error>> { 78 - fn error_for_status(self) -> impl Future<Output = Result<reqwest::Response, reqwest::Error>> 79 - where 80 - Self: Sized, 81 - { 82 - futures::FutureExt::map(self, |r| r.and_then(|r| r.error_for_status())) 54 + impl ToRetryState for ThrottleHandle { 55 + fn to_retry_state(&self) -> RetryState { 56 + let after = chrono::DateTime::from_timestamp_secs(self.throttled_until()).unwrap(); 57 + RetryState { 58 + duration: after.sub(Utc::now()), 59 + after, 60 + status: None, 61 + } 83 62 } 84 63 } 85 64 86 - impl<F: Future<Output = Result<reqwest::Response, reqwest::Error>>> ErrorForStatus for F {} 87 - 88 - /// extracts a retry delay in seconds from rate limit response headers. 89 - /// 90 - /// checks in priority order: 91 - /// - `retry-after: <seconds>` (relative) 92 - /// - `ratelimit-reset: <unix timestamp>` (absolute) (ref pds sends this) 93 - fn parse_retry_after(resp: &reqwest::Response) -> Option<u64> { 94 - let headers = resp.headers(); 95 - 96 - let retry_after = headers 97 - .get(reqwest::header::RETRY_AFTER) 98 - .and_then(|v| v.to_str().ok()) 99 - .and_then(|s| s.parse::<u64>().ok()); 100 - 101 - let rate_limit_reset = headers 102 - .get("ratelimit-reset") 103 - .and_then(|v| v.to_str().ok()) 104 - .and_then(|s| s.parse::<i64>().ok()) 105 - .map(|ts| { 106 - let now = chrono::Utc::now().timestamp(); 107 - (ts - now).max(1) as u64 108 - }); 109 - 110 - retry_after.or(rate_limit_reset) 65 + enum CrawlCheckResult { 66 + Signal, 67 + NoSignal, 68 + Retry(RetryState), 111 69 } 112 70 113 - // cloudflare-specific status codes 114 - const CONNECTION_TIMEOUT: StatusCode = unsafe { 115 - match StatusCode::from_u16(522) { 116 - Ok(s) => s, 117 - _ => std::hint::unreachable_unchecked(), 71 + impl From<RetryState> for CrawlCheckResult { 72 + fn from(value: RetryState) -> Self { 73 + Self::Retry(value) 118 74 } 119 - }; 120 - const SITE_FROZEN: StatusCode = unsafe { 121 - match 
StatusCode::from_u16(530) { 122 - Ok(s) => s, 123 - _ => std::hint::unreachable_unchecked(), 124 - } 125 - }; 75 + } 126 76 127 77 fn is_throttle_worthy(e: &reqwest::Error) -> bool { 128 78 use std::error::Error; ··· 147 97 StatusCode::BAD_GATEWAY 148 98 | StatusCode::SERVICE_UNAVAILABLE 149 99 | StatusCode::GATEWAY_TIMEOUT 150 - | CONNECTION_TIMEOUT 151 - | SITE_FROZEN 100 + | crate::util::CONNECTION_TIMEOUT 101 + | crate::util::SITE_FROZEN 152 102 ) 153 103 }) 154 104 } ··· 190 140 "rate limited resolving identity, giving up" 191 141 ); 192 142 // no pds handle to read retry_after from; use a short default 193 - let retry_after = chrono::Utc::now().timestamp() + 60; 194 - return ( 195 - did, 196 - CrawlCheckResult::Retry { 197 - retry_after, 198 - status: 429, 199 - }, 200 - ); 143 + return (did, RetryState::new(60).into()); 201 144 } 202 145 Err(RetryOutcome::Failed(e)) => { 203 146 error!(err = %e, "failed to resolve identity"); 204 - let retry_after = chrono::Utc::now().timestamp() + 60; 205 - return ( 206 - did, 207 - CrawlCheckResult::Retry { 208 - retry_after, 209 - status: 0, 210 - }, 211 - ); 147 + return (did, RetryState::new(60).into()); 212 148 } 213 149 }; 214 150 215 151 let throttle = throttler.get_handle(&pds_url).await; 216 152 if throttle.is_throttled() { 217 153 trace!(host = pds_url.host_str(), "skipping throttled pds"); 218 - return ( 219 - did, 220 - CrawlCheckResult::Retry { 221 - retry_after: throttle.throttled_until(), 222 - status: 0, 223 - }, 224 - ); 154 + return (did, throttle.to_retry_state().into()); 225 155 } 226 156 227 157 let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ()); ··· 230 160 host = pds_url.host_str(), 231 161 "pds failed while waiting for permit" 232 162 ); 233 - return ( 234 - did, 235 - CrawlCheckResult::Retry { 236 - retry_after: throttle.throttled_until(), 237 - status: 0, 238 - }, 239 - ); 163 + return (did, throttle.to_retry_state().into()); 240 164 }; 241 165 242 166 enum RequestError { 
··· 280 204 } 281 205 Err(RequestError::RateLimited(secs)) => { 282 206 throttle.record_ratelimit(secs); 283 - return CrawlCheckResult::Retry { 284 - retry_after: throttle.throttled_until(), 285 - status: 429, 286 - }; 207 + return throttle 208 + .to_retry_state() 209 + .with_status(StatusCode::TOO_MANY_REQUESTS) 210 + .into(); 287 211 } 288 212 Err(RequestError::Throttled) => { 289 - return CrawlCheckResult::Retry { 290 - retry_after: throttle.throttled_until(), 291 - status: 0, 292 - }; 213 + return throttle.to_retry_state().into(); 293 214 } 294 215 Err(RequestError::Reqwest(e)) => { 295 216 if is_throttle_worthy(&e) { 296 217 if let Some(mins) = throttle.record_failure() { 297 218 warn!(url = %pds_url, mins, "throttling pds due to hard failure"); 298 219 } 299 - return CrawlCheckResult::Retry { 300 - retry_after: throttle.throttled_until(), 301 - status: e.status().map_or(0, |s| s.as_u16()), 302 - }; 220 + let mut retry_state = throttle.to_retry_state(); 221 + retry_state.status = e.status(); 222 + return retry_state.into(); 303 223 } 304 224 305 225 match e.status() { ··· 313 233 } 314 234 _ => { 315 235 error!(err = %e, "repo errored"); 316 - return CrawlCheckResult::Retry { 317 - retry_after: chrono::Utc::now().timestamp() + 60, 318 - status: e.status().map_or(0, |s| s.as_u16()), 319 - }; 236 + let mut retry_state = RetryState::new(60 * 15); 237 + retry_state.status = e.status(); 238 + return retry_state.into(); 320 239 } 321 240 } 322 241 } ··· 326 245 Ok(b) => b, 327 246 Err(e) => { 328 247 error!(err = %e, "failed to read listRecords response"); 329 - return CrawlCheckResult::Retry { 330 - retry_after: chrono::Utc::now().timestamp() + 60, 331 - status: 0, 332 - }; 248 + return RetryState::new(60 * 5).into(); 333 249 } 334 250 }; 335 251 ··· 338 254 Ok(_) => {} 339 255 Err(e) => { 340 256 error!(err = %e, "failed to parse listRecords response"); 341 - return CrawlCheckResult::Retry { 342 - retry_after: chrono::Utc::now().timestamp() + 60, 343 - status: 0, 
344 - }; 257 + return RetryState::new(60 * 10).into(); 345 258 } 346 259 } 347 260 ··· 478 391 479 392 loop { 480 393 match crawler.process_retry_queue() { 481 - Ok(Some(next_ts)) => { 482 - let secs = (next_ts - chrono::Utc::now().timestamp()).max(1) as u64; 483 - sleep(Duration::from_secs(secs)); 394 + Ok(Some(next)) => { 395 + let secs = next.signed_duration_since(Utc::now()); 396 + sleep(secs.to_std().expect("that time delta was positive")); 484 397 } 485 398 Ok(None) => { 486 399 sleep(Duration::from_secs(60)); ··· 706 619 /// scan the retry queue for entries whose `retry_after` timestamp has passed, 707 620 /// retry them, and return the earliest still-pending timestamp (if any) so the 708 621 /// caller knows when to wake up next. 709 - fn process_retry_queue(&self) -> Result<Option<i64>> { 622 + fn process_retry_queue(&self) -> Result<Option<DateTime<Utc>>> { 710 623 let db = &self.state.db; 711 - let now = chrono::Utc::now().timestamp(); 624 + let now = Utc::now(); 712 625 713 626 let mut ready: Vec<Did> = Vec::new(); 714 - let mut next_retry: Option<i64> = None; 627 + let mut next_retry: Option<DateTime<Utc>> = None; 715 628 716 629 let mut rng: SmallRng = rand::make_rng(); 717 630 718 631 let mut batch = db.inner.batch(); 719 632 for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 720 633 let (key, val) = guard.into_inner().into_diagnostic()?; 721 - let (retry_after, _) = keys::crawler_retry_parse_value(&val)?; 634 + let RetryState { 635 + after, duration, .. 
636 + }: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?; 722 637 let did = keys::crawler_retry_parse_key(&key)?.to_did(); 723 638 724 639 // we check an extra backoff of 1 - 7% just to make it less likely for 725 640 // many requests to coincide with each other 726 - let backoff = 727 - ((retry_after - now).max(0) as f64).mul(rng.random_range(0.01..0.07)) as i64; 728 - if retry_after + backoff > now { 641 + let backoff = TimeDelta::seconds( 642 + duration.as_seconds_f64().mul(rng.random_range(0.01..0.07)) as i64, 643 + ); 644 + if after + backoff > now { 729 645 next_retry = Some( 730 646 next_retry 731 - .map(|earliest| earliest.min(retry_after)) 732 - .unwrap_or(retry_after), 647 + .map(|earliest| earliest.min(after)) 648 + .unwrap_or(after), 733 649 ); 734 650 continue; 735 651 } ··· 797 713 valid.push(did); 798 714 } 799 715 CrawlCheckResult::NoSignal => {} 800 - CrawlCheckResult::Retry { 801 - retry_after, 802 - status, 803 - } => { 716 + CrawlCheckResult::Retry(state) => { 804 717 batch.insert( 805 718 &db.crawler, 806 719 keys::crawler_retry_key(&did), 807 - keys::crawler_retry_value(retry_after, status), 720 + rmp_serde::to_vec(&state) 721 + .into_diagnostic() 722 + .wrap_err("cant ser retry state")?, 808 723 ); 809 724 } 810 725 }
-15
src/db/keys.rs
··· 143 143 key 144 144 } 145 145 146 - /// value format: `<retry_after: i64 BE><status: u16 BE>` 147 - pub fn crawler_retry_value(retry_after: i64, status: u16) -> [u8; 10] { 148 - let mut buf = [0u8; 10]; 149 - buf[..8].copy_from_slice(&retry_after.to_be_bytes()); 150 - buf[8..].copy_from_slice(&status.to_be_bytes()); 151 - buf 152 - } 153 - 154 - pub fn crawler_retry_parse_value(val: &[u8]) -> miette::Result<(i64, u16)> { 155 - miette::ensure!(val.len() >= 10, "crawler retry value too short"); 156 - let retry_after = i64::from_be_bytes(val[..8].try_into().unwrap()); 157 - let status = u16::from_be_bytes(val[8..10].try_into().unwrap()); 158 - Ok((retry_after, status)) 159 - } 160 - 161 146 pub fn crawler_retry_parse_key(key: &[u8]) -> miette::Result<TrimmedDid<'_>> { 162 147 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..]) 163 148 }
+1
src/lib.rs
··· 9 9 pub mod resolver; 10 10 pub mod state; 11 11 pub mod types; 12 + pub mod util;
+121
src/util.rs
··· 1 + use std::time::Duration; 2 + 3 + use rand::RngExt; 4 + use reqwest::StatusCode; 5 + use serde::{Deserialize, Deserializer, Serializer}; 6 + 7 + /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. 8 + pub enum RetryOutcome<E> { 9 + /// ratelimited after exhausting all retries 10 + Ratelimited, 11 + /// non-ratelimit failure, carrying the last error 12 + Failed(E), 13 + } 14 + 15 + /// extension trait that adds `.retry()` to async `FnMut` closures. 16 + /// 17 + /// `on_ratelimit` receives the error and current attempt number. 18 + /// returning `Some(duration)` signals a transient failure and provides the backoff; 19 + /// returning `None` signals a terminal failure. 20 + pub trait RetryWithBackoff<T, E, Fut>: FnMut() -> Fut 21 + where 22 + Fut: Future<Output = Result<T, E>>, 23 + { 24 + #[allow(async_fn_in_trait)] 25 + async fn retry( 26 + &mut self, 27 + max_retries: u32, 28 + on_ratelimit: impl Fn(&E, u32) -> Option<Duration>, 29 + ) -> Result<T, RetryOutcome<E>> { 30 + let mut attempt = 0u32; 31 + loop { 32 + match self().await { 33 + Ok(val) => return Ok(val), 34 + Err(e) => match on_ratelimit(&e, attempt) { 35 + Some(_) if attempt >= max_retries => return Err(RetryOutcome::Ratelimited), 36 + Some(backoff) => { 37 + // jitter the backoff 38 + let backoff = rand::rng().random_range((backoff / 2)..backoff); 39 + tokio::time::sleep(backoff).await; 40 + attempt += 1; 41 + } 42 + None => return Err(RetryOutcome::Failed(e)), 43 + }, 44 + } 45 + } 46 + } 47 + } 48 + 49 + impl<T, E, F, Fut> RetryWithBackoff<T, E, Fut> for F 50 + where 51 + F: FnMut() -> Fut, 52 + Fut: Future<Output = Result<T, E>>, 53 + { 54 + } 55 + 56 + /// extension trait that adds `.error_for_status()` to futures returning a reqwest `Response`. 
57 + pub trait ErrorForStatus: Future<Output = Result<reqwest::Response, reqwest::Error>> { 58 + fn error_for_status(self) -> impl Future<Output = Result<reqwest::Response, reqwest::Error>> 59 + where 60 + Self: Sized, 61 + { 62 + futures::FutureExt::map(self, |r| r.and_then(|r| r.error_for_status())) 63 + } 64 + } 65 + 66 + impl<F: Future<Output = Result<reqwest::Response, reqwest::Error>>> ErrorForStatus for F {} 67 + 68 + /// extracts a retry delay in seconds from rate limit response headers. 69 + /// 70 + /// checks in priority order: 71 + /// - `retry-after: <seconds>` (relative) 72 + /// - `ratelimit-reset: <unix timestamp>` (absolute) (ref pds sends this) 73 + pub fn parse_retry_after(resp: &reqwest::Response) -> Option<u64> { 74 + let headers = resp.headers(); 75 + 76 + let retry_after = headers 77 + .get(reqwest::header::RETRY_AFTER) 78 + .and_then(|v| v.to_str().ok()) 79 + .and_then(|s| s.parse::<u64>().ok()); 80 + 81 + let rate_limit_reset = headers 82 + .get("ratelimit-reset") 83 + .and_then(|v| v.to_str().ok()) 84 + .and_then(|s| s.parse::<i64>().ok()) 85 + .map(|ts| { 86 + let now = chrono::Utc::now().timestamp(); 87 + (ts - now).max(1) as u64 88 + }); 89 + 90 + retry_after.or(rate_limit_reset) 91 + } 92 + 93 + // cloudflare-specific status codes 94 + pub const CONNECTION_TIMEOUT: StatusCode = unsafe { 95 + match StatusCode::from_u16(522) { 96 + Ok(s) => s, 97 + _ => std::hint::unreachable_unchecked(), 98 + } 99 + }; 100 + pub const SITE_FROZEN: StatusCode = unsafe { 101 + match StatusCode::from_u16(530) { 102 + Ok(s) => s, 103 + _ => std::hint::unreachable_unchecked(), 104 + } 105 + }; 106 + 107 + pub fn ser_status_code<S: Serializer>(s: &Option<StatusCode>, ser: S) -> Result<S::Ok, S::Error> { 108 + match s { 109 + Some(code) => ser.serialize_some(&code.as_u16()), 110 + None => ser.serialize_none(), 111 + } 112 + } 113 + 114 + pub fn deser_status_code<'de, D: Deserializer<'de>>( 115 + deser: D, 116 + ) -> Result<Option<StatusCode>, D::Error> { 117 + 
Option::<u16>::deserialize(deser)? 118 + .map(StatusCode::from_u16) 119 + .transpose() 120 + .map_err(serde::de::Error::custom) 121 + }