forked from
ptr.pet/hydrant
kind of like tap, but different — and written in Rust
1use crate::db::{Db, keys, ser_repo_state};
2use crate::filter::FilterMode;
3use crate::state::AppState;
4use crate::types::RepoState;
5use jacquard::api::com_atproto::sync::list_repos::{ListRepos, ListReposOutput};
6use jacquard::prelude::*;
7use jacquard_common::CowStr;
8use miette::{IntoDiagnostic, Result};
9use rand::Rng;
10use rand::rngs::SmallRng;
11use smol_str::SmolStr;
12use std::sync::Arc;
13use std::time::Duration;
14use tracing::{debug, error, info, trace};
15use url::Url;
16
/// Periodically enumerates all repos known to a relay via
/// `com.atproto.sync.listRepos` and queues previously-unseen repos for
/// backfill, throttling itself against the size of the pending queue.
pub struct Crawler {
    // Shared application state (DB handles, filter, backfill notifier).
    state: Arc<AppState>,
    // Relay to enumerate; ws/wss schemes are rewritten to http/https for XRPC calls.
    relay_host: Url,
    // HTTP client used for the listRepos XRPC requests.
    http: reqwest::Client,
    // Hard throttle: stop fetching while the pending-repo count exceeds this.
    max_pending: usize,
    // Hysteresis floor: once throttled, wait until pending drops to/under this.
    resume_pending: usize,
}
24
25impl Crawler {
26 pub fn new(
27 state: Arc<AppState>,
28 relay_host: Url,
29 max_pending: usize,
30 resume_pending: usize,
31 ) -> Self {
32 Self {
33 state,
34 relay_host,
35 http: reqwest::Client::new(),
36 max_pending,
37 resume_pending,
38 }
39 }
40
41 pub async fn run(self) -> Result<()> {
42 info!("crawler started");
43
44 let mut rng: SmallRng = rand::make_rng();
45
46 let db = &self.state.db;
47
48 // 1. load cursor
49 let cursor_key = b"crawler_cursor";
50 let mut cursor: Option<SmolStr> =
51 if let Ok(Some(bytes)) = Db::get(db.cursors.clone(), cursor_key.to_vec()).await {
52 let s = String::from_utf8_lossy(&bytes);
53 info!("resuming crawler from cursor: {}", s);
54 Some(s.into())
55 } else {
56 None
57 };
58 let mut was_throttled = false;
59
60 loop {
61 // check throttling
62 loop {
63 let pending = self.state.db.get_count("pending").await;
64 if pending > self.max_pending as u64 {
65 if !was_throttled {
66 debug!(
67 "crawler throttling: pending repos {} > max {}, sleeping...",
68 pending, self.max_pending
69 );
70 was_throttled = true;
71 }
72 tokio::time::sleep(Duration::from_secs(10)).await;
73 } else if pending > self.resume_pending as u64 {
74 if !was_throttled {
75 debug!(
76 "crawler throttling: pending repos {} > max {}, entering cooldown...",
77 pending, self.max_pending
78 );
79 was_throttled = true;
80 }
81
82 while self.state.db.get_count("pending").await > self.resume_pending as u64 {
83 debug!(
84 "crawler cooldown: pending repos {} > resume {}, sleeping...",
85 self.state.db.get_count("pending").await,
86 self.resume_pending
87 );
88 tokio::time::sleep(Duration::from_secs(10)).await;
89 }
90 break;
91 } else {
92 if was_throttled {
93 info!("crawler resuming: throttling released");
94 was_throttled = false;
95 }
96 break;
97 }
98 }
99
100 // 2. fetch listrepos
101 let req = ListRepos::new()
102 .limit(1000)
103 .maybe_cursor(cursor.clone().map(|c| CowStr::from(c.to_string())))
104 .build();
105
106 let mut url = self.relay_host.clone();
107 if url.scheme() == "wss" {
108 url.set_scheme("https")
109 .map_err(|_| miette::miette!("invalid url: {url}"))?;
110 } else if url.scheme() == "ws" {
111 url.set_scheme("http")
112 .map_err(|_| miette::miette!("invalid url: {url}"))?;
113 }
114 let res_result = self.http.xrpc(url).send(&req).await;
115
116 let output: ListReposOutput = match res_result {
117 Ok(res) => res.into_output().into_diagnostic()?,
118 Err(e) => {
119 let e = e
120 .source_err()
121 .map(|e| e.to_string())
122 .unwrap_or_else(|| e.to_string());
123 error!("crawler failed to list repos: {e}. retrying in 30s...");
124 tokio::time::sleep(Duration::from_secs(30)).await;
125 continue;
126 }
127 };
128
129 if output.repos.is_empty() {
130 info!("crawler finished enumeration (or empty page). sleeping for 1 hour.");
131 tokio::time::sleep(Duration::from_secs(3600)).await;
132 continue;
133 }
134
135 debug!("crawler fetched {} repos...", output.repos.len());
136
137 let mut batch = db.inner.batch();
138 let mut to_queue = Vec::new();
139
140 let filter = self.state.filter.load();
141
142 // 3. process repos
143 for repo in output.repos {
144 let did_key = keys::repo_key(&repo.did);
145
146 let excl_key =
147 crate::filter::filter_key(crate::filter::EXCLUDE_PREFIX, repo.did.as_str());
148 if db.filter.contains_key(&excl_key).into_diagnostic()? {
149 continue;
150 }
151
152 if filter.mode != FilterMode::Full {
153 let did_filter_key =
154 crate::filter::filter_key(crate::filter::DID_PREFIX, repo.did.as_str());
155 if !db.filter.contains_key(&did_filter_key).into_diagnostic()? {
156 continue;
157 }
158 }
159
160 // check if known
161 if !Db::contains_key(db.repos.clone(), &did_key).await? {
162 trace!("crawler found new repo: {}", repo.did);
163
164 let state = RepoState::backfilling(rng.next_u64());
165 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
166 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
167 to_queue.push(repo.did.clone());
168 }
169 }
170
171 // 4. update cursor
172 if let Some(new_cursor) = output.cursor {
173 cursor = Some(new_cursor.as_str().into());
174
175 batch.insert(
176 &db.cursors,
177 cursor_key.to_vec(),
178 new_cursor.as_bytes().to_vec(),
179 );
180 } else {
181 // end of pagination
182 info!("crawler reached end of list.");
183 cursor = None;
184 }
185
186 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic())
187 .await
188 .into_diagnostic()??;
189
190 // update counts if we found new repos
191 if !to_queue.is_empty() {
192 let count = to_queue.len() as i64;
193 self.state.db.update_count_async("repos", count).await;
194 self.state.db.update_count_async("pending", count).await;
195 }
196
197 // 5. notify backfill worker
198 if !to_queue.is_empty() {
199 self.state.notify_backfill();
200 }
201
202 if cursor.is_none() {
203 tokio::time::sleep(Duration::from_secs(3600)).await;
204 }
205 }
206 }
207}