AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] restart on 'fatal' errors

ptr.pet 9b3bea0a bea4e639

verified
+19 -13
+19 -13
src/crawler/mod.rs
··· 23 23 enum CrawlCheckResult { 24 24 Signal, 25 25 NoSignal, 26 - /// task could not complete; should be retried at `retry_after` (unix timestamp). 27 - /// `status` is the HTTP status that triggered this (0 for non-HTTP failures). 28 - Retry { 29 - retry_after: i64, 30 - status: u16, 31 - }, 26 + Retry { retry_after: i64, status: u16 }, 32 27 } 33 28 34 29 /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. ··· 431 426 } 432 427 433 428 pub async fn run(self) -> Result<()> { 429 + let crawler = Arc::new(self); 430 + 434 431 // stats ticker 435 432 tokio::spawn({ 436 433 use std::time::Instant; 437 - let count = self.count.clone(); 438 - let crawled_count = self.crawled_count.clone(); 439 - let throttled = self.throttled.clone(); 440 - let pds_throttler = self.pds_throttler.clone(); 434 + let count = crawler.count.clone(); 435 + let crawled_count = crawler.crawled_count.clone(); 436 + let throttled = crawler.throttled.clone(); 437 + let pds_throttler = crawler.pds_throttler.clone(); 441 438 let mut last_time = Instant::now(); 442 439 let mut interval = tokio::time::interval(Duration::from_secs(60)); 443 440 async move { ··· 451 448 452 449 if delta_processed == 0 && delta_crawled == 0 { 453 450 if is_throttled { 454 - info!("crawler throttled: pending queue full"); 451 + info!("throttled: pending queue full"); 455 452 } else { 456 453 debug!("no repos crawled or processed in 60s"); 457 454 } ··· 463 460 processed = delta_processed, 464 461 crawled = delta_crawled, 465 462 elapsed, 466 - "crawler progress" 463 + "progress" 467 464 ); 468 465 last_time = Instant::now(); 469 466 } 470 467 } 471 468 }); 472 469 473 - let crawler = Arc::new(self); 470 + // retry thread 474 471 std::thread::spawn({ 475 472 let crawler = crawler.clone(); 476 473 let handle = tokio::runtime::Handle::current(); ··· 497 494 } 498 495 }); 499 496 497 + loop { 498 + if let Err(e) = Self::crawl(crawler.clone()).await { 499 + error!(err = %e, "fatal error, restarting in 30s"); 500 + tokio::time::sleep(Duration::from_secs(30)).await; 501 + } 502 + } 503 + } 504 + 505 + async fn crawl(crawler: Arc<Self>) -> Result<()> { 500 506 let mut relay_url = crawler.relay_host.clone(); 501 507 match relay_url.scheme() { 502 508 "wss" => relay_url