AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] fix cursor getting reset to 0

ptr.pet 6638fccb 218ce852

verified
+37 -20
+34 -14
src/crawler/mod.rs
··· 10 use rand::RngExt; 11 use rand::rngs::SmallRng; 12 use reqwest::StatusCode; 13 use smol_str::SmolStr; 14 use std::future::Future; 15 use std::ops::Mul; ··· 376 ) 377 } 378 379 pub mod throttle; 380 - use throttle::{OrThrottle, Throttler}; 381 382 pub struct Crawler { 383 state: Arc<AppState>, ··· 505 let db = &crawler.state.db; 506 507 let cursor_key = b"crawler_cursor"; 508 - let mut cursor: Option<SmolStr> = Db::get(db.cursors.clone(), cursor_key.to_vec()) 509 - .await? 510 - .map(|bytes| { 511 - let s = String::from_utf8_lossy(&bytes); 512 - info!(cursor = %s, "resuming"); 513 - s.into() 514 - }); 515 let mut was_throttled = false; 516 517 loop { ··· 569 list_repos_url 570 .query_pairs_mut() 571 .append_pair("limit", "1000"); 572 - if let Some(c) = &cursor { 573 list_repos_url 574 .query_pairs_mut() 575 .append_pair("cursor", c.as_str()); ··· 664 } 665 666 if let Some(new_cursor) = output.cursor { 667 - cursor = Some(new_cursor.as_str().into()); 668 batch.insert( 669 &db.cursors, 670 cursor_key.to_vec(), ··· 672 ); 673 } else { 674 info!("reached end of list."); 675 - cursor = None; 676 } 677 678 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) ··· 681 682 crawler.account_new_repos(to_queue.len()).await; 683 684 - if cursor.is_none() { 685 tokio::time::sleep(Duration::from_secs(3600)).await; 686 } 687 } ··· 699 700 let mut rng: SmallRng = rand::make_rng(); 701 702 for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 703 let (key, val) = guard.into_inner().into_diagnostic()?; 704 - let (retry_after, _) = keys::crawler_retry_parse_value(&val)?; 705 let did = keys::crawler_retry_parse_key(&key)?.to_did(); 706 707 // we check an extra backoff of 1 - 7% just to make it less likely for ··· 727 info!(count = ready.len(), "retrying pending repos"); 728 729 let handle = tokio::runtime::Handle::current(); 730 - let mut batch = db.inner.batch(); 731 let filter = self.state.filter.load(); 732 let valid_dids = 
handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch))?; 733
··· 10 use rand::RngExt; 11 use rand::rngs::SmallRng; 12 use reqwest::StatusCode; 13 + use serde::{Deserialize, Serialize}; 14 use smol_str::SmolStr; 15 use std::future::Future; 16 use std::ops::Mul; ··· 377 ) 378 } 379 380 + #[derive(Debug, Serialize, Deserialize)] 381 + enum Cursor { 382 + Done, 383 + Next(Option<SmolStr>), 384 + } 385 + 386 pub mod throttle; 387 + use throttle::{OrFailure, Throttler}; 388 389 pub struct Crawler { 390 state: Arc<AppState>, ··· 512 let db = &crawler.state.db; 513 514 let cursor_key = b"crawler_cursor"; 515 + let cursor_bytes = Db::get(db.cursors.clone(), cursor_key.to_vec()).await?; 516 + let mut cursor: Cursor = cursor_bytes 517 + .as_deref() 518 + .map(rmp_serde::from_slice) 519 + .transpose() 520 + .into_diagnostic()? 521 + .unwrap_or(Cursor::Next(None)); 522 let mut was_throttled = false; 523 524 loop { ··· 576 list_repos_url 577 .query_pairs_mut() 578 .append_pair("limit", "1000"); 579 + if let Cursor::Next(Some(c)) = &cursor { 580 list_repos_url 581 .query_pairs_mut() 582 .append_pair("cursor", c.as_str()); ··· 671 } 672 673 if let Some(new_cursor) = output.cursor { 674 + cursor = Cursor::Next(Some(new_cursor.as_str().into())); 675 batch.insert( 676 &db.cursors, 677 cursor_key.to_vec(), ··· 679 ); 680 } else { 681 info!("reached end of list."); 682 + cursor = Cursor::Done; 683 } 684 685 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) ··· 688 689 crawler.account_new_repos(to_queue.len()).await; 690 691 + if matches!(cursor, Cursor::Done) { 692 tokio::time::sleep(Duration::from_secs(3600)).await; 693 } 694 } ··· 706 707 let mut rng: SmallRng = rand::make_rng(); 708 709 + let mut batch = db.inner.batch(); 710 for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 711 let (key, val) = guard.into_inner().into_diagnostic()?; 712 + let (retry_after, _) = match keys::crawler_retry_parse_value(&val) { 713 + Ok(x) => x, 714 + Err(_) => { 715 + // this handles the old db format 716 + // todo: remove this 
later!! its just for testing... 717 + let retry_after = now + 60 * 5; 718 + batch.insert( 719 + &db.crawler, 720 + key.clone(), 721 + keys::crawler_retry_value(retry_after, 0), 722 + ); 723 + (retry_after, 0) 724 + } 725 + }; 726 let did = keys::crawler_retry_parse_key(&key)?.to_did(); 727 728 // we check an extra backoff of 1 - 7% just to make it less likely for ··· 748 info!(count = ready.len(), "retrying pending repos"); 749 750 let handle = tokio::runtime::Handle::current(); 751 let filter = self.state.filter.load(); 752 let valid_dids = handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch))?; 753
+3 -6
src/crawler/throttle.rs
··· 148 } 149 } 150 151 - /// extension trait that adds `.or_throttle()` to any future returning `Result<T, E>`. 152 - /// 153 - /// races the future against a hard-failure notification. soft ratelimits (429) do NOT 154 - /// trigger cancellation — those are handled by the background retry loop. 155 #[allow(async_fn_in_trait)] 156 - pub trait OrThrottle<T, E>: Future<Output = Result<T, E>> { 157 async fn or_failure( 158 self, 159 handle: &ThrottleHandle, ··· 169 } 170 } 171 172 - impl<T, E, F: Future<Output = Result<T, E>>> OrThrottle<T, E> for F {}
··· 148 } 149 } 150 151 + /// adds a method for racing the future against a hard-failure notification. 152 #[allow(async_fn_in_trait)] 153 + pub trait OrFailure<T, E>: Future<Output = Result<T, E>> { 154 async fn or_failure( 155 self, 156 handle: &ThrottleHandle, ··· 166 } 167 } 168 169 + impl<T, E, F: Future<Output = Result<T, E>>> OrFailure<T, E> for F {}