//! Crawler notification service: pings configured crawler/relay hosts
//! (`com.atproto.sync.requestCrawl`) when this PDS emits new firehose data.
1use crate::circuit_breaker::CircuitBreaker;
2use crate::sync::firehose::SequencedEvent;
3use reqwest::Client;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7use tokio::sync::{broadcast, watch};
8use tracing::{debug, error, info, warn};
9
/// Minimum interval between outbound crawler notifications (20 minutes);
/// used by `Crawlers::notify_of_update` to debounce requestCrawl pings.
const NOTIFY_THRESHOLD_SECS: u64 = 20 * 60;
11
/// Notifies configured crawlers/relays that this PDS has new data,
/// debounced by `NOTIFY_THRESHOLD_SECS` and optionally gated by a
/// circuit breaker.
pub struct Crawlers {
    /// Public hostname of this PDS, sent in the requestCrawl body.
    hostname: String,
    /// Base URLs of crawlers to ping; trailing slashes are trimmed at send time.
    crawler_urls: Vec<String>,
    /// Shared HTTP client (30s timeout) reused across all notifications.
    http_client: Client,
    /// Unix seconds of the last notification; 0 means "never notified".
    last_notified: AtomicU64,
    /// Optional breaker consulted before, and updated after, each attempt.
    circuit_breaker: Option<Arc<CircuitBreaker>>,
}
19
20impl Crawlers {
21 pub fn new(hostname: String, crawler_urls: Vec<String>) -> Self {
22 Self {
23 hostname,
24 crawler_urls,
25 http_client: Client::builder()
26 .timeout(Duration::from_secs(30))
27 .build()
28 .unwrap_or_default(),
29 last_notified: AtomicU64::new(0),
30 circuit_breaker: None,
31 }
32 }
33
34 pub fn with_circuit_breaker(mut self, circuit_breaker: Arc<CircuitBreaker>) -> Self {
35 self.circuit_breaker = Some(circuit_breaker);
36 self
37 }
38
39 pub fn from_env() -> Option<Self> {
40 let hostname = std::env::var("PDS_HOSTNAME").ok()?;
41
42 let crawler_urls: Vec<String> = std::env::var("CRAWLERS")
43 .unwrap_or_default()
44 .split(',')
45 .filter(|s| !s.is_empty())
46 .map(|s| s.trim().to_string())
47 .collect();
48
49 if crawler_urls.is_empty() {
50 return None;
51 }
52
53 Some(Self::new(hostname, crawler_urls))
54 }
55
56 fn should_notify(&self) -> bool {
57 let now = std::time::SystemTime::now()
58 .duration_since(std::time::UNIX_EPOCH)
59 .unwrap_or_default()
60 .as_secs();
61
62 let last = self.last_notified.load(Ordering::Relaxed);
63 now - last >= NOTIFY_THRESHOLD_SECS
64 }
65
66 fn mark_notified(&self) {
67 let now = std::time::SystemTime::now()
68 .duration_since(std::time::UNIX_EPOCH)
69 .unwrap_or_default()
70 .as_secs();
71
72 self.last_notified.store(now, Ordering::Relaxed);
73 }
74
75 pub async fn notify_of_update(&self) {
76 if !self.should_notify() {
77 debug!("Skipping crawler notification due to debounce");
78 return;
79 }
80
81 if let Some(cb) = &self.circuit_breaker
82 && !cb.can_execute().await
83 {
84 debug!("Skipping crawler notification due to circuit breaker open");
85 return;
86 }
87
88 self.mark_notified();
89 let circuit_breaker = self.circuit_breaker.clone();
90
91 for crawler_url in &self.crawler_urls {
92 let url = format!(
93 "{}/xrpc/com.atproto.sync.requestCrawl",
94 crawler_url.trim_end_matches('/')
95 );
96 let hostname = self.hostname.clone();
97 let client = self.http_client.clone();
98 let cb = circuit_breaker.clone();
99
100 tokio::spawn(async move {
101 match client
102 .post(&url)
103 .json(&serde_json::json!({ "hostname": hostname }))
104 .send()
105 .await
106 {
107 Ok(response) => {
108 if response.status().is_success() {
109 debug!(crawler = %url, "Successfully notified crawler");
110 if let Some(cb) = cb {
111 cb.record_success().await;
112 }
113 } else {
114 let status = response.status();
115 let body = response.text().await.unwrap_or_default();
116 warn!(
117 crawler = %url,
118 status = %status,
119 body = %body,
120 hostname = %hostname,
121 "Crawler notification returned non-success status"
122 );
123 if let Some(cb) = cb {
124 cb.record_failure().await;
125 }
126 }
127 }
128 Err(e) => {
129 warn!(crawler = %url, error = %e, "Failed to notify crawler");
130 if let Some(cb) = cb {
131 cb.record_failure().await;
132 }
133 }
134 }
135 });
136 }
137 }
138}
139
140pub async fn start_crawlers_service(
141 crawlers: Arc<Crawlers>,
142 mut firehose_rx: broadcast::Receiver<SequencedEvent>,
143 mut shutdown: watch::Receiver<bool>,
144) {
145 info!(
146 hostname = %crawlers.hostname,
147 crawler_count = crawlers.crawler_urls.len(),
148 crawlers = ?crawlers.crawler_urls,
149 "Starting crawlers notification service"
150 );
151
152 loop {
153 tokio::select! {
154 result = firehose_rx.recv() => {
155 match result {
156 Ok(event) => {
157 if event.event_type == "commit" {
158 crawlers.notify_of_update().await;
159 }
160 }
161 Err(broadcast::error::RecvError::Lagged(n)) => {
162 warn!(skipped = n, "Crawlers service lagged behind firehose");
163 crawlers.notify_of_update().await;
164 }
165 Err(broadcast::error::RecvError::Closed) => {
166 error!("Firehose channel closed, stopping crawlers service");
167 break;
168 }
169 }
170 }
171 _ = shutdown.changed() => {
172 if *shutdown.borrow() {
173 info!("Crawlers service shutting down");
174 break;
175 }
176 }
177 }
178 }
179}