High-performance implementation of plcbundle written in Rust

refactor: update rate limiting constants and query mode

+39 -89
-62
src/bundle_format.rs
··· 451 451 } 452 452 453 453 #[test] 454 - fn test_parallel_compression_matches_sequential() { 455 - use crate::operations::Operation; 456 - use sonic_rs::{Value, from_str}; 457 - 458 - // Create test operations 459 - let mut operations = Vec::new(); 460 - for i in 0..250 { 461 - // Multiple frames (250 ops = 3 frames with FRAME_SIZE=100) 462 - let operation_json = format!(r#"{{"type":"create","data":"test data {}"}}"#, i); 463 - let operation_value: Value = from_str(&operation_json).unwrap(); 464 - let extra_value: Value = from_str("{}").unwrap_or_else(|_| Value::new()); 465 - operations.push(Operation { 466 - did: format!("did:plc:test{}", i), 467 - operation: operation_value, 468 - cid: Some(format!("cid{}", i)), 469 - created_at: "2024-01-01T00:00:00Z".to_string(), 470 - nullified: false, 471 - extra: extra_value, 472 - raw_json: None, 473 - }); 474 - } 475 - 476 - // Compress using both methods 477 - let result_sequential = compress_operations_to_frames(&operations).unwrap(); 478 - let result_parallel = compress_operations_to_frames_parallel(&operations).unwrap(); 479 - 480 - // Verify results match 481 - assert_eq!( 482 - result_sequential.compressed_frames.len(), 483 - result_parallel.compressed_frames.len() 484 - ); 485 - assert_eq!( 486 - result_sequential.frame_offsets, 487 - result_parallel.frame_offsets 488 - ); 489 - assert_eq!( 490 - result_sequential.uncompressed_size, 491 - result_parallel.uncompressed_size 492 - ); 493 - assert_eq!( 494 - result_sequential.compressed_size, 495 - result_parallel.compressed_size 496 - ); 497 - 498 - // Verify compressed frames are identical 499 - for (seq_frame, par_frame) in result_sequential 500 - .compressed_frames 501 - .iter() 502 - .zip(result_parallel.compressed_frames.iter()) 503 - { 504 - assert_eq!(seq_frame, par_frame, "Compressed frames must be identical"); 505 - } 506 - 507 - println!("✓ Parallel compression produces identical output to sequential"); 508 - println!(" Frames: {}", result_parallel.compressed_frames.len()); 509 - println!( 510 - " Compressed size: {} bytes", 511 - result_parallel.compressed_size 512 - ); 513 - } 514 - 515 - #[test] 516 454 fn test_skippable_frame_roundtrip() { 517 455 let data = b"test data"; 518 456 let mut buffer = Vec::new();
+11 -4
src/constants.rs
··· 42 42 // Rate Limiting Constants 43 43 // ============================================================================ 44 44 45 - /// Default rate limit for PLC API requests (requests per minute) 46 - /// Set to 80% of quota (72 req/min) to provide safety margin and prevent rate limiting 47 - pub const DEFAULT_RATE_LIMIT: usize = 72; 45 + /// PLC directory quota: maximum requests allowed per period 46 + pub const PLC_RATE_LIMIT_REQUEST: usize = 500; 47 + 48 + /// Rate limiting window length in seconds (measurement and enforcement are equal) 49 + pub const PLC_RATE_LIMIT_PERIOD: u64 = 300; 50 + 51 + /// Safety margin applied to quota to avoid hitting limits (e.g., 0.8 = 80%) 52 + pub const PLC_RATE_LIMIT_SAFETY_FACTOR: f64 = 0.8; 48 53 49 54 // ============================================================================ 50 55 // Timeout Constants (in seconds) ··· 301 306 fn test_constants_values() { 302 307 assert_eq!(BUNDLE_SIZE, 10_000); 303 308 assert_eq!(FRAME_SIZE, 100); 304 - assert_eq!(DEFAULT_RATE_LIMIT, 72); 309 + assert_eq!(PLC_RATE_LIMIT_REQUEST, 500); 310 + assert_eq!(PLC_RATE_LIMIT_PERIOD, 300); 311 + assert!((PLC_RATE_LIMIT_SAFETY_FACTOR - 0.8).abs() < f64::EPSILON); 305 312 assert_eq!(HTTP_TIMEOUT_SECS, 60); 306 313 assert_eq!(HTTP_INDEX_TIMEOUT_SECS, 30); 307 314 assert_eq!(HTTP_BUNDLE_TIMEOUT_SECS, 60);
+1 -1
src/lib.rs
··· 19 19 //! assert!(resolved.locations_found >= 0); 20 20 //! 21 21 //! // Query a bundle range 22 - //! let spec = QuerySpec { bundles: BundleRange::Range(1, 10), filter: None, query: String::new(), mode: QueryMode::All }; 22 + //! let spec = QuerySpec { bundles: BundleRange::Range(1, 10), filter: None, query: String::new(), mode: QueryMode::Simple }; 23 23 //! for item in mgr.query(spec) { let _ = item?; } 24 24 //! # Ok::<(), anyhow::Error>(()) 25 25 //! ```
+1 -1
src/manager.rs
··· 74 74 /// println!("Resolved to shard {}", resolved.shard_num); 75 75 /// 76 76 /// // Query a range and export 77 - /// let spec = QuerySpec { bundles: BundleRange::Range(40, 45), filter: None, query: String::new(), mode: QueryMode::All }; 77 + /// let spec = QuerySpec { bundles: BundleRange::Range(40, 45), filter: None, query: String::new(), mode: QueryMode::Simple }; 78 78 /// let mut count = 0u64; 79 79 /// for item in mgr.query(spec) { count += 1; } 80 80 /// assert!(count > 0);
+9 -4
src/plc_client.rs
··· 22 22 23 23 impl PLCClient { 24 24 pub fn new(base_url: impl Into<String>) -> Result<Self> { 25 - let rate_limit_period = Duration::from_secs(constants::HTTP_TIMEOUT_SECS); 25 + let period = Duration::from_secs(constants::PLC_RATE_LIMIT_PERIOD); 26 + let requests_per_period = (constants::PLC_RATE_LIMIT_REQUEST as f64 27 + * constants::PLC_RATE_LIMIT_SAFETY_FACTOR) 28 + .floor() as usize; 26 29 Ok(Self { 27 30 client: reqwest::Client::builder() 28 31 .timeout(Duration::from_secs(constants::HTTP_TIMEOUT_SECS)) 29 32 .build()?, 30 33 base_url: base_url.into(), 31 - rate_limiter: RateLimiter::new(constants::DEFAULT_RATE_LIMIT, rate_limit_period), 34 + rate_limiter: RateLimiter::new(requests_per_period, period), 32 35 last_retry_after: std::sync::Arc::new(tokio::sync::Mutex::new(None)), 33 36 request_timestamps: Arc::new(std::sync::Mutex::new(VecDeque::new())), 34 - rate_limit_period, 37 + rate_limit_period: period, 35 38 }) 36 39 } 37 40 ··· 97 100 let retry_after = self.last_retry_after.lock().await.take(); 98 101 if let Some(retry_after) = retry_after { 99 102 let requests_in_period = self.count_requests_in_period(); 100 - let rate_limit = constants::DEFAULT_RATE_LIMIT; 103 + let rate_limit = (constants::PLC_RATE_LIMIT_REQUEST as f64 104 + * constants::PLC_RATE_LIMIT_SAFETY_FACTOR) 105 + .floor() as usize; 101 106 eprintln!( 102 107 "[Sync] Rate limited by PLC directory ({} requests in last {:?}, limit: {}), waiting {:?} before retry {}/{}", 103 108 requests_in_period,
+15 -14
src/server/handle_random.rs
··· 24 24 if count > 1000 { 25 25 return bad_request("count must be <= 1000").into_response(); 26 26 } 27 - let effective_seed = params 28 - .seed 29 - .unwrap_or_else(|| { 30 - SystemTime::now() 31 - .duration_since(UNIX_EPOCH) 32 - .unwrap() 33 - .as_nanos() as u64 34 - }); 27 + let effective_seed = params.seed.unwrap_or_else(|| { 28 + SystemTime::now() 29 + .duration_since(UNIX_EPOCH) 30 + .unwrap() 31 + .as_nanos() as u64 32 + }); 35 33 if count == 0 { 36 34 return ( 37 35 StatusCode::OK, ··· 51 49 Err(e) => return task_join_error(e).into_response(), 52 50 }; 53 51 54 - (StatusCode::OK, axum::Json(json!({ 55 - "dids": dids, 56 - "count": count, 57 - "seed": effective_seed 58 - }))) 52 + ( 53 + StatusCode::OK, 54 + axum::Json(json!({ 55 + "dids": dids, 56 + "count": count, 57 + "seed": effective_seed 58 + })), 59 + ) 59 60 .into_response() 60 - } 61 + }
+2 -2
tests/common/mod.rs
··· 5 5 use tempfile::TempDir; 6 6 7 7 use plcbundle::BundleManager; 8 + use plcbundle::Operation; 8 9 use plcbundle::index::{BundleMetadata, Index}; 9 - use plcbundle::operations::Operation; 10 10 11 11 pub fn setup_manager(dir: &PathBuf) -> Result<BundleManager> { 12 - plcbundle::Index::init(dir, "http://localhost:1234".to_string(), true)?; 12 + Index::init(dir, "http://localhost:1234".to_string(), true)?; 13 13 let manager = BundleManager::new(dir.clone(), ())?; 14 14 Ok(manager) 15 15 }
-1
tests/manager.rs
··· 91 91 bundles: plcbundle::BundleRange::Single(1), 92 92 format: plcbundle::ExportFormat::JsonLines, 93 93 filter: None, 94 - compression: None, 95 94 count: Some(5), 96 95 after_timestamp: None, 97 96 };