forked from
atscan.net/plcbundle-rs
High-performance implementation of plcbundle written in Rust
1use anyhow::Result;
2use clap::{Args, ValueEnum, ValueHint};
3use plcbundle::processor::{OutputHandler, Stats};
4use plcbundle::*;
5use std::io::Write;
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use super::progress::ProgressBar as CustomProgressBar;
11use super::utils;
12
13#[derive(Args)]
14#[command(
15 about = "Query bundles with JMESPath or simple path",
16 long_about = "Extract and filter operations from bundles using powerful query expressions.
17
18The command supports three query modes:
19 • auto - Automatically detect based on query syntax (default)
20 • simple - Fast path extraction (e.g., 'did', 'operation.type')
21 • jmespath - Full JMESPath expressions for complex filtering and transformation
22
23Simple path mode is optimized for fast single-field access, while JMESPath mode
24enables complex queries with filtering, projection, and transformation. The
25command automatically detects which mode to use based on the query syntax, or you
26can explicitly specify with --mode.
27
28Queries are executed in parallel across multiple bundles for maximum
29performance. Results are streamed as JSONL (one JSON object per line) by
30default, making it easy to pipe to other tools like jq or process in scripts.
31
32Use --json for pretty-printed JSON output, or --stats-only to see query
33statistics without outputting results.",
34 alias = "q",
35 help_template = crate::clap_help!(
36 examples: " # Simple path query (extract DID field)\n \
37 {bin} query did\n\n \
38 # Query specific bundles\n \
39 {bin} query did --bundles 1-100\n\n \
40 # JMESPath query (complex filtering)\n \
41 {bin} query 'operation.type == \"create\"'\n\n \
42 # Query with mode specification\n \
43 {bin} query did --mode simple\n \
44 {bin} query 'operation.type' --mode jmespath\n\n \
45 # Output to file\n \
46 {bin} query did -o results.jsonl\n\n \
47 # Pretty-printed JSON output\n \
48 {bin} query did --json\n\n \
49 # Statistics only (no results)\n \
50 {bin} query did --stats-only\n\n \
51 # Using alias\n \
52 {bin} q did --bundles latest:10"
53 )
54)]
55pub struct QueryCommand {
56 /// Query expression (e.g., "did", "operation.type", etc.)
57 pub expression: String,
58
59 /// Bundle range (e.g., "1-10,15,20-25" or "latest:10" for last 10)
60 #[arg(short, long)]
61 pub bundles: Option<String>,
62
63 /// Number of threads (0 = auto)
64 #[arg(short = 'j', long, default_value = "0")]
65 pub threads: usize,
66
67 /// Query mode
68 #[arg(short = 'm', long, default_value = "auto")]
69 pub mode: QueryModeArg,
70
71 /// Batch size for output
72 #[arg(long, default_value = "2000")]
73 pub batch_size: usize,
74
75 /// Output as pretty-printed JSON (default: JSONL, one JSON object per line)
76 #[arg(long)]
77 pub json: bool,
78
79 /// Output file (default: stdout)
80 #[arg(short, long, value_hint = ValueHint::FilePath)]
81 pub output: Option<PathBuf>,
82
83 /// Show statistics only, don't output results
84 #[arg(long)]
85 pub stats_only: bool,
86}
87
88#[derive(Debug, Clone, ValueEnum)]
89pub enum QueryModeArg {
90 /// Auto-detect based on query
91 Auto,
92 /// Simple path mode (faster)
93 Simple,
94 /// JMESPath mode (flexible)
95 Jmespath,
96}
97
/// [`OutputHandler`] implementation that sends query result batches to standard output.
pub struct StdoutHandler {
    /// When `true`, batches are discarded instead of being written.
    stats_only: bool,
    /// Held for the duration of each write so that batches are emitted one at a time.
    lock: Mutex<()>,
}
102
103pub fn run(cmd: QueryCommand, dir: PathBuf, quiet: bool, verbose: bool) -> Result<()> {
104 let expression = cmd.expression;
105 let bundles_spec = cmd.bundles;
106 let threads = cmd.threads;
107 let mode = cmd.mode;
108 let batch_size = cmd.batch_size;
109 let _output = cmd.output;
110 let stats_only = cmd.stats_only;
111 let num_threads = if threads == 0 {
112 std::thread::available_parallelism()
113 .map(|n| n.get())
114 .unwrap_or(1)
115 } else {
116 threads
117 };
118
119 // Auto-detect query mode
120 let query_mode = match mode {
121 QueryModeArg::Auto => {
122 if expression.contains('[') || expression.contains('|') || expression.contains('@') {
123 QueryMode::JmesPath
124 } else {
125 QueryMode::Simple
126 }
127 }
128 QueryModeArg::Simple => QueryMode::Simple,
129 QueryModeArg::Jmespath => QueryMode::JmesPath,
130 };
131
132 let options = OptionsBuilder::new()
133 .directory(dir)
134 .query(expression.clone())
135 .query_mode(query_mode)
136 .num_threads(num_threads)
137 .batch_size(batch_size)
138 .build();
139
140 let processor = plcbundle::processor::Processor::new(options)?;
141 let index = processor.load_index()?;
142
143 if verbose && !quiet {
144 log::debug!("📦 Index: v{} ({})", index.version, index.origin);
145 log::debug!("📊 Total bundles: {}", index.last_bundle);
146 }
147
148 let bundle_numbers = utils::parse_bundle_spec(bundles_spec, index.last_bundle)?;
149
150 if !quiet {
151 let mode_str = match query_mode {
152 QueryMode::Simple => "simple",
153 QueryMode::JmesPath => "jmespath",
154 };
155 log::debug!(
156 "🔍 Processing {} bundles | {} mode | {} threads\n",
157 bundle_numbers.len(),
158 mode_str,
159 num_threads
160 );
161 }
162
163 // Calculate total uncompressed size for progress tracking
164 let total_uncompressed_size = index.total_uncompressed_size_for_bundles(&bundle_numbers);
165
166 let pb = if quiet {
167 None
168 } else {
169 Some(CustomProgressBar::with_bytes(
170 bundle_numbers.len(),
171 total_uncompressed_size,
172 ))
173 };
174
175 let start = Instant::now();
176 let output = Arc::new(StdoutHandler::new(stats_only));
177
178 // Track bundle count separately since callback gives increment, not total
179 let bundle_count = Arc::new(Mutex::new(0usize));
180 let pb_arc = pb.as_ref().map(|pb| Arc::new(Mutex::new(pb)));
181
182 let stats = processor.process(
183 &bundle_numbers,
184 output,
185 Some({
186 let pb_arc = pb_arc.clone();
187 let bundle_count = bundle_count.clone();
188 move |_increment, stats: &Stats| {
189 if let Some(ref pb_mutex) = pb_arc {
190 let mut count = bundle_count.lock().unwrap();
191 *count += 1;
192 let current_bundles = *count;
193 drop(count);
194
195 let pb = pb_mutex.lock().unwrap();
196
197 // Update progress with bundles processed and bytes
198 pb.set_with_bytes(current_bundles, stats.total_bytes);
199
200 // Set message with matches
201 pb.set_message(format!(
202 "✓ {} matches",
203 utils::format_number(stats.matches as u64)
204 ));
205 }
206 }
207 }),
208 )?;
209
210 if let Some(ref pb) = pb {
211 pb.finish();
212 }
213
214 let elapsed = start.elapsed();
215 let match_pct = if stats.operations > 0 {
216 (stats.matches as f64 / stats.operations as f64) * 100.0
217 } else {
218 0.0
219 };
220
221 if !quiet {
222 log::info!("\n✅ Complete in {:.2}s", elapsed.as_secs_f64());
223 log::info!(
224 " Operations: {} ({:.2}% matched)",
225 utils::format_number(stats.operations as u64),
226 match_pct
227 );
228 log::info!(
229 " Data processed: {}",
230 utils::format_bytes(stats.total_bytes)
231 );
232 if elapsed.as_secs_f64() > 0.0 {
233 log::info!(
234 " Throughput: {:.0} ops/sec | {}/s",
235 stats.operations as f64 / elapsed.as_secs_f64(),
236 utils::format_bytes((stats.total_bytes as f64 / elapsed.as_secs_f64()) as u64)
237 );
238 }
239 }
240
241 Ok(())
242}
243
244impl StdoutHandler {
245 pub fn new(stats_only: bool) -> Self {
246 Self {
247 lock: Mutex::new(()),
248 stats_only,
249 }
250 }
251}
252
253impl OutputHandler for StdoutHandler {
254 fn write_batch(&self, batch: &str) -> Result<()> {
255 if self.stats_only {
256 return Ok(());
257 }
258 let _lock = self.lock.lock().unwrap();
259 std::io::stdout().write_all(batch.as_bytes())?;
260 Ok(())
261 }
262}