at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 896 lines 32 kB view raw
1use crate::crawler::throttle::ThrottleHandle; 2use crate::db::{Db, keys, ser_repo_state}; 3use crate::state::AppState; 4use crate::types::RepoState; 5use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after}; 6use chrono::{DateTime, TimeDelta, Utc}; 7use futures::FutureExt; 8use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput; 9use jacquard_api::com_atproto::sync::list_repos::ListReposOutput; 10use jacquard_common::{IntoStatic, types::string::Did}; 11use miette::{Context, IntoDiagnostic, Result}; 12use rand::Rng; 13use rand::RngExt; 14use rand::rngs::SmallRng; 15use reqwest::StatusCode; 16use serde::{Deserialize, Serialize}; 17use smol_str::{SmolStr, ToSmolStr, format_smolstr}; 18use std::collections::HashMap; 19use std::ops::{Add, Mul, Sub}; 20use std::sync::Arc; 21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 22use std::time::Duration; 23use tracing::{Instrument, debug, error, info, trace, warn}; 24use url::Url; 25 26const MAX_RETRY_ATTEMPTS: u32 = 5; 27const MAX_RETRY_BATCH: usize = 1000; 28 29#[derive(Debug, Serialize, Deserialize)] 30struct RetryState { 31 after: DateTime<Utc>, 32 duration: TimeDelta, 33 attempts: u32, 34 #[serde(serialize_with = "crate::util::ser_status_code")] 35 #[serde(deserialize_with = "crate::util::deser_status_code")] 36 status: Option<StatusCode>, 37} 38 39impl RetryState { 40 fn new(secs: i64) -> Self { 41 let duration = TimeDelta::seconds(secs); 42 Self { 43 duration, 44 after: Utc::now().add(duration), 45 attempts: 0, 46 status: None, 47 } 48 } 49 50 /// returns the next retry state with doubled duration and incremented attempt count, 51 /// or `None` if the attempt count would reach the cap (entry left in db as-is). 52 fn next_attempt(self) -> Option<Self> { 53 let attempts = self.attempts + 1; 54 if attempts >= MAX_RETRY_ATTEMPTS { 55 return None; 56 } 57 let duration = self.duration * 2; 58 Some(Self { 59 after: Utc::now().add(duration), 60 duration, 61 attempts, 62 status: None, 63 }) 64 } 65 66 fn with_status(mut self, code: StatusCode) -> Self { 67 self.status = Some(code); 68 self 69 } 70} 71 72trait ToRetryState { 73 fn to_retry_state(&self) -> RetryState; 74} 75 76impl ToRetryState for ThrottleHandle { 77 fn to_retry_state(&self) -> RetryState { 78 let after = chrono::DateTime::from_timestamp_secs(self.throttled_until()).unwrap(); 79 RetryState { 80 duration: after.sub(Utc::now()), 81 after, 82 attempts: 0, 83 status: None, 84 } 85 } 86} 87 88enum CrawlCheckResult { 89 Signal, 90 NoSignal, 91 Retry(RetryState), 92} 93 94impl From<RetryState> for CrawlCheckResult { 95 fn from(value: RetryState) -> Self { 96 Self::Retry(value) 97 } 98} 99 100fn is_throttle_worthy(e: &reqwest::Error) -> bool { 101 use std::error::Error; 102 103 if e.is_timeout() { 104 return true; 105 } 106 107 let mut src = e.source(); 108 while let Some(s) = src { 109 if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 110 if is_tls_cert_error(io_err) { 111 return true; 112 } 113 } 114 src = s.source(); 115 } 116 117 e.status().map_or(false, |s| { 118 matches!( 119 s, 120 StatusCode::BAD_GATEWAY 121 | StatusCode::SERVICE_UNAVAILABLE 122 | StatusCode::GATEWAY_TIMEOUT 123 | crate::util::CONNECTION_TIMEOUT 124 | crate::util::SITE_FROZEN 125 ) 126 }) 127} 128 129fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 130 let Some(inner) = io_err.get_ref() else { 131 return false; 132 }; 133 if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 134 return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 135 } 136 if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 137 return is_tls_cert_error(nested_io); 138 } 139 false 140} 141 142const CURSOR_KEY: &[u8] = b"crawler_cursor"; 143 144#[derive(Debug, Serialize, Deserialize)] 145enum Cursor { 146 Done(SmolStr), 147 Next(Option<SmolStr>), 148} 149 150pub mod throttle; 151use throttle::{OrFailure, Throttler}; 152 153pub struct Crawler { 154 state: Arc<AppState>, 155 relay_host: Url, 156 http: reqwest::Client, 157 max_pending: usize, 158 resume_pending: usize, 159 count: AtomicUsize, 160 crawled_count: AtomicUsize, 161 throttled: AtomicBool, 162 pds_throttler: Throttler, 163} 164 165impl Crawler { 166 pub fn new( 167 state: Arc<AppState>, 168 relay_host: Url, 169 max_pending: usize, 170 resume_pending: usize, 171 ) -> Self { 172 let http = reqwest::Client::builder() 173 .user_agent(concat!( 174 env!("CARGO_PKG_NAME"), 175 "/", 176 env!("CARGO_PKG_VERSION") 177 )) 178 .gzip(true) 179 .build() 180 .expect("that reqwest will build"); 181 182 Self { 183 state, 184 relay_host, 185 http, 186 max_pending, 187 resume_pending, 188 count: AtomicUsize::new(0), 189 crawled_count: AtomicUsize::new(0), 190 throttled: AtomicBool::new(false), 191 pds_throttler: Throttler::new(), 192 } 193 } 194 195 async fn get_cursor(&self) -> Result<Cursor> { 196 let cursor_bytes = Db::get(self.state.db.cursors.clone(), CURSOR_KEY).await?; 197 let cursor: Cursor = cursor_bytes 198 .as_deref() 199 .map(rmp_serde::from_slice) 200 .transpose() 201 .into_diagnostic() 202 .wrap_err("can't parse cursor")? 203 .unwrap_or(Cursor::Next(None)); 204 Ok(cursor) 205 } 206 207 pub async fn run(self) -> Result<()> { 208 let crawler = Arc::new(self); 209 210 // stats ticker 211 let ticker = tokio::spawn({ 212 use std::time::Instant; 213 let crawler = crawler.clone(); 214 let mut last_time = Instant::now(); 215 let mut interval = tokio::time::interval(Duration::from_secs(60)); 216 async move { 217 loop { 218 interval.tick().await; 219 let delta_processed = crawler.count.swap(0, Ordering::Relaxed); 220 let delta_crawled = crawler.crawled_count.swap(0, Ordering::Relaxed); 221 let is_throttled = crawler.throttled.load(Ordering::Relaxed); 222 223 crawler.pds_throttler.evict_clean().await; 224 225 if delta_processed == 0 && delta_crawled == 0 { 226 if is_throttled { 227 info!("throttled: pending queue full"); 228 } else { 229 info!("idle: no repos crawled or processed in 60s"); 230 } 231 continue; 232 } 233 234 let elapsed = last_time.elapsed().as_secs_f64(); 235 let cursor = Self::get_cursor(&crawler).await.map_or_else( 236 |e| e.to_smolstr(), 237 |c| match c { 238 Cursor::Done(c) => format_smolstr!("done({c})"), 239 Cursor::Next(None) => "none".to_smolstr(), 240 Cursor::Next(Some(c)) => c.to_smolstr(), 241 }, 242 ); 243 info!( 244 %cursor, 245 processed = delta_processed, 246 crawled = delta_crawled, 247 elapsed, 248 "progress" 249 ); 250 last_time = Instant::now(); 251 } 252 } 253 }); 254 tokio::spawn(async move { 255 let Err(e) = ticker.await; 256 error!(err = ?e, "stats ticker panicked, aborting"); 257 std::process::abort(); 258 }); 259 260 // retry thread 261 std::thread::spawn({ 262 let crawler = crawler.clone(); 263 let handle = tokio::runtime::Handle::current(); 264 move || { 265 use std::thread::sleep; 266 267 let _g = handle.enter(); 268 269 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { 270 loop { 271 match crawler.process_retry_queue() { 272 Ok(Some(dur)) => sleep(dur.max(Duration::from_secs(1))), 273 Ok(None) => sleep(Duration::from_secs(60)), 274 Err(e) => { 275 error!(err = %e, "retry loop failed"); 276 sleep(Duration::from_secs(60)); 277 } 278 } 279 } 280 })); 281 if result.is_err() { 282 error!("retry thread panicked, aborting"); 283 std::process::abort(); 284 } 285 } 286 }); 287 288 loop { 289 if let Err(e) = Self::crawl(crawler.clone()).await { 290 error!(err = ?e, "fatal error, restarting in 30s"); 291 tokio::time::sleep(Duration::from_secs(30)).await; 292 } 293 } 294 } 295 296 async fn crawl(crawler: Arc<Self>) -> Result<()> { 297 let mut relay_url = crawler.relay_host.clone(); 298 match relay_url.scheme() { 299 "wss" => relay_url 300 .set_scheme("https") 301 .map_err(|_| miette::miette!("invalid url: {relay_url}"))?, 302 "ws" => relay_url 303 .set_scheme("http") 304 .map_err(|_| miette::miette!("invalid url: {relay_url}"))?, 305 _ => {} 306 } 307 308 let mut rng: SmallRng = rand::make_rng(); 309 let db = &crawler.state.db; 310 311 let mut cursor = crawler.get_cursor().await?; 312 313 match &cursor { 314 Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"), 315 Cursor::Next(None) => info!("starting from scratch"), 316 Cursor::Done(c) => info!(cursor = %c, "was done, resuming"), 317 } 318 319 let mut was_throttled = false; 320 loop { 321 // throttle check 322 loop { 323 let pending = crawler.state.db.get_count("pending").await; 324 if pending > crawler.max_pending as u64 { 325 if !was_throttled { 326 debug!( 327 pending, 328 max = crawler.max_pending, 329 "throttling: above max pending" 330 ); 331 was_throttled = true; 332 crawler.throttled.store(true, Ordering::Relaxed); 333 } 334 tokio::time::sleep(Duration::from_secs(5)).await; 335 } else if pending > crawler.resume_pending as u64 { 336 if !was_throttled { 337 debug!( 338 pending, 339 resume = crawler.resume_pending, 340 "throttling: entering cooldown" 341 ); 342 was_throttled = true; 343 crawler.throttled.store(true, Ordering::Relaxed); 344 } 345 346 loop { 347 let current_pending = crawler.state.db.get_count("pending").await; 348 if current_pending <= crawler.resume_pending as u64 { 349 break; 350 } 351 debug!( 352 pending = current_pending, 353 resume = crawler.resume_pending, 354 "cooldown, waiting" 355 ); 356 tokio::time::sleep(Duration::from_secs(5)).await; 357 } 358 break; 359 } else { 360 if was_throttled { 361 info!("throttling released"); 362 was_throttled = false; 363 crawler.throttled.store(false, Ordering::Relaxed); 364 } 365 break; 366 } 367 } 368 369 let mut list_repos_url = relay_url 370 .join("/xrpc/com.atproto.sync.listRepos") 371 .into_diagnostic()?; 372 list_repos_url 373 .query_pairs_mut() 374 .append_pair("limit", "1000"); 375 if let Cursor::Next(Some(c)) = &cursor { 376 list_repos_url 377 .query_pairs_mut() 378 .append_pair("cursor", c.as_str()); 379 } 380 381 let fetch_result = (|| { 382 crawler 383 .http 384 .get(list_repos_url.clone()) 385 .send() 386 .error_for_status() 387 }) 388 .retry(5, |e: &reqwest::Error, attempt| { 389 matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS)) 390 .then(|| Duration::from_secs(1 << attempt.min(5))) 391 }) 392 .await; 393 394 let res = match fetch_result { 395 Ok(r) => r, 396 Err(RetryOutcome::Ratelimited) => { 397 warn!("rate limited by relay after retries"); 398 continue; 399 } 400 Err(RetryOutcome::Failed(e)) => { 401 error!(err = %e, "crawler failed to fetch listRepos"); 402 continue; 403 } 404 }; 405 406 let bytes = match res.bytes().await { 407 Ok(b) => b, 408 Err(e) => { 409 error!(err = %e, "cant read listRepos response"); 410 continue; 411 } 412 }; 413 414 let mut batch = db.inner.batch(); 415 let filter = crawler.state.filter.load(); 416 417 struct ParseResult { 418 unknown_dids: Vec<Did<'static>>, 419 cursor: Option<smol_str::SmolStr>, 420 count: usize, 421 } 422 423 const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 424 425 let parse_result = { 426 let repos = db.repos.clone(); 427 let filter_ks = db.filter.clone(); 428 let crawler_ks = db.crawler.clone(); 429 430 // this wont actually cancel the task since spawn_blocking isnt cancel safe 431 // but at least we'll see whats going on? 432 tokio::time::timeout( 433 BLOCKING_TASK_TIMEOUT, 434 tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> { 435 let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 436 Ok(out) => out.into_static(), 437 Err(e) => { 438 error!(err = %e, "failed to parse listRepos response"); 439 return Ok(None); 440 } 441 }; 442 443 if output.repos.is_empty() { 444 return Ok(None); 445 } 446 447 let count = output.repos.len(); 448 let next_cursor = output.cursor.map(|c| c.as_str().into()); 449 let mut unknown = Vec::new(); 450 for repo in output.repos { 451 let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 452 if filter_ks.contains_key(&excl_key).into_diagnostic()? { 453 continue; 454 } 455 456 // already in retry queue — let the retry thread handle it 457 let retry_key = keys::crawler_retry_key(&repo.did); 458 if crawler_ks.contains_key(&retry_key).into_diagnostic()? { 459 continue; 460 } 461 462 let did_key = keys::repo_key(&repo.did); 463 if !repos.contains_key(&did_key).into_diagnostic()? { 464 unknown.push(repo.did.into_static()); 465 } 466 } 467 468 Ok(Some(ParseResult { 469 unknown_dids: unknown, 470 cursor: next_cursor, 471 count, 472 })) 473 }), 474 ) 475 .await 476 } 477 .into_diagnostic()? 478 .map_err(|_| { 479 error!( 480 "spawn_blocking task for parsing listRepos timed out after {}", 481 BLOCKING_TASK_TIMEOUT.as_secs() 482 ); 483 miette::miette!("spawn_blocking task for parsing listRepos timed out") 484 })?; 485 486 let ParseResult { 487 unknown_dids, 488 cursor: next_cursor, 489 count, 490 } = match parse_result { 491 Ok(Some(res)) => res, 492 Ok(None) => { 493 info!("finished enumeration (or empty page)"); 494 if let Cursor::Next(Some(c)) = cursor { 495 info!("reached end of list."); 496 cursor = Cursor::Done(c); 497 } 498 info!("sleeping 1h before next enumeration pass"); 499 tokio::time::sleep(Duration::from_secs(3600)).await; 500 info!("resuming after 1h sleep"); 501 continue; 502 } 503 Err(e) => return Err(e).wrap_err("error while crawling"), 504 }; 505 506 debug!(count, "fetched repos"); 507 crawler.crawled_count.fetch_add(count, Ordering::Relaxed); 508 509 let valid_dids = if filter.check_signals() && !unknown_dids.is_empty() { 510 // we dont need to pass any existing since we have none; we are crawling after all 511 crawler 512 .check_signals_batch(&unknown_dids, &filter, &mut batch, &HashMap::new()) 513 .await? 514 } else { 515 unknown_dids 516 }; 517 518 for did in &valid_dids { 519 let did_key = keys::repo_key(did); 520 trace!(did = %did, "found new repo"); 521 522 let state = RepoState::untracked(rng.next_u64()); 523 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 524 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 525 } 526 527 if let Some(new_cursor) = next_cursor { 528 cursor = Cursor::Next(Some(new_cursor.as_str().into())); 529 } else if let Cursor::Next(Some(c)) = cursor { 530 info!("reached end of list."); 531 cursor = Cursor::Done(c); 532 } 533 batch.insert( 534 &db.cursors, 535 CURSOR_KEY, 536 rmp_serde::to_vec(&cursor) 537 .into_diagnostic() 538 .wrap_err("cant serialize cursor")?, 539 ); 540 541 tokio::time::timeout( 542 BLOCKING_TASK_TIMEOUT, 543 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()), 544 ) 545 .await 546 .into_diagnostic()? 547 .map_err(|_| { 548 error!( 549 "spawn_blocking task for batch commit timed out after {}", 550 BLOCKING_TASK_TIMEOUT.as_secs() 551 ); 552 miette::miette!("spawn_blocking task for batch commit timed out") 553 })? 554 .inspect_err(|e| { 555 error!(err = ?e, "batch commit failed"); 556 }) 557 .ok(); 558 559 crawler.account_new_repos(valid_dids.len()).await; 560 561 if matches!(cursor, Cursor::Done(_)) { 562 info!("enumeration complete, sleeping 1h before next pass"); 563 tokio::time::sleep(Duration::from_secs(3600)).await; 564 info!("resuming after 1h sleep"); 565 } 566 } 567 } 568 569 fn process_retry_queue(&self) -> Result<Option<Duration>> { 570 let db = &self.state.db; 571 let now = Utc::now(); 572 573 let mut ready: Vec<Did> = Vec::new(); 574 let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new(); 575 let mut next_wake: Option<Duration> = None; 576 let mut had_more = false; 577 578 let mut rng: SmallRng = rand::make_rng(); 579 580 let mut batch = db.inner.batch(); 581 for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 582 let (key, val) = guard.into_inner().into_diagnostic()?; 583 let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?; 584 let did = keys::crawler_retry_parse_key(&key)?.to_did(); 585 586 // leave capped entries alone for API inspection 587 if state.attempts >= MAX_RETRY_ATTEMPTS { 588 continue; 589 } 590 591 let backoff = TimeDelta::seconds( 592 state 593 .duration 594 .as_seconds_f64() 595 .mul(rng.random_range(0.01..0.07)) as i64, 596 ); 597 if state.after + backoff > now { 598 let wake = (state.after - now).to_std().unwrap_or(Duration::ZERO); 599 next_wake = Some(next_wake.map(|w| w.min(wake)).unwrap_or(wake)); 600 continue; 601 } 602 603 if ready.len() >= MAX_RETRY_BATCH { 604 had_more = true; 605 break; 606 } 607 608 ready.push(did.clone()); 609 existing.insert(did, state); 610 } 611 612 if ready.is_empty() { 613 return Ok(next_wake); 614 } 615 616 debug!(count = ready.len(), "retrying pending repos"); 617 618 let handle = tokio::runtime::Handle::current(); 619 let filter = self.state.filter.load(); 620 let valid_dids = 621 handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch, &existing))?; 622 623 let mut rng: SmallRng = rand::make_rng(); 624 for did in &valid_dids { 625 let did_key = keys::repo_key(did); 626 627 if db.repos.contains_key(&did_key).into_diagnostic()? { 628 continue; 629 } 630 631 let state = RepoState::untracked(rng.next_u64()); 632 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 633 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 634 } 635 636 batch.commit().into_diagnostic()?; 637 638 if !valid_dids.is_empty() { 639 info!(count = valid_dids.len(), "recovered from retry queue"); 640 handle.block_on(self.account_new_repos(valid_dids.len())); 641 } 642 643 // if we hit the batch cap there are more ready entries, loop back immediately 644 Ok(had_more.then_some(Duration::ZERO).or(next_wake)) 645 } 646 647 fn check_repo_signals( 648 &self, 649 filter: Arc<crate::filter::FilterConfig>, 650 did: Did<'static>, 651 ) -> impl Future<Output = (Did<'static>, CrawlCheckResult)> + Send + 'static { 652 let resolver = self.state.resolver.clone(); 653 let http = self.http.clone(); 654 let throttler = self.pds_throttler.clone(); 655 async move { 656 const MAX_RETRIES: u32 = 5; 657 658 let pds_url = (|| resolver.resolve_identity_info(&did)) 659 .retry(MAX_RETRIES, |e, attempt| { 660 matches!(e, crate::resolver::ResolverError::Ratelimited) 661 .then(|| Duration::from_secs(1 << attempt.min(5))) 662 }) 663 .await; 664 665 let pds_url = match pds_url { 666 Ok((url, _)) => url, 667 Err(RetryOutcome::Ratelimited) => { 668 error!( 669 retries = MAX_RETRIES, 670 "rate limited resolving identity, giving up" 671 ); 672 // no pds handle to read retry_after from; use a short default 673 return (did, RetryState::new(60).into()); 674 } 675 Err(RetryOutcome::Failed(e)) => { 676 error!(err = %e, "failed to resolve identity"); 677 return (did, RetryState::new(60).into()); 678 } 679 }; 680 681 let throttle = throttler.get_handle(&pds_url).await; 682 if throttle.is_throttled() { 683 trace!(host = pds_url.host_str(), "skipping throttled pds"); 684 return (did, throttle.to_retry_state().into()); 685 } 686 687 let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ()); 688 let Ok(_permit) = _permit.await else { 689 trace!( 690 host = pds_url.host_str(), 691 "pds failed while waiting for permit" 692 ); 693 return (did, throttle.to_retry_state().into()); 694 }; 695 696 enum RequestError { 697 Reqwest(reqwest::Error), 698 RateLimited(Option<u64>), 699 /// hard failure notification from another task on this PDS 700 Throttled, 701 } 702 703 let mut describe_url = pds_url.join("/xrpc/com.atproto.repo.describeRepo").unwrap(); 704 describe_url.query_pairs_mut().append_pair("repo", &did); 705 706 let resp = async { 707 let resp = http 708 .get(describe_url) 709 .timeout(throttle.timeout()) 710 .send() 711 .await 712 .map_err(RequestError::Reqwest)?; 713 714 // dont retry ratelimits since we will just put it in a queue to be tried again later 715 if resp.status() == StatusCode::TOO_MANY_REQUESTS { 716 return Err(RequestError::RateLimited(parse_retry_after(&resp))); 717 } 718 719 resp.error_for_status().map_err(RequestError::Reqwest) 720 } 721 .or_failure(&throttle, || RequestError::Throttled) 722 .await; 723 724 let resp = match resp { 725 Ok(r) => { 726 throttle.record_success(); 727 r 728 } 729 Err(RequestError::RateLimited(secs)) => { 730 throttle.record_ratelimit(secs); 731 return ( 732 did, 733 throttle 734 .to_retry_state() 735 .with_status(StatusCode::TOO_MANY_REQUESTS) 736 .into(), 737 ); 738 } 739 Err(RequestError::Throttled) => { 740 return (did, throttle.to_retry_state().into()); 741 } 742 Err(RequestError::Reqwest(e)) => { 743 if e.is_timeout() && !throttle.record_timeout() { 744 // first or second timeout, just requeue 745 let mut retry_state = RetryState::new(60); 746 retry_state.status = e.status(); 747 return (did, retry_state.into()); 748 } 749 // third timeout, if timeout fail is_throttle_worthy will ban the pds 750 751 if is_throttle_worthy(&e) { 752 if let Some(mins) = throttle.record_failure() { 753 warn!(url = %pds_url, mins, "throttling pds due to hard failure"); 754 } 755 let mut retry_state = throttle.to_retry_state(); 756 retry_state.status = e.status(); 757 return (did, retry_state.into()); 758 } 759 760 match e.status() { 761 Some(StatusCode::NOT_FOUND | StatusCode::GONE) => { 762 trace!("repo not found"); 763 return (did, CrawlCheckResult::NoSignal); 764 } 765 Some(s) if s.is_client_error() => { 766 error!(status = %s, "repo unavailable"); 767 return (did, CrawlCheckResult::NoSignal); 768 } 769 _ => { 770 error!(err = %e, "repo errored"); 771 let mut retry_state = RetryState::new(60 * 15); 772 retry_state.status = e.status(); 773 return (did, retry_state.into()); 774 } 775 } 776 } 777 }; 778 779 let bytes = match resp.bytes().await { 780 Ok(b) => b, 781 Err(e) => { 782 error!(err = %e, "failed to read describeRepo response"); 783 return (did, RetryState::new(60 * 5).into()); 784 } 785 }; 786 787 let out = match serde_json::from_slice::<DescribeRepoOutput>(&bytes) { 788 Ok(out) => out, 789 Err(e) => { 790 error!(err = %e, "failed to parse describeRepo response"); 791 return (did, RetryState::new(60 * 10).into()); 792 } 793 }; 794 795 let found_signal = out 796 .collections 797 .iter() 798 .any(|col| filter.matches_signal(col.as_str())); 799 800 if !found_signal { 801 trace!("no signal-matching collections found"); 802 } 803 804 return ( 805 did, 806 found_signal 807 .then_some(CrawlCheckResult::Signal) 808 .unwrap_or(CrawlCheckResult::NoSignal), 809 ); 810 } 811 } 812 813 async fn check_signals_batch( 814 &self, 815 dids: &[Did<'static>], 816 filter: &Arc<crate::filter::FilterConfig>, 817 batch: &mut fjall::OwnedWriteBatch, 818 existing: &HashMap<Did<'static>, RetryState>, 819 ) -> Result<Vec<Did<'static>>> { 820 let db = &self.state.db; 821 let mut valid = Vec::new(); 822 let mut set = tokio::task::JoinSet::new(); 823 824 for did in dids { 825 let did = did.clone(); 826 let filter = filter.clone(); 827 let span = tracing::info_span!("signals", did = %did); 828 set.spawn(self.check_repo_signals(filter, did).instrument(span)); 829 } 830 831 while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next()) 832 .await 833 .into_diagnostic() 834 .map_err(|_| { 835 error!("signal check task timed out after 60s"); 836 miette::miette!("signal check task timed out") 837 })? 838 { 839 let (did, result) = match res { 840 Ok(inner) => inner, 841 Err(e) => { 842 error!(err = ?e, "signal check task failed or panicked"); 843 continue; 844 } 845 }; 846 847 match result { 848 CrawlCheckResult::Signal => { 849 batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 850 valid.push(did); 851 } 852 CrawlCheckResult::NoSignal => { 853 batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 854 } 855 CrawlCheckResult::Retry(state) => { 856 let prev_attempts = existing.get(&did).map(|s| s.attempts).unwrap_or(0); 857 let carried = RetryState { 858 attempts: prev_attempts, 859 ..state 860 }; 861 let next = match carried.next_attempt() { 862 Some(next) => next, 863 None => RetryState { 864 attempts: MAX_RETRY_ATTEMPTS, 865 ..state 866 }, 867 }; 868 batch.insert( 869 &db.crawler, 870 keys::crawler_retry_key(&did), 871 rmp_serde::to_vec(&next).into_diagnostic()?, 872 ); 873 } 874 } 875 } 876 877 Ok(valid) 878 } 879 880 async fn account_new_repos(&self, count: usize) { 881 if count == 0 { 882 return; 883 } 884 885 self.count.fetch_add(count, Ordering::Relaxed); 886 self.state 887 .db 888 .update_count_async("repos", count as i64) 889 .await; 890 self.state 891 .db 892 .update_count_async("pending", count as i64) 893 .await; 894 self.state.notify_backfill(); 895 } 896}