···23smol_str = "0.3"
24futures = "0.3"
25reqwest = { version = "0.12", features = ["json", "rustls-tls", "stream", "gzip", "brotli", "zstd", "http2"], default-features = false }
00026axum = { version = "0.8.8", features = ["ws", "macros"] }
27tower-http = { version = "0.6.6", features = ["cors", "trace"] }
28tokio-stream = "0.1"
···49glob = "0.3"
50ordermap = { version = "1.1.0", features = ["serde"] }
51arc-swap = "1.8.2"
05253[dev-dependencies]
54tempfile = "3.26.0"
···23smol_str = "0.3"
24futures = "0.3"
25reqwest = { version = "0.12", features = ["json", "rustls-tls", "stream", "gzip", "brotli", "zstd", "http2"], default-features = false }
26+reqwest-client = { package = "reqwest", version = "0.13.2", features = ["json", "rustls", "stream", "gzip", "brotli", "zstd", "http2"], default-features = false }
27+reqwest-middleware = { version = "0.5.1", default-features = false, features = ["http2", "rustls"] }
28+reqwest-retry = { version = "0.9.1" }
29axum = { version = "0.8.8", features = ["ws", "macros"] }
30tower-http = { version = "0.6.6", features = ["cors", "trace"] }
31tokio-stream = "0.1"
···52glob = "0.3"
53ordermap = { version = "1.1.0", features = ["serde"] }
54arc-swap = "1.8.2"
55+rustls = { version = "0.23", features = ["ring"] }
5657[dev-dependencies]
58tempfile = "3.26.0"
+1-1
README.md
···42| `NO_LZ4_COMPRESSION` | `false` | disable lz4 compression for storage. |
43| `ENABLE_FIREHOSE` | `true` | whether to ingest relay subscriptions. |
44| `ENABLE_BACKFILL` | `true` | whether to backfill from PDS instances. |
45-| `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. |
46| `DB_WORKER_THREADS` | `4` (`8` if full network) | database worker threads. |
47| `DB_MAX_JOURNALING_SIZE_MB` | `512` (`1024` if full network) | max database journaling size in MB. |
48| `DB_PENDING_MEMTABLE_SIZE_MB` | `64` (`192` if full network) | pending memtable size in MB. |
···42| `NO_LZ4_COMPRESSION` | `false` | disable lz4 compression for storage. |
43| `ENABLE_FIREHOSE` | `true` | whether to ingest relay subscriptions. |
44| `ENABLE_BACKFILL` | `true` | whether to backfill from PDS instances. |
45+| `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. when in `Filter` mode without wildcard (`*`) signals, the crawler uses `com.atproto.repo.listRecords` to verify that a discovered repository has matching records before queuing it for backfill. this is usually much faster, since most repositories are filtered out early. |
46| `DB_WORKER_THREADS` | `4` (`8` if full network) | database worker threads. |
47| `DB_MAX_JOURNALING_SIZE_MB` | `512` (`1024` if full network) | max database journaling size in MB. |
48| `DB_PENDING_MEMTABLE_SIZE_MB` | `64` (`192` if full network) | pending memtable size in MB. |
+151-36
src/crawler/mod.rs
···1use crate::db::{Db, keys, ser_repo_state};
2use crate::state::AppState;
3use crate::types::RepoState;
4-use jacquard::api::com_atproto::sync::list_repos::{ListRepos, ListReposOutput};
5-use jacquard::prelude::*;
6-use jacquard_common::CowStr;
07use miette::{IntoDiagnostic, Result};
8use rand::Rng;
9use rand::rngs::SmallRng;
00010use smol_str::SmolStr;
11use std::sync::Arc;
12use std::time::Duration;
···16pub struct Crawler {
17 state: Arc<AppState>,
18 relay_host: Url,
19- http: reqwest::Client,
20 max_pending: usize,
21 resume_pending: usize,
22}
···28 max_pending: usize,
29 resume_pending: usize,
30 ) -> Self {
00000000000000000031 Self {
32 state,
33 relay_host,
34- http: reqwest::Client::new(),
35 max_pending,
36 resume_pending,
37 }
···40 pub async fn run(self) -> Result<()> {
41 info!("crawler started");
420000000000043 let mut rng: SmallRng = rand::make_rng();
4445 let db = &self.state.db;
4647 // 1. load cursor
48 let cursor_key = b"crawler_cursor";
49- let mut cursor: Option<SmolStr> =
50- if let Ok(Some(bytes)) = Db::get(db.cursors.clone(), cursor_key.to_vec()).await {
051 let s = String::from_utf8_lossy(&bytes);
52 info!("resuming crawler from cursor: {}", s);
53- Some(s.into())
54- } else {
55- None
56- };
57 let mut was_throttled = false;
5859 loop {
···97 }
9899 // 2. fetch listrepos
100- let req = ListRepos::new()
101- .limit(1000)
102- .maybe_cursor(cursor.clone().map(|c| CowStr::from(c.to_string())))
103- .build();
104-105- let mut url = self.relay_host.clone();
106- if url.scheme() == "wss" {
107- url.set_scheme("https")
108- .map_err(|_| miette::miette!("invalid url: {url}"))?;
109- } else if url.scheme() == "ws" {
110- url.set_scheme("http")
111- .map_err(|_| miette::miette!("invalid url: {url}"))?;
112 }
113- let res_result = self.http.xrpc(url).send(&req).await;
114115- let output: ListReposOutput = match res_result {
116- Ok(res) => res.into_output().into_diagnostic()?,
0000000000117 Err(e) => {
118- let e = e
119- .source_err()
120- .map(|e| e.to_string())
121- .unwrap_or_else(|| e.to_string());
122 error!("crawler failed to list repos: {e}. retrying in 30s...");
123 tokio::time::sleep(Duration::from_secs(30)).await;
124 continue;
125 }
126 };
000127128 if output.repos.is_empty() {
129 info!("crawler finished enumeration (or empty page). sleeping for 1 hour.");
···135136 let mut batch = db.inner.batch();
137 let mut to_queue = Vec::new();
0000000138139 // 3. process repos
0140 for repo in output.repos {
141- let did_key = keys::repo_key(&repo.did);
0142143 let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?;
144 if db.filter.contains_key(&excl_key).into_diagnostic()? {
···147148 // check if known
149 if !Db::contains_key(db.repos.clone(), &did_key).await? {
150- trace!("crawler found new repo: {}", repo.did);
0000000000000000000000000000000000000000000151152- let state = RepoState::backfilling(rng.next_u64());
153- batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
154- batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
155- to_queue.push(repo.did.clone());
00000156 }
00000000000000000000157 }
158159 // 4. update cursor
···1use crate::db::{Db, keys, ser_repo_state};
2use crate::state::AppState;
3use crate::types::RepoState;
4+use jacquard::IntoStatic;
5+use jacquard_api::com_atproto::repo::list_records::ListRecordsOutput;
6+use jacquard_api::com_atproto::sync::list_repos::ListReposOutput;
7+use jacquard_common::types::string::Did;
8use miette::{IntoDiagnostic, Result};
9use rand::Rng;
10use rand::rngs::SmallRng;
11+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
12+use reqwest_retry::Jitter;
13+use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
14use smol_str::SmolStr;
15use std::sync::Arc;
16use std::time::Duration;
···20pub struct Crawler {
21 state: Arc<AppState>,
22 relay_host: Url,
23+ http: Arc<ClientWithMiddleware>,
24 max_pending: usize,
25 resume_pending: usize,
26}
···32 max_pending: usize,
33 resume_pending: usize,
34 ) -> Self {
35+ let retry_policy = ExponentialBackoff::builder()
36+ .jitter(Jitter::Bounded)
37+ .build_with_max_retries(8);
38+ let reqwest_client = reqwest_client::Client::builder()
39+ .user_agent(concat!(
40+ env!("CARGO_PKG_NAME"),
41+ "/",
42+ env!("CARGO_PKG_VERSION")
43+ ))
44+ .gzip(true)
45+ .build()
46+ .expect("that reqwest will build");
47+48+ let http = ClientBuilder::new(reqwest_client)
49+ .with(RetryTransientMiddleware::new_with_policy(retry_policy))
50+ .build();
51+ let http = Arc::new(http);
52+53 Self {
54 state,
55 relay_host,
56+ http,
57 max_pending,
58 resume_pending,
59 }
···62 pub async fn run(self) -> Result<()> {
63 info!("crawler started");
6465+ let mut api_url = self.relay_host.clone();
66+ if api_url.scheme() == "wss" {
67+ api_url
68+ .set_scheme("https")
69+ .map_err(|_| miette::miette!("invalid url: {api_url}"))?;
70+ } else if api_url.scheme() == "ws" {
71+ api_url
72+ .set_scheme("http")
73+ .map_err(|_| miette::miette!("invalid url: {api_url}"))?;
74+ }
75+76 let mut rng: SmallRng = rand::make_rng();
7778 let db = &self.state.db;
7980 // 1. load cursor
81 let cursor_key = b"crawler_cursor";
82+ let mut cursor: Option<SmolStr> = Db::get(db.cursors.clone(), cursor_key.to_vec())
83+ .await?
84+ .map(|bytes| {
85 let s = String::from_utf8_lossy(&bytes);
86 info!("resuming crawler from cursor: {}", s);
87+ s.into()
88+ });
0089 let mut was_throttled = false;
9091 loop {
···129 }
130131 // 2. fetch listrepos
132+ let mut list_repos_url = api_url
133+ .join("/xrpc/com.atproto.sync.listRepos")
134+ .into_diagnostic()?;
135+ list_repos_url
136+ .query_pairs_mut()
137+ .append_pair("limit", "1000");
138+ if let Some(c) = &cursor {
139+ list_repos_url
140+ .query_pairs_mut()
141+ .append_pair("cursor", c.as_str());
00142 }
0143144+ let res_result = self.http.get(list_repos_url.clone()).send().await;
145+ let bytes = match res_result {
146+ Ok(res) => match res.bytes().await {
147+ Ok(b) => b,
148+ Err(e) => {
149+ error!(
150+ "crawler failed to parse list repos response: {e}. retrying in 30s..."
151+ );
152+ tokio::time::sleep(Duration::from_secs(30)).await;
153+ continue;
154+ }
155+ },
156 Err(e) => {
0000157 error!("crawler failed to list repos: {e}. retrying in 30s...");
158 tokio::time::sleep(Duration::from_secs(30)).await;
159 continue;
160 }
161 };
162+ let output = serde_json::from_slice::<ListReposOutput>(&bytes)
163+ .into_diagnostic()?
164+ .into_static();
165166 if output.repos.is_empty() {
167 info!("crawler finished enumeration (or empty page). sleeping for 1 hour.");
···173174 let mut batch = db.inner.batch();
175 let mut to_queue = Vec::new();
176+ let filter = self.state.filter.load();
177+ // we can check whether or not to backfill repos faster if we only have to check
178+ // certain known signals, since we can just listRecords for those signals
179+ // if we have glob signals we cant do this since we dont know what signals to check
180+ let check_signals = filter.mode == crate::filter::FilterMode::Filter
181+ && !filter.signals.is_empty()
182+ && !filter.has_glob_signals();
183184 // 3. process repos
185+ let mut unknown_repos = Vec::new();
186 for repo in output.repos {
187+ let parsed_did: Did = repo.did.parse().unwrap();
188+ let did_key = keys::repo_key(&parsed_did);
189190 let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?;
191 if db.filter.contains_key(&excl_key).into_diagnostic()? {
···194195 // check if known
196 if !Db::contains_key(db.repos.clone(), &did_key).await? {
197+ unknown_repos.push(repo);
198+ }
199+ }
200+201+ let mut valid_repos = Vec::new();
202+ if check_signals && !unknown_repos.is_empty() {
203+ let mut set = tokio::task::JoinSet::new();
204+ for repo in unknown_repos {
205+ let http = self.http.clone();
206+ let api_url = api_url.clone();
207+ let filter = filter.clone();
208+ set.spawn(async move {
209+ let mut found_signal = false;
210+ for signal in filter.signals.iter() {
211+ let mut list_records_url =
212+ api_url.join("/xrpc/com.atproto.repo.listRecords").unwrap();
213+ list_records_url
214+ .query_pairs_mut()
215+ .append_pair("repo", &repo.did)
216+ .append_pair("collection", signal)
217+ .append_pair("limit", "1");
218+219+ match http.get(list_records_url).send().await {
220+ Ok(res) => {
221+ let Ok(bytes) = res.bytes().await else {
222+ error!("failed to read bytes from listRecords response for repo {}, signal {signal}", repo.did);
223+ continue;
224+ };
225+ if let Ok(out) = serde_json::from_slice::<ListRecordsOutput>(&bytes) {
226+ if !out.records.is_empty() {
227+ found_signal = true;
228+ break;
229+ }
230+ }
231+ }
232+ Err(e) => {
233+ error!(
234+ "failed to listRecords for repo {}, signal {signal}: {e}",
235+ repo.did
236+ );
237+ continue;
238+ }
239+ }
240+ }
241242+ if !found_signal {
243+ trace!(
244+ "crawler skipped repo {}: no records match signals",
245+ repo.did
246+ );
247+ }
248+249+ (repo, found_signal)
250+ });
251 }
252+253+ while let Some(res) = set.join_next().await {
254+ let (repo, found_signal) = res.into_diagnostic()?;
255+ if found_signal {
256+ valid_repos.push(repo);
257+ }
258+ }
259+ } else {
260+ valid_repos = unknown_repos;
261+ }
262+263+ for repo in valid_repos {
264+ let parsed_did: Did = repo.did.parse().unwrap();
265+ let did_key = keys::repo_key(&parsed_did);
266+ trace!("crawler found new repo: {}", repo.did);
267+268+ let state = RepoState::backfilling(rng.next_u64());
269+ batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
270+ batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
271+ to_queue.push(repo.did.clone());
272 }
273274 // 4. update cursor
+12-4
src/db/filter.rs
···1use fjall::{Keyspace, OwnedWriteBatch};
002use miette::{IntoDiagnostic, Result};
34use crate::db::types::TrimmedDid;
···111 for guard in ks.prefix(signal_prefix) {
112 let (k, _) = guard.into_inner().into_diagnostic()?;
113 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?;
114- config.signals.push(smol_str::SmolStr::new(val));
115 }
116117 let col_prefix = [COLLECTION_PREFIX, SEP];
118 for guard in ks.prefix(col_prefix) {
119 let (k, _) = guard.into_inner().into_diagnostic()?;
120 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?;
121- config.collections.push(smol_str::SmolStr::new(val));
122 }
123124 Ok(config)
···196197 let config = load(&ks)?;
198 assert_eq!(config.mode, FilterMode::Filter);
199- assert_eq!(config.signals, vec!["a.b.c"]);
200- assert_eq!(config.collections, vec!["d.e.f"]);
000000201202 let excludes = read_set(&ks, EXCLUDE_PREFIX)?;
203 assert_eq!(excludes, vec!["did:plc:yk4q3id7id6p5z3bypvshc64"]);
···1use fjall::{Keyspace, OwnedWriteBatch};
2+use jacquard::IntoStatic;
3+use jacquard::types::nsid::Nsid;
4use miette::{IntoDiagnostic, Result};
56use crate::db::types::TrimmedDid;
···113 for guard in ks.prefix(signal_prefix) {
114 let (k, _) = guard.into_inner().into_diagnostic()?;
115 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?;
116+ config.signals.push(Nsid::new(val)?.into_static());
117 }
118119 let col_prefix = [COLLECTION_PREFIX, SEP];
120 for guard in ks.prefix(col_prefix) {
121 let (k, _) = guard.into_inner().into_diagnostic()?;
122 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?;
123+ config.collections.push(Nsid::new(val)?.into_static());
124 }
125126 Ok(config)
···198199 let config = load(&ks)?;
200 assert_eq!(config.mode, FilterMode::Filter);
201+ assert_eq!(
202+ config.signals,
203+ vec![Nsid::new("a.b.c").unwrap().into_static()]
204+ );
205+ assert_eq!(
206+ config.collections,
207+ vec![Nsid::new("d.e.f").unwrap().into_static()]
208+ );
209210 let excludes = read_set(&ks, EXCLUDE_PREFIX)?;
211 assert_eq!(excludes, vec!["did:plc:yk4q3id7id6p5z3bypvshc64"]);
+7-8
src/filter.rs
···01use serde::{Deserialize, Serialize};
2-use smol_str::SmolStr;
3use std::sync::Arc;
45pub type FilterHandle = Arc<arc_swap::ArcSwap<FilterConfig>>;
···25 Full = 2,
26}
2728-/// hot-path in-memory config: only the small fields needed on every event.
29-/// dids and excludes are large sets kept in the filter keyspace only.
30#[derive(Debug, Clone, Serialize)]
31pub struct FilterConfig {
32 pub mode: FilterMode,
33- pub signals: Vec<SmolStr>,
34- pub collections: Vec<SmolStr>,
35}
3637impl FilterConfig {
···43 }
44 }
4546- /// returns true if the collection matches the content filter.
47- /// if collections is empty, all collections match.
48 pub fn matches_collection(&self, collection: &str) -> bool {
49 if self.collections.is_empty() {
50 return true;
···52 self.collections.iter().any(|p| nsid_matches(p, collection))
53 }
5455- /// returns true if the commit touches a collection covered by a signal.
56 pub fn matches_signal(&self, collection: &str) -> bool {
57 self.signals.iter().any(|p| nsid_matches(p, collection))
000058 }
59}
60