forked from
atscan.net/plcbundle-rs
High-performance implementation of plcbundle written in Rust
1use super::utils;
2use super::utils::HasGlobalFlags;
3use anyhow::Result;
4use clap::{Args, ValueHint};
5use plcbundle::{
6 constants,
7 plc_client::PLCClient,
8 runtime::BundleRuntime,
9 sync::{SyncConfig, SyncLoggerImpl, SyncManager},
10};
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15fn parse_duration(s: &str) -> Result<Duration, String> {
16 if let Some(s) = s.strip_suffix('s') {
17 s.parse::<u64>()
18 .map(Duration::from_secs)
19 .map_err(|e| e.to_string())
20 } else if let Some(s) = s.strip_suffix('m') {
21 s.parse::<u64>()
22 .map(|m| Duration::from_secs(m * 60))
23 .map_err(|e| e.to_string())
24 } else {
25 s.parse::<u64>()
26 .map(Duration::from_secs)
27 .map_err(|e| e.to_string())
28 }
29}
30
31#[derive(Args)]
32#[command(
33 about = "Fetch new bundles from PLC directory",
34 long_about = "Download new operations from the PLC directory and create bundles in your
35local repository. Similar to 'git fetch', this command updates your repository
36with new data from the remote source without modifying existing bundles.
37
38The command fetches operations starting from where your repository left off,
39creates new bundles when 10,000 operations accumulate, and updates the index.
40Use --max-bundles to limit how many bundles are fetched in a single run, or
41use --continuous to run as a daemon that periodically checks for and fetches
42new bundles.
43
44In continuous mode, the command runs indefinitely, checking for new bundles
45at the specified interval (default 60 seconds). This is useful for keeping
46a repository up-to-date automatically. For one-time syncs, omit --continuous.
47
48This is the primary way to populate and update your repository with data from
49the PLC directory. The command handles rate limiting, error recovery, and
50maintains chain integrity throughout the sync process.",
51 alias = "fetch",
52 help_template = crate::clap_help!(
53 examples: " # Fetch new bundles once\n \
54 {bin} sync\n\n \
55 # Run continuously (daemon mode)\n \
56 {bin} sync --continuous\n\n \
57 # Custom sync interval\n \
58 {bin} sync --continuous --interval 30s\n\n \
59 # Fetch maximum 10 bundles then stop\n \
60 {bin} sync --max-bundles 10"
61 )
62)]
63pub struct SyncCommand {
64 /// PLC directory URL
65 #[arg(long, default_value = constants::DEFAULT_PLC_DIRECTORY_URL, value_hint = ValueHint::Url)]
66 pub plc: String,
67
68 /// Keep syncing (run as daemon)
69 #[arg(long, conflicts_with = "max_bundles")]
70 pub continuous: bool,
71
72 /// Sync interval for continuous mode
73 #[arg(long, default_value = "60s", value_parser = parse_duration)]
74 pub interval: Duration,
75
76 /// Maximum bundles to fetch (0 = all, only for one-time sync)
77 #[arg(long, default_value = "0", conflicts_with = "continuous")]
78 pub max_bundles: usize,
79
80 /// Enable extended per-request fetch logging
81 #[arg(long)]
82 pub fetch_log: bool,
83}
84
85impl HasGlobalFlags for SyncCommand {
86 fn verbose(&self) -> bool {
87 false
88 }
89 fn quiet(&self) -> bool {
90 false
91 }
92}
93
94pub fn run(cmd: SyncCommand, dir: PathBuf, global_quiet: bool, global_verbose: bool) -> Result<()> {
95 tokio::runtime::Runtime::new()?.block_on(async {
96 if !global_quiet {
97 println!("Syncing from: {}", utils::display_path(&dir).display());
98 println!("PLC Directory: {}", cmd.plc);
99 if cmd.continuous {
100 println!("Mode: continuous (interval: {:?})", cmd.interval);
101 }
102 }
103
104 let client = PLCClient::new(&cmd.plc)?;
105 let manager = Arc::new(super::utils::create_manager(
106 dir.clone(),
107 global_verbose,
108 global_quiet,
109 false,
110 )?);
111
112 let config = SyncConfig {
113 plc_url: cmd.plc.clone(),
114 continuous: cmd.continuous,
115 interval: cmd.interval,
116 max_bundles: cmd.max_bundles,
117 verbose: global_verbose,
118 shutdown_rx: None,
119 shutdown_tx: None,
120 fetch_log: cmd.fetch_log,
121 safety_lag: Duration::from_millis(constants::DEFAULT_SAFETY_LAG_MS),
122 };
123
124 let quiet = global_quiet;
125
126 if cmd.continuous {
127 // For continuous mode, use run_continuous() with SyncLoggerImpl and BundleRuntime for graceful shutdown
128 let runtime = BundleRuntime::new();
129 let shutdown_signal = runtime.shutdown_signal();
130 let shutdown_sender = runtime.shutdown_sender();
131
132 let config = SyncConfig {
133 plc_url: cmd.plc.clone(),
134 continuous: true,
135 interval: cmd.interval,
136 max_bundles: 0,
137 verbose: global_verbose,
138 shutdown_rx: Some(shutdown_signal),
139 shutdown_tx: Some(shutdown_sender),
140 fetch_log: cmd.fetch_log,
141 safety_lag: Duration::from_millis(constants::DEFAULT_SAFETY_LAG_MS),
142 };
143
144 let logger = SyncLoggerImpl::new_server(global_verbose, cmd.interval);
145 let sync_manager = SyncManager::new(manager, client, config).with_logger(logger);
146
147 // Run sync with graceful shutdown handling
148 tokio::select! {
149 result = sync_manager.run_continuous() => {
150 if let Err(e) = result {
151 eprintln!("[Sync] Sync error: {}", e);
152 runtime.trigger_fatal_shutdown();
153 return Err(e);
154 }
155 }
156 _ = runtime.create_shutdown_future() => {
157 if !global_quiet {
158 eprintln!("\nShutdown signal received, stopping sync...");
159 }
160 }
161 }
162
163 // Use common shutdown cleanup handler (no tasks to clean up for sync)
164 runtime
165 .wait_for_shutdown_cleanup::<()>("Sync", None, None)
166 .await;
167
168 Ok(())
169 } else {
170 // For one-time sync, use run_once() with logger
171 let logger = SyncLoggerImpl::new_cli();
172 let sync_manager = SyncManager::new(manager, client, config).with_logger(logger);
173
174 let max_bundles = if cmd.max_bundles > 0 {
175 Some(cmd.max_bundles)
176 } else {
177 None
178 };
179 let synced = sync_manager.run_once(max_bundles).await?;
180
181 if !quiet {
182 if synced == 0 {
183 eprintln!("✓ Already up to date");
184 } else {
185 eprintln!("✓ Sync complete: {} bundle(s) fetched", synced);
186 }
187 }
188
189 Ok(())
190 }
191 })
192}