this repo has no description
1use std::net::SocketAddr;
2use std::process::ExitCode;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tracing::{error, info, warn};
6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
8use tranquil_pds::state::AppState;
9
10#[tokio::main]
11async fn main() -> ExitCode {
12 dotenvy::dotenv().ok();
13 tracing_subscriber::fmt::init();
14 tranquil_pds::metrics::init_metrics();
15
16 match run().await {
17 Ok(()) => ExitCode::SUCCESS,
18 Err(e) => {
19 error!("Fatal error: {}", e);
20 ExitCode::FAILURE
21 }
22 }
23}
24
25async fn run() -> Result<(), Box<dyn std::error::Error>> {
26 let state = AppState::new().await?;
27 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
28
29 let (shutdown_tx, shutdown_rx) = watch::channel(false);
30
31 let mut comms_service = CommsService::new(state.db.clone());
32
33 if let Some(email_sender) = EmailSender::from_env() {
34 info!("Email comms enabled");
35 comms_service = comms_service.register_sender(email_sender);
36 } else {
37 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
38 }
39
40 if let Some(discord_sender) = DiscordSender::from_env() {
41 info!("Discord comms enabled");
42 comms_service = comms_service.register_sender(discord_sender);
43 }
44
45 if let Some(telegram_sender) = TelegramSender::from_env() {
46 info!("Telegram comms enabled");
47 comms_service = comms_service.register_sender(telegram_sender);
48 }
49
50 if let Some(signal_sender) = SignalSender::from_env() {
51 info!("Signal comms enabled");
52 comms_service = comms_service.register_sender(signal_sender);
53 }
54
55 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone()));
56
57 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
58 let crawlers = Arc::new(
59 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()),
60 );
61 let firehose_rx = state.firehose_tx.subscribe();
62 info!("Crawlers notification service enabled");
63 Some(tokio::spawn(start_crawlers_service(
64 crawlers,
65 firehose_rx,
66 shutdown_rx,
67 )))
68 } else {
69 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
70 None
71 };
72
73 let app = tranquil_pds::app(state);
74 let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
75 info!("listening on {}", addr);
76
77 let listener = tokio::net::TcpListener::bind(addr)
78 .await
79 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
80
81 let server_result = axum::serve(listener, app)
82 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
83 .await;
84
85 comms_handle.await.ok();
86
87 if let Some(handle) = crawlers_handle {
88 handle.await.ok();
89 }
90
91 if let Err(e) = server_result {
92 return Err(format!("Server error: {}", e).into());
93 }
94
95 Ok(())
96}
97
98async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
99 let ctrl_c = async {
100 match tokio::signal::ctrl_c().await {
101 Ok(()) => {}
102 Err(e) => {
103 error!("Failed to install Ctrl+C handler: {}", e);
104 }
105 }
106 };
107
108 #[cfg(unix)]
109 let terminate = async {
110 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
111 Ok(mut signal) => {
112 signal.recv().await;
113 }
114 Err(e) => {
115 error!("Failed to install SIGTERM handler: {}", e);
116 std::future::pending::<()>().await;
117 }
118 }
119 };
120
121 #[cfg(not(unix))]
122 let terminate = std::future::pending::<()>();
123
124 tokio::select! {
125 _ = ctrl_c => {},
126 _ = terminate => {},
127 }
128
129 info!("Shutdown signal received, stopping services...");
130 shutdown_tx.send(true).ok();
131}