// Forked from atscan.net/plcbundle-rs
// High-performance implementation of plcbundle, written in Rust
1//! Rate-limited HTTP client for PLC directory: fetch operations and DID documents with retries/backoff and token-bucket limiting
2// PLC Client - HTTP client for interacting with PLC directory APIs
3use crate::constants;
4use crate::resolver::DIDDocument;
5use anyhow::{Context, Result};
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10// Import PLCOperation from sync module (needed for fetch_operations)
11use crate::sync::PLCOperation;
12
13/// HTTP client for PLC directory with rate limiting and retry logic
/// HTTP client for PLC directory with rate limiting and retry logic
pub struct PLCClient {
    /// Shared reqwest client configured with a global request timeout.
    client: reqwest::Client,
    /// Directory root URL, e.g. "https://plc.directory" (stored as given; no normalization).
    base_url: String,
    /// Token-bucket limiter that paces outgoing requests.
    rate_limiter: RateLimiter,
    /// Retry-After duration captured from the most recent 429 response, if any.
    /// Cleared before each attempt and consumed by the retry loop.
    last_retry_after: std::sync::Arc<tokio::sync::Mutex<Option<Duration>>>,
    /// Sliding window of recent request times, used for diagnostics/logging only.
    request_timestamps: Arc<std::sync::Mutex<VecDeque<Instant>>>,
    /// Length of the rate-limit window (also the horizon for the sliding window above).
    rate_limit_period: Duration,
}
22
/// Verbatim capture of a single /export HTTP exchange.
/// Populated only when `capture_raw` is requested in `fetch_operations`.
pub struct RawExportResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Response headers; only headers whose values are valid UTF-8 are kept.
    pub headers: Vec<(String, String)>,
    /// Raw response body (JSON Lines, one operation per line).
    pub body: String,
    /// RFC 3339 wall-clock timestamp taken just before the request was sent.
    pub http_start: String,
}
29
impl PLCClient {
    /// Build a client for the PLC directory at `base_url`.
    ///
    /// Configures the HTTP timeout from `constants::HTTP_TIMEOUT_SECS` and a
    /// token-bucket rate limiter whose per-period budget is the directory's
    /// advertised limit scaled down by a safety factor.
    ///
    /// # Errors
    /// Returns an error if the underlying reqwest client cannot be built.
    pub fn new(base_url: impl Into<String>) -> Result<Self> {
        let period = Duration::from_secs(constants::PLC_RATE_LIMIT_PERIOD);
        // Scale the advertised limit down by the safety factor so we stay
        // comfortably under the server's real threshold.
        let requests_per_period = (constants::PLC_RATE_LIMIT_REQUEST as f64
            * constants::PLC_RATE_LIMIT_SAFETY_FACTOR)
            .floor() as usize;
        Ok(Self {
            client: reqwest::Client::builder()
                .timeout(Duration::from_secs(constants::HTTP_TIMEOUT_SECS))
                .build()?,
            base_url: base_url.into(),
            rate_limiter: RateLimiter::new(requests_per_period, period),
            last_retry_after: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
            request_timestamps: Arc::new(std::sync::Mutex::new(VecDeque::new())),
            rate_limit_period: period,
        })
    }

    /// Build the /export URL for the given cursor and page size.
    ///
    /// NOTE(review): `after` is interpolated without URL-encoding — callers are
    /// expected to pass a cursor that needs no escaping; confirm against callers.
    pub fn build_export_url(&self, after: &str, count: usize) -> String {
        format!("{}/export?after={}&count={}", self.base_url, after, count)
    }

    /// Record a request timestamp and clean up old entries
    fn record_request(&self) {
        let now = Instant::now();
        let mut timestamps = self.request_timestamps.lock().unwrap();

        // Remove timestamps older than the rate limit period
        // If checked_sub fails (shouldn't happen in practice), use now as cutoff (counts all)
        let cutoff = now.checked_sub(self.rate_limit_period).unwrap_or(now);
        while let Some(&oldest) = timestamps.front() {
            if oldest < cutoff {
                timestamps.pop_front();
            } else {
                break;
            }
        }

        timestamps.push_back(now);
    }

