AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] track attempts, back off exponentially on failures, ban after too many failures

ptr.pet d58e0261 ffa704fd

verified
+80 -26
+80 -26
src/crawler/mod.rs
··· 15 use reqwest::StatusCode; 16 use serde::{Deserialize, Serialize}; 17 use smol_str::SmolStr; 18 use std::ops::{Add, Mul, Sub}; 19 use std::sync::Arc; 20 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; ··· 22 use tracing::{Instrument, debug, error, info, trace, warn}; 23 use url::Url; 24 25 #[derive(Debug, Serialize, Deserialize)] 26 struct RetryState { 27 - after: DateTime<Utc>, // seconds 28 - duration: TimeDelta, // banned for in seconds 29 #[serde(serialize_with = "crate::util::ser_status_code")] 30 #[serde(deserialize_with = "crate::util::deser_status_code")] 31 status: Option<StatusCode>, ··· 37 Self { 38 duration, 39 after: Utc::now().add(duration), 40 status: None, 41 } 42 } 43 44 fn with_status(mut self, code: StatusCode) -> Self { 45 self.status = Some(code); 46 self ··· 57 RetryState { 58 duration: after.sub(Utc::now()), 59 after, 60 status: None, 61 } 62 } ··· 567 continue; 568 } 569 570 if !Db::contains_key(db.repos.clone(), &did_key).await? { 571 unknown_dids.push(repo.did.into_static()); 572 } 573 } 574 575 let valid_dids = if filter.check_signals() && !unknown_dids.is_empty() { 576 crawler 577 - .check_signals_batch(&unknown_dids, &filter, &mut batch) 578 .await? 579 } else { 580 unknown_dids ··· 616 } 617 } 618 619 - /// scan the retry queue for entries whose `retry_after` timestamp has passed, 620 - /// retry them, and return the earliest still-pending timestamp (if any) so the 621 - /// caller knows when to wake up next. 622 fn process_retry_queue(&self) -> Result<Option<DateTime<Utc>>> { 623 let db = &self.state.db; 624 let now = Utc::now(); 625 626 let mut ready: Vec<Did> = Vec::new(); 627 let mut next_retry: Option<DateTime<Utc>> = None; 628 629 let mut rng: SmallRng = rand::make_rng(); 630 631 let mut batch = db.inner.batch(); 632 for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 633 let (key, val) = guard.into_inner().into_diagnostic()?; 634 - let RetryState { 635 - after, duration, .. 
636 - }: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?; 637 let did = keys::crawler_retry_parse_key(&key)?.to_did(); 638 639 - // we check an extra backoff of 1 - 7% just to make it less likely for 640 - // many requests to coincide with each other 641 let backoff = TimeDelta::seconds( 642 - duration.as_seconds_f64().mul(rng.random_range(0.01..0.07)) as i64, 643 ); 644 - if after + backoff > now { 645 next_retry = Some( 646 next_retry 647 - .map(|earliest| earliest.min(after)) 648 - .unwrap_or(after), 649 ); 650 continue; 651 } 652 653 - ready.push(did); 654 } 655 656 if ready.is_empty() { ··· 661 662 let handle = tokio::runtime::Handle::current(); 663 let filter = self.state.filter.load(); 664 - let valid_dids = handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch))?; 665 666 let mut rng: SmallRng = rand::make_rng(); 667 for did in &valid_dids { ··· 683 handle.block_on(self.account_new_repos(valid_dids.len())); 684 } 685 686 - Ok(next_retry) 687 } 688 689 async fn check_signals_batch( ··· 691 dids: &[Did<'static>], 692 filter: &Arc<crate::filter::FilterConfig>, 693 batch: &mut fjall::OwnedWriteBatch, 694 ) -> Result<Vec<Did<'static>>> { 695 let db = &self.state.db; 696 let mut valid = Vec::new(); ··· 710 let (did, result) = res.into_diagnostic()?; 711 match result { 712 CrawlCheckResult::Signal => { 713 valid.push(did); 714 } 715 - CrawlCheckResult::NoSignal => {} 716 CrawlCheckResult::Retry(state) => { 717 - batch.insert( 718 - &db.crawler, 719 - keys::crawler_retry_key(&did), 720 - rmp_serde::to_vec(&state) 721 - .into_diagnostic() 722 - .wrap_err("cant ser retry state")?, 723 - ); 724 } 725 } 726 }
··· 15 use reqwest::StatusCode; 16 use serde::{Deserialize, Serialize}; 17 use smol_str::SmolStr; 18 + use std::collections::HashMap; 19 use std::ops::{Add, Mul, Sub}; 20 use std::sync::Arc; 21 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; ··· 23 use tracing::{Instrument, debug, error, info, trace, warn}; 24 use url::Url; 25 26 + const MAX_RETRY_ATTEMPTS: u32 = 5; 27 + const MAX_RETRY_BATCH: usize = 500; 28 + 29 #[derive(Debug, Serialize, Deserialize)] 30 struct RetryState { 31 + after: DateTime<Utc>, 32 + duration: TimeDelta, 33 + attempts: u32, 34 #[serde(serialize_with = "crate::util::ser_status_code")] 35 #[serde(deserialize_with = "crate::util::deser_status_code")] 36 status: Option<StatusCode>, ··· 42 Self { 43 duration, 44 after: Utc::now().add(duration), 45 + attempts: 0, 46 status: None, 47 } 48 } 49 50 + /// returns the next retry state with doubled duration and incremented attempt count, 51 + /// or `None` if the attempt count would reach the cap (entry left in db as-is). 52 + fn next_attempt(self) -> Option<Self> { 53 + let attempts = self.attempts + 1; 54 + if attempts >= MAX_RETRY_ATTEMPTS { 55 + return None; 56 + } 57 + let duration = self.duration * 2i32.pow(self.attempts.min(4)); 58 + Some(Self { 59 + after: Utc::now().add(duration), 60 + duration, 61 + attempts, 62 + status: None, 63 + }) 64 + } 65 + 66 fn with_status(mut self, code: StatusCode) -> Self { 67 self.status = Some(code); 68 self ··· 79 RetryState { 80 duration: after.sub(Utc::now()), 81 after, 82 + attempts: 0, 83 status: None, 84 } 85 } ··· 590 continue; 591 } 592 593 + // already in retry queue — let the retry thread handle it 594 + let retry_key = keys::crawler_retry_key(&repo.did); 595 + if db.crawler.contains_key(&retry_key).into_diagnostic()? { 596 + continue; 597 + } 598 + 599 if !Db::contains_key(db.repos.clone(), &did_key).await? 
{ 600 unknown_dids.push(repo.did.into_static()); 601 } 602 } 603 604 let valid_dids = if filter.check_signals() && !unknown_dids.is_empty() { 605 + // we dont need to pass any existing since we have none; we are crawling after all 606 crawler 607 + .check_signals_batch(&unknown_dids, &filter, &mut batch, &HashMap::new()) 608 .await? 609 } else { 610 unknown_dids ··· 646 } 647 } 648 649 fn process_retry_queue(&self) -> Result<Option<DateTime<Utc>>> { 650 let db = &self.state.db; 651 let now = Utc::now(); 652 653 let mut ready: Vec<Did> = Vec::new(); 654 + let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new(); 655 let mut next_retry: Option<DateTime<Utc>> = None; 656 + let mut had_more = false; 657 658 let mut rng: SmallRng = rand::make_rng(); 659 660 let mut batch = db.inner.batch(); 661 for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 662 let (key, val) = guard.into_inner().into_diagnostic()?; 663 + let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?; 664 let did = keys::crawler_retry_parse_key(&key)?.to_did(); 665 666 + // leave capped entries alone for API inspection 667 + if state.attempts >= MAX_RETRY_ATTEMPTS { 668 + continue; 669 + } 670 + 671 let backoff = TimeDelta::seconds( 672 + state 673 + .duration 674 + .as_seconds_f64() 675 + .mul(rng.random_range(0.01..0.07)) as i64, 676 ); 677 + if state.after + backoff > now { 678 next_retry = Some( 679 next_retry 680 + .map(|earliest| earliest.min(state.after)) 681 + .unwrap_or(state.after), 682 ); 683 continue; 684 } 685 686 + if ready.len() >= MAX_RETRY_BATCH { 687 + had_more = true; 688 + break; 689 + } 690 + 691 + ready.push(did.clone()); 692 + existing.insert(did, state); 693 } 694 695 if ready.is_empty() { ··· 700 701 let handle = tokio::runtime::Handle::current(); 702 let filter = self.state.filter.load(); 703 + let valid_dids = 704 + handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch, &existing))?; 705 706 let mut rng: SmallRng = 
rand::make_rng(); 707 for did in &valid_dids { ··· 723 handle.block_on(self.account_new_repos(valid_dids.len())); 724 } 725 726 + // if we hit the batch cap there are more ready entries, loop back immediately 727 + Ok(had_more.then(Utc::now).or(next_retry)) 728 } 729 730 async fn check_signals_batch( ··· 732 dids: &[Did<'static>], 733 filter: &Arc<crate::filter::FilterConfig>, 734 batch: &mut fjall::OwnedWriteBatch, 735 + existing: &HashMap<Did<'static>, RetryState>, 736 ) -> Result<Vec<Did<'static>>> { 737 let db = &self.state.db; 738 let mut valid = Vec::new(); ··· 752 let (did, result) = res.into_diagnostic()?; 753 match result { 754 CrawlCheckResult::Signal => { 755 + batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 756 valid.push(did); 757 } 758 + CrawlCheckResult::NoSignal => { 759 + batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 760 + } 761 CrawlCheckResult::Retry(state) => { 762 + let prev_attempts = existing.get(&did).map(|s| s.attempts).unwrap_or(0); 763 + let carried = RetryState { 764 + attempts: prev_attempts, 765 + ..state 766 + }; 767 + if let Some(next) = carried.next_attempt() { 768 + batch.insert( 769 + &db.crawler, 770 + keys::crawler_retry_key(&did), 771 + rmp_serde::to_vec(&next) 772 + .into_diagnostic() 773 + .wrap_err("cant ser retry state")?, 774 + ); 775 + } 776 + // next_attempt() == None means we've hit the cap; 777 + // leave the existing entry untouched for API inspection 778 } 779 } 780 }