QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
Source: `src/main.rs` at branch `main` (745 lines, 30 kB).
//! QuickDID service entry point: parses configuration from the environment,
//! wires up caching/queueing/metrics layers around the handle resolver, and
//! runs the HTTP server, background resolver task, signal handler, and
//! (optionally) a Jetstream consumer until shutdown.

use anyhow::Result;
use atproto_identity::{
    config::{CertificateBundles, DnsNameservers},
    resolve::HickoryDnsResolver,
};
use atproto_jetstream::{Consumer as JetstreamConsumer, ConsumerTaskConfig};
use atproto_lexicon::resolve::{DefaultLexiconResolver, LexiconResolver};
use quickdid::{
    cache::create_redis_pool,
    config::Config,
    handle_resolver::{
        create_base_resolver, create_caching_resolver,
        create_proactive_refresh_resolver_with_metrics, create_rate_limited_resolver_with_timeout,
        create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl,
    },
    handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config},
    http::{AppContext, create_router},
    jetstream_handler::QuickDidEventHandler,
    lexicon_resolver::create_redis_lexicon_resolver_with_ttl,
    metrics::create_metrics_publisher,
    queue::{
        HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue,
        create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue,
        create_sqlite_queue_with_max_size,
    },
    sqlite_schema::create_sqlite_pool,
    task_manager::spawn_cancellable_task,
};
use std::sync::Arc;
use tokio::signal;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// Helper function to create a Redis pool with consistent error handling.
///
/// Returns `Some(pool)` on success; on failure logs a warning and returns
/// `None` so callers can degrade gracefully (e.g. fall back to another cache
/// or queue backend) instead of aborting startup.
fn try_create_redis_pool(redis_url: &str, purpose: &str) -> Option<deadpool_redis::Pool> {
    match create_redis_pool(redis_url) {
        Ok(pool) => {
            tracing::info!("Redis pool created for {}", purpose);
            Some(pool)
        }
        Err(e) => {
            tracing::warn!("Failed to create Redis pool for {}: {}", purpose, e);
            None
        }
    }
}

/// Helper function to create a SQLite pool with consistent error handling.
///
/// Same contract as [`try_create_redis_pool`]: failure is logged as a warning
/// and surfaced as `None` rather than an error, letting the caller fall back.
async fn try_create_sqlite_pool(sqlite_url: &str, purpose: &str) -> Option<sqlx::SqlitePool> {
    match create_sqlite_pool(sqlite_url).await {
        Ok(pool) => {
            tracing::info!("SQLite pool created for {}", purpose);
            Some(pool)
        }
        Err(e) => {
            tracing::warn!("Failed to create SQLite pool for {}: {}", purpose, e);
            None
        }
    }
}

