AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::crawler::throttle::ThrottleHandle;
2use crate::db::{Db, keys, ser_repo_state};
3use crate::state::AppState;
4use crate::types::RepoState;
5use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after};
6use chrono::{DateTime, TimeDelta, Utc};
7use futures::FutureExt;
8use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput;
9use jacquard_api::com_atproto::sync::list_repos::ListReposOutput;
10use jacquard_common::{IntoStatic, types::string::Did};
11use miette::{Context, IntoDiagnostic, Result};
12use rand::Rng;
13use rand::RngExt;
14use rand::rngs::SmallRng;
15use reqwest::StatusCode;
16use serde::{Deserialize, Serialize};
17use smol_str::{SmolStr, ToSmolStr, format_smolstr};
18use std::collections::HashMap;
19use std::ops::{Add, Mul, Sub};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::time::Duration;
23use tracing::{Instrument, debug, error, info, trace, warn};
24use url::Url;
25
// give up on a repo after this many consecutive failed retry attempts
const MAX_RETRY_ATTEMPTS: u32 = 5;
// upper bound on how many retry-queue entries are processed per sweep
const MAX_RETRY_BATCH: usize = 1000;
28
/// persisted (msgpack) entry in the crawler retry queue: when to try a repo
/// again, the current backoff window, and how many attempts have failed so far.
#[derive(Debug, Serialize, Deserialize)]
struct RetryState {
    // earliest time this repo may be attempted again
    after: DateTime<Utc>,
    // current backoff window; doubled on every failed attempt
    duration: TimeDelta,
    // failed attempts so far; capped at MAX_RETRY_ATTEMPTS
    attempts: u32,
    // http status of the last failure, if any (custom ser/de because
    // StatusCode does not implement serde's traits)
    #[serde(serialize_with = "crate::util::ser_status_code")]
    #[serde(deserialize_with = "crate::util::deser_status_code")]
    status: Option<StatusCode>,
}
38
39impl RetryState {
40 fn new(secs: i64) -> Self {
41 let duration = TimeDelta::seconds(secs);
42 Self {
43 duration,
44 after: Utc::now().add(duration),
45 attempts: 0,
46 status: None,
47 }
48 }
49
50 /// returns the next retry state with doubled duration and incremented attempt count,
51 /// or `None` if the attempt count would reach the cap (entry left in db as-is).
52 fn next_attempt(self) -> Option<Self> {
53 let attempts = self.attempts + 1;
54 if attempts >= MAX_RETRY_ATTEMPTS {
55 return None;
56 }
57 let duration = self.duration * 2;
58 Some(Self {
59 after: Utc::now().add(duration),
60 duration,
61 attempts,
62 status: None,
63 })
64 }
65
66 fn with_status(mut self, code: StatusCode) -> Self {
67 self.status = Some(code);
68 self
69 }
70}
71
/// conversion into a retry-queue entry; implemented for throttle handles so a
/// throttled pds maps directly onto "retry when the throttle lifts".
trait ToRetryState {
    fn to_retry_state(&self) -> RetryState;
}
75
76impl ToRetryState for ThrottleHandle {
77 fn to_retry_state(&self) -> RetryState {
78 let after = chrono::DateTime::from_timestamp_secs(self.throttled_until()).unwrap();
79 RetryState {
80 duration: after.sub(Utc::now()),
81 after,
82 attempts: 0,
83 status: None,
84 }
85 }
86}
87
/// outcome of probing a single repo for signal collections.
enum CrawlCheckResult {
    // repo has at least one collection matching the filter's signals
    Signal,
    // repo answered (or is definitively gone) but has no matching collections
    NoSignal,
    // transient failure: park the repo in the retry queue
    Retry(RetryState),
}
93
// lets `.into()` turn a retry state into a check result at the many return sites
impl From<RetryState> for CrawlCheckResult {
    fn from(value: RetryState) -> Self {
        Self::Retry(value)
    }
}
99
100fn is_throttle_worthy(e: &reqwest::Error) -> bool {
101 use std::error::Error;
102
103 if e.is_timeout() {
104 return true;
105 }
106
107 let mut src = e.source();
108 while let Some(s) = src {
109 if let Some(io_err) = s.downcast_ref::<std::io::Error>() {
110 if is_tls_cert_error(io_err) {
111 return true;
112 }
113 }
114 src = s.source();
115 }
116
117 e.status().map_or(false, |s| {
118 matches!(
119 s,
120 StatusCode::BAD_GATEWAY
121 | StatusCode::SERVICE_UNAVAILABLE
122 | StatusCode::GATEWAY_TIMEOUT
123 | crate::util::CONNECTION_TIMEOUT
124 | crate::util::SITE_FROZEN
125 )
126 })
127}
128
129fn is_tls_cert_error(io_err: &std::io::Error) -> bool {
130 let Some(inner) = io_err.get_ref() else {
131 return false;
132 };
133 if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() {
134 return matches!(rustls_err, rustls::Error::InvalidCertificate(_));
135 }
136 if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() {
137 return is_tls_cert_error(nested_io);
138 }
139 false
140}
141
// key under which the enumeration cursor is persisted in the cursors partition
const CURSOR_KEY: &[u8] = b"crawler_cursor";

