this repo has no description
1use std::net::SocketAddr; 2use std::process::ExitCode; 3use std::sync::Arc; 4use tokio::sync::watch; 5use tracing::{error, info, warn}; 6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8use tranquil_pds::state::AppState; 9 10#[tokio::main] 11async fn main() -> ExitCode { 12 dotenvy::dotenv().ok(); 13 tracing_subscriber::fmt::init(); 14 tranquil_pds::metrics::init_metrics(); 15 16 match run().await { 17 Ok(()) => ExitCode::SUCCESS, 18 Err(e) => { 19 error!("Fatal error: {}", e); 20 ExitCode::FAILURE 21 } 22 } 23} 24 25async fn run() -> Result<(), Box<dyn std::error::Error>> { 26 let state = AppState::new().await?; 27 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 28 29 let (shutdown_tx, shutdown_rx) = watch::channel(false); 30 31 let mut comms_service = CommsService::new(state.db.clone()); 32 33 if let Some(email_sender) = EmailSender::from_env() { 34 info!("Email comms enabled"); 35 comms_service = comms_service.register_sender(email_sender); 36 } else { 37 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 38 } 39 40 if let Some(discord_sender) = DiscordSender::from_env() { 41 info!("Discord comms enabled"); 42 comms_service = comms_service.register_sender(discord_sender); 43 } 44 45 if let Some(telegram_sender) = TelegramSender::from_env() { 46 info!("Telegram comms enabled"); 47 comms_service = comms_service.register_sender(telegram_sender); 48 } 49 50 if let Some(signal_sender) = SignalSender::from_env() { 51 info!("Signal comms enabled"); 52 comms_service = comms_service.register_sender(signal_sender); 53 } 54 55 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone())); 56 57 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 58 let crawlers = Arc::new( 59 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 60 ); 61 let firehose_rx = state.firehose_tx.subscribe(); 62 info!("Crawlers notification service enabled"); 63 Some(tokio::spawn(start_crawlers_service( 64 crawlers, 65 firehose_rx, 66 shutdown_rx, 67 ))) 68 } else { 69 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 70 None 71 }; 72 73 let app = tranquil_pds::app(state); 74 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 75 info!("listening on {}", addr); 76 77 let listener = tokio::net::TcpListener::bind(addr) 78 .await 79 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 80 81 let server_result = axum::serve(listener, app) 82 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 83 .await; 84 85 comms_handle.await.ok(); 86 87 if let Some(handle) = crawlers_handle { 88 handle.await.ok(); 89 } 90 91 if let Err(e) = server_result { 92 return Err(format!("Server error: {}", e).into()); 93 } 94 95 Ok(()) 96} 97 98async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 99 let ctrl_c = async { 100 match tokio::signal::ctrl_c().await { 101 Ok(()) => {} 102 Err(e) => { 103 error!("Failed to install Ctrl+C handler: {}", e); 104 } 105 } 106 }; 107 108 #[cfg(unix)] 109 let terminate = async { 110 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 111 Ok(mut signal) => { 112 signal.recv().await; 113 } 114 Err(e) => { 115 error!("Failed to install SIGTERM handler: {}", e); 116 std::future::pending::<()>().await; 117 } 118 } 119 }; 120 121 #[cfg(not(unix))] 122 let terminate = std::future::pending::<()>(); 123 124 tokio::select! { 125 _ = ctrl_c => {}, 126 _ = terminate => {}, 127 } 128 129 info!("Shutdown signal received, stopping services..."); 130 shutdown_tx.send(true).ok(); 131}