this repo has no description
1use bspds::notifications::{EmailSender, NotificationService};
2use bspds::state::AppState;
3use std::net::SocketAddr;
4use std::process::ExitCode;
5use tokio::sync::watch;
6use tracing::{error, info, warn};
7
8#[tokio::main]
9async fn main() -> ExitCode {
10 dotenvy::dotenv().ok();
11 tracing_subscriber::fmt::init();
12
13 match run().await {
14 Ok(()) => ExitCode::SUCCESS,
15 Err(e) => {
16 error!("Fatal error: {}", e);
17 ExitCode::FAILURE
18 }
19 }
20}
21
22async fn run() -> Result<(), Box<dyn std::error::Error>> {
23 let database_url = std::env::var("DATABASE_URL")
24 .map_err(|_| "DATABASE_URL environment variable must be set")?;
25
26 let pool = sqlx::postgres::PgPoolOptions::new()
27 .max_connections(20)
28 .min_connections(2)
29 .acquire_timeout(std::time::Duration::from_secs(10))
30 .idle_timeout(std::time::Duration::from_secs(300))
31 .max_lifetime(std::time::Duration::from_secs(1800))
32 .connect(&database_url)
33 .await
34 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
35
36 sqlx::migrate!("./migrations")
37 .run(&pool)
38 .await
39 .map_err(|e| format!("Failed to run migrations: {}", e))?;
40
41 let state = AppState::new(pool.clone()).await;
42
43 bspds::sync::listener::start_sequencer_listener(state.clone()).await;
44 let relays = std::env::var("RELAYS")
45 .unwrap_or_default()
46 .split(',')
47 .filter(|s| !s.is_empty())
48 .map(|s| s.to_string())
49 .collect();
50 bspds::sync::relay_client::start_relay_clients(state.clone(), relays, None).await;
51
52 let (shutdown_tx, shutdown_rx) = watch::channel(false);
53
54 let mut notification_service = NotificationService::new(pool);
55
56 if let Some(email_sender) = EmailSender::from_env() {
57 info!("Email notifications enabled");
58 notification_service = notification_service.register_sender(email_sender);
59 } else {
60 warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)");
61 }
62
63 let notification_handle = tokio::spawn(notification_service.run(shutdown_rx));
64
65 let app = bspds::app(state);
66
67 let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
68 info!("listening on {}", addr);
69 let listener = tokio::net::TcpListener::bind(addr)
70 .await
71 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
72
73 let server_result = axum::serve(listener, app)
74 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
75 .await;
76
77 notification_handle.await.ok();
78
79 if let Err(e) = server_result {
80 return Err(format!("Server error: {}", e).into());
81 }
82
83 Ok(())
84}
85
86async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
87 let ctrl_c = async {
88 match tokio::signal::ctrl_c().await {
89 Ok(()) => {}
90 Err(e) => {
91 error!("Failed to install Ctrl+C handler: {}", e);
92 }
93 }
94 };
95
96 #[cfg(unix)]
97 let terminate = async {
98 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
99 Ok(mut signal) => {
100 signal.recv().await;
101 }
102 Err(e) => {
103 error!("Failed to install SIGTERM handler: {}", e);
104 std::future::pending::<()>().await;
105 }
106 }
107 };
108
109 #[cfg(not(unix))]
110 let terminate = std::future::pending::<()>();
111
112 tokio::select! {
113 _ = ctrl_c => {},
114 _ = terminate => {},
115 }
116
117 info!("Shutdown signal received, stopping services...");
118 shutdown_tx.send(true).ok();
119}