High-performance implementation of plcbundle written in Rust

feat(sync): add fetch timing metrics to sync operations

Track and report time spent waiting for rate limits (fetch_wait_ms) and actual HTTP request duration (fetch_http_ms) during sync operations. This provides better visibility into performance characteristics of the sync process.

+53 -9
+9 -1
src/manager.rs
··· 30 30 did_index_compacted: bool, 31 31 unique_dids: u32, 32 32 size_bytes: u64, 33 + fetch_wait_ms: u64, 34 + fetch_http_ms: u64, 33 35 }, 34 36 /// Caught up to latest PLC data, mempool has partial operations 35 37 CaughtUp { ··· 1838 1840 let fetch_start = Instant::now(); 1839 1841 let mut caught_up = false; 1840 1842 const MAX_ATTEMPTS: usize = 50; 1843 + let mut total_wait = std::time::Duration::from_secs(0); 1844 + let mut total_http = std::time::Duration::from_secs(0); 1841 1845 1842 1846 while fetch_num < MAX_ATTEMPTS { 1843 1847 let stats = self.get_mempool_stats()?; ··· 1874 1878 anyhow::bail!("Shutdown requested"); 1875 1879 } 1876 1880 } 1877 - let plc_ops = if let Some(rx) = shutdown_rx.clone() { 1881 + let (plc_ops, wait_dur, http_dur) = if let Some(rx) = shutdown_rx.clone() { 1878 1882 client 1879 1883 .fetch_operations_cancelable(&after_time, request_count, Some(rx)) 1880 1884 .await? 1881 1885 } else { 1882 1886 client.fetch_operations(&after_time, request_count).await? 1883 1887 }; 1888 + total_wait += wait_dur; 1889 + total_http += http_dur; 1884 1890 1885 1891 let fetched_count = plc_ops.len(); 1886 1892 ··· 2143 2149 did_index_compacted, 2144 2150 unique_dids, 2145 2151 size_bytes, 2152 + fetch_wait_ms: total_wait.as_millis() as u64, 2153 + fetch_http_ms: total_http.as_millis() as u64, 2146 2154 }) 2147 2155 } 2148 2156
+22 -6
src/plc_client.rs
··· 68 68 } 69 69 70 70 /// Fetch operations from PLC directory export endpoint 71 - pub async fn fetch_operations(&self, after: &str, count: usize) -> Result<Vec<PLCOperation>> { 71 + pub async fn fetch_operations( 72 + &self, 73 + after: &str, 74 + count: usize, 75 + ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 72 76 self.fetch_operations_with_retry_cancelable(after, count, 5, None).await 73 77 } 74 78 ··· 77 81 after: &str, 78 82 count: usize, 79 83 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 80 - ) -> Result<Vec<PLCOperation>> { 84 + ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 81 85 self.fetch_operations_with_retry_cancelable(after, count, 5, shutdown_rx).await 82 86 } 83 87 ··· 87 91 count: usize, 88 92 max_retries: usize, 89 93 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 90 - ) -> Result<Vec<PLCOperation>> { 94 + ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 91 95 let mut backoff = Duration::from_secs(1); 92 96 let mut last_err = None; 97 + let mut total_wait = Duration::from_secs(0); 98 + let mut total_http = Duration::from_secs(0); 93 99 94 100 for attempt in 1..=max_retries { 95 101 if let Some(ref rx) = shutdown_rx { ··· 127 133 if wait_elapsed.as_nanos() > 0 { 128 134 log::debug!("[PLCClient] Rate limiter wait: {:?}", wait_elapsed); 129 135 } 136 + total_wait += wait_elapsed; 130 137 131 138 // Clear previous retry_after 132 139 *self.last_retry_after.lock().await = None; ··· 135 142 self.record_request(); 136 143 137 144 match self.do_fetch_operations(after, count).await { 138 - Ok(operations) => return Ok(operations), 145 + Ok((operations, http_duration)) => { 146 + total_http += http_duration; 147 + return Ok((operations, total_wait, total_http)); 148 + } 139 149 Err(e) => { 140 150 last_err = Some(e); 141 151 ··· 200 210 ) 201 211 } 202 212 203 - async fn do_fetch_operations(&self, after: &str, count: usize) -> Result<Vec<PLCOperation>> { 213 + async fn do_fetch_operations( 214 + &self, 215 + after: &str, 216 + count: usize, 217 + ) -> Result<(Vec<PLCOperation>, Duration)> { 204 218 let url = format!("{}/export", self.base_url); 219 + let request_start = Instant::now(); 205 220 let response = self 206 221 .client 207 222 .get(&url) ··· 222 237 } 223 238 224 239 let body = response.text().await?; 240 + let request_duration = request_start.elapsed(); 225 241 let mut operations = Vec::new(); 226 242 227 243 for line in body.lines() { ··· 241 257 } 242 258 } 243 259 244 - Ok(operations) 260 + Ok((operations, request_duration)) 245 261 } 246 262 247 263 /// Fetch DID document raw JSON from PLC directory
+22 -2
src/sync.rs
··· 100 100 101 101 unique_dids: u32, 102 102 size_bytes: u64, 103 + fetch_wait_ms: u64, 104 + fetch_http_ms: u64, 103 105 }, 104 106 CaughtUp { 105 107 next_bundle: u32, ··· 174 176 did_index_compacted: bool, 175 177 unique_dids: u32, 176 178 size_bytes: u64, 179 + fetch_wait_ms: u64, 180 + fetch_http_ms: u64, 177 181 ); 178 182 179 183 // Allow the sync logger to accept multiple arguments for detailed bundle info ··· 274 278 _did_index_compacted: bool, 275 279 unique_dids: u32, 276 280 size_bytes: u64, 281 + fetch_wait_ms: u64, 282 + fetch_http_ms: u64, 277 283 ) { 278 - let fetch_secs = fetch_duration_ms as f64 / 1000.0; 284 + let fetch_secs = fetch_http_ms as f64 / 1000.0; 285 + let wait_secs = fetch_wait_ms as f64 / 1000.0; 279 286 let size_kb = size_bytes as f64 / 1024.0; 280 287 let size_str = if size_kb >= 1024.0 { 281 288 format!("{:.1}MB", size_kb / 1024.0) ··· 284 291 }; 285 292 286 293 eprintln!( 287 - "[INFO] → Bundle {:06} | {} | {} dids | {} | fetch: {:.2}s ({} reqs) | save: {}ms | index: {}ms | {}", 294 + "[INFO] → Bundle {:06} | {} | {} dids | {} | fetch: {:.2}s ({} reqs, {:.1}s wait) | save: {}ms | index: {}ms | {}", 288 295 bundle_num, 289 296 hash, 290 297 unique_dids, 291 298 size_str, 292 299 fetch_secs, 293 300 fetch_requests, 301 + wait_secs, 294 302 bundle_save_ms, 295 303 index_ms, 296 304 age ··· 411 419 did_index_compacted, 412 420 unique_dids, 413 421 size_bytes, 422 + fetch_wait_ms, 423 + fetch_http_ms, 414 424 } => { 415 425 logger.on_bundle_created( 416 426 *bundle_num, ··· 424 434 *did_index_compacted, 425 435 *unique_dids, 426 436 *size_bytes, 437 + *fetch_wait_ms, 438 + *fetch_http_ms, 427 439 ); 428 440 } 429 441 SyncEvent::CaughtUp { ··· 508 520 did_index_compacted, 509 521 unique_dids, 510 522 size_bytes, 523 + fetch_wait_ms, 524 + fetch_http_ms, 511 525 }) => { 512 526 synced += 1; 513 527 ··· 523 537 did_index_compacted, 524 538 unique_dids, 525 539 size_bytes, 540 + fetch_wait_ms, 541 + fetch_http_ms, 526 542 }); 527 543 528 544 // Show compaction message if index was compacted ··· 636 652 did_index_compacted, 637 653 unique_dids, 638 654 size_bytes, 655 + fetch_wait_ms, 656 + fetch_http_ms, 639 657 }) => { 640 658 total_synced += 1; 641 659 ··· 656 674 did_index_compacted, 657 675 unique_dids, 658 676 size_bytes, 677 + fetch_wait_ms, 678 + fetch_http_ms, 659 679 }); 660 680 661 681 // Show compaction message if index was compacted