// High-performance implementation of plcbundle written in Rust
1// Migrate command - convert bundles to multi-frame format 2use super::progress::ProgressBar; 3use super::utils::{HasGlobalFlags, format_bytes}; 4use anyhow::{Result, bail}; 5use clap::Args; 6use plcbundle::BundleManager; 7use plcbundle::constants; 8use std::path::PathBuf; 9use std::time::Instant; 10 11#[derive(Args)] 12#[command( 13 about = "Migrate bundles to new bundle format", 14 long_about = "Rebuild bundles to the new bundle format by recompressing and creating 15multi-frame structure with embedded frame offsets. 16 17The migration process scans for bundles missing frame metadata, recompresses them 18using the multi-frame format (100 operations per frame), generates a frame offset 19index in the metadata, and preserves all cryptographic hashes and metadata. 20Content integrity is verified throughout the process. 21 22Original bundle files are replaced atomically to ensure data safety. The command 23automatically detects which bundles need migration and processes them in parallel 24for efficiency. Use --dry-run to preview what would be migrated without making 25changes. 
26 27After migration, bundles benefit from improved random access performance and lower 28memory footprint when accessing individual operations rather than loading entire bundles.", 29 help_template = crate::clap_help!( 30 examples: " # Preview migration (recommended first)\n \ 31 {bin} migrate --dry-run\n\n \ 32 # Migrate all legacy bundles (auto-detects CPU cores)\n \ 33 {bin} migrate\n\n \ 34 # Migrate specific bundle range\n \ 35 {bin} migrate --bundles 1-100\n\n \ 36 # Migrate single bundle\n \ 37 {bin} migrate --bundles 42\n\n \ 38 # Migrate multiple ranges\n \ 39 {bin} migrate --bundles 1-10,20-30,50\n\n \ 40 # Force migration even if frame metadata exists\n \ 41 {bin} migrate --force\n\n \ 42 # Limit threads (if needed for resource constraints)\n \ 43 {bin} migrate -j 4\n\n \ 44 # Verbose output\n \ 45 {bin} migrate -v" 46 ) 47)] 48pub struct MigrateCommand { 49 /// Show what would be migrated without migrating 50 #[arg(short = 'n', long)] 51 pub dry_run: bool, 52 53 /// Re-migrate bundles that already have frame metadata 54 #[arg(short, long)] 55 pub force: bool, 56 57 /// Bundle range to migrate (e.g., \"1-100\", \"42\", \"1-10,20-30\", \"latest:10\") 58 /// If not specified, migrates all bundles that need migration 59 #[arg(long)] 60 pub bundles: Option<String>, 61 62 /// Number of threads to use (0 = auto-detect) 63 #[arg(short = 'j', long, default_value = "0")] 64 pub threads: usize, 65} 66 67impl HasGlobalFlags for MigrateCommand { 68 fn verbose(&self) -> bool { 69 false 70 } 71 fn quiet(&self) -> bool { 72 false 73 } 74} 75 76pub fn run(cmd: MigrateCommand, dir: PathBuf, global_verbose: bool) -> Result<()> { 77 let manager = super::utils::create_manager(dir.clone(), global_verbose, false, false)?; 78 79 // Auto-detect number of threads if 0 80 let workers = super::utils::get_worker_threads(cmd.threads, 4); 81 82 eprintln!( 83 "Scanning for legacy bundles in: {}\n", 84 super::utils::display_path(&dir).display() 85 ); 86 87 let index = 
manager.get_index(); 88 let bundles = &index.bundles; 89 90 if bundles.is_empty() { 91 eprintln!("No bundles to migrate"); 92 return Ok(()); 93 } 94 95 // Determine which bundles to consider for migration 96 let last_bundle = index.last_bundle; 97 let target_bundles: Option<std::collections::HashSet<u32>> = if let Some(ref bundles_spec) = 98 cmd.bundles 99 { 100 let bundle_list = super::utils::parse_bundle_spec(Some(bundles_spec.clone()), last_bundle)?; 101 Some(bundle_list.into_iter().collect()) 102 } else { 103 None 104 }; 105 106 // Check which bundles need migration 107 let mut needs_migration = Vec::new(); 108 let mut total_size = 0u64; 109 let mut format_counts = std::collections::HashMap::new(); 110 111 for meta in bundles { 112 // Filter by bundle range if specified 113 if let Some(ref target_set) = target_bundles 114 && !target_set.contains(&meta.bundle_number) 115 { 116 continue; 117 } 118 119 let embedded_meta = manager.get_embedded_metadata(meta.bundle_number)?; 120 let (old_format, has_frame_offsets) = match embedded_meta { 121 Some(ref m) if !m.frame_offsets.is_empty() => (m.format.clone(), true), 122 Some(m) => (m.format.clone(), false), 123 None => ("v0 (single-frame)".to_string(), false), 124 }; 125 126 let needs_migrate = cmd.force || !has_frame_offsets; 127 128 if needs_migrate { 129 needs_migration.push(BundleMigrationInfo { 130 bundle_number: meta.bundle_number, 131 old_size: meta.compressed_size, 132 uncompressed_size: meta.uncompressed_size, 133 old_format, 134 }); 135 total_size += meta.compressed_size; 136 *format_counts 137 .entry(needs_migration.last().unwrap().old_format.clone()) 138 .or_insert(0) += 1; 139 } 140 } 141 142 if needs_migration.is_empty() { 143 if let Some(ref bundles_spec) = cmd.bundles { 144 eprintln!("No bundles in range '{}' need migration", bundles_spec); 145 } else { 146 eprintln!("✓ All bundles already migrated"); 147 } 148 eprintln!("\nUse --force to re-migrate"); 149 return Ok(()); 150 } 151 152 // Sort bundles by 
number to ensure chain integrity (migrate in order: 1, 2, 3, ...) 153 needs_migration.sort_by_key(|info| info.bundle_number); 154 155 // Show migration plan 156 eprintln!("Migration Plan"); 157 eprintln!("══════════════\n"); 158 159 if let Some(ref bundles_spec) = cmd.bundles { 160 eprintln!(" Range: {}", bundles_spec); 161 } 162 163 let mut format_parts = Vec::new(); 164 for (format, count) in &format_counts { 165 format_parts.push(format!("{} ({})", format, count)); 166 } 167 eprintln!( 168 " Format: {}{}/1.0", 169 format_parts.join(" + "), 170 constants::BINARY_NAME 171 ); 172 173 let total_uncompressed: u64 = needs_migration.iter().map(|i| i.uncompressed_size).sum(); 174 let avg_compression = if total_size > 0 { 175 total_uncompressed as f64 / total_size as f64 176 } else { 177 0.0 178 }; 179 180 eprintln!(" Bundles: {}", needs_migration.len()); 181 eprintln!( 182 " Size: {} ({:.3}x compression)", 183 format_bytes(total_size), 184 avg_compression 185 ); 186 eprintln!( 187 " Workers: {}, Compression Level: {}\n", 188 workers, 189 constants::ZSTD_COMPRESSION_LEVEL 190 ); 191 192 if cmd.dry_run { 193 eprintln!("💡 Dry-run mode"); 194 return Ok(()); 195 } 196 197 // Execute migration 198 eprintln!("Migrating...\n"); 199 200 let start = Instant::now(); 201 202 // Calculate total bytes to process 203 let total_bytes: u64 = needs_migration.iter().map(|info| info.old_size).sum(); 204 let progress = ProgressBar::with_bytes(needs_migration.len(), total_bytes); 205 206 let mut success = 0; 207 let mut failed = 0; 208 let mut first_error: Option<anyhow::Error> = None; 209 let mut hash_changes = Vec::new(); 210 211 let mut total_old_size = 0u64; 212 let mut total_new_size = 0u64; 213 let mut total_old_uncompressed = 0u64; 214 let mut total_new_uncompressed = 0u64; 215 216 // Parallel migration using rayon 217 // Note: Even though we use parallelism, bundles MUST be migrated in order 218 // for chain integrity. We parallelize the WORK (compression) but commit sequentially. 
219 use rayon::prelude::*; 220 use std::sync::{Arc, Mutex}; 221 222 let progress_arc = Arc::new(Mutex::new(progress)); 223 // Use atomics for counters to reduce lock contention 224 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; 225 let count_atomic = Arc::new(AtomicUsize::new(0)); 226 let bytes_atomic = Arc::new(AtomicU64::new(0)); 227 228 // Update progress bar less frequently to reduce contention 229 // Update every N bundles or every 100ms, whichever comes first 230 let update_interval = (workers.max(1) * 4).max(10); // At least every 10 bundles, or 4x workers 231 232 let results: Vec<_> = if workers > 1 { 233 // Parallel mode: process in chunks to maintain some ordering 234 // Increase chunk size to reduce contention 235 let chunk_size = workers * 4; // Process 4x workers at a time for better pipelining 236 237 needs_migration 238 .par_chunks(chunk_size) 239 .flat_map(|chunk| { 240 // Process chunk in parallel 241 chunk 242 .par_iter() 243 .map(|info| { 244 let result = manager.migrate_bundle(info.bundle_number); 245 246 // Update atomics (lock-free) 247 let current_count = count_atomic.fetch_add(1, Ordering::Relaxed) + 1; 248 let total_bytes = bytes_atomic.fetch_add(info.old_size, Ordering::Relaxed) 249 + info.old_size; 250 251 // Only update progress bar periodically to reduce lock contention 252 if current_count.is_multiple_of(update_interval) || current_count == 1 { 253 let prog = progress_arc.lock().unwrap(); 254 prog.set_with_bytes(current_count, total_bytes); 255 } 256 257 (info, result) 258 }) 259 .collect::<Vec<_>>() 260 }) 261 .collect() 262 } else { 263 // Sequential mode - can update more frequently 264 needs_migration 265 .iter() 266 .enumerate() 267 .map(|(i, info)| { 268 let result = manager.migrate_bundle(info.bundle_number); 269 270 let current_count = i + 1; 271 let total_bytes = 272 bytes_atomic.fetch_add(info.old_size, Ordering::Relaxed) + info.old_size; 273 274 // Update every bundle in sequential mode (no contention) 275 let 
prog = progress_arc.lock().unwrap(); 276 prog.set_with_bytes(current_count, total_bytes); 277 278 (info, result) 279 }) 280 .collect() 281 }; 282 283 // Process results 284 for (info, result) in results { 285 total_old_size += info.old_size; 286 total_old_uncompressed += info.uncompressed_size; 287 288 match result { 289 Ok((size_diff, new_uncompressed_size, _new_compressed_size)) => { 290 success += 1; 291 hash_changes.push(info.bundle_number); 292 293 let new_size = (info.old_size as i64 + size_diff) as u64; 294 total_new_size += new_size; 295 total_new_uncompressed += new_uncompressed_size; 296 297 if global_verbose { 298 let old_ratio = info.uncompressed_size as f64 / info.old_size as f64; 299 let new_ratio = new_uncompressed_size as f64 / new_size as f64; 300 let size_change = if size_diff >= 0 { 301 format!("+{}", format_bytes(size_diff as u64)) 302 } else { 303 format!("-{}", format_bytes((-size_diff) as u64)) 304 }; 305 eprintln!( 306 "{}: {:.3}x→{:.3}x {}", 307 info.bundle_number, old_ratio, new_ratio, size_change 308 ); 309 } 310 } 311 Err(e) => { 312 failed += 1; 313 let err_msg = e.to_string(); 314 315 // Always print chain hash errors (even in non-verbose mode) 316 if err_msg.contains("Chain hash mismatch") 317 || err_msg.contains("Parent hash mismatch") 318 { 319 eprintln!("\n❌ Bundle {}: {}", info.bundle_number, err_msg); 320 } else if global_verbose { 321 eprintln!("✗ Bundle {} failed: {}", info.bundle_number, e); 322 } 323 324 if first_error.is_none() { 325 first_error = Some(e); 326 } 327 } 328 } 329 } 330 331 // Final progress update with accurate counts 332 { 333 let final_count = count_atomic.load(Ordering::Relaxed); 334 let final_bytes = bytes_atomic.load(Ordering::Relaxed); 335 let prog = progress_arc.lock().unwrap(); 336 prog.set_with_bytes(final_count, final_bytes); 337 prog.finish(); 338 } 339 let elapsed = start.elapsed(); 340 341 // Update index (already done in migrate_bundle, but verify) 342 if !hash_changes.is_empty() && 
global_verbose { 343 eprintln!("\nUpdating index..."); 344 let update_start = Instant::now(); 345 // Index is already updated by migrate_bundle, just verify 346 eprintln!( 347 "{} entries in {:?}", 348 hash_changes.len(), 349 update_start.elapsed() 350 ); 351 } 352 353 // Summary 354 eprintln!(); 355 if failed == 0 { 356 eprintln!("✓ Complete: {} bundles in {:?}\n", success, elapsed); 357 358 if total_old_size > 0 && success > 0 { 359 let size_diff = total_new_size as i64 - total_old_size as i64; 360 let old_ratio = total_old_uncompressed as f64 / total_old_size as f64; 361 let new_ratio = total_new_uncompressed as f64 / total_new_size as f64; 362 let ratio_diff = new_ratio - old_ratio; 363 364 // Measure actual metadata size 365 let mut total_actual_metadata = 0u64; 366 for bundle_num in &hash_changes { 367 if let Ok(meta_size) = measure_metadata_size(&manager, *bundle_num) { 368 total_actual_metadata += meta_size; 369 } 370 } 371 372 eprintln!(" Old New Change"); 373 eprintln!(" ──────── ──────── ─────────"); 374 let size_change = if size_diff >= 0 { 375 format!("+{}", format_bytes(size_diff as u64)) 376 } else { 377 format!("-{}", format_bytes((-size_diff) as u64)) 378 }; 379 eprintln!( 380 "Size {:<13} {:<13} {} ({:.1}%)", 381 format_bytes(total_old_size), 382 format_bytes(total_new_size), 383 size_change, 384 size_diff as f64 / total_old_size as f64 * 100.0 385 ); 386 let old_ratio_str = format!("{:.3}x", old_ratio); 387 let new_ratio_str = format!("{:.3}x", new_ratio); 388 let ratio_diff_str = format!("{:+.3}x", ratio_diff); 389 eprintln!( 390 "Ratio {:<13} {:<13} {}", 391 old_ratio_str, new_ratio_str, ratio_diff_str 392 ); 393 let avg_change = size_diff / success as i64; 394 let avg_change_str = if avg_change >= 0 { 395 format!("+{}", format_bytes(avg_change as u64)) 396 } else { 397 format!("-{}", format_bytes((-avg_change) as u64)) 398 }; 399 eprintln!( 400 "Avg/bundle {:<13} {:<13} {}\n", 401 format_bytes(total_old_size / success as u64), 402 
format_bytes(total_new_size / success as u64), 403 avg_change_str 404 ); 405 406 if total_actual_metadata > 0 { 407 let compression_efficiency = size_diff - total_actual_metadata as i64; 408 let threshold = total_old_size as i64 / 1000; // 0.1% of old size 409 410 eprintln!("Breakdown:"); 411 eprintln!( 412 " Metadata: {} (~{}/bundle, structural)", 413 format_bytes(total_actual_metadata), 414 format_bytes(total_actual_metadata / success as u64) 415 ); 416 417 if compression_efficiency.abs() > threshold { 418 if compression_efficiency > 0 { 419 let pct_worse = 420 compression_efficiency as f64 / total_old_size as f64 * 100.0; 421 eprintln!( 422 " Compression: {} ({:.2}% worse)", 423 format_bytes(compression_efficiency as u64), 424 pct_worse 425 ); 426 } else { 427 let pct_better = 428 (-compression_efficiency) as f64 / total_old_size as f64 * 100.0; 429 eprintln!( 430 " Compression: {} ({:.2}% better)", 431 format_bytes((-compression_efficiency) as u64), 432 pct_better 433 ); 434 } 435 } else { 436 eprintln!(" Compression: unchanged"); 437 } 438 } 439 eprintln!(); 440 } 441 } else { 442 eprintln!("⚠️ Failed: {} bundles", failed); 443 if let Some(ref err) = first_error { 444 let err_msg = err.to_string(); 445 eprintln!(" First error: {}", err); 446 447 // Provide helpful guidance for chain hash errors 448 if err_msg.contains("Chain hash mismatch") { 449 eprintln!("\n💡 Chain hash errors indicate:"); 450 eprintln!(" • The bundle content doesn't match the expected chain hash"); 451 eprintln!(" • This could mean the original bundle was corrupted or modified"); 452 eprintln!(" • The chain integrity check is working correctly"); 453 eprintln!("\n To diagnose:"); 454 eprintln!( 455 " 1. Run '{} verify' to check all bundles", 456 constants::BINARY_NAME 457 ); 458 eprintln!(" 2. Check if the bundle file was manually modified"); 459 eprintln!(" 3. 
Re-sync affected bundles from the PLC directory"); 460 } else if err_msg.contains("Parent hash mismatch") { 461 eprintln!("\n💡 Parent hash errors indicate:"); 462 eprintln!(" • The chain linkage is broken between bundles"); 463 eprintln!(" • Bundles may have been migrated out of order"); 464 eprintln!(" • The index metadata may be inconsistent"); 465 eprintln!("\n To fix:"); 466 eprintln!( 467 " 1. Run '{} verify' to identify all broken links", 468 constants::BINARY_NAME 469 ); 470 eprintln!(" 2. Ensure bundles are migrated in sequential order (1, 2, 3, ...)"); 471 } 472 } 473 bail!("migration failed for {} bundles", failed); 474 } 475 476 Ok(()) 477} 478 479struct BundleMigrationInfo { 480 bundle_number: u32, 481 old_size: u64, 482 uncompressed_size: u64, 483 old_format: String, 484} 485 486fn measure_metadata_size(manager: &BundleManager, bundle_num: u32) -> Result<u64> { 487 use std::io::Read; 488 489 let mut file = manager.stream_bundle_raw(bundle_num)?; 490 491 // Read magic (4 bytes) + size (4 bytes) 492 let mut header = [0u8; 8]; 493 file.read_exact(&mut header)?; 494 495 // Check if it's a skippable frame 496 let magic = u32::from_le_bytes([header[0], header[1], header[2], header[3]]); 497 if !(0x184D2A50..=0x184D2A5F).contains(&magic) { 498 return Ok(0); // No metadata frame 499 } 500 501 // Get frame data size 502 let frame_size = u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as u64; 503 504 // Total metadata size = 4 (magic) + 4 (size) + frameSize (data) 505 Ok(8 + frame_size) 506}