/// persisted (msgpack) position in the relay's listRepos enumeration.
#[derive(Debug, Serialize, Deserialize)]
enum Cursor {
    // a full pass completed at this cursor; the next pass resumes from it
    Done(SmolStr),
    // mid-enumeration; `None` means start from the beginning
    Next(Option<SmolStr>),
}
149
// per-pds throttling (rate limits, failure tracking) lives in its own module
pub mod throttle;
use throttle::{OrFailure, Throttler};
152
/// enumerates repos from a relay via com.atproto.sync.listRepos, filters them
/// for signal collections, and feeds new repos into the pending queue.
pub struct Crawler {
    state: Arc<AppState>,
    // relay to enumerate; ws(s) schemes are rewritten to http(s) for xrpc calls
    relay_host: Url,
    http: reqwest::Client,
    // hard ceiling on the pending queue before crawling pauses entirely
    max_pending: usize,
    // once throttled, crawling resumes when pending drops to/below this
    resume_pending: usize,
    // repos newly queued since the last stats tick (reset every tick)
    count: AtomicUsize,
    // repos seen via listRepos since the last stats tick (reset every tick)
    crawled_count: AtomicUsize,
    // whether the crawler is currently paused on a full pending queue
    throttled: AtomicBool,
    // per-pds rate limiting / failure tracking
    pds_throttler: Throttler,
}
164
165impl Crawler {
166 pub fn new(
167 state: Arc<AppState>,
168 relay_host: Url,
169 max_pending: usize,
170 resume_pending: usize,
171 ) -> Self {
172 let http = reqwest::Client::builder()
173 .user_agent(concat!(
174 env!("CARGO_PKG_NAME"),
175 "/",
176 env!("CARGO_PKG_VERSION")
177 ))
178 .gzip(true)
179 .build()
180 .expect("that reqwest will build");
181
182 Self {
183 state,
184 relay_host,
185 http,
186 max_pending,
187 resume_pending,
188 count: AtomicUsize::new(0),
189 crawled_count: AtomicUsize::new(0),
190 throttled: AtomicBool::new(false),
191 pds_throttler: Throttler::new(),
192 }
193 }
194
195 async fn get_cursor(&self) -> Result<Cursor> {
196 let cursor_bytes = Db::get(self.state.db.cursors.clone(), CURSOR_KEY).await?;
197 let cursor: Cursor = cursor_bytes
198 .as_deref()
199 .map(rmp_serde::from_slice)
200 .transpose()
201 .into_diagnostic()
202 .wrap_err("can't parse cursor")?
203 .unwrap_or(Cursor::Next(None));
204 Ok(cursor)
205 }
206
    /// top-level supervisor: starts the once-a-minute stats ticker task and the
    /// retry-queue thread, then runs the enumeration loop forever, restarting
    /// it after a 30s pause whenever it errors out.
    pub async fn run(self) -> Result<()> {
        let crawler = Arc::new(self);

        // stats ticker
        let ticker = tokio::spawn({
            use std::time::Instant;
            let crawler = crawler.clone();
            let mut last_time = Instant::now();
            let mut interval = tokio::time::interval(Duration::from_secs(60));
            async move {
                loop {
                    interval.tick().await;
                    // swap(0) both reads and resets the per-minute counters
                    let delta_processed = crawler.count.swap(0, Ordering::Relaxed);
                    let delta_crawled = crawler.crawled_count.swap(0, Ordering::Relaxed);
                    let is_throttled = crawler.throttled.load(Ordering::Relaxed);

                    // drop throttle entries for hosts that have recovered
                    crawler.pds_throttler.evict_clean().await;

                    if delta_processed == 0 && delta_crawled == 0 {
                        if is_throttled {
                            info!("throttled: pending queue full");
                        } else {
                            info!("idle: no repos crawled or processed in 60s");
                        }
                        continue;
                    }

                    let elapsed = last_time.elapsed().as_secs_f64();
                    // render the current cursor (or the fetch error) for the log line
                    let cursor = Self::get_cursor(&crawler).await.map_or_else(
                        |e| e.to_smolstr(),
                        |c| match c {
                            Cursor::Done(c) => format_smolstr!("done({c})"),
                            Cursor::Next(None) => "none".to_smolstr(),
                            Cursor::Next(Some(c)) => c.to_smolstr(),
                        },
                    );
                    info!(
                        %cursor,
                        processed = delta_processed,
                        crawled = delta_crawled,
                        elapsed,
                        "progress"
                    );
                    last_time = Instant::now();
                }
            }
        });
        tokio::spawn(async move {
            // the ticker's async block loops forever, so its output type is
            // uninhabited and this `let Err` pattern is irrefutable: reaching
            // this point means the ticker task panicked or was aborted
            let Err(e) = ticker.await;
            error!(err = ?e, "stats ticker panicked, aborting");
            std::process::abort();
        });

        // retry thread
        std::thread::spawn({
            let crawler = crawler.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                use std::thread::sleep;

                // make the runtime current on this plain OS thread so
                // Handle::current() inside process_retry_queue resolves
                let _g = handle.enter();

                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    loop {
                        match crawler.process_retry_queue() {
                            // sleep until the earliest pending entry (min 1s)
                            Ok(Some(dur)) => sleep(dur.max(Duration::from_secs(1))),
                            // queue empty: poll again in a minute
                            Ok(None) => sleep(Duration::from_secs(60)),
                            Err(e) => {
                                error!(err = %e, "retry loop failed");
                                sleep(Duration::from_secs(60));
                            }
                        }
                    }
                }));
                if result.is_err() {
                    error!("retry thread panicked, aborting");
                    std::process::abort();
                }
            }
        });

        // enumeration loop; crawl() only returns on errors worth restarting for
        loop {
            if let Err(e) = Self::crawl(crawler.clone()).await {
                error!(err = ?e, "fatal error, restarting in 30s");
                tokio::time::sleep(Duration::from_secs(30)).await;
            }
        }
    }
