forked from
atscan.net/plcbundle-rs
High-performance implementation of plcbundle written in Rust
1// Server startup and initialization logic
2// This module handles all the complex setup logic for starting the server,
3// including manager initialization, DID index building, handle resolver setup, etc.
4
5use crate::constants;
6use crate::manager::BundleManager;
7use crate::runtime::BundleRuntime;
8use crate::server::{Server, ServerConfig};
9use anyhow::{Context, Result};
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::task::JoinSet;
13use tokio::time::Duration;
14
/// Configuration for server startup.
///
/// Collects every knob the startup sequence reads: where the repository
/// lives, whether live syncing and the resolver are enabled, and where the
/// HTTP server listens.
#[derive(Debug, Clone)]
pub struct StartupConfig {
    /// Repository directory handed to the bundle manager.
    pub dir: PathBuf,
    /// Enables sync mode: the repository may be auto-initialized and the
    /// background sync loop is started.
    pub sync: bool,
    /// PLC URL used for repository initialization and by the sync client.
    pub plc_url: String,
    /// Custom handle resolver URL; `None` selects the built-in default.
    pub handle_resolver_url: Option<String>,
    /// Enables the DID resolver (DID index setup, handle resolver test and
    /// keep-alive ping loop).
    pub enable_resolver: bool,
    /// Emits additional debug logging during startup and in background loops.
    pub verbose: bool,
    /// Host part of the listen address.
    pub host: String,
    /// Port part of the listen address.
    pub port: u16,
    /// Interval handed to the sync loop.
    pub sync_interval: Duration,
    /// Bundle limit handed to the sync loop; only displayed when greater
    /// than zero.
    pub max_bundles: u32,
    /// Enables the WebSocket endpoint.
    pub enable_websocket: bool,
    /// Forwarded to the sync configuration's `fetch_log` flag.
    pub fetch_log: bool,
}
30
/// Progress callback for DID index building.
///
/// Invoked with `(current_bundle, total_bundles, bytes_processed, total_bytes)`.
/// The callback is boxed and must be `Send + Sync`.
pub type ProgressCallback = Box<dyn Fn(u32, u32, u64, u64) + Send + Sync>;
34
/// Finish function for progress tracking (e.g., to finish a progress bar).
///
/// It is an `FnOnce`, so it can be called at most once.
pub type ProgressFinish = Box<dyn FnOnce() + Send + Sync>;
37
38/// Initialize and configure the BundleManager based on startup config
39pub fn initialize_manager(config: &StartupConfig) -> Result<BundleManager> {
40 let handle_resolver_url = if config.handle_resolver_url.is_none() {
41 if config.verbose {
42 log::debug!(
43 "[HandleResolver] Using default handle resolver: {}",
44 constants::DEFAULT_HANDLE_RESOLVER_URL
45 );
46 }
47 Some(constants::DEFAULT_HANDLE_RESOLVER_URL.to_string())
48 } else {
49 if config.verbose {
50 log::debug!(
51 "[HandleResolver] Using custom handle resolver: {}",
52 config.handle_resolver_url.as_ref().unwrap()
53 );
54 }
55 config.handle_resolver_url.clone()
56 };
57
58 // Preload mempool for server use (faster responses)
59 let preload_mempool = true;
60
61 let options = crate::ManagerOptions {
62 handle_resolver_url: handle_resolver_url.clone(),
63 preload_mempool,
64 verbose: config.verbose,
65 };
66
67 let manager = if config.sync {
68 // Sync mode can auto-init
69 match BundleManager::new(config.dir.clone(), options.clone()) {
70 Ok(mgr) => mgr,
71 Err(_) => {
72 // Repository doesn't exist, try to initialize
73 BundleManager::init_repository(&config.dir, config.plc_url.clone(), false)
74 .context("Failed to initialize repository")?;
75 // Try again after initialization
76 BundleManager::new(config.dir.clone(), options)
77 .context("Failed to open repository after initialization")?
78 }
79 }
80 } else {
81 // Read-only mode cannot auto-init
82 BundleManager::new(config.dir.clone(), options).context(format!(
83 "Repository not found. Use '{} init' first or run with --sync",
84 constants::BINARY_NAME
85 ))?
86 };
87
88 // Log handle resolver configuration
89 if config.verbose {
90 if let Some(url) = manager.get_handle_resolver_base_url() {
91 log::debug!(
92 "[HandleResolver] External handle resolver configured: {}",
93 url
94 );
95 } else {
96 log::debug!("[HandleResolver] External handle resolver not configured");
97 }
98 }
99
100 Ok(manager)
101}
102
103/// Build or verify DID index if resolver is enabled
104///
105/// `progress_callback_factory` is an optional function that creates a progress callback
106/// and finish function given the total bundle count and total bytes. This allows the caller
107/// to create progress tracking (e.g., a progress bar) after knowing the bundle count.
108pub fn setup_did_index<F>(
109 manager: &mut BundleManager,
110 enable_resolver: bool,
111 verbose: bool,
112 progress_callback_factory: Option<F>,
113) -> Result<()>
114where
115 F: FnOnce(u32, u64) -> (ProgressCallback, Option<ProgressFinish>),
116{
117 if !enable_resolver {
118 if verbose {
119 log::debug!("[DIDResolver] DID resolver disabled, skipping DID index check");
120 }
121 return Ok(());
122 }
123
124 if verbose {
125 log::debug!("[DIDResolver] Checking DID index status...");
126 }
127
128 let did_index_stats = manager.get_did_index_stats();
129 let total_dids = did_index_stats
130 .get("total_dids")
131 .and_then(|v| v.as_i64())
132 .unwrap_or(0);
133 let total_entries = did_index_stats
134 .get("total_entries")
135 .and_then(|v| v.as_i64())
136 .unwrap_or(0);
137
138 if verbose {
139 log::debug!(
140 "[DIDResolver] DID index stats: total_dids={}, total_entries={}",
141 total_dids,
142 total_entries
143 );
144 }
145
146 if total_dids == 0 {
147 if verbose {
148 log::debug!("[DIDResolver] DID index is empty or missing");
149 }
150
151 let last_bundle = manager.get_last_bundle();
152 if verbose {
153 log::debug!("[DIDResolver] Last bundle number: {}", last_bundle);
154 }
155
156 // Check if repository is empty
157 let index = manager.get_index();
158 if index.bundles.is_empty() {
159 eprintln!("⚠️ No bundles to index. DID resolution will not be available.");
160 eprintln!(
161 " Sync bundles first with '{} sync' or '{} server --sync'",
162 constants::BINARY_NAME,
163 constants::BINARY_NAME
164 );
165 if verbose {
166 log::debug!("[DIDResolver] Skipping index build - no bundles available");
167 }
168 return Ok(());
169 }
170
171 eprintln!("Building DID index...");
172 eprintln!("Indexing {} bundles\n", last_bundle);
173
174 if verbose {
175 log::debug!(
176 "[DIDResolver] Starting index rebuild for {} bundles",
177 last_bundle
178 );
179 }
180
181 let start_time = std::time::Instant::now();
182
183 // Calculate total uncompressed size for progress tracking
184 let bundle_numbers: Vec<u32> = (1..=last_bundle).collect();
185 let total_uncompressed_size = index.total_uncompressed_size_for_bundles(&bundle_numbers);
186
187 // Build index with progress callback (create it now that we know the totals)
188 let (callback, finish_fn) = if let Some(factory) = progress_callback_factory {
189 let (cb, finish) = factory(last_bundle, total_uncompressed_size);
190 let callback =
191 move |current: u32, total: u32, bytes_processed: u64, total_bytes: u64| {
192 cb(current, total, bytes_processed, total_bytes);
193 };
194 (Some(callback), finish)
195 } else {
196 (None, None)
197 };
198
199 manager.build_did_index(constants::DID_INDEX_FLUSH_INTERVAL, callback, None, None)?;
200
201 // Finish progress tracking if provided
202 if let Some(finish) = finish_fn {
203 finish();
204 }
205
206 let elapsed = start_time.elapsed();
207
208 let stats = manager.get_did_index_stats();
209 let final_total_dids = stats
210 .get("total_dids")
211 .and_then(|v| v.as_i64())
212 .unwrap_or(0);
213 let final_total_entries = stats
214 .get("total_entries")
215 .and_then(|v| v.as_i64())
216 .unwrap_or(0);
217
218 eprintln!("✓ DID index built");
219 eprintln!(" Total DIDs: {}\n", final_total_dids);
220
221 if verbose {
222 log::debug!("[DIDResolver] Index build completed in {:?}", elapsed);
223 log::debug!(
224 "[DIDResolver] Final stats: total_dids={}, total_entries={}",
225 final_total_dids,
226 final_total_entries
227 );
228 }
229
230 // Verify index integrity after building
231 if verbose {
232 log::debug!("[DIDResolver] Verifying DID index integrity...");
233 }
234 let verify_result =
235 manager.verify_did_index::<fn(u32, u32, u64, u64)>(false, 64, false, None)?;
236 if verify_result.missing_base_shards > 0 {
237 anyhow::bail!(
238 "DID index is corrupted: {} missing base shard(s). Run '{} index repair' to fix.",
239 verify_result.missing_base_shards,
240 crate::constants::BINARY_NAME
241 );
242 }
243 if verify_result.missing_delta_segments > 0 {
244 anyhow::bail!(
245 "DID index is corrupted: {} missing delta segment(s). Run '{} index repair' to fix.",
246 verify_result.missing_delta_segments,
247 crate::constants::BINARY_NAME
248 );
249 }
250 if verbose {
251 log::debug!("[DIDResolver] DID index integrity check passed");
252 }
253 } else {
254 if verbose {
255 log::debug!(
256 "[DIDResolver] DID index already exists with {} DIDs",
257 total_dids
258 );
259 }
260
261 // Verify index integrity before starting server
262 if verbose {
263 log::debug!("[DIDResolver] Verifying DID index integrity...");
264 }
265 let verify_result =
266 manager.verify_did_index::<fn(u32, u32, u64, u64)>(false, 64, false, None)?;
267 if verify_result.missing_base_shards > 0 {
268 anyhow::bail!(
269 "DID index is corrupted: {} missing base shard(s). Run '{} index repair' to fix.",
270 verify_result.missing_base_shards,
271 crate::constants::BINARY_NAME
272 );
273 }
274 if verify_result.missing_delta_segments > 0 {
275 anyhow::bail!(
276 "DID index is corrupted: {} missing delta segment(s). Run '{} index repair' to fix.",
277 verify_result.missing_delta_segments,
278 crate::constants::BINARY_NAME
279 );
280 }
281 if verbose {
282 log::debug!("[DIDResolver] DID index integrity check passed");
283 }
284 }
285
286 Ok(())
287}
288
289/// Test handle resolver on startup
290pub async fn test_handle_resolver(manager: &BundleManager, verbose: bool) -> Result<()> {
291 if let Some(handle_resolver) = manager.get_handle_resolver() {
292 if verbose {
293 log::debug!(
294 "[HandleResolver] Testing external handle resolver with handle resolution..."
295 );
296 }
297 eprintln!("Testing handle resolver...");
298
299 // Use a well-known handle that should always resolve
300 let test_handle = "bsky.app";
301 let start = std::time::Instant::now();
302 match handle_resolver.resolve_handle(test_handle).await {
303 Ok(did) => {
304 let duration = start.elapsed();
305 eprintln!(
306 "✓ Handle resolver test successful ({:.3}s)",
307 duration.as_secs_f64()
308 );
309 if verbose {
310 log::debug!(
311 "[HandleResolver] Test resolution of '{}' -> '{}' successful in {:.3}s",
312 test_handle,
313 did,
314 duration.as_secs_f64()
315 );
316 }
317 }
318 Err(e) => {
319 let duration = start.elapsed();
320 eprintln!(
321 "⚠️ Handle resolver test failed ({:.3}s): {}",
322 duration.as_secs_f64(),
323 e
324 );
325 if verbose {
326 log::warn!(
327 "[HandleResolver] Test resolution of '{}' failed after {:.3}s: {}",
328 test_handle,
329 duration.as_secs_f64(),
330 e
331 );
332 }
333 eprintln!(" Handle resolution endpoints may not work correctly.");
334 }
335 }
336 eprintln!();
337 }
338 Ok(())
339}
340
341/// Start the resolver ping loop to keep HTTP/2 connections alive
342pub async fn run_resolver_ping_loop(
343 resolver: Arc<crate::handle_resolver::HandleResolver>,
344 verbose: bool,
345 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
346) {
347 use tokio::time::{Duration, Instant, sleep};
348
349 // Ping every 2 minutes to keep HTTP/2 connections alive
350 let ping_interval = Duration::from_secs(120);
351
352 if verbose {
353 log::debug!(
354 "[HandleResolver] Starting keep-alive ping loop (interval: {:?})",
355 ping_interval
356 );
357 log::debug!(
358 "[HandleResolver] Handle resolver URL: {}",
359 resolver.get_base_url()
360 );
361 }
362
363 // Initial delay before first ping
364 if verbose {
365 log::debug!("[HandleResolver] Waiting 30s before first ping...");
366 }
367 sleep(Duration::from_secs(30)).await;
368
369 let mut ping_count = 0u64;
370 let mut success_count = 0u64;
371 let mut failure_count = 0u64;
372
373 loop {
374 // Check for shutdown
375 if *shutdown_rx.borrow() {
376 if verbose {
377 log::debug!("[HandleResolver] Shutdown requested, stopping ping loop");
378 }
379 break;
380 }
381
382 ping_count += 1;
383 let start = Instant::now();
384
385 if verbose {
386 log::debug!(
387 "[HandleResolver] Ping #{}: sending keep-alive request...",
388 ping_count
389 );
390 }
391
392 match resolver.ping().await {
393 Ok(_) => {
394 success_count += 1;
395 let duration = start.elapsed();
396 if verbose {
397 log::info!(
398 "[HandleResolver] Ping #{} successful in {:.3}s (success: {}/{}, failures: {})",
399 ping_count,
400 duration.as_secs_f64(),
401 success_count,
402 ping_count,
403 failure_count
404 );
405 }
406 }
407 Err(e) => {
408 failure_count += 1;
409 let duration = start.elapsed();
410 if verbose {
411 log::warn!(
412 "[HandleResolver] Ping #{} failed after {:.3}s: {} (success: {}/{}, failures: {})",
413 ping_count,
414 duration.as_secs_f64(),
415 e,
416 success_count,
417 ping_count,
418 failure_count
419 );
420 }
421 // Continue anyway - the connection will be re-established on next actual request
422 }
423 }
424
425 if verbose {
426 log::debug!("[HandleResolver] Next ping in {:?}...", ping_interval);
427 }
428
429 // Use select to allow cancellation during sleep
430 tokio::select! {
431 _ = sleep(ping_interval) => {}
432 _ = shutdown_rx.changed() => {
433 if *shutdown_rx.borrow() {
434 break;
435 }
436 }
437 }
438 }
439}
440
441/// Setup sync loop if sync mode is enabled
442pub fn setup_sync_loop(
443 manager: Arc<BundleManager>,
444 config: &StartupConfig,
445 server_runtime: &BundleRuntime,
446 background_tasks: &mut JoinSet<()>,
447) {
448 if !config.sync {
449 return;
450 }
451
452 let manager_clone = Arc::clone(&manager);
453 let plc_url = config.plc_url.clone();
454 let interval = config.sync_interval;
455 let max_bundles = config.max_bundles;
456 let verbose = config.verbose;
457 let fetch_log = config.fetch_log;
458 let shutdown_signal = server_runtime.shutdown_signal();
459 let sync_runtime = server_runtime.clone();
460
461 background_tasks.spawn(async move {
462 use crate::plc_client::PLCClient;
463 use crate::sync::{SyncConfig, SyncManager};
464
465 // Create PLC client
466 let client = match PLCClient::new(&plc_url) {
467 Ok(c) => c,
468 Err(e) => {
469 eprintln!("[Sync] Failed to create PLC client: {}", e);
470 return;
471 }
472 };
473
474 let sync_config = SyncConfig {
475 plc_url: plc_url.clone(),
476 continuous: true,
477 interval,
478 max_bundles: max_bundles as usize,
479 verbose,
480 shutdown_rx: Some(shutdown_signal),
481 shutdown_tx: Some(sync_runtime.shutdown_sender()),
482 fetch_log,
483 safety_lag: Duration::from_millis(constants::DEFAULT_SAFETY_LAG_MS),
484 };
485
486 use crate::sync::SyncLoggerImpl;
487 let logger = SyncLoggerImpl::new_server(verbose, interval);
488 let sync_manager = SyncManager::new(manager_clone, client, sync_config).with_logger(logger);
489
490 if let Err(e) = sync_manager.run_continuous().await {
491 eprintln!("[Sync] Sync loop error: {}", e);
492 // Trigger fatal shutdown on sync errors
493 sync_runtime.trigger_fatal_shutdown();
494 }
495 });
496}
497
498/// Setup resolver ping loop if resolver is enabled
499pub fn setup_resolver_ping_loop(
500 manager: &BundleManager,
501 config: &StartupConfig,
502 server_runtime: &BundleRuntime,
503 resolver_tasks: &mut JoinSet<()>,
504) {
505 if !config.enable_resolver {
506 return;
507 }
508
509 if let Some(handle_resolver) = manager.get_handle_resolver() {
510 let verbose = config.verbose;
511 let shutdown_signal = server_runtime.shutdown_signal();
512 resolver_tasks.spawn(async move {
513 run_resolver_ping_loop(handle_resolver, verbose, shutdown_signal).await;
514 });
515 }
516}
517
/// Progress callback factory type.
///
/// Takes `(last_bundle, total_bytes)` and returns
/// `(progress_callback, finish_function)`. It is an `FnOnce`, so it is
/// consumed by the single call that creates the progress tracking.
pub type ProgressCallbackFactory =
    Box<dyn FnOnce(u32, u64) -> (ProgressCallback, Option<ProgressFinish>) + Send + Sync>;
