this repo has no description
1use std::net::SocketAddr;
2use std::process::ExitCode;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tracing::{error, info, warn};
6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
8use tranquil_pds::scheduled::{
9 backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks,
10 start_backup_tasks, start_scheduled_tasks,
11};
12use tranquil_pds::state::AppState;
13
14#[tokio::main]
15async fn main() -> ExitCode {
16 dotenvy::dotenv().ok();
17 tracing_subscriber::fmt::init();
18 tranquil_pds::metrics::init_metrics();
19
20 match run().await {
21 Ok(()) => ExitCode::SUCCESS,
22 Err(e) => {
23 error!("Fatal error: {}", e);
24 ExitCode::FAILURE
25 }
26 }
27}
28
29async fn run() -> Result<(), Box<dyn std::error::Error>> {
30 let state = AppState::new().await?;
31 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
32
33 let (shutdown_tx, shutdown_rx) = watch::channel(false);
34
35 let backfill_db = state.db.clone();
36 let backfill_block_store = state.block_store.clone();
37 tokio::spawn(async move {
38 tokio::join!(
39 backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()),
40 backfill_repo_rev(&backfill_db, backfill_block_store.clone()),
41 backfill_user_blocks(&backfill_db, backfill_block_store.clone()),
42 backfill_record_blobs(&backfill_db, backfill_block_store),
43 );
44 });
45
46 let mut comms_service = CommsService::new(state.db.clone());
47
48 if let Some(email_sender) = EmailSender::from_env() {
49 info!("Email comms enabled");
50 comms_service = comms_service.register_sender(email_sender);
51 } else {
52 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
53 }
54
55 if let Some(discord_sender) = DiscordSender::from_env() {
56 info!("Discord comms enabled");
57 comms_service = comms_service.register_sender(discord_sender);
58 }
59
60 if let Some(telegram_sender) = TelegramSender::from_env() {
61 info!("Telegram comms enabled");
62 comms_service = comms_service.register_sender(telegram_sender);
63 }
64
65 if let Some(signal_sender) = SignalSender::from_env() {
66 info!("Signal comms enabled");
67 comms_service = comms_service.register_sender(signal_sender);
68 }
69
70 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone()));
71
72 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
73 let crawlers = Arc::new(
74 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()),
75 );
76 let firehose_rx = state.firehose_tx.subscribe();
77 info!("Crawlers notification service enabled");
78 Some(tokio::spawn(start_crawlers_service(
79 crawlers,
80 firehose_rx,
81 shutdown_rx.clone(),
82 )))
83 } else {
84 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
85 None
86 };
87
88 let backup_handle = if let Some(backup_storage) = state.backup_storage.clone() {
89 info!("Backup service enabled");
90 Some(tokio::spawn(start_backup_tasks(
91 state.db.clone(),
92 state.block_store.clone(),
93 backup_storage,
94 shutdown_rx.clone(),
95 )))
96 } else {
97 warn!("Backup service disabled (BACKUP_S3_BUCKET not set or BACKUP_ENABLED=false)");
98 None
99 };
100
101 let scheduled_handle = tokio::spawn(start_scheduled_tasks(
102 state.db.clone(),
103 state.blob_store.clone(),
104 shutdown_rx,
105 ));
106
107 let app = tranquil_pds::app(state);
108
109 let host = std::env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
110 let port: u16 = std::env::var("SERVER_PORT")
111 .ok()
112 .and_then(|p| p.parse().ok())
113 .unwrap_or(3000);
114
115 let addr: SocketAddr = format!("{}:{}", host, port)
116 .parse()
117 .map_err(|e| format!("Invalid SERVER_HOST or SERVER_PORT: {}", e))?;
118
119 info!("listening on {}", addr);
120
121 let listener = tokio::net::TcpListener::bind(addr)
122 .await
123 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
124
125 let server_result = axum::serve(listener, app)
126 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
127 .await;
128
129 comms_handle.await.ok();
130
131 if let Some(handle) = crawlers_handle {
132 handle.await.ok();
133 }
134
135 if let Some(handle) = backup_handle {
136 handle.await.ok();
137 }
138
139 scheduled_handle.await.ok();
140
141 if let Err(e) = server_result {
142 return Err(format!("Server error: {}", e).into());
143 }
144
145 Ok(())
146}
147
148async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
149 let ctrl_c = async {
150 match tokio::signal::ctrl_c().await {
151 Ok(()) => {}
152 Err(e) => {
153 error!("Failed to install Ctrl+C handler: {}", e);
154 }
155 }
156 };
157
158 #[cfg(unix)]
159 let terminate = async {
160 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
161 Ok(mut signal) => {
162 signal.recv().await;
163 }
164 Err(e) => {
165 error!("Failed to install SIGTERM handler: {}", e);
166 std::future::pending::<()>().await;
167 }
168 }
169 };
170
171 #[cfg(not(unix))]
172 let terminate = std::future::pending::<()>();
173
174 tokio::select! {
175 _ = ctrl_c => {},
176 _ = terminate => {},
177 }
178
179 info!("Shutdown signal received, stopping services...");
180 shutdown_tx.send(true).ok();
181}