High-performance implementation of plcbundle written in Rust
at main 281 lines 10 kB view raw
1use indicatif::{ProgressBar as IndicatifProgressBar, ProgressStyle}; 2use plcbundle::format::format_bytes_per_sec; 3use std::sync::Arc; 4use std::sync::Mutex; 5 6/// Progress bar wrapper around indicatif for displaying operation progress 7pub struct ProgressBar { 8 pb: IndicatifProgressBar, 9 show_bytes: bool, 10 current_bytes: Arc<Mutex<u64>>, 11} 12 13impl ProgressBar { 14 /// Create a new progress bar 15 /// This is used for Stage 2 (shard consolidation) where per_sec isn't meaningful 16 /// and we don't want indicatif to color things red based on slow progress detection 17 pub fn new(total: usize) -> Self { 18 let pb = IndicatifProgressBar::new(total as u64); 19 // Remove {per_sec} and {msg} from template since they're not meaningful for shard consolidation 20 // and indicatif may color it red when it detects slow progress 21 pb.set_style( 22 ProgressStyle::default_bar() 23 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos:.cyan}/{len:.cyan} | ETA: {eta}") 24 .unwrap() 25 .progress_chars("█▓▒░ "), 26 ); 27 28 Self { 29 pb, 30 show_bytes: false, 31 current_bytes: Arc::new(Mutex::new(0)), 32 } 33 } 34 35 /// Create a progress bar with byte tracking 36 pub fn with_bytes(total: usize, _total_bytes: u64) -> Self { 37 let pb = IndicatifProgressBar::new(total as u64); 38 39 // Custom template that shows data rate calculated from our tracked bytes 40 // We'll calculate bytes/sec manually and include it in the message 41 pb.set_style( 42 ProgressStyle::default_bar() 43 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} | {msg} | ETA: {eta}") 44 .unwrap() 45 .progress_chars("█▓▒░ "), 46 ); 47 48 Self { 49 pb, 50 show_bytes: true, 51 current_bytes: Arc::new(Mutex::new(0)), 52 } 53 } 54 55 /// Set current progress 56 pub fn set(&self, current: usize) { 57 self.pb.set_position(current as u64); 58 } 59 60 /// Set progress with exact byte tracking 61 pub fn set_with_bytes(&self, current: usize, bytes: u64) { 62 self.pb.set_position(current as u64); 63 let mut bytes_guard = self.current_bytes.lock().unwrap(); 64 *bytes_guard = bytes; 65 let current_msg = self.pb.message().to_string(); 66 drop(bytes_guard); 67 68 // Update message to include data rate, preserving user message if present 69 let elapsed = self.pb.elapsed().as_secs_f64(); 70 let bytes_guard = self.current_bytes.lock().unwrap(); 71 let bytes = *bytes_guard; 72 drop(bytes_guard); 73 74 let bytes_per_sec = if elapsed > 0.0 { 75 bytes as f64 / elapsed 76 } else { 77 0.0 78 }; 79 80 // Extract user message (everything after " | " if present) 81 let user_msg = if let Some(pos) = current_msg.find(" | ") { 82 &current_msg[pos + 3..] 83 } else { 84 "" 85 }; 86 87 let new_msg = if user_msg.is_empty() { 88 format_bytes_per_sec(bytes_per_sec) 89 } else { 90 format!("{} | {}", format_bytes_per_sec(bytes_per_sec), user_msg) 91 }; 92 self.pb.set_message(new_msg); 93 } 94 95 pub fn set_message<S: Into<String>>(&self, msg: S) { 96 let msg_str: String = msg.into(); 97 if self.show_bytes { 98 // If showing bytes, prepend data rate to the message 99 let elapsed = self.pb.elapsed().as_secs_f64(); 100 let bytes_guard = self.current_bytes.lock().unwrap(); 101 let bytes = *bytes_guard; 102 drop(bytes_guard); 103 104 let bytes_per_sec = if elapsed > 0.0 { 105 bytes as f64 / elapsed 106 } else { 107 0.0 108 }; 109 110 let new_msg = if msg_str.is_empty() { 111 format_bytes_per_sec(bytes_per_sec) 112 } else { 113 format!("{} | {}", format_bytes_per_sec(bytes_per_sec), msg_str) 114 }; 115 self.pb.set_message(new_msg); 116 } else { 117 self.pb.set_message(msg_str); 118 } 119 } 120 121 /// Finish the progress bar 122 pub fn finish(&self) { 123 self.pb.finish(); 124 } 125} 126 127/// Manages a two-stage progress bar for DID index building 128/// Stage 1: Processing bundles (with byte tracking) 129/// Stage 2: Consolidating shards (simple count) 130pub struct TwoStageProgress { 131 stage1_progress: Arc<Mutex<Option<ProgressBar>>>, 132 stage2_progress: Arc<Mutex<Option<ProgressBar>>>, 133 stage2_started: Arc<Mutex<bool>>, 134 stage1_finished: Arc<Mutex<bool>>, 135 interrupted: Arc<std::sync::atomic::AtomicBool>, 136 last_bundle: u32, 137} 138 139impl TwoStageProgress { 140 /// Create a new two-stage progress bar 141 /// 142 /// # Arguments 143 /// * `last_bundle` - Total number of bundles to process in stage 1 144 /// * `total_bytes` - Total uncompressed bytes to process in stage 1 145 pub fn new(last_bundle: u32, total_bytes: u64) -> Self { 146 use std::sync::atomic::AtomicBool; 147 148 Self { 149 stage1_progress: Arc::new(Mutex::new(Some(ProgressBar::with_bytes( 150 last_bundle as usize, 151 total_bytes, 152 )))), 153 stage2_progress: Arc::new(Mutex::new(None)), 154 stage2_started: Arc::new(Mutex::new(false)), 155 stage1_finished: Arc::new(Mutex::new(false)), 156 interrupted: Arc::new(AtomicBool::new(false)), 157 last_bundle, 158 } 159 } 160 161 /// Get the interrupted flag (for passing to build_did_index) 162 pub fn interrupted(&self) -> Arc<std::sync::atomic::AtomicBool> { 163 self.interrupted.clone() 164 } 165 166 /// Create a callback function for build_did_index (signature: Fn(u32, u32, u64, u64)) 167 pub fn callback_for_build_did_index( 168 &self, 169 ) -> impl Fn(u32, u32, u64, u64) + Send + Sync + 'static { 170 let progress = self.clone_for_callback(); 171 move |current, total, bytes_processed, _total_bytes| { 172 progress.update_progress(current, total, bytes_processed); 173 } 174 } 175 176 /// Clone the necessary parts for use in callbacks 177 fn clone_for_callback(&self) -> TwoStageProgressCallback { 178 TwoStageProgressCallback { 179 stage1_progress: self.stage1_progress.clone(), 180 stage2_progress: self.stage2_progress.clone(), 181 stage2_started: self.stage2_started.clone(), 182 stage1_finished: self.stage1_finished.clone(), 183 interrupted: self.interrupted.clone(), 184 last_bundle: self.last_bundle, 185 } 186 } 187} 188 189impl TwoStageProgress { 190 /// Finish any remaining progress bars (call this after build completes) 191 pub fn finish(&self) { 192 if let Some(pb) = self.stage1_progress.lock().unwrap().take() { 193 pb.finish(); 194 } 195 if let Some(pb) = self.stage2_progress.lock().unwrap().take() { 196 pb.finish(); 197 } 198 } 199} 200 201/// Internal struct for callback closures (avoids lifetime issues) 202struct TwoStageProgressCallback { 203 stage1_progress: Arc<Mutex<Option<ProgressBar>>>, 204 stage2_progress: Arc<Mutex<Option<ProgressBar>>>, 205 stage2_started: Arc<Mutex<bool>>, 206 stage1_finished: Arc<Mutex<bool>>, 207 interrupted: Arc<std::sync::atomic::AtomicBool>, 208 last_bundle: u32, 209} 210 211impl TwoStageProgressCallback { 212 fn update_progress(&self, current: u32, total: u32, bytes_processed: u64) { 213 use std::sync::atomic::Ordering; 214 215 // Stop updating progress bar if interrupted 216 if self.interrupted.load(Ordering::Relaxed) { 217 return; 218 } 219 220 // Detect stage change: if total changes from bundle count to shard count (256), we're in stage 2 221 let is_stage_2 = total == 256 && current <= 256; 222 223 if is_stage_2 { 224 // Check if this is the first time we're entering stage 2 225 let mut started = self.stage2_started.lock().unwrap(); 226 if !*started { 227 *started = true; 228 drop(started); 229 230 // Finish Stage 1 progress bar if not already finished 231 // (did_index.rs already printed empty line + Stage 2 header before this callback) 232 let mut finished = self.stage1_finished.lock().unwrap(); 233 if !*finished { 234 *finished = true; 235 drop(finished); 236 if let Some(pb) = self.stage1_progress.lock().unwrap().take() { 237 pb.finish(); 238 } 239 // Add extra newline after Stage 1 (did_index.rs already printed one before Stage 2 header) 240 eprintln!(); 241 } 242 243 // Create new simple progress bar for Stage 2 (256 shards, no byte tracking) 244 let mut stage2_pb = self.stage2_progress.lock().unwrap(); 245 *stage2_pb = Some(ProgressBar::new(256)); 246 } 247 248 // Update Stage 2 progress bar (pos/len already shows the count, no message needed) 249 let mut stage2_pb_guard = self.stage2_progress.lock().unwrap(); 250 if let Some(ref pb) = *stage2_pb_guard { 251 pb.set(current as usize); 252 // Don't set message - progress bar template will show pos/len without extra message 253 254 // Finish progress bar when stage 2 completes (256/256) 255 if current == 256 256 && let Some(pb) = stage2_pb_guard.take() 257 { 258 pb.finish(); 259 } 260 } 261 } else { 262 // Stage 1: use byte tracking, no stage message in progress bar 263 // Check if Stage 1 is complete (current == total and total == last_bundle) 264 if current == total && total == self.last_bundle { 265 let mut finished = self.stage1_finished.lock().unwrap(); 266 if !*finished { 267 *finished = true; 268 drop(finished); 269 // Finish Stage 1 progress bar and add extra newline 270 if let Some(pb) = self.stage1_progress.lock().unwrap().take() { 271 pb.finish(); 272 } 273 eprintln!(); 274 } 275 } else if let Some(ref pb) = *self.stage1_progress.lock().unwrap() { 276 pb.set_with_bytes(current as usize, bytes_processed); 277 // Don't set stage message - just show bytes/sec 278 } 279 } 280 } 281}