this repo has no description
1use bspds::crawlers::{Crawlers, start_crawlers_service};
2use bspds::notifications::{DiscordSender, EmailSender, NotificationService, SignalSender, TelegramSender};
3use bspds::state::AppState;
4use std::net::SocketAddr;
5use std::process::ExitCode;
6use std::sync::Arc;
7use tokio::sync::watch;
8use tracing::{error, info, warn};
9#[tokio::main]
10async fn main() -> ExitCode {
11 dotenvy::dotenv().ok();
12 tracing_subscriber::fmt::init();
13 bspds::metrics::init_metrics();
14 match run().await {
15 Ok(()) => ExitCode::SUCCESS,
16 Err(e) => {
17 error!("Fatal error: {}", e);
18 ExitCode::FAILURE
19 }
20 }
21}
22async fn run() -> Result<(), Box<dyn std::error::Error>> {
23 let database_url = std::env::var("DATABASE_URL")
24 .map_err(|_| "DATABASE_URL environment variable must be set")?;
25 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS")
26 .ok()
27 .and_then(|v| v.parse().ok())
28 .unwrap_or(100);
29 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS")
30 .ok()
31 .and_then(|v| v.parse().ok())
32 .unwrap_or(10);
33 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS")
34 .ok()
35 .and_then(|v| v.parse().ok())
36 .unwrap_or(10);
37 info!(
38 "Configuring database pool: max={}, min={}, acquire_timeout={}s",
39 max_connections, min_connections, acquire_timeout_secs
40 );
41 let pool = sqlx::postgres::PgPoolOptions::new()
42 .max_connections(max_connections)
43 .min_connections(min_connections)
44 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
45 .idle_timeout(std::time::Duration::from_secs(300))
46 .max_lifetime(std::time::Duration::from_secs(1800))
47 .connect(&database_url)
48 .await
49 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
50 sqlx::migrate!("./migrations")
51 .run(&pool)
52 .await
53 .map_err(|e| format!("Failed to run migrations: {}", e))?;
54 let state = AppState::new(pool.clone()).await;
55 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
56 let (shutdown_tx, shutdown_rx) = watch::channel(false);
57 let mut notification_service = NotificationService::new(pool);
58 if let Some(email_sender) = EmailSender::from_env() {
59 info!("Email notifications enabled");
60 notification_service = notification_service.register_sender(email_sender);
61 } else {
62 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)");
63 }
64 if let Some(discord_sender) = DiscordSender::from_env() {
65 info!("Discord notifications enabled");
66 notification_service = notification_service.register_sender(discord_sender);
67 }
68 if let Some(telegram_sender) = TelegramSender::from_env() {
69 info!("Telegram notifications enabled");
70 notification_service = notification_service.register_sender(telegram_sender);
71 }
72 if let Some(signal_sender) = SignalSender::from_env() {
73 info!("Signal notifications enabled");
74 notification_service = notification_service.register_sender(signal_sender);
75 }
76 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx.clone()));
77 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
78 let crawlers = Arc::new(
79 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone())
80 );
81 let firehose_rx = state.firehose_tx.subscribe();
82 info!("Crawlers notification service enabled");
83 Some(tokio::spawn(start_crawlers_service(crawlers, firehose_rx, shutdown_rx)))
84 } else {
85 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
86 None
87 };
88 let app = bspds::app(state);
89 let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
90 info!("listening on {}", addr);
91 let listener = tokio::net::TcpListener::bind(addr)
92 .await
93 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
94 let server_result = axum::serve(listener, app)
95 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
96 .await;
97 notification_handle.await.ok();
98 if let Some(handle) = crawlers_handle {
99 handle.await.ok();
100 }
101 if let Err(e) = server_result {
102 return Err(format!("Server error: {}", e).into());
103 }
104 Ok(())
105}
106async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
107 let ctrl_c = async {
108 match tokio::signal::ctrl_c().await {
109 Ok(()) => {}
110 Err(e) => {
111 error!("Failed to install Ctrl+C handler: {}", e);
112 }
113 }
114 };
115 #[cfg(unix)]
116 let terminate = async {
117 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
118 Ok(mut signal) => {
119 signal.recv().await;
120 }
121 Err(e) => {
122 error!("Failed to install SIGTERM handler: {}", e);
123 std::future::pending::<()>().await;
124 }
125 }
126 };
127 #[cfg(not(unix))]
128 let terminate = std::future::pending::<()>();
129 tokio::select! {
130 _ = ctrl_c => {},
131 _ = terminate => {},
132 }
133 info!("Shutdown signal received, stopping services...");
134 shutdown_tx.send(true).ok();
135}