    /// Count requests made in the rate limit period
    fn count_requests_in_period(&self) -> usize {
        let now = Instant::now();
        let timestamps = self.request_timestamps.lock().unwrap();

        // If checked_sub fails (shouldn't happen in practice), use now as cutoff (counts all)
        let cutoff = now.checked_sub(self.rate_limit_period).unwrap_or(now);
        timestamps.iter().filter(|&&ts| ts >= cutoff).count()
    }

    /// Fetch operations from PLC directory export endpoint
    ///
    /// Returns `(operations, total_rate_limit_wait, total_http_time,
    /// raw_capture, server_date)`. `raw_capture` is `Some` only when
    /// `capture_raw` is set; `server_date` comes from the HTTP `Date` header
    /// when present and parseable. Delegates to `fetch_operations_unified`.
    pub async fn fetch_operations(
        &self,
        after: &str,
        count: usize,
        shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
        capture_raw: bool,
    ) -> Result<(
        Vec<PLCOperation>,
        Duration,
        Duration,
        Option<RawExportResponse>,
        Option<chrono::DateTime<chrono::Utc>>,
    )> {
        self.fetch_operations_unified(after, count, shutdown_rx, capture_raw)
            .await
    }

    // merged into `fetch_operations`

    // merged into `fetch_operations`

    /// Retry wrapper around `do_fetch_operations`: up to 5 attempts with
    /// exponential backoff, honoring server-provided Retry-After on 429 and
    /// aborting promptly when `shutdown_rx` signals shutdown.
    async fn fetch_operations_unified(
        &self,
        after: &str,
        count: usize,
        shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
        capture_raw: bool,
    ) -> Result<(
        Vec<PLCOperation>,
        Duration,
        Duration,
        Option<RawExportResponse>,
        Option<chrono::DateTime<chrono::Utc>>,
    )> {
        // Backoff doubles per failed attempt; a 429's Retry-After takes
        // precedence over this generic backoff when present.
        let mut backoff = Duration::from_secs(1);
        let mut last_err = None;
        let mut total_wait = Duration::from_secs(0);
        let mut total_http = Duration::from_secs(0);

        for attempt in 1..=5 {
            // Bail out early if a shutdown was requested between attempts.
            if let Some(ref rx) = shutdown_rx && *rx.borrow() {
                anyhow::bail!("Shutdown requested");
            }
            let export_url = format!("{}/export?after={}&count={}", self.base_url, after, count);

            let permits = self.rate_limiter.available_permits();
            let requests_in_period = self.count_requests_in_period();
            log::debug!(
                "[PLCClient] Preparing /export request: {} | permits={} | window={:?} | requests_in_window={}",
                export_url,
                permits,
                self.rate_limit_period,
                requests_in_period
            );

            // Acquire a rate-limit permit, racing against shutdown when a
            // receiver was provided; time spent here is reported to the caller.
            let wait_start = Instant::now();
            if let Some(mut rx) = shutdown_rx.clone() {
                tokio::select! {
                    _ = self.rate_limiter.wait() => {}
                    _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } }
                }
            } else {
                self.rate_limiter.wait().await;
            }
            let wait_elapsed = wait_start.elapsed();
            if wait_elapsed.as_nanos() > 0 {
                log::debug!("[PLCClient] Rate limiter wait: {:?}", wait_elapsed);
            }
            total_wait += wait_elapsed;

            // Clear any stale Retry-After before the request;
            // do_fetch_operations repopulates it if the server returns 429.
            *self.last_retry_after.lock().await = None;
            self.record_request();

