AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] use a tighter timeout; queue timed-out requests for retry separately so they don't block the crawler

ptr.pet 8dd3080a e558361e

verified
+103 -42
+82 -38
src/crawler/mod.rs
··· 199 199 let resp = async { 200 200 let resp = http 201 201 .get(describe_url) 202 + .timeout(throttle.timeout()) 202 203 .send() 203 204 .await 204 205 .map_err(RequestError::Reqwest)?; ··· 232 233 return (did, throttle.to_retry_state().into()); 233 234 } 234 235 Err(RequestError::Reqwest(e)) => { 236 + if e.is_timeout() && !throttle.record_timeout() { 237 + // first or second timeout, just requeue 238 + let mut retry_state = RetryState::new(60); 239 + retry_state.status = e.status(); 240 + return (did, retry_state.into()); 241 + } 242 + // third timeout, if timeout fail is_throttle_worthy will ban the pds 243 + 235 244 if is_throttle_worthy(&e) { 236 245 if let Some(mins) = throttle.record_failure() { 237 246 warn!(url = %pds_url, mins, "throttling pds due to hard failure"); ··· 282 291 trace!("no signal-matching collections found"); 283 292 } 284 293 285 - ( 294 + return ( 286 295 did, 287 296 found_signal 288 297 .then_some(CrawlCheckResult::Signal) 289 298 .unwrap_or(CrawlCheckResult::NoSignal), 290 - ) 299 + ); 291 300 } 292 301 293 302 #[derive(Debug, Serialize, Deserialize)] ··· 439 448 .into_diagnostic() 440 449 .wrap_err("can't parse cursor")? 
441 450 .unwrap_or(Cursor::Next(None)); 442 - let mut was_throttled = false; 443 451 452 + match cursor { 453 + Cursor::Next(Some(ref cursor)) => info!(cursor = %cursor, "resuming"), 454 + Cursor::Next(None) => info!("starting from scratch"), 455 + Cursor::Done => info!("was done, resuming"), 456 + } 457 + 458 + let mut was_throttled = false; 444 459 loop { 445 460 // throttle check 446 461 loop { ··· 535 550 } 536 551 }; 537 552 538 - let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 539 - Ok(out) => out.into_static(), 540 - Err(e) => { 541 - error!(err = %e, "failed to parse listRepos response"); 542 - continue; 543 - } 544 - }; 553 + let mut batch = db.inner.batch(); 554 + let mut to_queue = Vec::new(); 555 + let filter = crawler.state.filter.load(); 545 556 546 - if output.repos.is_empty() { 547 - info!("finished enumeration (or empty page)"); 548 - tokio::time::sleep(Duration::from_secs(3600)).await; 549 - continue; 557 + struct ParseResult { 558 + unknown_dids: Vec<Did<'static>>, 559 + cursor: Option<smol_str::SmolStr>, 560 + count: usize, 550 561 } 551 562 552 - debug!(count = output.repos.len(), "fetched repos"); 553 - crawler 554 - .crawled_count 555 - .fetch_add(output.repos.len(), Ordering::Relaxed); 563 + let parse_result = { 564 + let repos = db.repos.clone(); 565 + let filter_ks = db.filter.clone(); 566 + let crawler_ks = db.crawler.clone(); 567 + tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> { 568 + let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 569 + Ok(out) => out.into_static(), 570 + Err(e) => { 571 + error!(err = %e, "failed to parse listRepos response"); 572 + return Ok(None); 573 + } 574 + }; 556 575 557 - let mut batch = db.inner.batch(); 558 - let mut to_queue = Vec::new(); 559 - let filter = crawler.state.filter.load(); 576 + if output.repos.is_empty() { 577 + return Ok(None); 578 + } 560 579 561 - let mut unknown_dids = Vec::new(); 562 - for repo in output.repos { 563 
- let did_key = keys::repo_key(&repo.did); 580 + let count = output.repos.len(); 581 + let next_cursor = output.cursor.map(|c| c.as_str().into()); 582 + let mut unknown = Vec::new(); 583 + for repo in output.repos { 584 + let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 585 + if filter_ks.contains_key(&excl_key).into_diagnostic()? { 586 + continue; 587 + } 564 588 565 - let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 566 - if db.filter.contains_key(&excl_key).into_diagnostic()? { 567 - continue; 568 - } 589 + // already in retry queue — let the retry thread handle it 590 + let retry_key = keys::crawler_retry_key(&repo.did); 591 + if crawler_ks.contains_key(&retry_key).into_diagnostic()? { 592 + continue; 593 + } 569 594 570 - // already in retry queue — let the retry thread handle it 571 - let retry_key = keys::crawler_retry_key(&repo.did); 572 - if db.crawler.contains_key(&retry_key).into_diagnostic()? { 573 - continue; 574 - } 595 + let did_key = keys::repo_key(&repo.did); 596 + if !repos.contains_key(&did_key).into_diagnostic()? { 597 + unknown.push(repo.did.into_static()); 598 + } 599 + } 575 600 576 - if !Db::contains_key(db.repos.clone(), &did_key).await? { 577 - unknown_dids.push(repo.did.into_static()); 578 - } 579 - } 601 + Ok(Some(ParseResult { 602 + unknown_dids: unknown, 603 + cursor: next_cursor, 604 + count, 605 + })) 606 + }) 607 + .await 608 + .into_diagnostic()?? 
609 + }; 610 + 611 + let Some(ParseResult { 612 + unknown_dids, 613 + cursor: next_cursor, 614 + count, 615 + }) = parse_result 616 + else { 617 + info!("finished enumeration (or empty page)"); 618 + tokio::time::sleep(Duration::from_secs(3600)).await; 619 + continue; 620 + }; 621 + 622 + debug!(count, "fetched repos"); 623 + crawler.crawled_count.fetch_add(count, Ordering::Relaxed); 580 624 581 625 let valid_dids = if filter.check_signals() && !unknown_dids.is_empty() { 582 626 // we dont need to pass any existing since we have none; we are crawling after all ··· 597 641 to_queue.push(did.clone()); 598 642 } 599 643 600 - if let Some(new_cursor) = output.cursor { 644 + if let Some(new_cursor) = next_cursor { 601 645 cursor = Cursor::Next(Some(new_cursor.as_str().into())); 602 646 } else { 603 647 info!("reached end of list.");
+19
src/crawler/throttle.rs
··· 2 2 use std::future::Future; 3 3 use std::sync::Arc; 4 4 use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; 5 + use std::time::Duration; 5 6 use tokio::sync::{Notify, Semaphore, SemaphorePermit}; 6 7 use url::Url; 7 8 ··· 47 48 struct State { 48 49 throttled_until: AtomicI64, 49 50 consecutive_failures: AtomicUsize, 51 + consecutive_timeouts: AtomicUsize, 50 52 /// only fires on hard failures (timeout, TLS, bad gateway, etc). 51 53 /// ratelimits do NOT fire this — they just store `throttled_until` and 52 54 /// let tasks exit naturally, deferring to the background retry loop. ··· 59 61 Self { 60 62 throttled_until: AtomicI64::new(0), 61 63 consecutive_failures: AtomicUsize::new(0), 64 + consecutive_timeouts: AtomicUsize::new(0), 62 65 failure_notify: Notify::new(), 63 66 semaphore: Semaphore::new(PER_PDS_CONCURRENCY), 64 67 } ··· 82 85 83 86 pub fn record_success(&self) { 84 87 self.state.consecutive_failures.store(0, Ordering::Release); 88 + self.state.consecutive_timeouts.store(0, Ordering::Release); 85 89 self.state.throttled_until.store(0, Ordering::Release); 86 90 } 87 91 ··· 123 127 self.state.failure_notify.notify_waiters(); 124 128 125 129 Some(minutes) 130 + } 131 + 132 + /// returns current timeout duration — 3s, 6s, or 12s depending on prior timeouts. 133 + pub fn timeout(&self) -> Duration { 134 + let n = self.state.consecutive_timeouts.load(Ordering::Acquire); 135 + Duration::from_secs(3 * 2u64.pow(n.min(2) as u32)) 136 + } 137 + 138 + pub fn record_timeout(&self) -> bool { 139 + let timeouts = self 140 + .state 141 + .consecutive_timeouts 142 + .fetch_add(1, Ordering::AcqRel) 143 + + 1; 144 + timeouts > 2 126 145 } 127 146 128 147 /// acquire a concurrency slot for this PDS. hold the returned permit
+2 -4
src/db/mod.rs
··· 113 113 let repos = open_ks( 114 114 "repos", 115 115 opts() 116 - // most lookups hit since repo must exist after discovery 117 - // we don't hit here if it's not tracked anyway (that happens in filter) 118 - .expect_point_read_hits(true) 116 + // crawler checks if a repo doesn't exist 117 + .expect_point_read_hits(false) 119 118 .max_memtable_size(cfg.db_repos_memtable_size_mb * 1024 * 1024) 120 119 .data_block_size_policy(BlockSizePolicy::all(kb(4))), 121 120 )?; ··· 196 195 let crawler = open_ks( 197 196 "crawler", 198 197 opts() 199 - .expect_point_read_hits(true) 200 198 .max_memtable_size((kb(1024) * 16) as u64) 201 199 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 202 200 )?;