tangled
alpha
login
or
join now
ptr.pet
/
hydrant
21
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
21
fork
atom
overview
issues
6
pulls
pipelines
[backfill] remove the task limiter
ptr.pet
4 weeks ago
088eafdc
2864d674
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+11
-79
1 changed file
expand all
collapse all
unified
split
src
backfill
mod.rs
+11
-79
src/backfill/mod.rs
···
31
31
32
32
use crate::ingest::{BufferTx, IngestMessage};
33
33
34
34
-
trait SliceSplitExt {
35
35
-
fn split_once<'a>(&'a self, delimiter: impl Fn(&u8) -> bool) -> Option<(&'a [u8], &'a [u8])>;
36
36
-
}
37
37
-
38
38
-
impl SliceSplitExt for [u8] {
39
39
-
fn split_once<'a>(&'a self, delimiter: impl Fn(&u8) -> bool) -> Option<(&'a [u8], &'a [u8])> {
40
40
-
let idx = self.iter().position(delimiter)?;
41
41
-
Some((&self[..idx], &self[idx + 1..]))
42
42
-
}
43
43
-
}
44
44
-
45
45
-
struct AdaptiveLimiter {
46
46
-
current_limit: usize,
47
47
-
max_limit: usize,
48
48
-
min_limit: usize,
49
49
-
}
50
50
-
51
51
-
impl AdaptiveLimiter {
52
52
-
fn new(start: usize, max: usize) -> Self {
53
53
-
Self {
54
54
-
current_limit: start,
55
55
-
max_limit: max,
56
56
-
min_limit: 1,
57
57
-
}
58
58
-
}
59
59
-
60
60
-
fn on_success(&mut self) {
61
61
-
if self.current_limit < self.max_limit {
62
62
-
self.current_limit += 1;
63
63
-
debug!("adaptive limiter increased to {}", self.current_limit);
64
64
-
}
65
65
-
}
66
66
-
67
67
-
fn on_failure(&mut self) {
68
68
-
if self.current_limit > self.min_limit {
69
69
-
self.current_limit = (self.current_limit / 2).max(self.min_limit);
70
70
-
debug!("adaptive limiter decreased to {}", self.current_limit);
71
71
-
}
72
72
-
}
73
73
-
}
74
74
-
75
34
pub struct BackfillWorker {
76
35
state: Arc<AppState>,
77
36
buffer_tx: BufferTx,
···
121
80
pub async fn run(self) {
122
81
info!("backfill worker started");
123
82
124
124
-
let (feedback_tx, mut feedback_rx) = tokio::sync::mpsc::channel(100);
125
125
-
let mut limiter = AdaptiveLimiter::new(
126
126
-
self.semaphore.available_permits(),
127
127
-
self.semaphore.available_permits(),
128
128
-
); // assume start at max
129
129
-
130
83
loop {
131
131
-
// apply feedback from finished tasks
132
132
-
while let Ok(was_ratelimited) = feedback_rx.try_recv() {
133
133
-
if was_ratelimited {
134
134
-
limiter.on_failure();
135
135
-
} else {
136
136
-
limiter.on_success();
137
137
-
}
138
138
-
}
139
139
-
140
84
let mut spawned = 0;
141
85
142
86
for guard in self.state.db.pending.iter() {
143
143
-
if self.in_flight.len() >= limiter.current_limit {
144
144
-
break;
145
145
-
}
146
146
-
147
87
let key = match guard.key() {
148
88
Ok(k) => k,
149
89
Err(e) => {
···
181
121
let did = did.clone();
182
122
let buffer_tx = self.buffer_tx.clone();
183
123
let verify = self.verify_signatures;
184
184
-
let feedback_tx = feedback_tx.clone();
185
124
186
125
tokio::spawn(async move {
187
126
let _guard = guard;
188
127
let res = did_task(&state, http, buffer_tx, &did, key, permit, verify).await;
189
128
190
190
-
let was_ratelimited = match &res {
191
191
-
Err(e) if matches!(e, BackfillError::Ratelimited) => true,
192
192
-
_ => false,
193
193
-
};
194
194
-
195
129
if let Err(e) = res {
196
130
error!("backfill process failed for {did}: {e}");
197
131
if let BackfillError::Generic(report) = &e {
···
201
135
202
136
// wake worker to pick up more (in case we were sleeping at limit)
203
137
state.backfill_notify.notify_one();
204
204
-
205
205
-
let _ = feedback_tx.try_send(was_ratelimited);
206
138
});
207
139
208
140
spawned += 1;
209
141
}
210
142
211
143
if spawned == 0 {
212
212
-
// if we didn't spawn anything, wait for notification OR feedback
213
213
-
tokio::select! {
214
214
-
_ = self.state.backfill_notify.notified() => {},
215
215
-
_ = tokio::time::sleep(Duration::from_secs(1)) => {}, // poll for feedback if idle
216
216
-
}
144
144
+
// wait for new tasks
145
145
+
self.state.backfill_notify.notified().await;
217
146
} else {
218
147
// if we spawned tasks, yield briefly to let them start and avoid tight loop
219
148
tokio::time::sleep(Duration::from_millis(10)).await;
···
614
543
let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
615
544
// key is did|collection|rkey
616
545
// skip did|
617
617
-
let remaining = &key[prefix.len()..];
618
618
-
let (collection_bytes, rkey_bytes) =
619
619
-
SliceSplitExt::split_once(remaining, |b| *b == keys::SEP)
620
620
-
.ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
546
546
+
let mut remaining = key[prefix.len()..].split(|b| keys::SEP.eq(b));
547
547
+
let collection_raw = remaining
548
548
+
.next()
549
549
+
.ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
550
550
+
let rkey_raw = remaining
551
551
+
.next()
552
552
+
.ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?;
621
553
622
622
-
let collection = std::str::from_utf8(collection_bytes)
554
554
+
let collection = std::str::from_utf8(collection_raw)
623
555
.map_err(|e| miette::miette!("invalid collection utf8: {e}"))?;
624
556
625
625
-
let rkey = keys::parse_rkey(rkey_bytes)
557
557
+
let rkey = keys::parse_rkey(rkey_raw)
626
558
.map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?;
627
559
628
560
let cid = cid::Cid::read_bytes(cid_bytes.as_ref())