tangled · alpha

ptr.pet / hydrant
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol · atproto · indexer · rust · fjall
[crawler] limit concurrency per pds, implement smarter retries

ptr.pet · 1 week ago · 218ce852 → 571eb26d

verified: this commit was signed with the committer's known signature
(ptr.pet, SSH key fingerprint: SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=)

7 changed files, +512 -325
changed files:

src/crawler/ban.rs
src/crawler/mod.rs
src/crawler/throttle.rs
src/db/keys.rs
src/filter.rs
src/ingest/worker.rs
src/types.rs
src/crawler/ban.rs · -117 (file deleted)

use scc::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use tokio::sync::Notify;
use url::Url;

#[derive(Clone)]
pub struct BanTracker {
    states: Arc<HashMap<Url, Arc<State>>>,
}

impl BanTracker {
    pub fn new() -> Self {
        Self {
            states: Arc::new(HashMap::new()),
        }
    }

    pub fn get_handle(&self, url: &Url) -> BanHandle {
        let state = self
            .states
            .entry_sync(url.clone())
            .or_insert_with(|| {
                Arc::new(State {
                    banned_until: AtomicI64::new(0),
                    consecutive_failures: AtomicUsize::new(0),
                    ban_notify: Notify::new(),
                })
            })
            .get()
            .clone();

        BanHandle { state }
    }
}

struct State {
    banned_until: AtomicI64,
    consecutive_failures: AtomicUsize,
    ban_notify: Notify,
}

pub struct BanHandle {
    state: Arc<State>,
}

impl BanHandle {
    pub fn is_banned(&self) -> bool {
        let until = self.state.banned_until.load(Ordering::Acquire);
        if until == 0 {
            return false;
        }
        let now = chrono::Utc::now().timestamp();
        now < until
    }

    pub fn record_success(&self) {
        self.state.consecutive_failures.store(0, Ordering::Release);
        self.state.banned_until.store(0, Ordering::Release);
    }

    // returns the amount of minutes banned if its a new ban
    pub fn record_failure(&self) -> Option<i64> {
        if self.is_banned() {
            return None;
        }

        let failures = self
            .state
            .consecutive_failures
            .fetch_add(1, Ordering::AcqRel)
            + 1;

        // start with 30 minutes, double each consecutive failure
        let base_minutes = 30;
        let exponent = (failures as u32).saturating_sub(1);
        let minutes = base_minutes * 2i64.pow(exponent.min(10));
        let now = chrono::Utc::now().timestamp();

        self.state
            .banned_until
            .store(now + minutes * 60, Ordering::Release);

        self.state.ban_notify.notify_waiters();

        Some(minutes)
    }

    pub async fn wait_for_ban(&self) {
        loop {
            let notified = self.state.ban_notify.notified();
            if self.is_banned() {
                return;
            }
            notified.await;
        }
    }
}

/// extension trait that adds `.or_ban()` to any future returning `Result<T, E>`.
#[allow(async_fn_in_trait)]
pub trait OrBan<T, E>: Future<Output = Result<T, E>> {
    /// races the future against a ban notification.
    /// if the pds is banned before the future completes, returns `on_ban()` immediately.
    async fn or_ban(self, handle: &BanHandle, on_ban: impl FnOnce() -> E) -> Result<T, E>
    where
        Self: Sized,
    {
        tokio::select! {
            res = self => res,
            _ = handle.wait_for_ban() => Err(on_ban()),
        }
    }
}

impl<T, E, F: Future<Output = Result<T, E>>> OrBan<T, E> for F {}
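For context on what the rest of this commit replaces: `.or_ban()` raced any request future against a per-PDS ban notification. A minimal sketch of how it composed; the `FetchError` type and `fetch_from_pds` function are illustrative only, and the module path assumes the old tree:

use crate::crawler::ban::{BanTracker, OrBan};

// illustrative error type for the sketch
enum FetchError {
    Http(reqwest::Error),
    Banned,
}

async fn fetch_from_pds(
    http: &reqwest::Client,
    tracker: &BanTracker,
    url: &url::Url,
) -> Result<reqwest::Response, FetchError> {
    let handle = tracker.get_handle(url);
    // if another task bans this pds mid-flight, bail out immediately
    // instead of waiting for the request itself to fail or time out
    async { http.get(url.clone()).send().await.map_err(FetchError::Http) }
        .or_ban(&handle, || FetchError::Banned)
        .await
}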
src/crawler/mod.rs · +308 -201 (shown split: old, then new)

old:

- use crate::db::types::TrimmedDid;
  use crate::db::{Db, keys, ser_repo_state};
  use crate::state::AppState;
  use crate::types::RepoState;
- use futures::TryFutureExt;
  use jacquard_api::com_atproto::repo::list_records::ListRecordsOutput;
  use jacquard_api::com_atproto::sync::list_repos::ListReposOutput;
  use jacquard_common::{IntoStatic, types::string::Did};
···
  use reqwest::StatusCode;
  use smol_str::SmolStr;
  use std::future::Future;
  use std::sync::Arc;
  use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  use std::time::Duration;
···
  enum CrawlCheckResult {
      Signal,
      NoSignal,
-     Ratelimited,
-     Failed(Option<u16>),
  }

- /// outcome of [`RetryWithBackoff::retry_with_backoff`] when the operation does not succeed.
  enum RetryOutcome<E> {
      /// ratelimited after exhausting all retries
      Ratelimited,
···
      Failed(E),
  }

- /// extension trait that adds `retry_with_backoff` to async `FnMut` closures.
  trait RetryWithBackoff<T, E, Fut>: FnMut() -> Fut
  where
      Fut: Future<Output = Result<T, E>>,
···
      async fn retry(
          &mut self,
          max_retries: u32,
-         is_ratelimited: impl Fn(&E) -> bool,
      ) -> Result<T, RetryOutcome<E>> {
          let mut attempt = 0u32;
          loop {
              match self().await {
                  Ok(val) => return Ok(val),
-                 Err(e) if is_ratelimited(&e) => {
-                     if attempt >= max_retries {
-                         return Err(RetryOutcome::Ratelimited);
                      }
-                     let base = Duration::from_secs(1 << attempt);
-                     let jitter = Duration::from_millis(rand::rng().random_range(0..2000));
-                     tokio::time::sleep(base + jitter).await;
-                     attempt += 1;
-                 }
-                 Err(e) => return Err(RetryOutcome::Failed(e)),
              }
          }
      }
···

  impl<F: Future<Output = Result<reqwest::Response, reqwest::Error>>> ErrorForStatus for F {}

- // these two are cloudflare specific
  const CONNECTION_TIMEOUT: StatusCode = unsafe {
      match StatusCode::from_u16(522) {
          Ok(s) => s,
-         _ => std::hint::unreachable_unchecked(), // status code is valid
      }
  };
  const SITE_FROZEN: StatusCode = unsafe {
      match StatusCode::from_u16(530) {
          Ok(s) => s,
-         _ => std::hint::unreachable_unchecked(), // status code is valid
      }
  };

- // we ban on:
- // - timeouts
- // - tls cert errors
- // - bad gateway / gateway timeout, service unavailable, 522 and 530
- fn is_ban_worthy(e: &reqwest::Error) -> bool {
      use std::error::Error;

      if e.is_timeout() {
···
      resolver: crate::resolver::Resolver,
      filter: Arc<crate::filter::FilterConfig>,
      did: Did<'static>,
-     tracker: Arc<BanTracker>,
  ) -> (Did<'static>, CrawlCheckResult) {
      const MAX_RETRIES: u32 = 5;

      let pds_url = (|| resolver.resolve_identity_info(&did))
-         .retry(MAX_RETRIES, |e| {
              matches!(e, crate::resolver::ResolverError::Ratelimited)
          })
          .await;
      let pds_url = match pds_url {
          Ok((url, _)) => url,
          Err(RetryOutcome::Ratelimited) => {
···
                  retries = MAX_RETRIES,
                  "rate limited resolving identity, giving up"
              );
-             return (did, CrawlCheckResult::Ratelimited);
          }
          Err(RetryOutcome::Failed(e)) => {
              error!(err = %e, "failed to resolve identity");
-             return (did, CrawlCheckResult::Failed(None));
          }
      };

-     let pds_handle = tracker.get_handle(&pds_url);
-     if pds_handle.is_banned() {
-         trace!(host = pds_url.host_str(), "skipping banned pds");
-         return (did, CrawlCheckResult::Failed(None));
      }

      enum RequestError {
          Reqwest(reqwest::Error),
-         Banned,
      }

      let mut found_signal = false;
···
              .append_pair("collection", signal)
              .append_pair("limit", "1");

-         let res = (|| http.get(list_records_url.clone())
              .send()
-             .error_for_status()
-             .map_err(RequestError::Reqwest)
-             .or_ban(&pds_handle, || RequestError::Banned))
-         .retry(MAX_RETRIES, |e: &RequestError| {
-             matches!(e, RequestError::Reqwest(e) if matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS)))
-         })
-         .await;
-         let res = match res {
              Ok(r) => {
-                 pds_handle.record_success();
                  r
              }
-             Err(RetryOutcome::Ratelimited) => {
-                 warn!(
-                     retries = MAX_RETRIES,
-                     "rate limited on listRecords, giving up"
-                 );
-                 return CrawlCheckResult::Ratelimited;
              }
-             Err(RetryOutcome::Failed(e)) => match e {
-                 RequestError::Banned => return CrawlCheckResult::Failed(None),
-                 RequestError::Reqwest(e) => {
-                     if is_ban_worthy(&e) {
-                         if let Some(mins) = pds_handle.record_failure() {
-                             tracing::warn!(url = %pds_url, mins, "banned pds");
-                         }
-                         return CrawlCheckResult::Failed(e.status().map(|s| s.as_u16()));
                      }

-                     match e.status() {
-                         Some(StatusCode::NOT_FOUND | StatusCode::GONE) => {
-                             trace!("repo not found");
-                         }
-                         Some(s) if s.is_client_error() => {
-                             error!(status = %s, "repo unavailable");
-                         }
-                         _ => {
-                             error!(err = %e, "listRecords failed");
-                             return CrawlCheckResult::Failed(e.status().map(|s| s.as_u16()));
-                         }
                      }
-                     return CrawlCheckResult::NoSignal;
                  }
-             },
          };

-         let bytes = match res.bytes().await {
              Ok(b) => b,
              Err(e) => {
                  error!(err = %e, "failed to read listRecords response");
-                 return CrawlCheckResult::Failed(None);
              }
          };

          match serde_json::from_slice::<ListRecordsOutput>(&bytes) {
-             Ok(out) => {
-                 if !out.records.is_empty() {
-                     return CrawlCheckResult::Signal;
-                 }
-             }
              Err(e) => {
                  error!(err = %e, "failed to parse listRecords response");
-                 return CrawlCheckResult::Failed(None);
              }
          }

          CrawlCheckResult::NoSignal
      }
-     .instrument(tracing::info_span!("signal_check", signal = %signal))
      .await;

      match res {
···
              found_signal = true;
              break;
          }
-         CrawlCheckResult::NoSignal => {
-             continue;
-         }
-         other => {
-             return (did, other);
-         }
      }
  }
···
      )
  }

- pub mod ban;
- use ban::{BanTracker, OrBan};

  pub struct Crawler {
      state: Arc<AppState>,
···
      count: Arc<AtomicUsize>,
      crawled_count: Arc<AtomicUsize>,
      throttled: Arc<AtomicBool>,
-     tracker: Arc<BanTracker>,
  }

  impl Crawler {
···
          count: Arc::new(AtomicUsize::new(0)),
          crawled_count: Arc::new(AtomicUsize::new(0)),
          throttled: Arc::new(AtomicBool::new(false)),
-         tracker: Arc::new(BanTracker::new()),
      }
  }

  pub async fn run(self) -> Result<()> {
      tokio::spawn({
          use std::time::Instant;
          let count = self.count.clone();
          let crawled_count = self.crawled_count.clone();
          let throttled = self.throttled.clone();
          let mut last_time = Instant::now();
          let mut interval = tokio::time::interval(Duration::from_secs(60));
          async move {
···
              let delta_processed = count.swap(0, Ordering::Relaxed);
              let delta_crawled = crawled_count.swap(0, Ordering::Relaxed);
              let is_throttled = throttled.load(Ordering::Relaxed);

              if delta_processed == 0 && delta_crawled == 0 {
                  if is_throttled {
···
          }
      });

-     let mut relay_url = self.relay_host.clone();
      match relay_url.scheme() {
          "wss" => relay_url
              .set_scheme("https")
···
      }

      let mut rng: SmallRng = rand::make_rng();
-
-     let db = &self.state.db;

-     // 1. load cursor
      let cursor_key = b"crawler_cursor";
      let mut cursor: Option<SmolStr> = Db::get(db.cursors.clone(), cursor_key.to_vec())
          .await?
···
      let mut was_throttled = false;

      loop {
-         // check throttling
          loop {
-             let pending = self.state.db.get_count("pending").await;
-             if pending > self.max_pending as u64 {
                  if !was_throttled {
                      debug!(
                          pending,
-                         max = self.max_pending,
                          "throttling: above max pending"
                      );
                      was_throttled = true;
-                     self.throttled.store(true, Ordering::Relaxed);
                  }
                  tokio::time::sleep(Duration::from_secs(5)).await;
-             } else if pending > self.resume_pending as u64 {
                  if !was_throttled {
                      debug!(
                          pending,
-                         resume = self.resume_pending,
                          "throttling: entering cooldown"
                      );
                      was_throttled = true;
-                     self.throttled.store(true, Ordering::Relaxed);
                  }

                  loop {
-                     let current_pending = self.state.db.get_count("pending").await;
-                     if current_pending <= self.resume_pending as u64 {
                          break;
                      }
                      debug!(
                          pending = current_pending,
-                         resume = self.resume_pending,
                          "cooldown, waiting"
                      );
                      tokio::time::sleep(Duration::from_secs(5)).await;
···
              if was_throttled {
                  info!("throttling released");
                  was_throttled = false;
-                 self.throttled.store(false, Ordering::Relaxed);
              }
              break;
          }
      }

-     // 2. fetch listrepos
      let mut list_repos_url = relay_url
          .join("/xrpc/com.atproto.sync.listRepos")
          .into_diagnostic()?;
···
      }

      let fetch_result = (|| {
-         self.http
              .get(list_repos_url.clone())
              .send()
              .error_for_status()
      })
-     .retry(5, |e: &reqwest::Error| {
          matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS))
      })
      .await;
···
      }

      debug!(count = output.repos.len(), "fetched repos");
-     self.crawled_count
          .fetch_add(output.repos.len(), Ordering::Relaxed);

      let mut batch = db.inner.batch();
      let mut to_queue = Vec::new();
-     let filter = self.state.filter.load();
-     // we can check whether or not to backfill repos faster if we only have to check
-     // certain known signals, since we can just listRecords for those signals
-     // if we have glob signals we cant do this since we dont know what signals to check
-     let check_signals = filter.mode == crate::filter::FilterMode::Filter
-         && !filter.signals.is_empty()
-         && !filter.has_glob_signals();

-     // 3. process repos
      let mut unknown_dids = Vec::new();
      for repo in output.repos {
          let did_key = keys::repo_key(&repo.did);
···
              continue;
          }

-         // check if known
          if !Db::contains_key(db.repos.clone(), &did_key).await? {
              unknown_dids.push(repo.did.into_static());
          }
      }

-     let valid_dids = if check_signals && !unknown_dids.is_empty() {
-         self.check_signals_batch(&unknown_dids, &filter, &mut batch)
              .await?
      } else {
          unknown_dids
···
          let did_key = keys::repo_key(did);
          trace!(did = %did, "found new repo");

-         let state = RepoState::backfilling_untracked(rng.next_u64());
          batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
          batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
          to_queue.push(did.clone());
      }

-     // 4. update cursor
      if let Some(new_cursor) = output.cursor {
          cursor = Some(new_cursor.as_str().into());
-
          batch.insert(
              &db.cursors,
              cursor_key.to_vec(),
              new_cursor.as_bytes().to_vec(),
          );
      } else {
-         // end of pagination
          info!("reached end of list.");
          cursor = None;
      }
···
          .await
          .into_diagnostic()??;

-     self.account_new_repos(to_queue.len()).await;

      if cursor.is_none() {
-         // 6. retry previously failed repos before sleeping
-         self.retry_failed_repos(&mut rng).await?;
-
          tokio::time::sleep(Duration::from_secs(3600)).await;
      }
  }
  }

  async fn check_signals_batch(
      &self,
      dids: &[Did<'static>],
···
      let http = self.http.clone();
      let resolver = self.state.resolver.clone();
      let filter = filter.clone();
-     let tracker = self.tracker.clone();
-     let span = tracing::info_span!("check_signals", did = %did);
-     set.spawn(check_repo_signals(http, resolver, filter, did, tracker).instrument(span));
  }

  while let Some(res) = set.join_next().await {
      let (did, result) = res.into_diagnostic()?;
      match result {
          CrawlCheckResult::Signal => {
-             batch.remove(&db.crawler, keys::crawler_failed_key(&did));
              valid.push(did);
          }
-         CrawlCheckResult::NoSignal => {
-             batch.remove(&db.crawler, keys::crawler_failed_key(&did));
-         }
-         CrawlCheckResult::Ratelimited => {
              batch.insert(
                  &db.crawler,
-                 keys::crawler_failed_key(&did),
-                 429u16.to_be_bytes().as_ref(),
-             );
-         }
-         CrawlCheckResult::Failed(status) => {
-             let code = status.unwrap_or(0);
-             batch.insert(
-                 &db.crawler,
-                 keys::crawler_failed_key(&did),
-                 code.to_be_bytes().as_ref(),
              );
          }
      }
  }

  Ok(valid)
- }
-
- async fn retry_failed_repos(&self, rng: &mut SmallRng) -> Result<()> {
-     let db = &self.state.db;
-     let filter = self.state.filter.load();
-
-     let check_signals = filter.mode == crate::filter::FilterMode::Filter
-         && !filter.signals.is_empty()
-         && !filter.has_glob_signals();
-
-     if !check_signals {
-         return Ok(());
-     }
-
-     let mut failed_dids = Vec::new();
-     for guard in db.crawler.prefix(keys::CRAWLER_FAILED_PREFIX) {
-         let key = guard.key().into_diagnostic()?;
-         let did_bytes = &key[keys::CRAWLER_FAILED_PREFIX.len()..];
-         let trimmed = TrimmedDid::try_from(did_bytes)?;
-         failed_dids.push(trimmed.to_did());
-     }
-
-     if failed_dids.is_empty() {
-         return Ok(());
-     }
-
-     info!("retrying {} previously failed repos", failed_dids.len());
-
-     let mut batch = db.inner.batch();
-     let valid_dids = self
-         .check_signals_batch(&failed_dids, &filter, &mut batch)
-         .await?;
-
-     for did in &valid_dids {
-         let did_key = keys::repo_key(did);
-
-         if Db::contains_key(db.repos.clone(), &did_key).await? {
-             continue;
-         }
-
-         let state = RepoState::backfilling_untracked(rng.next_u64());
-         batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
-         batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
-     }
-
-     tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
-         .await
-         .into_diagnostic()??;
-
-     if !valid_dids.is_empty() {
-         info!("recovered {} repos from failed retry", valid_dids.len());
-         self.account_new_repos(valid_dids.len()).await;
-     }
-
-     Ok(())
  }

  async fn account_new_repos(&self, count: usize)

new:

  use crate::db::{Db, keys, ser_repo_state};
  use crate::state::AppState;
  use crate::types::RepoState;
+ use futures::FutureExt;
  use jacquard_api::com_atproto::repo::list_records::ListRecordsOutput;
  use jacquard_api::com_atproto::sync::list_repos::ListReposOutput;
  use jacquard_common::{IntoStatic, types::string::Did};
···
  use reqwest::StatusCode;
  use smol_str::SmolStr;
  use std::future::Future;
+ use std::ops::Mul;
  use std::sync::Arc;
  use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  use std::time::Duration;
···
  enum CrawlCheckResult {
      Signal,
      NoSignal,
+     /// task could not complete; should be retried at `retry_after` (unix timestamp).
+     /// `status` is the HTTP status that triggered this (0 for non-HTTP failures).
+     Retry {
+         retry_after: i64,
+         status: u16,
+     },
  }

+ /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed.
  enum RetryOutcome<E> {
      /// ratelimited after exhausting all retries
      Ratelimited,
···
      Failed(E),
  }

+ /// extension trait that adds `.retry()` to async `FnMut` closures.
+ ///
+ /// `on_ratelimit` receives the error and current attempt number.
+ /// returning `Some(duration)` signals a transient failure and provides the backoff;
+ /// returning `None` signals a terminal failure.
  trait RetryWithBackoff<T, E, Fut>: FnMut() -> Fut
  where
      Fut: Future<Output = Result<T, E>>,
···
      async fn retry(
          &mut self,
          max_retries: u32,
+         on_ratelimit: impl Fn(&E, u32) -> Option<Duration>,
      ) -> Result<T, RetryOutcome<E>> {
          let mut attempt = 0u32;
          loop {
              match self().await {
                  Ok(val) => return Ok(val),
+                 Err(e) => match on_ratelimit(&e, attempt) {
+                     Some(_) if attempt >= max_retries => return Err(RetryOutcome::Ratelimited),
+                     Some(backoff) => {
+                         let jitter = Duration::from_millis(rand::rng().random_range(0..2000));
+                         tokio::time::sleep(backoff + jitter).await;
+                         attempt += 1;
                      }
+                     None => return Err(RetryOutcome::Failed(e)),
+                 },
              }
          }
      }
···

  impl<F: Future<Output = Result<reqwest::Response, reqwest::Error>>> ErrorForStatus for F {}

+ /// extracts a retry delay in seconds from rate limit response headers.
+ ///
+ /// checks in priority order:
+ /// - `retry-after: <seconds>` (relative)
+ /// - `ratelimit-reset: <unix timestamp>` (absolute) (ref pds sends this)
+ fn parse_retry_after(resp: &reqwest::Response) -> Option<u64> {
+     let headers = resp.headers();
+
+     let retry_after = headers
+         .get(reqwest::header::RETRY_AFTER)
+         .and_then(|v| v.to_str().ok())
+         .and_then(|s| s.parse::<u64>().ok());
+
+     let rate_limit_reset = headers
+         .get("ratelimit-reset")
+         .and_then(|v| v.to_str().ok())
+         .and_then(|s| s.parse::<i64>().ok())
+         .map(|ts| {
+             let now = chrono::Utc::now().timestamp();
+             (ts - now).max(1) as u64
+         });
+
+     retry_after.or(rate_limit_reset)
+ }
+
+ // cloudflare-specific status codes
  const CONNECTION_TIMEOUT: StatusCode = unsafe {
      match StatusCode::from_u16(522) {
          Ok(s) => s,
+         _ => std::hint::unreachable_unchecked(),
      }
  };
  const SITE_FROZEN: StatusCode = unsafe {
      match StatusCode::from_u16(530) {
          Ok(s) => s,
+         _ => std::hint::unreachable_unchecked(),
      }
  };

+ fn is_throttle_worthy(e: &reqwest::Error) -> bool {
      use std::error::Error;

      if e.is_timeout() {
···
      resolver: crate::resolver::Resolver,
      filter: Arc<crate::filter::FilterConfig>,
      did: Did<'static>,
+     throttler: Arc<Throttler>,
  ) -> (Did<'static>, CrawlCheckResult) {
      const MAX_RETRIES: u32 = 5;

      let pds_url = (|| resolver.resolve_identity_info(&did))
+         .retry(MAX_RETRIES, |e, attempt| {
              matches!(e, crate::resolver::ResolverError::Ratelimited)
+                 .then(|| Duration::from_secs(1 << attempt.min(5)))
          })
          .await;
+
      let pds_url = match pds_url {
          Ok((url, _)) => url,
          Err(RetryOutcome::Ratelimited) => {
···
                  retries = MAX_RETRIES,
                  "rate limited resolving identity, giving up"
              );
+             // no pds handle to read retry_after from; use a short default
+             let retry_after = chrono::Utc::now().timestamp() + 60;
+             return (
+                 did,
+                 CrawlCheckResult::Retry {
+                     retry_after,
+                     status: 429,
+                 },
+             );
          }
          Err(RetryOutcome::Failed(e)) => {
              error!(err = %e, "failed to resolve identity");
+             let retry_after = chrono::Utc::now().timestamp() + 60;
+             return (
+                 did,
+                 CrawlCheckResult::Retry {
+                     retry_after,
+                     status: 0,
+                 },
+             );
          }
      };

+     let throttle = throttler.get_handle(&pds_url).await;
+     if throttle.is_throttled() {
+         trace!(host = pds_url.host_str(), "skipping throttled pds");
+         return (
+             did,
+             CrawlCheckResult::Retry {
+                 retry_after: throttle.throttled_until(),
+                 status: 0,
+             },
+         );
      }

+     let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ());
+     let Ok(_permit) = _permit.await else {
+         trace!(
+             host = pds_url.host_str(),
+             "pds failed while waiting for permit"
+         );
+         return (
+             did,
+             CrawlCheckResult::Retry {
+                 retry_after: throttle.throttled_until(),
+                 status: 0,
+             },
+         );
+     };
+
      enum RequestError {
          Reqwest(reqwest::Error),
+         RateLimited(Option<u64>),
+         /// hard failure notification from another task on this PDS
+         Throttled,
      }

      let mut found_signal = false;
···
              .append_pair("collection", signal)
              .append_pair("limit", "1");

+         let resp = async {
+             let resp = http
+                 .get(list_records_url.clone())
                  .send()
+                 .await
+                 .map_err(RequestError::Reqwest)?;
+
+             // dont retry ratelimits since we will just put it in a queue to be tried again later
+             if resp.status() == StatusCode::TOO_MANY_REQUESTS {
+                 return Err(RequestError::RateLimited(parse_retry_after(&resp)));
+             }
+
+             resp.error_for_status().map_err(RequestError::Reqwest)
+         }
+         .or_failure(&throttle, || RequestError::Throttled)
+         .await;
+
+         let resp = match resp {
              Ok(r) => {
+                 throttle.record_success();
                  r
              }
+             Err(RequestError::RateLimited(secs)) => {
+                 throttle.record_ratelimit(secs);
+                 return CrawlCheckResult::Retry {
+                     retry_after: throttle.throttled_until(),
+                     status: 429,
+                 };
              }
+             Err(RequestError::Throttled) => {
+                 return CrawlCheckResult::Retry {
+                     retry_after: throttle.throttled_until(),
+                     status: 0,
+                 };
+             }
+             Err(RequestError::Reqwest(e)) => {
+                 if is_throttle_worthy(&e) {
+                     if let Some(mins) = throttle.record_failure() {
+                         warn!(url = %pds_url, mins, "throttling pds due to hard failure");
                      }
+                     return CrawlCheckResult::Retry {
+                         retry_after: throttle.throttled_until(),
+                         status: e.status().map_or(0, |s| s.as_u16()),
+                     };
+                 }

+                 match e.status() {
+                     Some(StatusCode::NOT_FOUND | StatusCode::GONE) => {
+                         trace!("repo not found");
+                         return CrawlCheckResult::NoSignal;
                      }
+                     Some(s) if s.is_client_error() => {
+                         error!(status = %s, "repo unavailable");
+                         return CrawlCheckResult::NoSignal;
+                     }
+                     _ => {
+                         error!(err = %e, "repo errored");
+                         return CrawlCheckResult::Retry {
+                             retry_after: chrono::Utc::now().timestamp() + 60,
+                             status: e.status().map_or(0, |s| s.as_u16()),
+                         };
+                     }
                  }
+             }
          };

+         let bytes = match resp.bytes().await {
              Ok(b) => b,
              Err(e) => {
                  error!(err = %e, "failed to read listRecords response");
+                 return CrawlCheckResult::Retry {
+                     retry_after: chrono::Utc::now().timestamp() + 60,
+                     status: 0,
+                 };
              }
          };

          match serde_json::from_slice::<ListRecordsOutput>(&bytes) {
+             Ok(out) if !out.records.is_empty() => return CrawlCheckResult::Signal,
+             Ok(_) => {}
              Err(e) => {
                  error!(err = %e, "failed to parse listRecords response");
+                 return CrawlCheckResult::Retry {
+                     retry_after: chrono::Utc::now().timestamp() + 60,
+                     status: 0,
+                 };
              }
          }

          CrawlCheckResult::NoSignal
      }
+     .instrument(tracing::info_span!("check", signal = %signal))
      .await;

      match res {
···
              found_signal = true;
              break;
          }
+         CrawlCheckResult::NoSignal => continue,
+         other => return (did, other),
      }
  }
···
      )
  }

+ pub mod throttle;
+ use throttle::{OrThrottle, Throttler};

  pub struct Crawler {
      state: Arc<AppState>,
···
      count: Arc<AtomicUsize>,
      crawled_count: Arc<AtomicUsize>,
      throttled: Arc<AtomicBool>,
+     pds_throttler: Arc<Throttler>,
  }

  impl Crawler {
···
          count: Arc::new(AtomicUsize::new(0)),
          crawled_count: Arc::new(AtomicUsize::new(0)),
          throttled: Arc::new(AtomicBool::new(false)),
+         pds_throttler: Arc::new(Throttler::new()),
      }
  }

  pub async fn run(self) -> Result<()> {
+     // stats ticker
      tokio::spawn({
          use std::time::Instant;
          let count = self.count.clone();
          let crawled_count = self.crawled_count.clone();
          let throttled = self.throttled.clone();
+         let pds_throttler = self.pds_throttler.clone();
          let mut last_time = Instant::now();
          let mut interval = tokio::time::interval(Duration::from_secs(60));
          async move {
···
              let delta_processed = count.swap(0, Ordering::Relaxed);
              let delta_crawled = crawled_count.swap(0, Ordering::Relaxed);
              let is_throttled = throttled.load(Ordering::Relaxed);
+
+             pds_throttler.evict_clean().await;

              if delta_processed == 0 && delta_crawled == 0 {
                  if is_throttled {
···
          }
      });

+     let crawler = Arc::new(self);
+     std::thread::spawn({
+         let crawler = crawler.clone();
+         let handle = tokio::runtime::Handle::current();
+         move || {
+             use std::thread::sleep;
+
+             let _g = handle.enter();
+
+             loop {
+                 match crawler.process_retry_queue() {
+                     Ok(Some(next_ts)) => {
+                         let secs = (next_ts - chrono::Utc::now().timestamp()).max(1) as u64;
+                         sleep(Duration::from_secs(secs));
+                     }
+                     Ok(None) => {
+                         sleep(Duration::from_secs(60));
+                     }
+                     Err(e) => {
+                         error!(err = %e, "retry loop failed");
+                         sleep(Duration::from_secs(60));
+                     }
+                 }
+             }
+         }
+     });
+
+     let mut relay_url = crawler.relay_host.clone();
      match relay_url.scheme() {
          "wss" => relay_url
              .set_scheme("https")
···
      }

      let mut rng: SmallRng = rand::make_rng();
+     let db = &crawler.state.db;

      let cursor_key = b"crawler_cursor";
      let mut cursor: Option<SmolStr> = Db::get(db.cursors.clone(), cursor_key.to_vec())
          .await?
···
      let mut was_throttled = false;

      loop {
+         // throttle check
          loop {
+             let pending = crawler.state.db.get_count("pending").await;
+             if pending > crawler.max_pending as u64 {
                  if !was_throttled {
                      debug!(
                          pending,
+                         max = crawler.max_pending,
                          "throttling: above max pending"
                      );
                      was_throttled = true;
+                     crawler.throttled.store(true, Ordering::Relaxed);
                  }
                  tokio::time::sleep(Duration::from_secs(5)).await;
+             } else if pending > crawler.resume_pending as u64 {
                  if !was_throttled {
                      debug!(
                          pending,
+                         resume = crawler.resume_pending,
                          "throttling: entering cooldown"
                      );
                      was_throttled = true;
+                     crawler.throttled.store(true, Ordering::Relaxed);
                  }

                  loop {
+                     let current_pending = crawler.state.db.get_count("pending").await;
+                     if current_pending <= crawler.resume_pending as u64 {
                          break;
                      }
                      debug!(
                          pending = current_pending,
+                         resume = crawler.resume_pending,
                          "cooldown, waiting"
                      );
                      tokio::time::sleep(Duration::from_secs(5)).await;
···
              if was_throttled {
                  info!("throttling released");
                  was_throttled = false;
+                 crawler.throttled.store(false, Ordering::Relaxed);
              }
              break;
          }
      }

      let mut list_repos_url = relay_url
          .join("/xrpc/com.atproto.sync.listRepos")
          .into_diagnostic()?;
···
      }

      let fetch_result = (|| {
+         crawler
+             .http
              .get(list_repos_url.clone())
              .send()
              .error_for_status()
      })
+     .retry(5, |e: &reqwest::Error, attempt| {
          matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS))
+             .then(|| Duration::from_secs(1 << attempt.min(5)))
      })
      .await;
···
      }

      debug!(count = output.repos.len(), "fetched repos");
+     crawler
+         .crawled_count
          .fetch_add(output.repos.len(), Ordering::Relaxed);

      let mut batch = db.inner.batch();
      let mut to_queue = Vec::new();
+     let filter = crawler.state.filter.load();

      let mut unknown_dids = Vec::new();
      for repo in output.repos {
          let did_key = keys::repo_key(&repo.did);
···
              continue;
          }

          if !Db::contains_key(db.repos.clone(), &did_key).await? {
              unknown_dids.push(repo.did.into_static());
          }
      }

+     let valid_dids = if filter.check_signals() && !unknown_dids.is_empty() {
+         crawler
+             .check_signals_batch(&unknown_dids, &filter, &mut batch)
              .await?
      } else {
          unknown_dids
···
          let did_key = keys::repo_key(did);
          trace!(did = %did, "found new repo");

+         let state = RepoState::untracked(rng.next_u64());
          batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
          batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
          to_queue.push(did.clone());
      }

      if let Some(new_cursor) = output.cursor {
          cursor = Some(new_cursor.as_str().into());
          batch.insert(
              &db.cursors,
              cursor_key.to_vec(),
              new_cursor.as_bytes().to_vec(),
          );
      } else {
          info!("reached end of list.");
          cursor = None;
      }
···
          .await
          .into_diagnostic()??;

+     crawler.account_new_repos(to_queue.len()).await;

      if cursor.is_none() {
          tokio::time::sleep(Duration::from_secs(3600)).await;
      }
  }
  }

+ /// scan the retry queue for entries whose `retry_after` timestamp has passed,
+ /// retry them, and return the earliest still-pending timestamp (if any) so the
+ /// caller knows when to wake up next.
+ fn process_retry_queue(&self) -> Result<Option<i64>> {
+     let db = &self.state.db;
+     let now = chrono::Utc::now().timestamp();
+
+     let mut ready: Vec<Did> = Vec::new();
+     let mut next_retry: Option<i64> = None;
+
+     let mut rng: SmallRng = rand::make_rng();
+
+     for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) {
+         let (key, val) = guard.into_inner().into_diagnostic()?;
+         let (retry_after, _) = keys::crawler_retry_parse_value(&val)?;
+         let did = keys::crawler_retry_parse_key(&key)?.to_did();
+
+         // we add an extra 1-7% of backoff just to make it less likely for
+         // many requests to coincide with each other
+         let backoff =
+             ((retry_after - now).max(0) as f64).mul(rng.random_range(0.01..0.07)) as i64;
+         if retry_after + backoff > now {
+             next_retry = Some(
+                 next_retry
+                     .map(|earliest| earliest.min(retry_after))
+                     .unwrap_or(retry_after),
+             );
+             continue;
+         }
+
+         ready.push(did);
+     }
+
+     if ready.is_empty() {
+         return Ok(next_retry);
+     }
+
+     info!(count = ready.len(), "retrying pending repos");
+
+     let handle = tokio::runtime::Handle::current();
+     let mut batch = db.inner.batch();
+     let filter = self.state.filter.load();
+     let valid_dids = handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch))?;
+
+     let mut rng: SmallRng = rand::make_rng();
+     for did in &valid_dids {
+         let did_key = keys::repo_key(did);
+
+         if db.repos.contains_key(&did_key).into_diagnostic()? {
+             continue;
+         }
+
+         let state = RepoState::untracked(rng.next_u64());
+         batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
+         batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
+     }
+
+     batch.commit().into_diagnostic()?;
+
+     if !valid_dids.is_empty() {
+         info!(count = valid_dids.len(), "recovered from retry queue");
+         handle.block_on(self.account_new_repos(valid_dids.len()));
+     }
+
+     Ok(next_retry)
+ }
+
  async fn check_signals_batch(
      &self,
      dids: &[Did<'static>],
···
      let http = self.http.clone();
      let resolver = self.state.resolver.clone();
      let filter = filter.clone();
+     let throttler = self.pds_throttler.clone();
+     let span = tracing::info_span!("signals", did = %did);
+     set.spawn(check_repo_signals(http, resolver, filter, did, throttler).instrument(span));
  }

  while let Some(res) = set.join_next().await {
      let (did, result) = res.into_diagnostic()?;
      match result {
          CrawlCheckResult::Signal => {
              valid.push(did);
          }
+         CrawlCheckResult::NoSignal => {}
+         CrawlCheckResult::Retry {
+             retry_after,
+             status,
+         } => {
              batch.insert(
                  &db.crawler,
+                 keys::crawler_retry_key(&did),
+                 keys::crawler_retry_value(retry_after, status),
              );
          }
      }
  }

  Ok(valid)
  }

  async fn account_new_repos(&self, count: usize)
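The reshaped `retry` deserves a note: the old `is_ratelimited: Fn(&E) -> bool` predicate became `on_ratelimit: Fn(&E, u32) -> Option<Duration>`, so one closure now both classifies an error as transient or terminal and supplies the per-attempt backoff. A self-contained sketch of that contract with simplified types (the combinator in the diff additionally distinguishes `RetryOutcome::Ratelimited` from `Failed` and adds random jitter; everything named here is illustrative):

use std::future::Future;
use std::time::Duration;

// standalone sketch of the reshaped combinator: `policy` classifies each
// error and, when it is transient, supplies the backoff for that attempt.
async fn retry_with<T, E, Fut>(
    mut op: impl FnMut() -> Fut,
    max_retries: u32,
    policy: impl Fn(&E, u32) -> Option<Duration>,
) -> Result<T, E>
where
    Fut: Future<Output = Result<T, E>>,
{
    let mut attempt = 0u32;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) => match policy(&e, attempt) {
                // transient, but out of budget: give up with the last error
                Some(_) if attempt >= max_retries => return Err(e),
                Some(backoff) => {
                    tokio::time::sleep(backoff).await;
                    attempt += 1;
                }
                // terminal error: fail immediately
                None => return Err(e),
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let mut calls = 0u32;
    let out = retry_with(
        || {
            calls += 1;
            let n = calls;
            // fails twice with a transient error, then succeeds
            async move { if n < 3 { Err("busy") } else { Ok(n) } }
        },
        5,
        // back off 1s, 2s, 4s, ..., capped at 32s, mirroring `1 << attempt.min(5)` above
        |_e: &&str, attempt| Some(Duration::from_secs(1 << attempt.min(5))),
    )
    .await;
    assert_eq!(out, Ok(3));
}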
src/crawler/throttle.rs · +172 (new file)

use scc::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use tokio::sync::{Notify, Semaphore, SemaphorePermit};
use url::Url;

/// max concurrent in-flight requests per PDS before we start queuing
/// ref pds allows 10 requests per second... so 10 should be fine
const PER_PDS_CONCURRENCY: usize = 10;

#[derive(Clone)]
pub struct Throttler {
    states: Arc<HashMap<Url, Arc<State>>>,
}

impl Throttler {
    pub fn new() -> Self {
        Self {
            states: Arc::new(HashMap::new()),
        }
    }

    pub async fn get_handle(&self, url: &Url) -> ThrottleHandle {
        let state = self
            .states
            .entry_async(url.clone())
            .await
            .or_insert_with(|| Arc::new(State::new()))
            .get()
            .clone();

        ThrottleHandle { state }
    }

    /// drop entries with no active throttle and no consecutive failures.
    pub async fn evict_clean(&self) {
        self.states
            .retain_async(|_, v| {
                v.throttled_until.load(Ordering::Acquire) != 0
                    || v.consecutive_failures.load(Ordering::Acquire) != 0
            })
            .await;
    }
}

struct State {
    throttled_until: AtomicI64,
    consecutive_failures: AtomicUsize,
    /// only fires on hard failures (timeout, TLS, bad gateway, etc).
    /// ratelimits do NOT fire this — they just store `throttled_until` and
    /// let tasks exit naturally, deferring to the background retry loop.
    failure_notify: Notify,
    semaphore: Semaphore,
}

impl State {
    fn new() -> Self {
        Self {
            throttled_until: AtomicI64::new(0),
            consecutive_failures: AtomicUsize::new(0),
            failure_notify: Notify::new(),
            semaphore: Semaphore::new(PER_PDS_CONCURRENCY),
        }
    }
}

pub struct ThrottleHandle {
    state: Arc<State>,
}

impl ThrottleHandle {
    pub fn is_throttled(&self) -> bool {
        let until = self.state.throttled_until.load(Ordering::Acquire);
        until != 0 && chrono::Utc::now().timestamp() < until
    }

    /// the unix timestamp at which this throttle expires (0 if not throttled).
    pub fn throttled_until(&self) -> i64 {
        self.state.throttled_until.load(Ordering::Acquire)
    }

    pub fn record_success(&self) {
        self.state.consecutive_failures.store(0, Ordering::Release);
        self.state.throttled_until.store(0, Ordering::Release);
    }

    /// called on a 429 response. `retry_after_secs` comes from the `Retry-After`
    /// header if present; falls back to 60s. uses `fetch_max` so concurrent callers
    /// don't race each other back to a shorter window.
    ///
    /// deliberately does NOT notify waiters — 429s are soft and tasks should exit
    /// naturally via the `Retry` result rather than being cancelled.
    pub fn record_ratelimit(&self, retry_after_secs: Option<u64>) {
        let secs = retry_after_secs.unwrap_or(60) as i64;
        let until = chrono::Utc::now().timestamp() + secs;
        self.state
            .throttled_until
            .fetch_max(until, Ordering::AcqRel);
    }

    /// called on hard failures (timeout, TLS error, bad gateway, etc).
    /// returns throttle duration in minutes if this is a *new* throttle,
    /// and notifies all in-flight tasks to cancel immediately.
    pub fn record_failure(&self) -> Option<i64> {
        if self.is_throttled() {
            return None;
        }

        let failures = self
            .state
            .consecutive_failures
            .fetch_add(1, Ordering::AcqRel)
            + 1;

        // 30 min, 60 min, 120 min, ... capped at ~512 hours
        let base_minutes = 30i64;
        let exponent = (failures as u32).saturating_sub(1);
        let minutes = base_minutes * 2i64.pow(exponent.min(10));
        let until = chrono::Utc::now().timestamp() + minutes * 60;

        self.state.throttled_until.store(until, Ordering::Release);
        self.state.failure_notify.notify_waiters();

        Some(minutes)
    }

    /// acquire a concurrency slot for this PDS. hold the returned permit
    /// for the duration of the request.
    pub async fn acquire(&self) -> SemaphorePermit<'_> {
        self.state
            .semaphore
            .acquire()
            .await
            .expect("throttle semaphore unexpectedly closed")
    }

    /// resolves when this PDS gets a hard failure notification.
    /// used by `or_failure` and the semaphore acquire select to cancel in-flight work.
    pub async fn wait_for_failure(&self) {
        loop {
            let notified = self.state.failure_notify.notified();
            if self.is_throttled() {
                return;
            }
            notified.await;
        }
    }
}

/// extension trait that adds `.or_failure()` to any future returning `Result<T, E>`.
///
/// races the future against a hard-failure notification. soft ratelimits (429) do NOT
/// trigger cancellation — those are handled by the background retry loop.
#[allow(async_fn_in_trait)]
pub trait OrThrottle<T, E>: Future<Output = Result<T, E>> {
    async fn or_failure(
        self,
        handle: &ThrottleHandle,
        on_throttle: impl FnOnce() -> E,
    ) -> Result<T, E>
    where
        Self: Sized,
    {
        tokio::select! {
            res = self => res,
            _ = handle.wait_for_failure() => Err(on_throttle()),
        }
    }
}

impl<T, E, F: Future<Output = Result<T, E>>> OrThrottle<T, E> for F {}
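Putting the new pieces together: a crawl task takes a per-PDS permit, then races its request against hard-failure notifications from other tasks on the same host. A minimal sketch; the `PdsError` type and `guarded_fetch` function are illustrative and do not appear in this diff:

use crate::crawler::throttle::{OrThrottle, Throttler};

// illustrative error type for the sketch
enum PdsError {
    Http(reqwest::Error),
    Throttled,
}

async fn guarded_fetch(
    throttler: &Throttler,
    http: &reqwest::Client,
    url: &url::Url,
) -> Result<reqwest::Response, PdsError> {
    let handle = throttler.get_handle(url).await;

    // at most PER_PDS_CONCURRENCY tasks hold a permit for this pds at once;
    // the permit is released when `_permit` drops at the end of the scope
    let _permit = handle.acquire().await;

    let res = async { http.get(url.clone()).send().await.map_err(PdsError::Http) }
        // a hard failure reported by *another* task on this pds cancels us
        // immediately; soft 429s do not, they surface as Retry results instead
        .or_failure(&handle, || PdsError::Throttled)
        .await?;

    handle.record_success();
    Ok(res)
}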
src/db/keys.rs · +24 -4

      prefix
  }

- pub const CRAWLER_FAILED_PREFIX: &[u8] = &[b'f', SEP];
+ /// key format: `ret|<did bytes>`
+ pub const CRAWLER_RETRY_PREFIX: &[u8] = b"ret|";

- pub fn crawler_failed_key(did: &Did) -> Vec<u8> {
+ pub fn crawler_retry_key(did: &Did) -> Vec<u8> {
      let repo = TrimmedDid::from(did);
-     let mut key = Vec::with_capacity(CRAWLER_FAILED_PREFIX.len() + repo.len());
-     key.extend_from_slice(CRAWLER_FAILED_PREFIX);
+     let mut key = Vec::with_capacity(CRAWLER_RETRY_PREFIX.len() + repo.len());
+     key.extend_from_slice(CRAWLER_RETRY_PREFIX);
      repo.write_to_vec(&mut key);
      key
  }
+
+ /// value format: `<retry_after: i64 BE><status: u16 BE>`
+ pub fn crawler_retry_value(retry_after: i64, status: u16) -> [u8; 10] {
+     let mut buf = [0u8; 10];
+     buf[..8].copy_from_slice(&retry_after.to_be_bytes());
+     buf[8..].copy_from_slice(&status.to_be_bytes());
+     buf
+ }
+
+ pub fn crawler_retry_parse_value(val: &[u8]) -> miette::Result<(i64, u16)> {
+     miette::ensure!(val.len() >= 10, "crawler retry value too short");
+     let retry_after = i64::from_be_bytes(val[..8].try_into().unwrap());
+     let status = u16::from_be_bytes(val[8..10].try_into().unwrap());
+     Ok((retry_after, status))
+ }
+
+ pub fn crawler_retry_parse_key(key: &[u8]) -> miette::Result<TrimmedDid<'_>> {
+     TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..])
+ }
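The 10-byte value layout (8-byte big-endian timestamp, 2-byte big-endian status) round-trips by construction; a small test along these lines, not part of the diff, would pin it down:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn crawler_retry_value_round_trips() {
        let retry_after = 1_700_000_000i64; // any unix timestamp
        let status = 429u16;

        let buf = crawler_retry_value(retry_after, status);
        assert_eq!(buf.len(), 10); // 8 bytes i64 + 2 bytes u16, both big-endian

        let (ts, st) = crawler_retry_parse_value(&buf).unwrap();
        assert_eq!(ts, retry_after);
        assert_eq!(st, status);
    }
}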
src/filter.rs · +5 -1

      self.signals.iter().any(|p| nsid_matches(p, collection))
  }

- pub fn has_glob_signals(&self) -> bool {
+ fn has_glob_signals(&self) -> bool {
      self.signals.iter().any(|s| s.ends_with(".*"))
+ }
+
+ pub fn check_signals(&self) -> bool {
+     self.mode == FilterMode::Filter && !self.signals.is_empty() && !self.has_glob_signals()
  }
  }
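To make the new predicate concrete: with an exact signal list the crawler can pre-check each repo with a single listRecords call, while any glob signal disables that fast path. A sketch only, since the diff does not show `FilterConfig`'s full field set or whether it implements `Default`:

use crate::filter::{FilterConfig, FilterMode};

// sketch only: `signals` element type and the remaining fields are assumed
let cfg = FilterConfig {
    mode: FilterMode::Filter,
    signals: vec!["app.bsky.feed.post".into()],
    ..Default::default()
};
// exact signals: repos can be pre-checked with one listRecords call each
assert!(cfg.check_signals());

let glob = FilterConfig {
    signals: vec!["app.bsky.*".into()],
    ..cfg
};
// a glob signal means we don't know which collections to query,
// so the fast pre-check is skipped
assert!(!glob.check_signals());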
src/ingest/worker.rs · +1 -1

      debug!(did = %did, "discovered new account from firehose, queueing backfill");

-     let repo_state = RepoState::backfilling_untracked(rand::rng().next_u64());
+     let repo_state = RepoState::untracked(rand::rng().next_u64());
      let mut batch = ctx.state.db.inner.batch();
      batch.insert(
          &ctx.state.db.repos,
src/types.rs · +2 -1

      }
  }

- pub fn backfilling_untracked(index_id: u64) -> Self {
+ /// backfilling, but not tracked yet
+ pub fn untracked(index_id: u64) -> Self {
      Self {
          tracked: false,
          ..Self::backfilling(index_id)