// Forked from atscan.net/plcbundle-rs — a high-performance implementation of
// plcbundle written in Rust.
1//! PLC synchronization: events and logger, boundary-CID deduplication, one-shot and continuous modes, and robust error/backoff handling
2
3// Sync module - PLC directory synchronization
4use crate::constants;
5use crate::operations::Operation;
6use crate::plc_client::PLCClient;
7use anyhow::Result;
8use serde::Deserialize;
9use sonic_rs::{JsonValueTrait, Value};
10use std::any::Any;
11use std::collections::HashSet;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14
/// Raw PLC directory export record as deserialized from the export stream.
///
/// Converted into the internal [`Operation`] type by the `From` impl below.
#[derive(Debug, Deserialize)]
pub struct PLCOperation {
    // DID this operation belongs to.
    did: String,
    // The operation payload, kept as an opaque JSON value.
    operation: Value,
    // Content identifier of this operation.
    cid: String,
    // Directory output may encode this as a bool or a string (or omit it),
    // hence Option<Value> rather than Option<bool>.
    #[serde(default)]
    nullified: Option<Value>,
    #[serde(rename = "createdAt")]
    created_at: String,

    // Original JSON line; filled in by the fetcher, never deserialized.
    #[serde(skip)]
    pub raw_json: Option<String>,
}
28
29impl From<PLCOperation> for Operation {
30 fn from(plc: PLCOperation) -> Self {
31 let is_nullified = plc.nullified.as_ref().is_some_and(|v| {
32 v.as_bool().unwrap_or(false) || v.as_str().is_some_and(|s| !s.is_empty())
33 });
34
35 Self {
36 did: plc.did,
37 operation: plc.operation,
38 cid: Some(plc.cid),
39 nullified: is_nullified,
40 created_at: plc.created_at,
41 extra: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
42 raw_json: plc.raw_json,
43 }
44 }
45}
46
47// ============================================================================
48// Boundary CID Logic (CRITICAL for deduplication)
49// ============================================================================
50
51/// Get CIDs that share the same timestamp as the last operation
52pub fn get_boundary_cids(operations: &[Operation]) -> HashSet<String> {
53 if operations.is_empty() {
54 return HashSet::new();
55 }
56
57 let last_time = &operations.last().unwrap().created_at;
58 operations
59 .iter()
60 .rev()
61 .take_while(|op| &op.created_at == last_time)
62 .filter_map(|op| op.cid.clone())
63 .collect()
64}
65
66/// Strip operations that match previous bundle's boundary CIDs
67pub fn strip_boundary_duplicates(
68 mut operations: Vec<Operation>,
69 prev_boundary: &HashSet<String>,
70) -> Vec<Operation> {
71 if prev_boundary.is_empty() {
72 return operations;
73 }
74
75 operations.retain(|op| {
76 op.cid
77 .as_ref()
78 .is_none_or(|cid| !prev_boundary.contains(cid))
79 });
80
81 operations
82}
83
84// ============================================================================
85// Sync Events
86// ============================================================================
87
/// Events emitted by the sync loop; fanned out to the optional logger and
/// the optional raw callback (see `SyncManager::handle_event`).
#[derive(Debug, Clone)]
pub enum SyncEvent {
    /// A bundle was fetched, saved and indexed.
    BundleCreated {
        bundle_num: u32,
        hash: String,
        age: String,
        fetch_duration_ms: u64,
        bundle_save_ms: u64,
        index_ms: u64,
        total_duration_ms: u64,
        fetch_requests: usize,
        did_index_compacted: bool,

        unique_dids: u32,
        size_bytes: u64,
        fetch_wait_ms: u64,
        fetch_http_ms: u64,
    },
    /// Reached the directory head; remaining operations sit in the mempool.
    CaughtUp {
        next_bundle: u32,
        mempool_count: usize,
        new_ops: usize,
        fetch_duration_ms: u64,
    },
    /// Emitted once when the initial catch-up phase finishes.
    InitialSyncComplete {
        total_bundles: u32,
        mempool_count: usize,
    },
    /// A sync error occurred (the loop may retry or shut down).
    Error {
        error: String,
    },
}
120
121// ============================================================================
122// Sync Configuration
123// ============================================================================
124
/// Configuration for a sync run (one-shot or continuous).
#[derive(Debug)]
pub struct SyncConfig {
    // Base URL of the PLC directory to sync from.
    pub plc_url: String,
    // Keep polling after the initial catch-up when true.
    pub continuous: bool,
    // Polling interval once caught up.
    pub interval: Duration,
    // Stop after this many bundles; 0 means unlimited.
    pub max_bundles: usize,
    pub verbose: bool,
    // Watch channel observed to detect shutdown requests.
    pub shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
    // Sender used to request shutdown on fatal/persistent errors.
    pub shutdown_tx: Option<tokio::sync::watch::Sender<bool>>,
    // Log individual fetch requests when true (passed to sync_next_bundle).
    pub fetch_log: bool,
    // Safety lag passed to bundle fetches (default from constants).
    pub safety_lag: Duration,
}
137
138impl Default for SyncConfig {
139 fn default() -> Self {
140 Self {
141 plc_url: constants::DEFAULT_PLC_DIRECTORY_URL.to_string(),
142 continuous: false,
143 interval: Duration::from_secs(60),
144 max_bundles: 0,
145 verbose: false,
146 shutdown_rx: None,
147 shutdown_tx: None,
148 fetch_log: false,
149 safety_lag: Duration::from_millis(crate::constants::DEFAULT_SAFETY_LAG_MS),
150 }
151 }
152}
153
/// Aggregate statistics for a completed sync run.
#[derive(Debug, Default)]
pub struct SyncStats {
    // Number of bundles created during the run.
    pub bundles_synced: usize,
    // Total operations fetched from the directory.
    pub operations_fetched: usize,
    // Wall-clock duration of the whole run.
    pub total_duration: Duration,
}
160
161// ============================================================================
162// Sync Logger Trait
163// ============================================================================
164
/// Trait for logging sync events
///
/// Implementors receive one callback per [`SyncEvent`] variant plus a
/// start-of-loop notification; `as_any` enables downcasting to a concrete
/// logger (e.g. to flip verbosity at runtime).
pub trait SyncLogger: Send + Sync {
    /// Called once when a continuous sync loop starts.
    fn on_sync_start(&self, interval: Duration);

