High-performance implementation of plcbundle written in Rust
at main 192 lines 6.9 kB view raw
1use super::utils; 2use super::utils::HasGlobalFlags; 3use anyhow::Result; 4use clap::{Args, ValueHint}; 5use plcbundle::{ 6 constants, 7 plc_client::PLCClient, 8 runtime::BundleRuntime, 9 sync::{SyncConfig, SyncLoggerImpl, SyncManager}, 10}; 11use std::path::PathBuf; 12use std::sync::Arc; 13use std::time::Duration; 14 15fn parse_duration(s: &str) -> Result<Duration, String> { 16 if let Some(s) = s.strip_suffix('s') { 17 s.parse::<u64>() 18 .map(Duration::from_secs) 19 .map_err(|e| e.to_string()) 20 } else if let Some(s) = s.strip_suffix('m') { 21 s.parse::<u64>() 22 .map(|m| Duration::from_secs(m * 60)) 23 .map_err(|e| e.to_string()) 24 } else { 25 s.parse::<u64>() 26 .map(Duration::from_secs) 27 .map_err(|e| e.to_string()) 28 } 29} 30 31#[derive(Args)] 32#[command( 33 about = "Fetch new bundles from PLC directory", 34 long_about = "Download new operations from the PLC directory and create bundles in your 35local repository. Similar to 'git fetch', this command updates your repository 36with new data from the remote source without modifying existing bundles. 37 38The command fetches operations starting from where your repository left off, 39creates new bundles when 10,000 operations accumulate, and updates the index. 40Use --max-bundles to limit how many bundles are fetched in a single run, or 41use --continuous to run as a daemon that periodically checks for and fetches 42new bundles. 43 44In continuous mode, the command runs indefinitely, checking for new bundles 45at the specified interval (default 60 seconds). This is useful for keeping 46a repository up-to-date automatically. For one-time syncs, omit --continuous. 47 48This is the primary way to populate and update your repository with data from 49the PLC directory. The command handles rate limiting, error recovery, and 50maintains chain integrity throughout the sync process.", 51 alias = "fetch", 52 help_template = crate::clap_help!( 53 examples: " # Fetch new bundles once\n \ 54 {bin} sync\n\n \ 55 # Run continuously (daemon mode)\n \ 56 {bin} sync --continuous\n\n \ 57 # Custom sync interval\n \ 58 {bin} sync --continuous --interval 30s\n\n \ 59 # Fetch maximum 10 bundles then stop\n \ 60 {bin} sync --max-bundles 10" 61 ) 62)] 63pub struct SyncCommand { 64 /// PLC directory URL 65 #[arg(long, default_value = constants::DEFAULT_PLC_DIRECTORY_URL, value_hint = ValueHint::Url)] 66 pub plc: String, 67 68 /// Keep syncing (run as daemon) 69 #[arg(long, conflicts_with = "max_bundles")] 70 pub continuous: bool, 71 72 /// Sync interval for continuous mode 73 #[arg(long, default_value = "60s", value_parser = parse_duration)] 74 pub interval: Duration, 75 76 /// Maximum bundles to fetch (0 = all, only for one-time sync) 77 #[arg(long, default_value = "0", conflicts_with = "continuous")] 78 pub max_bundles: usize, 79 80 /// Enable extended per-request fetch logging 81 #[arg(long)] 82 pub fetch_log: bool, 83} 84 85impl HasGlobalFlags for SyncCommand { 86 fn verbose(&self) -> bool { 87 false 88 } 89 fn quiet(&self) -> bool { 90 false 91 } 92} 93 94pub fn run(cmd: SyncCommand, dir: PathBuf, global_quiet: bool, global_verbose: bool) -> Result<()> { 95 tokio::runtime::Runtime::new()?.block_on(async { 96 if !global_quiet { 97 println!("Syncing from: {}", utils::display_path(&dir).display()); 98 println!("PLC Directory: {}", cmd.plc); 99 if cmd.continuous { 100 println!("Mode: continuous (interval: {:?})", cmd.interval); 101 } 102 } 103 104 let client = PLCClient::new(&cmd.plc)?; 105 let manager = Arc::new(super::utils::create_manager( 106 dir.clone(), 107 global_verbose, 108 global_quiet, 109 false, 110 )?); 111 112 let config = SyncConfig { 113 plc_url: cmd.plc.clone(), 114 continuous: cmd.continuous, 115 interval: cmd.interval, 116 max_bundles: cmd.max_bundles, 117 verbose: global_verbose, 118 shutdown_rx: None, 119 shutdown_tx: None, 120 fetch_log: cmd.fetch_log, 121 safety_lag: Duration::from_millis(constants::DEFAULT_SAFETY_LAG_MS), 122 }; 123 124 let quiet = global_quiet; 125 126 if cmd.continuous { 127 // For continuous mode, use run_continuous() with SyncLoggerImpl and BundleRuntime for graceful shutdown 128 let runtime = BundleRuntime::new(); 129 let shutdown_signal = runtime.shutdown_signal(); 130 let shutdown_sender = runtime.shutdown_sender(); 131 132 let config = SyncConfig { 133 plc_url: cmd.plc.clone(), 134 continuous: true, 135 interval: cmd.interval, 136 max_bundles: 0, 137 verbose: global_verbose, 138 shutdown_rx: Some(shutdown_signal), 139 shutdown_tx: Some(shutdown_sender), 140 fetch_log: cmd.fetch_log, 141 safety_lag: Duration::from_millis(constants::DEFAULT_SAFETY_LAG_MS), 142 }; 143 144 let logger = SyncLoggerImpl::new_server(global_verbose, cmd.interval); 145 let sync_manager = SyncManager::new(manager, client, config).with_logger(logger); 146 147 // Run sync with graceful shutdown handling 148 tokio::select! { 149 result = sync_manager.run_continuous() => { 150 if let Err(e) = result { 151 eprintln!("[Sync] Sync error: {}", e); 152 runtime.trigger_fatal_shutdown(); 153 return Err(e); 154 } 155 } 156 _ = runtime.create_shutdown_future() => { 157 if !global_quiet { 158 eprintln!("\nShutdown signal received, stopping sync..."); 159 } 160 } 161 } 162 163 // Use common shutdown cleanup handler (no tasks to clean up for sync) 164 runtime 165 .wait_for_shutdown_cleanup::<()>("Sync", None, None) 166 .await; 167 168 Ok(()) 169 } else { 170 // For one-time sync, use run_once() with logger 171 let logger = SyncLoggerImpl::new_cli(); 172 let sync_manager = SyncManager::new(manager, client, config).with_logger(logger); 173 174 let max_bundles = if cmd.max_bundles > 0 { 175 Some(cmd.max_bundles) 176 } else { 177 None 178 }; 179 let synced = sync_manager.run_once(max_bundles).await?; 180 181 if !quiet { 182 if synced == 0 { 183 eprintln!("✓ Already up to date"); 184 } else { 185 eprintln!("✓ Sync complete: {} bundle(s) fetched", synced); 186 } 187 } 188 189 Ok(()) 190 } 191 }) 192}