522
523/// Main server startup function that orchestrates all initialization
524pub async fn start_server(
525 config: StartupConfig,
526 progress_callback_factory: Option<ProgressCallbackFactory>,
527) -> Result<()> {
528 use std::net::SocketAddr;
529
530 // Initialize manager (with mempool preload for server use)
531 let mut manager = initialize_manager(&config)?;
532
533 // Setup DID index if resolver is enabled
534 setup_did_index(
535 &mut manager,
536 config.enable_resolver,
537 config.verbose,
538 progress_callback_factory,
539 )?;
540
541 // Test handle resolver on startup if resolver is enabled
542 if config.enable_resolver {
543 test_handle_resolver(&manager, config.verbose).await?;
544 }
545
546 let manager = Arc::new(manager);
547
548 let addr = format!("{}:{}", config.host, config.port);
549 let socket_addr: SocketAddr = addr.parse().context("Invalid address format")?;
550
551 // Create server config
552 let server_config = ServerConfig {
553 sync_mode: config.sync,
554 sync_interval_seconds: config.sync_interval.as_secs(),
555 enable_websocket: config.enable_websocket,
556 enable_resolver: config.enable_resolver,
557 version: env!("CARGO_PKG_VERSION").to_string(),
558 };
559
560 // Create server
561 let server = Server::new(Arc::clone(&manager), server_config);
562 let app = server.router();
563
564 // Create shutdown coordinator
565 let server_runtime = BundleRuntime::new();
566 let mut background_tasks = JoinSet::new();
567 let mut resolver_tasks = JoinSet::new();
568
569 // Start sync loop if enabled
570 setup_sync_loop(
571 Arc::clone(&manager),
572 &config,
573 &server_runtime,
574 &mut background_tasks,
575 );
576
577 // Start handle resolver keep-alive ping task if resolver is enabled
578 setup_resolver_ping_loop(&manager, &config, &server_runtime, &mut resolver_tasks);
579
580 // Run server with immediate shutdown
581 let listener = tokio::net::TcpListener::bind(socket_addr)
582 .await
583 .context("Failed to bind to address")?;
584
585 // Display server info with ASCII art banner after successful bind
586 display_server_info(&manager, &addr, &config);
587 eprintln!("\nPress Ctrl+C to stop\n");
588
589 // Run the server - the shutdown future will complete when Ctrl+C is pressed
590 // or when a background task triggers shutdown
591 // This triggers graceful shutdown, which stops accepting new connections
592 // and waits for existing connections to finish (or timeout after 10 seconds)
593 axum::serve(listener, app)
594 .with_graceful_shutdown(server_runtime.create_shutdown_future())
595 .await
596 .context("Server error")?;
597
598 // Use common shutdown cleanup handler
599 server_runtime
600 .wait_for_shutdown_cleanup(
601 "Server",
602 Some(&mut resolver_tasks),
603 Some(&mut background_tasks),
604 )
605 .await;
606
607 Ok(())
608}
609
610/// Display server startup information
611fn display_server_info(manager: &BundleManager, addr: &str, config: &StartupConfig) {
612 use crate::server::get_ascii_art_banner;
613
614 // Print ASCII art banner
615 eprintln!("{}", get_ascii_art_banner(env!("CARGO_PKG_VERSION")));
616 eprintln!("{} HTTP server started", constants::BINARY_NAME);
617 eprintln!(" Directory: {}", manager.directory().display());
618 eprintln!(" Listening: http://{}", addr);
619
620 if config.sync {
621 eprintln!(" Sync: ENABLED (daemon mode)");
622 eprintln!(" PLC URL: {}", config.plc_url);
623 eprintln!(" Interval: {:?}", config.sync_interval);
624 if config.max_bundles > 0 {
625 eprintln!(" Max bundles: {}", config.max_bundles);
626 }
627 } else {
628 eprintln!(" Sync: disabled (read-only archive)");
629 eprintln!(" Tip: Use --sync to enable live syncing");
630 }
631
632 if config.enable_websocket {
633 eprintln!(" WebSocket: ENABLED (ws://{}/ws)", addr);
634 } else {
635 eprintln!(" WebSocket: disabled");
636 }
637
638 if config.enable_resolver {
639 eprintln!(" DID Resolver: ENABLED (/:did endpoints)");
640 if manager.get_handle_resolver_base_url().is_some() {
641 eprintln!(" Handle Resolver: ENABLED (handle->DID conversion)");
642 eprintln!(" Keep-alive ping: every 2 minutes");
643 } else {
644 eprintln!(" Handle Resolver: disabled (handle->DID conversion unavailable)");
645 }
646 } else {
647 eprintln!(" DID Resolver: disabled");
648 eprintln!(" Handle Resolver: disabled");
649 }
650
651 let index = manager.get_index();
652 eprintln!(" Bundles: {} available", index.bundles.len());
653}