this repo has no description
1use std::net::SocketAddr;
2use std::process::ExitCode;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tracing::{error, info, warn};
6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
8use tranquil_pds::scheduled::{
9 backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks,
10 start_backup_tasks, start_scheduled_tasks,
11};
12use tranquil_pds::state::AppState;
13
14#[tokio::main]
15async fn main() -> ExitCode {
16 dotenvy::dotenv().ok();
17 tracing_subscriber::fmt::init();
18 tranquil_pds::metrics::init_metrics();
19
20 match run().await {
21 Ok(()) => ExitCode::SUCCESS,
22 Err(e) => {
23 error!("Fatal error: {}", e);
24 ExitCode::FAILURE
25 }
26 }
27}
28
29async fn run() -> Result<(), Box<dyn std::error::Error>> {
30 let state = AppState::new().await?;
31 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
32
33 let (shutdown_tx, shutdown_rx) = watch::channel(false);
34
35 let backfill_db = state.db.clone();
36 let backfill_block_store = state.block_store.clone();
37 tokio::spawn(async move {
38 backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await;
39 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await;
40 backfill_user_blocks(&backfill_db, backfill_block_store.clone()).await;
41 backfill_record_blobs(&backfill_db, backfill_block_store).await;
42 });
43
44 let mut comms_service = CommsService::new(state.db.clone());
45
46 if let Some(email_sender) = EmailSender::from_env() {
47 info!("Email comms enabled");
48 comms_service = comms_service.register_sender(email_sender);
49 } else {
50 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
51 }
52
53 if let Some(discord_sender) = DiscordSender::from_env() {
54 info!("Discord comms enabled");
55 comms_service = comms_service.register_sender(discord_sender);
56 }
57
58 if let Some(telegram_sender) = TelegramSender::from_env() {
59 info!("Telegram comms enabled");
60 comms_service = comms_service.register_sender(telegram_sender);
61 }
62
63 if let Some(signal_sender) = SignalSender::from_env() {
64 info!("Signal comms enabled");
65 comms_service = comms_service.register_sender(signal_sender);
66 }
67
68 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone()));
69
70 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
71 let crawlers = Arc::new(
72 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()),
73 );
74 let firehose_rx = state.firehose_tx.subscribe();
75 info!("Crawlers notification service enabled");
76 Some(tokio::spawn(start_crawlers_service(
77 crawlers,
78 firehose_rx,
79 shutdown_rx.clone(),
80 )))
81 } else {
82 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
83 None
84 };
85
86 let backup_handle = if let Some(backup_storage) = state.backup_storage.clone() {
87 info!("Backup service enabled");
88 Some(tokio::spawn(start_backup_tasks(
89 state.db.clone(),
90 state.block_store.clone(),
91 backup_storage,
92 shutdown_rx.clone(),
93 )))
94 } else {
95 warn!("Backup service disabled (BACKUP_S3_BUCKET not set or BACKUP_ENABLED=false)");
96 None
97 };
98
99 let scheduled_handle = tokio::spawn(start_scheduled_tasks(
100 state.db.clone(),
101 state.blob_store.clone(),
102 shutdown_rx,
103 ));
104
105 let app = tranquil_pds::app(state);
106
107 let host = std::env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
108 let port: u16 = std::env::var("SERVER_PORT")
109 .ok()
110 .and_then(|p| p.parse().ok())
111 .unwrap_or(3000);
112
113 let addr: SocketAddr = format!("{}:{}", host, port)
114 .parse()
115 .map_err(|e| format!("Invalid SERVER_HOST or SERVER_PORT: {}", e))?;
116
117 info!("listening on {}", addr);
118
119 let listener = tokio::net::TcpListener::bind(addr)
120 .await
121 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
122
123 let server_result = axum::serve(listener, app)
124 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
125 .await;
126
127 comms_handle.await.ok();
128
129 if let Some(handle) = crawlers_handle {
130 handle.await.ok();
131 }
132
133 if let Some(handle) = backup_handle {
134 handle.await.ok();
135 }
136
137 scheduled_handle.await.ok();
138
139 if let Err(e) = server_result {
140 return Err(format!("Server error: {}", e).into());
141 }
142
143 Ok(())
144}
145
146async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
147 let ctrl_c = async {
148 match tokio::signal::ctrl_c().await {
149 Ok(()) => {}
150 Err(e) => {
151 error!("Failed to install Ctrl+C handler: {}", e);
152 }
153 }
154 };
155
156 #[cfg(unix)]
157 let terminate = async {
158 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
159 Ok(mut signal) => {
160 signal.recv().await;
161 }
162 Err(e) => {
163 error!("Failed to install SIGTERM handler: {}", e);
164 std::future::pending::<()>().await;
165 }
166 }
167 };
168
169 #[cfg(not(unix))]
170 let terminate = std::future::pending::<()>();
171
172 tokio::select! {
173 _ = ctrl_c => {},
174 _ = terminate => {},
175 }
176
177 info!("Shutdown signal received, stopping services...");
178 shutdown_tx.send(true).ok();
179}