this repo has no description
1use bspds::notifications::{EmailSender, NotificationService}; 2use bspds::state::AppState; 3use std::net::SocketAddr; 4use std::process::ExitCode; 5use tokio::sync::watch; 6use tracing::{error, info, warn}; 7 8#[tokio::main] 9async fn main() -> ExitCode { 10 dotenvy::dotenv().ok(); 11 tracing_subscriber::fmt::init(); 12 13 match run().await { 14 Ok(()) => ExitCode::SUCCESS, 15 Err(e) => { 16 error!("Fatal error: {}", e); 17 ExitCode::FAILURE 18 } 19 } 20} 21 22async fn run() -> Result<(), Box<dyn std::error::Error>> { 23 let database_url = std::env::var("DATABASE_URL") 24 .map_err(|_| "DATABASE_URL environment variable must be set")?; 25 26 let pool = sqlx::postgres::PgPoolOptions::new() 27 .max_connections(20) 28 .min_connections(2) 29 .acquire_timeout(std::time::Duration::from_secs(10)) 30 .idle_timeout(std::time::Duration::from_secs(300)) 31 .max_lifetime(std::time::Duration::from_secs(1800)) 32 .connect(&database_url) 33 .await 34 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?; 35 36 sqlx::migrate!("./migrations") 37 .run(&pool) 38 .await 39 .map_err(|e| format!("Failed to run migrations: {}", e))?; 40 41 let state = AppState::new(pool.clone()).await; 42 43 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 44 let relays = std::env::var("RELAYS") 45 .unwrap_or_default() 46 .split(',') 47 .filter(|s| !s.is_empty()) 48 .map(|s| s.to_string()) 49 .collect(); 50 bspds::sync::relay_client::start_relay_clients(state.clone(), relays, None).await; 51 52 let (shutdown_tx, shutdown_rx) = watch::channel(false); 53 54 let mut notification_service = NotificationService::new(pool); 55 56 if let Some(email_sender) = EmailSender::from_env() { 57 info!("Email notifications enabled"); 58 notification_service = notification_service.register_sender(email_sender); 59 } else { 60 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)"); 61 } 62 63 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx)); 64 65 let app = bspds::app(state); 66 67 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 68 info!("listening on {}", addr); 69 let listener = tokio::net::TcpListener::bind(addr) 70 .await 71 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 72 73 let server_result = axum::serve(listener, app) 74 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 75 .await; 76 77 notification_handle.await.ok(); 78 79 if let Err(e) = server_result { 80 return Err(format!("Server error: {}", e).into()); 81 } 82 83 Ok(()) 84} 85 86async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 87 let ctrl_c = async { 88 match tokio::signal::ctrl_c().await { 89 Ok(()) => {} 90 Err(e) => { 91 error!("Failed to install Ctrl+C handler: {}", e); 92 } 93 } 94 }; 95 96 #[cfg(unix)] 97 let terminate = async { 98 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 99 Ok(mut signal) => { 100 signal.recv().await; 101 } 102 Err(e) => { 103 error!("Failed to install SIGTERM handler: {}", e); 104 std::future::pending::<()>().await; 105 } 106 } 107 }; 108 109 #[cfg(not(unix))] 110 let terminate = std::future::pending::<()>(); 111 112 tokio::select! { 113 _ = ctrl_c => {}, 114 _ = terminate => {}, 115 } 116 117 info!("Shutdown signal received, stopping services..."); 118 shutdown_tx.send(true).ok(); 119}