AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] call listRecords on PDS, handle errors and rate limits better

ptr.pet 11373a07 69319002

verified
+323 -112
+1
AGENTS.md
··· 85 85 - `resync_buffer`: Maps `{DID}|{Rev}` -> `Commit` (MessagePack). Used to buffer live events during backfill. 86 86 - `counts`: Maps `k|{NAME}` or `r|{DID}|{COL}` -> `Count` (u64 BE Bytes). 87 87 - `filter`: Stores filter config. Handled by the `db::filter` module. Includes mode key `m` -> `FilterMode` (MessagePack), and set entries for signals (`s|{NSID}`), collections (`c|{NSID}`), and excludes (`x|{DID}`) -> empty value. 88 + - `crawler`: Stores crawler state with prefixed keys. Failed crawl entries use `f|{DID}` -> empty value, representing repos that failed signal checking during crawl discovery. 88 89 89 90 ## Safe commands 90 91
+258 -80
src/crawler/mod.rs
··· 1 + use crate::db::types::TrimmedDid; 1 2 use crate::db::{Db, keys, ser_repo_state}; 2 3 use crate::state::AppState; 3 4 use crate::types::RepoState; ··· 6 7 use jacquard_common::{IntoStatic, types::string::Did}; 7 8 use miette::{IntoDiagnostic, Result}; 8 9 use rand::Rng; 10 + use rand::RngExt; 9 11 use rand::rngs::SmallRng; 10 12 use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; 11 13 use reqwest_retry::Jitter; 12 14 use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; 13 15 use smol_str::SmolStr; 14 16 use std::sync::Arc; 17 + use std::sync::atomic::{AtomicUsize, Ordering}; 15 18 use std::time::Duration; 16 19 use tracing::{debug, error, info, trace}; 17 20 use url::Url; 18 21 22 + enum CrawlCheckResult { 23 + Signal, 24 + NoSignal, 25 + Ratelimited, 26 + Failed, 27 + } 28 + 19 29 pub struct Crawler { 20 30 state: Arc<AppState>, 21 31 relay_host: Url, 22 32 http: Arc<ClientWithMiddleware>, 23 33 max_pending: usize, 24 34 resume_pending: usize, 35 + count: Arc<AtomicUsize>, 25 36 } 26 37 27 38 impl Crawler { ··· 55 66 http, 56 67 max_pending, 57 68 resume_pending, 69 + count: Arc::new(AtomicUsize::new(0)), 58 70 } 59 71 } 60 72 61 73 pub async fn run(self) -> Result<()> { 62 74 info!("crawler started"); 63 75 76 + tokio::spawn({ 77 + let count = self.count.clone(); 78 + let mut last_time = std::time::Instant::now(); 79 + let mut interval = tokio::time::interval(Duration::from_secs(60)); 80 + async move { 81 + loop { 82 + interval.tick().await; 83 + let delta = count.swap(0, Ordering::Relaxed); 84 + if delta == 0 { 85 + continue; 86 + } 87 + let elapsed = last_time.elapsed().as_secs_f64(); 88 + let rate = if elapsed > 0.0 { 89 + delta as f64 / elapsed 90 + } else { 91 + 0.0 92 + }; 93 + info!("crawler: {rate:.2} repos/s ({delta} repos in {elapsed:.1}s)"); 94 + last_time = std::time::Instant::now(); 95 + } 96 + } 97 + }); 98 + 64 99 let mut api_url = self.relay_host.clone(); 65 100 if api_url.scheme() == "wss" { 66 101 
api_url ··· 82 117 .await? 83 118 .map(|bytes| { 84 119 let s = String::from_utf8_lossy(&bytes); 85 - info!("resuming crawler from cursor: {}", s); 120 + info!("resuming crawler from cursor: {s}"); 86 121 s.into() 87 122 }); 88 123 let mut was_throttled = false; ··· 181 216 && !filter.has_glob_signals(); 182 217 183 218 // 3. process repos 184 - let mut unknown_repos = Vec::new(); 219 + let mut unknown_dids = Vec::new(); 185 220 for repo in output.repos { 186 - let parsed_did: Did = repo.did.parse().unwrap(); 187 - let did_key = keys::repo_key(&parsed_did); 221 + let did_key = keys::repo_key(&repo.did); 188 222 189 223 let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 190 224 if db.filter.contains_key(&excl_key).into_diagnostic()? { ··· 193 227 194 228 // check if known 195 229 if !Db::contains_key(db.repos.clone(), &did_key).await? { 196 - unknown_repos.push(repo); 230 + unknown_dids.push(repo.did.into_static()); 197 231 } 198 232 } 199 233 200 - let mut valid_repos = Vec::new(); 201 - if check_signals && !unknown_repos.is_empty() { 202 - let mut set = tokio::task::JoinSet::new(); 203 - for repo in unknown_repos { 204 - let http = self.http.clone(); 205 - let api_url = api_url.clone(); 206 - let filter = filter.clone(); 207 - set.spawn(async move { 208 - let mut found_signal = false; 209 - for signal in filter.signals.iter() { 210 - let mut list_records_url = 211 - api_url.join("/xrpc/com.atproto.repo.listRecords").unwrap(); 212 - list_records_url 213 - .query_pairs_mut() 214 - .append_pair("repo", &repo.did) 215 - .append_pair("collection", signal) 216 - .append_pair("limit", "1"); 217 - 218 - match http.get(list_records_url).send().await { 219 - Ok(res) => { 220 - let Ok(bytes) = res.bytes().await else { 221 - error!("failed to read bytes from listRecords response for repo {}, signal {signal}", repo.did); 222 - continue; 223 - }; 224 - if let Ok(out) = serde_json::from_slice::<ListRecordsOutput>(&bytes) { 225 - if !out.records.is_empty() { 226 - 
found_signal = true; 227 - break; 228 - } 229 - } 230 - } 231 - Err(e) => { 232 - error!( 233 - "failed to listRecords for repo {}, signal {signal}: {e}", 234 - repo.did 235 - ); 236 - continue; 237 - } 238 - } 239 - } 240 - 241 - if !found_signal { 242 - trace!( 243 - "crawler skipped repo {}: no records match signals", 244 - repo.did 245 - ); 246 - } 247 - 248 - (repo, found_signal) 249 - }); 250 - } 251 - 252 - while let Some(res) = set.join_next().await { 253 - let (repo, found_signal) = res.into_diagnostic()?; 254 - if found_signal { 255 - valid_repos.push(repo); 256 - } 257 - } 234 + let valid_dids = if check_signals && !unknown_dids.is_empty() { 235 + self.check_signals_batch(&unknown_dids, &filter, &mut batch) 236 + .await? 258 237 } else { 259 - valid_repos = unknown_repos; 260 - } 238 + unknown_dids 239 + }; 261 240 262 - for repo in valid_repos { 263 - let parsed_did: Did = repo.did.parse().unwrap(); 264 - let did_key = keys::repo_key(&parsed_did); 265 - trace!("crawler found new repo: {}", repo.did); 241 + for did in &valid_dids { 242 + let did_key = keys::repo_key(did); 243 + trace!("crawler found new repo: {did}"); 266 244 267 245 let state = RepoState::backfilling(rng.next_u64()); 268 246 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 269 247 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 270 - to_queue.push(repo.did.clone()); 248 + to_queue.push(did.clone()); 271 249 } 272 250 273 251 // 4. update cursor ··· 289 267 .await 290 268 .into_diagnostic()??; 291 269 292 - // update counts if we found new repos 293 - if !to_queue.is_empty() { 294 - let count = to_queue.len() as i64; 295 - self.state.db.update_count_async("repos", count).await; 296 - self.state.db.update_count_async("pending", count).await; 270 + self.account_new_repos(to_queue.len()).await; 271 + 272 + if cursor.is_none() { 273 + // 6. 
retry previously failed repos before sleeping 274 + self.retry_failed_repos(&mut rng).await?; 275 + 276 + tokio::time::sleep(Duration::from_secs(3600)).await; 297 277 } 278 + } 279 + } 298 280 299 - // 5. notify backfill worker 300 - if !to_queue.is_empty() { 301 - self.state.notify_backfill(); 281 + async fn check_signals_batch( 282 + &self, 283 + dids: &[Did<'static>], 284 + filter: &crate::filter::FilterConfig, 285 + batch: &mut fjall::OwnedWriteBatch, 286 + ) -> Result<Vec<Did<'static>>> { 287 + let db = &self.state.db; 288 + let mut valid = Vec::new(); 289 + let mut set = tokio::task::JoinSet::new(); 290 + 291 + for did in dids { 292 + let did = did.clone(); 293 + let http = self.http.clone(); 294 + let resolver = self.state.resolver.clone(); 295 + let filter = filter.clone(); 296 + set.spawn(async move { 297 + const MAX_RETRIES: u32 = 8; 298 + let mut rng: SmallRng = rand::make_rng(); 299 + 300 + let pds_url = { 301 + let mut attempt = 0u32; 302 + loop { 303 + match resolver.resolve_identity_info(&did).await { 304 + Ok((url, _)) => break url, 305 + Err(crate::resolver::ResolverError::Ratelimited) 306 + if attempt < MAX_RETRIES => 307 + { 308 + let base = Duration::from_secs(1 << attempt); 309 + let jitter = Duration::from_millis(rng.random_range(0..2000)); 310 + let try_in = base + jitter; 311 + debug!( 312 + "crawler: rate limited resolving {did}, retry {}/{MAX_RETRIES} in {}s", 313 + attempt + 1, 314 + try_in.as_secs_f64() 315 + ); 316 + tokio::time::sleep(try_in).await; 317 + attempt += 1; 318 + } 319 + Err(crate::resolver::ResolverError::Ratelimited) => { 320 + error!( 321 + "crawler: rate limited resolving {did} after {MAX_RETRIES} retries" 322 + ); 323 + return (did, CrawlCheckResult::Ratelimited); 324 + } 325 + Err(e) => { 326 + error!("crawler: failed to resolve {did}: {e}"); 327 + return (did, CrawlCheckResult::Failed); 328 + } 329 + } 330 + } 331 + }; 332 + 333 + let mut found_signal = false; 334 + for signal in filter.signals.iter() { 335 + let mut 
list_records_url = 336 + pds_url.join("/xrpc/com.atproto.repo.listRecords").unwrap(); 337 + list_records_url 338 + .query_pairs_mut() 339 + .append_pair("repo", &did) 340 + .append_pair("collection", signal) 341 + .append_pair("limit", "1"); 342 + 343 + let res = http 344 + .get(list_records_url) 345 + .send() 346 + .await 347 + .into_diagnostic() 348 + .map(|res| res.error_for_status().into_diagnostic()) 349 + .flatten(); 350 + match res { 351 + Ok(res) => { 352 + let Ok(bytes) = res.bytes().await else { 353 + error!( 354 + "failed to read bytes from listRecords response for repo {did}, signal {signal}" 355 + ); 356 + return (did, CrawlCheckResult::Failed); 357 + }; 358 + match serde_json::from_slice::<ListRecordsOutput>(&bytes) { 359 + Ok(out) => { 360 + if !out.records.is_empty() { 361 + found_signal = true; 362 + break; 363 + } 364 + } 365 + Err(e) => { 366 + error!( 367 + "failed to parse listRecords response for repo {did}, signal {signal}: {e}" 368 + ); 369 + return (did, CrawlCheckResult::Failed); 370 + } 371 + } 372 + } 373 + Err(e) => { 374 + error!( 375 + "failed to listRecords for repo {did}, signal {signal}: {e}" 376 + ); 377 + return (did, CrawlCheckResult::Failed); 378 + } 379 + } 380 + } 381 + 382 + if found_signal { 383 + (did, CrawlCheckResult::Signal) 384 + } else { 385 + trace!("crawler skipped repo {did}: no records match signals"); 386 + (did, CrawlCheckResult::NoSignal) 387 + } 388 + }); 389 + } 390 + 391 + while let Some(res) = set.join_next().await { 392 + let (did, result) = res.into_diagnostic()?; 393 + match result { 394 + CrawlCheckResult::Signal => { 395 + batch.remove(&db.crawler, keys::crawler_failed_key(&did)); 396 + valid.push(did); 397 + } 398 + CrawlCheckResult::NoSignal => { 399 + batch.remove(&db.crawler, keys::crawler_failed_key(&did)); 400 + } 401 + CrawlCheckResult::Ratelimited | CrawlCheckResult::Failed => { 402 + batch.insert(&db.crawler, keys::crawler_failed_key(&did), []); 403 + } 302 404 } 405 + } 303 406 304 - if 
cursor.is_none() { 305 - tokio::time::sleep(Duration::from_secs(3600)).await; 407 + Ok(valid) 408 + } 409 + 410 + async fn retry_failed_repos(&self, rng: &mut SmallRng) -> Result<()> { 411 + let db = &self.state.db; 412 + let filter = self.state.filter.load(); 413 + 414 + let check_signals = filter.mode == crate::filter::FilterMode::Filter 415 + && !filter.signals.is_empty() 416 + && !filter.has_glob_signals(); 417 + 418 + if !check_signals { 419 + return Ok(()); 420 + } 421 + 422 + let mut failed_dids = Vec::new(); 423 + for guard in db.crawler.prefix(keys::CRAWLER_FAILED_PREFIX) { 424 + let (key, _) = guard.into_inner().into_diagnostic()?; 425 + let did_bytes = &key[keys::CRAWLER_FAILED_PREFIX.len()..]; 426 + let trimmed = TrimmedDid::try_from(did_bytes)?; 427 + failed_dids.push(trimmed.to_did()); 428 + } 429 + 430 + if failed_dids.is_empty() { 431 + return Ok(()); 432 + } 433 + 434 + info!( 435 + "crawler: retrying {} previously failed repos", 436 + failed_dids.len() 437 + ); 438 + 439 + let mut batch = db.inner.batch(); 440 + let valid_dids = self 441 + .check_signals_batch(&failed_dids, &filter, &mut batch) 442 + .await?; 443 + 444 + for did in &valid_dids { 445 + let did_key = keys::repo_key(did); 446 + 447 + if Db::contains_key(db.repos.clone(), &did_key).await? 
{ 448 + continue; 306 449 } 450 + 451 + let state = RepoState::backfilling(rng.next_u64()); 452 + batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 453 + batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 307 454 } 455 + 456 + tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 457 + .await 458 + .into_diagnostic()??; 459 + 460 + if !valid_dids.is_empty() { 461 + info!( 462 + "crawler: recovered {} repos from failed retry", 463 + valid_dids.len() 464 + ); 465 + self.account_new_repos(valid_dids.len()).await; 466 + } 467 + 468 + Ok(()) 469 + } 470 + 471 + async fn account_new_repos(&self, count: usize) { 472 + if count == 0 { 473 + return; 474 + } 475 + 476 + self.count.fetch_add(count, Ordering::Relaxed); 477 + self.state 478 + .db 479 + .update_count_async("repos", count as i64) 480 + .await; 481 + self.state 482 + .db 483 + .update_count_async("pending", count as i64) 484 + .await; 485 + self.state.notify_backfill(); 308 486 } 309 487 }
+10
src/db/keys.rs
··· 131 131 prefix.push(SEP); 132 132 prefix 133 133 } 134 + 135 + pub const CRAWLER_FAILED_PREFIX: &[u8] = &[b'f', SEP]; 136 + 137 + pub fn crawler_failed_key(did: &Did) -> Vec<u8> { 138 + let repo = TrimmedDid::from(did); 139 + let mut key = Vec::with_capacity(CRAWLER_FAILED_PREFIX.len() + repo.len()); 140 + key.extend_from_slice(CRAWLER_FAILED_PREFIX); 141 + repo.write_to_vec(&mut key); 142 + key 143 + }
+9
src/db/mod.rs
··· 33 33 pub events: Keyspace, 34 34 pub counts: Keyspace, 35 35 pub filter: Keyspace, 36 + pub crawler: Keyspace, 36 37 pub event_tx: broadcast::Sender<BroadcastEvent>, 37 38 pub next_event_id: Arc<AtomicU64>, 38 39 pub counts_map: HashMap<SmolStr, u64>, ··· 190 191 opts().data_block_size_policy(BlockSizePolicy::all(kb(1))), 191 192 )?; 192 193 194 + let crawler = open_ks( 195 + "crawler", 196 + opts() 197 + .expect_point_read_hits(true) 198 + .data_block_size_policy(BlockSizePolicy::all(kb(1))), 199 + )?; 200 + 193 201 let mut last_id = 0; 194 202 if let Some(guard) = events.iter().next_back() { 195 203 let k = guard.key().into_diagnostic()?; ··· 242 250 events, 243 251 counts, 244 252 filter, 253 + crawler, 245 254 event_tx, 246 255 counts_map, 247 256 next_event_id: Arc::new(AtomicU64::new(last_id + 1)),
+9 -5
src/main.rs
··· 9 9 use std::sync::Arc; 10 10 use std::sync::atomic::Ordering; 11 11 use tokio::{sync::mpsc, task::spawn_blocking}; 12 - use tracing::{error, info}; 12 + use tracing::{debug, error, info}; 13 13 14 14 #[global_allocator] 15 15 static GLOBAL: MiMalloc = MiMalloc; ··· 112 112 113 113 tokio::spawn({ 114 114 let state = state.clone(); 115 + let mut last_id = state.db.next_event_id.load(Ordering::Relaxed); 116 + let mut last_time = std::time::Instant::now(); 117 + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); 115 118 async move { 116 - let mut last_id = state.db.next_event_id.load(Ordering::Relaxed); 117 - let mut last_time = std::time::Instant::now(); 118 - let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); 119 - 120 119 loop { 121 120 interval.tick().await; 122 121 ··· 124 123 let current_time = std::time::Instant::now(); 125 124 126 125 let delta = current_id.saturating_sub(last_id); 126 + if delta == 0 { 127 + debug!("no new events in 60s"); 128 + continue; 129 + } 130 + 127 131 let elapsed = current_time.duration_since(last_time).as_secs_f64(); 128 132 let rate = if elapsed > 0.0 { 129 133 delta as f64 / elapsed
+36 -27
src/resolver.rs
··· 47 47 } 48 48 } 49 49 50 + #[derive(Clone)] 51 + struct MiniDoc { 52 + pds: Url, 53 + handle: Option<Handle<'static>>, 54 + key: Option<PublicKey<'static>>, 55 + } 56 + 50 57 struct ResolverInner { 51 58 jacquards: Vec<JacquardResolver>, 52 59 next_idx: AtomicUsize, 53 - key_cache: HashCache<Did<'static>, PublicKey<'static>>, 60 + cache: HashCache<Did<'static>, MiniDoc>, 54 61 } 55 62 56 63 #[derive(Clone)] ··· 68 75 opts.plc_source = PlcSource::PlcDirectory { base: url }; 69 76 opts.request_timeout = Some(Duration::from_secs(3)); 70 77 71 - // no jacquard cache - we manage our own 72 78 jacquards.push(JacquardResolver::new(http.clone(), opts).with_system_dns()); 73 79 } 74 80 ··· 80 86 inner: Arc::new(ResolverInner { 81 87 jacquards, 82 88 next_idx: AtomicUsize::new(0), 83 - key_cache: HashCache::with_capacity( 89 + cache: HashCache::with_capacity( 84 90 std::cmp::min(1000, (identity_cache_size / 100) as usize), 85 91 identity_cache_size as usize, 86 92 ), ··· 132 138 } 133 139 } 134 140 135 - pub async fn resolve_identity_info( 136 - &self, 137 - did: &Did<'_>, 138 - ) -> Result<(Url, Option<Handle<'_>>), ResolverError> { 141 + async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> { 142 + let did_static = did.clone().into_static(); 143 + if let Some(entry) = self.inner.cache.get_async(&did_static).await { 144 + return Ok(entry.get().clone()); 145 + } 146 + 139 147 let doc_resp = self 140 148 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did)) 141 149 .await?; ··· 146 154 .ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?; 147 155 148 156 let mut handles = doc.handles(); 149 - let handle = handles.is_empty().not().then(|| handles.remove(0)); 157 + let handle = handles 158 + .is_empty() 159 + .not() 160 + .then(|| handles.remove(0).into_static()); 161 + let key = doc.atproto_public_key().ok().flatten(); 162 + 163 + let mini = MiniDoc { pds, handle, key }; 164 + let _ = 
self.inner.cache.put_async(did_static, mini.clone()).await; 165 + Ok(mini) 166 + } 150 167 151 - Ok((pds, handle)) 168 + pub async fn resolve_identity_info( 169 + &self, 170 + did: &Did<'_>, 171 + ) -> Result<(Url, Option<Handle<'static>>), ResolverError> { 172 + let mini = self.resolve_doc(did).await?; 173 + Ok((mini.pds, mini.handle)) 152 174 } 153 175 154 176 pub async fn resolve_signing_key( ··· 156 178 did: &Did<'_>, 157 179 ) -> Result<PublicKey<'static>, ResolverError> { 158 180 let did = did.clone().into_static(); 159 - if let Some(entry) = self.inner.key_cache.get_async(&did).await { 160 - return Ok(entry.get().clone()); 161 - } 162 - 163 - let doc_resp = self 164 - .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(&did)) 165 - .await?; 166 - let doc = doc_resp.parse()?; 167 - 168 - let key = doc 169 - .atproto_public_key() 170 - .into_diagnostic()? 171 - .ok_or_else(|| NoSigningKeyError(did.clone())) 172 - .into_diagnostic()?; 173 - 174 - let _ = self.inner.key_cache.put_async(did, key.clone()).await; 175 - 176 - Ok(key) 181 + let mini = self.resolve_doc(&did).await?; 182 + Ok(mini 183 + .key 184 + .ok_or_else(|| NoSigningKeyError(did)) 185 + .into_diagnostic()?) 177 186 } 178 187 } 179 188