AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] fix TOCTOU in process_retry_queue next-wake-up calculation; fix next_attempt duration logic

ptr.pet e558361e e263dacf

verified
+9 -17
+9 -17
src/crawler/mod.rs
··· 54 if attempts >= MAX_RETRY_ATTEMPTS { 55 return None; 56 } 57 - let duration = self.duration * 2i32.pow(self.attempts.min(4)); 58 Some(Self { 59 after: Utc::now().add(duration), 60 duration, ··· 396 397 loop { 398 match crawler.process_retry_queue() { 399 - Ok(Some(next)) => { 400 - let secs = next.signed_duration_since(Utc::now()); 401 - sleep(secs.to_std().expect("that time delta was positive")); 402 - } 403 - Ok(None) => { 404 - sleep(Duration::from_secs(60)); 405 - } 406 Err(e) => { 407 error!(err = %e, "retry loop failed"); 408 sleep(Duration::from_secs(60)); ··· 628 } 629 } 630 631 - fn process_retry_queue(&self) -> Result<Option<DateTime<Utc>>> { 632 let db = &self.state.db; 633 let now = Utc::now(); 634 635 let mut ready: Vec<Did> = Vec::new(); 636 let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new(); 637 - let mut next_retry: Option<DateTime<Utc>> = None; 638 let mut had_more = false; 639 640 let mut rng: SmallRng = rand::make_rng(); ··· 657 .mul(rng.random_range(0.01..0.07)) as i64, 658 ); 659 if state.after + backoff > now { 660 - next_retry = Some( 661 - next_retry 662 - .map(|earliest| earliest.min(state.after)) 663 - .unwrap_or(state.after), 664 - ); 665 continue; 666 } 667 ··· 675 } 676 677 if ready.is_empty() { 678 - return Ok(next_retry); 679 } 680 681 info!(count = ready.len(), "retrying pending repos"); ··· 706 } 707 708 // if we hit the batch cap there are more ready entries, loop back immediately 709 - Ok(had_more.then(Utc::now).or(next_retry)) 710 } 711 712 async fn check_signals_batch(
··· 54 if attempts >= MAX_RETRY_ATTEMPTS { 55 return None; 56 } 57 + let duration = self.duration * 2; 58 Some(Self { 59 after: Utc::now().add(duration), 60 duration, ··· 396 397 loop { 398 match crawler.process_retry_queue() { 399 + Ok(Some(dur)) => sleep(dur), 400 + Ok(None) => sleep(Duration::from_secs(60)), 401 Err(e) => { 402 error!(err = %e, "retry loop failed"); 403 sleep(Duration::from_secs(60)); ··· 623 } 624 } 625 626 + fn process_retry_queue(&self) -> Result<Option<Duration>> { 627 let db = &self.state.db; 628 let now = Utc::now(); 629 630 let mut ready: Vec<Did> = Vec::new(); 631 let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new(); 632 + let mut next_wake: Option<Duration> = None; 633 let mut had_more = false; 634 635 let mut rng: SmallRng = rand::make_rng(); ··· 652 .mul(rng.random_range(0.01..0.07)) as i64, 653 ); 654 if state.after + backoff > now { 655 + let wake = (state.after - now).to_std().unwrap_or(Duration::ZERO); 656 + next_wake = Some(next_wake.map(|w| w.min(wake)).unwrap_or(wake)); 657 continue; 658 } 659 ··· 667 } 668 669 if ready.is_empty() { 670 + return Ok(next_wake); 671 } 672 673 info!(count = ready.len(), "retrying pending repos"); ··· 698 } 699 700 // if we hit the batch cap there are more ready entries, loop back immediately 701 + Ok(had_more.then_some(Duration::ZERO).or(next_wake)) 702 } 703 704 async fn check_signals_batch(