this repo has no description
1use bspds::crawlers::{Crawlers, start_crawlers_service}; 2use bspds::notifications::{DiscordSender, EmailSender, NotificationService, SignalSender, TelegramSender}; 3use bspds::state::AppState; 4use std::net::SocketAddr; 5use std::process::ExitCode; 6use std::sync::Arc; 7use tokio::sync::watch; 8use tracing::{error, info, warn}; 9 10#[tokio::main] 11async fn main() -> ExitCode { 12 dotenvy::dotenv().ok(); 13 tracing_subscriber::fmt::init(); 14 15 match run().await { 16 Ok(()) => ExitCode::SUCCESS, 17 Err(e) => { 18 error!("Fatal error: {}", e); 19 ExitCode::FAILURE 20 } 21 } 22} 23 24async fn run() -> Result<(), Box<dyn std::error::Error>> { 25 let database_url = std::env::var("DATABASE_URL") 26 .map_err(|_| "DATABASE_URL environment variable must be set")?; 27 28 let pool = sqlx::postgres::PgPoolOptions::new() 29 .max_connections(20) 30 .min_connections(2) 31 .acquire_timeout(std::time::Duration::from_secs(10)) 32 .idle_timeout(std::time::Duration::from_secs(300)) 33 .max_lifetime(std::time::Duration::from_secs(1800)) 34 .connect(&database_url) 35 .await 36 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?; 37 38 sqlx::migrate!("./migrations") 39 .run(&pool) 40 .await 41 .map_err(|e| format!("Failed to run migrations: {}", e))?; 42 43 let state = AppState::new(pool.clone()).await; 44 45 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 46 47 let (shutdown_tx, shutdown_rx) = watch::channel(false); 48 49 let mut notification_service = NotificationService::new(pool); 50 51 if let Some(email_sender) = EmailSender::from_env() { 52 info!("Email notifications enabled"); 53 notification_service = notification_service.register_sender(email_sender); 54 } else { 55 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)"); 56 } 57 58 if let Some(discord_sender) = DiscordSender::from_env() { 59 info!("Discord notifications enabled"); 60 notification_service = notification_service.register_sender(discord_sender); 61 } 62 63 if let Some(telegram_sender) = TelegramSender::from_env() { 64 info!("Telegram notifications enabled"); 65 notification_service = notification_service.register_sender(telegram_sender); 66 } 67 68 if let Some(signal_sender) = SignalSender::from_env() { 69 info!("Signal notifications enabled"); 70 notification_service = notification_service.register_sender(signal_sender); 71 } 72 73 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx.clone())); 74 75 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 76 let crawlers = Arc::new( 77 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()) 78 ); 79 let firehose_rx = state.firehose_tx.subscribe(); 80 info!("Crawlers notification service enabled"); 81 Some(tokio::spawn(start_crawlers_service(crawlers, firehose_rx, shutdown_rx))) 82 } else { 83 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 84 None 85 }; 86 87 let app = bspds::app(state); 88 89 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 90 info!("listening on {}", addr); 91 let listener = tokio::net::TcpListener::bind(addr) 92 .await 93 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 94 95 let server_result = axum::serve(listener, app) 96 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 97 .await; 98 99 notification_handle.await.ok(); 100 if let Some(handle) = crawlers_handle { 101 handle.await.ok(); 102 } 103 104 if let Err(e) = server_result { 105 return Err(format!("Server error: {}", e).into()); 106 } 107 108 Ok(()) 109} 110 111async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 112 let ctrl_c = async { 113 match tokio::signal::ctrl_c().await { 114 Ok(()) => {} 115 Err(e) => { 116 error!("Failed to install Ctrl+C handler: {}", e); 117 } 118 } 119 }; 120 121 #[cfg(unix)] 122 let terminate = async { 123 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 124 Ok(mut signal) => { 125 signal.recv().await; 126 } 127 Err(e) => { 128 error!("Failed to install SIGTERM handler: {}", e); 129 std::future::pending::<()>().await; 130 } 131 } 132 }; 133 134 #[cfg(not(unix))] 135 let terminate = std::future::pending::<()>(); 136 137 tokio::select! { 138 _ = ctrl_c => {}, 139 _ = terminate => {}, 140 } 141 142 info!("Shutdown signal received, stopping services..."); 143 shutdown_tx.send(true).ok(); 144}