this repo has no description
1use bspds::crawlers::{Crawlers, start_crawlers_service};
2use bspds::notifications::{
3 DiscordSender, EmailSender, NotificationService, SignalSender, TelegramSender,
4};
5use bspds::state::AppState;
6use std::net::SocketAddr;
7use std::process::ExitCode;
8use std::sync::Arc;
9use tokio::sync::watch;
10use tracing::{error, info, warn};
11
12#[tokio::main]
13async fn main() -> ExitCode {
14 dotenvy::dotenv().ok();
15 tracing_subscriber::fmt::init();
16 bspds::metrics::init_metrics();
17
18 match run().await {
19 Ok(()) => ExitCode::SUCCESS,
20 Err(e) => {
21 error!("Fatal error: {}", e);
22 ExitCode::FAILURE
23 }
24 }
25}
26
27async fn run() -> Result<(), Box<dyn std::error::Error>> {
28 let database_url = std::env::var("DATABASE_URL")
29 .map_err(|_| "DATABASE_URL environment variable must be set")?;
30
31 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS")
32 .ok()
33 .and_then(|v| v.parse().ok())
34 .unwrap_or(100);
35
36 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS")
37 .ok()
38 .and_then(|v| v.parse().ok())
39 .unwrap_or(10);
40
41 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS")
42 .ok()
43 .and_then(|v| v.parse().ok())
44 .unwrap_or(10);
45
46 info!(
47 "Configuring database pool: max={}, min={}, acquire_timeout={}s",
48 max_connections, min_connections, acquire_timeout_secs
49 );
50
51 let pool = sqlx::postgres::PgPoolOptions::new()
52 .max_connections(max_connections)
53 .min_connections(min_connections)
54 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
55 .idle_timeout(std::time::Duration::from_secs(300))
56 .max_lifetime(std::time::Duration::from_secs(1800))
57 .connect(&database_url)
58 .await
59 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
60
61 sqlx::migrate!("./migrations")
62 .run(&pool)
63 .await
64 .map_err(|e| format!("Failed to run migrations: {}", e))?;
65
66 let state = AppState::new(pool.clone()).await;
67 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
68
69 let (shutdown_tx, shutdown_rx) = watch::channel(false);
70
71 let mut notification_service = NotificationService::new(pool);
72
73 if let Some(email_sender) = EmailSender::from_env() {
74 info!("Email notifications enabled");
75 notification_service = notification_service.register_sender(email_sender);
76 } else {
77 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)");
78 }
79
80 if let Some(discord_sender) = DiscordSender::from_env() {
81 info!("Discord notifications enabled");
82 notification_service = notification_service.register_sender(discord_sender);
83 }
84
85 if let Some(telegram_sender) = TelegramSender::from_env() {
86 info!("Telegram notifications enabled");
87 notification_service = notification_service.register_sender(telegram_sender);
88 }
89
90 if let Some(signal_sender) = SignalSender::from_env() {
91 info!("Signal notifications enabled");
92 notification_service = notification_service.register_sender(signal_sender);
93 }
94
95 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx.clone()));
96
97 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
98 let crawlers = Arc::new(
99 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()),
100 );
101 let firehose_rx = state.firehose_tx.subscribe();
102 info!("Crawlers notification service enabled");
103 Some(tokio::spawn(start_crawlers_service(
104 crawlers,
105 firehose_rx,
106 shutdown_rx,
107 )))
108 } else {
109 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
110 None
111 };
112
113 let app = bspds::app(state);
114 let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
115 info!("listening on {}", addr);
116
117 let listener = tokio::net::TcpListener::bind(addr)
118 .await
119 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
120
121 let server_result = axum::serve(listener, app)
122 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
123 .await;
124
125 notification_handle.await.ok();
126
127 if let Some(handle) = crawlers_handle {
128 handle.await.ok();
129 }
130
131 if let Err(e) = server_result {
132 return Err(format!("Server error: {}", e).into());
133 }
134
135 Ok(())
136}
137
138async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
139 let ctrl_c = async {
140 match tokio::signal::ctrl_c().await {
141 Ok(()) => {}
142 Err(e) => {
143 error!("Failed to install Ctrl+C handler: {}", e);
144 }
145 }
146 };
147
148 #[cfg(unix)]
149 let terminate = async {
150 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
151 Ok(mut signal) => {
152 signal.recv().await;
153 }
154 Err(e) => {
155 error!("Failed to install SIGTERM handler: {}", e);
156 std::future::pending::<()>().await;
157 }
158 }
159 };
160
161 #[cfg(not(unix))]
162 let terminate = std::future::pending::<()>();
163
164 tokio::select! {
165 _ = ctrl_c => {},
166 _ = terminate => {},
167 }
168
169 info!("Shutdown signal received, stopping services...");
170 shutdown_tx.send(true).ok();
171}