/// Simple command-line argument handling for --version and --help.
///
/// Returns `true` when an informational flag was handled and the process
/// should exit immediately; `false` means normal startup should proceed.
/// Only the first argument is inspected; unknown flags are ignored.
fn handle_simple_args() -> bool {
    let args: Vec<String> = std::env::args().collect();

    if args.len() > 1 {
        match args[1].as_str() {
            "--version" | "-V" => {
                println!("quickdid {}", env!("CARGO_PKG_VERSION"));
                return true;
            }
            "--help" | "-h" => {
                println!("QuickDID - AT Protocol Identity Resolver Service");
                println!("Version: {}", env!("CARGO_PKG_VERSION"));
                println!();
                println!("USAGE:");
                println!(" quickdid [OPTIONS]");
                println!();
                println!("OPTIONS:");
                println!(" -h, --help Print help information");
                println!(" -V, --version Print version information");
                println!();
                println!("ENVIRONMENT VARIABLES:");
                println!(
                    " HTTP_EXTERNAL External hostname for service endpoints (required)"
                );
                println!(" HTTP_PORT HTTP server port (default: 8080)");
                println!(" PLC_HOSTNAME PLC directory hostname (default: plc.directory)");
                println!(
                    " USER_AGENT HTTP User-Agent header (auto-generated with version)"
                );
                println!(" DNS_NAMESERVERS Custom DNS nameservers (comma-separated IPs)");
                println!(
                    " CERTIFICATE_BUNDLES Additional CA certificates (comma-separated paths)"
                );
                println!();
                println!(" CACHING:");
                println!(" REDIS_URL Redis URL for handle resolution caching");
                println!(
                    " SQLITE_URL SQLite database URL for handle resolution caching"
                );
                println!(
                    " CACHE_TTL_MEMORY TTL for in-memory cache in seconds (default: 600)"
                );
                println!(
                    " CACHE_TTL_REDIS TTL for Redis cache in seconds (default: 7776000)"
                );
                println!(
                    " CACHE_TTL_SQLITE TTL for SQLite cache in seconds (default: 7776000)"
                );
                println!();
                println!(" QUEUE CONFIGURATION:");
                println!(
                    " QUEUE_ADAPTER Queue adapter: 'mpsc', 'redis', 'sqlite', 'noop' (default: mpsc)"
                );
                println!(" QUEUE_REDIS_URL Redis URL for queue adapter");
                println!(
                    " QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)"
                );
                println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)");
                println!(
                    " QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)"
                );
                println!(" QUEUE_REDIS_DEDUP_TTL TTL for dedup keys in seconds (default: 60)");
                println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)");
                println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)");
                println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)");
                println!();
                println!(" RATE LIMITING:");
                println!(
                    " RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)"
                );
                println!(
                    " RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)"
                );
                println!();
                println!(" METRICS:");
                println!(
                    " METRICS_ADAPTER Metrics adapter: 'noop' or 'statsd' (default: noop)"
                );
                println!(
                    " METRICS_STATSD_HOST StatsD host when using statsd adapter (e.g., localhost:8125)"
                );
                println!(
                    " METRICS_STATSD_BIND Bind address for StatsD UDP socket (default: [::]:0)"
                );
                println!(" METRICS_PREFIX Prefix for all metrics (default: quickdid)");
                println!(
                    " METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)"
                );
                println!();
                println!(" PROACTIVE CACHE REFRESH:");
                println!(
                    " PROACTIVE_REFRESH_ENABLED Enable proactive cache refresh (default: false)"
                );
                println!(
                    " PROACTIVE_REFRESH_THRESHOLD Threshold as percentage of TTL (0.0-1.0, default: 0.8)"
                );
                println!();
                println!(" JETSTREAM:");
                println!(" JETSTREAM_ENABLED Enable Jetstream consumer (default: false)");
                println!(
                    " JETSTREAM_HOSTNAME Jetstream hostname (default: jetstream.atproto.tools)"
                );
                println!();
                println!(
                    "For more information, visit: https://github.com/smokesignal.events/quickdid"
                );
                return true;
            }
            _ => {}
        }
    }

    false
}