    /// Called after each bundle is created; parameters mirror
    /// [`SyncEvent::BundleCreated`] field-for-field.
    #[allow(clippy::too_many_arguments)]
    fn on_bundle_created(
        &self,
        bundle_num: u32,
        hash: &str,
        age: &str,
        fetch_duration_ms: u64,
        bundle_save_ms: u64,
        index_ms: u64,
        total_duration_ms: u64,
        fetch_requests: usize,
        did_index_compacted: bool,
        unique_dids: u32,
        size_bytes: u64,
        fetch_wait_ms: u64,
        fetch_http_ms: u64,
    );

    // Allow the sync logger to accept multiple arguments for detailed bundle info
    // (Removed workaround method; use allow attribute on trait method instead)

    /// Called when sync reaches the directory head.
    fn on_caught_up(
        &self,
        next_bundle: u32,
        mempool_count: usize,
        new_ops: usize,
        fetch_duration_ms: u64,
    );

    /// Called once when the initial catch-up phase completes.
    fn on_initial_sync_complete(
        &self,
        total_bundles: u32,
        mempool_count: usize,
        interval: Duration,
    );

    /// Called on any sync error (retryable or fatal).
    fn on_error(&self, error: &str);

    /// Get a reference to self as Any for downcasting
    fn as_any(&self) -> &dyn Any;
}
210
/// Unified sync logger (used for both CLI and server)
///
/// In server mode the verbose flag lives behind an `Arc<Mutex<..>>` so it can
/// be toggled at runtime; in CLI mode both fields are `None` and the
/// verbosity accessors are no-ops.
pub struct SyncLoggerImpl {
    verbose: Option<Arc<Mutex<bool>>>,
    interval: Option<Duration>,
}

impl SyncLoggerImpl {
    /// Build a server/continuous-mode logger with a runtime-togglable verbose
    /// flag and a stored monitoring interval.
    pub fn new_server(verbose: bool, interval: Duration) -> Self {
        let flag = Arc::new(Mutex::new(verbose));
        Self {
            verbose: Some(flag),
            interval: Some(interval),
        }
    }

    /// Build a CLI/one-shot logger with no verbose state and no interval.
    pub fn new_cli() -> Self {
        Self {
            verbose: None,
            interval: None,
        }
    }

    /// Shared handle to the verbose flag for external access (server mode only).
    pub fn verbose_handle(&self) -> Option<Arc<Mutex<bool>>> {
        self.verbose.as_ref().map(Arc::clone)
    }

    /// Flip the verbose flag and return its new value (server mode only;
    /// returns `None` in CLI mode).
    pub fn toggle_verbose(&self) -> Option<bool> {
        let cell = self.verbose.as_ref()?;
        let mut guard = cell.lock().unwrap();
        *guard = !*guard;
        Some(*guard)
    }

