tangled
alpha
login
or
join now
ptr.pet
/
hydrant
28
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
28
fork
atom
overview
issues
6
pulls
pipelines
[crawler] use describeRepo instead of listRecords...
ptr.pet
1 week ago
e263dacf
d58e0261
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+72
-90
1 changed file
expand all
collapse all
unified
split
src
crawler
mod.rs
+72
-90
src/crawler/mod.rs
···
5
5
use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after};
6
6
use chrono::{DateTime, TimeDelta, Utc};
7
7
use futures::FutureExt;
8
8
-
use jacquard_api::com_atproto::repo::list_records::ListRecordsOutput;
8
8
+
use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput;
9
9
use jacquard_api::com_atproto::sync::list_repos::ListReposOutput;
10
10
use jacquard_common::{IntoStatic, types::string::Did};
11
11
use miette::{Context, IntoDiagnostic, Result};
···
193
193
Throttled,
194
194
}
195
195
196
196
-
let mut found_signal = false;
197
197
-
for signal in filter.signals.iter() {
198
198
-
let res = async {
199
199
-
let mut list_records_url = pds_url.join("/xrpc/com.atproto.repo.listRecords").unwrap();
200
200
-
list_records_url
201
201
-
.query_pairs_mut()
202
202
-
.append_pair("repo", &did)
203
203
-
.append_pair("collection", signal)
204
204
-
.append_pair("limit", "1");
196
196
+
let mut describe_url = pds_url.join("/xrpc/com.atproto.repo.describeRepo").unwrap();
197
197
+
describe_url.query_pairs_mut().append_pair("repo", &did);
205
198
206
206
-
let resp = async {
207
207
-
let resp = http
208
208
-
.get(list_records_url.clone())
209
209
-
.send()
210
210
-
.await
211
211
-
.map_err(RequestError::Reqwest)?;
199
199
+
let resp = async {
200
200
+
let resp = http
201
201
+
.get(describe_url)
202
202
+
.send()
203
203
+
.await
204
204
+
.map_err(RequestError::Reqwest)?;
212
205
213
213
-
// dont retry ratelimits since we will just put it in a queue to be tried again later
214
214
-
if resp.status() == StatusCode::TOO_MANY_REQUESTS {
215
215
-
return Err(RequestError::RateLimited(parse_retry_after(&resp)));
216
216
-
}
206
206
+
// dont retry ratelimits since we will just put it in a queue to be tried again later
207
207
+
if resp.status() == StatusCode::TOO_MANY_REQUESTS {
208
208
+
return Err(RequestError::RateLimited(parse_retry_after(&resp)));
209
209
+
}
217
210
218
218
-
resp.error_for_status().map_err(RequestError::Reqwest)
219
219
-
}
220
220
-
.or_failure(&throttle, || RequestError::Throttled)
221
221
-
.await;
211
211
+
resp.error_for_status().map_err(RequestError::Reqwest)
212
212
+
}
213
213
+
.or_failure(&throttle, || RequestError::Throttled)
214
214
+
.await;
222
215
223
223
-
let resp = match resp {
224
224
-
Ok(r) => {
225
225
-
throttle.record_success();
226
226
-
r
227
227
-
}
228
228
-
Err(RequestError::RateLimited(secs)) => {
229
229
-
throttle.record_ratelimit(secs);
230
230
-
return throttle
231
231
-
.to_retry_state()
232
232
-
.with_status(StatusCode::TOO_MANY_REQUESTS)
233
233
-
.into();
234
234
-
}
235
235
-
Err(RequestError::Throttled) => {
236
236
-
return throttle.to_retry_state().into();
216
216
+
let resp = match resp {
217
217
+
Ok(r) => {
218
218
+
throttle.record_success();
219
219
+
r
220
220
+
}
221
221
+
Err(RequestError::RateLimited(secs)) => {
222
222
+
throttle.record_ratelimit(secs);
223
223
+
return (
224
224
+
did,
225
225
+
throttle
226
226
+
.to_retry_state()
227
227
+
.with_status(StatusCode::TOO_MANY_REQUESTS)
228
228
+
.into(),
229
229
+
);
230
230
+
}
231
231
+
Err(RequestError::Throttled) => {
232
232
+
return (did, throttle.to_retry_state().into());
233
233
+
}
234
234
+
Err(RequestError::Reqwest(e)) => {
235
235
+
if is_throttle_worthy(&e) {
236
236
+
if let Some(mins) = throttle.record_failure() {
237
237
+
warn!(url = %pds_url, mins, "throttling pds due to hard failure");
237
238
}
238
238
-
Err(RequestError::Reqwest(e)) => {
239
239
-
if is_throttle_worthy(&e) {
240
240
-
if let Some(mins) = throttle.record_failure() {
241
241
-
warn!(url = %pds_url, mins, "throttling pds due to hard failure");
242
242
-
}
243
243
-
let mut retry_state = throttle.to_retry_state();
244
244
-
retry_state.status = e.status();
245
245
-
return retry_state.into();
246
246
-
}
239
239
+
let mut retry_state = throttle.to_retry_state();
240
240
+
retry_state.status = e.status();
241
241
+
return (did, retry_state.into());
242
242
+
}
247
243
248
248
-
match e.status() {
249
249
-
Some(StatusCode::NOT_FOUND | StatusCode::GONE) => {
250
250
-
trace!("repo not found");
251
251
-
return CrawlCheckResult::NoSignal;
252
252
-
}
253
253
-
Some(s) if s.is_client_error() => {
254
254
-
error!(status = %s, "repo unavailable");
255
255
-
return CrawlCheckResult::NoSignal;
256
256
-
}
257
257
-
_ => {
258
258
-
error!(err = %e, "repo errored");
259
259
-
let mut retry_state = RetryState::new(60 * 15);
260
260
-
retry_state.status = e.status();
261
261
-
return retry_state.into();
262
262
-
}
263
263
-
}
244
244
+
match e.status() {
245
245
+
Some(StatusCode::NOT_FOUND | StatusCode::GONE) => {
246
246
+
trace!("repo not found");
247
247
+
return (did, CrawlCheckResult::NoSignal);
264
248
}
265
265
-
};
266
266
-
267
267
-
let bytes = match resp.bytes().await {
268
268
-
Ok(b) => b,
269
269
-
Err(e) => {
270
270
-
error!(err = %e, "failed to read listRecords response");
271
271
-
return RetryState::new(60 * 5).into();
249
249
+
Some(s) if s.is_client_error() => {
250
250
+
error!(status = %s, "repo unavailable");
251
251
+
return (did, CrawlCheckResult::NoSignal);
272
252
}
273
273
-
};
274
274
-
275
275
-
match serde_json::from_slice::<ListRecordsOutput>(&bytes) {
276
276
-
Ok(out) if !out.records.is_empty() => return CrawlCheckResult::Signal,
277
277
-
Ok(_) => {}
278
278
-
Err(e) => {
279
279
-
error!(err = %e, "failed to parse listRecords response");
280
280
-
return RetryState::new(60 * 10).into();
253
253
+
_ => {
254
254
+
error!(err = %e, "repo errored");
255
255
+
let mut retry_state = RetryState::new(60 * 15);
256
256
+
retry_state.status = e.status();
257
257
+
return (did, retry_state.into());
281
258
}
282
259
}
260
260
+
}
261
261
+
};
283
262
284
284
-
CrawlCheckResult::NoSignal
263
263
+
let bytes = match resp.bytes().await {
264
264
+
Ok(b) => b,
265
265
+
Err(e) => {
266
266
+
error!(err = %e, "failed to read describeRepo response");
267
267
+
return (did, RetryState::new(60 * 5).into());
285
268
}
286
286
-
.instrument(tracing::info_span!("check", signal = %signal))
287
287
-
.await;
269
269
+
};
288
270
289
289
-
match res {
290
290
-
CrawlCheckResult::Signal => {
291
291
-
found_signal = true;
292
292
-
break;
293
293
-
}
294
294
-
CrawlCheckResult::NoSignal => continue,
295
295
-
other => return (did, other),
271
271
+
let out = match serde_json::from_slice::<DescribeRepoOutput>(&bytes) {
272
272
+
Ok(out) => out,
273
273
+
Err(e) => {
274
274
+
error!(err = %e, "failed to parse describeRepo response");
275
275
+
return (did, RetryState::new(60 * 10).into());
296
276
}
297
297
-
}
277
277
+
};
278
278
+
279
279
+
let found_signal = filter.signals.iter().any(|s| out.collections.contains(s));
298
280
299
281
if !found_signal {
300
300
-
trace!("no signal-matching records found");
282
282
+
trace!("no signal-matching collections found");
301
283
}
302
284
303
285
(