295
    /// main enumeration loop: pages through the relay's listRepos, skips
    /// excluded/retrying/known repos, signal-checks the remainder, and queues
    /// survivors for backfill. the cursor is persisted in the same batch as the
    /// new repos, so a restart resumes exactly where the last commit left off.
    async fn crawl(crawler: Arc<Self>) -> Result<()> {
        // the relay host may be configured with a websocket scheme (for the
        // firehose); rewrite to http(s) for plain xrpc queries
        let mut relay_url = crawler.relay_host.clone();
        match relay_url.scheme() {
            "wss" => relay_url
                .set_scheme("https")
                .map_err(|_| miette::miette!("invalid url: {relay_url}"))?,
            "ws" => relay_url
                .set_scheme("http")
                .map_err(|_| miette::miette!("invalid url: {relay_url}"))?,
            _ => {}
        }

        let mut rng: SmallRng = rand::make_rng();
        let db = &crawler.state.db;

        let mut cursor = crawler.get_cursor().await?;

        match &cursor {
            Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"),
            Cursor::Next(None) => info!("starting from scratch"),
            Cursor::Done(c) => info!(cursor = %c, "was done, resuming"),
        }

        let mut was_throttled = false;
        loop {
            // throttle check: pause enumeration while the pending queue is full,
            // and once throttled, stay paused until it drains to resume_pending
            loop {
                let pending = crawler.state.db.get_count("pending").await;
                if pending > crawler.max_pending as u64 {
                    if !was_throttled {
                        debug!(
                            pending,
                            max = crawler.max_pending,
                            "throttling: above max pending"
                        );
                        was_throttled = true;
                        crawler.throttled.store(true, Ordering::Relaxed);
                    }
                    tokio::time::sleep(Duration::from_secs(5)).await;
                } else if pending > crawler.resume_pending as u64 {
                    if !was_throttled {
                        debug!(
                            pending,
                            resume = crawler.resume_pending,
                            "throttling: entering cooldown"
                        );
                        was_throttled = true;
                        crawler.throttled.store(true, Ordering::Relaxed);
                    }

                    // wait out the cooldown until the queue drains below the
                    // resume threshold, then proceed with this page
                    loop {
                        let current_pending = crawler.state.db.get_count("pending").await;
                        if current_pending <= crawler.resume_pending as u64 {
                            break;
                        }
                        debug!(
                            pending = current_pending,
                            resume = crawler.resume_pending,
                            "cooldown, waiting"
                        );
                        tokio::time::sleep(Duration::from_secs(5)).await;
                    }
                    break;
                } else {
                    if was_throttled {
                        info!("throttling released");
                        was_throttled = false;
                        crawler.throttled.store(false, Ordering::Relaxed);
                    }
                    break;
                }
            }

            // build the listRepos request for the current cursor position
            let mut list_repos_url = relay_url
                .join("/xrpc/com.atproto.sync.listRepos")
                .into_diagnostic()?;
            list_repos_url
                .query_pairs_mut()
                .append_pair("limit", "1000");
            if let Cursor::Next(Some(c)) = &cursor {
                list_repos_url
                    .query_pairs_mut()
                    .append_pair("cursor", c.as_str());
            }

            // fetch with exponential backoff on 429s only; other errors fail fast
            let fetch_result = (|| {
                crawler
                    .http
                    .get(list_repos_url.clone())
                    .send()
                    .error_for_status()
            })
            .retry(5, |e: &reqwest::Error, attempt| {
                matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS))
                    .then(|| Duration::from_secs(1 << attempt.min(5)))
            })
            .await;

            // on any fetch failure just retry the same page on the next iteration
            let res = match fetch_result {
                Ok(r) => r,
                Err(RetryOutcome::Ratelimited) => {
                    warn!("rate limited by relay after retries");
                    continue;
                }
                Err(RetryOutcome::Failed(e)) => {
                    error!(err = %e, "crawler failed to fetch listRepos");
                    continue;
                }
            };

            let bytes = match res.bytes().await {
                Ok(b) => b,
                Err(e) => {
                    error!(err = %e, "cant read listRepos response");
                    continue;
                }
            };

            // single batch for this page: new repos + pending entries + cursor
            let mut batch = db.inner.batch();
            let filter = crawler.state.filter.load();

            // what the blocking parse task hands back to the async side
            struct ParseResult {
                unknown_dids: Vec<Did<'static>>,
                cursor: Option<smol_str::SmolStr>,
                count: usize,
            }

            const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30);

            // parse + db lookups are synchronous fjall reads, so run them on the
            // blocking pool instead of stalling the async worker
            let parse_result = {
                let repos = db.repos.clone();
                let filter_ks = db.filter.clone();
                let crawler_ks = db.crawler.clone();

                // this wont actually cancel the task since spawn_blocking isnt cancel safe
                // but at least we'll see whats going on?
                tokio::time::timeout(
                    BLOCKING_TASK_TIMEOUT,
                    tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> {
                        // treat an unparseable page like an empty one: log and
                        // fall through to the end-of-enumeration path
                        let output = match serde_json::from_slice::<ListReposOutput>(&bytes) {
                            Ok(out) => out.into_static(),
                            Err(e) => {
                                error!(err = %e, "failed to parse listRepos response");
                                return Ok(None);
                            }
                        };

                        if output.repos.is_empty() {
                            return Ok(None);
                        }

                        let count = output.repos.len();
                        let next_cursor = output.cursor.map(|c| c.as_str().into());
                        let mut unknown = Vec::new();
                        for repo in output.repos {
                            // explicitly excluded by the filter config
                            let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?;
                            if filter_ks.contains_key(&excl_key).into_diagnostic()? {
                                continue;
                            }

                            // already in retry queue — let the retry thread handle it
                            let retry_key = keys::crawler_retry_key(&repo.did);
                            if crawler_ks.contains_key(&retry_key).into_diagnostic()? {
                                continue;
                            }

                            // only repos we haven't indexed yet are interesting
                            let did_key = keys::repo_key(&repo.did);
                            if !repos.contains_key(&did_key).into_diagnostic()? {
                                unknown.push(repo.did.into_static());
                            }
                        }

                        Ok(Some(ParseResult {
                            unknown_dids: unknown,
                            cursor: next_cursor,
                            count,
                        }))
                    }),
                )
                .await
            }
            // NOTE(review): `.into_diagnostic()?` consumes the timeout (Elapsed)
            // error, so the `map_err` below actually fires on a JoinError (the
            // blocking task panicked or was cancelled), not on a timeout — the
            // "timed out" log/message there looks misattributed; verify intent
            .into_diagnostic()?
            .map_err(|_| {
                error!(
                    "spawn_blocking task for parsing listRepos timed out after {}",
                    BLOCKING_TASK_TIMEOUT.as_secs()
                );
                miette::miette!("spawn_blocking task for parsing listRepos timed out")
            })?;

            let ParseResult {
                unknown_dids,
                cursor: next_cursor,
                count,
            } = match parse_result {
                Ok(Some(res)) => res,
                // empty/unparseable page: mark the pass done and sleep an hour
                Ok(None) => {
                    info!("finished enumeration (or empty page)");
                    if let Cursor::Next(Some(c)) = cursor {
                        info!("reached end of list.");
                        cursor = Cursor::Done(c);
                    }
                    info!("sleeping 1h before next enumeration pass");
                    tokio::time::sleep(Duration::from_secs(3600)).await;
                    info!("resuming after 1h sleep");
                    continue;
                }
                Err(e) => return Err(e).wrap_err("error while crawling"),
            };

            debug!(count, "fetched repos");
            crawler.crawled_count.fetch_add(count, Ordering::Relaxed);

            // when signal-checking is enabled, probe each unknown repo's pds
            // for matching collections; otherwise accept them all
            let valid_dids = if filter.check_signals() && !unknown_dids.is_empty() {
                // we dont need to pass any existing since we have none; we are crawling after all
                crawler
                    .check_signals_batch(&unknown_dids, &filter, &mut batch, &HashMap::new())
                    .await?
            } else {
                unknown_dids
            };

            // stage every accepted repo: repo record + pending (backfill) entry
            for did in &valid_dids {
                let did_key = keys::repo_key(did);
                trace!(did = %did, "found new repo");

                let state = RepoState::untracked(rng.next_u64());
                batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
                batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
            }

            // advance (or finish) the cursor and stage it in the same batch so
            // repos and cursor commit atomically
            if let Some(new_cursor) = next_cursor {
                cursor = Cursor::Next(Some(new_cursor.as_str().into()));
            } else if let Cursor::Next(Some(c)) = cursor {
                info!("reached end of list.");
                cursor = Cursor::Done(c);
            }
            batch.insert(
                &db.cursors,
                CURSOR_KEY,
                rmp_serde::to_vec(&cursor)
                    .into_diagnostic()
                    .wrap_err("cant serialize cursor")?,
            );

            // commit on the blocking pool; a failed commit is logged and
            // swallowed (`.ok()`), so this page is simply re-fetched later
            tokio::time::timeout(
                BLOCKING_TASK_TIMEOUT,
                tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()),
            )
            .await
            // NOTE(review): same Elapsed/JoinError swap as the parse task above
            .into_diagnostic()?
            .map_err(|_| {
                error!(
                    "spawn_blocking task for batch commit timed out after {}",
                    BLOCKING_TASK_TIMEOUT.as_secs()
                );
                miette::miette!("spawn_blocking task for batch commit timed out")
            })?
            .inspect_err(|e| {
                error!(err = ?e, "batch commit failed");
            })
            .ok();

            crawler.account_new_repos(valid_dids.len()).await;

            if matches!(cursor, Cursor::Done(_)) {
                info!("enumeration complete, sleeping 1h before next pass");
                tokio::time::sleep(Duration::from_secs(3600)).await;
                info!("resuming after 1h sleep");
            }
        }
    }