    /// Overwrite the verbose flag (no-op in CLI mode).
    pub fn set_verbose(&self, value: bool) {
        if let Some(cell) = self.verbose.as_ref() {
            *cell.lock().unwrap() = value;
        }
    }
}
256
257impl SyncLogger for SyncLoggerImpl {
258 fn as_any(&self) -> &dyn Any {
259 self
260 }
261
262 fn on_sync_start(&self, interval: Duration) {
263 eprintln!("[Sync] Starting initial sync...");
264 if let Some(verbose) = &self.verbose
265 && *verbose.lock().unwrap()
266 {
267 eprintln!("[Sync] Sync loop interval: {:?}", interval);
268 }
269 }
270
271 #[allow(clippy::too_many_arguments)]
272 fn on_bundle_created(
273 &self,
274 bundle_num: u32,
275 hash: &str,
276 age: &str,
277 _fetch_duration_ms: u64,
278 bundle_save_ms: u64,
279 index_ms: u64,
280 _total_duration_ms: u64,
281 fetch_requests: usize,
282 _did_index_compacted: bool,
283 unique_dids: u32,
284 size_bytes: u64,
285 fetch_wait_ms: u64,
286 fetch_http_ms: u64,
287 ) {
288 let fetch_secs = fetch_http_ms as f64 / 1000.0;
289 let wait_secs = fetch_wait_ms as f64 / 1000.0;
290 let size_kb = size_bytes as f64 / 1024.0;
291 let size_str = if size_kb >= 1024.0 {
292 format!("{:.1}MB", size_kb / 1024.0)
293 } else {
294 format!("{:.0}KB", size_kb)
295 };
296 let base = format!(
297 "[INFO] → Bundle {:06} | {} | {} dids | {} | fetch: {:.2}s ({} reqs, {:.1}s wait) | save: {}ms",
298 bundle_num,
299 hash,
300 unique_dids,
301 size_str,
302 fetch_secs,
303 fetch_requests,
304 wait_secs,
305 bundle_save_ms
306 );
307 if index_ms > 0 {
308 eprintln!("{} | index: {}ms | {}", base, index_ms, age);
309 } else {
310 eprintln!("{} | {}", base, age);
311 }
312 }
313
314 fn on_caught_up(
315 &self,
316 next_bundle: u32,
317 mempool_count: usize,
318 new_ops: usize,
319 fetch_duration_ms: u64,
320 ) {
321 if new_ops > 0 {
322 eprintln!(
323 "[Sync] ✓ Bundle {:06} (upcoming) | mempool: {} ({:+}) | fetch: {}ms",
324 next_bundle, mempool_count, new_ops as i32, fetch_duration_ms
325 );
326 } else {
327 eprintln!(
328 "[Sync] ✓ Bundle {:06} (upcoming) | mempool: {} | fetch: {}ms",
329 next_bundle, mempool_count, fetch_duration_ms
330 );
331 }
332 }
333
334 fn on_initial_sync_complete(
335 &self,
336 total_bundles: u32,
337 mempool_count: usize,
338 _interval: Duration,
339 ) {
340 eprintln!(
341 "[Sync] ✓ Initial sync complete ({} bundles synced)",
342 total_bundles
343 );
344 if mempool_count > 0 {
345 eprintln!("[Sync] ✓ Mempool: {} operations", mempool_count);
346 }
347 // Only show monitoring message for continuous mode (when interval is stored)
348 if let Some(display_interval) = self.interval {
349 eprintln!(
350 "[Sync] Now monitoring for new operations (interval: {:?})...",
351 display_interval
352 );
353 }
354 }
355
356 fn on_error(&self, error: &str) {
357 eprintln!("[Sync] Error during sync: {}", error);
358 }
359}
360
361// ============================================================================
362// Sync Manager
363// ============================================================================
364
/// Drives bundle synchronization against a PLC directory.
///
/// Owns the bundle manager, the HTTP client and the sync configuration;
/// optionally fans events out to a [`SyncLogger`] and/or a raw callback.
pub struct SyncManager {
    manager: std::sync::Arc<crate::manager::BundleManager>,
    client: PLCClient,
    config: SyncConfig,
    // Optional event logger (set via with_logger).
    logger: Option<Box<dyn SyncLogger>>,
    // Optional raw event hook, invoked before the logger (set via with_callback).
    #[allow(clippy::type_complexity)]
    event_callback: Option<Box<dyn Fn(&SyncEvent) + Send + Sync>>,
}
373
374impl SyncManager {
375 pub fn new(
376 manager: std::sync::Arc<crate::manager::BundleManager>,
377 client: PLCClient,
378 config: SyncConfig,
379 ) -> Self {
380 Self {
381 manager,
382 client,
383 config,
384 logger: None,
385 event_callback: None,
386 }
387 }
388
389 /// Set a logger for sync events (replaces default formatting)
390 pub fn with_logger<L>(mut self, logger: L) -> Self
391 where
392 L: SyncLogger + 'static,
393 {
394 self.logger = Some(Box::new(logger));
395 self
396 }
397
398 /// Set a custom event callback (for advanced use cases)
399 pub fn with_callback<F>(mut self, callback: F) -> Self
400 where
401 F: Fn(&SyncEvent) + Send + Sync + 'static,
402 {
403 self.event_callback = Some(Box::new(callback));
404 self
405 }
406
407 fn handle_event(&self, event: &SyncEvent) {
408 // First, call custom callback if provided
409 if let Some(callback) = &self.event_callback {
410 callback(event);
411 }
412
413 // Then, call logger if provided
414 if let Some(logger) = &self.logger {
415 match event {
416 SyncEvent::BundleCreated {
417 bundle_num,
418 hash,
419 age,
420 fetch_duration_ms,
421 bundle_save_ms,
422 index_ms,
423 total_duration_ms,
424 fetch_requests,
425 did_index_compacted,
426 unique_dids,
427 size_bytes,
428 fetch_wait_ms,
429 fetch_http_ms,
430 } => {
431 logger.on_bundle_created(
432 *bundle_num,
433 hash,
434 age,
435 *fetch_duration_ms,
436 *bundle_save_ms,
437 *index_ms,
438 *total_duration_ms,
439 *fetch_requests,
440 *did_index_compacted,
441 *unique_dids,
442 *size_bytes,
443 *fetch_wait_ms,
444 *fetch_http_ms,
445 );
446 }
447 SyncEvent::CaughtUp {
448 next_bundle,
449 mempool_count,
450 new_ops,
451 fetch_duration_ms,
452 } => {
453 logger.on_caught_up(*next_bundle, *mempool_count, *new_ops, *fetch_duration_ms);
454 }
455 SyncEvent::InitialSyncComplete {
456 total_bundles,
457 mempool_count,
458 } => {
459 logger.on_initial_sync_complete(
460 *total_bundles,
461 *mempool_count,
462 self.config.interval,
463 );
464 }
465 SyncEvent::Error { error } => {
466 logger.on_error(error);
467 }
468 }
469 }
470 }
471
472 /// Show compaction message if index was compacted during bundle sync
473 fn show_compaction_if_needed(
474 &self,
475 did_index_compacted: bool,
476 delta_segments_before: u64,
477 index_ms: u64,
478 ) {
479 if did_index_compacted {
480 let stats_after = self.manager.get_did_index_stats();
481 let delta_segments_after = stats_after
482 .get("delta_segments")
483 .and_then(|v| v.as_u64())
484 .unwrap_or(0);
485 let segments_compacted = delta_segments_before.saturating_sub(delta_segments_after);
486 eprintln!(
487 "[Sync] ✓ Index compacted | segments: {} → {} ({} removed) | index: {}ms",
488 delta_segments_before, delta_segments_after, segments_compacted, index_ms
489 );
490 }
491 }
492
493 pub async fn run_once(&self, max_bundles: Option<usize>) -> Result<usize> {
494 let mut synced = 0;
495
496 loop {
497 // Check for shutdown if configured
498 if let Some(ref shutdown_rx) = self.config.shutdown_rx
499 && *shutdown_rx.borrow()
500 {
501 break;
502 }
503
504 // Get stats before sync to track compaction
505 let stats_before = self.manager.get_did_index_stats();
506 let delta_segments_before = stats_before
507 .get("delta_segments")
508 .and_then(|v| v.as_u64())
509 .unwrap_or(0);
510
511 match self
512 .manager
513 .sync_next_bundle(
514 &self.client,
515 None,
516 true,
517 self.config.fetch_log,
518 Some(self.config.safety_lag),
519 )
520 .await
521 {
522 Ok(crate::manager::SyncResult::BundleCreated {
523 bundle_num,
524 mempool_count: _,
525 duration_ms,
526 fetch_duration_ms,
527 bundle_save_ms,
528 index_ms,
529 fetch_requests,
530 hash,
531 age,
532 did_index_compacted,
533 unique_dids,
534 size_bytes,
535 fetch_wait_ms,
536 fetch_http_ms,
537 }) => {
538 synced += 1;
539
540 self.handle_event(&SyncEvent::BundleCreated {
541 bundle_num,
542 hash,
543 age,
544 fetch_duration_ms,
545 bundle_save_ms,
546 index_ms,
547 total_duration_ms: duration_ms,
548 fetch_requests,
549 did_index_compacted,
550 unique_dids,
551 size_bytes,
552 fetch_wait_ms,
553 fetch_http_ms,
554 });
555
556 // Show compaction message if index was compacted
557 self.show_compaction_if_needed(
558 did_index_compacted,
559 delta_segments_before,
560 index_ms,
561 );
562
563 // Check if we've reached the limit
564 if let Some(max) = max_bundles
565 && synced >= max
566 {
567 break;
568 }
569 }
570 Ok(crate::manager::SyncResult::CaughtUp {
571 next_bundle,
572 mempool_count,
573 new_ops,
574 fetch_duration_ms,
575 }) => {
576 self.handle_event(&SyncEvent::CaughtUp {
577 next_bundle,
578 mempool_count,
579 new_ops,
580 fetch_duration_ms,
581 });
582 break;
583 }
584 Err(e) => {
585 let error_msg = e.to_string();
586 self.handle_event(&SyncEvent::Error {
587 error: error_msg.clone(),
588 });
589
590 // Trigger shutdown on error if configured
591 // This ensures the application terminates on persistent errors
592 if let Some(ref shutdown_tx) = self.config.shutdown_tx {
593 let _ = shutdown_tx.send(true);
594 }
595
596 return Err(e);
597 }
598 }
599
600 // Small delay between bundles
601 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
602 }
603
604 Ok(synced)
605 }
606
607 pub async fn run_continuous(&self) -> Result<()> {
608 use tokio::time::sleep;
609
610 let mut total_synced = 0u32;
611 let mut is_initial_sync = true;
612 let mut did_index_batch_done = false;
613 let mut initial_sync_first_bundle: Option<u32> = None;
614
615 // Notify logger that sync is starting
616 if let Some(logger) = &self.logger {
617 logger.on_sync_start(self.config.interval);
618 }
619
620 // Keyboard handler for verbose toggle - DISABLED
621 // This feature is disabled because reading from stdin blocks shutdown.
622 // Users can still use verbose mode by passing --verbose flag at startup.
623 //
624 // TODO: Implement a non-blocking alternative using signals or other IPC
625 #[cfg(feature = "crossterm")]
626 {
627 // Keyboard input feature temporarily disabled to fix shutdown freeze
628 // The stdin reading was causing the server to hang on Ctrl+C
629 }
630
631 loop {
632 // Check for shutdown before starting sync
633 if let Some(ref shutdown_rx) = self.config.shutdown_rx
634 && *shutdown_rx.borrow()
635 {
636 if self.config.verbose {
637 eprintln!("[Sync] Shutdown requested, stopping...");
638 }
639 break;
640 }
641
642 // Update DID index on every bundle (now fast with delta segments)
643 // Get stats before sync to track compaction
644 let stats_before = self.manager.get_did_index_stats();
645 let delta_segments_before = stats_before
646 .get("delta_segments")
647 .and_then(|v| v.as_u64())
648 .unwrap_or(0);
649
650 let sync_result = self
651 .manager
652 .sync_next_bundle(
653 &self.client,
654 self.config.shutdown_rx.clone(),
655 !is_initial_sync,
656 self.config.fetch_log,
657 Some(self.config.safety_lag),
658 )
659 .await;
660
661 match sync_result {
662 Ok(crate::manager::SyncResult::BundleCreated {
663 bundle_num,
664 mempool_count: _,
665 duration_ms,
666 fetch_duration_ms,
667 bundle_save_ms,
668 index_ms,
669 fetch_requests,
670 hash,
671 age,
672 did_index_compacted,
673 unique_dids,
674 size_bytes,
675 fetch_wait_ms,
676 fetch_http_ms,
677 }) => {
678 total_synced += 1;
679 if is_initial_sync && initial_sync_first_bundle.is_none() {
680 initial_sync_first_bundle = Some(bundle_num);
681 }
682
683 // Reset error counter on successful sync
684 use std::sync::atomic::{AtomicU32, Ordering};
685 static CONSECUTIVE_ERRORS: AtomicU32 = AtomicU32::new(0);
686 CONSECUTIVE_ERRORS.store(0, Ordering::Relaxed);
687
688 self.handle_event(&SyncEvent::BundleCreated {
689 bundle_num,
690 hash,
691 age,
692 fetch_duration_ms,
693 bundle_save_ms,
694 index_ms,
695 total_duration_ms: duration_ms,
696 fetch_requests,
697 did_index_compacted,
698 unique_dids,
699 size_bytes,
700 fetch_wait_ms,
701 fetch_http_ms,
702 });
703
704 // Show compaction message if index was compacted
705 self.show_compaction_if_needed(
706 did_index_compacted,
707 delta_segments_before,
708 index_ms,
709 );
710
711 // Check max bundles limit
712 if self.config.max_bundles > 0
713 && total_synced as usize >= self.config.max_bundles
714 {
715 if self.config.verbose {
716 eprintln!(
717 "[Sync] Reached max bundles limit ({})",
718 self.config.max_bundles
719 );
720 }
721 break;
722 }
723
724 // Check for shutdown before sleeping
725 if let Some(ref shutdown_rx) = self.config.shutdown_rx
726 && *shutdown_rx.borrow()
727 {
728 if self.config.verbose {
729 eprintln!("[Sync] Shutdown requested, stopping...");
730 }
731 break;
732 }
733
734 // During initial sync, sleep briefly (500ms) to avoid hammering the API
735 // After initial sync, use the full interval
736 // Use select to allow cancellation during sleep
737 let sleep_duration = if is_initial_sync {
738 Duration::from_millis(500)
739 } else {
740 self.config.interval
741 };
742
743 if let Some(ref shutdown_rx) = self.config.shutdown_rx {
744 let mut shutdown_rx = shutdown_rx.clone();
745 tokio::select! {
746 _ = sleep(sleep_duration) => {}
747 _ = shutdown_rx.changed() => {
748 if *shutdown_rx.borrow() {
749 break;
750 }
751 }
752 }
753 } else {
754 sleep(sleep_duration).await;
755 }
756 }
757 Ok(crate::manager::SyncResult::CaughtUp {
758 next_bundle,
759 mempool_count,
760 new_ops,
761 fetch_duration_ms,
762 }) => {
763 // Check for shutdown
764 if let Some(ref shutdown_rx) = self.config.shutdown_rx
765 && *shutdown_rx.borrow()
766 {
767 if self.config.verbose {
768 eprintln!("[Sync] Shutdown requested, stopping...");
769 }
770 break;
771 }
772
773 // Caught up to the end of the chain
774 // When initial sync finishes, perform a single batch DID index update if the index is empty
775 // or if we created bundles during initial sync with per-bundle updates disabled.
776 if is_initial_sync && !did_index_batch_done {
777 let stats = self.manager.get_did_index_stats();
778 let total_dids = stats
779 .get("total_dids")
780 .and_then(|v| v.as_u64())
781 .unwrap_or(0);
782 let total_entries = stats
783 .get("total_entries")
784 .and_then(|v| v.as_u64())
785 .unwrap_or(0);
786
787 let end_bundle = self.manager.get_last_bundle();
788 let start_bundle = initial_sync_first_bundle.unwrap_or(1);
789
790 // Only run batch update if there are bundles to process and either index is empty
791 // or we created some bundles during this initial sync.
792 let created_bundles = total_synced > 0;
793 let index_is_empty = total_dids == 0 && total_entries == 0;
794 if end_bundle >= start_bundle && (index_is_empty || created_bundles) {
795 if self.config.verbose {
796 eprintln!(
797 "[Sync] Performing batch DID index update: {} → {} (index empty={}, created_bundles={})",
798 start_bundle, end_bundle, index_is_empty, created_bundles
799 );
800 }
801 if let Err(e) = self
802 .manager
803 .batch_update_did_index_async(start_bundle, end_bundle, true)
804 .await
805 {
806 eprintln!(
807 "[Sync] Batch DID index update failed after initial sync: {}",
808 e
809 );
810 } else {
811 did_index_batch_done = true;
812 }
813 }
814
815 is_initial_sync = false;
816 self.handle_event(&SyncEvent::InitialSyncComplete {
817 total_bundles: total_synced,
818 mempool_count,
819 });
820 }
821
822 self.handle_event(&SyncEvent::CaughtUp {
823 next_bundle,
824 mempool_count,
825 new_ops,
826 fetch_duration_ms,
827 });
828
829 // Always sleep for the full interval when caught up (monitoring mode)
830 // Use select to allow cancellation during sleep
831 if let Some(ref shutdown_rx) = self.config.shutdown_rx {
832 let mut shutdown_rx = shutdown_rx.clone();
833 tokio::select! {
834 _ = sleep(self.config.interval) => {}
835 _ = shutdown_rx.changed() => {
836 if *shutdown_rx.borrow() {
837 break;
838 }
839 }
840 }
841 } else {
842 sleep(self.config.interval).await;
843 }
844 }
845 Err(e) => {
846 let error_msg = e.to_string();
847 self.handle_event(&SyncEvent::Error {
848 error: error_msg.clone(),
849 });
850
851 // Determine if error is retryable
852 let is_retryable = is_retryable_error(&error_msg);
853
854 if is_retryable {
855 // Retry transient errors with exponential backoff
856 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
857 static CONSECUTIVE_ERRORS: AtomicU32 = AtomicU32::new(0);
858 static LAST_ERROR_TIME_SECS: AtomicU64 = AtomicU64::new(0);
859
860 let now = std::time::SystemTime::now()
861 .duration_since(std::time::UNIX_EPOCH)
862 .unwrap()
863 .as_secs();
864
865 let last_error_secs = LAST_ERROR_TIME_SECS.load(Ordering::Relaxed);
866
867 // Reset error count if last error was more than 5 minutes ago
868 if last_error_secs > 0 && now - last_error_secs > 300 {
869 CONSECUTIVE_ERRORS.store(0, Ordering::Relaxed);
870 }
871
872 let error_count = CONSECUTIVE_ERRORS.fetch_add(1, Ordering::Relaxed) + 1;
873 LAST_ERROR_TIME_SECS.store(now, Ordering::Relaxed);
874
875 // Calculate backoff with exponential increase (cap at 5 minutes)
876 let backoff_secs = std::cmp::min(2u64.pow((error_count - 1).min(8)), 300);
877
878 if self.config.verbose || error_count == 1 {
879 eprintln!(
880 "[Sync] Retryable error (attempt {}): {}",
881 error_count, error_msg
882 );
883 eprintln!("[Sync] Retrying in {} seconds...", backoff_secs);
884 }
885
886 // Too many consecutive errors - give up
887 if error_count >= 10 {
888 eprintln!(
889 "[Sync] Too many consecutive errors ({}) - shutting down",
890 error_count
891 );
892
893 if let Some(ref shutdown_tx) = self.config.shutdown_tx {
894 let _ = shutdown_tx.send(true);
895 }
896 return Err(e);
897 }
898
899 // Wait with backoff, checking for shutdown
900 let backoff_duration = Duration::from_secs(backoff_secs);
901 if let Some(ref shutdown_rx) = self.config.shutdown_rx {
902 let mut shutdown_rx = shutdown_rx.clone();
903 tokio::select! {
904 _ = sleep(backoff_duration) => {}
905 _ = shutdown_rx.changed() => {
906 if *shutdown_rx.borrow() {
907 return Ok(());
908 }
909 }
910 }
911 } else {
912 sleep(backoff_duration).await;
913 }
914 } else {
915 // Fatal error - shutdown immediately
916 eprintln!("[Sync] Fatal error - shutting down: {}", error_msg);
917
918 if let Some(ref shutdown_tx) = self.config.shutdown_tx {
919 let _ = shutdown_tx.send(true);
920 }
921 return Err(e);
922 }
923 }
924 }
925 }
926
927 Ok(())
928 }
929}
930
931// ============================================================================
932// Helper Functions
933// ============================================================================
934
/// Determine if an error is retryable
///
/// Classification is substring-based on the lowercased message. Fatal
/// patterns take precedence over retryable ones, and unrecognized messages
/// default to retryable so the sync loop backs off instead of crashing.
fn is_retryable_error(error_msg: &str) -> bool {
    // Transient issues worth retrying with backoff.
    const RETRYABLE_PATTERNS: [&str; 16] = [
        "connection",
        "timeout",
        "network",
        "temporary",
        "unavailable",
        "too many",
        "rate limit",
        "503",
        "502",
        "504",
        "broken pipe",
        "connection reset",
        "dns",
        "io error",
        "interrupted",
        "would block",
    ];

    // Permanent issues that retrying cannot fix.
    const FATAL_PATTERNS: [&str; 11] = [
        "no such file",
        "permission denied",
        "disk full",
        "quota exceeded",
        "corrupted",
        "invalid",
        "parse error",
        "404",
        "403",
        "401",
        "shutdown requested",
    ];

    let lowered = error_msg.to_lowercase();

    // Fatal patterns win when both kinds match.
    if FATAL_PATTERNS.iter().any(|p| lowered.contains(*p)) {
        return false;
    }
    if RETRYABLE_PATTERNS.iter().any(|p| lowered.contains(*p)) {
        return true;
    }

    // Default: retry unknown errors (conservative approach).
    // This prevents the server from crashing on unexpected errors.
    true
}
992
#[cfg(test)]
mod tests {
    //! Unit tests for boundary-CID extraction and boundary deduplication.
    use super::*;

