High-performance implementation of plcbundle written in Rust
at main 262 lines 8.3 kB view raw
1use anyhow::Result; 2use clap::{Args, ValueEnum, ValueHint}; 3use plcbundle::processor::{OutputHandler, Stats}; 4use plcbundle::*; 5use std::io::Write; 6use std::path::PathBuf; 7use std::sync::{Arc, Mutex}; 8use std::time::Instant; 9 10use super::progress::ProgressBar as CustomProgressBar; 11use super::utils; 12 13#[derive(Args)] 14#[command( 15 about = "Query bundles with JMESPath or simple path", 16 long_about = "Extract and filter operations from bundles using powerful query expressions. 17 18The command supports three query modes: 19 • auto - Automatically detect based on query syntax (default) 20 • simple - Fast path extraction (e.g., 'did', 'operation.type') 21 • jmespath - Full JMESPath expressions for complex filtering and transformation 22 23Simple path mode is optimized for fast single-field access, while JMESPath mode 24enables complex queries with filtering, projection, and transformation. The 25command automatically detects which mode to use based on the query syntax, or you 26can explicitly specify with --mode. 27 28Queries are executed in parallel across multiple bundles for maximum 29performance. Results are streamed as JSONL (one JSON object per line) by 30default, making it easy to pipe to other tools like jq or process in scripts. 31 32Use --json for pretty-printed JSON output, or --stats-only to see query 33statistics without outputting results.", 34 alias = "q", 35 help_template = crate::clap_help!( 36 examples: " # Simple path query (extract DID field)\n \ 37 {bin} query did\n\n \ 38 # Query specific bundles\n \ 39 {bin} query did --bundles 1-100\n\n \ 40 # JMESPath query (complex filtering)\n \ 41 {bin} query 'operation.type == \"create\"'\n\n \ 42 # Query with mode specification\n \ 43 {bin} query did --mode simple\n \ 44 {bin} query 'operation.type' --mode jmespath\n\n \ 45 # Output to file\n \ 46 {bin} query did -o results.jsonl\n\n \ 47 # Pretty-printed JSON output\n \ 48 {bin} query did --json\n\n \ 49 # Statistics only (no results)\n \ 50 {bin} query did --stats-only\n\n \ 51 # Using alias\n \ 52 {bin} q did --bundles latest:10" 53 ) 54)] 55pub struct QueryCommand { 56 /// Query expression (e.g., "did", "operation.type", etc.) 57 pub expression: String, 58 59 /// Bundle range (e.g., "1-10,15,20-25" or "latest:10" for last 10) 60 #[arg(short, long)] 61 pub bundles: Option<String>, 62 63 /// Number of threads (0 = auto) 64 #[arg(short = 'j', long, default_value = "0")] 65 pub threads: usize, 66 67 /// Query mode 68 #[arg(short = 'm', long, default_value = "auto")] 69 pub mode: QueryModeArg, 70 71 /// Batch size for output 72 #[arg(long, default_value = "2000")] 73 pub batch_size: usize, 74 75 /// Output as pretty-printed JSON (default: JSONL, one JSON object per line) 76 #[arg(long)] 77 pub json: bool, 78 79 /// Output file (default: stdout) 80 #[arg(short, long, value_hint = ValueHint::FilePath)] 81 pub output: Option<PathBuf>, 82 83 /// Show statistics only, don't output results 84 #[arg(long)] 85 pub stats_only: bool, 86} 87 88#[derive(Debug, Clone, ValueEnum)] 89pub enum QueryModeArg { 90 /// Auto-detect based on query 91 Auto, 92 /// Simple path mode (faster) 93 Simple, 94 /// JMESPath mode (flexible) 95 Jmespath, 96} 97 98pub struct StdoutHandler { 99 lock: Mutex<()>, 100 stats_only: bool, 101} 102 103pub fn run(cmd: QueryCommand, dir: PathBuf, quiet: bool, verbose: bool) -> Result<()> { 104 let expression = cmd.expression; 105 let bundles_spec = cmd.bundles; 106 let threads = cmd.threads; 107 let mode = cmd.mode; 108 let batch_size = cmd.batch_size; 109 let _output = cmd.output; 110 let stats_only = cmd.stats_only; 111 let num_threads = if threads == 0 { 112 std::thread::available_parallelism() 113 .map(|n| n.get()) 114 .unwrap_or(1) 115 } else { 116 threads 117 }; 118 119 // Auto-detect query mode 120 let query_mode = match mode { 121 QueryModeArg::Auto => { 122 if expression.contains('[') || expression.contains('|') || expression.contains('@') { 123 QueryMode::JmesPath 124 } else { 125 QueryMode::Simple 126 } 127 } 128 QueryModeArg::Simple => QueryMode::Simple, 129 QueryModeArg::Jmespath => QueryMode::JmesPath, 130 }; 131 132 let options = OptionsBuilder::new() 133 .directory(dir) 134 .query(expression.clone()) 135 .query_mode(query_mode) 136 .num_threads(num_threads) 137 .batch_size(batch_size) 138 .build(); 139 140 let processor = plcbundle::processor::Processor::new(options)?; 141 let index = processor.load_index()?; 142 143 if verbose && !quiet { 144 log::debug!("📦 Index: v{} ({})", index.version, index.origin); 145 log::debug!("📊 Total bundles: {}", index.last_bundle); 146 } 147 148 let bundle_numbers = utils::parse_bundle_spec(bundles_spec, index.last_bundle)?; 149 150 if !quiet { 151 let mode_str = match query_mode { 152 QueryMode::Simple => "simple", 153 QueryMode::JmesPath => "jmespath", 154 }; 155 log::debug!( 156 "🔍 Processing {} bundles | {} mode | {} threads\n", 157 bundle_numbers.len(), 158 mode_str, 159 num_threads 160 ); 161 } 162 163 // Calculate total uncompressed size for progress tracking 164 let total_uncompressed_size = index.total_uncompressed_size_for_bundles(&bundle_numbers); 165 166 let pb = if quiet { 167 None 168 } else { 169 Some(CustomProgressBar::with_bytes( 170 bundle_numbers.len(), 171 total_uncompressed_size, 172 )) 173 }; 174 175 let start = Instant::now(); 176 let output = Arc::new(StdoutHandler::new(stats_only)); 177 178 // Track bundle count separately since callback gives increment, not total 179 let bundle_count = Arc::new(Mutex::new(0usize)); 180 let pb_arc = pb.as_ref().map(|pb| Arc::new(Mutex::new(pb))); 181 182 let stats = processor.process( 183 &bundle_numbers, 184 output, 185 Some({ 186 let pb_arc = pb_arc.clone(); 187 let bundle_count = bundle_count.clone(); 188 move |_increment, stats: &Stats| { 189 if let Some(ref pb_mutex) = pb_arc { 190 let mut count = bundle_count.lock().unwrap(); 191 *count += 1; 192 let current_bundles = *count; 193 drop(count); 194 195 let pb = pb_mutex.lock().unwrap(); 196 197 // Update progress with bundles processed and bytes 198 pb.set_with_bytes(current_bundles, stats.total_bytes); 199 200 // Set message with matches 201 pb.set_message(format!( 202 "{} matches", 203 utils::format_number(stats.matches as u64) 204 )); 205 } 206 } 207 }), 208 )?; 209 210 if let Some(ref pb) = pb { 211 pb.finish(); 212 } 213 214 let elapsed = start.elapsed(); 215 let match_pct = if stats.operations > 0 { 216 (stats.matches as f64 / stats.operations as f64) * 100.0 217 } else { 218 0.0 219 }; 220 221 if !quiet { 222 log::info!("\n✅ Complete in {:.2}s", elapsed.as_secs_f64()); 223 log::info!( 224 " Operations: {} ({:.2}% matched)", 225 utils::format_number(stats.operations as u64), 226 match_pct 227 ); 228 log::info!( 229 " Data processed: {}", 230 utils::format_bytes(stats.total_bytes) 231 ); 232 if elapsed.as_secs_f64() > 0.0 { 233 log::info!( 234 " Throughput: {:.0} ops/sec | {}/s", 235 stats.operations as f64 / elapsed.as_secs_f64(), 236 utils::format_bytes((stats.total_bytes as f64 / elapsed.as_secs_f64()) as u64) 237 ); 238 } 239 } 240 241 Ok(()) 242} 243 244impl StdoutHandler { 245 pub fn new(stats_only: bool) -> Self { 246 Self { 247 lock: Mutex::new(()), 248 stats_only, 249 } 250 } 251} 252 253impl OutputHandler for StdoutHandler { 254 fn write_batch(&self, batch: &str) -> Result<()> { 255 if self.stats_only { 256 return Ok(()); 257 } 258 let _lock = self.lock.lock().unwrap(); 259 std::io::stdout().write_all(batch.as_bytes())?; 260 Ok(()) 261 } 262}