at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] add NoTlsRetry strategy, log retries, reduce max retries to 5

ptr.pet beaf30a4 ae1c309a

verified
+69 -8
+69 -8
src/crawler/mod.rs
··· 11 11 use rand::rngs::SmallRng; 12 12 use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; 13 13 use reqwest_retry::Jitter; 14 - use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; 15 - use smol_str::SmolStr; 14 + use reqwest_retry::{ 15 + RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_failure, 16 + default_on_request_success, policies::ExponentialBackoff, 17 + }; 18 + use smol_str::{SmolStr, ToSmolStr}; 19 + use std::error::Error; 16 20 use std::sync::Arc; 17 21 use std::sync::atomic::{AtomicUsize, Ordering}; 18 22 use std::time::Duration; 19 - use tracing::{debug, error, info, trace}; 23 + use tracing::{debug, error, info, trace, warn}; 20 24 use url::Url; 21 25 22 26 enum CrawlCheckResult { ··· 26 30 Failed, 27 31 } 28 32 33 + struct NoTlsRetry; 34 + 35 + impl RetryableStrategy for NoTlsRetry { 36 + fn handle( 37 + &self, 38 + res: &Result<reqwest::Response, reqwest_middleware::Error>, 39 + ) -> Option<Retryable> { 40 + match res { 41 + Ok(success) => default_on_request_success(success), 42 + Err(error) => { 43 + if let reqwest_middleware::Error::Reqwest(e) = error { 44 + if e.is_timeout() { 45 + return Some(Retryable::Fatal); 46 + } 47 + let mut src = e.source(); 48 + while let Some(s) = src { 49 + if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 50 + if is_tls_cert_error(io_err) { 51 + return Some(Retryable::Fatal); 52 + } 53 + } 54 + src = s.source(); 55 + } 56 + } 57 + let retryable = default_on_request_failure(error); 58 + if retryable == Some(Retryable::Transient) { 59 + if let reqwest_middleware::Error::Reqwest(e) = error { 60 + let url = e.url().map(|u| u.as_str()).unwrap_or("unknown url"); 61 + let status = e 62 + .status() 63 + .map(|s| s.to_smolstr()) 64 + .unwrap_or_else(|| "unknown status".into()); 65 + warn!("retrying request {url}: {status}"); 66 + } 67 + } 68 + retryable 69 + } 70 + } 71 + } 72 + } 73 + 74 + fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 75 + let Some(inner) = io_err.get_ref() else { 76 + return false; 77 + }; 78 + if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 79 + return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 80 + } 81 + if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 82 + return is_tls_cert_error(nested_io); 83 + } 84 + false 85 + } 86 + 29 87 pub struct Crawler { 30 88 state: Arc<AppState>, 31 89 relay_host: Url, ··· 44 102 ) -> Self { 45 103 let retry_policy = ExponentialBackoff::builder() 46 104 .jitter(Jitter::Bounded) 47 - .build_with_max_retries(8); 105 + .build_with_max_retries(5); 48 106 let reqwest_client = reqwest::Client::builder() 49 107 .user_agent(concat!( 50 108 env!("CARGO_PKG_NAME"), ··· 56 114 .expect("that reqwest will build"); 57 115 58 116 let http = ClientBuilder::new(reqwest_client) 59 - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) 117 + .with(RetryTransientMiddleware::new_with_policy_and_strategy( 118 + retry_policy, 119 + NoTlsRetry, 120 + )) 60 121 .build(); 61 122 let http = Arc::new(http); 62 123 ··· 242 303 let did_key = keys::repo_key(did); 243 304 trace!("crawler found new repo: {did}"); 244 305 245 - let state = RepoState::backfilling(rng.next_u64()); 306 + let state = RepoState::backfilling_untracked(rng.next_u64()); 246 307 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 247 308 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 248 309 to_queue.push(did.clone()); ··· 294 355 let resolver = self.state.resolver.clone(); 295 356 let filter = filter.clone(); 296 357 set.spawn(async move { 297 - const MAX_RETRIES: u32 = 8; 358 + const MAX_RETRIES: u32 = 5; 298 359 let mut rng: SmallRng = rand::make_rng(); 299 360 300 361 let pds_url = { ··· 448 509 continue; 449 510 } 450 511 451 - let state = RepoState::backfilling(rng.next_u64()); 512 + let state = RepoState::backfilling_untracked(rng.next_u64()); 452 513 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 453 514 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 454 515 }