568
    /// one sweep of the crawler retry queue (runs on the dedicated retry
    /// thread, synchronously). re-checks up to MAX_RETRY_BATCH due entries and
    /// promotes recovered repos into the pending queue. returns how long the
    /// caller should sleep: `Some(dur)` to wake for the earliest not-yet-due
    /// entry (ZERO when the batch cap was hit and more entries are ready),
    /// `None` when nothing is waiting.
    fn process_retry_queue(&self) -> Result<Option<Duration>> {
        let db = &self.state.db;
        let now = Utc::now();

        let mut ready: Vec<Did> = Vec::new();
        let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new();
        let mut next_wake: Option<Duration> = None;
        let mut had_more = false;

        let mut rng: SmallRng = rand::make_rng();

        let mut batch = db.inner.batch();
        for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) {
            let (key, val) = guard.into_inner().into_diagnostic()?;
            let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?;
            let did = keys::crawler_retry_parse_key(&key)?.to_did();

            // leave capped entries alone for API inspection
            if state.attempts >= MAX_RETRY_ATTEMPTS {
                continue;
            }

            // add 1-7% jitter to the deadline so entries created together
            // don't all fire in the same sweep
            let backoff = TimeDelta::seconds(
                state
                    .duration
                    .as_seconds_f64()
                    .mul(rng.random_range(0.01..0.07)) as i64,
            );
            if state.after + backoff > now {
                // not due yet: remember the earliest (un-jittered) deadline
                let wake = (state.after - now).to_std().unwrap_or(Duration::ZERO);
                next_wake = Some(next_wake.map(|w| w.min(wake)).unwrap_or(wake));
                continue;
            }

            if ready.len() >= MAX_RETRY_BATCH {
                had_more = true;
                break;
            }

            ready.push(did.clone());
            existing.insert(did, state);
        }

        if ready.is_empty() {
            return Ok(next_wake);
        }

        debug!(count = ready.len(), "retrying pending repos");

        // this thread is not a runtime worker (the handle was only entered),
        // so block_on on the runtime handle is fine here
        let handle = tokio::runtime::Handle::current();
        let filter = self.state.filter.load();
        let valid_dids =
            handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch, &existing))?;

        let mut rng: SmallRng = rand::make_rng();
        for did in &valid_dids {
            let did_key = keys::repo_key(did);

            // may have been indexed through another path since it was queued
            if db.repos.contains_key(&did_key).into_diagnostic()? {
                continue;
            }

            let state = RepoState::untracked(rng.next_u64());
            batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
            batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
        }

        batch.commit().into_diagnostic()?;

        if !valid_dids.is_empty() {
            info!(count = valid_dids.len(), "recovered from retry queue");
            handle.block_on(self.account_new_repos(valid_dids.len()));
        }

        // if we hit the batch cap there are more ready entries, loop back immediately
        Ok(had_more.then_some(Duration::ZERO).or(next_wake))
    }
