this repo has no description
1use crate::circuit_breaker::CircuitBreaker; 2use crate::sync::firehose::SequencedEvent; 3use reqwest::Client; 4use std::sync::Arc; 5use std::sync::atomic::{AtomicU64, Ordering}; 6use std::time::Duration; 7use tokio::sync::{broadcast, watch}; 8use tracing::{debug, error, info, warn}; 9 10const NOTIFY_THRESHOLD_SECS: u64 = 20 * 60; 11 12pub struct Crawlers { 13 hostname: String, 14 crawler_urls: Vec<String>, 15 http_client: Client, 16 last_notified: AtomicU64, 17 circuit_breaker: Option<Arc<CircuitBreaker>>, 18} 19 20impl Crawlers { 21 pub fn new(hostname: String, crawler_urls: Vec<String>) -> Self { 22 Self { 23 hostname, 24 crawler_urls, 25 http_client: Client::builder() 26 .timeout(Duration::from_secs(30)) 27 .build() 28 .unwrap_or_default(), 29 last_notified: AtomicU64::new(0), 30 circuit_breaker: None, 31 } 32 } 33 34 pub fn with_circuit_breaker(mut self, circuit_breaker: Arc<CircuitBreaker>) -> Self { 35 self.circuit_breaker = Some(circuit_breaker); 36 self 37 } 38 39 pub fn from_env() -> Option<Self> { 40 let hostname = std::env::var("PDS_HOSTNAME").ok()?; 41 42 let crawler_urls: Vec<String> = std::env::var("CRAWLERS") 43 .unwrap_or_default() 44 .split(',') 45 .filter(|s| !s.is_empty()) 46 .map(|s| s.trim().to_string()) 47 .collect(); 48 49 if crawler_urls.is_empty() { 50 return None; 51 } 52 53 Some(Self::new(hostname, crawler_urls)) 54 } 55 56 fn should_notify(&self) -> bool { 57 let now = std::time::SystemTime::now() 58 .duration_since(std::time::UNIX_EPOCH) 59 .unwrap_or_default() 60 .as_secs(); 61 62 let last = self.last_notified.load(Ordering::Relaxed); 63 now - last >= NOTIFY_THRESHOLD_SECS 64 } 65 66 fn mark_notified(&self) { 67 let now = std::time::SystemTime::now() 68 .duration_since(std::time::UNIX_EPOCH) 69 .unwrap_or_default() 70 .as_secs(); 71 72 self.last_notified.store(now, Ordering::Relaxed); 73 } 74 75 pub async fn notify_of_update(&self) { 76 if !self.should_notify() { 77 debug!("Skipping crawler notification due to debounce"); 78 return; 79 } 80 81 if let Some(cb) = &self.circuit_breaker 82 && !cb.can_execute().await 83 { 84 debug!("Skipping crawler notification due to circuit breaker open"); 85 return; 86 } 87 88 self.mark_notified(); 89 let circuit_breaker = self.circuit_breaker.clone(); 90 91 for crawler_url in &self.crawler_urls { 92 let url = format!( 93 "{}/xrpc/com.atproto.sync.requestCrawl", 94 crawler_url.trim_end_matches('/') 95 ); 96 let hostname = self.hostname.clone(); 97 let client = self.http_client.clone(); 98 let cb = circuit_breaker.clone(); 99 100 tokio::spawn(async move { 101 match client 102 .post(&url) 103 .json(&serde_json::json!({ "hostname": hostname })) 104 .send() 105 .await 106 { 107 Ok(response) => { 108 if response.status().is_success() { 109 debug!(crawler = %url, "Successfully notified crawler"); 110 if let Some(cb) = cb { 111 cb.record_success().await; 112 } 113 } else { 114 let status = response.status(); 115 let body = response.text().await.unwrap_or_default(); 116 warn!( 117 crawler = %url, 118 status = %status, 119 body = %body, 120 hostname = %hostname, 121 "Crawler notification returned non-success status" 122 ); 123 if let Some(cb) = cb { 124 cb.record_failure().await; 125 } 126 } 127 } 128 Err(e) => { 129 warn!(crawler = %url, error = %e, "Failed to notify crawler"); 130 if let Some(cb) = cb { 131 cb.record_failure().await; 132 } 133 } 134 } 135 }); 136 } 137 } 138} 139 140pub async fn start_crawlers_service( 141 crawlers: Arc<Crawlers>, 142 mut firehose_rx: broadcast::Receiver<SequencedEvent>, 143 mut shutdown: watch::Receiver<bool>, 144) { 145 info!( 146 hostname = %crawlers.hostname, 147 crawler_count = crawlers.crawler_urls.len(), 148 crawlers = ?crawlers.crawler_urls, 149 "Starting crawlers notification service" 150 ); 151 152 loop { 153 tokio::select! { 154 result = firehose_rx.recv() => { 155 match result { 156 Ok(event) => { 157 if event.event_type == "commit" { 158 crawlers.notify_of_update().await; 159 } 160 } 161 Err(broadcast::error::RecvError::Lagged(n)) => { 162 warn!(skipped = n, "Crawlers service lagged behind firehose"); 163 crawlers.notify_of_update().await; 164 } 165 Err(broadcast::error::RecvError::Closed) => { 166 error!("Firehose channel closed, stopping crawlers service"); 167 break; 168 } 169 } 170 } 171 _ = shutdown.changed() => { 172 if *shutdown.borrow() { 173 info!("Crawlers service shutting down"); 174 break; 175 } 176 } 177 } 178 } 179}