forked from
atscan.net/plcbundle-rs
High-performance implementation of plcbundle written in Rust
1// src/bin/commands/mempool.rs
2use super::utils;
3use super::utils::HasGlobalFlags;
4use anyhow::Result;
5use clap::{Args, Subcommand, ValueHint};
6use plcbundle::format::format_number;
7use plcbundle::{BundleManager, constants};
8use std::io::{self, Write};
9use std::path::Path;
10use std::path::PathBuf;
11
12#[derive(Args)]
13#[command(
14 about = "Manage mempool operations",
15 long_about = "Manage the mempool, which stores operations waiting to be bundled into the next
16bundle file. The mempool maintains strict chronological order and automatically
17validates operation consistency to ensure data integrity.
18
19Operations accumulate in the mempool until there are enough (10,000) to create a
20new bundle. The 'status' subcommand shows how many operations are currently in the
21mempool and whether a new bundle can be created. The 'dump' subcommand exports all
22mempool operations as JSONL for backup or analysis.
23
24The 'clear' subcommand removes all operations from the mempool, useful for resetting
25the state or removing invalid operations. This is typically done automatically when
26a bundle is created, but can be done manually if needed.
27
28Mempool operations are stored in a temporary file that gets converted to a bundle
29file when the threshold is reached. The mempool ensures operations are processed
30in the correct order and maintains consistency with the bundle chain.",
31 alias = "mp",
32 help_template = crate::clap_help!(
33 examples: " # Show mempool status\n \
34 {bin} mempool\n \
35 {bin} mempool status\n\n \
36 # Clear all operations\n \
37 {bin} mempool clear\n\n \
38 # Export operations as JSONL\n \
39 {bin} mempool dump\n \
40 {bin} mempool dump > operations.jsonl\n\n \
41 # Using alias\n \
42 {bin} mp status"
43 )
44)]
45pub struct MempoolCommand {
46 #[command(subcommand)]
47 pub command: Option<MempoolSubcommand>,
48}
49
50#[derive(Subcommand)]
51pub enum MempoolSubcommand {
52 /// Show mempool status
53 #[command(alias = "s", alias = "info")]
54 Status {
55 /// Show sample operations
56 #[arg(short, long)]
57 verbose: bool,
58 },
59
60 /// Clear all operations from mempool
61 #[command(alias = "c", alias = "reset")]
62 Clear {
63 /// Skip confirmation prompt
64 #[arg(short, long)]
65 force: bool,
66 },
67
68 /// Export mempool operations as JSONL
69 #[command(alias = "export", alias = "d")]
70 Dump {
71 /// Output file (default: stdout)
72 #[arg(short, long, value_hint = ValueHint::FilePath)]
73 output: Option<PathBuf>,
74 },
75}
76
77impl HasGlobalFlags for MempoolCommand {
78 fn verbose(&self) -> bool {
79 false
80 }
81 fn quiet(&self) -> bool {
82 false
83 }
84}
85
86pub fn run(cmd: MempoolCommand, dir: PathBuf, global_verbose: bool) -> Result<()> {
87 let manager = utils::create_manager(dir.clone(), global_verbose, false, false)?;
88
89 match cmd.command {
90 Some(MempoolSubcommand::Status { verbose }) => {
91 show_status(&manager, &dir, verbose || global_verbose)
92 }
93 Some(MempoolSubcommand::Clear { force }) => clear(&manager, &dir, force),
94 Some(MempoolSubcommand::Dump { output }) => dump(&manager, output),
95 None => {
96 // Default to status
97 show_status(&manager, &dir, global_verbose)
98 }
99 }
100}
101
102fn show_status(manager: &BundleManager, dir: &Path, verbose: bool) -> Result<()> {
103 manager.load_mempool()?;
104 let stats = manager.get_mempool_stats()?;
105
106 // Show mempool file location
107 let mempool_filename = format!(
108 "{}{:06}.jsonl",
109 constants::MEMPOOL_FILE_PREFIX,
110 stats.target_bundle
111 );
112 let mempool_path: PathBuf = utils::display_path(dir).join(mempool_filename);
113
114 println!("Mempool Status");
115 println!("══════════════\n");
116 println!(" Directory: {}", utils::display_path(dir).display());
117 println!(" Mempool File: {}", mempool_path.display());
118 println!(" Target bundle: {}", stats.target_bundle);
119 println!(
120 " Operations: {} / {}",
121 stats.count,
122 constants::BUNDLE_SIZE
123 );
124 println!(
125 " Min timestamp: {}\n",
126 stats.min_timestamp.format("%Y-%m-%d %H:%M:%S")
127 );
128
129 // Validation status
130 let validation_icon = if stats.validated { "✓" } else { "⚠️" };
131 println!(
132 " Validated: {} {}\n",
133 validation_icon, stats.validated
134 );
135
136 if stats.count > 0 {
137 // Size information
138 if let Some(size_bytes) = stats.size_bytes {
139 println!(" Size: {:.2} KB", size_bytes as f64 / 1024.0);
140 }
141
142 // Time range
143 if let Some(first_time) = stats.first_time {
144 println!(
145 " First op: {}",
146 first_time.format("%Y-%m-%d %H:%M:%S")
147 );
148 }
149 if let Some(last_time) = stats.last_time {
150 println!(
151 " Last op: {}",
152 last_time.format("%Y-%m-%d %H:%M:%S")
153 );
154 }
155
156 println!();
157
158 // Progress bar
159 let progress = (stats.count as f64 / constants::BUNDLE_SIZE as f64) * 100.0;
160 println!(
161 " Progress: {:.1}% ({}/{})",
162 progress,
163 stats.count,
164 constants::BUNDLE_SIZE
165 );
166
167 let bar_width = 40;
168 let filled =
169 ((bar_width as f64) * (stats.count as f64) / constants::BUNDLE_SIZE as f64) as usize;
170 let filled = filled.min(bar_width);
171 let bar = "█".repeat(filled) + &"░".repeat(bar_width - filled);
172 println!(" [{}]\n", bar);
173
174 // Bundle creation status
175 if stats.can_create_bundle {
176 println!(" ✓ Ready to create bundle");
177 } else {
178 let remaining = constants::BUNDLE_SIZE - stats.count;
179 println!(" Need {} more operations", format_number(remaining));
180 }
181 } else {
182 println!(" (empty)");
183 }
184
185 println!();
186
187 // Verbose: Show sample operations
188 if verbose && stats.count > 0 {
189 println!("Sample Operations (first 10)");
190 println!("────────────────────────────\n");
191
192 let ops = manager.get_mempool_operations()?;
193
194 let show_count = 10.min(ops.len());
195
196 for (i, op) in ops.iter().take(show_count).enumerate() {
197 println!(" {}. DID: {}", i + 1, op.did);
198 if let Some(ref cid) = op.cid {
199 println!(" CID: {}", cid);
200 }
201 println!(" Created: {}", op.created_at);
202 println!();
203 }
204
205 if ops.len() > show_count {
206 println!(" ... and {} more\n", ops.len() - show_count);
207 }
208 }
209
210 Ok(())
211}
212
213fn clear(manager: &BundleManager, dir: &Path, force: bool) -> Result<()> {
214 manager.load_mempool()?;
215 let stats = manager.get_mempool_stats()?;
216 let count = stats.count;
217
218 if count == 0 {
219 println!("Mempool is already empty");
220 return Ok(());
221 }
222
223 println!("Working in: {}\n", utils::display_path(dir).display());
224
225 if !force {
226 print!(
227 "⚠️ This will clear {} operations from the mempool.\nAre you sure? [y/N]: ",
228 count
229 );
230 io::stdout().flush()?;
231
232 let mut response = String::new();
233 io::stdin().read_line(&mut response)?;
234
235 if response.trim().to_lowercase() != "y" {
236 println!("Cancelled");
237 return Ok(());
238 }
239 }
240
241 manager.clear_mempool()?;
242
243 println!("\n✓ Mempool cleared ({} operations removed)", count);
244 Ok(())
245}
246
247fn dump(manager: &BundleManager, output: Option<PathBuf>) -> Result<()> {
248 manager.load_mempool()?;
249 let ops = manager.get_mempool_operations()?;
250
251 if ops.is_empty() {
252 eprintln!("Mempool is empty");
253 return Ok(());
254 }
255
256 // Determine output destination
257 let mut writer: Box<dyn Write> = if let Some(path) = output {
258 eprintln!("Exporting to: {}", path.display());
259 Box::new(std::fs::File::create(path)?)
260 } else {
261 Box::new(io::stdout())
262 };
263
264 // Write JSONL
265 for op in &ops {
266 let json = sonic_rs::to_string(op)?;
267 writeln!(writer, "{}", json)?;
268 }
269
270 eprintln!("Exported {} operations from mempool", ops.len());
271 Ok(())
272}