High-performance implementation of plcbundle written in Rust (src/index.rs, main branch, ~982 lines / 39 kB).
1//! Bundle repository index: load/save, sequence validation, and rebuild from bundle metadata 2// Replace your current src/index.rs with this: 3 4use anyhow::Result; 5use serde::{Deserialize, Serialize}; // Add Serialize here 6use std::fs::File; 7use std::path::Path; 8 9#[derive(Debug, Deserialize, Serialize, Clone)] // Add Clone here 10pub struct Index { 11 pub version: String, 12 pub origin: String, 13 pub last_bundle: u32, 14 pub updated_at: String, 15 pub total_size_bytes: u64, 16 pub total_uncompressed_size_bytes: u64, 17 pub bundles: Vec<BundleMetadata>, 18} 19 20#[derive(Debug, Deserialize, Serialize, Clone)] // Add Serialize here 21pub struct BundleMetadata { 22 pub bundle_number: u32, 23 pub start_time: String, 24 pub end_time: String, 25 pub operation_count: u32, 26 pub did_count: u32, 27 pub hash: String, 28 pub content_hash: String, 29 #[serde(default)] 30 pub parent: String, // Empty string for first bundle 31 pub compressed_hash: String, 32 pub compressed_size: u64, 33 pub uncompressed_size: u64, 34 #[serde(default)] 35 pub cursor: String, // Empty string for first bundle 36 pub created_at: String, 37} 38 39impl Index { 40 pub fn load<P: AsRef<Path>>(directory: P) -> Result<Self> { 41 let index_path = directory.as_ref().join("plc_bundles.json"); 42 let display_path = index_path 43 .canonicalize() 44 .unwrap_or_else(|_| index_path.clone()); 45 log::debug!( 46 "[BundleManager] Loading index from: {}", 47 display_path.display() 48 ); 49 let start = std::time::Instant::now(); 50 let file = File::open(&index_path)?; 51 let index: Index = sonic_rs::from_reader(file)?; 52 let elapsed = start.elapsed(); 53 let elapsed_ms = elapsed.as_secs_f64() * 1000.0; 54 log::debug!( 55 "[BundleManager] Index loaded: v{} ({}), {} bundles, last bundle: {} ({:.3}ms)", 56 index.version, 57 index.origin, 58 index.bundles.len(), 59 index.last_bundle, 60 elapsed_ms 61 ); 62 63 // Validate bundle sequence integrity 64 index.validate_bundle_sequence()?; 65 66 Ok(index) 67 } 68 69 
/// Validate that bundles form a consecutive sequence starting from 1 70 /// This ensures no gaps in the bundle chain and that the first bundle is always 1 71 fn validate_bundle_sequence(&self) -> Result<()> { 72 if self.bundles.is_empty() { 73 return Ok(()); // Empty index is valid 74 } 75 76 // First bundle must be 1 77 let first_bundle = self.bundles.first().unwrap().bundle_number; 78 if first_bundle != 1 { 79 anyhow::bail!( 80 "Invalid bundle sequence: first bundle is {} but must be 1", 81 first_bundle 82 ); 83 } 84 85 // Check for gaps in sequence 86 for i in 0..self.bundles.len() { 87 let expected = (i + 1) as u32; 88 let actual = self.bundles[i].bundle_number; 89 if actual != expected { 90 anyhow::bail!( 91 "Gap detected in bundle sequence: expected bundle {}, found bundle {}", 92 expected, 93 actual 94 ); 95 } 96 } 97 98 // Verify last_bundle matches the last bundle in the array 99 let last_in_array = self.bundles.last().unwrap().bundle_number; 100 if self.last_bundle != last_in_array { 101 anyhow::bail!( 102 "Inconsistent last_bundle: index says {}, but last bundle in array is {}", 103 self.last_bundle, 104 last_in_array 105 ); 106 } 107 108 Ok(()) 109 } 110 111 /// Save index to disk atomically 112 pub fn save<P: AsRef<Path>>(&self, directory: P) -> Result<()> { 113 use anyhow::Context; 114 115 // Validate bundle sequence before saving 116 self.validate_bundle_sequence() 117 .context("Cannot save invalid index")?; 118 119 let index_path = directory.as_ref().join("plc_bundles.json"); 120 let temp_path = index_path.with_extension("json.tmp"); 121 122 let json = sonic_rs::to_string_pretty(self).context("Failed to serialize index")?; 123 124 std::fs::write(&temp_path, json) 125 .with_context(|| format!("Failed to write temp index: {}", temp_path.display()))?; 126 127 std::fs::rename(&temp_path, &index_path) 128 .with_context(|| format!("Failed to rename index: {}", index_path.display()))?; 129 130 Ok(()) 131 } 132 133 /// Initialize a new repository with an 
empty index 134 /// 135 /// Creates all necessary directories and an empty index file. 136 /// This is idempotent - if the repository already exists, it will return an error 137 /// unless `force` is true, in which case it will reinitialize. 138 /// 139 /// # Arguments 140 /// * `directory` - Directory to initialize 141 /// * `origin` - PLC directory URL or origin identifier 142 /// * `force` - Whether to reinitialize if already exists 143 /// 144 /// # Returns 145 /// True if initialized (created new), False if already existed and force=false 146 pub fn init<P: AsRef<Path>>(directory: P, origin: String, force: bool) -> Result<bool> { 147 use anyhow::Context; 148 149 let dir = directory.as_ref(); 150 let index_path = dir.join("plc_bundles.json"); 151 152 // Check if already initialized 153 if index_path.exists() && !force { 154 return Ok(false); // Already initialized 155 } 156 157 // Create directory if it doesn't exist 158 if !dir.exists() { 159 std::fs::create_dir_all(dir) 160 .with_context(|| format!("Failed to create directory: {}", dir.display()))?; 161 } 162 163 // Create .plcbundle directory for DID index 164 let plcbundle_dir = dir.join(crate::constants::DID_INDEX_DIR); 165 if !plcbundle_dir.exists() { 166 std::fs::create_dir_all(&plcbundle_dir).with_context(|| { 167 format!( 168 "Failed to create DID index directory: {}", 169 plcbundle_dir.display() 170 ) 171 })?; 172 } 173 174 // Create and save empty index 175 let index = Index { 176 version: "1.0".to_string(), 177 origin, 178 last_bundle: 0, 179 updated_at: chrono::Utc::now().to_rfc3339(), 180 total_size_bytes: 0, 181 total_uncompressed_size_bytes: 0, 182 bundles: Vec::new(), 183 }; 184 185 index.save(dir)?; 186 187 Ok(true) // Successfully initialized 188 } 189 190 /// Rebuild index from existing bundle files by scanning their metadata 191 /// 192 /// This scans all .jsonl.zst files in the directory and reconstructs the index 193 /// by extracting embedded metadata from each bundle's skippable frame. 
194 /// 195 /// # Arguments 196 /// * `directory` - Directory containing bundle files 197 /// * `origin` - Optional origin URL (auto-detected from first bundle if None) 198 /// * `progress_cb` - Optional progress callback (current, total) 199 /// 200 /// # Returns 201 /// Reconstructed Index ready to be saved 202 pub fn rebuild_from_bundles<P: AsRef<Path>, F>( 203 directory: P, 204 origin: Option<String>, 205 progress_cb: Option<F>, 206 ) -> Result<Self> 207 where 208 F: Fn(usize, usize, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes) 209 { 210 use anyhow::Context; 211 212 let dir = directory.as_ref(); 213 214 // Find all bundle files 215 let mut bundle_files: Vec<(u32, std::path::PathBuf)> = Vec::new(); 216 217 for entry in std::fs::read_dir(dir) 218 .with_context(|| format!("Failed to read directory: {}", dir.display()))? 219 { 220 let entry = entry?; 221 let path = entry.path(); 222 223 if !path.is_file() { 224 continue; 225 } 226 227 let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); 228 229 // Match pattern: NNNNNN.jsonl.zst (16 chars: 6 digits + 10 chars for .jsonl.zst) 230 if filename.ends_with(".jsonl.zst") 231 && filename.len() == 16 232 && let Ok(bundle_num) = filename[0..6].parse::<u32>() 233 { 234 bundle_files.push((bundle_num, path)); 235 } 236 } 237 238 if bundle_files.is_empty() { 239 anyhow::bail!("No bundle files found in directory"); 240 } 241 242 // Sort by bundle number 243 bundle_files.sort_by_key(|(num, _)| *num); 244 245 // Validate that first bundle is 1 246 let first_bundle_num = bundle_files[0].0; 247 if first_bundle_num != 1 { 248 anyhow::bail!( 249 "Invalid bundle sequence: first bundle file is {:06}.jsonl.zst but must be 000001.jsonl.zst", 250 first_bundle_num 251 ); 252 } 253 254 // Validate no gaps in bundle sequence 255 for (i, (actual, _)) in bundle_files.iter().enumerate() { 256 let expected = (i + 1) as u32; 257 if *actual != expected { 258 anyhow::bail!( 259 "Gap detected in bundle 
files: expected {:06}.jsonl.zst, found {:06}.jsonl.zst", 260 expected, 261 *actual 262 ); 263 } 264 } 265 266 // Pre-calculate total bytes for progress tracking (parallel) 267 use rayon::prelude::*; 268 let total_bytes: u64 = bundle_files 269 .par_iter() 270 .filter_map(|(_, bundle_path)| std::fs::metadata(bundle_path).ok()) 271 .map(|metadata| metadata.len()) 272 .sum(); 273 274 // Extract metadata from each bundle in parallel 275 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; 276 use std::sync::{Arc, Mutex}; 277 278 let detected_origin: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(origin)); 279 let progress_cb_arc: Arc<Mutex<Option<F>>> = Arc::new(Mutex::new(progress_cb)); 280 let count_atomic = Arc::new(AtomicUsize::new(0)); 281 let bytes_atomic = Arc::new(AtomicU64::new(0)); 282 283 // Update progress bar less frequently to reduce contention 284 let update_interval = (rayon::current_num_threads().max(1) * 4).max(10); 285 286 // Process bundles in parallel 287 let bundle_count = bundle_files.len(); 288 let mut bundles_metadata: Vec<BundleMetadata> = bundle_files 289 .par_iter() 290 .map(|(bundle_num, bundle_path)| -> Result<BundleMetadata> { 291 // Get file size 292 let metadata = std::fs::metadata(bundle_path)?; 293 let compressed_size = metadata.len(); 294 let bytes_processed = 295 bytes_atomic.fetch_add(compressed_size, Ordering::Relaxed) + compressed_size; 296 let current_count = count_atomic.fetch_add(1, Ordering::Relaxed) + 1; 297 298 // Update progress periodically 299 if (current_count.is_multiple_of(update_interval) 300 || current_count == 1 301 || current_count == bundle_count) 302 && let Ok(cb_guard) = progress_cb_arc.lock() 303 && let Some(ref cb) = *cb_guard 304 { 305 cb(current_count, bundle_count, bytes_processed, total_bytes); 306 } 307 308 // Extract embedded metadata from bundle file 309 let embedded = crate::bundle_format::extract_metadata_from_file(bundle_path) 310 .with_context(|| { 311 format!("Failed to extract metadata 
from bundle {}", bundle_num) 312 })?; 313 314 // Auto-detect origin from first bundle if not provided 315 { 316 let mut origin_guard = detected_origin.lock().unwrap(); 317 if origin_guard.is_none() { 318 *origin_guard = Some(embedded.origin.clone()); 319 } 320 } 321 322 // Verify origin matches 323 { 324 let origin_guard = detected_origin.lock().unwrap(); 325 if let Some(ref expected_origin) = *origin_guard 326 && embedded.origin != *expected_origin 327 { 328 anyhow::bail!( 329 "Bundle {:06}: origin mismatch (expected '{}', got '{}')", 330 bundle_num, 331 expected_origin, 332 embedded.origin 333 ); 334 } 335 } 336 337 // Calculate compressed hash 338 let compressed_data = std::fs::read(bundle_path)?; 339 let compressed_hash = { 340 use sha2::{Digest, Sha256}; 341 let mut hasher = Sha256::new(); 342 hasher.update(&compressed_data); 343 format!("{:x}", hasher.finalize()) 344 }; 345 346 // Get uncompressed_size from embedded metadata if available 347 // Fall back to 0 for legacy bundles without this field 348 let uncompressed_size = embedded.uncompressed_size.unwrap_or(0); 349 350 // Build bundle metadata for index 351 Ok(BundleMetadata { 352 bundle_number: *bundle_num, 353 start_time: embedded.start_time.clone(), 354 end_time: embedded.end_time.clone(), 355 operation_count: embedded.operation_count as u32, 356 did_count: embedded.did_count as u32, 357 hash: String::new(), // Will be calculated after collecting all 358 content_hash: embedded.content_hash.clone(), 359 parent: String::new(), // Will be set during chain hash calculation 360 compressed_hash, 361 compressed_size, 362 uncompressed_size, 363 cursor: String::new(), // Will be set from previous bundle's end_time 364 created_at: embedded.created_at.clone(), 365 }) 366 }) 367 .collect::<Result<Vec<_>>>()?; 368 369 // Sort by bundle number to maintain order (parallel processing may reorder) 370 bundles_metadata.sort_by_key(|b| b.bundle_number); 371 372 let total_compressed_size: u64 = 
bundles_metadata.iter().map(|b| b.compressed_size).sum(); 373 let detected_origin = detected_origin.lock().unwrap().clone(); 374 375 // Calculate total uncompressed size 376 // Note: For legacy bundles without uncompressed_size in metadata, it will be 0 377 let total_uncompressed_size: u64 = 378 bundles_metadata.iter().map(|b| b.uncompressed_size).sum(); 379 380 // Calculate chain hashes sequentially (depends on previous bundles) 381 for i in 0..bundles_metadata.len() { 382 if i == 0 { 383 // Genesis bundle 384 use sha2::{Digest, Sha256}; 385 bundles_metadata[i].parent = String::new(); 386 bundles_metadata[i].cursor = String::new(); 387 let content_hash = bundles_metadata[i].content_hash.clone(); 388 let chain_input = format!("plcbundle:genesis:{}", content_hash); 389 let mut hasher = Sha256::new(); 390 hasher.update(chain_input.as_bytes()); 391 bundles_metadata[i].hash = format!("{:x}", hasher.finalize()); 392 } else { 393 use sha2::{Digest, Sha256}; 394 let prev_hash = bundles_metadata[i - 1].hash.clone(); 395 let prev_end_time = bundles_metadata[i - 1].end_time.clone(); 396 let content_hash = bundles_metadata[i].content_hash.clone(); 397 398 bundles_metadata[i].parent = prev_hash.clone(); 399 bundles_metadata[i].cursor = prev_end_time; 400 401 let chain_input = format!("{}:{}", prev_hash, content_hash); 402 let mut hasher = Sha256::new(); 403 hasher.update(chain_input.as_bytes()); 404 bundles_metadata[i].hash = format!("{:x}", hasher.finalize()); 405 } 406 } 407 408 let last_bundle = bundles_metadata.last().unwrap().bundle_number; 409 let origin_str = 410 detected_origin.unwrap_or_else(|| crate::constants::DEFAULT_ORIGIN.to_string()); 411 412 Ok(Index { 413 version: "1.0".to_string(), 414 origin: origin_str, 415 last_bundle, 416 updated_at: chrono::Utc::now().to_rfc3339(), 417 total_size_bytes: total_compressed_size, 418 total_uncompressed_size_bytes: total_uncompressed_size, 419 bundles: bundles_metadata, 420 }) 421 } 422 423 pub fn get_bundle(&self, 
bundle_number: u32) -> Option<&BundleMetadata> { 424 self.bundles 425 .iter() 426 .find(|b| b.bundle_number == bundle_number) 427 } 428 429 /// Calculate total uncompressed size for a set of bundle numbers. 430 /// Optimizes by using the pre-calculated total when all bundles are selected. 431 /// 432 /// # Arguments 433 /// * `bundle_numbers` - Vector of bundle numbers to calculate size for 434 /// 435 /// # Returns 436 /// Total uncompressed size in bytes 437 pub fn total_uncompressed_size_for_bundles(&self, bundle_numbers: &[u32]) -> u64 { 438 // Check if we're querying all bundles (1 to last_bundle) 439 let is_all_bundles = !bundle_numbers.is_empty() 440 && bundle_numbers.len() == self.last_bundle as usize 441 && bundle_numbers.first() == Some(&1) 442 && bundle_numbers.last() == Some(&self.last_bundle); 443 444 if is_all_bundles { 445 // Use pre-calculated total from index 446 self.total_uncompressed_size_bytes 447 } else { 448 // Sum only the selected bundles 449 bundle_numbers 450 .iter() 451 .filter_map(|bundle_num| { 452 self.get_bundle(*bundle_num) 453 .map(|meta| meta.uncompressed_size) 454 }) 455 .sum() 456 } 457 } 458} 459 460#[cfg(test)] 461mod tests { 462 use super::*; 463 464 #[test] 465 fn test_validate_empty_index() { 466 let index = Index { 467 version: "1.0".to_string(), 468 origin: "test".to_string(), 469 last_bundle: 0, 470 updated_at: "2024-01-01T00:00:00Z".to_string(), 471 total_size_bytes: 0, 472 total_uncompressed_size_bytes: 0, 473 bundles: Vec::new(), 474 }; 475 476 assert!(index.validate_bundle_sequence().is_ok()); 477 } 478 479 #[test] 480 fn test_validate_single_bundle_correct() { 481 let index = Index { 482 version: "1.0".to_string(), 483 origin: "test".to_string(), 484 last_bundle: 1, 485 updated_at: "2024-01-01T00:00:00Z".to_string(), 486 total_size_bytes: 100, 487 total_uncompressed_size_bytes: 200, 488 bundles: vec![BundleMetadata { 489 bundle_number: 1, 490 start_time: "2024-01-01T00:00:00Z".to_string(), 491 end_time: 
"2024-01-01T01:00:00Z".to_string(), 492 operation_count: 10, 493 did_count: 5, 494 hash: "hash1".to_string(), 495 content_hash: "content1".to_string(), 496 parent: String::new(), 497 compressed_hash: "comp1".to_string(), 498 compressed_size: 100, 499 uncompressed_size: 200, 500 cursor: String::new(), 501 created_at: "2024-01-01T00:00:00Z".to_string(), 502 }], 503 }; 504 505 assert!(index.validate_bundle_sequence().is_ok()); 506 } 507 508 #[test] 509 fn test_validate_first_bundle_not_one() { 510 let index = Index { 511 version: "1.0".to_string(), 512 origin: "test".to_string(), 513 last_bundle: 2, 514 updated_at: "2024-01-01T00:00:00Z".to_string(), 515 total_size_bytes: 100, 516 total_uncompressed_size_bytes: 200, 517 bundles: vec![BundleMetadata { 518 bundle_number: 2, 519 start_time: "2024-01-01T00:00:00Z".to_string(), 520 end_time: "2024-01-01T01:00:00Z".to_string(), 521 operation_count: 10, 522 did_count: 5, 523 hash: "hash1".to_string(), 524 content_hash: "content1".to_string(), 525 parent: String::new(), 526 compressed_hash: "comp1".to_string(), 527 compressed_size: 100, 528 uncompressed_size: 200, 529 cursor: String::new(), 530 created_at: "2024-01-01T00:00:00Z".to_string(), 531 }], 532 }; 533 534 let result = index.validate_bundle_sequence(); 535 assert!(result.is_err()); 536 assert!( 537 result 538 .unwrap_err() 539 .to_string() 540 .contains("first bundle is 2 but must be 1") 541 ); 542 } 543 544 #[test] 545 fn test_validate_gap_in_sequence() { 546 let index = Index { 547 version: "1.0".to_string(), 548 origin: "test".to_string(), 549 last_bundle: 3, 550 updated_at: "2024-01-01T00:00:00Z".to_string(), 551 total_size_bytes: 200, 552 total_uncompressed_size_bytes: 400, 553 bundles: vec![ 554 BundleMetadata { 555 bundle_number: 1, 556 start_time: "2024-01-01T00:00:00Z".to_string(), 557 end_time: "2024-01-01T01:00:00Z".to_string(), 558 operation_count: 10, 559 did_count: 5, 560 hash: "hash1".to_string(), 561 content_hash: "content1".to_string(), 562 parent: 
String::new(), 563 compressed_hash: "comp1".to_string(), 564 compressed_size: 100, 565 uncompressed_size: 200, 566 cursor: String::new(), 567 created_at: "2024-01-01T00:00:00Z".to_string(), 568 }, 569 BundleMetadata { 570 bundle_number: 3, // Gap: missing bundle 2 571 start_time: "2024-01-01T01:00:00Z".to_string(), 572 end_time: "2024-01-01T02:00:00Z".to_string(), 573 operation_count: 10, 574 did_count: 5, 575 hash: "hash3".to_string(), 576 content_hash: "content3".to_string(), 577 parent: "hash1".to_string(), 578 compressed_hash: "comp3".to_string(), 579 compressed_size: 100, 580 uncompressed_size: 200, 581 cursor: "2024-01-01T01:00:00Z".to_string(), 582 created_at: "2024-01-01T01:00:00Z".to_string(), 583 }, 584 ], 585 }; 586 587 let result = index.validate_bundle_sequence(); 588 assert!(result.is_err()); 589 assert!( 590 result 591 .unwrap_err() 592 .to_string() 593 .contains("expected bundle 2, found bundle 3") 594 ); 595 } 596 597 #[test] 598 fn test_validate_consecutive_sequence() { 599 let index = Index { 600 version: "1.0".to_string(), 601 origin: "test".to_string(), 602 last_bundle: 3, 603 updated_at: "2024-01-01T00:00:00Z".to_string(), 604 total_size_bytes: 300, 605 total_uncompressed_size_bytes: 600, 606 bundles: vec![ 607 BundleMetadata { 608 bundle_number: 1, 609 start_time: "2024-01-01T00:00:00Z".to_string(), 610 end_time: "2024-01-01T01:00:00Z".to_string(), 611 operation_count: 10, 612 did_count: 5, 613 hash: "hash1".to_string(), 614 content_hash: "content1".to_string(), 615 parent: String::new(), 616 compressed_hash: "comp1".to_string(), 617 compressed_size: 100, 618 uncompressed_size: 200, 619 cursor: String::new(), 620 created_at: "2024-01-01T00:00:00Z".to_string(), 621 }, 622 BundleMetadata { 623 bundle_number: 2, 624 start_time: "2024-01-01T01:00:00Z".to_string(), 625 end_time: "2024-01-01T02:00:00Z".to_string(), 626 operation_count: 10, 627 did_count: 5, 628 hash: "hash2".to_string(), 629 content_hash: "content2".to_string(), 630 parent: 
"hash1".to_string(), 631 compressed_hash: "comp2".to_string(), 632 compressed_size: 100, 633 uncompressed_size: 200, 634 cursor: "2024-01-01T01:00:00Z".to_string(), 635 created_at: "2024-01-01T01:00:00Z".to_string(), 636 }, 637 BundleMetadata { 638 bundle_number: 3, 639 start_time: "2024-01-01T02:00:00Z".to_string(), 640 end_time: "2024-01-01T03:00:00Z".to_string(), 641 operation_count: 10, 642 did_count: 5, 643 hash: "hash3".to_string(), 644 content_hash: "content3".to_string(), 645 parent: "hash2".to_string(), 646 compressed_hash: "comp3".to_string(), 647 compressed_size: 100, 648 uncompressed_size: 200, 649 cursor: "2024-01-01T02:00:00Z".to_string(), 650 created_at: "2024-01-01T02:00:00Z".to_string(), 651 }, 652 ], 653 }; 654 655 assert!(index.validate_bundle_sequence().is_ok()); 656 } 657 658 #[test] 659 fn test_get_bundle() { 660 let index = Index { 661 version: "1.0".to_string(), 662 origin: "test".to_string(), 663 last_bundle: 3, 664 updated_at: "2024-01-01T00:00:00Z".to_string(), 665 total_size_bytes: 300, 666 total_uncompressed_size_bytes: 600, 667 bundles: vec![ 668 BundleMetadata { 669 bundle_number: 1, 670 start_time: "2024-01-01T00:00:00Z".to_string(), 671 end_time: "2024-01-01T01:00:00Z".to_string(), 672 operation_count: 10, 673 did_count: 5, 674 hash: "hash1".to_string(), 675 content_hash: "content1".to_string(), 676 parent: String::new(), 677 compressed_hash: "comp1".to_string(), 678 compressed_size: 100, 679 uncompressed_size: 200, 680 cursor: String::new(), 681 created_at: "2024-01-01T00:00:00Z".to_string(), 682 }, 683 BundleMetadata { 684 bundle_number: 2, 685 start_time: "2024-01-01T01:00:00Z".to_string(), 686 end_time: "2024-01-01T02:00:00Z".to_string(), 687 operation_count: 10, 688 did_count: 5, 689 hash: "hash2".to_string(), 690 content_hash: "content2".to_string(), 691 parent: "hash1".to_string(), 692 compressed_hash: "comp2".to_string(), 693 compressed_size: 100, 694 uncompressed_size: 200, 695 cursor: "2024-01-01T01:00:00Z".to_string(), 
696 created_at: "2024-01-01T01:00:00Z".to_string(), 697 }, 698 BundleMetadata { 699 bundle_number: 3, 700 start_time: "2024-01-01T02:00:00Z".to_string(), 701 end_time: "2024-01-01T03:00:00Z".to_string(), 702 operation_count: 10, 703 did_count: 5, 704 hash: "hash3".to_string(), 705 content_hash: "content3".to_string(), 706 parent: "hash2".to_string(), 707 compressed_hash: "comp3".to_string(), 708 compressed_size: 100, 709 uncompressed_size: 200, 710 cursor: "2024-01-01T02:00:00Z".to_string(), 711 created_at: "2024-01-01T02:00:00Z".to_string(), 712 }, 713 ], 714 }; 715 716 assert!(index.get_bundle(1).is_some()); 717 assert_eq!(index.get_bundle(1).unwrap().bundle_number, 1); 718 assert_eq!(index.get_bundle(1).unwrap().hash, "hash1"); 719 720 assert!(index.get_bundle(2).is_some()); 721 assert_eq!(index.get_bundle(2).unwrap().bundle_number, 2); 722 assert_eq!(index.get_bundle(2).unwrap().hash, "hash2"); 723 724 assert!(index.get_bundle(3).is_some()); 725 assert_eq!(index.get_bundle(3).unwrap().bundle_number, 3); 726 727 assert!(index.get_bundle(4).is_none()); 728 assert!(index.get_bundle(0).is_none()); 729 } 730 731 #[test] 732 fn test_total_uncompressed_size_for_bundles_all() { 733 let index = Index { 734 version: "1.0".to_string(), 735 origin: "test".to_string(), 736 last_bundle: 3, 737 updated_at: "2024-01-01T00:00:00Z".to_string(), 738 total_size_bytes: 300, 739 total_uncompressed_size_bytes: 600, 740 bundles: vec![ 741 BundleMetadata { 742 bundle_number: 1, 743 start_time: "2024-01-01T00:00:00Z".to_string(), 744 end_time: "2024-01-01T01:00:00Z".to_string(), 745 operation_count: 10, 746 did_count: 5, 747 hash: "hash1".to_string(), 748 content_hash: "content1".to_string(), 749 parent: String::new(), 750 compressed_hash: "comp1".to_string(), 751 compressed_size: 100, 752 uncompressed_size: 200, 753 cursor: String::new(), 754 created_at: "2024-01-01T00:00:00Z".to_string(), 755 }, 756 BundleMetadata { 757 bundle_number: 2, 758 start_time: 
"2024-01-01T01:00:00Z".to_string(), 759 end_time: "2024-01-01T02:00:00Z".to_string(), 760 operation_count: 10, 761 did_count: 5, 762 hash: "hash2".to_string(), 763 content_hash: "content2".to_string(), 764 parent: "hash1".to_string(), 765 compressed_hash: "comp2".to_string(), 766 compressed_size: 100, 767 uncompressed_size: 200, 768 cursor: "2024-01-01T01:00:00Z".to_string(), 769 created_at: "2024-01-01T01:00:00Z".to_string(), 770 }, 771 BundleMetadata { 772 bundle_number: 3, 773 start_time: "2024-01-01T02:00:00Z".to_string(), 774 end_time: "2024-01-01T03:00:00Z".to_string(), 775 operation_count: 10, 776 did_count: 5, 777 hash: "hash3".to_string(), 778 content_hash: "content3".to_string(), 779 parent: "hash2".to_string(), 780 compressed_hash: "comp3".to_string(), 781 compressed_size: 100, 782 uncompressed_size: 200, 783 cursor: "2024-01-01T02:00:00Z".to_string(), 784 created_at: "2024-01-01T02:00:00Z".to_string(), 785 }, 786 ], 787 }; 788 789 // Test all bundles - should use pre-calculated total 790 let all_bundles = vec![1, 2, 3]; 791 assert_eq!(index.total_uncompressed_size_for_bundles(&all_bundles), 600); 792 } 793 794 #[test] 795 fn test_total_uncompressed_size_for_bundles_subset() { 796 let index = Index { 797 version: "1.0".to_string(), 798 origin: "test".to_string(), 799 last_bundle: 3, 800 updated_at: "2024-01-01T00:00:00Z".to_string(), 801 total_size_bytes: 300, 802 total_uncompressed_size_bytes: 600, 803 bundles: vec![ 804 BundleMetadata { 805 bundle_number: 1, 806 start_time: "2024-01-01T00:00:00Z".to_string(), 807 end_time: "2024-01-01T01:00:00Z".to_string(), 808 operation_count: 10, 809 did_count: 5, 810 hash: "hash1".to_string(), 811 content_hash: "content1".to_string(), 812 parent: String::new(), 813 compressed_hash: "comp1".to_string(), 814 compressed_size: 100, 815 uncompressed_size: 200, 816 cursor: String::new(), 817 created_at: "2024-01-01T00:00:00Z".to_string(), 818 }, 819 BundleMetadata { 820 bundle_number: 2, 821 start_time: 
"2024-01-01T01:00:00Z".to_string(), 822 end_time: "2024-01-01T02:00:00Z".to_string(), 823 operation_count: 10, 824 did_count: 5, 825 hash: "hash2".to_string(), 826 content_hash: "content2".to_string(), 827 parent: "hash1".to_string(), 828 compressed_hash: "comp2".to_string(), 829 compressed_size: 100, 830 uncompressed_size: 300, 831 cursor: "2024-01-01T01:00:00Z".to_string(), 832 created_at: "2024-01-01T01:00:00Z".to_string(), 833 }, 834 BundleMetadata { 835 bundle_number: 3, 836 start_time: "2024-01-01T02:00:00Z".to_string(), 837 end_time: "2024-01-01T03:00:00Z".to_string(), 838 operation_count: 10, 839 did_count: 5, 840 hash: "hash3".to_string(), 841 content_hash: "content3".to_string(), 842 parent: "hash2".to_string(), 843 compressed_hash: "comp3".to_string(), 844 compressed_size: 100, 845 uncompressed_size: 400, 846 cursor: "2024-01-01T02:00:00Z".to_string(), 847 created_at: "2024-01-01T02:00:00Z".to_string(), 848 }, 849 ], 850 }; 851 852 // Test subset - should sum individual bundles 853 let subset = vec![1, 3]; 854 assert_eq!( 855 index.total_uncompressed_size_for_bundles(&subset), 856 200 + 400 857 ); 858 859 // Test single bundle 860 let single = vec![2]; 861 assert_eq!(index.total_uncompressed_size_for_bundles(&single), 300); 862 } 863 864 #[test] 865 fn test_total_uncompressed_size_for_bundles_empty() { 866 let index = Index { 867 version: "1.0".to_string(), 868 origin: "test".to_string(), 869 last_bundle: 0, 870 updated_at: "2024-01-01T00:00:00Z".to_string(), 871 total_size_bytes: 0, 872 total_uncompressed_size_bytes: 0, 873 bundles: Vec::new(), 874 }; 875 876 assert_eq!(index.total_uncompressed_size_for_bundles(&[]), 0); 877 } 878 879 #[test] 880 fn test_total_uncompressed_size_for_bundles_missing_bundle() { 881 let index = Index { 882 version: "1.0".to_string(), 883 origin: "test".to_string(), 884 last_bundle: 2, 885 updated_at: "2024-01-01T00:00:00Z".to_string(), 886 total_size_bytes: 200, 887 total_uncompressed_size_bytes: 400, 888 bundles: vec![ 889 
BundleMetadata { 890 bundle_number: 1, 891 start_time: "2024-01-01T00:00:00Z".to_string(), 892 end_time: "2024-01-01T01:00:00Z".to_string(), 893 operation_count: 10, 894 did_count: 5, 895 hash: "hash1".to_string(), 896 content_hash: "content1".to_string(), 897 parent: String::new(), 898 compressed_hash: "comp1".to_string(), 899 compressed_size: 100, 900 uncompressed_size: 200, 901 cursor: String::new(), 902 created_at: "2024-01-01T00:00:00Z".to_string(), 903 }, 904 BundleMetadata { 905 bundle_number: 2, 906 start_time: "2024-01-01T01:00:00Z".to_string(), 907 end_time: "2024-01-01T02:00:00Z".to_string(), 908 operation_count: 10, 909 did_count: 5, 910 hash: "hash2".to_string(), 911 content_hash: "content2".to_string(), 912 parent: "hash1".to_string(), 913 compressed_hash: "comp2".to_string(), 914 compressed_size: 100, 915 uncompressed_size: 200, 916 cursor: "2024-01-01T01:00:00Z".to_string(), 917 created_at: "2024-01-01T01:00:00Z".to_string(), 918 }, 919 ], 920 }; 921 922 // Requesting non-existent bundle should be ignored 923 let with_missing = vec![1, 3, 2]; 924 assert_eq!( 925 index.total_uncompressed_size_for_bundles(&with_missing), 926 200 + 200 927 ); 928 } 929 930 #[test] 931 fn test_validate_last_bundle_mismatch() { 932 let index = Index { 933 version: "1.0".to_string(), 934 origin: "test".to_string(), 935 last_bundle: 5, // Incorrect: actual last bundle is 2 936 updated_at: "2024-01-01T00:00:00Z".to_string(), 937 total_size_bytes: 200, 938 total_uncompressed_size_bytes: 400, 939 bundles: vec![ 940 BundleMetadata { 941 bundle_number: 1, 942 start_time: "2024-01-01T00:00:00Z".to_string(), 943 end_time: "2024-01-01T01:00:00Z".to_string(), 944 operation_count: 10, 945 did_count: 5, 946 hash: "hash1".to_string(), 947 content_hash: "content1".to_string(), 948 parent: String::new(), 949 compressed_hash: "comp1".to_string(), 950 compressed_size: 100, 951 uncompressed_size: 200, 952 cursor: String::new(), 953 created_at: "2024-01-01T00:00:00Z".to_string(), 954 }, 955 
BundleMetadata { 956 bundle_number: 2, 957 start_time: "2024-01-01T01:00:00Z".to_string(), 958 end_time: "2024-01-01T02:00:00Z".to_string(), 959 operation_count: 10, 960 did_count: 5, 961 hash: "hash2".to_string(), 962 content_hash: "content2".to_string(), 963 parent: "hash1".to_string(), 964 compressed_hash: "comp2".to_string(), 965 compressed_size: 100, 966 uncompressed_size: 200, 967 cursor: "2024-01-01T01:00:00Z".to_string(), 968 created_at: "2024-01-01T01:00:00Z".to_string(), 969 }, 970 ], 971 }; 972 973 let result = index.validate_bundle_sequence(); 974 assert!(result.is_err()); 975 assert!( 976 result 977 .unwrap_err() 978 .to_string() 979 .contains("last_bundle: index says 5, but last bundle in array is 2") 980 ); 981 } 982}