AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] fix cursor resetting to Next(None)

ptr.pet 1e32f8a6 80f5c529

verified
+22 -15
+22 -15
src/crawler/mod.rs
··· 14 14 use rand::rngs::SmallRng; 15 15 use reqwest::StatusCode; 16 16 use serde::{Deserialize, Serialize}; 17 - use smol_str::{SmolStr, ToSmolStr}; 17 + use smol_str::{SmolStr, ToSmolStr, format_smolstr}; 18 18 use std::collections::HashMap; 19 19 use std::ops::{Add, Mul, Sub}; 20 20 use std::sync::Arc; ··· 143 143 144 144 #[derive(Debug, Serialize, Deserialize)] 145 145 enum Cursor { 146 - Done, 146 + Done(SmolStr), 147 147 Next(Option<SmolStr>), 148 148 } 149 149 ··· 235 235 let cursor = Self::get_cursor(&crawler).await.map_or_else( 236 236 |e| e.to_smolstr(), 237 237 |c| match c { 238 - Cursor::Done => "done".to_smolstr(), 238 + Cursor::Done(c) => format_smolstr!("done({c})"), 239 239 Cursor::Next(None) => "none".to_smolstr(), 240 240 Cursor::Next(Some(c)) => c.to_smolstr(), 241 241 }, ··· 299 299 300 300 let mut cursor = Self::get_cursor(&crawler).await?; 301 301 302 - match cursor { 303 - Cursor::Next(Some(ref cursor)) => info!(cursor = %cursor, "resuming"), 302 + match &cursor { 303 + Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"), 304 304 Cursor::Next(None) => info!("starting from scratch"), 305 - Cursor::Done => info!("was done, resuming"), 305 + Cursor::Done(c) => info!(cursor = %c, "was done, resuming"), 306 306 } 307 307 308 308 let mut was_throttled = false; ··· 473 473 miette::miette!("spawn_blocking task for parsing listRepos timed out") 474 474 })?; 475 475 476 - let Ok(Some(ParseResult { 476 + let ParseResult { 477 477 unknown_dids, 478 478 cursor: next_cursor, 479 479 count, 480 - })) = parse_result 481 - else { 482 - info!("finished enumeration (or empty page)"); 483 - tokio::time::sleep(Duration::from_secs(3600)).await; 484 - continue; 480 + } = match parse_result { 481 + Ok(Some(res)) => res, 482 + Ok(None) => { 483 + info!("finished enumeration (or empty page)"); 484 + if let Cursor::Next(Some(c)) = cursor { 485 + info!("reached end of list."); 486 + cursor = Cursor::Done(c); 487 + } 488 + tokio::time::sleep(Duration::from_secs(3600)).await; 489 + continue; 490 + } 491 + Err(e) => return Err(e).wrap_err("error while crawling"), 485 492 }; 486 493 487 494 debug!(count, "fetched repos"); ··· 508 515 509 516 if let Some(new_cursor) = next_cursor { 510 517 cursor = Cursor::Next(Some(new_cursor.as_str().into())); 511 - } else { 518 + } else if let Cursor::Next(Some(c)) = cursor { 512 519 info!("reached end of list."); 513 - cursor = Cursor::Done; 520 + cursor = Cursor::Done(c); 514 521 } 515 522 batch.insert( 516 523 &db.cursors, ··· 540 547 541 548 crawler.account_new_repos(to_queue.len()).await; 542 549 543 - if matches!(cursor, Cursor::Done) { 550 + if matches!(cursor, Cursor::Done(_)) { 544 551 tokio::time::sleep(Duration::from_secs(3600)).await; 545 552 }