this repo has no description
1use std::net::SocketAddr; 2use std::process::ExitCode; 3use std::sync::Arc; 4use tokio::sync::watch; 5use tracing::{error, info, warn}; 6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8use tranquil_pds::state::AppState; 9 10#[tokio::main] 11async fn main() -> ExitCode { 12 dotenvy::dotenv().ok(); 13 tracing_subscriber::fmt::init(); 14 tranquil_pds::metrics::init_metrics(); 15 16 match run().await { 17 Ok(()) => ExitCode::SUCCESS, 18 Err(e) => { 19 error!("Fatal error: {}", e); 20 ExitCode::FAILURE 21 } 22 } 23} 24 25async fn run() -> Result<(), Box<dyn std::error::Error>> { 26 let database_url = std::env::var("DATABASE_URL") 27 .map_err(|_| "DATABASE_URL environment variable must be set")?; 28 29 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS") 30 .ok() 31 .and_then(|v| v.parse().ok()) 32 .unwrap_or(100); 33 34 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS") 35 .ok() 36 .and_then(|v| v.parse().ok()) 37 .unwrap_or(10); 38 39 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS") 40 .ok() 41 .and_then(|v| v.parse().ok()) 42 .unwrap_or(10); 43 44 info!( 45 "Configuring database pool: max={}, min={}, acquire_timeout={}s", 46 max_connections, min_connections, acquire_timeout_secs 47 ); 48 49 let pool = sqlx::postgres::PgPoolOptions::new() 50 .max_connections(max_connections) 51 .min_connections(min_connections) 52 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs)) 53 .idle_timeout(std::time::Duration::from_secs(300)) 54 .max_lifetime(std::time::Duration::from_secs(1800)) 55 .connect(&database_url) 56 .await 57 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?; 58 59 sqlx::migrate!("./migrations") 60 .run(&pool) 61 .await 62 .map_err(|e| format!("Failed to run migrations: {}", e))?; 63 64 let state = AppState::new(pool.clone()).await; 65 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 66 67 let (shutdown_tx, shutdown_rx) = watch::channel(false); 68 69 let mut comms_service = CommsService::new(pool); 70 71 if let Some(email_sender) = EmailSender::from_env() { 72 info!("Email comms enabled"); 73 comms_service = comms_service.register_sender(email_sender); 74 } else { 75 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 76 } 77 78 if let Some(discord_sender) = DiscordSender::from_env() { 79 info!("Discord comms enabled"); 80 comms_service = comms_service.register_sender(discord_sender); 81 } 82 83 if let Some(telegram_sender) = TelegramSender::from_env() { 84 info!("Telegram comms enabled"); 85 comms_service = comms_service.register_sender(telegram_sender); 86 } 87 88 if let Some(signal_sender) = SignalSender::from_env() { 89 info!("Signal comms enabled"); 90 comms_service = comms_service.register_sender(signal_sender); 91 } 92 93 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone())); 94 95 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 96 let crawlers = Arc::new( 97 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 98 ); 99 let firehose_rx = state.firehose_tx.subscribe(); 100 info!("Crawlers notification service enabled"); 101 Some(tokio::spawn(start_crawlers_service( 102 crawlers, 103 firehose_rx, 104 shutdown_rx, 105 ))) 106 } else { 107 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 108 None 109 }; 110 111 let app = tranquil_pds::app(state); 112 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 113 info!("listening on {}", addr); 114 115 let listener = tokio::net::TcpListener::bind(addr) 116 .await 117 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 118 119 let server_result = axum::serve(listener, app) 120 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 121 .await; 122 123 comms_handle.await.ok(); 124 125 if let Some(handle) = crawlers_handle { 126 handle.await.ok(); 127 } 128 129 if let Err(e) = server_result { 130 return Err(format!("Server error: {}", e).into()); 131 } 132 133 Ok(()) 134} 135 136async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 137 let ctrl_c = async { 138 match tokio::signal::ctrl_c().await { 139 Ok(()) => {} 140 Err(e) => { 141 error!("Failed to install Ctrl+C handler: {}", e); 142 } 143 } 144 }; 145 146 #[cfg(unix)] 147 let terminate = async { 148 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 149 Ok(mut signal) => { 150 signal.recv().await; 151 } 152 Err(e) => { 153 error!("Failed to install SIGTERM handler: {}", e); 154 std::future::pending::<()>().await; 155 } 156 } 157 }; 158 159 #[cfg(not(unix))] 160 let terminate = std::future::pending::<()>(); 161 162 tokio::select! { 163 _ = ctrl_c => {}, 164 _ = terminate => {}, 165 } 166 167 info!("Shutdown signal received, stopping services..."); 168 shutdown_tx.send(true).ok(); 169}