this repo has no description
1use bspds::crawlers::{Crawlers, start_crawlers_service};
2use bspds::notifications::{DiscordSender, EmailSender, NotificationService, SignalSender, TelegramSender};
3use bspds::state::AppState;
4use std::net::SocketAddr;
5use std::process::ExitCode;
6use std::sync::Arc;
7use tokio::sync::watch;
8use tracing::{error, info, warn};
9
10#[tokio::main]
11async fn main() -> ExitCode {
12 dotenvy::dotenv().ok();
13 tracing_subscriber::fmt::init();
14
15 match run().await {
16 Ok(()) => ExitCode::SUCCESS,
17 Err(e) => {
18 error!("Fatal error: {}", e);
19 ExitCode::FAILURE
20 }
21 }
22}
23
24async fn run() -> Result<(), Box<dyn std::error::Error>> {
25 let database_url = std::env::var("DATABASE_URL")
26 .map_err(|_| "DATABASE_URL environment variable must be set")?;
27
28 let pool = sqlx::postgres::PgPoolOptions::new()
29 .max_connections(20)
30 .min_connections(2)
31 .acquire_timeout(std::time::Duration::from_secs(10))
32 .idle_timeout(std::time::Duration::from_secs(300))
33 .max_lifetime(std::time::Duration::from_secs(1800))
34 .connect(&database_url)
35 .await
36 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
37
38 sqlx::migrate!("./migrations")
39 .run(&pool)
40 .await
41 .map_err(|e| format!("Failed to run migrations: {}", e))?;
42
43 let state = AppState::new(pool.clone()).await;
44
45 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
46
47 let (shutdown_tx, shutdown_rx) = watch::channel(false);
48
49 let mut notification_service = NotificationService::new(pool);
50
51 if let Some(email_sender) = EmailSender::from_env() {
52 info!("Email notifications enabled");
53 notification_service = notification_service.register_sender(email_sender);
54 } else {
55 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)");
56 }
57
58 if let Some(discord_sender) = DiscordSender::from_env() {
59 info!("Discord notifications enabled");
60 notification_service = notification_service.register_sender(discord_sender);
61 }
62
63 if let Some(telegram_sender) = TelegramSender::from_env() {
64 info!("Telegram notifications enabled");
65 notification_service = notification_service.register_sender(telegram_sender);
66 }
67
68 if let Some(signal_sender) = SignalSender::from_env() {
69 info!("Signal notifications enabled");
70 notification_service = notification_service.register_sender(signal_sender);
71 }
72
73 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx.clone()));
74
75 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
76 let crawlers = Arc::new(
77 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone())
78 );
79 let firehose_rx = state.firehose_tx.subscribe();
80 info!("Crawlers notification service enabled");
81 Some(tokio::spawn(start_crawlers_service(crawlers, firehose_rx, shutdown_rx)))
82 } else {
83 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
84 None
85 };
86
87 let app = bspds::app(state);
88
89 let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
90 info!("listening on {}", addr);
91 let listener = tokio::net::TcpListener::bind(addr)
92 .await
93 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
94
95 let server_result = axum::serve(listener, app)
96 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
97 .await;
98
99 notification_handle.await.ok();
100 if let Some(handle) = crawlers_handle {
101 handle.await.ok();
102 }
103
104 if let Err(e) = server_result {
105 return Err(format!("Server error: {}", e).into());
106 }
107
108 Ok(())
109}
110
111async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
112 let ctrl_c = async {
113 match tokio::signal::ctrl_c().await {
114 Ok(()) => {}
115 Err(e) => {
116 error!("Failed to install Ctrl+C handler: {}", e);
117 }
118 }
119 };
120
121 #[cfg(unix)]
122 let terminate = async {
123 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
124 Ok(mut signal) => {
125 signal.recv().await;
126 }
127 Err(e) => {
128 error!("Failed to install SIGTERM handler: {}", e);
129 std::future::pending::<()>().await;
130 }
131 }
132 };
133
134 #[cfg(not(unix))]
135 let terminate = std::future::pending::<()>();
136
137 tokio::select! {
138 _ = ctrl_c => {},
139 _ = terminate => {},
140 }
141
142 info!("Shutdown signal received, stopping services...");
143 shutdown_tx.send(true).ok();
144}