#[tokio::main]
async fn main() -> Result<()> {
    // Handle --version and --help before touching configuration, so these
    // flags work even in an unconfigured environment.
    if handle_simple_args() {
        return Ok(());
    }

    // Initialize tracing; RUST_LOG (via EnvFilter) overrides the baked-in
    // default filter string.
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
                "quickdid=info,atproto_identity=debug,atproto_xrpcs=debug".into()
            }),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let config = Config::from_env()?;

    // Validate configuration
    config.validate()?;

    tracing::info!("Starting QuickDID service on port {}", config.http_port);
    tracing::info!(
        "Cache TTL - Memory: {}s, Redis: {}s, SQLite: {}s",
        config.cache_ttl_memory,
        config.cache_ttl_redis,
        config.cache_ttl_sqlite
    );

    // Parse certificate bundles if provided
    let certificate_bundles: CertificateBundles = config
        .certificate_bundles
        .clone()
        .unwrap_or_default()
        .try_into()?;

    // Parse DNS nameservers if provided
    let dns_nameservers: DnsNameservers = config
        .dns_nameservers
        .clone()
        .unwrap_or_default()
        .try_into()?;

    // Build HTTP client, adding any extra CA certificates. A missing or
    // malformed certificate file aborts startup (fail fast).
    let mut client_builder = reqwest::Client::builder();
    for ca_certificate in certificate_bundles.as_ref() {
        let cert = std::fs::read(ca_certificate)?;
        let cert = reqwest::Certificate::from_pem(&cert)?;
        client_builder = client_builder.add_root_certificate(cert);
    }
    client_builder = client_builder.user_agent(&config.user_agent);
    let http_client = client_builder.build()?;

    // Create DNS resolver
    let dns_resolver = HickoryDnsResolver::create_resolver(dns_nameservers.as_ref());

    // Clone DNS resolver for lexicon resolution before wrapping in Arc
    let lexicon_dns_resolver = dns_resolver.clone();

    // Wrap DNS resolver in Arc for handle resolution
    let dns_resolver_arc = Arc::new(dns_resolver);

    // Create metrics publisher based on configuration
    let metrics_publisher = create_metrics_publisher(&config).map_err(|e| {
        tracing::error!("Failed to create metrics publisher: {}", e);
        anyhow::anyhow!("Failed to create metrics publisher: {}", e)
    })?;

    tracing::info!(
        "Metrics publisher created with {} adapter",
        config.metrics_adapter
    );

    metrics_publisher.gauge("server", 1).await;

    // Create base handle resolver using factory function
    let mut base_handle_resolver = create_base_resolver(
        dns_resolver_arc.clone(),
        http_client.clone(),
        metrics_publisher.clone(),
    );

    // Apply rate limiting if configured (0 = disabled). The rate limiter
    // wraps the base resolver so all downstream cache layers share it.
    if config.resolver_max_concurrent > 0 {
        let timeout_info = if config.resolver_max_concurrent_timeout_ms > 0 {
            format!(", {}ms timeout", config.resolver_max_concurrent_timeout_ms)
        } else {
            String::new()
        };
        tracing::info!(
            "Applying rate limiting to handle resolver (max {} concurrent resolutions{})",
            config.resolver_max_concurrent,
            timeout_info
        );
        base_handle_resolver = create_rate_limited_resolver_with_timeout(
            base_handle_resolver,
            config.resolver_max_concurrent,
            config.resolver_max_concurrent_timeout_ms,
            metrics_publisher.clone(),
        );
    }

    // Create Redis pool if configured
    let redis_pool = config
        .redis_url
        .as_ref()
        .and_then(|url| try_create_redis_pool(url, "handle resolver cache"));

    // Create SQLite pool if configured
    let sqlite_pool = if let Some(url) = config.sqlite_url.as_ref() {
        try_create_sqlite_pool(url, "handle resolver cache").await
    } else {
        None
    };

    // Create task tracker and cancellation token shared by every background
    // task; the signal handler cancels the token and closes the tracker.
    let tracker = TaskTracker::new();
    let token = CancellationToken::new();

    // Create the queue adapter first (needed for proactive refresh)
    let handle_queue: Arc<dyn QueueAdapter<HandleResolutionWork>> = {
        // Create queue adapter based on configuration
        let adapter: Arc<dyn QueueAdapter<HandleResolutionWork>> = match config
            .queue_adapter
            .as_str()
        {
            "redis" => {
                // Use queue-specific Redis URL, fall back to general Redis URL
                let queue_redis_url = config
                    .queue_redis_url
                    .as_ref()
                    .or(config.redis_url.as_ref());

                if let Some(url) = queue_redis_url {
                    if let Some(pool) = try_create_redis_pool(url, "queue adapter") {
                        tracing::info!(
                            "Creating Redis queue adapter with prefix: {}, dedup: {}, dedup_ttl: {}s",
                            config.queue_redis_prefix,
                            config.queue_redis_dedup_enabled,
                            config.queue_redis_dedup_ttl
                        );
                        if config.queue_redis_dedup_enabled {
                            create_redis_queue_with_dedup::<HandleResolutionWork>(
                                pool,
                                config.queue_worker_id.clone(),
                                config.queue_redis_prefix.clone(),
                                config.queue_redis_timeout,
                                true,
                                config.queue_redis_dedup_ttl,
                            )
                        } else {
                            create_redis_queue::<HandleResolutionWork>(
                                pool,
                                config.queue_worker_id.clone(),
                                config.queue_redis_prefix.clone(),
                                config.queue_redis_timeout,
                            )
                        }
                    } else {
                        tracing::warn!("Falling back to MPSC queue adapter");
                        // Fall back to MPSC if Redis fails
                        let (handle_sender, handle_receiver) =
                            tokio::sync::mpsc::channel::<HandleResolutionWork>(
                                config.queue_buffer_size,
                            );
                        create_mpsc_queue_from_channel(handle_sender, handle_receiver)
                    }
                } else {
                    tracing::warn!(
                        "Redis queue adapter requested but no Redis URL configured, using no-op adapter"
                    );
                    create_noop_queue::<HandleResolutionWork>()
                }
            }
            "sqlite" => {
                // Use SQLite adapter
                if let Some(url) = config.sqlite_url.as_ref() {
                    if let Some(pool) = try_create_sqlite_pool(url, "queue adapter").await {
                        if config.queue_sqlite_max_size > 0 {
                            tracing::info!(
                                "Creating SQLite queue adapter with work shedding (max_size: {})",
                                config.queue_sqlite_max_size
                            );
                            create_sqlite_queue_with_max_size::<HandleResolutionWork>(
                                pool,
                                config.queue_sqlite_max_size,
                            )
                        } else {
                            tracing::info!("Creating SQLite queue adapter (unlimited size)");
                            create_sqlite_queue::<HandleResolutionWork>(pool)
                        }
                    } else {
                        tracing::warn!(
                            "Failed to create SQLite pool for queue, falling back to MPSC queue adapter"
                        );
                        // Fall back to MPSC if SQLite fails
                        let (handle_sender, handle_receiver) =
                            tokio::sync::mpsc::channel::<HandleResolutionWork>(
                                config.queue_buffer_size,
                            );
                        create_mpsc_queue_from_channel(handle_sender, handle_receiver)
                    }
                } else {
                    tracing::warn!(
                        "SQLite queue adapter requested but no SQLite URL configured, using no-op adapter"
                    );
                    create_noop_queue::<HandleResolutionWork>()
                }
            }
            "mpsc" => {
                // Use MPSC adapter
                tracing::info!(
                    "Using MPSC queue adapter with buffer size: {}",
                    config.queue_buffer_size
                );
                let (handle_sender, handle_receiver) =
                    tokio::sync::mpsc::channel::<HandleResolutionWork>(config.queue_buffer_size);
                create_mpsc_queue_from_channel(handle_sender, handle_receiver)
            }
            "noop" | "none" => {
                // Use no-op adapter
                tracing::info!("Using no-op queue adapter (queuing disabled)");
                create_noop_queue::<HandleResolutionWork>()
            }
            _ => {
                // Default to no-op adapter for unknown types
                tracing::warn!(
                    "Unknown queue adapter type '{}', using no-op adapter",
                    config.queue_adapter
                );
                create_noop_queue::<HandleResolutionWork>()
            }
        };

        adapter
    };

    // Create handle resolver with cache priority: Redis > SQLite > In-memory.
    // Only one cache layer is selected even if multiple backends are
    // configured; cache_ttl records the chosen layer's TTL for proactive
    // refresh below.
    let (mut handle_resolver, cache_ttl): (
        Arc<dyn quickdid::handle_resolver::HandleResolver>,
        u64,
    ) = if let Some(ref pool) = redis_pool {
        tracing::info!(
            "Using Redis-backed handle resolver with {}-second cache TTL",
            config.cache_ttl_redis
        );
        (
            create_redis_resolver_with_ttl(
                base_handle_resolver,
                pool.clone(),
                config.cache_ttl_redis,
                metrics_publisher.clone(),
            ),
            config.cache_ttl_redis,
        )
    } else if let Some(pool) = sqlite_pool {
        tracing::info!(
            "Using SQLite-backed handle resolver with {}-second cache TTL",
            config.cache_ttl_sqlite
        );
        (
            create_sqlite_resolver_with_ttl(
                base_handle_resolver,
                pool,
                config.cache_ttl_sqlite,
                metrics_publisher.clone(),
            ),
            config.cache_ttl_sqlite,
        )
    } else {
        tracing::info!(
            "Using in-memory handle resolver with {}-second cache TTL",
            config.cache_ttl_memory
        );
        (
            create_caching_resolver(
                base_handle_resolver,
                config.cache_ttl_memory,
                metrics_publisher.clone(),
            ),
            config.cache_ttl_memory,
        )
    };

    // Apply proactive refresh if enabled; requires a real queue adapter
    // since refresh work is enqueued rather than done inline.
    if config.proactive_refresh_enabled && !matches!(config.queue_adapter.as_str(), "noop" | "none")
    {
        tracing::info!(
            "Enabling proactive cache refresh with {}% threshold",
            (config.proactive_refresh_threshold * 100.0) as u32
        );
        handle_resolver = create_proactive_refresh_resolver_with_metrics(
            handle_resolver,
            handle_queue.clone(),
            metrics_publisher.clone(),
            cache_ttl,
            config.proactive_refresh_threshold,
        );
    } else if config.proactive_refresh_enabled {
        tracing::warn!(
            "Proactive refresh enabled but queue adapter is no-op, skipping proactive refresh"
        );
    }

    // Create lexicon resolver with Redis caching if available
    let lexicon_resolver: Arc<dyn LexiconResolver> = {
        let base_lexicon_resolver: Arc<dyn LexiconResolver> = Arc::new(
            DefaultLexiconResolver::new(http_client.clone(), lexicon_dns_resolver),
        );

        if let Some(ref pool) = redis_pool {
            tracing::info!(
                "Using Redis-backed lexicon resolver with {}-second cache TTL",
                config.cache_ttl_redis
            );
            create_redis_lexicon_resolver_with_ttl(
                base_lexicon_resolver,
                pool.clone(),
                config.cache_ttl_redis,
                metrics_publisher.clone(),
            )
        } else {
            tracing::info!("Using base lexicon resolver without caching");
            base_lexicon_resolver
        }
    };

    // Setup background handle resolution task
    {
        let adapter_for_task = handle_queue.clone();

        // Only spawn handle resolver task if not using noop adapter
        if !matches!(config.queue_adapter.as_str(), "noop" | "none") {
            // Create handle resolver task configuration
            let handle_task_config = HandleResolverTaskConfig {
                default_timeout_ms: 10000,
            };

            // Create and start handle resolver task
            let handle_task = create_handle_resolver_task_with_config(
                adapter_for_task,
                handle_resolver.clone(),
                token.clone(),
                handle_task_config,
                metrics_publisher.clone(),
            );

            // Spawn the handle resolver task
            spawn_cancellable_task(
                &tracker,
                token.clone(),
                "handle_resolver",
                |cancel_token| async move {
                    tokio::select! {
                        result = handle_task.run() => {
                            if let Err(e) = result {
                                tracing::error!(error = ?e, "Handle resolver task failed");
                                Err(anyhow::anyhow!(e))
                            } else {
                                Ok(())
                            }
                        }
                        _ = cancel_token.cancelled() => {
                            tracing::info!("Handle resolver task cancelled");
                            Ok(())
                        }
                    }
                },
            );

            tracing::info!(
                "Background handle resolution task started with {} adapter",
                config.queue_adapter
            );
        } else {
            tracing::info!("Background handle resolution task disabled (using no-op adapter)");
        }
    };

    // Create app context with the queue adapter
    let app_context = AppContext::new(
        handle_resolver.clone(),
        handle_queue,
        lexicon_resolver,
        metrics_publisher.clone(),
        config.etag_seed.clone(),
        config.cache_control_header.clone(),
        config.static_files_dir.clone(),
    );

    // Create router
    let router = create_router(app_context);

    // Setup signal handler
    {
        let signal_tracker = tracker.clone();
        let signal_token = token.clone();

        // Spawn signal handler without using the managed task helper since
        // it's special: it must outlive the tracker it closes, so it is a
        // plain detached tokio task rather than a tracked one.
        tracing::info!("Starting signal handler task");
        tokio::spawn(async move {
            let ctrl_c = async {
                signal::ctrl_c()
                    .await
                    .expect("failed to install Ctrl+C handler");
            };

            #[cfg(unix)]
            let terminate = async {
                signal::unix::signal(signal::unix::SignalKind::terminate())
                    .expect("failed to install signal handler")
                    .recv()
                    .await;
            };

            #[cfg(not(unix))]
            let terminate = std::future::pending::<()>();

            tokio::select! {
                () = signal_token.cancelled() => {
                    tracing::info!("Signal handler task shutting down gracefully");
                },
                _ = terminate => {
                    tracing::info!("Received SIGTERM signal, initiating shutdown");
                },
                _ = ctrl_c => {
                    tracing::info!("Received Ctrl+C signal, initiating shutdown");
                },
            }

            // Close the tracker and cancel the shared token so every
            // spawned task (HTTP, resolver, jetstream) winds down.
            signal_tracker.close();
            signal_token.cancel();
            tracing::info!("Signal handler task completed");
        });
    }

    // Start Jetstream consumer if enabled
    if config.jetstream_enabled {
        let jetstream_resolver = handle_resolver.clone();
        let jetstream_metrics = metrics_publisher.clone();
        let jetstream_hostname = config.jetstream_hostname.clone();
        let jetstream_user_agent = config.user_agent.clone();

        spawn_cancellable_task(
            &tracker,
            token.clone(),
            "jetstream_consumer",
            move |cancel_token| async move {
                tracing::info!(hostname = %jetstream_hostname, "Starting Jetstream consumer");

                // Create event handler
                let event_handler = Arc::new(QuickDidEventHandler::new(
                    jetstream_resolver,
                    jetstream_metrics.clone(),
                ));

                // Reconnection loop with crude rate limiting: more than
                // `max_reconnects_per_minute` reconnects inside a 60s window
                // triggers a 60s back-off before trying again.
                let mut reconnect_count = 0u32;
                let max_reconnects_per_minute = 5;
                let reconnect_window = std::time::Duration::from_secs(60);
                // Start one full window in the past so the first connection
                // attempt is never counted as a reconnect.
                let mut last_disconnect = std::time::Instant::now() - reconnect_window;

                while !cancel_token.is_cancelled() {
                    let now = std::time::Instant::now();
                    if now.duration_since(last_disconnect) < reconnect_window {
                        reconnect_count += 1;
                        if reconnect_count > max_reconnects_per_minute {
                            tracing::warn!(
                                count = reconnect_count,
                                "Too many Jetstream reconnects, waiting 60 seconds"
                            );
                            tokio::time::sleep(reconnect_window).await;
                            reconnect_count = 0;
                            last_disconnect = now;
                            continue;
                        }
                    } else {
                        reconnect_count = 0;
                    }

                    // Create consumer configuration
                    let consumer_config = ConsumerTaskConfig {
                        user_agent: jetstream_user_agent.clone(),
                        compression: false,
                        zstd_dictionary_location: String::new(),
                        jetstream_hostname: jetstream_hostname.clone(),
                        // Listen to the "community.lexicon.collection.fake" collection
                        // so that we keep an active connection open but only for
                        // account and identity events.
                        collections: vec!["community.lexicon.collection.fake".to_string()],
                        dids: vec![],
                        max_message_size_bytes: None,
                        cursor: None,
                        require_hello: true,
                    };

                    let consumer = JetstreamConsumer::new(consumer_config);

                    // Register event handler
                    if let Err(e) = consumer.register_handler(event_handler.clone()).await {
                        tracing::error!(error = ?e, "Failed to register Jetstream event handler");
                        continue;
                    }

                    // Run consumer with cancellation support
                    match consumer.run_background(cancel_token.clone()).await {
                        Ok(()) => {
                            tracing::info!("Jetstream consumer stopped normally");
                            if cancel_token.is_cancelled() {
                                break;
                            }
                            last_disconnect = std::time::Instant::now();
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                        Err(e) => {
                            tracing::error!(error = ?e, "Jetstream consumer connection failed, will reconnect");
                            jetstream_metrics.incr("jetstream.connection.error").await;
                            last_disconnect = std::time::Instant::now();

                            if !cancel_token.is_cancelled() {
                                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                            }
                        }
                    }
                }

                tracing::info!("Jetstream consumer task shutting down");
                Ok(())
            },
        );
    } else {
        tracing::info!("Jetstream consumer disabled");
    }

    // Start HTTP server with cancellation support
    let bind_address = format!("0.0.0.0:{}", config.http_port);
    spawn_cancellable_task(
        &tracker,
        token.clone(),
        "http",
        move |cancel_token| async move {
            let listener = tokio::net::TcpListener::bind(&bind_address)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to bind to {}: {}", bind_address, e))?;

            tracing::info!("QuickDID service listening on {}", bind_address);

            let shutdown_token = cancel_token.clone();
            axum::serve(listener, router)
                .with_graceful_shutdown(async move {
                    shutdown_token.cancelled().await;
                })
                .await
                .map_err(|e| anyhow::anyhow!("HTTP server error: {}", e))?;

            Ok(())
        },
    );

    // Wait for all tasks to complete (the tracker is closed by the signal
    // handler, after which wait() returns once every tracked task exits).
    tracing::info!("Waiting for all tasks to complete...");
    tracker.wait().await;

    tracing::info!("All tasks completed, application shutting down");

    Ok(())
}