    #[test]
    fn test_boundary_cids() {
        let ops = vec![
            Operation {
                did: "did:plc:1".into(),
                operation: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                cid: Some("cid1".into()),
                nullified: false,
                created_at: "2024-01-01T00:00:00Z".into(),
                extra: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                raw_json: None,
            },
            Operation {
                did: "did:plc:2".into(),
                operation: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                cid: Some("cid2".into()),
                nullified: false,
                created_at: "2024-01-01T00:00:01Z".into(),
                extra: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                raw_json: None,
            },
            Operation {
                did: "did:plc:3".into(),
                operation: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                cid: Some("cid3".into()),
                nullified: false,
                created_at: "2024-01-01T00:00:01Z".into(), // Same time as cid2
                extra: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                raw_json: None,
            },
        ];

        // Only the two operations sharing the trailing timestamp count.
        let boundary = get_boundary_cids(&ops);
        assert_eq!(boundary.len(), 2);
        assert!(boundary.contains("cid2"));
        assert!(boundary.contains("cid3"));
    }

    #[test]
    fn test_strip_duplicates() {
        let mut prev = HashSet::new();
        prev.insert("cid1".to_string());

        let ops = vec![
            Operation {
                did: "did:plc:1".into(),
                operation: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                cid: Some("cid1".into()), // Duplicate
                nullified: false,
                created_at: "2024-01-01T00:00:00Z".into(),
                extra: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                raw_json: None,
            },
            Operation {
                did: "did:plc:2".into(),
                operation: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                cid: Some("cid2".into()), // New
                nullified: false,
                created_at: "2024-01-01T00:00:01Z".into(),
                extra: sonic_rs::from_str("null").unwrap_or_else(|_| Value::new()),
                raw_json: None,
            },
        ];

        // The boundary duplicate is removed; the new operation survives.
        let result = strip_boundary_duplicates(ops, &prev);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].cid.as_ref().unwrap(), "cid2");
    }

    // Additional comprehensive tests
    // Helper: build an Operation with the given DID/CID/timestamp and
    // default (null) JSON payloads.
    fn create_operation_helper(did: &str, cid: Option<&str>, created_at: &str) -> Operation {
        Operation {
            did: did.to_string(),
            operation: Value::new(),
            cid: cid.map(|s| s.to_string()),
            nullified: false,
            created_at: created_at.to_string(),
            extra: Value::new(),
            raw_json: None,
        }
    }

    #[test]
    fn test_get_boundary_cids_empty() {
        let ops = vec![];
        let cids = get_boundary_cids(&ops);
        assert!(cids.is_empty());
    }

    #[test]
    fn test_get_boundary_cids_single() {
        let ops = vec![create_operation_helper(
            "did:plc:test",
            Some("cid1"),
            "2024-01-01T00:00:00Z",
        )];
        let cids = get_boundary_cids(&ops);
        assert_eq!(cids.len(), 1);
        assert!(cids.contains("cid1"));
    }

    #[test]
    fn test_get_boundary_cids_multiple_same_time() {
        // All operations share one timestamp, so all CIDs are boundary CIDs.
        let ops = vec![
            create_operation_helper("did:plc:test1", Some("cid1"), "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test2", Some("cid2"), "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test3", Some("cid3"), "2024-01-01T00:00:00Z"),
        ];
        let cids = get_boundary_cids(&ops);
        assert_eq!(cids.len(), 3);
        assert!(cids.contains("cid1"));
        assert!(cids.contains("cid2"));
        assert!(cids.contains("cid3"));
    }

    #[test]
    fn test_get_boundary_cids_different_times() {
        // Only the trailing run of equal timestamps is included.
        let ops = vec![
            create_operation_helper("did:plc:test1", Some("cid1"), "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test2", Some("cid2"), "2024-01-01T00:00:01Z"),
            create_operation_helper("did:plc:test3", Some("cid3"), "2024-01-01T00:00:01Z"),
        ];
        let cids = get_boundary_cids(&ops);
        assert_eq!(cids.len(), 2);
        assert!(!cids.contains("cid1"));
        assert!(cids.contains("cid2"));
        assert!(cids.contains("cid3"));
    }

    #[test]
    fn test_get_boundary_cids_no_cid() {
        // Operations without a CID are skipped in the boundary set.
        let ops = vec![
            create_operation_helper("did:plc:test1", None, "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test2", Some("cid2"), "2024-01-01T00:00:00Z"),
        ];
        let cids = get_boundary_cids(&ops);
        assert_eq!(cids.len(), 1);
        assert!(cids.contains("cid2"));
    }

    #[test]
    fn test_strip_boundary_duplicates_empty() {
        let ops = vec![];
        let prev_boundary = HashSet::new();
        let result = strip_boundary_duplicates(ops, &prev_boundary);
        assert!(result.is_empty());
    }

    #[test]
    fn test_strip_boundary_duplicates_no_prev_boundary() {
        // Empty boundary set: input passes through untouched.
        let ops = vec![
            create_operation_helper("did:plc:test1", Some("cid1"), "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test2", Some("cid2"), "2024-01-01T00:00:00Z"),
        ];
        let prev_boundary = HashSet::new();
        let result = strip_boundary_duplicates(ops, &prev_boundary);
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_strip_boundary_duplicates_with_matches() {
        // Matching CID is removed; relative order of survivors is preserved.
        let ops = vec![
            create_operation_helper("did:plc:test1", Some("cid1"), "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test2", Some("cid2"), "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test3", Some("cid3"), "2024-01-01T00:00:00Z"),
        ];
        let mut prev_boundary = HashSet::new();
        prev_boundary.insert("cid2".to_string());
        let result = strip_boundary_duplicates(ops, &prev_boundary);
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].cid, Some("cid1".to_string()));
        assert_eq!(result[1].cid, Some("cid3".to_string()));
    }

    #[test]
    fn test_strip_boundary_duplicates_no_cid() {
        // CID-less operations are never treated as duplicates.
        let ops = vec![
            create_operation_helper("did:plc:test1", None, "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test2", Some("cid2"), "2024-01-01T00:00:00Z"),
        ];
        let mut prev_boundary = HashSet::new();
        prev_boundary.insert("cid2".to_string());
        let result = strip_boundary_duplicates(ops, &prev_boundary);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].cid, None);
    }

    #[test]
    fn test_strip_boundary_duplicates_all_matched() {
        // Every operation matched the boundary: result is empty.
        let ops = vec![
            create_operation_helper("did:plc:test1", Some("cid1"), "2024-01-01T00:00:00Z"),
            create_operation_helper("did:plc:test2", Some("cid2"), "2024-01-01T00:00:00Z"),
        ];
        let mut prev_boundary = HashSet::new();
        prev_boundary.insert("cid1".to_string());
        prev_boundary.insert("cid2".to_string());
        let result = strip_boundary_duplicates(ops, &prev_boundary);
        assert!(result.is_empty());
    }
}