High-performance implementation of plcbundle written in Rust
at main 187 lines 7.1 kB view raw
1// Server command - start HTTP server 2use anyhow::Result; 3use clap::{Args, ValueHint}; 4use std::path::PathBuf; 5 6#[cfg(feature = "server")] 7use tokio::time::Duration; 8 9#[cfg(feature = "server")] 10use super::progress::ProgressBar; 11#[cfg(feature = "server")] 12use plcbundle::server::{ProgressCallbackFactory, StartupConfig, start_server}; 13#[cfg(feature = "server")] 14use std::sync::{Arc, Mutex}; 15 16#[cfg(feature = "server")] 17fn parse_duration_for_clap(s: &str) -> Result<Duration, String> { 18 plcbundle::server::parse_duration(s).map_err(|e| e.to_string()) 19} 20 21#[derive(Args)] 22#[command( 23 about = "Start HTTP server", 24 long_about = "Start an HTTP server to expose bundle data and repository functionality over 25HTTP. The server provides RESTful endpoints for accessing bundles, operations, 26DID documents, and repository metadata. 27 28In standard mode, the server operates as a read-only archive, serving data from 29existing bundles. Enable --sync to run in daemon mode, where the server continuously 30fetches new bundles from the PLC directory in the background while serving requests. 31 32Optional features include WebSocket streaming (--websocket) for real-time updates 33and DID resolution endpoints (--resolver) that enable W3C DID document resolution 34and handle-to-DID lookups. When resolver is enabled, the server automatically builds 35or updates the DID index on startup if needed. 36 37This is the primary way to expose your repository to other systems, applications, 38or users over the network. The server is designed for production use with proper 39error handling, graceful shutdown, and resource management.", 40 help_template = crate::clap_help!( 41 examples: " # Start server on default port (8080)\n \ 42 {bin} server\n\n \ 43 # Custom host and port\n \ 44 {bin} server --host 0.0.0.0 --port 3000\n\n \ 45 # Server with sync mode (daemon)\n \ 46 {bin} server --sync\n\n \ 47 # Sync with custom interval\n \ 48 {bin} server --sync --interval 30s\n\n \ 49 # Enable WebSocket streaming\n \ 50 {bin} server --websocket\n\n \ 51 # Enable DID resolution endpoints\n \ 52 {bin} server --resolver\n\n \ 53 # Full-featured server\n \ 54 {bin} server --sync --websocket --resolver --port 8080" 55 ) 56)] 57pub struct ServerCommand { 58 /// HTTP server port 59 #[arg(long, default_value = "8080", help_heading = "Server Options")] 60 pub port: u16, 61 62 /// HTTP server host 63 #[arg(long, default_value = "127.0.0.1", help_heading = "Server Options")] 64 pub host: String, 65 66 /// Enable sync mode (run as daemon, continuously fetch from PLC) 67 #[arg(short, long, help_heading = "Sync Options")] 68 pub sync: bool, 69 70 /// PLC directory URL (for sync mode) 71 #[arg(long, default_value = crate::constants::DEFAULT_PLC_DIRECTORY_URL, help_heading = "Sync Options", value_hint = ValueHint::Url)] 72 pub plc: String, 73 74 /// Sync interval (how often to check for new bundles) 75 #[cfg(feature = "server")] 76 #[arg(long, default_value = "60s", value_parser = parse_duration_for_clap, help_heading = "Sync Options")] 77 pub interval: Duration, 78 79 #[cfg(not(feature = "server"))] 80 #[arg(long, default_value = "60s", help_heading = "Sync Options")] 81 pub interval: String, 82 83 /// Maximum bundles to fetch (0 = unlimited) 84 #[arg(long, default_value = "0", help_heading = "Sync Options")] 85 pub max_bundles: u32, 86 87 /// Enable extended per-request fetch logging 88 #[arg(long, help_heading = "Sync Options")] 89 pub fetch_log: bool, 90 91 /// Enable WebSocket endpoint for streaming 92 #[arg(long, help_heading = "Feature Options")] 93 pub websocket: bool, 94 95 /// Enable DID resolution endpoints 96 #[arg(long, help_heading = "Feature Options")] 97 pub resolver: bool, 98 99 /// Handle resolver URL (defaults to quickdid.smokesignal.tools if not provided) 100 #[arg(long, help_heading = "Feature Options", value_hint = ValueHint::Url)] 101 pub handle_resolver: Option<String>, 102} 103 104pub fn run(cmd: ServerCommand, dir: PathBuf, global_verbose: bool) -> Result<()> { 105 #[cfg(not(feature = "server"))] 106 { 107 let _ = (cmd, dir, global_verbose); // Suppress unused warnings when server feature is disabled 108 anyhow::bail!("Server feature is not enabled. Rebuild with --features server"); 109 } 110 111 #[cfg(feature = "server")] 112 { 113 run_server(cmd, dir, global_verbose) 114 } 115} 116 117#[cfg(feature = "server")] 118fn run_server(cmd: ServerCommand, dir: PathBuf, global_verbose: bool) -> Result<()> { 119 use anyhow::Context; 120 use plcbundle::constants; 121 use tokio::runtime::Runtime; 122 123 // Create tokio runtime for async operations 124 let rt = Runtime::new().context("Failed to create tokio runtime")?; 125 126 // Determine handle resolver URL 127 let handle_resolver_url = if cmd.handle_resolver.is_none() { 128 Some(constants::DEFAULT_HANDLE_RESOLVER_URL.to_string()) 129 } else { 130 cmd.handle_resolver.clone() 131 }; 132 133 // Create startup config 134 let startup_config = StartupConfig { 135 dir, 136 sync: cmd.sync, 137 plc_url: cmd.plc, 138 handle_resolver_url, 139 enable_resolver: cmd.resolver, 140 verbose: global_verbose, 141 host: cmd.host, 142 port: cmd.port, 143 sync_interval: cmd.interval, 144 max_bundles: cmd.max_bundles, 145 enable_websocket: cmd.websocket, 146 fetch_log: cmd.fetch_log, 147 }; 148 149 // Create progress callback factory for DID index building (CLI-specific) 150 // This factory will be called with (last_bundle, total_bytes) when the index needs to be built 151 let progress_callback_factory: Option<ProgressCallbackFactory> = if cmd.resolver { 152 Some(Box::new(move |last_bundle: u32, total_bytes: u64| { 153 let progress = Arc::new(Mutex::new(ProgressBar::with_bytes( 154 last_bundle as usize, 155 total_bytes, 156 ))); 157 let progress_clone = progress.clone(); 158 let progress_finish = progress.clone(); 159 let verbose = global_verbose; 160 161 let callback = Box::new( 162 move |current: u32, _total: u32, bytes_processed: u64, _total_bytes: u64| { 163 let pb = progress_clone.lock().unwrap(); 164 pb.set_with_bytes(current as usize, bytes_processed); 165 if verbose && current.is_multiple_of(100) { 166 log::debug!( 167 "[DIDResolver] Index progress: {}/{} bundles", 168 current, 169 _total 170 ); 171 } 172 }, 173 ) as Box<dyn Fn(u32, u32, u64, u64) + Send + Sync>; 174 175 let finish = Box::new(move || { 176 let pb = progress_finish.lock().unwrap(); 177 pb.finish(); 178 }) as Box<dyn FnOnce() + Send + Sync>; 179 180 (callback, Some(finish)) 181 })) 182 } else { 183 None 184 }; 185 186 rt.block_on(start_server(startup_config, progress_callback_factory)) 187}