//! Crawler notification service: debounced `com.atproto.sync.requestCrawl`
//! pings to configured relay/crawler hosts whenever this PDS has new data.
1use crate::circuit_breaker::CircuitBreaker;
2use crate::sync::firehose::SequencedEvent;
3use reqwest::Client;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7use tokio::sync::{broadcast, watch};
8use tracing::{debug, error, info, warn};
9
// Minimum interval between crawler notifications (20 minutes): bursts of
// commits produce at most one requestCrawl ping per window.
const NOTIFY_THRESHOLD_SECS: u64 = 20 * 60;

/// Notifies upstream crawlers/relays that this PDS has new data to crawl.
pub struct Crawlers {
    /// Public hostname of this PDS, sent as the requestCrawl payload.
    hostname: String,
    /// Base URLs of the crawler/relay hosts to notify.
    crawler_urls: Vec<String>,
    /// Shared, connection-pooled HTTP client used for all notifications.
    http_client: Client,
    /// UNIX seconds of the last notification; 0 means "never notified yet".
    last_notified: AtomicU64,
    /// Optional breaker that suppresses notifications while it is open.
    circuit_breaker: Option<Arc<CircuitBreaker>>,
}
19
20impl Crawlers {
21 pub fn new(hostname: String, crawler_urls: Vec<String>) -> Self {
22 Self {
23 hostname,
24 crawler_urls,
25 http_client: Client::builder()
26 .timeout(Duration::from_secs(30))
27 .connect_timeout(Duration::from_secs(5))
28 .pool_max_idle_per_host(5)
29 .pool_idle_timeout(Duration::from_secs(90))
30 .build()
31 .unwrap_or_default(),
32 last_notified: AtomicU64::new(0),
33 circuit_breaker: None,
34 }
35 }
36
37 pub fn with_circuit_breaker(mut self, circuit_breaker: Arc<CircuitBreaker>) -> Self {
38 self.circuit_breaker = Some(circuit_breaker);
39 self
40 }
41
42 pub fn from_env() -> Option<Self> {
43 let hostname = std::env::var("PDS_HOSTNAME").ok()?;
44
45 let crawler_urls: Vec<String> = std::env::var("CRAWLERS")
46 .unwrap_or_default()
47 .split(',')
48 .filter(|s| !s.is_empty())
49 .map(|s| s.trim().to_string())
50 .collect();
51
52 if crawler_urls.is_empty() {
53 return None;
54 }
55
56 Some(Self::new(hostname, crawler_urls))
57 }
58
59 fn should_notify(&self) -> bool {
60 let now = std::time::SystemTime::now()
61 .duration_since(std::time::UNIX_EPOCH)
62 .unwrap_or_default()
63 .as_secs();
64
65 let last = self.last_notified.load(Ordering::Relaxed);
66 now - last >= NOTIFY_THRESHOLD_SECS
67 }
68
69 fn mark_notified(&self) {
70 let now = std::time::SystemTime::now()
71 .duration_since(std::time::UNIX_EPOCH)
72 .unwrap_or_default()
73 .as_secs();
74
75 self.last_notified.store(now, Ordering::Relaxed);
76 }
77
78 pub async fn notify_of_update(&self) {
79 if !self.should_notify() {
80 debug!("Skipping crawler notification due to debounce");
81 return;
82 }
83
84 if let Some(cb) = &self.circuit_breaker
85 && !cb.can_execute().await
86 {
87 debug!("Skipping crawler notification due to circuit breaker open");
88 return;
89 }
90
91 self.mark_notified();
92 let circuit_breaker = self.circuit_breaker.clone();
93
94 for crawler_url in &self.crawler_urls {
95 let url = format!(
96 "{}/xrpc/com.atproto.sync.requestCrawl",
97 crawler_url.trim_end_matches('/')
98 );
99 let hostname = self.hostname.clone();
100 let client = self.http_client.clone();
101 let cb = circuit_breaker.clone();
102
103 tokio::spawn(async move {
104 match client
105 .post(&url)
106 .json(&serde_json::json!({ "hostname": hostname }))
107 .send()
108 .await
109 {
110 Ok(response) => {
111 if response.status().is_success() {
112 debug!(crawler = %url, "Successfully notified crawler");
113 if let Some(cb) = cb {
114 cb.record_success().await;
115 }
116 } else {
117 let status = response.status();
118 let body = response.text().await.unwrap_or_default();
119 warn!(
120 crawler = %url,
121 status = %status,
122 body = %body,
123 hostname = %hostname,
124 "Crawler notification returned non-success status"
125 );
126 if let Some(cb) = cb {
127 cb.record_failure().await;
128 }
129 }
130 }
131 Err(e) => {
132 warn!(crawler = %url, error = %e, "Failed to notify crawler");
133 if let Some(cb) = cb {
134 cb.record_failure().await;
135 }
136 }
137 }
138 });
139 }
140 }
141}
142
143pub async fn start_crawlers_service(
144 crawlers: Arc<Crawlers>,
145 mut firehose_rx: broadcast::Receiver<SequencedEvent>,
146 mut shutdown: watch::Receiver<bool>,
147) {
148 info!(
149 hostname = %crawlers.hostname,
150 crawler_count = crawlers.crawler_urls.len(),
151 crawlers = ?crawlers.crawler_urls,
152 "Starting crawlers notification service"
153 );
154
155 loop {
156 tokio::select! {
157 result = firehose_rx.recv() => {
158 match result {
159 Ok(event) => {
160 if event.event_type == "commit" {
161 crawlers.notify_of_update().await;
162 }
163 }
164 Err(broadcast::error::RecvError::Lagged(n)) => {
165 warn!(skipped = n, "Crawlers service lagged behind firehose");
166 crawlers.notify_of_update().await;
167 }
168 Err(broadcast::error::RecvError::Closed) => {
169 error!("Firehose channel closed, stopping crawlers service");
170 break;
171 }
172 }
173 }
174 _ = shutdown.changed() => {
175 if *shutdown.borrow() {
176 info!("Crawlers service shutting down");
177 break;
178 }
179 }
180 }
181 }
182}