// Forked from atscan.net/plcbundle-rs:
// a high-performance implementation of plcbundle written in Rust.
1use anyhow::Result;
2use clap::{Args, ValueHint};
3use std::collections::HashSet;
4use std::fs::File;
5use std::io::{self, BufRead, BufReader, BufWriter, Write};
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::time::Instant;
10
11use super::progress::ProgressBar;
12use super::utils;
13use plcbundle::constants::{
14 bundle_position_to_global, global_to_bundle_position, total_operations_from_bundles,
15};
16use plcbundle::format::format_std_duration_auto;
17
// Command-line arguments of the `export` subcommand.
//
// Exactly one selection mode is expected: a positional bundle spec, `--all`, or `--ops`.
// The three are declared mutually exclusive through the `conflicts_with*` attributes below.
//
// The `///` comments on the fields are picked up by clap as help text, so they are part of
// the program's user-visible output rather than plain documentation.
//
// NOTE(review): `long_about` mentions a `--bundles` flag, but `bundles` is declared without
// `long`, so it appears to be a positional argument — confirm the help wording.
#[derive(Args)]
#[command(
    about = "Export operations to stdout or file",
    long_about = "Export operations from bundles as raw JSONL (JSON Lines) format, suitable
for processing with standard Unix tools or importing into other systems.

This command streams operations directly from compressed bundles without
loading them fully into memory, making it efficient for large exports.
Operations are written one per line in JSON format, preserving all original
operation data including DID, CID, timestamps, and operation payloads.

Use --bundles to export specific bundles or ranges, or --all to export
everything. The --count flag limits the total number of operations exported,
useful for sampling or testing. Output goes to stdout by default, or specify
--output to write to a file.

This is the primary way to extract raw operation data from the repository
for analysis, backup, or migration to other systems.",
    help_template = crate::clap_help!(
        examples: " # Export all bundles to stdout\n \
            {bin} export --all\n\n \
            # Export specific bundles\n \
            {bin} export 1-100\n \
            {bin} export 42\n\n \
            # Export to file\n \
            {bin} export --all -o operations.jsonl\n\n \
            # Export first 1000 operations\n \
            {bin} export --all --count 1000\n\n \
            # Export in reverse order\n \
            {bin} export --all --reverse\n\n \
            # Export specific operations by global position (0-indexed)\n \
            {bin} export --ops 0-999\n \
            {bin} export --ops 3255,553,0-9"
    )
)]
pub struct ExportCommand {
    // No `long`/`short` here, so clap treats this as a positional value named BUNDLES.
    /// Bundle range to export (e.g., "42", "1-100", or "1-10,20-30")
    #[arg(value_name = "BUNDLES", conflicts_with_all = ["all", "ops"])]
    pub bundles: Option<String>,

    // The conflict with `bundles` is declared on the `bundles` and `ops` fields.
    /// Export all bundles
    #[arg(long, conflicts_with = "ops")]
    pub all: bool,

    // Parsed by `OperationFilter::parse` in `run`; positions are global across bundles.
    /// Export specific operations by global position (0-indexed, e.g., "0-999", "3255,553,0-9")
    #[arg(long, conflicts_with_all = ["all", "bundles"])]
    pub ops: Option<String>,

    // `None` makes `run` write to stdout.
    /// Output file (default: stdout)
    #[arg(short, long, value_hint = ValueHint::FilePath)]
    pub output: Option<PathBuf>,

    // The short flag is set explicitly to `-n` instead of the derived `-c`.
    /// Limit number of operations to export
    #[arg(short = 'n', long)]
    pub count: Option<usize>,

