at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::time::Duration;
2
3use rand::RngExt;
4use reqwest::StatusCode;
5use serde::{Deserialize, Deserializer, Serializer};
6
/// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed.
///
/// derives `Debug` (whenever `E: Debug`) so a `Result<T, RetryOutcome<E>>` can be
/// logged or used with `unwrap` / `expect`.
#[derive(Debug)]
pub enum RetryOutcome<E> {
    /// ratelimited after exhausting all retries
    Ratelimited,
    /// non-ratelimit failure, carrying the last error
    Failed(E),
}
14
15/// extension trait that adds `.retry()` to async `FnMut` closures.
16///
17/// `on_ratelimit` receives the error and current attempt number.
18/// returning `Some(duration)` signals a transient failure and provides the backoff;
19/// returning `None` signals a terminal failure.
20pub trait RetryWithBackoff<T, E, Fut>: FnMut() -> Fut
21where
22 Fut: Future<Output = Result<T, E>>,
23{
24 #[allow(async_fn_in_trait)]
25 async fn retry(
26 &mut self,
27 max_retries: u32,
28 on_ratelimit: impl Fn(&E, u32) -> Option<Duration>,
29 ) -> Result<T, RetryOutcome<E>> {
30 let mut attempt = 0u32;
31 loop {
32 match self().await {
33 Ok(val) => return Ok(val),
34 Err(e) => match on_ratelimit(&e, attempt) {
35 Some(_) if attempt >= max_retries => return Err(RetryOutcome::Ratelimited),
36 Some(backoff) => {
37 // jitter the backoff
38 let backoff = rand::rng().random_range((backoff / 2)..backoff);
39 tokio::time::sleep(backoff).await;
40 attempt += 1;
41 }
42 None => return Err(RetryOutcome::Failed(e)),
43 },
44 }
45 }
46 }
47}
48
49impl<T, E, F, Fut> RetryWithBackoff<T, E, Fut> for F
50where
51 F: FnMut() -> Fut,
52 Fut: Future<Output = Result<T, E>>,
53{
54}
55
56/// extension trait that adds `.error_for_status()` to futures returning a reqwest `Response`.
57pub trait ErrorForStatus: Future<Output = Result<reqwest::Response, reqwest::Error>> {
58 fn error_for_status(self) -> impl Future<Output = Result<reqwest::Response, reqwest::Error>>
59 where
60 Self: Sized,
61 {
62 futures::FutureExt::map(self, |r| r.and_then(|r| r.error_for_status()))
63 }
64}
65
66impl<F: Future<Output = Result<reqwest::Response, reqwest::Error>>> ErrorForStatus for F {}
67
68/// extracts a retry delay in seconds from rate limit response headers.
69///
70/// checks in priority order:
71/// - `retry-after: <seconds>` (relative)
72/// - `ratelimit-reset: <unix timestamp>` (absolute) (ref pds sends this)
73pub fn parse_retry_after(resp: &reqwest::Response) -> Option<u64> {
74 let headers = resp.headers();
75
76 let retry_after = headers
77 .get(reqwest::header::RETRY_AFTER)
78 .and_then(|v| v.to_str().ok())
79 .and_then(|s| s.parse::<u64>().ok());
80
81 let rate_limit_reset = headers
82 .get("ratelimit-reset")
83 .and_then(|v| v.to_str().ok())
84 .and_then(|s| s.parse::<i64>().ok())
85 .map(|ts| {
86 let now = chrono::Utc::now().timestamp();
87 (ts - now).max(1) as u64
88 });
89
90 retry_after.or(rate_limit_reset)
91}
92
93// cloudflare-specific status codes
94pub const CONNECTION_TIMEOUT: StatusCode = unsafe {
95 match StatusCode::from_u16(522) {
96 Ok(s) => s,
97 _ => std::hint::unreachable_unchecked(),
98 }
99};
100pub const SITE_FROZEN: StatusCode = unsafe {
101 match StatusCode::from_u16(530) {
102 Ok(s) => s,
103 _ => std::hint::unreachable_unchecked(),
104 }
105};
106
107pub fn ser_status_code<S: Serializer>(s: &Option<StatusCode>, ser: S) -> Result<S::Ok, S::Error> {
108 match s {
109 Some(code) => ser.serialize_some(&code.as_u16()),
110 None => ser.serialize_none(),
111 }
112}
113
114pub fn deser_status_code<'de, D: Deserializer<'de>>(
115 deser: D,
116) -> Result<Option<StatusCode>, D::Error> {
117 Option::<u16>::deserialize(deser)?
118 .map(StatusCode::from_u16)
119 .transpose()
120 .map_err(serde::de::Error::custom)
121}