// Forked from atscan.net/plcbundle-rs:
// a high-performance implementation of plcbundle written in Rust.
use indicatif::{ProgressBar as IndicatifProgressBar, ProgressStyle};
use plcbundle::format::format_bytes_per_sec;
use std::sync::Arc;
use std::sync::Mutex;

6/// Progress bar wrapper around indicatif for displaying operation progress
7pub struct ProgressBar {
8 pb: IndicatifProgressBar,
9 show_bytes: bool,
10 current_bytes: Arc<Mutex<u64>>,
11}
12
13impl ProgressBar {
14 /// Create a new progress bar
15 /// This is used for Stage 2 (shard consolidation) where per_sec isn't meaningful
16 /// and we don't want indicatif to color things red based on slow progress detection
17 pub fn new(total: usize) -> Self {
18 let pb = IndicatifProgressBar::new(total as u64);
19 // Remove {per_sec} and {msg} from template since they're not meaningful for shard consolidation
20 // and indicatif may color it red when it detects slow progress
21 pb.set_style(
22 ProgressStyle::default_bar()
23 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos:.cyan}/{len:.cyan} | ETA: {eta}")
24 .unwrap()
25 .progress_chars("█▓▒░ "),
26 );
27
28 Self {
29 pb,
30 show_bytes: false,
31 current_bytes: Arc::new(Mutex::new(0)),
32 }
33 }
34
35 /// Create a progress bar with byte tracking
36 pub fn with_bytes(total: usize, _total_bytes: u64) -> Self {
37 let pb = IndicatifProgressBar::new(total as u64);
38
39 // Custom template that shows data rate calculated from our tracked bytes
40 // We'll calculate bytes/sec manually and include it in the message
41 pb.set_style(
42 ProgressStyle::default_bar()
43 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} | {msg} | ETA: {eta}")
44 .unwrap()
45 .progress_chars("█▓▒░ "),
46 );
47
48 Self {
49 pb,
50 show_bytes: true,
51 current_bytes: Arc::new(Mutex::new(0)),
52 }
53 }
54
55 /// Set current progress
56 pub fn set(&self, current: usize) {
57 self.pb.set_position(current as u64);
58 }
59
60 /// Set progress with exact byte tracking
61 pub fn set_with_bytes(&self, current: usize, bytes: u64) {
62 self.pb.set_position(current as u64);
63 let mut bytes_guard = self.current_bytes.lock().unwrap();
64 *bytes_guard = bytes;
65 let current_msg = self.pb.message().to_string();
66 drop(bytes_guard);
67
68 // Update message to include data rate, preserving user message if present
69 let elapsed = self.pb.elapsed().as_secs_f64();
70 let bytes_guard = self.current_bytes.lock().unwrap();
71 let bytes = *bytes_guard;
72 drop(bytes_guard);
73
74 let bytes_per_sec = if elapsed > 0.0 {
75 bytes as f64 / elapsed
76 } else {
77 0.0
78 };
79
80 // Extract user message (everything after " | " if present)
81 let user_msg = if let Some(pos) = current_msg.find(" | ") {
82 ¤t_msg[pos + 3..]
83 } else {
84 ""
85 };
86
87 let new_msg = if user_msg.is_empty() {
88 format_bytes_per_sec(bytes_per_sec)
89 } else {
90 format!("{} | {}", format_bytes_per_sec(bytes_per_sec), user_msg)
91 };
92 self.pb.set_message(new_msg);
93 }
94
95 pub fn set_message<S: Into<String>>(&self, msg: S) {
96 let msg_str: String = msg.into();
97 if self.show_bytes {
98 // If showing bytes, prepend data rate to the message
99 let elapsed = self.pb.elapsed().as_secs_f64();
100 let bytes_guard = self.current_bytes.lock().unwrap();
101 let bytes = *bytes_guard;
102 drop(bytes_guard);
103
104 let bytes_per_sec = if elapsed > 0.0 {
105 bytes as f64 / elapsed
106 } else {
107 0.0
108 };
109
110 let new_msg = if msg_str.is_empty() {
111 format_bytes_per_sec(bytes_per_sec)
112 } else {
113 format!("{} | {}", format_bytes_per_sec(bytes_per_sec), msg_str)
114 };
115 self.pb.set_message(new_msg);
116 } else {
117 self.pb.set_message(msg_str);
118 }
119 }
120
121 /// Finish the progress bar
122 pub fn finish(&self) {
123 self.pb.finish();
124 }
125}
126
127/// Manages a two-stage progress bar for DID index building
128/// Stage 1: Processing bundles (with byte tracking)
129/// Stage 2: Consolidating shards (simple count)
130pub struct TwoStageProgress {
131 stage1_progress: Arc<Mutex<Option<ProgressBar>>>,
132 stage2_progress: Arc<Mutex<Option<ProgressBar>>>,
133 stage2_started: Arc<Mutex<bool>>,
134 stage1_finished: Arc<Mutex<bool>>,
135 interrupted: Arc<std::sync::atomic::AtomicBool>,
136 last_bundle: u32,
137}
138
139impl TwoStageProgress {
140 /// Create a new two-stage progress bar
141 ///
142 /// # Arguments
143 /// * `last_bundle` - Total number of bundles to process in stage 1
144 /// * `total_bytes` - Total uncompressed bytes to process in stage 1
145 pub fn new(last_bundle: u32, total_bytes: u64) -> Self {
146 use std::sync::atomic::AtomicBool;
147
148 Self {
149 stage1_progress: Arc::new(Mutex::new(Some(ProgressBar::with_bytes(
150 last_bundle as usize,
151 total_bytes,
152 )))),
153 stage2_progress: Arc::new(Mutex::new(None)),
154 stage2_started: Arc::new(Mutex::new(false)),
155 stage1_finished: Arc::new(Mutex::new(false)),
156 interrupted: Arc::new(AtomicBool::new(false)),
157 last_bundle,
158 }
159 }
160
161 /// Get the interrupted flag (for passing to build_did_index)
162 pub fn interrupted(&self) -> Arc<std::sync::atomic::AtomicBool> {
163 self.interrupted.clone()
164 }
165
166 /// Create a callback function for build_did_index (signature: Fn(u32, u32, u64, u64))
167 pub fn callback_for_build_did_index(
168 &self,
169 ) -> impl Fn(u32, u32, u64, u64) + Send + Sync + 'static {
170 let progress = self.clone_for_callback();
171 move |current, total, bytes_processed, _total_bytes| {
172 progress.update_progress(current, total, bytes_processed);
173 }
174 }
175
176 /// Clone the necessary parts for use in callbacks
177 fn clone_for_callback(&self) -> TwoStageProgressCallback {
178 TwoStageProgressCallback {
179 stage1_progress: self.stage1_progress.clone(),
180 stage2_progress: self.stage2_progress.clone(),
181 stage2_started: self.stage2_started.clone(),
182 stage1_finished: self.stage1_finished.clone(),
183 interrupted: self.interrupted.clone(),
184 last_bundle: self.last_bundle,
185 }
186 }
187}
188
189impl TwoStageProgress {
190 /// Finish any remaining progress bars (call this after build completes)
191 pub fn finish(&self) {
192 if let Some(pb) = self.stage1_progress.lock().unwrap().take() {
193 pb.finish();
194 }
195 if let Some(pb) = self.stage2_progress.lock().unwrap().take() {
196 pb.finish();
197 }
198 }
199}
200
201/// Internal struct for callback closures (avoids lifetime issues)
202struct TwoStageProgressCallback {
203 stage1_progress: Arc<Mutex<Option<ProgressBar>>>,
204 stage2_progress: Arc<Mutex<Option<ProgressBar>>>,
205 stage2_started: Arc<Mutex<bool>>,
206 stage1_finished: Arc<Mutex<bool>>,
207 interrupted: Arc<std::sync::atomic::AtomicBool>,
208 last_bundle: u32,
209}
210
211impl TwoStageProgressCallback {
212 fn update_progress(&self, current: u32, total: u32, bytes_processed: u64) {
213 use std::sync::atomic::Ordering;
214
215 // Stop updating progress bar if interrupted
216 if self.interrupted.load(Ordering::Relaxed) {
217 return;
218 }
219
220 // Detect stage change: if total changes from bundle count to shard count (256), we're in stage 2
221 let is_stage_2 = total == 256 && current <= 256;
222
223 if is_stage_2 {
224 // Check if this is the first time we're entering stage 2
225 let mut started = self.stage2_started.lock().unwrap();
226 if !*started {
227 *started = true;
228 drop(started);
229
230 // Finish Stage 1 progress bar if not already finished
231 // (did_index.rs already printed empty line + Stage 2 header before this callback)
232 let mut finished = self.stage1_finished.lock().unwrap();
233 if !*finished {
234 *finished = true;
235 drop(finished);
236 if let Some(pb) = self.stage1_progress.lock().unwrap().take() {
237 pb.finish();
238 }
239 // Add extra newline after Stage 1 (did_index.rs already printed one before Stage 2 header)
240 eprintln!();
241 }
242
243 // Create new simple progress bar for Stage 2 (256 shards, no byte tracking)
244 let mut stage2_pb = self.stage2_progress.lock().unwrap();
245 *stage2_pb = Some(ProgressBar::new(256));
246 }
247
248 // Update Stage 2 progress bar (pos/len already shows the count, no message needed)
249 let mut stage2_pb_guard = self.stage2_progress.lock().unwrap();
250 if let Some(ref pb) = *stage2_pb_guard {
251 pb.set(current as usize);
252 // Don't set message - progress bar template will show pos/len without extra message
253
254 // Finish progress bar when stage 2 completes (256/256)
255 if current == 256
256 && let Some(pb) = stage2_pb_guard.take()
257 {
258 pb.finish();
259 }
260 }
261 } else {
262 // Stage 1: use byte tracking, no stage message in progress bar
263 // Check if Stage 1 is complete (current == total and total == last_bundle)
264 if current == total && total == self.last_bundle {
265 let mut finished = self.stage1_finished.lock().unwrap();
266 if !*finished {
267 *finished = true;
268 drop(finished);
269 // Finish Stage 1 progress bar and add extra newline
270 if let Some(pb) = self.stage1_progress.lock().unwrap().take() {
271 pb.finish();
272 }
273 eprintln!();
274 }
275 } else if let Some(ref pb) = *self.stage1_progress.lock().unwrap() {
276 pb.set_with_bytes(current as usize, bytes_processed);
277 // Don't set stage message - just show bytes/sec
278 }
279 }
280 }
281}