            let result = if capture_raw {
                self.do_fetch_operations(after, count, true).await
            } else {
                self.do_fetch_operations(after, count, false).await
            };

            match result {
                Ok((operations, http_duration, capture, server_time)) => {
                    total_http += http_duration;
                    return Ok((operations, total_wait, total_http, capture, server_time));
                }
                Err(e) => {
                    last_err = Some(e);
                    // A populated Retry-After means the failure was a 429:
                    // honor the server's delay instead of exponential backoff.
                    let retry_after = self.last_retry_after.lock().await.take();
                    if let Some(retry_after) = retry_after {
                        let requests_in_period = self.count_requests_in_period();
                        let rate_limit = (constants::PLC_RATE_LIMIT_REQUEST as f64
                            * constants::PLC_RATE_LIMIT_SAFETY_FACTOR)
                            .floor() as usize;
                        eprintln!(
                            "[Sync] Rate limited by PLC directory ({} requests in last {:?}, limit: {}), waiting {:?} before retry {}/{}",
                            requests_in_period,
                            self.rate_limit_period,
                            rate_limit,
                            retry_after,
                            attempt,
                            5
                        );
                        if let Some(mut rx) = shutdown_rx.clone() {
                            tokio::select! {
                                _ = tokio::time::sleep(retry_after) => {}
                                _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } }
                            }
                        } else {
                            tokio::time::sleep(retry_after).await;
                        }
                        // Skip exponential backoff; the server told us how long to wait.
                        continue;
                    }

