at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] turn retry and ban into traits to improve readability; fix busy loop

ptr.pet 571eb26d 106ab406

verified
+121 -73
+19
src/crawler/ban.rs
··· 1 use scc::HashMap; 2 use std::sync::Arc; 3 use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; 4 use tokio::sync::Notify; ··· 96 } 97 } 98 }
··· 1 use scc::HashMap; 2 + use std::future::Future; 3 use std::sync::Arc; 4 use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; 5 use tokio::sync::Notify; ··· 97 } 98 } 99 } 100 + 101 + /// extension trait that adds `.or_ban()` to any future returning `Result<T, E>`. 102 + #[allow(async_fn_in_trait)] 103 + pub trait OrBan<T, E>: Future<Output = Result<T, E>> { 104 + /// races the future against a ban notification. 105 + /// if the pds is banned before the future completes, returns `on_ban()` immediately. 106 + async fn or_ban(self, handle: &BanHandle, on_ban: impl FnOnce() -> E) -> Result<T, E> 107 + where 108 + Self: Sized, 109 + { 110 + tokio::select! { 111 + res = self => res, 112 + _ = handle.wait_for_ban() => Err(on_ban()), 113 + } 114 + } 115 + } 116 + 117 + impl<T, E, F: Future<Output = Result<T, E>>> OrBan<T, E> for F {}
+102 -73
src/crawler/mod.rs
··· 2 use crate::db::{Db, keys, ser_repo_state}; 3 use crate::state::AppState; 4 use crate::types::RepoState; 5 use jacquard_api::com_atproto::repo::list_records::ListRecordsOutput; 6 use jacquard_api::com_atproto::sync::list_repos::ListReposOutput; 7 use jacquard_common::{IntoStatic, types::string::Did}; ··· 13 use smol_str::SmolStr; 14 use std::future::Future; 15 use std::sync::Arc; 16 - use std::sync::atomic::{AtomicUsize, Ordering}; 17 use std::time::Duration; 18 use tracing::{Instrument, debug, error, info, trace, warn}; 19 use url::Url; ··· 25 Failed(Option<u16>), 26 } 27 28 - /// outcome of [`retry_with_backoff`] when the operation does not succeed. 29 enum RetryOutcome<E> { 30 /// ratelimited after exhausting all retries 31 Ratelimited, ··· 33 Failed(E), 34 } 35 36 - /// retries an async operation with exponential backoff when ratelimited. 37 - /// 38 - /// `op` is called on each attempt and returns `Result<T, E>`. 39 - /// `is_ratelimited` classifies an error as a ratelimit (triggering a retry) 40 - /// versus a fatal failure (returning immediately). 
41 - async fn retry_with_backoff<T, E, F, Fut>( 42 - rng: &mut SmallRng, 43 - max_retries: u32, 44 - mut op: F, 45 - is_ratelimited: impl Fn(&E) -> bool, 46 - ) -> Result<T, RetryOutcome<E>> 47 where 48 - F: FnMut() -> Fut, 49 Fut: Future<Output = Result<T, E>>, 50 { 51 - let mut attempt = 0u32; 52 - loop { 53 - match op().await { 54 - Ok(val) => return Ok(val), 55 - Err(e) if is_ratelimited(&e) => { 56 - if attempt < max_retries { 57 let base = Duration::from_secs(1 << attempt); 58 - let jitter = Duration::from_millis(rng.random_range(0..2000)); 59 tokio::time::sleep(base + jitter).await; 60 attempt += 1; 61 - } else { 62 - return Err(RetryOutcome::Ratelimited); 63 } 64 } 65 - Err(e) => return Err(RetryOutcome::Failed(e)), 66 } 67 } 68 } 69 70 // these two are cloudflare specific 71 const CONNECTION_TIMEOUT: StatusCode = unsafe { 72 match StatusCode::from_u16(522) { ··· 135 tracker: Arc<BanTracker>, 136 ) -> (Did<'static>, CrawlCheckResult) { 137 const MAX_RETRIES: u32 = 5; 138 - let mut rng: SmallRng = rand::make_rng(); 139 140 - let pds_url = retry_with_backoff( 141 - &mut rng, 142 - MAX_RETRIES, 143 - || resolver.resolve_identity_info(&did), 144 - |e| matches!(e, crate::resolver::ResolverError::Ratelimited), 145 - ); 146 - let pds_url = match pds_url.await { 147 Ok((url, _)) => url, 148 Err(RetryOutcome::Ratelimited) => { 149 error!( ··· 179 .append_pair("collection", signal) 180 .append_pair("limit", "1"); 181 182 - let res = retry_with_backoff( 183 - &mut rng, 184 - MAX_RETRIES, 185 - || async { 186 - let req = http.get(list_records_url.clone()).send(); 187 - tokio::select! 
{ 188 - res = req => res.and_then(|r| r.error_for_status()).map_err(RequestError::Reqwest), 189 - _ = pds_handle.wait_for_ban() => Err(RequestError::Banned), 190 - } 191 - }, 192 - |e: &RequestError| { 193 matches!(e, RequestError::Reqwest(e) if matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS))) 194 - }, 195 - ); 196 - let res = match res.await { 197 Ok(r) => { 198 pds_handle.record_success(); 199 r ··· 284 } 285 286 pub mod ban; 287 - use ban::BanTracker; 288 289 pub struct Crawler { 290 state: Arc<AppState>, ··· 294 resume_pending: usize, 295 count: Arc<AtomicUsize>, 296 crawled_count: Arc<AtomicUsize>, 297 tracker: Arc<BanTracker>, 298 } 299 ··· 324 resume_pending, 325 count: Arc::new(AtomicUsize::new(0)), 326 crawled_count: Arc::new(AtomicUsize::new(0)), 327 tracker: Arc::new(BanTracker::new()), 328 } 329 } ··· 333 use std::time::Instant; 334 let count = self.count.clone(); 335 let crawled_count = self.crawled_count.clone(); 336 let mut last_time = Instant::now(); 337 let mut interval = tokio::time::interval(Duration::from_secs(60)); 338 async move { ··· 340 interval.tick().await; 341 let delta_processed = count.swap(0, Ordering::Relaxed); 342 let delta_crawled = crawled_count.swap(0, Ordering::Relaxed); 343 344 if delta_processed == 0 && delta_crawled == 0 { 345 - debug!("no repos crawled or processed in 60s"); 346 continue; 347 } 348 ··· 396 "throttling: above max pending" 397 ); 398 was_throttled = true; 399 } 400 } else if pending > self.resume_pending as u64 { 401 if !was_throttled { 402 debug!( ··· 405 "throttling: entering cooldown" 406 ); 407 was_throttled = true; 408 } 409 410 loop { ··· 417 resume = self.resume_pending, 418 "cooldown, waiting" 419 ); 420 } 421 break; 422 } else { 423 if was_throttled { 424 info!("throttling released"); 425 was_throttled = false; 426 } 427 break; 428 } ··· 441 .append_pair("cursor", c.as_str()); 442 } 443 444 - let res_result = self.http.get(list_repos_url.clone()).send().await; 445 - let bytes = match 
res_result { 446 - Ok(res) => { 447 - match res.status() { 448 - StatusCode::TOO_MANY_REQUESTS => { 449 - warn!("rate limited by relay"); 450 - continue; 451 - } 452 - s if !s.is_success() => { 453 - error!(status = %s, "cant crawl"); 454 - continue; 455 - } 456 - _ => {} 457 - } 458 - match res.bytes().await { 459 - Ok(b) => b, 460 - Err(e) => { 461 - error!(err = %e, "cant read listRepos"); 462 - continue; 463 - } 464 - } 465 } 466 Err(e) => { 467 - error!(err = %e, "crawler failed to list repos"); 468 continue; 469 } 470 }; 471 - let output = serde_json::from_slice::<ListReposOutput>(&bytes) 472 - .into_diagnostic()? 473 - .into_static(); 474 475 if output.repos.is_empty() { 476 info!("finished enumeration (or empty page)");
··· 2 use crate::db::{Db, keys, ser_repo_state}; 3 use crate::state::AppState; 4 use crate::types::RepoState; 5 + use futures::TryFutureExt; 6 use jacquard_api::com_atproto::repo::list_records::ListRecordsOutput; 7 use jacquard_api::com_atproto::sync::list_repos::ListReposOutput; 8 use jacquard_common::{IntoStatic, types::string::Did}; ··· 14 use smol_str::SmolStr; 15 use std::future::Future; 16 use std::sync::Arc; 17 + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 18 use std::time::Duration; 19 use tracing::{Instrument, debug, error, info, trace, warn}; 20 use url::Url; ··· 26 Failed(Option<u16>), 27 } 28 29 + /// outcome of [`RetryWithBackoff::retry_with_backoff`] when the operation does not succeed. 30 enum RetryOutcome<E> { 31 /// ratelimited after exhausting all retries 32 Ratelimited, ··· 34 Failed(E), 35 } 36 37 + /// extension trait that adds `retry_with_backoff` to async `FnMut` closures. 38 + trait RetryWithBackoff<T, E, Fut>: FnMut() -> Fut 39 where 40 Fut: Future<Output = Result<T, E>>, 41 { 42 + async fn retry( 43 + &mut self, 44 + max_retries: u32, 45 + is_ratelimited: impl Fn(&E) -> bool, 46 + ) -> Result<T, RetryOutcome<E>> { 47 + let mut attempt = 0u32; 48 + loop { 49 + match self().await { 50 + Ok(val) => return Ok(val), 51 + Err(e) if is_ratelimited(&e) => { 52 + if attempt >= max_retries { 53 + return Err(RetryOutcome::Ratelimited); 54 + } 55 let base = Duration::from_secs(1 << attempt); 56 + let jitter = Duration::from_millis(rand::rng().random_range(0..2000)); 57 tokio::time::sleep(base + jitter).await; 58 attempt += 1; 59 } 60 + Err(e) => return Err(RetryOutcome::Failed(e)), 61 } 62 } 63 } 64 } 65 66 + impl<T, E, F, Fut> RetryWithBackoff<T, E, Fut> for F 67 + where 68 + F: FnMut() -> Fut, 69 + Fut: Future<Output = Result<T, E>>, 70 + { 71 + } 72 + 73 + /// extension trait that adds `.error_for_status()` to futures returning a reqwest `Response`. 
74 + trait ErrorForStatus: Future<Output = Result<reqwest::Response, reqwest::Error>> { 75 + fn error_for_status(self) -> impl Future<Output = Result<reqwest::Response, reqwest::Error>> 76 + where 77 + Self: Sized, 78 + { 79 + futures::FutureExt::map(self, |r| r.and_then(|r| r.error_for_status())) 80 + } 81 + } 82 + 83 + impl<F: Future<Output = Result<reqwest::Response, reqwest::Error>>> ErrorForStatus for F {} 84 + 85 // these two are cloudflare specific 86 const CONNECTION_TIMEOUT: StatusCode = unsafe { 87 match StatusCode::from_u16(522) { ··· 150 tracker: Arc<BanTracker>, 151 ) -> (Did<'static>, CrawlCheckResult) { 152 const MAX_RETRIES: u32 = 5; 153 154 + let pds_url = (|| resolver.resolve_identity_info(&did)) 155 + .retry(MAX_RETRIES, |e| { 156 + matches!(e, crate::resolver::ResolverError::Ratelimited) 157 + }) 158 + .await; 159 + let pds_url = match pds_url { 160 Ok((url, _)) => url, 161 Err(RetryOutcome::Ratelimited) => { 162 error!( ··· 192 .append_pair("collection", signal) 193 .append_pair("limit", "1"); 194 195 + let res = (|| http.get(list_records_url.clone()) 196 + .send() 197 + .error_for_status() 198 + .map_err(RequestError::Reqwest) 199 + .or_ban(&pds_handle, || RequestError::Banned)) 200 + .retry(MAX_RETRIES, |e: &RequestError| { 201 matches!(e, RequestError::Reqwest(e) if matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS))) 202 + }) 203 + .await; 204 + let res = match res { 205 Ok(r) => { 206 pds_handle.record_success(); 207 r ··· 292 } 293 294 pub mod ban; 295 + use ban::{BanTracker, OrBan}; 296 297 pub struct Crawler { 298 state: Arc<AppState>, ··· 302 resume_pending: usize, 303 count: Arc<AtomicUsize>, 304 crawled_count: Arc<AtomicUsize>, 305 + throttled: Arc<AtomicBool>, 306 tracker: Arc<BanTracker>, 307 } 308 ··· 333 resume_pending, 334 count: Arc::new(AtomicUsize::new(0)), 335 crawled_count: Arc::new(AtomicUsize::new(0)), 336 + throttled: Arc::new(AtomicBool::new(false)), 337 tracker: Arc::new(BanTracker::new()), 338 } 339 } ··· 343 
use std::time::Instant; 344 let count = self.count.clone(); 345 let crawled_count = self.crawled_count.clone(); 346 + let throttled = self.throttled.clone(); 347 let mut last_time = Instant::now(); 348 let mut interval = tokio::time::interval(Duration::from_secs(60)); 349 async move { ··· 351 interval.tick().await; 352 let delta_processed = count.swap(0, Ordering::Relaxed); 353 let delta_crawled = crawled_count.swap(0, Ordering::Relaxed); 354 + let is_throttled = throttled.load(Ordering::Relaxed); 355 356 if delta_processed == 0 && delta_crawled == 0 { 357 + if is_throttled { 358 + info!("crawler throttled: pending queue full"); 359 + } else { 360 + debug!("no repos crawled or processed in 60s"); 361 + } 362 continue; 363 } 364 ··· 412 "throttling: above max pending" 413 ); 414 was_throttled = true; 415 + self.throttled.store(true, Ordering::Relaxed); 416 } 417 + tokio::time::sleep(Duration::from_secs(5)).await; 418 } else if pending > self.resume_pending as u64 { 419 if !was_throttled { 420 debug!( ··· 423 "throttling: entering cooldown" 424 ); 425 was_throttled = true; 426 + self.throttled.store(true, Ordering::Relaxed); 427 } 428 429 loop { ··· 436 resume = self.resume_pending, 437 "cooldown, waiting" 438 ); 439 + tokio::time::sleep(Duration::from_secs(5)).await; 440 } 441 break; 442 } else { 443 if was_throttled { 444 info!("throttling released"); 445 was_throttled = false; 446 + self.throttled.store(false, Ordering::Relaxed); 447 } 448 break; 449 } ··· 462 .append_pair("cursor", c.as_str()); 463 } 464 465 + let fetch_result = (|| { 466 + self.http 467 + .get(list_repos_url.clone()) 468 + .send() 469 + .error_for_status() 470 + }) 471 + .retry(5, |e: &reqwest::Error| { 472 + matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS)) 473 + }) 474 + .await; 475 + 476 + let res = match fetch_result { 477 + Ok(r) => r, 478 + Err(RetryOutcome::Ratelimited) => { 479 + warn!("rate limited by relay after retries"); 480 + continue; 481 + } 482 + 
Err(RetryOutcome::Failed(e)) => { 483 + error!(err = %e, "crawler failed to fetch listRepos"); 484 + continue; 485 + } 486 + }; 487 + 488 + let bytes = match res.bytes().await { 489 + Ok(b) => b, 490 + Err(e) => { 491 + error!(err = %e, "cant read listRepos response"); 492 + continue; 493 } 494 + }; 495 + 496 + let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 497 + Ok(out) => out.into_static(), 498 Err(e) => { 499 + error!(err = %e, "failed to parse listRepos response"); 500 continue; 501 } 502 }; 503 504 if output.repos.is_empty() { 505 info!("finished enumeration (or empty page)");