Kind of like tap, but different — and in Rust.
at main 207 lines 7.4 kB view raw
1use crate::db::{Db, keys, ser_repo_state}; 2use crate::filter::FilterMode; 3use crate::state::AppState; 4use crate::types::RepoState; 5use jacquard::api::com_atproto::sync::list_repos::{ListRepos, ListReposOutput}; 6use jacquard::prelude::*; 7use jacquard_common::CowStr; 8use miette::{IntoDiagnostic, Result}; 9use rand::Rng; 10use rand::rngs::SmallRng; 11use smol_str::SmolStr; 12use std::sync::Arc; 13use std::time::Duration; 14use tracing::{debug, error, info, trace}; 15use url::Url; 16 17pub struct Crawler { 18 state: Arc<AppState>, 19 relay_host: Url, 20 http: reqwest::Client, 21 max_pending: usize, 22 resume_pending: usize, 23} 24 25impl Crawler { 26 pub fn new( 27 state: Arc<AppState>, 28 relay_host: Url, 29 max_pending: usize, 30 resume_pending: usize, 31 ) -> Self { 32 Self { 33 state, 34 relay_host, 35 http: reqwest::Client::new(), 36 max_pending, 37 resume_pending, 38 } 39 } 40 41 pub async fn run(self) -> Result<()> { 42 info!("crawler started"); 43 44 let mut rng: SmallRng = rand::make_rng(); 45 46 let db = &self.state.db; 47 48 // 1. 
load cursor 49 let cursor_key = b"crawler_cursor"; 50 let mut cursor: Option<SmolStr> = 51 if let Ok(Some(bytes)) = Db::get(db.cursors.clone(), cursor_key.to_vec()).await { 52 let s = String::from_utf8_lossy(&bytes); 53 info!("resuming crawler from cursor: {}", s); 54 Some(s.into()) 55 } else { 56 None 57 }; 58 let mut was_throttled = false; 59 60 loop { 61 // check throttling 62 loop { 63 let pending = self.state.db.get_count("pending").await; 64 if pending > self.max_pending as u64 { 65 if !was_throttled { 66 debug!( 67 "crawler throttling: pending repos {} > max {}, sleeping...", 68 pending, self.max_pending 69 ); 70 was_throttled = true; 71 } 72 tokio::time::sleep(Duration::from_secs(10)).await; 73 } else if pending > self.resume_pending as u64 { 74 if !was_throttled { 75 debug!( 76 "crawler throttling: pending repos {} > max {}, entering cooldown...", 77 pending, self.max_pending 78 ); 79 was_throttled = true; 80 } 81 82 while self.state.db.get_count("pending").await > self.resume_pending as u64 { 83 debug!( 84 "crawler cooldown: pending repos {} > resume {}, sleeping...", 85 self.state.db.get_count("pending").await, 86 self.resume_pending 87 ); 88 tokio::time::sleep(Duration::from_secs(10)).await; 89 } 90 break; 91 } else { 92 if was_throttled { 93 info!("crawler resuming: throttling released"); 94 was_throttled = false; 95 } 96 break; 97 } 98 } 99 100 // 2. 
fetch listrepos 101 let req = ListRepos::new() 102 .limit(1000) 103 .maybe_cursor(cursor.clone().map(|c| CowStr::from(c.to_string()))) 104 .build(); 105 106 let mut url = self.relay_host.clone(); 107 if url.scheme() == "wss" { 108 url.set_scheme("https") 109 .map_err(|_| miette::miette!("invalid url: {url}"))?; 110 } else if url.scheme() == "ws" { 111 url.set_scheme("http") 112 .map_err(|_| miette::miette!("invalid url: {url}"))?; 113 } 114 let res_result = self.http.xrpc(url).send(&req).await; 115 116 let output: ListReposOutput = match res_result { 117 Ok(res) => res.into_output().into_diagnostic()?, 118 Err(e) => { 119 let e = e 120 .source_err() 121 .map(|e| e.to_string()) 122 .unwrap_or_else(|| e.to_string()); 123 error!("crawler failed to list repos: {e}. retrying in 30s..."); 124 tokio::time::sleep(Duration::from_secs(30)).await; 125 continue; 126 } 127 }; 128 129 if output.repos.is_empty() { 130 info!("crawler finished enumeration (or empty page). sleeping for 1 hour."); 131 tokio::time::sleep(Duration::from_secs(3600)).await; 132 continue; 133 } 134 135 debug!("crawler fetched {} repos...", output.repos.len()); 136 137 let mut batch = db.inner.batch(); 138 let mut to_queue = Vec::new(); 139 140 let filter = self.state.filter.load(); 141 142 // 3. process repos 143 for repo in output.repos { 144 let did_key = keys::repo_key(&repo.did); 145 146 let excl_key = 147 crate::filter::filter_key(crate::filter::EXCLUDE_PREFIX, repo.did.as_str()); 148 if db.filter.contains_key(&excl_key).into_diagnostic()? { 149 continue; 150 } 151 152 if filter.mode != FilterMode::Full { 153 let did_filter_key = 154 crate::filter::filter_key(crate::filter::DID_PREFIX, repo.did.as_str()); 155 if !db.filter.contains_key(&did_filter_key).into_diagnostic()? { 156 continue; 157 } 158 } 159 160 // check if known 161 if !Db::contains_key(db.repos.clone(), &did_key).await? 
{ 162 trace!("crawler found new repo: {}", repo.did); 163 164 let state = RepoState::backfilling(rng.next_u64()); 165 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 166 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 167 to_queue.push(repo.did.clone()); 168 } 169 } 170 171 // 4. update cursor 172 if let Some(new_cursor) = output.cursor { 173 cursor = Some(new_cursor.as_str().into()); 174 175 batch.insert( 176 &db.cursors, 177 cursor_key.to_vec(), 178 new_cursor.as_bytes().to_vec(), 179 ); 180 } else { 181 // end of pagination 182 info!("crawler reached end of list."); 183 cursor = None; 184 } 185 186 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 187 .await 188 .into_diagnostic()??; 189 190 // update counts if we found new repos 191 if !to_queue.is_empty() { 192 let count = to_queue.len() as i64; 193 self.state.db.update_count_async("repos", count).await; 194 self.state.db.update_count_async("pending", count).await; 195 } 196 197 // 5. notify backfill worker 198 if !to_queue.is_empty() { 199 self.state.notify_backfill(); 200 } 201 202 if cursor.is_none() { 203 tokio::time::sleep(Duration::from_secs(3600)).await; 204 } 205 } 206 } 207}