this repo has no description
1use std::net::SocketAddr;
2use std::process::ExitCode;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tracing::{error, info, warn};
6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
8use tranquil_pds::state::AppState;
9
10#[tokio::main]
11async fn main() -> ExitCode {
12 dotenvy::dotenv().ok();
13 tracing_subscriber::fmt::init();
14 tranquil_pds::metrics::init_metrics();
15
16 match run().await {
17 Ok(()) => ExitCode::SUCCESS,
18 Err(e) => {
19 error!("Fatal error: {}", e);
20 ExitCode::FAILURE
21 }
22 }
23}
24
25async fn run() -> Result<(), Box<dyn std::error::Error>> {
26 let database_url = std::env::var("DATABASE_URL")
27 .map_err(|_| "DATABASE_URL environment variable must be set")?;
28
29 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS")
30 .ok()
31 .and_then(|v| v.parse().ok())
32 .unwrap_or(100);
33
34 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS")
35 .ok()
36 .and_then(|v| v.parse().ok())
37 .unwrap_or(10);
38
39 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS")
40 .ok()
41 .and_then(|v| v.parse().ok())
42 .unwrap_or(10);
43
44 info!(
45 "Configuring database pool: max={}, min={}, acquire_timeout={}s",
46 max_connections, min_connections, acquire_timeout_secs
47 );
48
49 let pool = sqlx::postgres::PgPoolOptions::new()
50 .max_connections(max_connections)
51 .min_connections(min_connections)
52 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
53 .idle_timeout(std::time::Duration::from_secs(300))
54 .max_lifetime(std::time::Duration::from_secs(1800))
55 .connect(&database_url)
56 .await
57 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
58
59 sqlx::migrate!("./migrations")
60 .run(&pool)
61 .await
62 .map_err(|e| format!("Failed to run migrations: {}", e))?;
63
64 let state = AppState::new(pool.clone()).await;
65 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
66
67 let (shutdown_tx, shutdown_rx) = watch::channel(false);
68
69 let mut comms_service = CommsService::new(pool);
70
71 if let Some(email_sender) = EmailSender::from_env() {
72 info!("Email comms enabled");
73 comms_service = comms_service.register_sender(email_sender);
74 } else {
75 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
76 }
77
78 if let Some(discord_sender) = DiscordSender::from_env() {
79 info!("Discord comms enabled");
80 comms_service = comms_service.register_sender(discord_sender);
81 }
82
83 if let Some(telegram_sender) = TelegramSender::from_env() {
84 info!("Telegram comms enabled");
85 comms_service = comms_service.register_sender(telegram_sender);
86 }
87
88 if let Some(signal_sender) = SignalSender::from_env() {
89 info!("Signal comms enabled");
90 comms_service = comms_service.register_sender(signal_sender);
91 }
92
93 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone()));
94
95 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
96 let crawlers = Arc::new(
97 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()),
98 );
99 let firehose_rx = state.firehose_tx.subscribe();
100 info!("Crawlers notification service enabled");
101 Some(tokio::spawn(start_crawlers_service(
102 crawlers,
103 firehose_rx,
104 shutdown_rx,
105 )))
106 } else {
107 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
108 None
109 };
110
111 let app = tranquil_pds::app(state);
112 let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
113 info!("listening on {}", addr);
114
115 let listener = tokio::net::TcpListener::bind(addr)
116 .await
117 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
118
119 let server_result = axum::serve(listener, app)
120 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
121 .await;
122
123 comms_handle.await.ok();
124
125 if let Some(handle) = crawlers_handle {
126 handle.await.ok();
127 }
128
129 if let Err(e) = server_result {
130 return Err(format!("Server error: {}", e).into());
131 }
132
133 Ok(())
134}
135
136async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
137 let ctrl_c = async {
138 match tokio::signal::ctrl_c().await {
139 Ok(()) => {}
140 Err(e) => {
141 error!("Failed to install Ctrl+C handler: {}", e);
142 }
143 }
144 };
145
146 #[cfg(unix)]
147 let terminate = async {
148 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
149 Ok(mut signal) => {
150 signal.recv().await;
151 }
152 Err(e) => {
153 error!("Failed to install SIGTERM handler: {}", e);
154 std::future::pending::<()>().await;
155 }
156 }
157 };
158
159 #[cfg(not(unix))]
160 let terminate = std::future::pending::<()>();
161
162 tokio::select! {
163 _ = ctrl_c => {},
164 _ = terminate => {},
165 }
166
167 info!("Shutdown signal received, stopping services...");
168 shutdown_tx.send(true).ok();
169}