at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] ban PDSes if they are unreachable

ptr.pet 106ab406 e2d47ab1

verified
+221 -32
+1 -1
Cargo.toml
··· 36 36 37 37 mimalloc = { version = "0.1", features = ["v3"] } 38 38 hex = "0.4" 39 - scc = "3" 39 + scc = "3.6.6" 40 40 data-encoding = "2.10.0" 41 41 cid = "0.11.1" 42 42 thiserror = "2.0.18"
+98
src/crawler/ban.rs
··· 1 + use scc::HashMap; 2 + use std::sync::Arc; 3 + use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; 4 + use tokio::sync::Notify; 5 + use url::Url; 6 + 7 + #[derive(Clone)] 8 + pub struct BanTracker { 9 + states: Arc<HashMap<Url, Arc<State>>>, 10 + } 11 + 12 + impl BanTracker { 13 + pub fn new() -> Self { 14 + Self { 15 + states: Arc::new(HashMap::new()), 16 + } 17 + } 18 + 19 + pub fn get_handle(&self, url: &Url) -> BanHandle { 20 + let state = self 21 + .states 22 + .entry_sync(url.clone()) 23 + .or_insert_with(|| { 24 + Arc::new(State { 25 + banned_until: AtomicI64::new(0), 26 + consecutive_failures: AtomicUsize::new(0), 27 + ban_notify: Notify::new(), 28 + }) 29 + }) 30 + .get() 31 + .clone(); 32 + 33 + BanHandle { state } 34 + } 35 + } 36 + 37 + struct State { 38 + banned_until: AtomicI64, 39 + consecutive_failures: AtomicUsize, 40 + ban_notify: Notify, 41 + } 42 + 43 + pub struct BanHandle { 44 + state: Arc<State>, 45 + } 46 + 47 + impl BanHandle { 48 + pub fn is_banned(&self) -> bool { 49 + let until = self.state.banned_until.load(Ordering::Acquire); 50 + if until == 0 { 51 + return false; 52 + } 53 + let now = chrono::Utc::now().timestamp(); 54 + now < until 55 + } 56 + 57 + pub fn record_success(&self) { 58 + self.state.consecutive_failures.store(0, Ordering::Release); 59 + self.state.banned_until.store(0, Ordering::Release); 60 + } 61 + 62 + // returns the amount of minutes banned if its a new ban 63 + pub fn record_failure(&self) -> Option<i64> { 64 + if self.is_banned() { 65 + return None; 66 + } 67 + 68 + let failures = self 69 + .state 70 + .consecutive_failures 71 + .fetch_add(1, Ordering::AcqRel) 72 + + 1; 73 + 74 + // start with 30 minutes, double each consecutive failure 75 + let base_minutes = 30; 76 + let exponent = (failures as u32).saturating_sub(1); 77 + let minutes = base_minutes * 2i64.pow(exponent.min(10)); 78 + let now = chrono::Utc::now().timestamp(); 79 + 80 + self.state 81 + .banned_until 82 + .store(now + minutes * 60, Ordering::Release); 83 + 84 + self.state.ban_notify.notify_waiters(); 85 + 86 + Some(minutes) 87 + } 88 + 89 + pub async fn wait_for_ban(&self) { 90 + loop { 91 + let notified = self.state.ban_notify.notified(); 92 + if self.is_banned() { 93 + return; 94 + } 95 + notified.await; 96 + } 97 + } 98 + }
+113 -19
src/crawler/mod.rs
··· 67 67 } 68 68 } 69 69 70 + // these two are cloudflare specific 71 + const CONNECTION_TIMEOUT: StatusCode = unsafe { 72 + match StatusCode::from_u16(522) { 73 + Ok(s) => s, 74 + _ => std::hint::unreachable_unchecked(), // status code is valid 75 + } 76 + }; 77 + const SITE_FROZEN: StatusCode = unsafe { 78 + match StatusCode::from_u16(530) { 79 + Ok(s) => s, 80 + _ => std::hint::unreachable_unchecked(), // status code is valid 81 + } 82 + }; 83 + 84 + // we ban on: 85 + // - timeouts 86 + // - tls cert errors 87 + // - bad gateway / gateway timeout, service unavailable, 522 and 530 88 + fn is_ban_worthy(e: &reqwest::Error) -> bool { 89 + use std::error::Error; 90 + 91 + if e.is_timeout() { 92 + return true; 93 + } 94 + 95 + let mut src = e.source(); 96 + while let Some(s) = src { 97 + if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 98 + if is_tls_cert_error(io_err) { 99 + return true; 100 + } 101 + } 102 + src = s.source(); 103 + } 104 + 105 + e.status().map_or(false, |s| { 106 + matches!( 107 + s, 108 + StatusCode::BAD_GATEWAY 109 + | StatusCode::SERVICE_UNAVAILABLE 110 + | StatusCode::GATEWAY_TIMEOUT 111 + | CONNECTION_TIMEOUT 112 + | SITE_FROZEN 113 + ) 114 + }) 115 + } 116 + 117 + fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 118 + let Some(inner) = io_err.get_ref() else { 119 + return false; 120 + }; 121 + if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 122 + return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 123 + } 124 + if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 125 + return is_tls_cert_error(nested_io); 126 + } 127 + false 128 + } 129 + 70 130 async fn check_repo_signals( 71 131 http: Arc<reqwest::Client>, 72 132 resolver: crate::resolver::Resolver, 73 133 filter: Arc<crate::filter::FilterConfig>, 74 134 did: Did<'static>, 135 + tracker: Arc<BanTracker>, 75 136 ) -> (Did<'static>, CrawlCheckResult) { 76 137 const MAX_RETRIES: u32 = 5; 77 138 let mut rng: SmallRng = rand::make_rng(); ··· 97 158 } 98 159 }; 99 160 161 + let pds_handle = tracker.get_handle(&pds_url); 162 + if pds_handle.is_banned() { 163 + trace!(host = pds_url.host_str(), "skipping banned pds"); 164 + return (did, CrawlCheckResult::Failed(None)); 165 + } 166 + 167 + enum RequestError { 168 + Reqwest(reqwest::Error), 169 + Banned, 170 + } 171 + 100 172 let mut found_signal = false; 101 173 for signal in filter.signals.iter() { 102 174 let res = async { ··· 111 183 &mut rng, 112 184 MAX_RETRIES, 113 185 || async { 114 - http.get(list_records_url.clone()) 115 - .send() 116 - .await? 117 - .error_for_status() 186 + let req = http.get(list_records_url.clone()).send(); 187 + tokio::select! { 188 + res = req => res.and_then(|r| r.error_for_status()).map_err(RequestError::Reqwest), 189 + _ = pds_handle.wait_for_ban() => Err(RequestError::Banned), 190 + } 118 191 }, 119 - |e: &reqwest::Error| e.status() == Some(StatusCode::TOO_MANY_REQUESTS), 192 + |e: &RequestError| { 193 + matches!(e, RequestError::Reqwest(e) if matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS))) 194 + }, 120 195 ); 121 196 let res = match res.await { 122 - Ok(r) => r, 197 + Ok(r) => { 198 + pds_handle.record_success(); 199 + r 200 + } 123 201 Err(RetryOutcome::Ratelimited) => { 124 202 warn!( 125 203 retries = MAX_RETRIES, ··· 127 205 ); 128 206 return CrawlCheckResult::Ratelimited; 129 207 } 130 - Err(RetryOutcome::Failed(e)) => { 131 - match e.status() { 132 - Some(StatusCode::NOT_FOUND | StatusCode::GONE) => { 133 - trace!("repo not found"); 208 + Err(RetryOutcome::Failed(e)) => match e { 209 + RequestError::Banned => return CrawlCheckResult::Failed(None), 210 + RequestError::Reqwest(e) => { 211 + if is_ban_worthy(&e) { 212 + if let Some(mins) = pds_handle.record_failure() { 213 + tracing::warn!(url = %pds_url, mins, "banned pds"); 214 + } 215 + return CrawlCheckResult::Failed(e.status().map(|s| s.as_u16())); 134 216 } 135 - Some(s) if s.is_client_error() => { 136 - error!(status = %s, "repo unavailable"); 217 + 218 + match e.status() { 219 + Some(StatusCode::NOT_FOUND | StatusCode::GONE) => { 220 + trace!("repo not found"); 221 + } 222 + Some(s) if s.is_client_error() => { 223 + error!(status = %s, "repo unavailable"); 224 + } 225 + _ => { 226 + error!(err = %e, "listRecords failed"); 227 + return CrawlCheckResult::Failed(e.status().map(|s| s.as_u16())); 228 + } 137 229 } 138 - _ => { 139 - error!(err = %e, "listRecords failed"); 140 - return CrawlCheckResult::Failed(e.status().map(|s| s.as_u16())); 141 - } 230 + return CrawlCheckResult::NoSignal; 142 231 } 143 - return CrawlCheckResult::NoSignal; 144 - } 232 + }, 145 233 }; 146 234 147 235 let bytes = match res.bytes().await { ··· 195 283 ) 196 284 } 197 285 286 + pub mod ban; 287 + use ban::BanTracker; 288 + 198 289 pub struct Crawler { 199 290 state: Arc<AppState>, 200 291 relay_host: Url, ··· 203 294 resume_pending: usize, 204 295 count: Arc<AtomicUsize>, 205 296 crawled_count: Arc<AtomicUsize>, 297 + tracker: Arc<BanTracker>, 206 298 } 207 299 208 300 impl Crawler { ··· 232 324 resume_pending, 233 325 count: Arc::new(AtomicUsize::new(0)), 234 326 crawled_count: Arc::new(AtomicUsize::new(0)), 327 + tracker: Arc::new(BanTracker::new()), 235 328 } 236 329 } 237 330 ··· 477 570 let http = self.http.clone(); 478 571 let resolver = self.state.resolver.clone(); 479 572 let filter = filter.clone(); 573 + let tracker = self.tracker.clone(); 480 574 let span = tracing::info_span!("check_signals", did = %did); 481 - set.spawn(check_repo_signals(http, resolver, filter, did).instrument(span)); 575 + set.spawn(check_repo_signals(http, resolver, filter, did, tracker).instrument(span)); 482 576 } 483 577 484 578 while let Some(res) = set.join_next().await {
+9 -12
src/ops.rs
··· 1 1 use fjall::OwnedWriteBatch; 2 + use jacquard_common::CowStr; 3 + use jacquard_common::IntoStatic; 2 4 use jacquard_common::types::cid::Cid; 3 5 use jacquard_common::types::crypto::PublicKey; 4 6 use jacquard_common::types::did::Did; 5 - use jacquard_common::CowStr; 6 - use jacquard_common::IntoStatic; 7 7 use jacquard_repo::car::reader::parse_car_bytes; 8 8 use miette::{Context, IntoDiagnostic, Result}; 9 - use rand::{rng, Rng}; 9 + use rand::{Rng, rng}; 10 10 use std::collections::HashMap; 11 11 use std::sync::atomic::Ordering; 12 12 use std::time::Instant; 13 13 use tracing::{debug, trace}; 14 14 15 15 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 16 - use crate::db::{self, keys, ser_repo_state, Db}; 16 + use crate::db::{self, Db, keys, ser_repo_state}; 17 17 use crate::filter::FilterConfig; 18 18 use crate::ingest::stream::Commit; 19 19 use crate::types::{ ··· 282 282 let Some(cid) = &op.cid else { 283 283 continue; 284 284 }; 285 - let cid_ipld = cid.to_ipld() 285 + let cid_ipld = cid 286 + .to_ipld() 286 287 .into_diagnostic() 287 288 .wrap_err("expected valid cid from relay")?; 288 - 289 + 289 290 if let Some(bytes) = parsed.blocks.get(&cid_ipld) { 290 291 batch.insert(&db.blocks, cid_ipld.to_bytes(), bytes.to_vec()); 291 292 blocks_count += 1; 292 293 } 293 - 294 - batch.insert( 295 - &db.records, 296 - db_key.clone(), 297 - cid_ipld.to_bytes(), 298 - ); 294 + 295 + batch.insert(&db.records, db_key.clone(), cid_ipld.to_bytes()); 299 296 300 297 // accumulate counts 301 298 if action == DbAction::Create {