High-performance implementation of plcbundle written in Rust
at main 527 lines 19 kB view raw
1use anyhow::Result; 2use clap::{Args, ValueHint}; 3use std::collections::HashSet; 4use std::fs::File; 5use std::io::{self, BufRead, BufReader, BufWriter, Write}; 6use std::path::PathBuf; 7use std::sync::Arc; 8use std::sync::Mutex; 9use std::time::Instant; 10 11use super::progress::ProgressBar; 12use super::utils; 13use plcbundle::constants::{ 14 bundle_position_to_global, global_to_bundle_position, total_operations_from_bundles, 15}; 16use plcbundle::format::format_std_duration_auto; 17 18#[derive(Args)] 19#[command( 20 about = "Export operations to stdout or file", 21 long_about = "Export operations from bundles as raw JSONL (JSON Lines) format, suitable 22for processing with standard Unix tools or importing into other systems. 23 24This command streams operations directly from compressed bundles without 25loading them fully into memory, making it efficient for large exports. 26Operations are written one per line in JSON format, preserving all original 27operation data including DID, CID, timestamps, and operation payloads. 28 29Use --bundles to export specific bundles or ranges, or --all to export 30everything. The --count flag limits the total number of operations exported, 31useful for sampling or testing. Output goes to stdout by default, or specify 32--output to write to a file. 33 34This is the primary way to extract raw operation data from the repository 35for analysis, backup, or migration to other systems.", 36 help_template = crate::clap_help!( 37 examples: " # Export all bundles to stdout\n \ 38 {bin} export --all\n\n \ 39 # Export specific bundles\n \ 40 {bin} export 1-100\n \ 41 {bin} export 42\n\n \ 42 # Export to file\n \ 43 {bin} export --all -o operations.jsonl\n\n \ 44 # Export first 1000 operations\n \ 45 {bin} export --all --count 1000\n\n \ 46 # Export in reverse order\n \ 47 {bin} export --all --reverse\n\n \ 48 # Export specific operations by global position (0-indexed)\n \ 49 {bin} export --ops 0-999\n \ 50 {bin} export --ops 3255,553,0-9" 51 ) 52)] 53pub struct ExportCommand { 54 /// Bundle range to export (e.g., "42", "1-100", or "1-10,20-30") 55 #[arg(value_name = "BUNDLES", conflicts_with_all = ["all", "ops"])] 56 pub bundles: Option<String>, 57 58 /// Export all bundles 59 #[arg(long, conflicts_with = "ops")] 60 pub all: bool, 61 62 /// Export specific operations by global position (0-indexed, e.g., "0-999", "3255,553,0-9") 63 #[arg(long, conflicts_with_all = ["all", "bundles"])] 64 pub ops: Option<String>, 65 66 /// Output file (default: stdout) 67 #[arg(short, long, value_hint = ValueHint::FilePath)] 68 pub output: Option<PathBuf>, 69 70 /// Limit number of operations to export 71 #[arg(short = 'n', long)] 72 pub count: Option<usize>, 73 74 /// Export in reverse order (bundles and operations within bundles) 75 #[arg(long)] 76 pub reverse: bool, 77} 78 79pub fn run(cmd: ExportCommand, dir: PathBuf, quiet: bool, verbose: bool) -> Result<()> { 80 let output = cmd.output; 81 let count = cmd.count; 82 let reverse = cmd.reverse; 83 // Create BundleManager (follows RULES.md - NO direct file access from CLI) 84 let manager = super::utils::create_manager(dir.clone(), verbose, quiet, false)?; 85 let index = manager.get_index(); 86 let max_bundle = index.last_bundle; 87 88 // Determine what to export: bundles or specific operations 89 // Use a more efficient representation: store ranges and individual ops separately 90 // to avoid materializing millions of operations 91 let ops_to_export: Option<OperationFilter> = if let Some(ops_str) = cmd.ops { 92 // Calculate max operation number (0-indexed: (max_bundle * BUNDLE_SIZE) - 1) 93 // This is an upper bound; actual last operation may be lower if last bundle isn't full 94 let max_operation = total_operations_from_bundles(max_bundle).saturating_sub(1); 95 Some(OperationFilter::parse(&ops_str, max_operation)?) 96 } else { 97 None 98 }; 99 100 // Determine bundle numbers to process 101 let mut bundle_numbers: Vec<u32> = if let Some(bundles_str) = cmd.bundles { 102 utils::parse_bundle_spec(Some(bundles_str), max_bundle)? 103 } else if cmd.all { 104 (1..=max_bundle).collect() 105 } else if let Some(ref filter) = ops_to_export { 106 // When exporting specific operations, calculate which bundles are needed 107 // from the operation ranges without materializing all operations 108 filter.get_bundle_numbers(max_bundle) 109 } else { 110 anyhow::bail!("Must specify either --bundles, --all, or --ops"); 111 }; 112 113 // Reverse bundle order if requested 114 if reverse { 115 bundle_numbers.reverse(); 116 } 117 118 if verbose && !quiet { 119 log::debug!("Index: v{} ({})", index.version, index.origin); 120 log::debug!("Processing {} bundles", bundle_numbers.len()); 121 if let Some(ref count) = count { 122 log::debug!( 123 "Export limit: {} operations", 124 utils::format_number(*count as u64) 125 ); 126 } 127 } 128 129 // Display what is being exported (always visible unless quiet) 130 if !quiet { 131 if let Some(ref filter) = ops_to_export { 132 let op_count = filter.estimated_count(); 133 eprintln!("Exporting {} operations", utils::format_number(op_count)); 134 } else { 135 let bundle_range_str = format_bundle_range(&bundle_numbers); 136 eprintln!( 137 "Exporting bundles: {} ({})", 138 bundle_range_str, 139 bundle_numbers.len() 140 ); 141 } 142 } 143 144 // Open output with buffering 145 let writer: Box<dyn Write> = if let Some(output_path) = output { 146 Box::new(BufWriter::with_capacity( 147 1024 * 1024, 148 File::create(output_path)?, 149 )) 150 } else { 151 Box::new(BufWriter::with_capacity(1024 * 1024, io::stdout())) 152 }; 153 let mut writer = writer; 154 155 if !quiet { 156 log::info!("Exporting operations..."); 157 } 158 159 // Calculate total uncompressed and compressed sizes for progress tracking 160 let total_uncompressed_size = index.total_uncompressed_size_for_bundles(&bundle_numbers); 161 let total_compressed_size: u64 = bundle_numbers 162 .iter() 163 .filter_map(|bundle_num| { 164 index 165 .get_bundle(*bundle_num) 166 .map(|meta| meta.compressed_size) 167 }) 168 .sum(); 169 170 // Create progress bar tracking bundles processed 171 // Skip progress bar if quiet mode or if only one bundle needs to be loaded 172 let pb = if quiet || bundle_numbers.len() == 1 { 173 None 174 } else { 175 Some(ProgressBar::with_bytes( 176 bundle_numbers.len(), 177 total_uncompressed_size, 178 )) 179 }; 180 181 let start = Instant::now(); 182 let mut exported_count = 0; 183 let mut bundles_processed = 0; 184 let bytes_written = Arc::new(Mutex::new(0u64)); 185 let mut output_buffer = String::with_capacity(1024 * 1024); // 1MB buffer 186 const BATCH_SIZE: usize = 10000; 187 188 // Process bundles through BundleManager API (follows RULES.md) 189 for bundle_num in bundle_numbers { 190 // Check count limit 191 if let Some(limit) = count 192 && exported_count >= limit 193 { 194 break; 195 } 196 197 // Use BundleManager API to get decompressed stream 198 let decoder = match manager.stream_bundle_decompressed(bundle_num) { 199 Ok(decoder) => decoder, 200 Err(_) => { 201 // Bundle not found, skip it 202 bundles_processed += 1; 203 if let Some(ref pb) = pb { 204 let bytes = bytes_written.lock().unwrap(); 205 pb.set_with_bytes(bundles_processed, *bytes); 206 drop(bytes); 207 } 208 continue; 209 } 210 }; 211 let reader = BufReader::with_capacity(1024 * 1024, decoder); 212 213 // Collect lines from bundle with their positions 214 let mut bundle_lines = Vec::new(); 215 for (pos, line_result) in reader.lines().enumerate() { 216 let line = line_result?; 217 if line.is_empty() { 218 continue; 219 } 220 bundle_lines.push((pos, line)); 221 } 222 223 // Reverse lines if requested (preserve position information for filtering) 224 if reverse { 225 bundle_lines.reverse(); 226 } 227 228 // Write lines (filtering by operation position if --ops is specified) 229 for (pos, line) in bundle_lines { 230 // Check if this operation should be exported 231 if let Some(ref filter) = ops_to_export { 232 let global_pos = bundle_position_to_global(bundle_num, pos); 233 if !filter.contains(global_pos) { 234 continue; 235 } 236 } 237 238 // Check count limit 239 if let Some(limit) = count 240 && exported_count >= limit 241 { 242 break; 243 } 244 245 output_buffer.push_str(&line); 246 output_buffer.push('\n'); 247 exported_count += 1; 248 249 // Flush buffer when it gets large 250 if output_buffer.len() >= 1024 * 1024 { 251 let bytes = output_buffer.len(); 252 writer.write_all(output_buffer.as_bytes())?; 253 let mut bytes_guard = bytes_written.lock().unwrap(); 254 *bytes_guard += bytes as u64; 255 drop(bytes_guard); 256 output_buffer.clear(); 257 } 258 259 // Progress update (operations count in message, but bundles in progress bar) 260 if let Some(ref pb) = pb 261 && (exported_count % BATCH_SIZE == 0 || exported_count == 1) 262 { 263 let bytes = bytes_written.lock().unwrap(); 264 let total_bytes = *bytes + output_buffer.len() as u64; 265 drop(bytes); 266 pb.set_with_bytes(bundles_processed, total_bytes); 267 pb.set_message(format!( 268 "{} ops", 269 utils::format_number(exported_count as u64) 270 )); 271 } 272 } 273 274 // Update progress after processing each bundle 275 bundles_processed += 1; 276 if let Some(ref pb) = pb { 277 let bytes = bytes_written.lock().unwrap(); 278 let total_bytes = *bytes + output_buffer.len() as u64; 279 drop(bytes); 280 pb.set_with_bytes(bundles_processed, total_bytes); 281 pb.set_message(format!( 282 "{} ops", 283 utils::format_number(exported_count as u64) 284 )); 285 } 286 } 287 288 // Flush remaining buffer 289 if !output_buffer.is_empty() { 290 let bytes = output_buffer.len(); 291 writer.write_all(output_buffer.as_bytes())?; 292 let mut bytes_guard = bytes_written.lock().unwrap(); 293 *bytes_guard += bytes as u64; 294 drop(bytes_guard); 295 } 296 writer.flush()?; 297 298 // Final progress update 299 if let Some(ref pb) = pb { 300 let bytes = bytes_written.lock().unwrap(); 301 pb.set_with_bytes(bundles_processed, *bytes); 302 drop(bytes); 303 pb.finish(); 304 } 305 306 if !quiet { 307 let elapsed = start.elapsed(); 308 let elapsed_secs = elapsed.as_secs_f64(); 309 310 // Format duration with auto-scaling using utility function 311 let duration_str = format_std_duration_auto(elapsed); 312 313 let mut parts = Vec::new(); 314 parts.push(format!( 315 "Exported: {} ops", 316 utils::format_number(exported_count as u64) 317 )); 318 parts.push(format!("in {}", duration_str)); 319 320 if elapsed_secs > 0.0 { 321 // Calculate throughputs 322 let uncompressed_throughput_mb = 323 (total_uncompressed_size as f64 / elapsed_secs) / (1024.0 * 1024.0); 324 let compressed_throughput_mb = 325 (total_compressed_size as f64 / elapsed_secs) / (1024.0 * 1024.0); 326 327 parts.push(format!( 328 "({:.1} MB/s uncompressed, {:.1} MB/s compressed)", 329 uncompressed_throughput_mb, compressed_throughput_mb 330 )); 331 } 332 333 eprintln!("{}", parts.join(" ")); 334 } 335 336 Ok(()) 337} 338 339/// Format bundle numbers as a compact range string (e.g., "1-10", "1, 5, 10", "1-5, 10-15") 340/// For large lists, shows a simple min-max range to avoid verbose output 341fn format_bundle_range(bundles: &[u32]) -> String { 342 if bundles.is_empty() { 343 return String::new(); 344 } 345 if bundles.len() == 1 { 346 return bundles[0].to_string(); 347 } 348 349 // For large lists, just show min-max range 350 if bundles.len() > 50 { 351 let min = bundles.iter().min().copied().unwrap_or(0); 352 let max = bundles.iter().max().copied().unwrap_or(0); 353 return format!("{}-{}", min, max); 354 } 355 356 let mut ranges = Vec::new(); 357 let mut range_start = bundles[0]; 358 let mut range_end = bundles[0]; 359 360 for &bundle in bundles.iter().skip(1) { 361 if bundle == range_end + 1 { 362 range_end = bundle; 363 } else { 364 if range_start == range_end { 365 ranges.push(range_start.to_string()); 366 } else { 367 ranges.push(format!("{}-{}", range_start, range_end)); 368 } 369 range_start = bundle; 370 range_end = bundle; 371 } 372 } 373 374 // Add the last range 375 if range_start == range_end { 376 ranges.push(range_start.to_string()); 377 } else { 378 ranges.push(format!("{}-{}", range_start, range_end)); 379 } 380 381 let result = ranges.join(", "); 382 383 // If the formatted string is too long, fall back to min-max 384 if result.len() > 200 { 385 let min = bundles.iter().min().copied().unwrap_or(0); 386 let max = bundles.iter().max().copied().unwrap_or(0); 387 format!("{}-{}", min, max) 388 } else { 389 result 390 } 391} 392 393/// Efficient filter for operation ranges that avoids materializing large lists 394/// Stores ranges and individual operations separately for O(1) or O(log n) lookups 395struct OperationFilter { 396 /// Ranges of operations (inclusive start, inclusive end) 397 ranges: Vec<(u64, u64)>, 398 /// Individual operations (for non-range specs) 399 individual: HashSet<u64>, 400} 401 402impl OperationFilter { 403 /// Parse operation range specification without materializing all values 404 /// This is much more efficient for large ranges like "0-10000000" 405 fn parse(spec: &str, max_operation: u64) -> Result<Self> { 406 use anyhow::Context; 407 408 if max_operation == 0 { 409 anyhow::bail!("No operations available"); 410 } 411 412 let mut ranges = Vec::new(); 413 let mut individual = HashSet::new(); 414 415 for part in spec.split(',') { 416 let part = part.trim(); 417 if part.is_empty() { 418 continue; 419 } 420 421 if part.contains('-') { 422 let range: Vec<&str> = part.split('-').collect(); 423 if range.len() != 2 { 424 anyhow::bail!("Invalid range format: {}", part); 425 } 426 let start_str = range[0].trim(); 427 let end_str = range[1].trim(); 428 429 let start: u64 = start_str 430 .parse() 431 .with_context(|| format!("Invalid operation number: {}", start_str))?; 432 let end: u64 = end_str 433 .parse() 434 .with_context(|| format!("Invalid operation number: {}", end_str))?; 435 436 if start > end { 437 anyhow::bail!("Invalid range: {} > {} (start must be <= end)", start, end); 438 } 439 if start > max_operation || end > max_operation { 440 anyhow::bail!( 441 "Invalid range: {}-{} (exceeds maximum operation {})", 442 start, 443 end, 444 max_operation 445 ); 446 } 447 ranges.push((start, end)); 448 } else { 449 let num: u64 = part 450 .parse() 451 .with_context(|| format!("Invalid operation number: {}", part))?; 452 if num > max_operation { 453 anyhow::bail!( 454 "Operation number {} out of range (max: {})", 455 num, 456 max_operation 457 ); 458 } 459 individual.insert(num); 460 } 461 } 462 463 // Sort ranges for efficient lookup 464 ranges.sort_unstable(); 465 466 Ok(OperationFilter { ranges, individual }) 467 } 468 469 /// Check if a global operation position is included in the filter 470 fn contains(&self, global_pos: u64) -> bool { 471 // Check individual operations first (O(1)) 472 if self.individual.contains(&global_pos) { 473 return true; 474 } 475 476 // Check ranges (O(log n) with binary search, but we use linear for simplicity) 477 // For small number of ranges, linear is fine 478 for &(start, end) in &self.ranges { 479 if global_pos >= start && global_pos <= end { 480 return true; 481 } 482 } 483 484 false 485 } 486 487 /// Get bundle numbers that contain operations in this filter 488 /// This calculates bundles from range bounds without materializing all operations 489 fn get_bundle_numbers(&self, max_bundle: u32) -> Vec<u32> { 490 let mut bundle_set = HashSet::new(); 491 492 // Process ranges 493 for &(start, end) in &self.ranges { 494 // Convert start and end to bundle numbers 495 let (start_bundle, _) = global_to_bundle_position(start); 496 let (end_bundle, _) = global_to_bundle_position(end); 497 498 // Add all bundles in the range 499 for bundle in start_bundle..=end_bundle.min(max_bundle) { 500 bundle_set.insert(bundle); 501 } 502 } 503 504 // Process individual operations 505 for &op in &self.individual { 506 let (bundle, _) = global_to_bundle_position(op); 507 if bundle <= max_bundle { 508 bundle_set.insert(bundle); 509 } 510 } 511 512 let mut bundles: Vec<u32> = bundle_set.into_iter().collect(); 513 bundles.sort_unstable(); 514 bundles 515 } 516 517 /// Estimate the number of operations (for display purposes) 518 fn estimated_count(&self) -> u64 { 519 let range_count: u64 = self 520 .ranges 521 .iter() 522 .map(|&(start, end)| end.saturating_sub(start).saturating_add(1)) 523 .sum(); 524 let individual_count = self.individual.len() as u64; 525 range_count.saturating_add(individual_count) 526 } 527}