646
    /// probes a single repo for signal collections: resolve did -> pds url,
    /// respect per-pds throttling, call describeRepo, and classify the outcome.
    /// the did is returned alongside the result so callers can join results out
    /// of order. returns a `'static` future (self is not borrowed) so it can be
    /// spawned onto a JoinSet.
    fn check_repo_signals(
        &self,
        filter: Arc<crate::filter::FilterConfig>,
        did: Did<'static>,
    ) -> impl Future<Output = (Did<'static>, CrawlCheckResult)> + Send + 'static {
        // clone everything up front so the returned future doesn't borrow self
        let resolver = self.state.resolver.clone();
        let http = self.http.clone();
        let throttler = self.pds_throttler.clone();
        async move {
            const MAX_RETRIES: u32 = 5;

            // resolve the did to its pds; back off exponentially on resolver
            // rate limits only
            let pds_url = (|| resolver.resolve_identity_info(&did))
                .retry(MAX_RETRIES, |e, attempt| {
                    matches!(e, crate::resolver::ResolverError::Ratelimited)
                        .then(|| Duration::from_secs(1 << attempt.min(5)))
                })
                .await;

            let pds_url = match pds_url {
                Ok((url, _)) => url,
                Err(RetryOutcome::Ratelimited) => {
                    error!(
                        retries = MAX_RETRIES,
                        "rate limited resolving identity, giving up"
                    );
                    // no pds handle to read retry_after from; use a short default
                    return (did, RetryState::new(60).into());
                }
                Err(RetryOutcome::Failed(e)) => {
                    error!(err = %e, "failed to resolve identity");
                    return (did, RetryState::new(60).into());
                }
            };

            // bail out early if this pds is currently throttled
            let throttle = throttler.get_handle(&pds_url).await;
            if throttle.is_throttled() {
                trace!(host = pds_url.host_str(), "skipping throttled pds");
                return (did, throttle.to_retry_state().into());
            }

            // acquire a per-pds concurrency permit; a failure raised by another
            // task against the same pds aborts the wait
            let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ());
            let Ok(_permit) = _permit.await else {
                trace!(
                    host = pds_url.host_str(),
                    "pds failed while waiting for permit"
                );
                return (did, throttle.to_retry_state().into());
            };

            enum RequestError {
                Reqwest(reqwest::Error),
                RateLimited(Option<u64>),
                /// hard failure notification from another task on this PDS
                Throttled,
            }

            let mut describe_url = pds_url.join("/xrpc/com.atproto.repo.describeRepo").unwrap();
            describe_url.query_pairs_mut().append_pair("repo", &did);

            let resp = async {
                let resp = http
                    .get(describe_url)
                    .timeout(throttle.timeout())
                    .send()
                    .await
                    .map_err(RequestError::Reqwest)?;

                // dont retry ratelimits since we will just put it in a queue to be tried again later
                if resp.status() == StatusCode::TOO_MANY_REQUESTS {
                    return Err(RequestError::RateLimited(parse_retry_after(&resp)));
                }

                resp.error_for_status().map_err(RequestError::Reqwest)
            }
            .or_failure(&throttle, || RequestError::Throttled)
            .await;

            let resp = match resp {
                Ok(r) => {
                    throttle.record_success();
                    r
                }
                // pds told us to back off: requeue with the 429 recorded
                Err(RequestError::RateLimited(secs)) => {
                    throttle.record_ratelimit(secs);
                    return (
                        did,
                        throttle
                            .to_retry_state()
                            .with_status(StatusCode::TOO_MANY_REQUESTS)
                            .into(),
                    );
                }
                Err(RequestError::Throttled) => {
                    return (did, throttle.to_retry_state().into());
                }
                Err(RequestError::Reqwest(e)) => {
                    if e.is_timeout() && !throttle.record_timeout() {
                        // first or second timeout, just requeue
                        let mut retry_state = RetryState::new(60);
                        retry_state.status = e.status();
                        return (did, retry_state.into());
                    }
                    // third timeout, if timeout fail is_throttle_worthy will ban the pds

                    if is_throttle_worthy(&e) {
                        if let Some(mins) = throttle.record_failure() {
                            warn!(url = %pds_url, mins, "throttling pds due to hard failure");
                        }
                        let mut retry_state = throttle.to_retry_state();
                        retry_state.status = e.status();
                        return (did, retry_state.into());
                    }

                    match e.status() {
                        // repo is gone for good: drop it with no retry entry
                        Some(StatusCode::NOT_FOUND | StatusCode::GONE) => {
                            trace!("repo not found");
                            return (did, CrawlCheckResult::NoSignal);
                        }
                        // other 4xx: treat as permanently unavailable too
                        Some(s) if s.is_client_error() => {
                            error!(status = %s, "repo unavailable");
                            return (did, CrawlCheckResult::NoSignal);
                        }
                        // 5xx / no status: transient, retry in 15 minutes
                        _ => {
                            error!(err = %e, "repo errored");
                            let mut retry_state = RetryState::new(60 * 15);
                            retry_state.status = e.status();
                            return (did, retry_state.into());
                        }
                    }
                }
            };

            let bytes = match resp.bytes().await {
                Ok(b) => b,
                Err(e) => {
                    error!(err = %e, "failed to read describeRepo response");
                    return (did, RetryState::new(60 * 5).into());
                }
            };

            let out = match serde_json::from_slice::<DescribeRepoOutput>(&bytes) {
                Ok(out) => out,
                Err(e) => {
                    error!(err = %e, "failed to parse describeRepo response");
                    return (did, RetryState::new(60 * 10).into());
                }
            };

            // a repo qualifies if any of its collections matches a signal
            let found_signal = out
                .collections
                .iter()
                .any(|col| filter.matches_signal(col.as_str()));

            if !found_signal {
                trace!("no signal-matching collections found");
            }

            return (
                did,
                found_signal
                    .then_some(CrawlCheckResult::Signal)
                    .unwrap_or(CrawlCheckResult::NoSignal),
            );
        }
    }
