Crawler notification service for an AT Protocol PDS: debounced, circuit-breaker-gated fan-out of `com.atproto.sync.requestCrawl` to configured relay crawlers whenever the firehose emits a commit.
1use crate::circuit_breaker::CircuitBreaker; 2use crate::sync::firehose::SequencedEvent; 3use reqwest::Client; 4use std::sync::Arc; 5use std::sync::atomic::{AtomicU64, Ordering}; 6use std::time::Duration; 7use tokio::sync::{broadcast, watch}; 8use tracing::{debug, error, info, warn}; 9 10const NOTIFY_THRESHOLD_SECS: u64 = 20 * 60; 11 12pub struct Crawlers { 13 hostname: String, 14 crawler_urls: Vec<String>, 15 http_client: Client, 16 last_notified: AtomicU64, 17 circuit_breaker: Option<Arc<CircuitBreaker>>, 18} 19 20impl Crawlers { 21 pub fn new(hostname: String, crawler_urls: Vec<String>) -> Self { 22 Self { 23 hostname, 24 crawler_urls, 25 http_client: Client::builder() 26 .timeout(Duration::from_secs(30)) 27 .connect_timeout(Duration::from_secs(5)) 28 .pool_max_idle_per_host(5) 29 .pool_idle_timeout(Duration::from_secs(90)) 30 .build() 31 .unwrap_or_default(), 32 last_notified: AtomicU64::new(0), 33 circuit_breaker: None, 34 } 35 } 36 37 pub fn with_circuit_breaker(mut self, circuit_breaker: Arc<CircuitBreaker>) -> Self { 38 self.circuit_breaker = Some(circuit_breaker); 39 self 40 } 41 42 pub fn from_env() -> Option<Self> { 43 let hostname = std::env::var("PDS_HOSTNAME").ok()?; 44 45 let crawler_urls: Vec<String> = std::env::var("CRAWLERS") 46 .unwrap_or_default() 47 .split(',') 48 .filter(|s| !s.is_empty()) 49 .map(|s| s.trim().to_string()) 50 .collect(); 51 52 if crawler_urls.is_empty() { 53 return None; 54 } 55 56 Some(Self::new(hostname, crawler_urls)) 57 } 58 59 fn should_notify(&self) -> bool { 60 let now = std::time::SystemTime::now() 61 .duration_since(std::time::UNIX_EPOCH) 62 .unwrap_or_default() 63 .as_secs(); 64 65 let last = self.last_notified.load(Ordering::Relaxed); 66 now - last >= NOTIFY_THRESHOLD_SECS 67 } 68 69 fn mark_notified(&self) { 70 let now = std::time::SystemTime::now() 71 .duration_since(std::time::UNIX_EPOCH) 72 .unwrap_or_default() 73 .as_secs(); 74 75 self.last_notified.store(now, Ordering::Relaxed); 76 } 77 78 pub async fn 
notify_of_update(&self) { 79 if !self.should_notify() { 80 debug!("Skipping crawler notification due to debounce"); 81 return; 82 } 83 84 if let Some(cb) = &self.circuit_breaker 85 && !cb.can_execute().await 86 { 87 debug!("Skipping crawler notification due to circuit breaker open"); 88 return; 89 } 90 91 self.mark_notified(); 92 let circuit_breaker = self.circuit_breaker.clone(); 93 94 for crawler_url in &self.crawler_urls { 95 let url = format!( 96 "{}/xrpc/com.atproto.sync.requestCrawl", 97 crawler_url.trim_end_matches('/') 98 ); 99 let hostname = self.hostname.clone(); 100 let client = self.http_client.clone(); 101 let cb = circuit_breaker.clone(); 102 103 tokio::spawn(async move { 104 match client 105 .post(&url) 106 .json(&serde_json::json!({ "hostname": hostname })) 107 .send() 108 .await 109 { 110 Ok(response) => { 111 if response.status().is_success() { 112 debug!(crawler = %url, "Successfully notified crawler"); 113 if let Some(cb) = cb { 114 cb.record_success().await; 115 } 116 } else { 117 let status = response.status(); 118 let body = response.text().await.unwrap_or_default(); 119 warn!( 120 crawler = %url, 121 status = %status, 122 body = %body, 123 hostname = %hostname, 124 "Crawler notification returned non-success status" 125 ); 126 if let Some(cb) = cb { 127 cb.record_failure().await; 128 } 129 } 130 } 131 Err(e) => { 132 warn!(crawler = %url, error = %e, "Failed to notify crawler"); 133 if let Some(cb) = cb { 134 cb.record_failure().await; 135 } 136 } 137 } 138 }); 139 } 140 } 141} 142 143pub async fn start_crawlers_service( 144 crawlers: Arc<Crawlers>, 145 mut firehose_rx: broadcast::Receiver<SequencedEvent>, 146 mut shutdown: watch::Receiver<bool>, 147) { 148 info!( 149 hostname = %crawlers.hostname, 150 crawler_count = crawlers.crawler_urls.len(), 151 crawlers = ?crawlers.crawler_urls, 152 "Starting crawlers notification service" 153 ); 154 155 loop { 156 tokio::select! 
{ 157 result = firehose_rx.recv() => { 158 match result { 159 Ok(event) => { 160 if event.event_type == "commit" { 161 crawlers.notify_of_update().await; 162 } 163 } 164 Err(broadcast::error::RecvError::Lagged(n)) => { 165 warn!(skipped = n, "Crawlers service lagged behind firehose"); 166 crawlers.notify_of_update().await; 167 } 168 Err(broadcast::error::RecvError::Closed) => { 169 error!("Firehose channel closed, stopping crawlers service"); 170 break; 171 } 172 } 173 } 174 _ = shutdown.changed() => { 175 if *shutdown.borrow() { 176 info!("Crawlers service shutting down"); 177 break; 178 } 179 } 180 } 181 } 182}