this repo has no description
1use crate::circuit_breaker::CircuitBreaker; 2use crate::sync::firehose::SequencedEvent; 3use reqwest::Client; 4use std::sync::atomic::{AtomicU64, Ordering}; 5use std::sync::Arc; 6use std::time::Duration; 7use tokio::sync::{broadcast, watch}; 8use tracing::{debug, error, info, warn}; 9const NOTIFY_THRESHOLD_SECS: u64 = 20 * 60; 10pub struct Crawlers { 11 hostname: String, 12 crawler_urls: Vec<String>, 13 http_client: Client, 14 last_notified: AtomicU64, 15 circuit_breaker: Option<Arc<CircuitBreaker>>, 16} 17impl Crawlers { 18 pub fn new(hostname: String, crawler_urls: Vec<String>) -> Self { 19 Self { 20 hostname, 21 crawler_urls, 22 http_client: Client::builder() 23 .timeout(Duration::from_secs(30)) 24 .build() 25 .unwrap_or_default(), 26 last_notified: AtomicU64::new(0), 27 circuit_breaker: None, 28 } 29 } 30 pub fn with_circuit_breaker(mut self, circuit_breaker: Arc<CircuitBreaker>) -> Self { 31 self.circuit_breaker = Some(circuit_breaker); 32 self 33 } 34 pub fn from_env() -> Option<Self> { 35 let hostname = std::env::var("PDS_HOSTNAME").ok()?; 36 let crawler_urls: Vec<String> = std::env::var("CRAWLERS") 37 .unwrap_or_default() 38 .split(',') 39 .filter(|s| !s.is_empty()) 40 .map(|s| s.trim().to_string()) 41 .collect(); 42 if crawler_urls.is_empty() { 43 return None; 44 } 45 Some(Self::new(hostname, crawler_urls)) 46 } 47 fn should_notify(&self) -> bool { 48 let now = std::time::SystemTime::now() 49 .duration_since(std::time::UNIX_EPOCH) 50 .unwrap_or_default() 51 .as_secs(); 52 let last = self.last_notified.load(Ordering::Relaxed); 53 now - last >= NOTIFY_THRESHOLD_SECS 54 } 55 fn mark_notified(&self) { 56 let now = std::time::SystemTime::now() 57 .duration_since(std::time::UNIX_EPOCH) 58 .unwrap_or_default() 59 .as_secs(); 60 self.last_notified.store(now, Ordering::Relaxed); 61 } 62 pub async fn notify_of_update(&self) { 63 if !self.should_notify() { 64 debug!("Skipping crawler notification due to debounce"); 65 return; 66 } 67 if let Some(cb) = &self.circuit_breaker { 68 if !cb.can_execute().await { 69 debug!("Skipping crawler notification due to circuit breaker open"); 70 return; 71 } 72 } 73 self.mark_notified(); 74 let circuit_breaker = self.circuit_breaker.clone(); 75 for crawler_url in &self.crawler_urls { 76 let url = format!("{}/xrpc/com.atproto.sync.requestCrawl", crawler_url.trim_end_matches('/')); 77 let hostname = self.hostname.clone(); 78 let client = self.http_client.clone(); 79 let cb = circuit_breaker.clone(); 80 tokio::spawn(async move { 81 match client 82 .post(&url) 83 .json(&serde_json::json!({ "hostname": hostname })) 84 .send() 85 .await 86 { 87 Ok(response) => { 88 if response.status().is_success() { 89 debug!(crawler = %url, "Successfully notified crawler"); 90 if let Some(cb) = cb { 91 cb.record_success().await; 92 } 93 } else { 94 let status = response.status(); 95 let body = response.text().await.unwrap_or_default(); 96 warn!( 97 crawler = %url, 98 status = %status, 99 body = %body, 100 hostname = %hostname, 101 "Crawler notification returned non-success status" 102 ); 103 if let Some(cb) = cb { 104 cb.record_failure().await; 105 } 106 } 107 } 108 Err(e) => { 109 warn!(crawler = %url, error = %e, "Failed to notify crawler"); 110 if let Some(cb) = cb { 111 cb.record_failure().await; 112 } 113 } 114 } 115 }); 116 } 117 } 118} 119pub async fn start_crawlers_service( 120 crawlers: Arc<Crawlers>, 121 mut firehose_rx: broadcast::Receiver<SequencedEvent>, 122 mut shutdown: watch::Receiver<bool>, 123) { 124 info!( 125 hostname = %crawlers.hostname, 126 crawler_count = crawlers.crawler_urls.len(), 127 crawlers = ?crawlers.crawler_urls, 128 "Starting crawlers notification service" 129 ); 130 loop { 131 tokio::select! { 132 result = firehose_rx.recv() => { 133 match result { 134 Ok(event) => { 135 if event.event_type == "commit" { 136 crawlers.notify_of_update().await; 137 } 138 } 139 Err(broadcast::error::RecvError::Lagged(n)) => { 140 warn!(skipped = n, "Crawlers service lagged behind firehose"); 141 crawlers.notify_of_update().await; 142 } 143 Err(broadcast::error::RecvError::Closed) => { 144 error!("Firehose channel closed, stopping crawlers service"); 145 break; 146 } 147 } 148 } 149 _ = shutdown.changed() => { 150 if *shutdown.borrow() { 151 info!("Crawlers service shutting down"); 152 break; 153 } 154 } 155 } 156 } 157}