this repo has no description
1use bspds::crawlers::{Crawlers, start_crawlers_service}; 2use bspds::notifications::{DiscordSender, EmailSender, NotificationService, SignalSender, TelegramSender}; 3use bspds::state::AppState; 4use std::net::SocketAddr; 5use std::process::ExitCode; 6use std::sync::Arc; 7use tokio::sync::watch; 8use tracing::{error, info, warn}; 9#[tokio::main] 10async fn main() -> ExitCode { 11 dotenvy::dotenv().ok(); 12 tracing_subscriber::fmt::init(); 13 bspds::metrics::init_metrics(); 14 match run().await { 15 Ok(()) => ExitCode::SUCCESS, 16 Err(e) => { 17 error!("Fatal error: {}", e); 18 ExitCode::FAILURE 19 } 20 } 21} 22async fn run() -> Result<(), Box<dyn std::error::Error>> { 23 let database_url = std::env::var("DATABASE_URL") 24 .map_err(|_| "DATABASE_URL environment variable must be set")?; 25 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS") 26 .ok() 27 .and_then(|v| v.parse().ok()) 28 .unwrap_or(100); 29 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS") 30 .ok() 31 .and_then(|v| v.parse().ok()) 32 .unwrap_or(10); 33 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS") 34 .ok() 35 .and_then(|v| v.parse().ok()) 36 .unwrap_or(10); 37 info!( 38 "Configuring database pool: max={}, min={}, acquire_timeout={}s", 39 max_connections, min_connections, acquire_timeout_secs 40 ); 41 let pool = sqlx::postgres::PgPoolOptions::new() 42 .max_connections(max_connections) 43 .min_connections(min_connections) 44 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs)) 45 .idle_timeout(std::time::Duration::from_secs(300)) 46 .max_lifetime(std::time::Duration::from_secs(1800)) 47 .connect(&database_url) 48 .await 49 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?; 50 sqlx::migrate!("./migrations") 51 .run(&pool) 52 .await 53 .map_err(|e| format!("Failed to run migrations: {}", e))?; 54 let state = AppState::new(pool.clone()).await; 55 bspds::sync::listener::start_sequencer_listener(state.clone()).await; 56 let (shutdown_tx, shutdown_rx) = watch::channel(false); 57 let mut notification_service = NotificationService::new(pool); 58 if let Some(email_sender) = EmailSender::from_env() { 59 info!("Email notifications enabled"); 60 notification_service = notification_service.register_sender(email_sender); 61 } else { 62 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)"); 63 } 64 if let Some(discord_sender) = DiscordSender::from_env() { 65 info!("Discord notifications enabled"); 66 notification_service = notification_service.register_sender(discord_sender); 67 } 68 if let Some(telegram_sender) = TelegramSender::from_env() { 69 info!("Telegram notifications enabled"); 70 notification_service = notification_service.register_sender(telegram_sender); 71 } 72 if let Some(signal_sender) = SignalSender::from_env() { 73 info!("Signal notifications enabled"); 74 notification_service = notification_service.register_sender(signal_sender); 75 } 76 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx.clone())); 77 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 78 let crawlers = Arc::new( 79 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()) 80 ); 81 let firehose_rx = state.firehose_tx.subscribe(); 82 info!("Crawlers notification service enabled"); 83 Some(tokio::spawn(start_crawlers_service(crawlers, firehose_rx, shutdown_rx))) 84 } else { 85 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 86 None 87 }; 88 let app = bspds::app(state); 89 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 90 info!("listening on {}", addr); 91 let listener = tokio::net::TcpListener::bind(addr) 92 .await 93 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 94 let server_result = axum::serve(listener, app) 95 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 96 .await; 97 notification_handle.await.ok(); 98 if let Some(handle) = crawlers_handle { 99 handle.await.ok(); 100 } 101 if let Err(e) = server_result { 102 return Err(format!("Server error: {}", e).into()); 103 } 104 Ok(()) 105} 106async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 107 let ctrl_c = async { 108 match tokio::signal::ctrl_c().await { 109 Ok(()) => {} 110 Err(e) => { 111 error!("Failed to install Ctrl+C handler: {}", e); 112 } 113 } 114 }; 115 #[cfg(unix)] 116 let terminate = async { 117 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 118 Ok(mut signal) => { 119 signal.recv().await; 120 } 121 Err(e) => { 122 error!("Failed to install SIGTERM handler: {}", e); 123 std::future::pending::<()>().await; 124 } 125 } 126 }; 127 #[cfg(not(unix))] 128 let terminate = std::future::pending::<()>(); 129 tokio::select! { 130 _ = ctrl_c => {}, 131 _ = terminate => {}, 132 } 133 info!("Shutdown signal received, stopping services..."); 134 shutdown_tx.send(true).ok(); 135}