this repo has no description
1use crate::circuit_breaker::CircuitBreaker;
2use crate::sync::firehose::SequencedEvent;
3use reqwest::Client;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{broadcast, watch};
8use tracing::{debug, error, info, warn};
/// Minimum number of seconds that must pass between two rounds of crawler
/// notifications (20 minutes).
const NOTIFY_THRESHOLD_SECS: u64 = {
    const SECS_PER_MINUTE: u64 = 60;
    const THRESHOLD_MINUTES: u64 = 20;
    THRESHOLD_MINUTES * SECS_PER_MINUTE
};
/// Sends `com.atproto.sync.requestCrawl` requests for a hostname to a list of
/// crawler URLs, at most once per `NOTIFY_THRESHOLD_SECS`.
pub struct Crawlers {
    /// Hostname sent as the `hostname` field of each crawl request body.
    hostname: String,
    /// Base URLs of the crawlers; the `requestCrawl` XRPC path is appended to each.
    crawler_urls: Vec<String>,
    /// HTTP client used to send the crawl requests.
    http_client: Client,
    /// Unix time, in seconds, at which the last notification round was started
    /// (0 until the first round).
    last_notified: AtomicU64,
    /// Optional circuit breaker: checked before a round is started and updated
    /// with the outcome of every request.
    circuit_breaker: Option<Arc<CircuitBreaker>>,
}
17impl Crawlers {
18 pub fn new(hostname: String, crawler_urls: Vec<String>) -> Self {
19 Self {
20 hostname,
21 crawler_urls,
22 http_client: Client::builder()
23 .timeout(Duration::from_secs(30))
24 .build()
25 .unwrap_or_default(),
26 last_notified: AtomicU64::new(0),
27 circuit_breaker: None,
28 }
29 }
30 pub fn with_circuit_breaker(mut self, circuit_breaker: Arc<CircuitBreaker>) -> Self {
31 self.circuit_breaker = Some(circuit_breaker);
32 self
33 }
34 pub fn from_env() -> Option<Self> {
35 let hostname = std::env::var("PDS_HOSTNAME").ok()?;
36 let crawler_urls: Vec<String> = std::env::var("CRAWLERS")
37 .unwrap_or_default()
38 .split(',')
39 .filter(|s| !s.is_empty())
40 .map(|s| s.trim().to_string())
41 .collect();
42 if crawler_urls.is_empty() {
43 return None;
44 }
45 Some(Self::new(hostname, crawler_urls))
46 }
47 fn should_notify(&self) -> bool {
48 let now = std::time::SystemTime::now()
49 .duration_since(std::time::UNIX_EPOCH)
50 .unwrap_or_default()
51 .as_secs();
52 let last = self.last_notified.load(Ordering::Relaxed);
53 now - last >= NOTIFY_THRESHOLD_SECS
54 }
55 fn mark_notified(&self) {
56 let now = std::time::SystemTime::now()
57 .duration_since(std::time::UNIX_EPOCH)
58 .unwrap_or_default()
59 .as_secs();
60 self.last_notified.store(now, Ordering::Relaxed);
61 }
62 pub async fn notify_of_update(&self) {
63 if !self.should_notify() {
64 debug!("Skipping crawler notification due to debounce");
65 return;
66 }
67 if let Some(cb) = &self.circuit_breaker {
68 if !cb.can_execute().await {
69 debug!("Skipping crawler notification due to circuit breaker open");
70 return;
71 }
72 }
73 self.mark_notified();
74 let circuit_breaker = self.circuit_breaker.clone();
75 for crawler_url in &self.crawler_urls {
76 let url = format!("{}/xrpc/com.atproto.sync.requestCrawl", crawler_url.trim_end_matches('/'));
77 let hostname = self.hostname.clone();
78 let client = self.http_client.clone();
79 let cb = circuit_breaker.clone();
80 tokio::spawn(async move {
81 match client
82 .post(&url)
83 .json(&serde_json::json!({ "hostname": hostname }))
84 .send()
85 .await
86 {
87 Ok(response) => {
88 if response.status().is_success() {
89 debug!(crawler = %url, "Successfully notified crawler");
90 if let Some(cb) = cb {
91 cb.record_success().await;
92 }
93 } else {
94 let status = response.status();
95 let body = response.text().await.unwrap_or_default();
96 warn!(
97 crawler = %url,
98 status = %status,
99 body = %body,
100 hostname = %hostname,
101 "Crawler notification returned non-success status"
102 );
103 if let Some(cb) = cb {
104 cb.record_failure().await;
105 }
106 }
107 }
108 Err(e) => {
109 warn!(crawler = %url, error = %e, "Failed to notify crawler");
110 if let Some(cb) = cb {
111 cb.record_failure().await;
112 }
113 }
114 }
115 });
116 }
117 }
118}
119pub async fn start_crawlers_service(
120 crawlers: Arc<Crawlers>,
121 mut firehose_rx: broadcast::Receiver<SequencedEvent>,
122 mut shutdown: watch::Receiver<bool>,
123) {
124 info!(
125 hostname = %crawlers.hostname,
126 crawler_count = crawlers.crawler_urls.len(),
127 crawlers = ?crawlers.crawler_urls,
128 "Starting crawlers notification service"
129 );
130 loop {
131 tokio::select! {
132 result = firehose_rx.recv() => {
133 match result {
134 Ok(event) => {
135 if event.event_type == "commit" {
136 crawlers.notify_of_update().await;
137 }
138 }
139 Err(broadcast::error::RecvError::Lagged(n)) => {
140 warn!(skipped = n, "Crawlers service lagged behind firehose");
141 crawlers.notify_of_update().await;
142 }
143 Err(broadcast::error::RecvError::Closed) => {
144 error!("Firehose channel closed, stopping crawlers service");
145 break;
146 }
147 }
148 }
149 _ = shutdown.changed() => {
150 if *shutdown.borrow() {
151 info!("Crawlers service shutting down");
152 break;
153 }
154 }
155 }
156 }
157}