AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] also log how many repos were crawled alongside processed

ptr.pet 800449f7 34afb38d

verified
+17 -7
+17 -7
src/crawler/mod.rs
··· 202 max_pending: usize, 203 resume_pending: usize, 204 count: Arc<AtomicUsize>, 205 } 206 207 impl Crawler { ··· 230 max_pending, 231 resume_pending, 232 count: Arc::new(AtomicUsize::new(0)), 233 } 234 } 235 ··· 237 tokio::spawn({ 238 use std::time::Instant; 239 let count = self.count.clone(); 240 let mut last_time = Instant::now(); 241 let mut interval = tokio::time::interval(Duration::from_secs(60)); 242 async move { 243 loop { 244 interval.tick().await; 245 - let delta = count.swap(0, Ordering::Relaxed); 246 - if delta == 0 { 247 - debug!("no repos processed in 60s"); 248 continue; 249 } 250 let elapsed = last_time.elapsed().as_secs_f64(); 251 - let rate = (elapsed > 0.0) 252 - .then(|| delta as f64 / elapsed) 253 - .unwrap_or(0.0); 254 - info!(rate, delta, elapsed, "crawler progress"); 255 last_time = Instant::now(); 256 } 257 } ··· 378 } 379 380 debug!(count = output.repos.len(), "fetched repos"); 381 382 let mut batch = db.inner.batch(); 383 let mut to_queue = Vec::new();
··· 202 max_pending: usize, 203 resume_pending: usize, 204 count: Arc<AtomicUsize>, 205 + crawled_count: Arc<AtomicUsize>, 206 } 207 208 impl Crawler { ··· 231 max_pending, 232 resume_pending, 233 count: Arc::new(AtomicUsize::new(0)), 234 + crawled_count: Arc::new(AtomicUsize::new(0)), 235 } 236 } 237 ··· 239 tokio::spawn({ 240 use std::time::Instant; 241 let count = self.count.clone(); 242 + let crawled_count = self.crawled_count.clone(); 243 let mut last_time = Instant::now(); 244 let mut interval = tokio::time::interval(Duration::from_secs(60)); 245 async move { 246 loop { 247 interval.tick().await; 248 + let delta_processed = count.swap(0, Ordering::Relaxed); 249 + let delta_crawled = crawled_count.swap(0, Ordering::Relaxed); 250 + 251 + if delta_processed == 0 && delta_crawled == 0 { 252 + debug!("no repos crawled or processed in 60s"); 253 continue; 254 } 255 + 256 let elapsed = last_time.elapsed().as_secs_f64(); 257 + info!( 258 + processed = delta_processed, 259 + crawled = delta_crawled, 260 + elapsed, 261 + "crawler progress" 262 + ); 263 last_time = Instant::now(); 264 } 265 } ··· 386 } 387 388 debug!(count = output.repos.len(), "fetched repos"); 389 + self.crawled_count 390 + .fetch_add(output.repos.len(), Ordering::Relaxed); 391 392 let mut batch = db.inner.batch(); 393 let mut to_queue = Vec::new();