// This repo has no description.
1use bspds::crawlers::{Crawlers, start_crawlers_service}; 2use bspds::notifications::{ 3 DiscordSender, EmailSender, NotificationService, SignalSender, TelegramSender, 4}; 5use bspds::state::AppState; 6use std::net::SocketAddr; 7use std::process::ExitCode; 8use std::sync::Arc; 9use tokio::sync::watch; 10use tracing::{error, info, warn}; 11 12#[tokio::main] 13async fn main() -> ExitCode { 14 dotenvy::dotenv().ok(); 15 tracing_subscriber::fmt::init(); 16 bspds::metrics::init_metrics(); 17 18 match run().await { 19 Ok(()) => ExitCode::SUCCESS, 20 Err(e) => { 21 error!("Fatal error: {}", e); 22 ExitCode::FAILURE 23 } 24 } 25} 26 27async fn run() -> Result<(), Box<dyn std::error::Error>> { 28 let database_url = std::env::var("DATABASE_URL") 29 .map_err(|_| "DATABASE_URL environment variable must be set")?; 30 31 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS") 32 .ok() 33 .and_then(|v| v.parse().ok()) 34 .unwrap_or(100); 35 36 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS") 37 .ok() 38 .and_then(|v| v.parse().ok()) 39 .unwrap_or(10); 40 41 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS") 42 .ok() 43 .and_then(|v| v.parse().ok()) 44 .unwrap_or(10); 45 46 info!( 47 "Configuring database pool: max={}, min={}, acquire_timeout={}s", 48 max_connections, min_connections, acquire_timeout_secs 49 ); 50 51 let pool = sqlx::postgres::PgPoolOptions::new() 52 .max_connections(max_connections) 53 .min_connections(min_connections) 54 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs)) 55 .idle_timeout(std::time::Duration::from_secs(300)) 56 .max_lifetime(std::time::Duration::from_secs(1800)) 57 .connect(&database_url) 58 .await 59 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?; 60 61 sqlx::migrate!("./migrations") 62 .run(&pool) 63 .await 64 .map_err(|e| format!("Failed to run migrations: {}", e))?; 65 66 let state = AppState::new(pool.clone()).await; 67 
bspds::sync::listener::start_sequencer_listener(state.clone()).await; 68 69 let (shutdown_tx, shutdown_rx) = watch::channel(false); 70 71 let mut notification_service = NotificationService::new(pool); 72 73 if let Some(email_sender) = EmailSender::from_env() { 74 info!("Email notifications enabled"); 75 notification_service = notification_service.register_sender(email_sender); 76 } else { 77 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)"); 78 } 79 80 if let Some(discord_sender) = DiscordSender::from_env() { 81 info!("Discord notifications enabled"); 82 notification_service = notification_service.register_sender(discord_sender); 83 } 84 85 if let Some(telegram_sender) = TelegramSender::from_env() { 86 info!("Telegram notifications enabled"); 87 notification_service = notification_service.register_sender(telegram_sender); 88 } 89 90 if let Some(signal_sender) = SignalSender::from_env() { 91 info!("Signal notifications enabled"); 92 notification_service = notification_service.register_sender(signal_sender); 93 } 94 95 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx.clone())); 96 97 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 98 let crawlers = Arc::new( 99 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 100 ); 101 let firehose_rx = state.firehose_tx.subscribe(); 102 info!("Crawlers notification service enabled"); 103 Some(tokio::spawn(start_crawlers_service( 104 crawlers, 105 firehose_rx, 106 shutdown_rx, 107 ))) 108 } else { 109 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 110 None 111 }; 112 113 let app = bspds::app(state); 114 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 115 info!("listening on {}", addr); 116 117 let listener = tokio::net::TcpListener::bind(addr) 118 .await 119 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 120 121 let server_result = axum::serve(listener, app) 122 
.with_graceful_shutdown(shutdown_signal(shutdown_tx)) 123 .await; 124 125 notification_handle.await.ok(); 126 127 if let Some(handle) = crawlers_handle { 128 handle.await.ok(); 129 } 130 131 if let Err(e) = server_result { 132 return Err(format!("Server error: {}", e).into()); 133 } 134 135 Ok(()) 136} 137 138async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 139 let ctrl_c = async { 140 match tokio::signal::ctrl_c().await { 141 Ok(()) => {} 142 Err(e) => { 143 error!("Failed to install Ctrl+C handler: {}", e); 144 } 145 } 146 }; 147 148 #[cfg(unix)] 149 let terminate = async { 150 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 151 Ok(mut signal) => { 152 signal.recv().await; 153 } 154 Err(e) => { 155 error!("Failed to install SIGTERM handler: {}", e); 156 std::future::pending::<()>().await; 157 } 158 } 159 }; 160 161 #[cfg(not(unix))] 162 let terminate = std::future::pending::<()>(); 163 164 tokio::select! { 165 _ = ctrl_c => {}, 166 _ = terminate => {}, 167 } 168 169 info!("Shutdown signal received, stopping services..."); 170 shutdown_tx.send(true).ok(); 171}