// plcbundle: high-performance implementation of plcbundle written in Rust.
// Snapshot of the `main` branch (originally 494 lines, 18 kB).
1//! Rate-limited HTTP client for PLC directory: fetch operations and DID documents with retries/backoff and token-bucket limiting 2// PLC Client - HTTP client for interacting with PLC directory APIs 3use crate::constants; 4use crate::resolver::DIDDocument; 5use anyhow::{Context, Result}; 6use std::collections::VecDeque; 7use std::sync::Arc; 8use std::time::{Duration, Instant}; 9 10// Import PLCOperation from sync module (needed for fetch_operations) 11use crate::sync::PLCOperation; 12 13/// HTTP client for PLC directory with rate limiting and retry logic 14pub struct PLCClient { 15 client: reqwest::Client, 16 base_url: String, 17 rate_limiter: RateLimiter, 18 last_retry_after: std::sync::Arc<tokio::sync::Mutex<Option<Duration>>>, 19 request_timestamps: Arc<std::sync::Mutex<VecDeque<Instant>>>, 20 rate_limit_period: Duration, 21} 22 23pub struct RawExportResponse { 24 pub status: u16, 25 pub headers: Vec<(String, String)>, 26 pub body: String, 27 pub http_start: String, 28} 29 30impl PLCClient { 31 pub fn new(base_url: impl Into<String>) -> Result<Self> { 32 let period = Duration::from_secs(constants::PLC_RATE_LIMIT_PERIOD); 33 let requests_per_period = (constants::PLC_RATE_LIMIT_REQUEST as f64 34 * constants::PLC_RATE_LIMIT_SAFETY_FACTOR) 35 .floor() as usize; 36 Ok(Self { 37 client: reqwest::Client::builder() 38 .timeout(Duration::from_secs(constants::HTTP_TIMEOUT_SECS)) 39 .build()?, 40 base_url: base_url.into(), 41 rate_limiter: RateLimiter::new(requests_per_period, period), 42 last_retry_after: std::sync::Arc::new(tokio::sync::Mutex::new(None)), 43 request_timestamps: Arc::new(std::sync::Mutex::new(VecDeque::new())), 44 rate_limit_period: period, 45 }) 46 } 47 48 pub fn build_export_url(&self, after: &str, count: usize) -> String { 49 format!("{}/export?after={}&count={}", self.base_url, after, count) 50 } 51 52 /// Record a request timestamp and clean up old entries 53 fn record_request(&self) { 54 let now = Instant::now(); 55 let mut timestamps = 
self.request_timestamps.lock().unwrap(); 56 57 // Remove timestamps older than the rate limit period 58 // If checked_sub fails (shouldn't happen in practice), use now as cutoff (counts all) 59 let cutoff = now.checked_sub(self.rate_limit_period).unwrap_or(now); 60 while let Some(&oldest) = timestamps.front() { 61 if oldest < cutoff { 62 timestamps.pop_front(); 63 } else { 64 break; 65 } 66 } 67 68 timestamps.push_back(now); 69 } 70 71 /// Count requests made in the rate limit period 72 fn count_requests_in_period(&self) -> usize { 73 let now = Instant::now(); 74 let timestamps = self.request_timestamps.lock().unwrap(); 75 76 // If checked_sub fails (shouldn't happen in practice), use now as cutoff (counts all) 77 let cutoff = now.checked_sub(self.rate_limit_period).unwrap_or(now); 78 timestamps.iter().filter(|&&ts| ts >= cutoff).count() 79 } 80 81 /// Fetch operations from PLC directory export endpoint 82 pub async fn fetch_operations( 83 &self, 84 after: &str, 85 count: usize, 86 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 87 capture_raw: bool, 88 ) -> Result<( 89 Vec<PLCOperation>, 90 Duration, 91 Duration, 92 Option<RawExportResponse>, 93 Option<chrono::DateTime<chrono::Utc>>, 94 )> { 95 self.fetch_operations_unified(after, count, shutdown_rx, capture_raw) 96 .await 97 } 98 99 // merged into `fetch_operations` 100 101 // merged into `fetch_operations` 102 103 async fn fetch_operations_unified( 104 &self, 105 after: &str, 106 count: usize, 107 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 108 capture_raw: bool, 109 ) -> Result<( 110 Vec<PLCOperation>, 111 Duration, 112 Duration, 113 Option<RawExportResponse>, 114 Option<chrono::DateTime<chrono::Utc>>, 115 )> { 116 let mut backoff = Duration::from_secs(1); 117 let mut last_err = None; 118 let mut total_wait = Duration::from_secs(0); 119 let mut total_http = Duration::from_secs(0); 120 121 for attempt in 1..=5 { 122 if let Some(ref rx) = shutdown_rx && *rx.borrow() { 123 
anyhow::bail!("Shutdown requested"); 124 } 125 let export_url = format!("{}/export?after={}&count={}", self.base_url, after, count); 126 127 let permits = self.rate_limiter.available_permits(); 128 let requests_in_period = self.count_requests_in_period(); 129 log::debug!( 130 "[PLCClient] Preparing /export request: {} | permits={} | window={:?} | requests_in_window={}", 131 export_url, 132 permits, 133 self.rate_limit_period, 134 requests_in_period 135 ); 136 137 let wait_start = Instant::now(); 138 if let Some(mut rx) = shutdown_rx.clone() { 139 tokio::select! { 140 _ = self.rate_limiter.wait() => {} 141 _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } } 142 } 143 } else { 144 self.rate_limiter.wait().await; 145 } 146 let wait_elapsed = wait_start.elapsed(); 147 if wait_elapsed.as_nanos() > 0 { 148 log::debug!("[PLCClient] Rate limiter wait: {:?}", wait_elapsed); 149 } 150 total_wait += wait_elapsed; 151 152 *self.last_retry_after.lock().await = None; 153 self.record_request(); 154 155 let result = if capture_raw { 156 self.do_fetch_operations(after, count, true).await 157 } else { 158 self.do_fetch_operations(after, count, false).await 159 }; 160 161 match result { 162 Ok((operations, http_duration, capture, server_time)) => { 163 total_http += http_duration; 164 return Ok((operations, total_wait, total_http, capture, server_time)); 165 } 166 Err(e) => { 167 last_err = Some(e); 168 let retry_after = self.last_retry_after.lock().await.take(); 169 if let Some(retry_after) = retry_after { 170 let requests_in_period = self.count_requests_in_period(); 171 let rate_limit = (constants::PLC_RATE_LIMIT_REQUEST as f64 172 * constants::PLC_RATE_LIMIT_SAFETY_FACTOR) 173 .floor() as usize; 174 eprintln!( 175 "[Sync] Rate limited by PLC directory ({} requests in last {:?}, limit: {}), waiting {:?} before retry {}/{}", 176 requests_in_period, 177 self.rate_limit_period, 178 rate_limit, 179 retry_after, 180 attempt, 181 5 182 ); 183 if let Some(mut 
rx) = shutdown_rx.clone() { 184 tokio::select! { 185 _ = tokio::time::sleep(retry_after) => {} 186 _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } } 187 } 188 } else { 189 tokio::time::sleep(retry_after).await; 190 } 191 continue; 192 } 193 194 if attempt < 5 { 195 eprintln!( 196 "[Sync] Request failed (attempt {}/{}): {}, retrying in {:?}", 197 attempt, 198 5, 199 last_err.as_ref().unwrap(), 200 backoff 201 ); 202 if let Some(mut rx) = shutdown_rx.clone() { 203 tokio::select! { 204 _ = tokio::time::sleep(backoff) => {} 205 _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } } 206 } 207 } else { 208 tokio::time::sleep(backoff).await; 209 } 210 backoff *= 2; 211 } 212 } 213 } 214 } 215 216 anyhow::bail!( 217 "Failed after {} attempts: {}", 218 5, 219 last_err.unwrap_or_else(|| anyhow::anyhow!("Unknown error")) 220 ) 221 } 222 223 // Removed legacy duplicate retry path; unified via `fetch_operations_unified` 224 225 async fn do_fetch_operations( 226 &self, 227 after: &str, 228 count: usize, 229 capture_raw: bool, 230 ) -> Result<( 231 Vec<PLCOperation>, 232 Duration, 233 Option<RawExportResponse>, 234 Option<chrono::DateTime<chrono::Utc>>, 235 )> { 236 let url = format!("{}/export", self.base_url); 237 let request_start_wall = chrono::Utc::now(); 238 let request_start = Instant::now(); 239 let response = self 240 .client 241 .get(&url) 242 .query(&[("after", after), ("count", &count.to_string())]) 243 .header("User-Agent", constants::user_agent()) 244 .send() 245 .await?; 246 247 if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS { 248 let retry_after = parse_retry_after(&response); 249 *self.last_retry_after.lock().await = Some(retry_after); 250 anyhow::bail!("Rate limited (429)"); 251 } 252 253 if !response.status().is_success() { 254 anyhow::bail!("PLC request failed: {}", response.status()); 255 } 256 let status = if capture_raw { Some(response.status().as_u16()) } else { None }; 257 let 
headers_vec: Option<Vec<(String, String)>> = if capture_raw { 258 Some( 259 response 260 .headers() 261 .iter() 262 .filter_map(|(k, v)| Some((k.as_str().to_string(), v.to_str().ok()?.to_string()))) 263 .collect(), 264 ) 265 } else { 266 None 267 }; 268 269 // Extract Date header for server time 270 let server_time = response 271 .headers() 272 .get("date") 273 .and_then(|v| v.to_str().ok()) 274 .and_then(|s| httpdate::parse_http_date(s).ok()) 275 .map(|t| chrono::DateTime::<chrono::Utc>::from(t)); 276 277 let body = response.text().await?; 278 let request_duration = request_start.elapsed(); 279 let mut operations = Vec::new(); 280 281 for line in body.lines() { 282 if line.trim().is_empty() { 283 continue; 284 } 285 match sonic_rs::from_str::<PLCOperation>(line) { 286 Ok(mut op) => { 287 // CRITICAL: Store raw JSON to preserve exact byte content 288 // This is required by the V1 specification (docs/specification.md § 4.2) 289 // to ensure content_hash is reproducible across implementations. 290 // Re-serializing would change key order/whitespace and break hash verification. 291 op.raw_json = Some(line.to_string()); 292 operations.push(op); 293 } 294 Err(e) => eprintln!("Warning: failed to parse operation: {}", e), 295 } 296 } 297 298 let capture = if capture_raw { 299 Some(RawExportResponse { 300 status: status.unwrap(), 301 headers: headers_vec.unwrap(), 302 body, 303 http_start: request_start_wall.to_rfc3339(), 304 }) 305 } else { 306 None 307 }; 308 309 Ok((operations, request_duration, capture, server_time)) 310 } 311 312 /// Fetch DID document raw JSON from PLC directory 313 /// 314 /// Fetches the raw JSON string for a DID document from the PLC directory. 315 /// This preserves the exact byte content as received from the source. 316 /// Uses the /{did} endpoint. 
317 pub async fn fetch_did_document_raw(&self, did: &str) -> Result<String> { 318 use std::time::Instant; 319 320 // Construct DID document URL 321 // PLC directory exposes DID documents at /{did} (same as plcbundle instances) 322 let url = format!("{}/{}", self.base_url.trim_end_matches('/'), did); 323 324 log::debug!("Fetching DID document from: {}", url); 325 let request_start = Instant::now(); 326 327 let response = self 328 .client 329 .get(&url) 330 .header("User-Agent", constants::user_agent()) 331 .send() 332 .await 333 .context(format!("Failed to fetch DID document from {}", url))?; 334 335 let request_duration = request_start.elapsed(); 336 log::debug!( 337 "HTTP request completed in {:?}, status: {}", 338 request_duration, 339 response.status() 340 ); 341 342 // Handle rate limiting (429) 343 if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS { 344 let retry_after = parse_retry_after(&response); 345 *self.last_retry_after.lock().await = Some(retry_after); 346 log::warn!("Rate limited (429), retry after: {:?}", retry_after); 347 anyhow::bail!("Rate limited (429)"); 348 } 349 350 if !response.status().is_success() { 351 log::error!( 352 "Unexpected status code: {} for DID document at {}", 353 response.status(), 354 url 355 ); 356 anyhow::bail!( 357 "Unexpected status code: {} for DID document at {}", 358 response.status(), 359 url 360 ); 361 } 362 363 let data = response.text().await?; 364 let data_size = data.len(); 365 log::debug!("Received response body: {} bytes", data_size); 366 367 Ok(data) 368 } 369 370 /// Fetch DID document from PLC directory 371 /// 372 /// Fetches the W3C DID document for a given DID from the PLC directory. 373 /// Uses the /did/{did} endpoint. 
374 pub async fn fetch_did_document(&self, did: &str) -> Result<DIDDocument> { 375 let data = self.fetch_did_document_raw(did).await?; 376 let document: DIDDocument = 377 sonic_rs::from_str(&data).context("Failed to parse DID document JSON")?; 378 Ok(document) 379 } 380} 381 382/// Parse the Retry-After header from a response 383/// Returns the duration to wait before retrying, capped at 60 seconds maximum 384fn parse_retry_after(response: &reqwest::Response) -> Duration { 385 const MAX_RETRY_SECONDS: u64 = 60; 386 387 if let Some(retry_after_header) = response.headers().get("retry-after") 388 && let Ok(retry_after_str) = retry_after_header.to_str() 389 { 390 // Try parsing as seconds (integer) - most common format 391 if let Ok(seconds) = retry_after_str.parse::<u64>() { 392 // Cap at maximum wait time 393 return Duration::from_secs(seconds.min(MAX_RETRY_SECONDS)); 394 } 395 396 // Try parsing as HTTP date (RFC 7231) 397 // httpdate::parse_http_date returns a SystemTime 398 if let Ok(http_time) = httpdate::parse_http_date(retry_after_str) 399 && let Ok(duration) = http_time.duration_since(std::time::SystemTime::now()) 400 { 401 // Cap at maximum wait time 402 return duration.min(Duration::from_secs(MAX_RETRY_SECONDS)); 403 } 404 } 405 406 // Default to 60 seconds if no header or parsing fails 407 Duration::from_secs(MAX_RETRY_SECONDS) 408} 409 410/// Simple token bucket rate limiter 411/// Prevents burst requests by starting with 0 permits and refilling at steady rate 412struct RateLimiter { 413 semaphore: std::sync::Arc<tokio::sync::Semaphore>, 414 shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>, 415} 416 417impl RateLimiter { 418 fn new(requests_per_period: usize, period: Duration) -> Self { 419 // Use a proper token bucket rate limiter 420 // Start with 0 permits to prevent initial burst 421 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(0)); 422 let sem_clone = semaphore.clone(); 423 let shutdown = 
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); 424 let shutdown_clone = shutdown.clone(); 425 426 let refill_rate = 427 Duration::from_secs_f64(period.as_secs_f64() / requests_per_period as f64); 428 429 // Spawn background task to refill permits at steady rate 430 // CRITICAL: Add first permit immediately, then refill at steady rate 431 let refill_rate_clone = refill_rate; 432 let capacity = requests_per_period; 433 tokio::spawn(async move { 434 // Add first permit immediately so first request can proceed 435 if sem_clone.available_permits() < capacity { 436 sem_clone.add_permits(1); 437 } 438 439 // Then refill at steady rate 440 loop { 441 if shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) { 442 break; 443 } 444 tokio::time::sleep(refill_rate_clone).await; 445 // Add one permit if under capacity (burst allowed up to capacity) 446 if sem_clone.available_permits() < capacity { 447 sem_clone.add_permits(1); 448 } 449 } 450 }); 451 452 Self { semaphore, shutdown } 453 } 454 455 async fn wait(&self) { 456 match self.semaphore.acquire().await { 457 Ok(permit) => permit.forget(), 458 Err(_) => { 459 log::warn!( 460 "[PLCClient] Rate limiter disabled (semaphore closed), proceeding without delay" 461 ); 462 } 463 } 464 } 465 466 fn available_permits(&self) -> usize { 467 self.semaphore.available_permits() 468 } 469} 470 471impl Drop for RateLimiter { 472 fn drop(&mut self) { 473 self.shutdown.store(true, std::sync::atomic::Ordering::Relaxed); 474 } 475} 476 477#[cfg(test)] 478mod tests { 479 use super::*; 480 481 #[tokio::test] 482 async fn test_plc_client_new() { 483 let client = PLCClient::new("https://plc.directory").unwrap(); 484 // Verify client was created successfully 485 assert!(client.base_url.contains("plc.directory")); 486 } 487 488 #[tokio::test] 489 async fn test_plc_client_new_with_trailing_slash() { 490 let client = PLCClient::new("https://plc.directory/").unwrap(); 491 // URL should be stored as-is (no normalization in 
PLCClient) 492 assert!(client.base_url.contains("plc.directory")); 493 } 494}