    // `run` reverses both the bundle order and the line order inside every bundle.
    /// Export in reverse order (bundles and operations within bundles)
    #[arg(long)]
    pub reverse: bool,
}
78
79pub fn run(cmd: ExportCommand, dir: PathBuf, quiet: bool, verbose: bool) -> Result<()> {
80 let output = cmd.output;
81 let count = cmd.count;
82 let reverse = cmd.reverse;
83 // Create BundleManager (follows RULES.md - NO direct file access from CLI)
84 let manager = super::utils::create_manager(dir.clone(), verbose, quiet, false)?;
85 let index = manager.get_index();
86 let max_bundle = index.last_bundle;
87
88 // Determine what to export: bundles or specific operations
89 // Use a more efficient representation: store ranges and individual ops separately
90 // to avoid materializing millions of operations
91 let ops_to_export: Option<OperationFilter> = if let Some(ops_str) = cmd.ops {
92 // Calculate max operation number (0-indexed: (max_bundle * BUNDLE_SIZE) - 1)
93 // This is an upper bound; actual last operation may be lower if last bundle isn't full
94 let max_operation = total_operations_from_bundles(max_bundle).saturating_sub(1);
95 Some(OperationFilter::parse(&ops_str, max_operation)?)
96 } else {
97 None
98 };
99
100 // Determine bundle numbers to process
101 let mut bundle_numbers: Vec<u32> = if let Some(bundles_str) = cmd.bundles {
102 utils::parse_bundle_spec(Some(bundles_str), max_bundle)?
103 } else if cmd.all {
104 (1..=max_bundle).collect()
105 } else if let Some(ref filter) = ops_to_export {
106 // When exporting specific operations, calculate which bundles are needed
107 // from the operation ranges without materializing all operations
108 filter.get_bundle_numbers(max_bundle)
109 } else {
110 anyhow::bail!("Must specify either --bundles, --all, or --ops");
111 };
112
113 // Reverse bundle order if requested
114 if reverse {
115 bundle_numbers.reverse();
116 }
117
118 if verbose && !quiet {
119 log::debug!("Index: v{} ({})", index.version, index.origin);
120 log::debug!("Processing {} bundles", bundle_numbers.len());
121 if let Some(ref count) = count {
122 log::debug!(
123 "Export limit: {} operations",
124 utils::format_number(*count as u64)
125 );
126 }
127 }
128
129 // Display what is being exported (always visible unless quiet)
130 if !quiet {
131 if let Some(ref filter) = ops_to_export {
132 let op_count = filter.estimated_count();
133 eprintln!("Exporting {} operations", utils::format_number(op_count));
134 } else {
135 let bundle_range_str = format_bundle_range(&bundle_numbers);
136 eprintln!(
137 "Exporting bundles: {} ({})",
138 bundle_range_str,
139 bundle_numbers.len()
140 );
141 }
142 }
143
144 // Open output with buffering
145 let writer: Box<dyn Write> = if let Some(output_path) = output {
146 Box::new(BufWriter::with_capacity(
147 1024 * 1024,
148 File::create(output_path)?,
149 ))
150 } else {
151 Box::new(BufWriter::with_capacity(1024 * 1024, io::stdout()))
152 };
153 let mut writer = writer;
154
155 if !quiet {
156 log::info!("Exporting operations...");
157 }
158
159 // Calculate total uncompressed and compressed sizes for progress tracking
160 let total_uncompressed_size = index.total_uncompressed_size_for_bundles(&bundle_numbers);
161 let total_compressed_size: u64 = bundle_numbers
162 .iter()
163 .filter_map(|bundle_num| {
164 index
165 .get_bundle(*bundle_num)
166 .map(|meta| meta.compressed_size)
167 })
168 .sum();
169
170 // Create progress bar tracking bundles processed
171 // Skip progress bar if quiet mode or if only one bundle needs to be loaded
172 let pb = if quiet || bundle_numbers.len() == 1 {
173 None
174 } else {
175 Some(ProgressBar::with_bytes(
176 bundle_numbers.len(),
177 total_uncompressed_size,
178 ))
179 };
180
181 let start = Instant::now();
182 let mut exported_count = 0;
183 let mut bundles_processed = 0;
184 let bytes_written = Arc::new(Mutex::new(0u64));
185 let mut output_buffer = String::with_capacity(1024 * 1024); // 1MB buffer
186 const BATCH_SIZE: usize = 10000;
187
188 // Process bundles through BundleManager API (follows RULES.md)
189 for bundle_num in bundle_numbers {
190 // Check count limit
191 if let Some(limit) = count
192 && exported_count >= limit
193 {
194 break;
195 }
196
197 // Use BundleManager API to get decompressed stream
198 let decoder = match manager.stream_bundle_decompressed(bundle_num) {
199 Ok(decoder) => decoder,
200 Err(_) => {
201 // Bundle not found, skip it
202 bundles_processed += 1;
203 if let Some(ref pb) = pb {
204 let bytes = bytes_written.lock().unwrap();
205 pb.set_with_bytes(bundles_processed, *bytes);
206 drop(bytes);
207 }
208 continue;
209 }
210 };
211 let reader = BufReader::with_capacity(1024 * 1024, decoder);
212
213 // Collect lines from bundle with their positions
214 let mut bundle_lines = Vec::new();
215 for (pos, line_result) in reader.lines().enumerate() {
216 let line = line_result?;
217 if line.is_empty() {
218 continue;
219 }
220 bundle_lines.push((pos, line));
221 }
222
223 // Reverse lines if requested (preserve position information for filtering)
224 if reverse {
225 bundle_lines.reverse();
226 }
227
228 // Write lines (filtering by operation position if --ops is specified)
229 for (pos, line) in bundle_lines {
230 // Check if this operation should be exported
231 if let Some(ref filter) = ops_to_export {
232 let global_pos = bundle_position_to_global(bundle_num, pos);
233 if !filter.contains(global_pos) {
234 continue;
235 }
236 }
237
238 // Check count limit
239 if let Some(limit) = count
240 && exported_count >= limit
241 {
242 break;
243 }
244
245 output_buffer.push_str(&line);
246 output_buffer.push('\n');
247 exported_count += 1;
248
249 // Flush buffer when it gets large
250 if output_buffer.len() >= 1024 * 1024 {
251 let bytes = output_buffer.len();
252 writer.write_all(output_buffer.as_bytes())?;
253 let mut bytes_guard = bytes_written.lock().unwrap();
254 *bytes_guard += bytes as u64;
255 drop(bytes_guard);
256 output_buffer.clear();
257 }
258
259 // Progress update (operations count in message, but bundles in progress bar)
260 if let Some(ref pb) = pb
261 && (exported_count % BATCH_SIZE == 0 || exported_count == 1)
262 {
263 let bytes = bytes_written.lock().unwrap();
264 let total_bytes = *bytes + output_buffer.len() as u64;
265 drop(bytes);
266 pb.set_with_bytes(bundles_processed, total_bytes);
267 pb.set_message(format!(
268 "{} ops",
269 utils::format_number(exported_count as u64)
270 ));
271 }
272 }
273
274 // Update progress after processing each bundle
275 bundles_processed += 1;
276 if let Some(ref pb) = pb {
277 let bytes = bytes_written.lock().unwrap();
278 let total_bytes = *bytes + output_buffer.len() as u64;
279 drop(bytes);
280 pb.set_with_bytes(bundles_processed, total_bytes);
281 pb.set_message(format!(
282 "{} ops",
283 utils::format_number(exported_count as u64)
284 ));
285 }
286 }
287
288 // Flush remaining buffer
289 if !output_buffer.is_empty() {
290 let bytes = output_buffer.len();
291 writer.write_all(output_buffer.as_bytes())?;
292 let mut bytes_guard = bytes_written.lock().unwrap();
293 *bytes_guard += bytes as u64;
294 drop(bytes_guard);
295 }
296 writer.flush()?;
297
298 // Final progress update
299 if let Some(ref pb) = pb {
300 let bytes = bytes_written.lock().unwrap();
301 pb.set_with_bytes(bundles_processed, *bytes);
302 drop(bytes);
303 pb.finish();
304 }
305
306 if !quiet {
307 let elapsed = start.elapsed();
308 let elapsed_secs = elapsed.as_secs_f64();
309
310 // Format duration with auto-scaling using utility function
311 let duration_str = format_std_duration_auto(elapsed);
312
313 let mut parts = Vec::new();
314 parts.push(format!(
315 "Exported: {} ops",
316 utils::format_number(exported_count as u64)
317 ));
318 parts.push(format!("in {}", duration_str));
319
320 if elapsed_secs > 0.0 {
321 // Calculate throughputs
322 let uncompressed_throughput_mb =
323 (total_uncompressed_size as f64 / elapsed_secs) / (1024.0 * 1024.0);
324 let compressed_throughput_mb =
325 (total_compressed_size as f64 / elapsed_secs) / (1024.0 * 1024.0);
326
327 parts.push(format!(
328 "({:.1} MB/s uncompressed, {:.1} MB/s compressed)",
329 uncompressed_throughput_mb, compressed_throughput_mb
330 ));
331 }
332
333 eprintln!("{}", parts.join(" "));
334 }
335
336 Ok(())
337}
338
/// Format bundle numbers as a compact, ascending range string
/// (e.g., "1-10", "1, 5, 10", "1-5, 10-15").
///
/// The input may be in any order and may contain duplicates: it is normalized (sorted and
/// de-duplicated) first, so a reversed selection such as `[5, 4, 3, 2, 1]` renders as "1-5".
/// Lists with more than 50 distinct bundles, or whose rendering exceeds 200 characters,
/// collapse to a plain "min-max" range to keep the output short.
///
/// Returns an empty string for an empty slice.
fn format_bundle_range(bundles: &[u32]) -> String {
    const MAX_LISTED_BUNDLES: usize = 50;
    const MAX_RENDERED_LEN: usize = 200;

    /// Render one run of consecutive bundles, collapsing a single-element run to one number.
    fn render_run(start: u32, end: u32) -> String {
        if start == end {
            start.to_string()
        } else {
            format!("{}-{}", start, end)
        }
    }

    // Work on a normalized copy so runs are detected regardless of the caller's ordering.
    let mut sorted = bundles.to_vec();
    sorted.sort_unstable();
    sorted.dedup();

    let (Some(&min), Some(&max)) = (sorted.first(), sorted.last()) else {
        return String::new();
    };

    // For large lists, just show min-max range
    if sorted.len() > MAX_LISTED_BUNDLES {
        return render_run(min, max);
    }

    let mut runs = Vec::new();
    let mut run_start = min;
    let mut run_end = min;

    for &bundle in sorted.iter().skip(1) {
        // `checked_add` keeps the comparison overflow-free even at `u32::MAX`.
        if run_end.checked_add(1) == Some(bundle) {
            run_end = bundle;
        } else {
            runs.push(render_run(run_start, run_end));
            run_start = bundle;
            run_end = bundle;
        }
    }
    // Add the last run
    runs.push(render_run(run_start, run_end));

    let rendered = runs.join(", ");

    // If the formatted string is too long, fall back to min-max
    if rendered.len() > MAX_RENDERED_LEN {
        render_run(min, max)
    } else {
        rendered
    }
}
392
393/// Efficient filter for operation ranges that avoids materializing large lists
394/// Stores ranges and individual operations separately for O(1) or O(log n) lookups
395struct OperationFilter {
396 /// Ranges of operations (inclusive start, inclusive end)
397 ranges: Vec<(u64, u64)>,
398 /// Individual operations (for non-range specs)
399 individual: HashSet<u64>,
400}
401
402impl OperationFilter {
403 /// Parse operation range specification without materializing all values
404 /// This is much more efficient for large ranges like "0-10000000"
405 fn parse(spec: &str, max_operation: u64) -> Result<Self> {
406 use anyhow::Context;
407
408 if max_operation == 0 {
409 anyhow::bail!("No operations available");
410 }
411
412 let mut ranges = Vec::new();
413 let mut individual = HashSet::new();
414
415 for part in spec.split(',') {
416 let part = part.trim();
417 if part.is_empty() {
418 continue;
419 }
420
421 if part.contains('-') {
422 let range: Vec<&str> = part.split('-').collect();
423 if range.len() != 2 {
424 anyhow::bail!("Invalid range format: {}", part);
425 }
426 let start_str = range[0].trim();
427 let end_str = range[1].trim();
428
429 let start: u64 = start_str
430 .parse()
431 .with_context(|| format!("Invalid operation number: {}", start_str))?;
432 let end: u64 = end_str
433 .parse()
434 .with_context(|| format!("Invalid operation number: {}", end_str))?;
435
436 if start > end {
437 anyhow::bail!("Invalid range: {} > {} (start must be <= end)", start, end);
438 }
439 if start > max_operation || end > max_operation {
440 anyhow::bail!(
441 "Invalid range: {}-{} (exceeds maximum operation {})",
442 start,
443 end,
444 max_operation
445 );
446 }
447 ranges.push((start, end));
448 } else {
449 let num: u64 = part
450 .parse()
451 .with_context(|| format!("Invalid operation number: {}", part))?;
452 if num > max_operation {
453 anyhow::bail!(
454 "Operation number {} out of range (max: {})",
455 num,
456 max_operation
457 );
458 }
459 individual.insert(num);
460 }
461 }
462
463 // Sort ranges for efficient lookup
464 ranges.sort_unstable();
465
466 Ok(OperationFilter { ranges, individual })
467 }
468
469 /// Check if a global operation position is included in the filter
470 fn contains(&self, global_pos: u64) -> bool {
471 // Check individual operations first (O(1))
472 if self.individual.contains(&global_pos) {
473 return true;
474 }
475
476 // Check ranges (O(log n) with binary search, but we use linear for simplicity)
477 // For small number of ranges, linear is fine
478 for &(start, end) in &self.ranges {
479 if global_pos >= start && global_pos <= end {
480 return true;
481 }
482 }
483
484 false
485 }
486
487 /// Get bundle numbers that contain operations in this filter
488 /// This calculates bundles from range bounds without materializing all operations
489 fn get_bundle_numbers(&self, max_bundle: u32) -> Vec<u32> {
490 let mut bundle_set = HashSet::new();
491
492 // Process ranges
493 for &(start, end) in &self.ranges {
494 // Convert start and end to bundle numbers
495 let (start_bundle, _) = global_to_bundle_position(start);
496 let (end_bundle, _) = global_to_bundle_position(end);
497
498 // Add all bundles in the range
499 for bundle in start_bundle..=end_bundle.min(max_bundle) {
500 bundle_set.insert(bundle);
501 }
502 }
503
504 // Process individual operations
505 for &op in &self.individual {
506 let (bundle, _) = global_to_bundle_position(op);
507 if bundle <= max_bundle {
508 bundle_set.insert(bundle);
509 }
510 }
511
512 let mut bundles: Vec<u32> = bundle_set.into_iter().collect();
513 bundles.sort_unstable();
514 bundles
515 }
516
517 /// Estimate the number of operations (for display purposes)
518 fn estimated_count(&self) -> u64 {
519 let range_count: u64 = self
520 .ranges
521 .iter()
522 .map(|&(start, end)| end.saturating_sub(start).saturating_add(1))
523 .sum();
524 let individual_count = self.individual.len() as u64;
525 range_count.saturating_add(individual_count)
526 }
527}