// High-performance implementation of plcbundle, written in Rust.
// Source listing metadata: branch `main`, 653 lines, 22 kB.
1// Server startup and initialization logic 2// This module handles all the complex setup logic for starting the server, 3// including manager initialization, DID index building, handle resolver setup, etc. 4 5use crate::constants; 6use crate::manager::BundleManager; 7use crate::runtime::BundleRuntime; 8use crate::server::{Server, ServerConfig}; 9use anyhow::{Context, Result}; 10use std::path::PathBuf; 11use std::sync::Arc; 12use tokio::task::JoinSet; 13use tokio::time::Duration; 14 15/// Configuration for server startup 16pub struct StartupConfig { 17 pub dir: PathBuf, 18 pub sync: bool, 19 pub plc_url: String, 20 pub handle_resolver_url: Option<String>, 21 pub enable_resolver: bool, 22 pub verbose: bool, 23 pub host: String, 24 pub port: u16, 25 pub sync_interval: Duration, 26 pub max_bundles: u32, 27 pub enable_websocket: bool, 28 pub fetch_log: bool, 29} 30 31/// Progress callback for DID index building 32/// Arguments: (current_bundle, total_bundles, bytes_processed, total_bytes) 33pub type ProgressCallback = Box<dyn Fn(u32, u32, u64, u64) + Send + Sync>; 34 35/// Finish function for progress tracking (e.g., to finish a progress bar) 36pub type ProgressFinish = Box<dyn FnOnce() + Send + Sync>; 37 38/// Initialize and configure the BundleManager based on startup config 39pub fn initialize_manager(config: &StartupConfig) -> Result<BundleManager> { 40 let handle_resolver_url = if config.handle_resolver_url.is_none() { 41 if config.verbose { 42 log::debug!( 43 "[HandleResolver] Using default handle resolver: {}", 44 constants::DEFAULT_HANDLE_RESOLVER_URL 45 ); 46 } 47 Some(constants::DEFAULT_HANDLE_RESOLVER_URL.to_string()) 48 } else { 49 if config.verbose { 50 log::debug!( 51 "[HandleResolver] Using custom handle resolver: {}", 52 config.handle_resolver_url.as_ref().unwrap() 53 ); 54 } 55 config.handle_resolver_url.clone() 56 }; 57 58 // Preload mempool for server use (faster responses) 59 let preload_mempool = true; 60 61 let options = crate::ManagerOptions { 62 
handle_resolver_url: handle_resolver_url.clone(), 63 preload_mempool, 64 verbose: config.verbose, 65 }; 66 67 let manager = if config.sync { 68 // Sync mode can auto-init 69 match BundleManager::new(config.dir.clone(), options.clone()) { 70 Ok(mgr) => mgr, 71 Err(_) => { 72 // Repository doesn't exist, try to initialize 73 BundleManager::init_repository(&config.dir, config.plc_url.clone(), false) 74 .context("Failed to initialize repository")?; 75 // Try again after initialization 76 BundleManager::new(config.dir.clone(), options) 77 .context("Failed to open repository after initialization")? 78 } 79 } 80 } else { 81 // Read-only mode cannot auto-init 82 BundleManager::new(config.dir.clone(), options).context(format!( 83 "Repository not found. Use '{} init' first or run with --sync", 84 constants::BINARY_NAME 85 ))? 86 }; 87 88 // Log handle resolver configuration 89 if config.verbose { 90 if let Some(url) = manager.get_handle_resolver_base_url() { 91 log::debug!( 92 "[HandleResolver] External handle resolver configured: {}", 93 url 94 ); 95 } else { 96 log::debug!("[HandleResolver] External handle resolver not configured"); 97 } 98 } 99 100 Ok(manager) 101} 102 103/// Build or verify DID index if resolver is enabled 104/// 105/// `progress_callback_factory` is an optional function that creates a progress callback 106/// and finish function given the total bundle count and total bytes. This allows the caller 107/// to create progress tracking (e.g., a progress bar) after knowing the bundle count. 
108pub fn setup_did_index<F>( 109 manager: &mut BundleManager, 110 enable_resolver: bool, 111 verbose: bool, 112 progress_callback_factory: Option<F>, 113) -> Result<()> 114where 115 F: FnOnce(u32, u64) -> (ProgressCallback, Option<ProgressFinish>), 116{ 117 if !enable_resolver { 118 if verbose { 119 log::debug!("[DIDResolver] DID resolver disabled, skipping DID index check"); 120 } 121 return Ok(()); 122 } 123 124 if verbose { 125 log::debug!("[DIDResolver] Checking DID index status..."); 126 } 127 128 let did_index_stats = manager.get_did_index_stats(); 129 let total_dids = did_index_stats 130 .get("total_dids") 131 .and_then(|v| v.as_i64()) 132 .unwrap_or(0); 133 let total_entries = did_index_stats 134 .get("total_entries") 135 .and_then(|v| v.as_i64()) 136 .unwrap_or(0); 137 138 if verbose { 139 log::debug!( 140 "[DIDResolver] DID index stats: total_dids={}, total_entries={}", 141 total_dids, 142 total_entries 143 ); 144 } 145 146 if total_dids == 0 { 147 if verbose { 148 log::debug!("[DIDResolver] DID index is empty or missing"); 149 } 150 151 let last_bundle = manager.get_last_bundle(); 152 if verbose { 153 log::debug!("[DIDResolver] Last bundle number: {}", last_bundle); 154 } 155 156 // Check if repository is empty 157 let index = manager.get_index(); 158 if index.bundles.is_empty() { 159 eprintln!("⚠️ No bundles to index. 
DID resolution will not be available."); 160 eprintln!( 161 " Sync bundles first with '{} sync' or '{} server --sync'", 162 constants::BINARY_NAME, 163 constants::BINARY_NAME 164 ); 165 if verbose { 166 log::debug!("[DIDResolver] Skipping index build - no bundles available"); 167 } 168 return Ok(()); 169 } 170 171 eprintln!("Building DID index..."); 172 eprintln!("Indexing {} bundles\n", last_bundle); 173 174 if verbose { 175 log::debug!( 176 "[DIDResolver] Starting index rebuild for {} bundles", 177 last_bundle 178 ); 179 } 180 181 let start_time = std::time::Instant::now(); 182 183 // Calculate total uncompressed size for progress tracking 184 let bundle_numbers: Vec<u32> = (1..=last_bundle).collect(); 185 let total_uncompressed_size = index.total_uncompressed_size_for_bundles(&bundle_numbers); 186 187 // Build index with progress callback (create it now that we know the totals) 188 let (callback, finish_fn) = if let Some(factory) = progress_callback_factory { 189 let (cb, finish) = factory(last_bundle, total_uncompressed_size); 190 let callback = 191 move |current: u32, total: u32, bytes_processed: u64, total_bytes: u64| { 192 cb(current, total, bytes_processed, total_bytes); 193 }; 194 (Some(callback), finish) 195 } else { 196 (None, None) 197 }; 198 199 manager.build_did_index(constants::DID_INDEX_FLUSH_INTERVAL, callback, None, None)?; 200 201 // Finish progress tracking if provided 202 if let Some(finish) = finish_fn { 203 finish(); 204 } 205 206 let elapsed = start_time.elapsed(); 207 208 let stats = manager.get_did_index_stats(); 209 let final_total_dids = stats 210 .get("total_dids") 211 .and_then(|v| v.as_i64()) 212 .unwrap_or(0); 213 let final_total_entries = stats 214 .get("total_entries") 215 .and_then(|v| v.as_i64()) 216 .unwrap_or(0); 217 218 eprintln!("✓ DID index built"); 219 eprintln!(" Total DIDs: {}\n", final_total_dids); 220 221 if verbose { 222 log::debug!("[DIDResolver] Index build completed in {:?}", elapsed); 223 log::debug!( 224 
"[DIDResolver] Final stats: total_dids={}, total_entries={}", 225 final_total_dids, 226 final_total_entries 227 ); 228 } 229 230 // Verify index integrity after building 231 if verbose { 232 log::debug!("[DIDResolver] Verifying DID index integrity..."); 233 } 234 let verify_result = 235 manager.verify_did_index::<fn(u32, u32, u64, u64)>(false, 64, false, None)?; 236 if verify_result.missing_base_shards > 0 { 237 anyhow::bail!( 238 "DID index is corrupted: {} missing base shard(s). Run '{} index repair' to fix.", 239 verify_result.missing_base_shards, 240 crate::constants::BINARY_NAME 241 ); 242 } 243 if verify_result.missing_delta_segments > 0 { 244 anyhow::bail!( 245 "DID index is corrupted: {} missing delta segment(s). Run '{} index repair' to fix.", 246 verify_result.missing_delta_segments, 247 crate::constants::BINARY_NAME 248 ); 249 } 250 if verbose { 251 log::debug!("[DIDResolver] DID index integrity check passed"); 252 } 253 } else { 254 if verbose { 255 log::debug!( 256 "[DIDResolver] DID index already exists with {} DIDs", 257 total_dids 258 ); 259 } 260 261 // Verify index integrity before starting server 262 if verbose { 263 log::debug!("[DIDResolver] Verifying DID index integrity..."); 264 } 265 let verify_result = 266 manager.verify_did_index::<fn(u32, u32, u64, u64)>(false, 64, false, None)?; 267 if verify_result.missing_base_shards > 0 { 268 anyhow::bail!( 269 "DID index is corrupted: {} missing base shard(s). Run '{} index repair' to fix.", 270 verify_result.missing_base_shards, 271 crate::constants::BINARY_NAME 272 ); 273 } 274 if verify_result.missing_delta_segments > 0 { 275 anyhow::bail!( 276 "DID index is corrupted: {} missing delta segment(s). 
Run '{} index repair' to fix.", 277 verify_result.missing_delta_segments, 278 crate::constants::BINARY_NAME 279 ); 280 } 281 if verbose { 282 log::debug!("[DIDResolver] DID index integrity check passed"); 283 } 284 } 285 286 Ok(()) 287} 288 289/// Test handle resolver on startup 290pub async fn test_handle_resolver(manager: &BundleManager, verbose: bool) -> Result<()> { 291 if let Some(handle_resolver) = manager.get_handle_resolver() { 292 if verbose { 293 log::debug!( 294 "[HandleResolver] Testing external handle resolver with handle resolution..." 295 ); 296 } 297 eprintln!("Testing handle resolver..."); 298 299 // Use a well-known handle that should always resolve 300 let test_handle = "bsky.app"; 301 let start = std::time::Instant::now(); 302 match handle_resolver.resolve_handle(test_handle).await { 303 Ok(did) => { 304 let duration = start.elapsed(); 305 eprintln!( 306 "✓ Handle resolver test successful ({:.3}s)", 307 duration.as_secs_f64() 308 ); 309 if verbose { 310 log::debug!( 311 "[HandleResolver] Test resolution of '{}' -> '{}' successful in {:.3}s", 312 test_handle, 313 did, 314 duration.as_secs_f64() 315 ); 316 } 317 } 318 Err(e) => { 319 let duration = start.elapsed(); 320 eprintln!( 321 "⚠️ Handle resolver test failed ({:.3}s): {}", 322 duration.as_secs_f64(), 323 e 324 ); 325 if verbose { 326 log::warn!( 327 "[HandleResolver] Test resolution of '{}' failed after {:.3}s: {}", 328 test_handle, 329 duration.as_secs_f64(), 330 e 331 ); 332 } 333 eprintln!(" Handle resolution endpoints may not work correctly."); 334 } 335 } 336 eprintln!(); 337 } 338 Ok(()) 339} 340 341/// Start the resolver ping loop to keep HTTP/2 connections alive 342pub async fn run_resolver_ping_loop( 343 resolver: Arc<crate::handle_resolver::HandleResolver>, 344 verbose: bool, 345 mut shutdown_rx: tokio::sync::watch::Receiver<bool>, 346) { 347 use tokio::time::{Duration, Instant, sleep}; 348 349 // Ping every 2 minutes to keep HTTP/2 connections alive 350 let ping_interval = 
Duration::from_secs(120); 351 352 if verbose { 353 log::debug!( 354 "[HandleResolver] Starting keep-alive ping loop (interval: {:?})", 355 ping_interval 356 ); 357 log::debug!( 358 "[HandleResolver] Handle resolver URL: {}", 359 resolver.get_base_url() 360 ); 361 } 362 363 // Initial delay before first ping 364 if verbose { 365 log::debug!("[HandleResolver] Waiting 30s before first ping..."); 366 } 367 sleep(Duration::from_secs(30)).await; 368 369 let mut ping_count = 0u64; 370 let mut success_count = 0u64; 371 let mut failure_count = 0u64; 372 373 loop { 374 // Check for shutdown 375 if *shutdown_rx.borrow() { 376 if verbose { 377 log::debug!("[HandleResolver] Shutdown requested, stopping ping loop"); 378 } 379 break; 380 } 381 382 ping_count += 1; 383 let start = Instant::now(); 384 385 if verbose { 386 log::debug!( 387 "[HandleResolver] Ping #{}: sending keep-alive request...", 388 ping_count 389 ); 390 } 391 392 match resolver.ping().await { 393 Ok(_) => { 394 success_count += 1; 395 let duration = start.elapsed(); 396 if verbose { 397 log::info!( 398 "[HandleResolver] Ping #{} successful in {:.3}s (success: {}/{}, failures: {})", 399 ping_count, 400 duration.as_secs_f64(), 401 success_count, 402 ping_count, 403 failure_count 404 ); 405 } 406 } 407 Err(e) => { 408 failure_count += 1; 409 let duration = start.elapsed(); 410 if verbose { 411 log::warn!( 412 "[HandleResolver] Ping #{} failed after {:.3}s: {} (success: {}/{}, failures: {})", 413 ping_count, 414 duration.as_secs_f64(), 415 e, 416 success_count, 417 ping_count, 418 failure_count 419 ); 420 } 421 // Continue anyway - the connection will be re-established on next actual request 422 } 423 } 424 425 if verbose { 426 log::debug!("[HandleResolver] Next ping in {:?}...", ping_interval); 427 } 428 429 // Use select to allow cancellation during sleep 430 tokio::select! 
{ 431 _ = sleep(ping_interval) => {} 432 _ = shutdown_rx.changed() => { 433 if *shutdown_rx.borrow() { 434 break; 435 } 436 } 437 } 438 } 439} 440 441/// Setup sync loop if sync mode is enabled 442pub fn setup_sync_loop( 443 manager: Arc<BundleManager>, 444 config: &StartupConfig, 445 server_runtime: &BundleRuntime, 446 background_tasks: &mut JoinSet<()>, 447) { 448 if !config.sync { 449 return; 450 } 451 452 let manager_clone = Arc::clone(&manager); 453 let plc_url = config.plc_url.clone(); 454 let interval = config.sync_interval; 455 let max_bundles = config.max_bundles; 456 let verbose = config.verbose; 457 let fetch_log = config.fetch_log; 458 let shutdown_signal = server_runtime.shutdown_signal(); 459 let sync_runtime = server_runtime.clone(); 460 461 background_tasks.spawn(async move { 462 use crate::plc_client::PLCClient; 463 use crate::sync::{SyncConfig, SyncManager}; 464 465 // Create PLC client 466 let client = match PLCClient::new(&plc_url) { 467 Ok(c) => c, 468 Err(e) => { 469 eprintln!("[Sync] Failed to create PLC client: {}", e); 470 return; 471 } 472 }; 473 474 let sync_config = SyncConfig { 475 plc_url: plc_url.clone(), 476 continuous: true, 477 interval, 478 max_bundles: max_bundles as usize, 479 verbose, 480 shutdown_rx: Some(shutdown_signal), 481 shutdown_tx: Some(sync_runtime.shutdown_sender()), 482 fetch_log, 483 safety_lag: Duration::from_millis(constants::DEFAULT_SAFETY_LAG_MS), 484 }; 485 486 use crate::sync::SyncLoggerImpl; 487 let logger = SyncLoggerImpl::new_server(verbose, interval); 488 let sync_manager = SyncManager::new(manager_clone, client, sync_config).with_logger(logger); 489 490 if let Err(e) = sync_manager.run_continuous().await { 491 eprintln!("[Sync] Sync loop error: {}", e); 492 // Trigger fatal shutdown on sync errors 493 sync_runtime.trigger_fatal_shutdown(); 494 } 495 }); 496} 497 498/// Setup resolver ping loop if resolver is enabled 499pub fn setup_resolver_ping_loop( 500 manager: &BundleManager, 501 config: 
&StartupConfig, 502 server_runtime: &BundleRuntime, 503 resolver_tasks: &mut JoinSet<()>, 504) { 505 if !config.enable_resolver { 506 return; 507 } 508 509 if let Some(handle_resolver) = manager.get_handle_resolver() { 510 let verbose = config.verbose; 511 let shutdown_signal = server_runtime.shutdown_signal(); 512 resolver_tasks.spawn(async move { 513 run_resolver_ping_loop(handle_resolver, verbose, shutdown_signal).await; 514 }); 515 } 516} 517 518/// Progress callback factory type 519/// Takes (last_bundle, total_bytes) and returns (progress_callback, finish_function) 520pub type ProgressCallbackFactory = 521 Box<dyn FnOnce(u32, u64) -> (ProgressCallback, Option<ProgressFinish>) + Send + Sync>; 522 523/// Main server startup function that orchestrates all initialization 524pub async fn start_server( 525 config: StartupConfig, 526 progress_callback_factory: Option<ProgressCallbackFactory>, 527) -> Result<()> { 528 use std::net::SocketAddr; 529 530 // Initialize manager (with mempool preload for server use) 531 let mut manager = initialize_manager(&config)?; 532 533 // Setup DID index if resolver is enabled 534 setup_did_index( 535 &mut manager, 536 config.enable_resolver, 537 config.verbose, 538 progress_callback_factory, 539 )?; 540 541 // Test handle resolver on startup if resolver is enabled 542 if config.enable_resolver { 543 test_handle_resolver(&manager, config.verbose).await?; 544 } 545 546 let manager = Arc::new(manager); 547 548 let addr = format!("{}:{}", config.host, config.port); 549 let socket_addr: SocketAddr = addr.parse().context("Invalid address format")?; 550 551 // Create server config 552 let server_config = ServerConfig { 553 sync_mode: config.sync, 554 sync_interval_seconds: config.sync_interval.as_secs(), 555 enable_websocket: config.enable_websocket, 556 enable_resolver: config.enable_resolver, 557 version: env!("CARGO_PKG_VERSION").to_string(), 558 }; 559 560 // Create server 561 let server = Server::new(Arc::clone(&manager), 
server_config); 562 let app = server.router(); 563 564 // Create shutdown coordinator 565 let server_runtime = BundleRuntime::new(); 566 let mut background_tasks = JoinSet::new(); 567 let mut resolver_tasks = JoinSet::new(); 568 569 // Start sync loop if enabled 570 setup_sync_loop( 571 Arc::clone(&manager), 572 &config, 573 &server_runtime, 574 &mut background_tasks, 575 ); 576 577 // Start handle resolver keep-alive ping task if resolver is enabled 578 setup_resolver_ping_loop(&manager, &config, &server_runtime, &mut resolver_tasks); 579 580 // Run server with immediate shutdown 581 let listener = tokio::net::TcpListener::bind(socket_addr) 582 .await 583 .context("Failed to bind to address")?; 584 585 // Display server info with ASCII art banner after successful bind 586 display_server_info(&manager, &addr, &config); 587 eprintln!("\nPress Ctrl+C to stop\n"); 588 589 // Run the server - the shutdown future will complete when Ctrl+C is pressed 590 // or when a background task triggers shutdown 591 // This triggers graceful shutdown, which stops accepting new connections 592 // and waits for existing connections to finish (or timeout after 10 seconds) 593 axum::serve(listener, app) 594 .with_graceful_shutdown(server_runtime.create_shutdown_future()) 595 .await 596 .context("Server error")?; 597 598 // Use common shutdown cleanup handler 599 server_runtime 600 .wait_for_shutdown_cleanup( 601 "Server", 602 Some(&mut resolver_tasks), 603 Some(&mut background_tasks), 604 ) 605 .await; 606 607 Ok(()) 608} 609 610/// Display server startup information 611fn display_server_info(manager: &BundleManager, addr: &str, config: &StartupConfig) { 612 use crate::server::get_ascii_art_banner; 613 614 // Print ASCII art banner 615 eprintln!("{}", get_ascii_art_banner(env!("CARGO_PKG_VERSION"))); 616 eprintln!("{} HTTP server started", constants::BINARY_NAME); 617 eprintln!(" Directory: {}", manager.directory().display()); 618 eprintln!(" Listening: http://{}", addr); 619 620 if 
config.sync { 621 eprintln!(" Sync: ENABLED (daemon mode)"); 622 eprintln!(" PLC URL: {}", config.plc_url); 623 eprintln!(" Interval: {:?}", config.sync_interval); 624 if config.max_bundles > 0 { 625 eprintln!(" Max bundles: {}", config.max_bundles); 626 } 627 } else { 628 eprintln!(" Sync: disabled (read-only archive)"); 629 eprintln!(" Tip: Use --sync to enable live syncing"); 630 } 631 632 if config.enable_websocket { 633 eprintln!(" WebSocket: ENABLED (ws://{}/ws)", addr); 634 } else { 635 eprintln!(" WebSocket: disabled"); 636 } 637 638 if config.enable_resolver { 639 eprintln!(" DID Resolver: ENABLED (/:did endpoints)"); 640 if manager.get_handle_resolver_base_url().is_some() { 641 eprintln!(" Handle Resolver: ENABLED (handle->DID conversion)"); 642 eprintln!(" Keep-alive ping: every 2 minutes"); 643 } else { 644 eprintln!(" Handle Resolver: disabled (handle->DID conversion unavailable)"); 645 } 646 } else { 647 eprintln!(" DID Resolver: disabled"); 648 eprintln!(" Handle Resolver: disabled"); 649 } 650 651 let index = manager.get_index(); 652 eprintln!(" Bundles: {} available", index.bundles.len()); 653}