at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] refactor HTTP backoff and structured logging

ptr.pet 5ba9461d 99d1ffac

verified
+251 -284
Cargo.lock  -60
···
  "mimalloc",
  "rand 0.10.0",
  "reqwest",
- "reqwest-middleware",
- "reqwest-retry",
  "rmp-serde",
  "rustls",
  "scc",
···
  ]

  [[package]]
- name = "reqwest-middleware"
- version = "0.5.1"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "199dda04a536b532d0cc04d7979e39b1c763ea749bf91507017069c00b96056f"
- dependencies = [
-  "anyhow",
-  "async-trait",
-  "http",
-  "reqwest",
-  "thiserror 2.0.18",
-  "tower-service",
- ]
-
- [[package]]
- name = "reqwest-retry"
- version = "0.9.1"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "fe2412db2af7d2268e7a5406be0431f37d9eb67ff390f35b395716f5f06c2eaa"
- dependencies = [
-  "anyhow",
-  "async-trait",
-  "futures",
-  "getrandom 0.2.17",
-  "http",
-  "hyper",
-  "reqwest",
-  "reqwest-middleware",
-  "retry-policies",
-  "thiserror 2.0.18",
-  "tokio",
-  "tracing",
-  "wasmtimer",
- ]
-
- [[package]]
  name = "resolv-conf"
  version = "0.7.6"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
-
- [[package]]
- name = "retry-policies"
- version = "0.5.1"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "46a4bd6027df676bcb752d3724db0ea3c0c5fc1dd0376fec51ac7dcaf9cc69be"
- dependencies = [
-  "rand 0.9.2",
- ]

  [[package]]
  name = "rfc6979"
···
  "hashbrown 0.15.5",
  "indexmap",
  "semver",
- ]
-
- [[package]]
- name = "wasmtimer"
- version = "0.4.3"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "1c598d6b99ea013e35844697fc4670d08339d5cda15588f193c6beedd12f644b"
- dependencies = [
-  "futures",
-  "js-sys",
-  "parking_lot",
-  "pin-utils",
-  "slab",
-  "wasm-bindgen",
  ]

  [[package]]
Cargo.toml  -2
···
  smol_str = "0.3"
  futures = "0.3"
  reqwest = { version = "0.13.2", features = ["json", "rustls", "stream", "gzip", "brotli", "zstd", "http2"], default-features = false }
- reqwest-middleware = { version = "0.5.1", default-features = false, features = ["http2", "rustls"] }
- reqwest-retry = { version = "0.9.1" }
  axum = { version = "0.8.8", features = ["ws", "macros"] }
  tower-http = { version = "0.6.6", features = ["cors", "trace"] }
src/crawler/mod.rs  +250 -222
before:

···
  use rand::Rng;
  use rand::RngExt;
  use rand::rngs::SmallRng;
- use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
- use reqwest_retry::Jitter;
- use reqwest_retry::{
-     RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_failure,
-     default_on_request_success, policies::ExponentialBackoff,
- };
- use smol_str::{SmolStr, ToSmolStr};
- use std::error::Error;
  use std::sync::Arc;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::time::Duration;
- use tracing::{debug, error, info, trace, warn};
  use url::Url;

  enum CrawlCheckResult {
      Signal,
      NoSignal,
      Ratelimited,
-     Failed,
  }

- struct NoTlsRetry;

- impl RetryableStrategy for NoTlsRetry {
-     fn handle(
-         &self,
-         res: &Result<reqwest::Response, reqwest_middleware::Error>,
-     ) -> Option<Retryable> {
-         match res {
-             Ok(success) => default_on_request_success(success),
-             Err(error) => {
-                 if let reqwest_middleware::Error::Reqwest(e) = error {
-                     if e.is_timeout() {
-                         return Some(Retryable::Fatal);
-                     }
-                     let mut src = e.source();
-                     while let Some(s) = src {
-                         if let Some(io_err) = s.downcast_ref::<std::io::Error>() {
-                             if is_tls_cert_error(io_err) {
-                                 return Some(Retryable::Fatal);
-                             }
                          }
-                         src = s.source();
                      }
                  }
-                 let retryable = default_on_request_failure(error);
-                 if retryable == Some(Retryable::Transient) {
-                     if let reqwest_middleware::Error::Reqwest(e) = error {
-                         let url = e.url().map(|u| u.as_str()).unwrap_or("unknown url");
-                         let status = e
-                             .status()
-                             .map(|s| s.to_smolstr())
-                             .unwrap_or_else(|| "unknown status".into());
-                         warn!("retrying request {url}: {status}");
                      }
                  }
-                 retryable
              }
          }
      }
- }

- fn is_tls_cert_error(io_err: &std::io::Error) -> bool {
-     let Some(inner) = io_err.get_ref() else {
-         return false;
-     };
-     if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() {
-         return matches!(rustls_err, rustls::Error::InvalidCertificate(_));
      }
-     if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() {
-         return is_tls_cert_error(nested_io);
-     }
-     false
  }

  pub struct Crawler {
      state: Arc<AppState>,
      relay_host: Url,
-     http: Arc<ClientWithMiddleware>,
      max_pending: usize,
      resume_pending: usize,
      count: Arc<AtomicUsize>,
···
      max_pending: usize,
      resume_pending: usize,
  ) -> Self {
-     let retry_policy = ExponentialBackoff::builder()
-         .jitter(Jitter::Bounded)
-         .build_with_max_retries(5);
-     let reqwest_client = reqwest::Client::builder()
-         .user_agent(concat!(
-             env!("CARGO_PKG_NAME"),
-             "/",
-             env!("CARGO_PKG_VERSION")
-         ))
-         .gzip(true)
-         .build()
-         .expect("that reqwest will build");
-
-     let http = ClientBuilder::new(reqwest_client)
-         .with(RetryTransientMiddleware::new_with_policy_and_strategy(
-             retry_policy,
-             NoTlsRetry,
-         ))
-         .build();
-     let http = Arc::new(http);

      Self {
          state,
···
  }

  pub async fn run(self) -> Result<()> {
-     info!("crawler started");
-
      tokio::spawn({
          let count = self.count.clone();
-         let mut last_time = std::time::Instant::now();
          let mut interval = tokio::time::interval(Duration::from_secs(60));
          async move {
              loop {
                  interval.tick().await;
                  let delta = count.swap(0, Ordering::Relaxed);
                  if delta == 0 {
                      continue;
                  }
                  let elapsed = last_time.elapsed().as_secs_f64();
-                 let rate = if elapsed > 0.0 {
-                     delta as f64 / elapsed
-                 } else {
-                     0.0
-                 };
-                 info!("crawler: {rate:.2} repos/s ({delta} repos in {elapsed:.1}s)");
-                 last_time = std::time::Instant::now();
              }
          }
      });

-     let mut api_url = self.relay_host.clone();
-     if api_url.scheme() == "wss" {
-         api_url
              .set_scheme("https")
-             .map_err(|_| miette::miette!("invalid url: {api_url}"))?;
-     } else if api_url.scheme() == "ws" {
-         api_url
              .set_scheme("http")
-             .map_err(|_| miette::miette!("invalid url: {api_url}"))?;
      }

      let mut rng: SmallRng = rand::make_rng();
···
          .await?
          .map(|bytes| {
              let s = String::from_utf8_lossy(&bytes);
-             info!("resuming crawler from cursor: {s}");
              s.into()
          });
      let mut was_throttled = false;
···
      if pending > self.max_pending as u64 {
          if !was_throttled {
              debug!(
-                 "crawler throttling: pending repos {} > max {}, sleeping...",
-                 pending, self.max_pending
              );
              was_throttled = true;
          }
-         tokio::time::sleep(Duration::from_secs(10)).await;
      } else if pending > self.resume_pending as u64 {
          if !was_throttled {
              debug!(
-                 "crawler throttling: pending repos {} > max {}, entering cooldown...",
-                 pending, self.max_pending
              );
              was_throttled = true;
          }

-         while self.state.db.get_count("pending").await > self.resume_pending as u64 {
              debug!(
-                 "crawler cooldown: pending repos {} > resume {}, sleeping...",
-                 self.state.db.get_count("pending").await,
-                 self.resume_pending
              );
-             tokio::time::sleep(Duration::from_secs(10)).await;
          }
          break;
      } else {
          if was_throttled {
-             info!("crawler resuming: throttling released");
              was_throttled = false;
          }
          break;
···
      }

      // 2. fetch listrepos
-     let mut list_repos_url = api_url
          .join("/xrpc/com.atproto.sync.listRepos")
          .into_diagnostic()?;
      list_repos_url
···

      let res_result = self.http.get(list_repos_url.clone()).send().await;
      let bytes = match res_result {
-         Ok(res) => match res.bytes().await {
-             Ok(b) => b,
-             Err(e) => {
-                 error!(
-                     "crawler failed to parse list repos response: {e}. retrying in 30s..."
-                 );
-                 tokio::time::sleep(Duration::from_secs(30)).await;
-                 continue;
              }
-         },
          Err(e) => {
-             error!("crawler failed to list repos: {e}. retrying in 30s...");
-             tokio::time::sleep(Duration::from_secs(30)).await;
              continue;
          }
      };
···
          .into_static();

      if output.repos.is_empty() {
-         info!("crawler finished enumeration (or empty page). sleeping for 1 hour.");
          tokio::time::sleep(Duration::from_secs(3600)).await;
          continue;
      }

-     debug!("crawler fetched {} repos...", output.repos.len());

      let mut batch = db.inner.batch();
      let mut to_queue = Vec::new();
···

      for did in &valid_dids {
          let did_key = keys::repo_key(did);
-         trace!("crawler found new repo: {did}");

          let state = RepoState::backfilling_untracked(rng.next_u64());
          batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
···
      );
      } else {
          // end of pagination
-         info!("crawler reached end of list.");
          cursor = None;
      }
···
  async fn check_signals_batch(
      &self,
      dids: &[Did<'static>],
-     filter: &crate::filter::FilterConfig,
      batch: &mut fjall::OwnedWriteBatch,
  ) -> Result<Vec<Did<'static>>> {
      let db = &self.state.db;
···
      let http = self.http.clone();
      let resolver = self.state.resolver.clone();
      let filter = filter.clone();
-     set.spawn(async move {
-         const MAX_RETRIES: u32 = 5;
-         let mut rng: SmallRng = rand::make_rng();
-
-         let pds_url = {
-             let mut attempt = 0u32;
-             loop {
-                 match resolver.resolve_identity_info(&did).await {
-                     Ok((url, _)) => break url,
-                     Err(crate::resolver::ResolverError::Ratelimited)
-                         if attempt < MAX_RETRIES =>
-                     {
-                         let base = Duration::from_secs(1 << attempt);
-                         let jitter = Duration::from_millis(rng.random_range(0..2000));
-                         let try_in = base + jitter;
-                         debug!(
-                             "crawler: rate limited resolving {did}, retry {}/{MAX_RETRIES} in {}s",
-                             attempt + 1,
-                             try_in.as_secs_f64()
-                         );
-                         tokio::time::sleep(try_in).await;
-                         attempt += 1;
-                     }
-                     Err(crate::resolver::ResolverError::Ratelimited) => {
-                         error!(
-                             "crawler: rate limited resolving {did} after {MAX_RETRIES} retries"
-                         );
-                         return (did, CrawlCheckResult::Ratelimited);
-                     }
-                     Err(e) => {
-                         error!("crawler: failed to resolve {did}: {e}");
-                         return (did, CrawlCheckResult::Failed);
-                     }
-                 }
-             }
-         };
-
-         let mut found_signal = false;
-         for signal in filter.signals.iter() {
-             let mut list_records_url =
-                 pds_url.join("/xrpc/com.atproto.repo.listRecords").unwrap();
-             list_records_url
-                 .query_pairs_mut()
-                 .append_pair("repo", &did)
-                 .append_pair("collection", signal)
-                 .append_pair("limit", "1");
-
-             let res = http
-                 .get(list_records_url)
-                 .send()
-                 .await
-                 .into_diagnostic()
-                 .map(|res| res.error_for_status().into_diagnostic())
-                 .flatten();
-             match res {
-                 Ok(res) => {
-                     let Ok(bytes) = res.bytes().await else {
-                         error!(
-                             "failed to read bytes from listRecords response for repo {did}, signal {signal}"
-                         );
-                         return (did, CrawlCheckResult::Failed);
-                     };
-                     match serde_json::from_slice::<ListRecordsOutput>(&bytes) {
-                         Ok(out) => {
-                             if !out.records.is_empty() {
-                                 found_signal = true;
-                                 break;
-                             }
-                         }
-                         Err(e) => {
-                             error!(
-                                 "failed to parse listRecords response for repo {did}, signal {signal}: {e}"
-                             );
-                             return (did, CrawlCheckResult::Failed);
-                         }
-                     }
-                 }
-                 Err(e) => {
-                     error!(
-                         "failed to listRecords for repo {did}, signal {signal}: {e}"
-                     );
-                     return (did, CrawlCheckResult::Failed);
-                 }
-             }
-         }
-
-         if found_signal {
-             (did, CrawlCheckResult::Signal)
-         } else {
-             trace!("crawler skipped repo {did}: no records match signals");
-             (did, CrawlCheckResult::NoSignal)
-         }
-     });
      }

      while let Some(res) = set.join_next().await {
···
          CrawlCheckResult::NoSignal => {
              batch.remove(&db.crawler, keys::crawler_failed_key(&did));
          }
-         CrawlCheckResult::Ratelimited | CrawlCheckResult::Failed => {
-             batch.insert(&db.crawler, keys::crawler_failed_key(&did), []);
          }
      }
  }
···

      let mut failed_dids = Vec::new();
      for guard in db.crawler.prefix(keys::CRAWLER_FAILED_PREFIX) {
-         let (key, _) = guard.into_inner().into_diagnostic()?;
          let did_bytes = &key[keys::CRAWLER_FAILED_PREFIX.len()..];
          let trimmed = TrimmedDid::try_from(did_bytes)?;
          failed_dids.push(trimmed.to_did());
···
          return Ok(());
      }

-     info!(
-         "crawler: retrying {} previously failed repos",
-         failed_dids.len()
-     );

      let mut batch = db.inner.batch();
      let valid_dids = self
···
      .into_diagnostic()??;

      if !valid_dids.is_empty() {
-         info!(
-             "crawler: recovered {} repos from failed retry",
-             valid_dids.len()
-         );
          self.account_new_repos(valid_dids.len()).await;
      }
after:

···
  use rand::Rng;
  use rand::RngExt;
  use rand::rngs::SmallRng;
+ use reqwest::StatusCode;
+ use smol_str::SmolStr;
+ use std::future::Future;
  use std::sync::Arc;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::time::Duration;
+ use tracing::{Instrument, debug, error, info, trace, warn};
  use url::Url;

  enum CrawlCheckResult {
      Signal,
      NoSignal,
      Ratelimited,
+     Failed(Option<u16>),
  }

+ /// outcome of [`retry_with_backoff`] when the operation does not succeed.
+ enum RetryOutcome<E> {
+     /// ratelimited after exhausting all retries
+     Ratelimited,
+     /// non-ratelimit failure, carrying the last error
+     Failed(E),
+ }

+ /// retries an async operation with exponential backoff when ratelimited.
+ ///
+ /// `op` is called on each attempt and returns `Result<T, E>`.
+ /// `is_ratelimited` classifies an error as a ratelimit (triggering a retry)
+ /// versus a fatal failure (returning immediately).
+ async fn retry_with_backoff<T, E, F, Fut>(
+     rng: &mut SmallRng,
+     max_retries: u32,
+     mut op: F,
+     is_ratelimited: impl Fn(&E) -> bool,
+ ) -> Result<T, RetryOutcome<E>>
+ where
+     F: FnMut() -> Fut,
+     Fut: Future<Output = Result<T, E>>,
+ {
+     let mut attempt = 0u32;
+     loop {
+         match op().await {
+             Ok(val) => return Ok(val),
+             Err(e) if is_ratelimited(&e) => {
+                 if attempt < max_retries {
+                     let base = Duration::from_secs(1 << attempt);
+                     let jitter = Duration::from_millis(rng.random_range(0..2000));
+                     tokio::time::sleep(base + jitter).await;
+                     attempt += 1;
+                 } else {
+                     return Err(RetryOutcome::Ratelimited);
+                 }
+             }
+             Err(e) => return Err(RetryOutcome::Failed(e)),
+         }
+     }
+ }
+
+ async fn check_repo_signals(
+     http: Arc<reqwest::Client>,
+     resolver: crate::resolver::Resolver,
+     filter: Arc<crate::filter::FilterConfig>,
+     did: Did<'static>,
+ ) -> (Did<'static>, CrawlCheckResult) {
+     const MAX_RETRIES: u32 = 5;
+     let mut rng: SmallRng = rand::make_rng();
+
+     let pds_url = retry_with_backoff(
+         &mut rng,
+         MAX_RETRIES,
+         || resolver.resolve_identity_info(&did),
+         |e| matches!(e, crate::resolver::ResolverError::Ratelimited),
+     );
+     let pds_url = match pds_url.await {
+         Ok((url, _)) => url,
+         Err(RetryOutcome::Ratelimited) => {
+             error!(
+                 retries = MAX_RETRIES,
+                 "rate limited resolving identity, giving up"
+             );
+             return (did, CrawlCheckResult::Ratelimited);
+         }
+         Err(RetryOutcome::Failed(e)) => {
+             error!(err = %e, "failed to resolve identity");
+             return (did, CrawlCheckResult::Failed(None));
+         }
+     };
+
+     let mut found_signal = false;
+     for signal in filter.signals.iter() {
+         let res = async {
+             let mut list_records_url = pds_url.join("/xrpc/com.atproto.repo.listRecords").unwrap();
+             list_records_url
+                 .query_pairs_mut()
+                 .append_pair("repo", &did)
+                 .append_pair("collection", signal)
+                 .append_pair("limit", "1");
+
+             let res = retry_with_backoff(
+                 &mut rng,
+                 MAX_RETRIES,
+                 || async {
+                     http.get(list_records_url.clone())
+                         .send()
+                         .await?
+                         .error_for_status()
+                 },
+                 |e: &reqwest::Error| e.status() == Some(StatusCode::TOO_MANY_REQUESTS),
+             );
+             let res = match res.await {
+                 Ok(r) => r,
+                 Err(RetryOutcome::Ratelimited) => {
+                     warn!(
+                         retries = MAX_RETRIES,
+                         "rate limited on listRecords, giving up"
+                     );
+                     return CrawlCheckResult::Ratelimited;
+                 }
+                 Err(RetryOutcome::Failed(e)) => {
+                     match e.status() {
+                         Some(StatusCode::NOT_FOUND | StatusCode::GONE) => {
+                             trace!("repo not found");
                          }
+                         Some(s) if s.is_client_error() => {
+                             error!(status = %s, "repo unavailable");
+                         }
+                         _ => {
+                             error!(err = %e, "listRecords failed");
+                             return CrawlCheckResult::Failed(e.status().map(|s| s.as_u16()));
+                         }
                      }
+                     return CrawlCheckResult::NoSignal;
                  }
+             };
+
+             let bytes = match res.bytes().await {
+                 Ok(b) => b,
+                 Err(e) => {
+                     error!(err = %e, "failed to read listRecords response");
+                     return CrawlCheckResult::Failed(None);
+                 }
+             };
+
+             match serde_json::from_slice::<ListRecordsOutput>(&bytes) {
+                 Ok(out) => {
+                     if !out.records.is_empty() {
+                         return CrawlCheckResult::Signal;
                      }
                  }
+                 Err(e) => {
+                     error!(err = %e, "failed to parse listRecords response");
+                     return CrawlCheckResult::Failed(None);
+                 }
+             }
+
+             CrawlCheckResult::NoSignal
+         }
+         .instrument(tracing::info_span!("signal_check", signal = %signal))
+         .await;
+
+         match res {
+             CrawlCheckResult::Signal => {
+                 found_signal = true;
+                 break;
+             }
+             CrawlCheckResult::NoSignal => {
+                 continue;
+             }
+             other => {
+                 return (did, other);
              }
          }
      }

+     if !found_signal {
+         trace!("no signal-matching records found");
      }
+
+     (
+         did,
+         found_signal
+             .then_some(CrawlCheckResult::Signal)
+             .unwrap_or(CrawlCheckResult::NoSignal),
+     )
  }

  pub struct Crawler {
      state: Arc<AppState>,
      relay_host: Url,
+     http: Arc<reqwest::Client>,
      max_pending: usize,
      resume_pending: usize,
      count: Arc<AtomicUsize>,
···
      max_pending: usize,
      resume_pending: usize,
  ) -> Self {
+     let http = Arc::new(
+         reqwest::Client::builder()
+             .user_agent(concat!(
+                 env!("CARGO_PKG_NAME"),
+                 "/",
+                 env!("CARGO_PKG_VERSION")
+             ))
+             .gzip(true)
+             .build()
+             .expect("that reqwest will build"),
+     );

      Self {
          state,
···
  }

  pub async fn run(self) -> Result<()> {
      tokio::spawn({
+         use std::time::Instant;
          let count = self.count.clone();
+         let mut last_time = Instant::now();
          let mut interval = tokio::time::interval(Duration::from_secs(60));
          async move {
              loop {
                  interval.tick().await;
                  let delta = count.swap(0, Ordering::Relaxed);
                  if delta == 0 {
+                     debug!("no repos processed in 60s");
                      continue;
                  }
                  let elapsed = last_time.elapsed().as_secs_f64();
+                 let rate = (elapsed > 0.0)
+                     .then(|| delta as f64 / elapsed)
+                     .unwrap_or(0.0);
+                 info!(rate, delta, elapsed, "crawler progress");
+                 last_time = Instant::now();
              }
          }
      });

+     let mut relay_url = self.relay_host.clone();
+     match relay_url.scheme() {
+         "wss" => relay_url
              .set_scheme("https")
+             .map_err(|_| miette::miette!("invalid url: {relay_url}"))?,
+         "ws" => relay_url
              .set_scheme("http")
+             .map_err(|_| miette::miette!("invalid url: {relay_url}"))?,
+         _ => {}
      }

      let mut rng: SmallRng = rand::make_rng();
···
          .await?
          .map(|bytes| {
              let s = String::from_utf8_lossy(&bytes);
+             info!(cursor = %s, "resuming");
              s.into()
          });
      let mut was_throttled = false;
···
      if pending > self.max_pending as u64 {
          if !was_throttled {
              debug!(
+                 pending,
+                 max = self.max_pending,
+                 "throttling: above max pending"
              );
              was_throttled = true;
          }
+         // back off before re-checking the pending count so this branch
+         // does not hot-loop (the old code slept 10s here too)
+         tokio::time::sleep(Duration::from_secs(10)).await;
      } else if pending > self.resume_pending as u64 {
          if !was_throttled {
              debug!(
+                 pending,
+                 resume = self.resume_pending,
+                 "throttling: entering cooldown"
              );
              was_throttled = true;
          }

+         loop {
+             let current_pending = self.state.db.get_count("pending").await;
+             if current_pending <= self.resume_pending as u64 {
+                 break;
+             }
              debug!(
+                 pending = current_pending,
+                 resume = self.resume_pending,
+                 "cooldown, waiting"
              );
+             // sleep between polls so the cooldown loop does not spin
+             tokio::time::sleep(Duration::from_secs(10)).await;
          }
          break;
      } else {
          if was_throttled {
+             info!("throttling released");
              was_throttled = false;
          }
          break;
···
      }

      // 2. fetch listrepos
+     let mut list_repos_url = relay_url
          .join("/xrpc/com.atproto.sync.listRepos")
          .into_diagnostic()?;
      list_repos_url
···

      let res_result = self.http.get(list_repos_url.clone()).send().await;
      let bytes = match res_result {
+         Ok(res) => {
+             match res.status() {
+                 StatusCode::TOO_MANY_REQUESTS => {
+                     warn!("rate limited by relay");
+                     continue;
+                 }
+                 s if !s.is_success() => {
+                     error!(status = %s, "can't crawl");
+                     continue;
+                 }
+                 _ => {}
+             }
+             match res.bytes().await {
+                 Ok(b) => b,
+                 Err(e) => {
+                     error!(err = %e, "can't read listRepos");
+                     continue;
+                 }
              }
+         }
          Err(e) => {
+             error!(err = %e, "crawler failed to list repos");
              continue;
          }
      };
···
          .into_static();

      if output.repos.is_empty() {
+         info!("finished enumeration (or empty page)");
          tokio::time::sleep(Duration::from_secs(3600)).await;
          continue;
      }

+     debug!(count = output.repos.len(), "fetched repos");

      let mut batch = db.inner.batch();
      let mut to_queue = Vec::new();
···

      for did in &valid_dids {
          let did_key = keys::repo_key(did);
+         trace!(did = %did, "found new repo");

          let state = RepoState::backfilling_untracked(rng.next_u64());
          batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
···
      );
      } else {
          // end of pagination
+         info!("reached end of list");
          cursor = None;
      }
···
  async fn check_signals_batch(
      &self,
      dids: &[Did<'static>],
+     filter: &Arc<crate::filter::FilterConfig>,
      batch: &mut fjall::OwnedWriteBatch,
  ) -> Result<Vec<Did<'static>>> {
      let db = &self.state.db;
···
      let http = self.http.clone();
      let resolver = self.state.resolver.clone();
      let filter = filter.clone();
+     let span = tracing::info_span!("check_signals", did = %did);
+     set.spawn(check_repo_signals(http, resolver, filter, did).instrument(span));
      }

      while let Some(res) = set.join_next().await {
···
          CrawlCheckResult::NoSignal => {
              batch.remove(&db.crawler, keys::crawler_failed_key(&did));
          }
+         CrawlCheckResult::Ratelimited => {
+             batch.insert(
+                 &db.crawler,
+                 keys::crawler_failed_key(&did),
+                 429u16.to_be_bytes().as_ref(),
+             );
+         }
+         CrawlCheckResult::Failed(status) => {
+             let code = status.unwrap_or(0);
+             batch.insert(
+                 &db.crawler,
+                 keys::crawler_failed_key(&did),
+                 code.to_be_bytes().as_ref(),
+             );
          }
      }
  }
···

      let mut failed_dids = Vec::new();
      for guard in db.crawler.prefix(keys::CRAWLER_FAILED_PREFIX) {
+         let key = guard.key().into_diagnostic()?;
          let did_bytes = &key[keys::CRAWLER_FAILED_PREFIX.len()..];
          let trimmed = TrimmedDid::try_from(did_bytes)?;
          failed_dids.push(trimmed.to_did());
···
          return Ok(());
      }

+     info!("retrying {} previously failed repos", failed_dids.len());

      let mut batch = db.inner.batch();
      let valid_dids = self
···
      .into_diagnostic()??;

      if !valid_dids.is_empty() {
+         info!("recovered {} repos from failed retry", valid_dids.len());
          self.account_new_repos(valid_dids.len()).await;
      }
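for reference, a minimal sketch of how a caller drives the new `retry_with_backoff` helper, assuming it and `RetryOutcome` are in scope from the patch above; `OpError`, `flaky_op`, and `example` are hypothetical stand-ins for `resolve_identity_info` or the listRecords request, while `rand::make_rng()` is the same per-task rng construction the crawler itself uses. with `max_retries = 5`, the sleeps between ratelimited attempts grow as 1, 2, 4, 8, then 16 seconds, each plus up to 2 seconds of jitter, before the helper gives up with `RetryOutcome::Ratelimited`:

```rust
use rand::rngs::SmallRng;

#[derive(Debug)]
enum OpError {
    Ratelimited,
    Fatal,
}

// hypothetical operation that is always ratelimited
async fn flaky_op() -> Result<u32, OpError> {
    Err(OpError::Ratelimited)
}

async fn example() {
    let mut rng: SmallRng = rand::make_rng();
    match retry_with_backoff(&mut rng, 5, flaky_op, |e| matches!(e, OpError::Ratelimited)).await {
        Ok(v) => println!("succeeded: {v}"),
        // every attempt (initial call plus 5 retries) was ratelimited
        Err(RetryOutcome::Ratelimited) => eprintln!("gave up after 5 ratelimited retries"),
        // a non-ratelimit error short-circuits immediately, no retries
        Err(RetryOutcome::Failed(e)) => eprintln!("fatal: {e:?}"),
    }
}
```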
src/resolver.rs  +1
···
        }
    }

+   #[inline]
    async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> {
        let did_static = did.clone().into_static();
        if let Some(entry) = self.inner.cache.get_async(&did_static).await {
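the `check_signals_batch` change above also starts recording why a repo failed: the value stored under each `crawler_failed_key` is now the last HTTP status as a big-endian `u16` (429 for ratelimits, 0 when no status was available) rather than an empty marker. a minimal sketch of reading such a value back; the `decode_failure_status` helper is hypothetical, and rows written before this change carry an empty value:

```rust
/// hypothetical decoder for the failure marker written by check_signals_batch:
/// a big-endian u16 status code, where 0 means "no HTTP status".
fn decode_failure_status(value: &[u8]) -> Option<u16> {
    match value {
        [hi, lo] => Some(u16::from_be_bytes([*hi, *lo])),
        // legacy empty markers (or anything of unexpected length) carry no status
        _ => None,
    }
}

fn main() {
    assert_eq!(decode_failure_status(&429u16.to_be_bytes()), Some(429));
    assert_eq!(decode_failure_status(&[]), None);
}
```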