                    if attempt < 5 {
                        eprintln!(
                            "[Sync] Request failed (attempt {}/{}): {}, retrying in {:?}",
                            attempt,
                            5,
                            last_err.as_ref().unwrap(),
                            backoff
                        );
                        if let Some(mut rx) = shutdown_rx.clone() {
                            tokio::select! {
                                _ = tokio::time::sleep(backoff) => {}
                                _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } }
                            }
                        } else {
                            tokio::time::sleep(backoff).await;
                        }
                        backoff *= 2;
                    }
                }
            }
        }

        anyhow::bail!(
            "Failed after {} attempts: {}",
            5,
            last_err.unwrap_or_else(|| anyhow::anyhow!("Unknown error"))
        )
    }

    // Removed legacy duplicate retry path; unified via `fetch_operations_unified`

    /// Perform one /export HTTP request and parse the JSON Lines body.
    ///
    /// On a 429, stores the parsed Retry-After in `last_retry_after` and
    /// returns an error so the retry loop can honor it. Lines that fail to
    /// parse are logged and skipped rather than failing the whole page.
    async fn do_fetch_operations(
        &self,
        after: &str,
        count: usize,
        capture_raw: bool,
    ) -> Result<(
        Vec<PLCOperation>,
        Duration,
        Option<RawExportResponse>,
        Option<chrono::DateTime<chrono::Utc>>,
    )> {
        let url = format!("{}/export", self.base_url);
        let request_start_wall = chrono::Utc::now();
        let request_start = Instant::now();
        let response = self
            .client
            .get(&url)
            .query(&[("after", after), ("count", &count.to_string())])
            .header("User-Agent", constants::user_agent())
            .send()
            .await?;

        if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
            let retry_after = parse_retry_after(&response);
            *self.last_retry_after.lock().await = Some(retry_after);
            anyhow::bail!("Rate limited (429)");
        }

        if !response.status().is_success() {
            anyhow::bail!("PLC request failed: {}", response.status());
        }
        // Capture status/headers only when requested, before the body consumes `response`.
        let status = if capture_raw { Some(response.status().as_u16()) } else { None };
        let headers_vec: Option<Vec<(String, String)>> = if capture_raw {
            Some(
                response
                    .headers()
                    .iter()
                    // Headers with non-UTF-8 values are silently dropped.
                    .filter_map(|(k, v)| Some((k.as_str().to_string(), v.to_str().ok()?.to_string())))
                    .collect(),
            )
        } else {
            None
        };

        // Extract Date header for server time
        let server_time = response
            .headers()
            .get("date")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| httpdate::parse_http_date(s).ok())
            .map(|t| chrono::DateTime::<chrono::Utc>::from(t));

        let body = response.text().await?;
        let request_duration = request_start.elapsed();
        let mut operations = Vec::new();

        // Body is JSON Lines: one operation per non-empty line.
        for line in body.lines() {
            if line.trim().is_empty() {
                continue;
            }
            match sonic_rs::from_str::<PLCOperation>(line) {
                Ok(mut op) => {
                    // CRITICAL: Store raw JSON to preserve exact byte content
                    // This is required by the V1 specification (docs/specification.md § 4.2)
                    // to ensure content_hash is reproducible across implementations.
                    // Re-serializing would change key order/whitespace and break hash verification.
                    op.raw_json = Some(line.to_string());
                    operations.push(op);
                }
                Err(e) => eprintln!("Warning: failed to parse operation: {}", e),
            }
        }

        let capture = if capture_raw {
            Some(RawExportResponse {
                // unwrap is safe: status/headers_vec are Some whenever capture_raw is true.
                status: status.unwrap(),
                headers: headers_vec.unwrap(),
                body,
                http_start: request_start_wall.to_rfc3339(),
            })
        } else {
            None
        };

        Ok((operations, request_duration, capture, server_time))
    }

    /// Fetch DID document raw JSON from PLC directory
    ///
    /// Fetches the raw JSON string for a DID document from the PLC directory.
    /// This preserves the exact byte content as received from the source.
    /// Uses the /{did} endpoint.
    ///
    /// # Errors
    /// Fails on network errors, on a 429 (after recording its Retry-After in
    /// `last_retry_after`), and on any non-success status code.
    pub async fn fetch_did_document_raw(&self, did: &str) -> Result<String> {
        use std::time::Instant;

        // Construct DID document URL
        // PLC directory exposes DID documents at /{did} (same as plcbundle instances)
        let url = format!("{}/{}", self.base_url.trim_end_matches('/'), did);

        log::debug!("Fetching DID document from: {}", url);
        let request_start = Instant::now();

        let response = self
            .client
            .get(&url)
            .header("User-Agent", constants::user_agent())
            .send()
            .await
            .context(format!("Failed to fetch DID document from {}", url))?;

        let request_duration = request_start.elapsed();
        log::debug!(
            "HTTP request completed in {:?}, status: {}",
            request_duration,
            response.status()
        );

        // Handle rate limiting (429)
        if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
            let retry_after = parse_retry_after(&response);
            *self.last_retry_after.lock().await = Some(retry_after);
            log::warn!("Rate limited (429), retry after: {:?}", retry_after);
            anyhow::bail!("Rate limited (429)");
        }

        if !response.status().is_success() {
            log::error!(
                "Unexpected status code: {} for DID document at {}",
                response.status(),
                url
            );
            anyhow::bail!(
                "Unexpected status code: {} for DID document at {}",
                response.status(),
                url
            );
        }

        let data = response.text().await?;
        let data_size = data.len();
        log::debug!("Received response body: {} bytes", data_size);

        Ok(data)
    }

    /// Fetch DID document from PLC directory
    ///
    /// Fetches the W3C DID document for a given DID from the PLC directory
    /// and parses it into a [`DIDDocument`].
    /// Uses the /{did} endpoint (via `fetch_did_document_raw`).
    pub async fn fetch_did_document(&self, did: &str) -> Result<DIDDocument> {
        let data = self.fetch_did_document_raw(did).await?;
        let document: DIDDocument =
            sonic_rs::from_str(&data).context("Failed to parse DID document JSON")?;
        Ok(document)
    }
}
381
382/// Parse the Retry-After header from a response
383/// Returns the duration to wait before retrying, capped at 60 seconds maximum
384fn parse_retry_after(response: &reqwest::Response) -> Duration {
385 const MAX_RETRY_SECONDS: u64 = 60;
386
387 if let Some(retry_after_header) = response.headers().get("retry-after")
388 && let Ok(retry_after_str) = retry_after_header.to_str()
389 {
390 // Try parsing as seconds (integer) - most common format
391 if let Ok(seconds) = retry_after_str.parse::<u64>() {
392 // Cap at maximum wait time
393 return Duration::from_secs(seconds.min(MAX_RETRY_SECONDS));
394 }
395
396 // Try parsing as HTTP date (RFC 7231)
397 // httpdate::parse_http_date returns a SystemTime
398 if let Ok(http_time) = httpdate::parse_http_date(retry_after_str)
399 && let Ok(duration) = http_time.duration_since(std::time::SystemTime::now())
400 {
401 // Cap at maximum wait time
402 return duration.min(Duration::from_secs(MAX_RETRY_SECONDS));
403 }
404 }
405
406 // Default to 60 seconds if no header or parsing fails
407 Duration::from_secs(MAX_RETRY_SECONDS)
408}
409
410/// Simple token bucket rate limiter
411/// Prevents burst requests by starting with 0 permits and refilling at steady rate
/// Simple token bucket rate limiter
/// Prevents burst requests by starting with 0 permits and refilling at steady rate
struct RateLimiter {
    /// Permit pool; a background task adds one permit per refill interval.
    semaphore: std::sync::Arc<tokio::sync::Semaphore>,
    /// Set by `Drop` so the background refill task exits.
    shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
416
417impl RateLimiter {
418 fn new(requests_per_period: usize, period: Duration) -> Self {
419 // Use a proper token bucket rate limiter
420 // Start with 0 permits to prevent initial burst
421 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(0));
422 let sem_clone = semaphore.clone();
423 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
424 let shutdown_clone = shutdown.clone();
425
426 let refill_rate =
427 Duration::from_secs_f64(period.as_secs_f64() / requests_per_period as f64);
428
429 // Spawn background task to refill permits at steady rate
430 // CRITICAL: Add first permit immediately, then refill at steady rate
431 let refill_rate_clone = refill_rate;
432 let capacity = requests_per_period;
433 tokio::spawn(async move {
434 // Add first permit immediately so first request can proceed
435 if sem_clone.available_permits() < capacity {
436 sem_clone.add_permits(1);
437 }
438
439 // Then refill at steady rate
440 loop {
441 if shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) {
442 break;
443 }
444 tokio::time::sleep(refill_rate_clone).await;
445 // Add one permit if under capacity (burst allowed up to capacity)
446 if sem_clone.available_permits() < capacity {
447 sem_clone.add_permits(1);
448 }
449 }
450 });
451
452 Self { semaphore, shutdown }
453 }
454
455 async fn wait(&self) {
456 match self.semaphore.acquire().await {
457 Ok(permit) => permit.forget(),
458 Err(_) => {
459 log::warn!(
460 "[PLCClient] Rate limiter disabled (semaphore closed), proceeding without delay"
461 );
462 }
463 }
464 }
465
466 fn available_permits(&self) -> usize {
467 self.semaphore.available_permits()
468 }
469}
470
impl Drop for RateLimiter {
    /// Signal the background refill task to exit on its next wakeup.
    fn drop(&mut self) {
        self.shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
    }
}
476
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_plc_client_new() {
        // Construction must succeed and retain the directory host in the URL.
        let plc = PLCClient::new("https://plc.directory").unwrap();
        assert!(plc.base_url.contains("plc.directory"));
    }

    #[tokio::test]
    async fn test_plc_client_new_with_trailing_slash() {
        // The URL is stored verbatim; PLCClient performs no normalization.
        let plc = PLCClient::new("https://plc.directory/").unwrap();
        assert!(plc.base_url.contains("plc.directory"));
    }
}