at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 121 lines 3.9 kB view raw
1use std::time::Duration; 2 3use rand::RngExt; 4use reqwest::StatusCode; 5use serde::{Deserialize, Deserializer, Serializer}; 6 7/// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. 8pub enum RetryOutcome<E> { 9 /// ratelimited after exhausting all retries 10 Ratelimited, 11 /// non-ratelimit failure, carrying the last error 12 Failed(E), 13} 14 15/// extension trait that adds `.retry()` to async `FnMut` closures. 16/// 17/// `on_ratelimit` receives the error and current attempt number. 18/// returning `Some(duration)` signals a transient failure and provides the backoff; 19/// returning `None` signals a terminal failure. 20pub trait RetryWithBackoff<T, E, Fut>: FnMut() -> Fut 21where 22 Fut: Future<Output = Result<T, E>>, 23{ 24 #[allow(async_fn_in_trait)] 25 async fn retry( 26 &mut self, 27 max_retries: u32, 28 on_ratelimit: impl Fn(&E, u32) -> Option<Duration>, 29 ) -> Result<T, RetryOutcome<E>> { 30 let mut attempt = 0u32; 31 loop { 32 match self().await { 33 Ok(val) => return Ok(val), 34 Err(e) => match on_ratelimit(&e, attempt) { 35 Some(_) if attempt >= max_retries => return Err(RetryOutcome::Ratelimited), 36 Some(backoff) => { 37 // jitter the backoff 38 let backoff = rand::rng().random_range((backoff / 2)..backoff); 39 tokio::time::sleep(backoff).await; 40 attempt += 1; 41 } 42 None => return Err(RetryOutcome::Failed(e)), 43 }, 44 } 45 } 46 } 47} 48 49impl<T, E, F, Fut> RetryWithBackoff<T, E, Fut> for F 50where 51 F: FnMut() -> Fut, 52 Fut: Future<Output = Result<T, E>>, 53{ 54} 55 56/// extension trait that adds `.error_for_status()` to futures returning a reqwest `Response`. 57pub trait ErrorForStatus: Future<Output = Result<reqwest::Response, reqwest::Error>> { 58 fn error_for_status(self) -> impl Future<Output = Result<reqwest::Response, reqwest::Error>> 59 where 60 Self: Sized, 61 { 62 futures::FutureExt::map(self, |r| r.and_then(|r| r.error_for_status())) 63 } 64} 65 66impl<F: Future<Output = Result<reqwest::Response, reqwest::Error>>> ErrorForStatus for F {} 67 68/// extracts a retry delay in seconds from rate limit response headers. 69/// 70/// checks in priority order: 71/// - `retry-after: <seconds>` (relative) 72/// - `ratelimit-reset: <unix timestamp>` (absolute) (ref pds sends this) 73pub fn parse_retry_after(resp: &reqwest::Response) -> Option<u64> { 74 let headers = resp.headers(); 75 76 let retry_after = headers 77 .get(reqwest::header::RETRY_AFTER) 78 .and_then(|v| v.to_str().ok()) 79 .and_then(|s| s.parse::<u64>().ok()); 80 81 let rate_limit_reset = headers 82 .get("ratelimit-reset") 83 .and_then(|v| v.to_str().ok()) 84 .and_then(|s| s.parse::<i64>().ok()) 85 .map(|ts| { 86 let now = chrono::Utc::now().timestamp(); 87 (ts - now).max(1) as u64 88 }); 89 90 retry_after.or(rate_limit_reset) 91} 92 93// cloudflare-specific status codes 94pub const CONNECTION_TIMEOUT: StatusCode = unsafe { 95 match StatusCode::from_u16(522) { 96 Ok(s) => s, 97 _ => std::hint::unreachable_unchecked(), 98 } 99}; 100pub const SITE_FROZEN: StatusCode = unsafe { 101 match StatusCode::from_u16(530) { 102 Ok(s) => s, 103 _ => std::hint::unreachable_unchecked(), 104 } 105}; 106 107pub fn ser_status_code<S: Serializer>(s: &Option<StatusCode>, ser: S) -> Result<S::Ok, S::Error> { 108 match s { 109 Some(code) => ser.serialize_some(&code.as_u16()), 110 None => ser.serialize_none(), 111 } 112} 113 114pub fn deser_status_code<'de, D: Deserializer<'de>>( 115 deser: D, 116) -> Result<Option<StatusCode>, D::Error> { 117 Option::<u16>::deserialize(deser)? 118 .map(StatusCode::from_u16) 119 .transpose() 120 .map_err(serde::de::Error::custom) 121}