Forked from atscan.net/plcbundle-rs — a high-performance implementation of plcbundle written in Rust.
//! Bundle repository index: load/save, sequence validation, and rebuild from bundle metadata.
// This module is intended to live at src/index.rs.
3
4use anyhow::Result;
5use serde::{Deserialize, Serialize}; // Add Serialize here
6use std::fs::File;
7use std::path::Path;
8
/// Top-level repository index, persisted on disk as `plc_bundles.json`.
///
/// Field order is preserved as-is because serde serializes struct fields in
/// declaration order, and the on-disk JSON layout depends on it.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Index {
    /// Index format version (this code writes "1.0").
    pub version: String,
    /// Origin identifier the bundles came from (PLC directory URL or similar).
    pub origin: String,
    /// Number of the newest bundle; validated to match the last entry in `bundles`.
    pub last_bundle: u32,
    /// RFC 3339 timestamp of the last index update.
    pub updated_at: String,
    /// Sum of compressed bundle file sizes, in bytes.
    pub total_size_bytes: u64,
    /// Sum of uncompressed bundle sizes, in bytes (legacy bundles contribute 0).
    pub total_uncompressed_size_bytes: u64,
    /// Per-bundle metadata, kept consecutive starting at bundle 1.
    pub bundles: Vec<BundleMetadata>,
}
19
/// Metadata describing a single bundle file, as stored in the index.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct BundleMetadata {
    /// 1-based bundle number; the file on disk is named `{:06}.jsonl.zst`.
    pub bundle_number: u32,
    /// Start of the time range covered by this bundle (RFC 3339 string).
    pub start_time: String,
    /// End of the time range covered by this bundle (RFC 3339 string).
    pub end_time: String,
    /// Number of operations contained in the bundle.
    pub operation_count: u32,
    /// Number of distinct DIDs in the bundle.
    pub did_count: u32,
    /// Chain hash: SHA-256 over `parent` and `content_hash`
    /// (`plcbundle:genesis:<content_hash>` for bundle 1 — see rebuild logic).
    pub hash: String,
    /// Hash of the bundle's content, taken from the bundle's embedded metadata.
    pub content_hash: String,
    /// Chain hash of the previous bundle; empty string for the first bundle.
    #[serde(default)]
    pub parent: String,
    /// SHA-256 of the compressed bundle file bytes.
    pub compressed_hash: String,
    /// Size of the compressed file, in bytes.
    pub compressed_size: u64,
    /// Uncompressed size in bytes (0 for legacy bundles lacking this field).
    pub uncompressed_size: u64,
    /// Previous bundle's `end_time`; empty string for the first bundle.
    #[serde(default)]
    pub cursor: String,
    /// Creation timestamp, taken from the bundle's embedded metadata.
    pub created_at: String,
}
38
39impl Index {
40 pub fn load<P: AsRef<Path>>(directory: P) -> Result<Self> {
41 let index_path = directory.as_ref().join("plc_bundles.json");
42 let display_path = index_path
43 .canonicalize()
44 .unwrap_or_else(|_| index_path.clone());
45 log::debug!(
46 "[BundleManager] Loading index from: {}",
47 display_path.display()
48 );
49 let start = std::time::Instant::now();
50 let file = File::open(&index_path)?;
51 let index: Index = sonic_rs::from_reader(file)?;
52 let elapsed = start.elapsed();
53 let elapsed_ms = elapsed.as_secs_f64() * 1000.0;
54 log::debug!(
55 "[BundleManager] Index loaded: v{} ({}), {} bundles, last bundle: {} ({:.3}ms)",
56 index.version,
57 index.origin,
58 index.bundles.len(),
59 index.last_bundle,
60 elapsed_ms
61 );
62
63 // Validate bundle sequence integrity
64 index.validate_bundle_sequence()?;
65
66 Ok(index)
67 }
68
69 /// Validate that bundles form a consecutive sequence starting from 1
70 /// This ensures no gaps in the bundle chain and that the first bundle is always 1
71 fn validate_bundle_sequence(&self) -> Result<()> {
72 if self.bundles.is_empty() {
73 return Ok(()); // Empty index is valid
74 }
75
76 // First bundle must be 1
77 let first_bundle = self.bundles.first().unwrap().bundle_number;
78 if first_bundle != 1 {
79 anyhow::bail!(
80 "Invalid bundle sequence: first bundle is {} but must be 1",
81 first_bundle
82 );
83 }
84
85 // Check for gaps in sequence
86 for i in 0..self.bundles.len() {
87 let expected = (i + 1) as u32;
88 let actual = self.bundles[i].bundle_number;
89 if actual != expected {
90 anyhow::bail!(
91 "Gap detected in bundle sequence: expected bundle {}, found bundle {}",
92 expected,
93 actual
94 );
95 }
96 }
97
98 // Verify last_bundle matches the last bundle in the array
99 let last_in_array = self.bundles.last().unwrap().bundle_number;
100 if self.last_bundle != last_in_array {
101 anyhow::bail!(
102 "Inconsistent last_bundle: index says {}, but last bundle in array is {}",
103 self.last_bundle,
104 last_in_array
105 );
106 }
107
108 Ok(())
109 }
110
111 /// Save index to disk atomically
112 pub fn save<P: AsRef<Path>>(&self, directory: P) -> Result<()> {
113 use anyhow::Context;
114
115 // Validate bundle sequence before saving
116 self.validate_bundle_sequence()
117 .context("Cannot save invalid index")?;
118
119 let index_path = directory.as_ref().join("plc_bundles.json");
120 let temp_path = index_path.with_extension("json.tmp");
121
122 let json = sonic_rs::to_string_pretty(self).context("Failed to serialize index")?;
123
124 std::fs::write(&temp_path, json)
125 .with_context(|| format!("Failed to write temp index: {}", temp_path.display()))?;
126
127 std::fs::rename(&temp_path, &index_path)
128 .with_context(|| format!("Failed to rename index: {}", index_path.display()))?;
129
130 Ok(())
131 }
132
133 /// Initialize a new repository with an empty index
134 ///
135 /// Creates all necessary directories and an empty index file.
136 /// This is idempotent - if the repository already exists, it will return an error
137 /// unless `force` is true, in which case it will reinitialize.
138 ///
139 /// # Arguments
140 /// * `directory` - Directory to initialize
141 /// * `origin` - PLC directory URL or origin identifier
142 /// * `force` - Whether to reinitialize if already exists
143 ///
144 /// # Returns
145 /// True if initialized (created new), False if already existed and force=false
146 pub fn init<P: AsRef<Path>>(directory: P, origin: String, force: bool) -> Result<bool> {
147 use anyhow::Context;
148
149 let dir = directory.as_ref();
150 let index_path = dir.join("plc_bundles.json");
151
152 // Check if already initialized
153 if index_path.exists() && !force {
154 return Ok(false); // Already initialized
155 }
156
157 // Create directory if it doesn't exist
158 if !dir.exists() {
159 std::fs::create_dir_all(dir)
160 .with_context(|| format!("Failed to create directory: {}", dir.display()))?;
161 }
162
163 // Create .plcbundle directory for DID index
164 let plcbundle_dir = dir.join(crate::constants::DID_INDEX_DIR);
165 if !plcbundle_dir.exists() {
166 std::fs::create_dir_all(&plcbundle_dir).with_context(|| {
167 format!(
168 "Failed to create DID index directory: {}",
169 plcbundle_dir.display()
170 )
171 })?;
172 }
173
174 // Create and save empty index
175 let index = Index {
176 version: "1.0".to_string(),
177 origin,
178 last_bundle: 0,
179 updated_at: chrono::Utc::now().to_rfc3339(),
180 total_size_bytes: 0,
181 total_uncompressed_size_bytes: 0,
182 bundles: Vec::new(),
183 };
184
185 index.save(dir)?;
186
187 Ok(true) // Successfully initialized
188 }
189
    /// Rebuild index from existing bundle files by scanning their metadata
    ///
    /// This scans all .jsonl.zst files in the directory and reconstructs the index
    /// by extracting embedded metadata from each bundle's skippable frame.
    /// Per-bundle work (metadata extraction, SHA-256 of the compressed file) runs
    /// in parallel via rayon; the chain hashes are then computed sequentially
    /// because each depends on the previous bundle's hash.
    ///
    /// # Arguments
    /// * `directory` - Directory containing bundle files
    /// * `origin` - Optional origin URL (auto-detected from first bundle if None)
    /// * `progress_cb` - Optional progress callback (current, total, bytes_processed, total_bytes)
    ///
    /// # Returns
    /// Reconstructed Index ready to be saved
    ///
    /// # Errors
    /// Fails if no bundle files are found, the files do not form a consecutive
    /// sequence starting at 000001, any bundle's embedded metadata cannot be
    /// read, or a bundle's origin disagrees with the expected one.
    pub fn rebuild_from_bundles<P: AsRef<Path>, F>(
        directory: P,
        origin: Option<String>,
        progress_cb: Option<F>,
    ) -> Result<Self>
    where
        F: Fn(usize, usize, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes)
    {
        use anyhow::Context;

        let dir = directory.as_ref();

        // Find all bundle files
        let mut bundle_files: Vec<(u32, std::path::PathBuf)> = Vec::new();

        for entry in std::fs::read_dir(dir)
            .with_context(|| format!("Failed to read directory: {}", dir.display()))?
        {
            let entry = entry?;
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            // Non-UTF-8 names fall through as "" and are skipped by the match below.
            let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

            // Match pattern: NNNNNN.jsonl.zst (16 chars: 6 digits + 10 chars for .jsonl.zst)
            if filename.ends_with(".jsonl.zst")
                && filename.len() == 16
                && let Ok(bundle_num) = filename[0..6].parse::<u32>()
            {
                bundle_files.push((bundle_num, path));
            }
        }

        if bundle_files.is_empty() {
            anyhow::bail!("No bundle files found in directory");
        }

        // Sort by bundle number
        bundle_files.sort_by_key(|(num, _)| *num);

        // Validate that first bundle is 1
        let first_bundle_num = bundle_files[0].0;
        if first_bundle_num != 1 {
            anyhow::bail!(
                "Invalid bundle sequence: first bundle file is {:06}.jsonl.zst but must be 000001.jsonl.zst",
                first_bundle_num
            );
        }

        // Validate no gaps in bundle sequence (position i must hold bundle i + 1)
        for (i, (actual, _)) in bundle_files.iter().enumerate() {
            let expected = (i + 1) as u32;
            if *actual != expected {
                anyhow::bail!(
                    "Gap detected in bundle files: expected {:06}.jsonl.zst, found {:06}.jsonl.zst",
                    expected,
                    *actual
                );
            }
        }

        // Pre-calculate total bytes for progress tracking (parallel)
        use rayon::prelude::*;
        let total_bytes: u64 = bundle_files
            .par_iter()
            .filter_map(|(_, bundle_path)| std::fs::metadata(bundle_path).ok())
            .map(|metadata| metadata.len())
            .sum();

        // Extract metadata from each bundle in parallel
        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
        use std::sync::{Arc, Mutex};

        // Shared across rayon workers: the (possibly auto-detected) origin and
        // the progress callback; counters use relaxed atomics since they only
        // drive progress reporting, not control flow.
        let detected_origin: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(origin));
        let progress_cb_arc: Arc<Mutex<Option<F>>> = Arc::new(Mutex::new(progress_cb));
        let count_atomic = Arc::new(AtomicUsize::new(0));
        let bytes_atomic = Arc::new(AtomicU64::new(0));

        // Update progress bar less frequently to reduce contention
        let update_interval = (rayon::current_num_threads().max(1) * 4).max(10);

        // Process bundles in parallel
        let bundle_count = bundle_files.len();
        let mut bundles_metadata: Vec<BundleMetadata> = bundle_files
            .par_iter()
            .map(|(bundle_num, bundle_path)| -> Result<BundleMetadata> {
                // Get file size
                let metadata = std::fs::metadata(bundle_path)?;
                let compressed_size = metadata.len();
                let bytes_processed =
                    bytes_atomic.fetch_add(compressed_size, Ordering::Relaxed) + compressed_size;
                let current_count = count_atomic.fetch_add(1, Ordering::Relaxed) + 1;

                // Update progress periodically (always on first and last item);
                // the Mutex serializes callback invocations across workers.
                if (current_count.is_multiple_of(update_interval)
                    || current_count == 1
                    || current_count == bundle_count)
                    && let Ok(cb_guard) = progress_cb_arc.lock()
                    && let Some(ref cb) = *cb_guard
                {
                    cb(current_count, bundle_count, bytes_processed, total_bytes);
                }

                // Extract embedded metadata from bundle file
                let embedded = crate::bundle_format::extract_metadata_from_file(bundle_path)
                    .with_context(|| {
                        format!("Failed to extract metadata from bundle {}", bundle_num)
                    })?;

                // Auto-detect origin from first bundle if not provided
                // (first worker to get here wins; scoped block bounds the lock)
                {
                    let mut origin_guard = detected_origin.lock().unwrap();
                    if origin_guard.is_none() {
                        *origin_guard = Some(embedded.origin.clone());
                    }
                }

                // Verify origin matches
                {
                    let origin_guard = detected_origin.lock().unwrap();
                    if let Some(ref expected_origin) = *origin_guard
                        && embedded.origin != *expected_origin
                    {
                        anyhow::bail!(
                            "Bundle {:06}: origin mismatch (expected '{}', got '{}')",
                            bundle_num,
                            expected_origin,
                            embedded.origin
                        );
                    }
                }

                // Calculate compressed hash
                // NOTE(review): this reads the whole file a second time after
                // metadata extraction — acceptable for an offline rebuild.
                let compressed_data = std::fs::read(bundle_path)?;
                let compressed_hash = {
                    use sha2::{Digest, Sha256};
                    let mut hasher = Sha256::new();
                    hasher.update(&compressed_data);
                    format!("{:x}", hasher.finalize())
                };

                // Get uncompressed_size from embedded metadata if available
                // Fall back to 0 for legacy bundles without this field
                let uncompressed_size = embedded.uncompressed_size.unwrap_or(0);

                // Build bundle metadata for index
                Ok(BundleMetadata {
                    bundle_number: *bundle_num,
                    start_time: embedded.start_time.clone(),
                    end_time: embedded.end_time.clone(),
                    operation_count: embedded.operation_count as u32,
                    did_count: embedded.did_count as u32,
                    hash: String::new(), // Will be calculated after collecting all
                    content_hash: embedded.content_hash.clone(),
                    parent: String::new(), // Will be set during chain hash calculation
                    compressed_hash,
                    compressed_size,
                    uncompressed_size,
                    cursor: String::new(), // Will be set from previous bundle's end_time
                    created_at: embedded.created_at.clone(),
                })
            })
            .collect::<Result<Vec<_>>>()?;

        // Sort by bundle number to maintain order (parallel processing may reorder)
        bundles_metadata.sort_by_key(|b| b.bundle_number);

        let total_compressed_size: u64 = bundles_metadata.iter().map(|b| b.compressed_size).sum();
        let detected_origin = detected_origin.lock().unwrap().clone();

        // Calculate total uncompressed size
        // Note: For legacy bundles without uncompressed_size in metadata, it will be 0
        let total_uncompressed_size: u64 =
            bundles_metadata.iter().map(|b| b.uncompressed_size).sum();

        // Calculate chain hashes sequentially (depends on previous bundles)
        for i in 0..bundles_metadata.len() {
            if i == 0 {
                // Genesis bundle: hash = SHA-256("plcbundle:genesis:<content_hash>")
                use sha2::{Digest, Sha256};
                bundles_metadata[i].parent = String::new();
                bundles_metadata[i].cursor = String::new();
                let content_hash = bundles_metadata[i].content_hash.clone();
                let chain_input = format!("plcbundle:genesis:{}", content_hash);
                let mut hasher = Sha256::new();
                hasher.update(chain_input.as_bytes());
                bundles_metadata[i].hash = format!("{:x}", hasher.finalize());
            } else {
                // Chained bundle: hash = SHA-256("<parent_hash>:<content_hash>"),
                // cursor = previous bundle's end_time.
                use sha2::{Digest, Sha256};
                let prev_hash = bundles_metadata[i - 1].hash.clone();
                let prev_end_time = bundles_metadata[i - 1].end_time.clone();
                let content_hash = bundles_metadata[i].content_hash.clone();

                bundles_metadata[i].parent = prev_hash.clone();
                bundles_metadata[i].cursor = prev_end_time;

                let chain_input = format!("{}:{}", prev_hash, content_hash);
                let mut hasher = Sha256::new();
                hasher.update(chain_input.as_bytes());
                bundles_metadata[i].hash = format!("{:x}", hasher.finalize());
            }
        }

        // Safe: bundle_files (and hence bundles_metadata) was checked non-empty above.
        let last_bundle = bundles_metadata.last().unwrap().bundle_number;
        let origin_str =
            detected_origin.unwrap_or_else(|| crate::constants::DEFAULT_ORIGIN.to_string());

        Ok(Index {
            version: "1.0".to_string(),
            origin: origin_str,
            last_bundle,
            updated_at: chrono::Utc::now().to_rfc3339(),
            total_size_bytes: total_compressed_size,
            total_uncompressed_size_bytes: total_uncompressed_size,
            bundles: bundles_metadata,
        })
    }
