High-performance implementation of plcbundle written in Rust
at main 272 lines 8.5 kB view raw
1// src/bin/commands/mempool.rs 2use super::utils; 3use super::utils::HasGlobalFlags; 4use anyhow::Result; 5use clap::{Args, Subcommand, ValueHint}; 6use plcbundle::format::format_number; 7use plcbundle::{BundleManager, constants}; 8use std::io::{self, Write}; 9use std::path::Path; 10use std::path::PathBuf; 11 12#[derive(Args)] 13#[command( 14 about = "Manage mempool operations", 15 long_about = "Manage the mempool, which stores operations waiting to be bundled into the next 16bundle file. The mempool maintains strict chronological order and automatically 17validates operation consistency to ensure data integrity. 18 19Operations accumulate in the mempool until there are enough (10,000) to create a 20new bundle. The 'status' subcommand shows how many operations are currently in the 21mempool and whether a new bundle can be created. The 'dump' subcommand exports all 22mempool operations as JSONL for backup or analysis. 23 24The 'clear' subcommand removes all operations from the mempool, useful for resetting 25the state or removing invalid operations. This is typically done automatically when 26a bundle is created, but can be done manually if needed. 27 28Mempool operations are stored in a temporary file that gets converted to a bundle 29file when the threshold is reached. The mempool ensures operations are processed 30in the correct order and maintains consistency with the bundle chain.", 31 alias = "mp", 32 help_template = crate::clap_help!( 33 examples: " # Show mempool status\n \ 34 {bin} mempool\n \ 35 {bin} mempool status\n\n \ 36 # Clear all operations\n \ 37 {bin} mempool clear\n\n \ 38 # Export operations as JSONL\n \ 39 {bin} mempool dump\n \ 40 {bin} mempool dump > operations.jsonl\n\n \ 41 # Using alias\n \ 42 {bin} mp status" 43 ) 44)] 45pub struct MempoolCommand { 46 #[command(subcommand)] 47 pub command: Option<MempoolSubcommand>, 48} 49 50#[derive(Subcommand)] 51pub enum MempoolSubcommand { 52 /// Show mempool status 53 #[command(alias = "s", alias = "info")] 54 Status { 55 /// Show sample operations 56 #[arg(short, long)] 57 verbose: bool, 58 }, 59 60 /// Clear all operations from mempool 61 #[command(alias = "c", alias = "reset")] 62 Clear { 63 /// Skip confirmation prompt 64 #[arg(short, long)] 65 force: bool, 66 }, 67 68 /// Export mempool operations as JSONL 69 #[command(alias = "export", alias = "d")] 70 Dump { 71 /// Output file (default: stdout) 72 #[arg(short, long, value_hint = ValueHint::FilePath)] 73 output: Option<PathBuf>, 74 }, 75} 76 77impl HasGlobalFlags for MempoolCommand { 78 fn verbose(&self) -> bool { 79 false 80 } 81 fn quiet(&self) -> bool { 82 false 83 } 84} 85 86pub fn run(cmd: MempoolCommand, dir: PathBuf, global_verbose: bool) -> Result<()> { 87 let manager = utils::create_manager(dir.clone(), global_verbose, false, false)?; 88 89 match cmd.command { 90 Some(MempoolSubcommand::Status { verbose }) => { 91 show_status(&manager, &dir, verbose || global_verbose) 92 } 93 Some(MempoolSubcommand::Clear { force }) => clear(&manager, &dir, force), 94 Some(MempoolSubcommand::Dump { output }) => dump(&manager, output), 95 None => { 96 // Default to status 97 show_status(&manager, &dir, global_verbose) 98 } 99 } 100} 101 102fn show_status(manager: &BundleManager, dir: &Path, verbose: bool) -> Result<()> { 103 manager.load_mempool()?; 104 let stats = manager.get_mempool_stats()?; 105 106 // Show mempool file location 107 let mempool_filename = format!( 108 "{}{:06}.jsonl", 109 constants::MEMPOOL_FILE_PREFIX, 110 stats.target_bundle 111 ); 112 let mempool_path: PathBuf = utils::display_path(dir).join(mempool_filename); 113 114 println!("Mempool Status"); 115 println!("══════════════\n"); 116 println!(" Directory: {}", utils::display_path(dir).display()); 117 println!(" Mempool File: {}", mempool_path.display()); 118 println!(" Target bundle: {}", stats.target_bundle); 119 println!( 120 " Operations: {} / {}", 121 stats.count, 122 constants::BUNDLE_SIZE 123 ); 124 println!( 125 " Min timestamp: {}\n", 126 stats.min_timestamp.format("%Y-%m-%d %H:%M:%S") 127 ); 128 129 // Validation status 130 let validation_icon = if stats.validated { "" } else { "⚠️" }; 131 println!( 132 " Validated: {} {}\n", 133 validation_icon, stats.validated 134 ); 135 136 if stats.count > 0 { 137 // Size information 138 if let Some(size_bytes) = stats.size_bytes { 139 println!(" Size: {:.2} KB", size_bytes as f64 / 1024.0); 140 } 141 142 // Time range 143 if let Some(first_time) = stats.first_time { 144 println!( 145 " First op: {}", 146 first_time.format("%Y-%m-%d %H:%M:%S") 147 ); 148 } 149 if let Some(last_time) = stats.last_time { 150 println!( 151 " Last op: {}", 152 last_time.format("%Y-%m-%d %H:%M:%S") 153 ); 154 } 155 156 println!(); 157 158 // Progress bar 159 let progress = (stats.count as f64 / constants::BUNDLE_SIZE as f64) * 100.0; 160 println!( 161 " Progress: {:.1}% ({}/{})", 162 progress, 163 stats.count, 164 constants::BUNDLE_SIZE 165 ); 166 167 let bar_width = 40; 168 let filled = 169 ((bar_width as f64) * (stats.count as f64) / constants::BUNDLE_SIZE as f64) as usize; 170 let filled = filled.min(bar_width); 171 let bar = "".repeat(filled) + &"".repeat(bar_width - filled); 172 println!(" [{}]\n", bar); 173 174 // Bundle creation status 175 if stats.can_create_bundle { 176 println!(" ✓ Ready to create bundle"); 177 } else { 178 let remaining = constants::BUNDLE_SIZE - stats.count; 179 println!(" Need {} more operations", format_number(remaining)); 180 } 181 } else { 182 println!(" (empty)"); 183 } 184 185 println!(); 186 187 // Verbose: Show sample operations 188 if verbose && stats.count > 0 { 189 println!("Sample Operations (first 10)"); 190 println!("────────────────────────────\n"); 191 192 let ops = manager.get_mempool_operations()?; 193 194 let show_count = 10.min(ops.len()); 195 196 for (i, op) in ops.iter().take(show_count).enumerate() { 197 println!(" {}. DID: {}", i + 1, op.did); 198 if let Some(ref cid) = op.cid { 199 println!(" CID: {}", cid); 200 } 201 println!(" Created: {}", op.created_at); 202 println!(); 203 } 204 205 if ops.len() > show_count { 206 println!(" ... and {} more\n", ops.len() - show_count); 207 } 208 } 209 210 Ok(()) 211} 212 213fn clear(manager: &BundleManager, dir: &Path, force: bool) -> Result<()> { 214 manager.load_mempool()?; 215 let stats = manager.get_mempool_stats()?; 216 let count = stats.count; 217 218 if count == 0 { 219 println!("Mempool is already empty"); 220 return Ok(()); 221 } 222 223 println!("Working in: {}\n", utils::display_path(dir).display()); 224 225 if !force { 226 print!( 227 "⚠️ This will clear {} operations from the mempool.\nAre you sure? [y/N]: ", 228 count 229 ); 230 io::stdout().flush()?; 231 232 let mut response = String::new(); 233 io::stdin().read_line(&mut response)?; 234 235 if response.trim().to_lowercase() != "y" { 236 println!("Cancelled"); 237 return Ok(()); 238 } 239 } 240 241 manager.clear_mempool()?; 242 243 println!("\n✓ Mempool cleared ({} operations removed)", count); 244 Ok(()) 245} 246 247fn dump(manager: &BundleManager, output: Option<PathBuf>) -> Result<()> { 248 manager.load_mempool()?; 249 let ops = manager.get_mempool_operations()?; 250 251 if ops.is_empty() { 252 eprintln!("Mempool is empty"); 253 return Ok(()); 254 } 255 256 // Determine output destination 257 let mut writer: Box<dyn Write> = if let Some(path) = output { 258 eprintln!("Exporting to: {}", path.display()); 259 Box::new(std::fs::File::create(path)?) 260 } else { 261 Box::new(io::stdout()) 262 }; 263 264 // Write JSONL 265 for op in &ops { 266 let json = sonic_rs::to_string(op)?; 267 writeln!(writer, "{}", json)?; 268 } 269 270 eprintln!("Exported {} operations from mempool", ops.len()); 271 Ok(()) 272}