// Forked from atscan.net/plcbundle-rs
// High-performance implementation of plcbundle written in Rust
1// Migrate command - convert bundles to multi-frame format
2use super::progress::ProgressBar;
3use super::utils::{HasGlobalFlags, format_bytes};
4use anyhow::{Result, bail};
5use clap::Args;
6use plcbundle::BundleManager;
7use plcbundle::constants;
8use std::path::PathBuf;
9use std::time::Instant;
10
11#[derive(Args)]
12#[command(
13 about = "Migrate bundles to new bundle format",
14 long_about = "Rebuild bundles to the new bundle format by recompressing and creating
15multi-frame structure with embedded frame offsets.
16
17The migration process scans for bundles missing frame metadata, recompresses them
18using the multi-frame format (100 operations per frame), generates a frame offset
19index in the metadata, and preserves all cryptographic hashes and metadata.
20Content integrity is verified throughout the process.
21
22Original bundle files are replaced atomically to ensure data safety. The command
23automatically detects which bundles need migration and processes them in parallel
24for efficiency. Use --dry-run to preview what would be migrated without making
25changes.
26
27After migration, bundles benefit from improved random access performance and lower
28memory footprint when accessing individual operations rather than loading entire bundles.",
29 help_template = crate::clap_help!(
30 examples: " # Preview migration (recommended first)\n \
31 {bin} migrate --dry-run\n\n \
32 # Migrate all legacy bundles (auto-detects CPU cores)\n \
33 {bin} migrate\n\n \
34 # Migrate specific bundle range\n \
35 {bin} migrate --bundles 1-100\n\n \
36 # Migrate single bundle\n \
37 {bin} migrate --bundles 42\n\n \
38 # Migrate multiple ranges\n \
39 {bin} migrate --bundles 1-10,20-30,50\n\n \
40 # Force migration even if frame metadata exists\n \
41 {bin} migrate --force\n\n \
42 # Limit threads (if needed for resource constraints)\n \
43 {bin} migrate -j 4\n\n \
44 # Verbose output\n \
45 {bin} migrate -v"
46 )
47)]
48pub struct MigrateCommand {
49 /// Show what would be migrated without migrating
50 #[arg(short = 'n', long)]
51 pub dry_run: bool,
52
53 /// Re-migrate bundles that already have frame metadata
54 #[arg(short, long)]
55 pub force: bool,
56
57 /// Bundle range to migrate (e.g., \"1-100\", \"42\", \"1-10,20-30\", \"latest:10\")
58 /// If not specified, migrates all bundles that need migration
59 #[arg(long)]
60 pub bundles: Option<String>,
61
62 /// Number of threads to use (0 = auto-detect)
63 #[arg(short = 'j', long, default_value = "0")]
64 pub threads: usize,
65}
66
// This command defines no local -v/-q flags; verbosity is threaded through
// `run`'s `global_verbose` argument instead, so both accessors are fixed false.
impl HasGlobalFlags for MigrateCommand {
    fn verbose(&self) -> bool {
        false
    }
    fn quiet(&self) -> bool {
        false
    }
}
75
76pub fn run(cmd: MigrateCommand, dir: PathBuf, global_verbose: bool) -> Result<()> {
77 let manager = super::utils::create_manager(dir.clone(), global_verbose, false, false)?;
78
79 // Auto-detect number of threads if 0
80 let workers = super::utils::get_worker_threads(cmd.threads, 4);
81
82 eprintln!(
83 "Scanning for legacy bundles in: {}\n",
84 super::utils::display_path(&dir).display()
85 );
86
87 let index = manager.get_index();
88 let bundles = &index.bundles;
89
90 if bundles.is_empty() {
91 eprintln!("No bundles to migrate");
92 return Ok(());
93 }
94
95 // Determine which bundles to consider for migration
96 let last_bundle = index.last_bundle;
97 let target_bundles: Option<std::collections::HashSet<u32>> = if let Some(ref bundles_spec) =
98 cmd.bundles
99 {
100 let bundle_list = super::utils::parse_bundle_spec(Some(bundles_spec.clone()), last_bundle)?;
101 Some(bundle_list.into_iter().collect())
102 } else {
103 None
104 };
105
106 // Check which bundles need migration
107 let mut needs_migration = Vec::new();
108 let mut total_size = 0u64;
109 let mut format_counts = std::collections::HashMap::new();
110
111 for meta in bundles {
112 // Filter by bundle range if specified
113 if let Some(ref target_set) = target_bundles
114 && !target_set.contains(&meta.bundle_number)
115 {
116 continue;
117 }
118
119 let embedded_meta = manager.get_embedded_metadata(meta.bundle_number)?;
120 let (old_format, has_frame_offsets) = match embedded_meta {
121 Some(ref m) if !m.frame_offsets.is_empty() => (m.format.clone(), true),
122 Some(m) => (m.format.clone(), false),
123 None => ("v0 (single-frame)".to_string(), false),
124 };
125
126 let needs_migrate = cmd.force || !has_frame_offsets;
127
128 if needs_migrate {
129 needs_migration.push(BundleMigrationInfo {
130 bundle_number: meta.bundle_number,
131 old_size: meta.compressed_size,
132 uncompressed_size: meta.uncompressed_size,
133 old_format,
134 });
135 total_size += meta.compressed_size;
136 *format_counts
137 .entry(needs_migration.last().unwrap().old_format.clone())
138 .or_insert(0) += 1;
139 }
140 }
141
142 if needs_migration.is_empty() {
143 if let Some(ref bundles_spec) = cmd.bundles {
144 eprintln!("No bundles in range '{}' need migration", bundles_spec);
145 } else {
146 eprintln!("✓ All bundles already migrated");
147 }
148 eprintln!("\nUse --force to re-migrate");
149 return Ok(());
150 }
151
152 // Sort bundles by number to ensure chain integrity (migrate in order: 1, 2, 3, ...)
153 needs_migration.sort_by_key(|info| info.bundle_number);
154
155 // Show migration plan
156 eprintln!("Migration Plan");
157 eprintln!("══════════════\n");
158
159 if let Some(ref bundles_spec) = cmd.bundles {
160 eprintln!(" Range: {}", bundles_spec);
161 }
162
163 let mut format_parts = Vec::new();
164 for (format, count) in &format_counts {
165 format_parts.push(format!("{} ({})", format, count));
166 }
167 eprintln!(
168 " Format: {} → {}/1.0",
169 format_parts.join(" + "),
170 constants::BINARY_NAME
171 );
172
173 let total_uncompressed: u64 = needs_migration.iter().map(|i| i.uncompressed_size).sum();
174 let avg_compression = if total_size > 0 {
175 total_uncompressed as f64 / total_size as f64
176 } else {
177 0.0
178 };
179
180 eprintln!(" Bundles: {}", needs_migration.len());
181 eprintln!(
182 " Size: {} ({:.3}x compression)",
183 format_bytes(total_size),
184 avg_compression
185 );
186 eprintln!(
187 " Workers: {}, Compression Level: {}\n",
188 workers,
189 constants::ZSTD_COMPRESSION_LEVEL
190 );
191
192 if cmd.dry_run {
193 eprintln!("💡 Dry-run mode");
194 return Ok(());
195 }
196
197 // Execute migration
198 eprintln!("Migrating...\n");
199
200 let start = Instant::now();
201
202 // Calculate total bytes to process
203 let total_bytes: u64 = needs_migration.iter().map(|info| info.old_size).sum();
204 let progress = ProgressBar::with_bytes(needs_migration.len(), total_bytes);
205
206 let mut success = 0;
207 let mut failed = 0;
208 let mut first_error: Option<anyhow::Error> = None;
209 let mut hash_changes = Vec::new();
210
211 let mut total_old_size = 0u64;
212 let mut total_new_size = 0u64;
213 let mut total_old_uncompressed = 0u64;
214 let mut total_new_uncompressed = 0u64;
215
216 // Parallel migration using rayon
217 // Note: Even though we use parallelism, bundles MUST be migrated in order
218 // for chain integrity. We parallelize the WORK (compression) but commit sequentially.
219 use rayon::prelude::*;
220 use std::sync::{Arc, Mutex};
221
222 let progress_arc = Arc::new(Mutex::new(progress));
223 // Use atomics for counters to reduce lock contention
224 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
225 let count_atomic = Arc::new(AtomicUsize::new(0));
226 let bytes_atomic = Arc::new(AtomicU64::new(0));
227
228 // Update progress bar less frequently to reduce contention
229 // Update every N bundles or every 100ms, whichever comes first
230 let update_interval = (workers.max(1) * 4).max(10); // At least every 10 bundles, or 4x workers
231
232 let results: Vec<_> = if workers > 1 {
233 // Parallel mode: process in chunks to maintain some ordering
234 // Increase chunk size to reduce contention
235 let chunk_size = workers * 4; // Process 4x workers at a time for better pipelining
236
237 needs_migration
238 .par_chunks(chunk_size)
239 .flat_map(|chunk| {
240 // Process chunk in parallel
241 chunk
242 .par_iter()
243 .map(|info| {
244 let result = manager.migrate_bundle(info.bundle_number);
245
246 // Update atomics (lock-free)
247 let current_count = count_atomic.fetch_add(1, Ordering::Relaxed) + 1;
248 let total_bytes = bytes_atomic.fetch_add(info.old_size, Ordering::Relaxed)
249 + info.old_size;
250
251 // Only update progress bar periodically to reduce lock contention
252 if current_count.is_multiple_of(update_interval) || current_count == 1 {
253 let prog = progress_arc.lock().unwrap();
254 prog.set_with_bytes(current_count, total_bytes);
255 }
256
257 (info, result)
258 })
259 .collect::<Vec<_>>()
260 })
261 .collect()
262 } else {
263 // Sequential mode - can update more frequently
264 needs_migration
265 .iter()
266 .enumerate()
267 .map(|(i, info)| {
268 let result = manager.migrate_bundle(info.bundle_number);
269
270 let current_count = i + 1;
271 let total_bytes =
272 bytes_atomic.fetch_add(info.old_size, Ordering::Relaxed) + info.old_size;
273
274 // Update every bundle in sequential mode (no contention)
275 let prog = progress_arc.lock().unwrap();
276 prog.set_with_bytes(current_count, total_bytes);
277
278 (info, result)
279 })
280 .collect()
281 };
282
283 // Process results
284 for (info, result) in results {
285 total_old_size += info.old_size;
286 total_old_uncompressed += info.uncompressed_size;
287
288 match result {
289 Ok((size_diff, new_uncompressed_size, _new_compressed_size)) => {
290 success += 1;
291 hash_changes.push(info.bundle_number);
292
293 let new_size = (info.old_size as i64 + size_diff) as u64;
294 total_new_size += new_size;
295 total_new_uncompressed += new_uncompressed_size;
296
297 if global_verbose {
298 let old_ratio = info.uncompressed_size as f64 / info.old_size as f64;
299 let new_ratio = new_uncompressed_size as f64 / new_size as f64;
300 let size_change = if size_diff >= 0 {
301 format!("+{}", format_bytes(size_diff as u64))
302 } else {
303 format!("-{}", format_bytes((-size_diff) as u64))
304 };
305 eprintln!(
306 "✓ {}: {:.3}x→{:.3}x {}",
307 info.bundle_number, old_ratio, new_ratio, size_change
308 );
309 }
310 }
311 Err(e) => {
312 failed += 1;
313 let err_msg = e.to_string();
314
315 // Always print chain hash errors (even in non-verbose mode)
316 if err_msg.contains("Chain hash mismatch")
317 || err_msg.contains("Parent hash mismatch")
318 {
319 eprintln!("\n❌ Bundle {}: {}", info.bundle_number, err_msg);
320 } else if global_verbose {
321 eprintln!("✗ Bundle {} failed: {}", info.bundle_number, e);
322 }
323
324 if first_error.is_none() {
325 first_error = Some(e);
326 }
327 }
328 }
329 }
330
331 // Final progress update with accurate counts
332 {
333 let final_count = count_atomic.load(Ordering::Relaxed);
334 let final_bytes = bytes_atomic.load(Ordering::Relaxed);
335 let prog = progress_arc.lock().unwrap();
336 prog.set_with_bytes(final_count, final_bytes);
337 prog.finish();
338 }
339 let elapsed = start.elapsed();
340
341 // Update index (already done in migrate_bundle, but verify)
342 if !hash_changes.is_empty() && global_verbose {
343 eprintln!("\nUpdating index...");
344 let update_start = Instant::now();
345 // Index is already updated by migrate_bundle, just verify
346 eprintln!(
347 " ✓ {} entries in {:?}",
348 hash_changes.len(),
349 update_start.elapsed()
350 );
351 }
352
353 // Summary
354 eprintln!();
355 if failed == 0 {
356 eprintln!("✓ Complete: {} bundles in {:?}\n", success, elapsed);
357
358 if total_old_size > 0 && success > 0 {
359 let size_diff = total_new_size as i64 - total_old_size as i64;
360 let old_ratio = total_old_uncompressed as f64 / total_old_size as f64;
361 let new_ratio = total_new_uncompressed as f64 / total_new_size as f64;
362 let ratio_diff = new_ratio - old_ratio;
363
364 // Measure actual metadata size
365 let mut total_actual_metadata = 0u64;
366 for bundle_num in &hash_changes {
367 if let Ok(meta_size) = measure_metadata_size(&manager, *bundle_num) {
368 total_actual_metadata += meta_size;
369 }
370 }
371
372 eprintln!(" Old New Change");
373 eprintln!(" ──────── ──────── ─────────");
374 let size_change = if size_diff >= 0 {
375 format!("+{}", format_bytes(size_diff as u64))
376 } else {
377 format!("-{}", format_bytes((-size_diff) as u64))
378 };
379 eprintln!(
380 "Size {:<13} {:<13} {} ({:.1}%)",
381 format_bytes(total_old_size),
382 format_bytes(total_new_size),
383 size_change,
384 size_diff as f64 / total_old_size as f64 * 100.0
385 );
386 let old_ratio_str = format!("{:.3}x", old_ratio);
387 let new_ratio_str = format!("{:.3}x", new_ratio);
388 let ratio_diff_str = format!("{:+.3}x", ratio_diff);
389 eprintln!(
390 "Ratio {:<13} {:<13} {}",
391 old_ratio_str, new_ratio_str, ratio_diff_str
392 );
393 let avg_change = size_diff / success as i64;
394 let avg_change_str = if avg_change >= 0 {
395 format!("+{}", format_bytes(avg_change as u64))
396 } else {
397 format!("-{}", format_bytes((-avg_change) as u64))
398 };
399 eprintln!(
400 "Avg/bundle {:<13} {:<13} {}\n",
401 format_bytes(total_old_size / success as u64),
402 format_bytes(total_new_size / success as u64),
403 avg_change_str
404 );
405
406 if total_actual_metadata > 0 {
407 let compression_efficiency = size_diff - total_actual_metadata as i64;
408 let threshold = total_old_size as i64 / 1000; // 0.1% of old size
409
410 eprintln!("Breakdown:");
411 eprintln!(
412 " Metadata: {} (~{}/bundle, structural)",
413 format_bytes(total_actual_metadata),
414 format_bytes(total_actual_metadata / success as u64)
415 );
416
417 if compression_efficiency.abs() > threshold {
418 if compression_efficiency > 0 {
419 let pct_worse =
420 compression_efficiency as f64 / total_old_size as f64 * 100.0;
421 eprintln!(
422 " Compression: {} ({:.2}% worse)",
423 format_bytes(compression_efficiency as u64),
424 pct_worse
425 );
426 } else {
427 let pct_better =
428 (-compression_efficiency) as f64 / total_old_size as f64 * 100.0;
429 eprintln!(
430 " Compression: {} ({:.2}% better)",
431 format_bytes((-compression_efficiency) as u64),
432 pct_better
433 );
434 }
435 } else {
436 eprintln!(" Compression: unchanged");
437 }
438 }
439 eprintln!();
440 }
441 } else {
442 eprintln!("⚠️ Failed: {} bundles", failed);
443 if let Some(ref err) = first_error {
444 let err_msg = err.to_string();
445 eprintln!(" First error: {}", err);
446
447 // Provide helpful guidance for chain hash errors
448 if err_msg.contains("Chain hash mismatch") {
449 eprintln!("\n💡 Chain hash errors indicate:");
450 eprintln!(" • The bundle content doesn't match the expected chain hash");
451 eprintln!(" • This could mean the original bundle was corrupted or modified");
452 eprintln!(" • The chain integrity check is working correctly");
453 eprintln!("\n To diagnose:");
454 eprintln!(
455 " 1. Run '{} verify' to check all bundles",
456 constants::BINARY_NAME
457 );
458 eprintln!(" 2. Check if the bundle file was manually modified");
459 eprintln!(" 3. Re-sync affected bundles from the PLC directory");
460 } else if err_msg.contains("Parent hash mismatch") {
461 eprintln!("\n💡 Parent hash errors indicate:");
462 eprintln!(" • The chain linkage is broken between bundles");
463 eprintln!(" • Bundles may have been migrated out of order");
464 eprintln!(" • The index metadata may be inconsistent");
465 eprintln!("\n To fix:");
466 eprintln!(
467 " 1. Run '{} verify' to identify all broken links",
468 constants::BINARY_NAME
469 );
470 eprintln!(" 2. Ensure bundles are migrated in sequential order (1, 2, 3, ...)");
471 }
472 }
473 bail!("migration failed for {} bundles", failed);
474 }
475
476 Ok(())
477}
478
/// Per-bundle bookkeeping collected during the scan phase and consumed by the
/// migration and summary phases of `run`.
struct BundleMigrationInfo {
    /// Sequential bundle number within the chain.
    bundle_number: u32,
    /// Compressed on-disk size before migration, in bytes.
    old_size: u64,
    /// Uncompressed payload size, in bytes (used for ratio reporting).
    uncompressed_size: u64,
    /// Human-readable format label of the pre-migration bundle.
    old_format: String,
}
485
486fn measure_metadata_size(manager: &BundleManager, bundle_num: u32) -> Result<u64> {
487 use std::io::Read;
488
489 let mut file = manager.stream_bundle_raw(bundle_num)?;
490
491 // Read magic (4 bytes) + size (4 bytes)
492 let mut header = [0u8; 8];
493 file.read_exact(&mut header)?;
494
495 // Check if it's a skippable frame
496 let magic = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
497 if !(0x184D2A50..=0x184D2A5F).contains(&magic) {
498 return Ok(0); // No metadata frame
499 }
500
501 // Get frame data size
502 let frame_size = u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as u64;
503
504 // Total metadata size = 4 (magic) + 4 (size) + frameSize (data)
505 Ok(8 + frame_size)
506}