812
    /// fans `dids` out to concurrent signal checks and folds the results into
    /// `batch`: repos with signal are returned (and their retry entry removed),
    /// no-signal repos just get their retry entry removed, and transient
    /// failures are (re)written to the retry queue with the attempt count
    /// carried over from `existing`. note: the batch is staged only — the
    /// caller commits it.
    async fn check_signals_batch(
        &self,
        dids: &[Did<'static>],
        filter: &Arc<crate::filter::FilterConfig>,
        batch: &mut fjall::OwnedWriteBatch,
        existing: &HashMap<Did<'static>, RetryState>,
    ) -> Result<Vec<Did<'static>>> {
        let db = &self.state.db;
        let mut valid = Vec::new();
        let mut set = tokio::task::JoinSet::new();

        for did in dids {
            let did = did.clone();
            let filter = filter.clone();
            let span = tracing::info_span!("signals", did = %did);
            set.spawn(self.check_repo_signals(filter, did).instrument(span));
        }

        // the 60s timeout applies to each join, not the whole batch.
        // NOTE(review): on a timeout the `?` returns early and drops the
        // JoinSet, which aborts all still-running checks and discards the
        // staged batch — presumably intentional, but worth confirming
        while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next())
            .await
            .into_diagnostic()
            .map_err(|_| {
                error!("signal check task timed out after 60s");
                miette::miette!("signal check task timed out")
            })?
        {
            let (did, result) = match res {
                Ok(inner) => inner,
                // a panicked check just skips its did; everything else proceeds
                Err(e) => {
                    error!(err = ?e, "signal check task failed or panicked");
                    continue;
                }
            };

            match result {
                CrawlCheckResult::Signal => {
                    batch.remove(&db.crawler, keys::crawler_retry_key(&did));
                    valid.push(did);
                }
                CrawlCheckResult::NoSignal => {
                    batch.remove(&db.crawler, keys::crawler_retry_key(&did));
                }
                CrawlCheckResult::Retry(state) => {
                    // carry the attempt count over from the previous queue
                    // entry (freshly built states always start at 0)
                    let prev_attempts = existing.get(&did).map(|s| s.attempts).unwrap_or(0);
                    let carried = RetryState {
                        attempts: prev_attempts,
                        ..state
                    };
                    // cap reached: freeze the entry at MAX_RETRY_ATTEMPTS so
                    // the retry thread skips it but it stays inspectable.
                    // (`state` is still usable after the struct-update above
                    // because all of its fields are Copy)
                    let next = match carried.next_attempt() {
                        Some(next) => next,
                        None => RetryState {
                            attempts: MAX_RETRY_ATTEMPTS,
                            ..state
                        },
                    };
                    batch.insert(
                        &db.crawler,
                        keys::crawler_retry_key(&did),
                        rmp_serde::to_vec(&next).into_diagnostic()?,
                    );
                }
            }
        }

        Ok(valid)
    }
879
880 async fn account_new_repos(&self, count: usize) {
881 if count == 0 {
882 return;
883 }
884
885 self.count.fetch_add(count, Ordering::Relaxed);
886 self.state
887 .db
888 .update_count_async("repos", count as i64)
889 .await;
890 self.state
891 .db
892 .update_count_async("pending", count as i64)
893 .await;
894 self.state.notify_backfill();
895 }
896}