422
423 pub fn get_bundle(&self, bundle_number: u32) -> Option<&BundleMetadata> {
424 self.bundles
425 .iter()
426 .find(|b| b.bundle_number == bundle_number)
427 }
428
429 /// Calculate total uncompressed size for a set of bundle numbers.
430 /// Optimizes by using the pre-calculated total when all bundles are selected.
431 ///
432 /// # Arguments
433 /// * `bundle_numbers` - Vector of bundle numbers to calculate size for
434 ///
435 /// # Returns
436 /// Total uncompressed size in bytes
437 pub fn total_uncompressed_size_for_bundles(&self, bundle_numbers: &[u32]) -> u64 {
438 // Check if we're querying all bundles (1 to last_bundle)
439 let is_all_bundles = !bundle_numbers.is_empty()
440 && bundle_numbers.len() == self.last_bundle as usize
441 && bundle_numbers.first() == Some(&1)
442 && bundle_numbers.last() == Some(&self.last_bundle);
443
444 if is_all_bundles {
445 // Use pre-calculated total from index
446 self.total_uncompressed_size_bytes
447 } else {
448 // Sum only the selected bundles
449 bundle_numbers
450 .iter()
451 .filter_map(|bundle_num| {
452 self.get_bundle(*bundle_num)
453 .map(|meta| meta.uncompressed_size)
454 })
455 .sum()
456 }
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// RFC 3339 timestamp on 2024-01-01 at the given hour.
    fn ts(hour: u32) -> String {
        format!("2024-01-01T{:02}:00:00Z", hour)
    }

    /// Fixture: metadata for bundle `n` covering one hour from `start_hour`,
    /// with deterministic hash/content/comp identifiers derived from `n`.
    fn meta(n: u32, start_hour: u32, parent: &str, uncompressed_size: u64) -> BundleMetadata {
        BundleMetadata {
            bundle_number: n,
            start_time: ts(start_hour),
            end_time: ts(start_hour + 1),
            operation_count: 10,
            did_count: 5,
            hash: format!("hash{}", n),
            content_hash: format!("content{}", n),
            parent: parent.to_string(),
            compressed_hash: format!("comp{}", n),
            compressed_size: 100,
            uncompressed_size,
            cursor: if start_hour == 0 {
                String::new()
            } else {
                ts(start_hour)
            },
            created_at: ts(start_hour),
        }
    }

    /// Fixture: an Index wrapping the given bundles with standard metadata.
    fn index_with(
        last_bundle: u32,
        total_size_bytes: u64,
        total_uncompressed_size_bytes: u64,
        bundles: Vec<BundleMetadata>,
    ) -> Index {
        Index {
            version: "1.0".to_string(),
            origin: "test".to_string(),
            last_bundle,
            updated_at: ts(0),
            total_size_bytes,
            total_uncompressed_size_bytes,
            bundles,
        }
    }

    #[test]
    fn test_validate_empty_index() {
        let index = index_with(0, 0, 0, Vec::new());
        assert!(index.validate_bundle_sequence().is_ok());
    }

    #[test]
    fn test_validate_single_bundle_correct() {
        let index = index_with(1, 100, 200, vec![meta(1, 0, "", 200)]);
        assert!(index.validate_bundle_sequence().is_ok());
    }

    #[test]
    fn test_validate_first_bundle_not_one() {
        // Sequence starts at 2 — must be rejected.
        let index = index_with(2, 100, 200, vec![meta(2, 0, "", 200)]);
        let result = index.validate_bundle_sequence();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("first bundle is 2 but must be 1")
        );
    }

    #[test]
    fn test_validate_gap_in_sequence() {
        // Bundle 2 is missing between 1 and 3.
        let index = index_with(
            3,
            200,
            400,
            vec![meta(1, 0, "", 200), meta(3, 1, "hash1", 200)],
        );
        let result = index.validate_bundle_sequence();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("expected bundle 2, found bundle 3")
        );
    }

    #[test]
    fn test_validate_consecutive_sequence() {
        let index = index_with(
            3,
            300,
            600,
            vec![
                meta(1, 0, "", 200),
                meta(2, 1, "hash1", 200),
                meta(3, 2, "hash2", 200),
            ],
        );
        assert!(index.validate_bundle_sequence().is_ok());
    }

    #[test]
    fn test_get_bundle() {
        let index = index_with(
            3,
            300,
            600,
            vec![
                meta(1, 0, "", 200),
                meta(2, 1, "hash1", 200),
                meta(3, 2, "hash2", 200),
            ],
        );

        assert!(index.get_bundle(1).is_some());
        assert_eq!(index.get_bundle(1).unwrap().bundle_number, 1);
        assert_eq!(index.get_bundle(1).unwrap().hash, "hash1");

        assert!(index.get_bundle(2).is_some());
        assert_eq!(index.get_bundle(2).unwrap().bundle_number, 2);
        assert_eq!(index.get_bundle(2).unwrap().hash, "hash2");

        assert!(index.get_bundle(3).is_some());
        assert_eq!(index.get_bundle(3).unwrap().bundle_number, 3);

        assert!(index.get_bundle(4).is_none());
        assert!(index.get_bundle(0).is_none());
    }

    #[test]
    fn test_total_uncompressed_size_for_bundles_all() {
        let index = index_with(
            3,
            300,
            600,
            vec![
                meta(1, 0, "", 200),
                meta(2, 1, "hash1", 200),
                meta(3, 2, "hash2", 200),
            ],
        );

        // All bundles selected — should return the pre-calculated total.
        assert_eq!(index.total_uncompressed_size_for_bundles(&[1, 2, 3]), 600);
    }

    #[test]
    fn test_total_uncompressed_size_for_bundles_subset() {
        let index = index_with(
            3,
            300,
            600,
            vec![
                meta(1, 0, "", 200),
                meta(2, 1, "hash1", 300),
                meta(3, 2, "hash2", 400),
            ],
        );

        // Subset — should sum the individual bundles.
        assert_eq!(index.total_uncompressed_size_for_bundles(&[1, 3]), 200 + 400);

        // Single bundle.
        assert_eq!(index.total_uncompressed_size_for_bundles(&[2]), 300);
    }

    #[test]
    fn test_total_uncompressed_size_for_bundles_empty() {
        let index = index_with(0, 0, 0, Vec::new());
        assert_eq!(index.total_uncompressed_size_for_bundles(&[]), 0);
    }

    #[test]
    fn test_total_uncompressed_size_for_bundles_missing_bundle() {
        let index = index_with(
            2,
            200,
            400,
            vec![meta(1, 0, "", 200), meta(2, 1, "hash1", 200)],
        );

        // A non-existent bundle number (3) is simply ignored.
        assert_eq!(
            index.total_uncompressed_size_for_bundles(&[1, 3, 2]),
            200 + 200
        );
    }

    #[test]
    fn test_validate_last_bundle_mismatch() {
        // last_bundle says 5, but the array ends at 2.
        let index = index_with(
            5,
            200,
            400,
            vec![meta(1, 0, "", 200), meta(2, 1, "hash1", 200)],
        );

        let result = index.validate_bundle_sequence();
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("last_bundle: index says 5, but last bundle in array is 2")
        );
    }
}