//! QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
1use anyhow::Result;
2use atproto_identity::{
3 config::{CertificateBundles, DnsNameservers},
4 resolve::HickoryDnsResolver,
5};
6use atproto_jetstream::{Consumer as JetstreamConsumer, ConsumerTaskConfig};
7use atproto_lexicon::resolve::{DefaultLexiconResolver, LexiconResolver};
8use quickdid::{
9 cache::create_redis_pool,
10 config::Config,
11 handle_resolver::{
12 create_base_resolver, create_caching_resolver,
13 create_proactive_refresh_resolver_with_metrics, create_rate_limited_resolver_with_timeout,
14 create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl,
15 },
16 handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config},
17 http::{AppContext, create_router},
18 jetstream_handler::QuickDidEventHandler,
19 lexicon_resolver::create_redis_lexicon_resolver_with_ttl,
20 metrics::create_metrics_publisher,
21 queue::{
22 HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue,
23 create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue,
24 create_sqlite_queue_with_max_size,
25 },
26 sqlite_schema::create_sqlite_pool,
27 task_manager::spawn_cancellable_task,
28};
29use std::sync::Arc;
30use tokio::signal;
31use tokio_util::{sync::CancellationToken, task::TaskTracker};
32use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
33
34/// Helper function to create a Redis pool with consistent error handling
35fn try_create_redis_pool(redis_url: &str, purpose: &str) -> Option<deadpool_redis::Pool> {
36 match create_redis_pool(redis_url) {
37 Ok(pool) => {
38 tracing::info!("Redis pool created for {}", purpose);
39 Some(pool)
40 }
41 Err(e) => {
42 tracing::warn!("Failed to create Redis pool for {}: {}", purpose, e);
43 None
44 }
45 }
46}
47
48/// Helper function to create a SQLite pool with consistent error handling
49async fn try_create_sqlite_pool(sqlite_url: &str, purpose: &str) -> Option<sqlx::SqlitePool> {
50 match create_sqlite_pool(sqlite_url).await {
51 Ok(pool) => {
52 tracing::info!("SQLite pool created for {}", purpose);
53 Some(pool)
54 }
55 Err(e) => {
56 tracing::warn!("Failed to create SQLite pool for {}: {}", purpose, e);
57 None
58 }
59 }
60}
61
62/// Simple command-line argument handling for --version and --help
63fn handle_simple_args() -> bool {
64 let args: Vec<String> = std::env::args().collect();
65
66 if args.len() > 1 {
67 match args[1].as_str() {
68 "--version" | "-V" => {
69 println!("quickdid {}", env!("CARGO_PKG_VERSION"));
70 return true;
71 }
72 "--help" | "-h" => {
73 println!("QuickDID - AT Protocol Identity Resolver Service");
74 println!("Version: {}", env!("CARGO_PKG_VERSION"));
75 println!();
76 println!("USAGE:");
77 println!(" quickdid [OPTIONS]");
78 println!();
79 println!("OPTIONS:");
80 println!(" -h, --help Print help information");
81 println!(" -V, --version Print version information");
82 println!();
83 println!("ENVIRONMENT VARIABLES:");
84 println!(
85 " HTTP_EXTERNAL External hostname for service endpoints (required)"
86 );
87 println!(" HTTP_PORT HTTP server port (default: 8080)");
88 println!(" PLC_HOSTNAME PLC directory hostname (default: plc.directory)");
89 println!(
90 " USER_AGENT HTTP User-Agent header (auto-generated with version)"
91 );
92 println!(" DNS_NAMESERVERS Custom DNS nameservers (comma-separated IPs)");
93 println!(
94 " CERTIFICATE_BUNDLES Additional CA certificates (comma-separated paths)"
95 );
96 println!();
97 println!(" CACHING:");
98 println!(" REDIS_URL Redis URL for handle resolution caching");
99 println!(
100 " SQLITE_URL SQLite database URL for handle resolution caching"
101 );
102 println!(
103 " CACHE_TTL_MEMORY TTL for in-memory cache in seconds (default: 600)"
104 );
105 println!(
106 " CACHE_TTL_REDIS TTL for Redis cache in seconds (default: 7776000)"
107 );
108 println!(
109 " CACHE_TTL_SQLITE TTL for SQLite cache in seconds (default: 7776000)"
110 );
111 println!();
112 println!(" QUEUE CONFIGURATION:");
113 println!(
114 " QUEUE_ADAPTER Queue adapter: 'mpsc', 'redis', 'sqlite', 'noop' (default: mpsc)"
115 );
116 println!(" QUEUE_REDIS_URL Redis URL for queue adapter");
117 println!(
118 " QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)"
119 );
120 println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)");
121 println!(
122 " QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)"
123 );
124 println!(" QUEUE_REDIS_DEDUP_TTL TTL for dedup keys in seconds (default: 60)");
125 println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)");
126 println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)");
127 println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)");
128 println!();
129 println!(" RATE LIMITING:");
130 println!(
131 " RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)"
132 );
133 println!(
134 " RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)"
135 );
136 println!();
137 println!(" METRICS:");
138 println!(
139 " METRICS_ADAPTER Metrics adapter: 'noop' or 'statsd' (default: noop)"
140 );
141 println!(
142 " METRICS_STATSD_HOST StatsD host when using statsd adapter (e.g., localhost:8125)"
143 );
144 println!(
145 " METRICS_STATSD_BIND Bind address for StatsD UDP socket (default: [::]:0)"
146 );
147 println!(" METRICS_PREFIX Prefix for all metrics (default: quickdid)");
148 println!(
149 " METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)"
150 );
151 println!();
152 println!(" PROACTIVE CACHE REFRESH:");
153 println!(
154 " PROACTIVE_REFRESH_ENABLED Enable proactive cache refresh (default: false)"
155 );
156 println!(
157 " PROACTIVE_REFRESH_THRESHOLD Threshold as percentage of TTL (0.0-1.0, default: 0.8)"
158 );
159 println!();
160 println!(" JETSTREAM:");
161 println!(" JETSTREAM_ENABLED Enable Jetstream consumer (default: false)");
162 println!(
163 " JETSTREAM_HOSTNAME Jetstream hostname (default: jetstream.atproto.tools)"
164 );
165 println!();
166 println!(
167 "For more information, visit: https://github.com/smokesignal.events/quickdid"
168 );
169 return true;
170 }
171 _ => {}
172 }
173 }
174
175 false
176}
177
178#[tokio::main]
179async fn main() -> Result<()> {
180 // Handle --version and --help
181 if handle_simple_args() {
182 return Ok(());
183 }
184
185 // Initialize tracing
186 tracing_subscriber::registry()
187 .with(
188 tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
189 "quickdid=info,atproto_identity=debug,atproto_xrpcs=debug".into()
190 }),
191 )
192 .with(tracing_subscriber::fmt::layer())
193 .init();
194
195 let config = Config::from_env()?;
196
197 // Validate configuration
198 config.validate()?;
199
200 tracing::info!("Starting QuickDID service on port {}", config.http_port);
201 tracing::info!(
202 "Cache TTL - Memory: {}s, Redis: {}s, SQLite: {}s",
203 config.cache_ttl_memory,
204 config.cache_ttl_redis,
205 config.cache_ttl_sqlite
206 );
207
208 // Parse certificate bundles if provided
209 let certificate_bundles: CertificateBundles = config
210 .certificate_bundles
211 .clone()
212 .unwrap_or_default()
213 .try_into()?;
214
215 // Parse DNS nameservers if provided
216 let dns_nameservers: DnsNameservers = config
217 .dns_nameservers
218 .clone()
219 .unwrap_or_default()
220 .try_into()?;
221
222 // Build HTTP client
223 let mut client_builder = reqwest::Client::builder();
224 for ca_certificate in certificate_bundles.as_ref() {
225 let cert = std::fs::read(ca_certificate)?;
226 let cert = reqwest::Certificate::from_pem(&cert)?;
227 client_builder = client_builder.add_root_certificate(cert);
228 }
229 client_builder = client_builder.user_agent(&config.user_agent);
230 let http_client = client_builder.build()?;
231
232 // Create DNS resolver
233 let dns_resolver = HickoryDnsResolver::create_resolver(dns_nameservers.as_ref());
234
235 // Clone DNS resolver for lexicon resolution before wrapping in Arc
236 let lexicon_dns_resolver = dns_resolver.clone();
237
238 // Wrap DNS resolver in Arc for handle resolution
239 let dns_resolver_arc = Arc::new(dns_resolver);
240
241 // Create metrics publisher based on configuration
242 let metrics_publisher = create_metrics_publisher(&config).map_err(|e| {
243 tracing::error!("Failed to create metrics publisher: {}", e);
244 anyhow::anyhow!("Failed to create metrics publisher: {}", e)
245 })?;
246
247 tracing::info!(
248 "Metrics publisher created with {} adapter",
249 config.metrics_adapter
250 );
251
252 metrics_publisher.gauge("server", 1).await;
253
254 // Create base handle resolver using factory function
255 let mut base_handle_resolver = create_base_resolver(
256 dns_resolver_arc.clone(),
257 http_client.clone(),
258 metrics_publisher.clone(),
259 );
260
261 // Apply rate limiting if configured
262 if config.resolver_max_concurrent > 0 {
263 let timeout_info = if config.resolver_max_concurrent_timeout_ms > 0 {
264 format!(", {}ms timeout", config.resolver_max_concurrent_timeout_ms)
265 } else {
266 String::new()
267 };
268 tracing::info!(
269 "Applying rate limiting to handle resolver (max {} concurrent resolutions{})",
270 config.resolver_max_concurrent,
271 timeout_info
272 );
273 base_handle_resolver = create_rate_limited_resolver_with_timeout(
274 base_handle_resolver,
275 config.resolver_max_concurrent,
276 config.resolver_max_concurrent_timeout_ms,
277 metrics_publisher.clone(),
278 );
279 }
280
281 // Create Redis pool if configured
282 let redis_pool = config
283 .redis_url
284 .as_ref()
285 .and_then(|url| try_create_redis_pool(url, "handle resolver cache"));
286
287 // Create SQLite pool if configured
288 let sqlite_pool = if let Some(url) = config.sqlite_url.as_ref() {
289 try_create_sqlite_pool(url, "handle resolver cache").await
290 } else {
291 None
292 };
293
294 // Create task tracker and cancellation token
295 let tracker = TaskTracker::new();
296 let token = CancellationToken::new();
297
298 // Create the queue adapter first (needed for proactive refresh)
299 let handle_queue: Arc<dyn QueueAdapter<HandleResolutionWork>> = {
300 // Create queue adapter based on configuration
301 let adapter: Arc<dyn QueueAdapter<HandleResolutionWork>> = match config
302 .queue_adapter
303 .as_str()
304 {
305 "redis" => {
306 // Use queue-specific Redis URL, fall back to general Redis URL
307 let queue_redis_url = config
308 .queue_redis_url
309 .as_ref()
310 .or(config.redis_url.as_ref());
311
312 if let Some(url) = queue_redis_url {
313 if let Some(pool) = try_create_redis_pool(url, "queue adapter") {
314 tracing::info!(
315 "Creating Redis queue adapter with prefix: {}, dedup: {}, dedup_ttl: {}s",
316 config.queue_redis_prefix,
317 config.queue_redis_dedup_enabled,
318 config.queue_redis_dedup_ttl
319 );
320 if config.queue_redis_dedup_enabled {
321 create_redis_queue_with_dedup::<HandleResolutionWork>(
322 pool,
323 config.queue_worker_id.clone(),
324 config.queue_redis_prefix.clone(),
325 config.queue_redis_timeout,
326 true,
327 config.queue_redis_dedup_ttl,
328 )
329 } else {
330 create_redis_queue::<HandleResolutionWork>(
331 pool,
332 config.queue_worker_id.clone(),
333 config.queue_redis_prefix.clone(),
334 config.queue_redis_timeout,
335 )
336 }
337 } else {
338 tracing::warn!("Falling back to MPSC queue adapter");
339 // Fall back to MPSC if Redis fails
340 let (handle_sender, handle_receiver) =
341 tokio::sync::mpsc::channel::<HandleResolutionWork>(
342 config.queue_buffer_size,
343 );
344 create_mpsc_queue_from_channel(handle_sender, handle_receiver)
345 }
346 } else {
347 tracing::warn!(
348 "Redis queue adapter requested but no Redis URL configured, using no-op adapter"
349 );
350 create_noop_queue::<HandleResolutionWork>()
351 }
352 }
353 "sqlite" => {
354 // Use SQLite adapter
355 if let Some(url) = config.sqlite_url.as_ref() {
356 if let Some(pool) = try_create_sqlite_pool(url, "queue adapter").await {
357 if config.queue_sqlite_max_size > 0 {
358 tracing::info!(
359 "Creating SQLite queue adapter with work shedding (max_size: {})",
360 config.queue_sqlite_max_size
361 );
362 create_sqlite_queue_with_max_size::<HandleResolutionWork>(
363 pool,
364 config.queue_sqlite_max_size,
365 )
366 } else {
367 tracing::info!("Creating SQLite queue adapter (unlimited size)");
368 create_sqlite_queue::<HandleResolutionWork>(pool)
369 }
370 } else {
371 tracing::warn!(
372 "Failed to create SQLite pool for queue, falling back to MPSC queue adapter"
373 );
374 // Fall back to MPSC if SQLite fails
375 let (handle_sender, handle_receiver) =
376 tokio::sync::mpsc::channel::<HandleResolutionWork>(
377 config.queue_buffer_size,
378 );
379 create_mpsc_queue_from_channel(handle_sender, handle_receiver)
380 }
381 } else {
382 tracing::warn!(
383 "SQLite queue adapter requested but no SQLite URL configured, using no-op adapter"
384 );
385 create_noop_queue::<HandleResolutionWork>()
386 }
387 }
388 "mpsc" => {
389 // Use MPSC adapter
390 tracing::info!(
391 "Using MPSC queue adapter with buffer size: {}",
392 config.queue_buffer_size
393 );
394 let (handle_sender, handle_receiver) =
395 tokio::sync::mpsc::channel::<HandleResolutionWork>(config.queue_buffer_size);
396 create_mpsc_queue_from_channel(handle_sender, handle_receiver)
397 }
398 "noop" | "none" => {
399 // Use no-op adapter
400 tracing::info!("Using no-op queue adapter (queuing disabled)");
401 create_noop_queue::<HandleResolutionWork>()
402 }
403 _ => {
404 // Default to no-op adapter for unknown types
405 tracing::warn!(
406 "Unknown queue adapter type '{}', using no-op adapter",
407 config.queue_adapter
408 );
409 create_noop_queue::<HandleResolutionWork>()
410 }
411 };
412
413 adapter
414 };
415
416 // Create handle resolver with cache priority: Redis > SQLite > In-memory
417 let (mut handle_resolver, cache_ttl): (
418 Arc<dyn quickdid::handle_resolver::HandleResolver>,
419 u64,
420 ) = if let Some(ref pool) = redis_pool {
421 tracing::info!(
422 "Using Redis-backed handle resolver with {}-second cache TTL",
423 config.cache_ttl_redis
424 );
425 (
426 create_redis_resolver_with_ttl(
427 base_handle_resolver,
428 pool.clone(),
429 config.cache_ttl_redis,
430 metrics_publisher.clone(),
431 ),
432 config.cache_ttl_redis,
433 )
434 } else if let Some(pool) = sqlite_pool {
435 tracing::info!(
436 "Using SQLite-backed handle resolver with {}-second cache TTL",
437 config.cache_ttl_sqlite
438 );
439 (
440 create_sqlite_resolver_with_ttl(
441 base_handle_resolver,
442 pool,
443 config.cache_ttl_sqlite,
444 metrics_publisher.clone(),
445 ),
446 config.cache_ttl_sqlite,
447 )
448 } else {
449 tracing::info!(
450 "Using in-memory handle resolver with {}-second cache TTL",
451 config.cache_ttl_memory
452 );
453 (
454 create_caching_resolver(
455 base_handle_resolver,
456 config.cache_ttl_memory,
457 metrics_publisher.clone(),
458 ),
459 config.cache_ttl_memory,
460 )
461 };
462
463 // Apply proactive refresh if enabled
464 if config.proactive_refresh_enabled && !matches!(config.queue_adapter.as_str(), "noop" | "none")
465 {
466 tracing::info!(
467 "Enabling proactive cache refresh with {}% threshold",
468 (config.proactive_refresh_threshold * 100.0) as u32
469 );
470 handle_resolver = create_proactive_refresh_resolver_with_metrics(
471 handle_resolver,
472 handle_queue.clone(),
473 metrics_publisher.clone(),
474 cache_ttl,
475 config.proactive_refresh_threshold,
476 );
477 } else if config.proactive_refresh_enabled {
478 tracing::warn!(
479 "Proactive refresh enabled but queue adapter is no-op, skipping proactive refresh"
480 );
481 }
482
483 // Create lexicon resolver with Redis caching if available
484 let lexicon_resolver: Arc<dyn LexiconResolver> = {
485 let base_lexicon_resolver: Arc<dyn LexiconResolver> = Arc::new(
486 DefaultLexiconResolver::new(http_client.clone(), lexicon_dns_resolver),
487 );
488
489 if let Some(ref pool) = redis_pool {
490 tracing::info!(
491 "Using Redis-backed lexicon resolver with {}-second cache TTL",
492 config.cache_ttl_redis
493 );
494 create_redis_lexicon_resolver_with_ttl(
495 base_lexicon_resolver,
496 pool.clone(),
497 config.cache_ttl_redis,
498 metrics_publisher.clone(),
499 )
500 } else {
501 tracing::info!("Using base lexicon resolver without caching");
502 base_lexicon_resolver
503 }
504 };
505
506 // Setup background handle resolution task
507 {
508 let adapter_for_task = handle_queue.clone();
509
510 // Only spawn handle resolver task if not using noop adapter
511 if !matches!(config.queue_adapter.as_str(), "noop" | "none") {
512 // Create handle resolver task configuration
513 let handle_task_config = HandleResolverTaskConfig {
514 default_timeout_ms: 10000,
515 };
516
517 // Create and start handle resolver task
518 let handle_task = create_handle_resolver_task_with_config(
519 adapter_for_task,
520 handle_resolver.clone(),
521 token.clone(),
522 handle_task_config,
523 metrics_publisher.clone(),
524 );
525
526 // Spawn the handle resolver task
527 spawn_cancellable_task(
528 &tracker,
529 token.clone(),
530 "handle_resolver",
531 |cancel_token| async move {
532 tokio::select! {
533 result = handle_task.run() => {
534 if let Err(e) = result {
535 tracing::error!(error = ?e, "Handle resolver task failed");
536 Err(anyhow::anyhow!(e))
537 } else {
538 Ok(())
539 }
540 }
541 _ = cancel_token.cancelled() => {
542 tracing::info!("Handle resolver task cancelled");
543 Ok(())
544 }
545 }
546 },
547 );
548
549 tracing::info!(
550 "Background handle resolution task started with {} adapter",
551 config.queue_adapter
552 );
553 } else {
554 tracing::info!("Background handle resolution task disabled (using no-op adapter)");
555 }
556 };
557
558 // Create app context with the queue adapter
559 let app_context = AppContext::new(
560 handle_resolver.clone(),
561 handle_queue,
562 lexicon_resolver,
563 metrics_publisher.clone(),
564 config.etag_seed.clone(),
565 config.cache_control_header.clone(),
566 config.static_files_dir.clone(),
567 );
568
569 // Create router
570 let router = create_router(app_context);
571
572 // Setup signal handler
573 {
574 let signal_tracker = tracker.clone();
575 let signal_token = token.clone();
576
577 // Spawn signal handler without using the managed task helper since it's special
578 tracing::info!("Starting signal handler task");
579 tokio::spawn(async move {
580 let ctrl_c = async {
581 signal::ctrl_c()
582 .await
583 .expect("failed to install Ctrl+C handler");
584 };
585
586 #[cfg(unix)]
587 let terminate = async {
588 signal::unix::signal(signal::unix::SignalKind::terminate())
589 .expect("failed to install signal handler")
590 .recv()
591 .await;
592 };
593
594 #[cfg(not(unix))]
595 let terminate = std::future::pending::<()>();
596
597 tokio::select! {
598 () = signal_token.cancelled() => {
599 tracing::info!("Signal handler task shutting down gracefully");
600 },
601 _ = terminate => {
602 tracing::info!("Received SIGTERM signal, initiating shutdown");
603 },
604 _ = ctrl_c => {
605 tracing::info!("Received Ctrl+C signal, initiating shutdown");
606 },
607 }
608
609 signal_tracker.close();
610 signal_token.cancel();
611 tracing::info!("Signal handler task completed");
612 });
613 }
614
615 // Start Jetstream consumer if enabled
616 if config.jetstream_enabled {
617 let jetstream_resolver = handle_resolver.clone();
618 let jetstream_metrics = metrics_publisher.clone();
619 let jetstream_hostname = config.jetstream_hostname.clone();
620 let jetstream_user_agent = config.user_agent.clone();
621
622 spawn_cancellable_task(
623 &tracker,
624 token.clone(),
625 "jetstream_consumer",
626 move |cancel_token| async move {
627 tracing::info!(hostname = %jetstream_hostname, "Starting Jetstream consumer");
628
629 // Create event handler
630 let event_handler = Arc::new(QuickDidEventHandler::new(
631 jetstream_resolver,
632 jetstream_metrics.clone(),
633 ));
634
635 // Reconnection loop
636 let mut reconnect_count = 0u32;
637 let max_reconnects_per_minute = 5;
638 let reconnect_window = std::time::Duration::from_secs(60);
639 let mut last_disconnect = std::time::Instant::now() - reconnect_window;
640
641 while !cancel_token.is_cancelled() {
642 let now = std::time::Instant::now();
643 if now.duration_since(last_disconnect) < reconnect_window {
644 reconnect_count += 1;
645 if reconnect_count > max_reconnects_per_minute {
646 tracing::warn!(
647 count = reconnect_count,
648 "Too many Jetstream reconnects, waiting 60 seconds"
649 );
650 tokio::time::sleep(reconnect_window).await;
651 reconnect_count = 0;
652 last_disconnect = now;
653 continue;
654 }
655 } else {
656 reconnect_count = 0;
657 }
658
659 // Create consumer configuration
660 let consumer_config = ConsumerTaskConfig {
661 user_agent: jetstream_user_agent.clone(),
662 compression: false,
663 zstd_dictionary_location: String::new(),
664 jetstream_hostname: jetstream_hostname.clone(),
665 // Listen to the "community.lexicon.collection.fake" collection
666 // so that we keep an active connection open but only for
667 // account and identity events.
668 collections: vec!["community.lexicon.collection.fake".to_string()], // Listen to all collections
669 dids: vec![],
670 max_message_size_bytes: None,
671 cursor: None,
672 require_hello: true,
673 };
674
675 let consumer = JetstreamConsumer::new(consumer_config);
676
677 // Register event handler
678 if let Err(e) = consumer.register_handler(event_handler.clone()).await {
679 tracing::error!(error = ?e, "Failed to register Jetstream event handler");
680 continue;
681 }
682
683 // Run consumer with cancellation support
684 match consumer.run_background(cancel_token.clone()).await {
685 Ok(()) => {
686 tracing::info!("Jetstream consumer stopped normally");
687 if cancel_token.is_cancelled() {
688 break;
689 }
690 last_disconnect = std::time::Instant::now();
691 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
692 }
693 Err(e) => {
694 tracing::error!(error = ?e, "Jetstream consumer connection failed, will reconnect");
695 jetstream_metrics.incr("jetstream.connection.error").await;
696 last_disconnect = std::time::Instant::now();
697
698 if !cancel_token.is_cancelled() {
699 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
700 }
701 }
702 }
703 }
704
705 tracing::info!("Jetstream consumer task shutting down");
706 Ok(())
707 },
708 );
709 } else {
710 tracing::info!("Jetstream consumer disabled");
711 }
712
713 // Start HTTP server with cancellation support
714 let bind_address = format!("0.0.0.0:{}", config.http_port);
715 spawn_cancellable_task(
716 &tracker,
717 token.clone(),
718 "http",
719 move |cancel_token| async move {
720 let listener = tokio::net::TcpListener::bind(&bind_address)
721 .await
722 .map_err(|e| anyhow::anyhow!("Failed to bind to {}: {}", bind_address, e))?;
723
724 tracing::info!("QuickDID service listening on {}", bind_address);
725
726 let shutdown_token = cancel_token.clone();
727 axum::serve(listener, router)
728 .with_graceful_shutdown(async move {
729 shutdown_token.cancelled().await;
730 })
731 .await
732 .map_err(|e| anyhow::anyhow!("HTTP server error: {}", e))?;
733
734 Ok(())
735 },
736 );
737
738 // Wait for all tasks to complete
739 tracing::info!("Waiting for all tasks to complete...");
740 tracker.wait().await;
741
742 tracing::info!("All tasks completed, application shutting down");
743
744 Ok(())
745}