this repo has no description
1use std::net::SocketAddr;
2use std::process::ExitCode;
3use std::sync::Arc;
4use tokio::sync::watch;
5use tracing::{error, info, warn};
6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
8use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks};
9use tranquil_pds::state::AppState;
10
11#[tokio::main]
12async fn main() -> ExitCode {
13 dotenvy::dotenv().ok();
14 tracing_subscriber::fmt::init();
15 tranquil_pds::metrics::init_metrics();
16
17 match run().await {
18 Ok(()) => ExitCode::SUCCESS,
19 Err(e) => {
20 error!("Fatal error: {}", e);
21 ExitCode::FAILURE
22 }
23 }
24}
25
26async fn run() -> Result<(), Box<dyn std::error::Error>> {
27 let state = AppState::new().await?;
28 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
29
30 let (shutdown_tx, shutdown_rx) = watch::channel(false);
31
32 let backfill_db = state.db.clone();
33 let backfill_block_store = state.block_store.clone();
34 tokio::spawn(async move {
35 backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await;
36 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await;
37 backfill_user_blocks(&backfill_db, backfill_block_store.clone()).await;
38 backfill_record_blobs(&backfill_db, backfill_block_store).await;
39 });
40
41 let mut comms_service = CommsService::new(state.db.clone());
42
43 if let Some(email_sender) = EmailSender::from_env() {
44 info!("Email comms enabled");
45 comms_service = comms_service.register_sender(email_sender);
46 } else {
47 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
48 }
49
50 if let Some(discord_sender) = DiscordSender::from_env() {
51 info!("Discord comms enabled");
52 comms_service = comms_service.register_sender(discord_sender);
53 }
54
55 if let Some(telegram_sender) = TelegramSender::from_env() {
56 info!("Telegram comms enabled");
57 comms_service = comms_service.register_sender(telegram_sender);
58 }
59
60 if let Some(signal_sender) = SignalSender::from_env() {
61 info!("Signal comms enabled");
62 comms_service = comms_service.register_sender(signal_sender);
63 }
64
65 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone()));
66
67 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
68 let crawlers = Arc::new(
69 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()),
70 );
71 let firehose_rx = state.firehose_tx.subscribe();
72 info!("Crawlers notification service enabled");
73 Some(tokio::spawn(start_crawlers_service(
74 crawlers,
75 firehose_rx,
76 shutdown_rx.clone(),
77 )))
78 } else {
79 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
80 None
81 };
82
83 let scheduled_handle = tokio::spawn(start_scheduled_tasks(
84 state.db.clone(),
85 state.blob_store.clone(),
86 shutdown_rx,
87 ));
88
89 let app = tranquil_pds::app(state);
90 let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
91 info!("listening on {}", addr);
92
93 let listener = tokio::net::TcpListener::bind(addr)
94 .await
95 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
96
97 let server_result = axum::serve(listener, app)
98 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
99 .await;
100
101 comms_handle.await.ok();
102
103 if let Some(handle) = crawlers_handle {
104 handle.await.ok();
105 }
106
107 scheduled_handle.await.ok();
108
109 if let Err(e) = server_result {
110 return Err(format!("Server error: {}", e).into());
111 }
112
113 Ok(())
114}
115
116async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
117 let ctrl_c = async {
118 match tokio::signal::ctrl_c().await {
119 Ok(()) => {}
120 Err(e) => {
121 error!("Failed to install Ctrl+C handler: {}", e);
122 }
123 }
124 };
125
126 #[cfg(unix)]
127 let terminate = async {
128 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
129 Ok(mut signal) => {
130 signal.recv().await;
131 }
132 Err(e) => {
133 error!("Failed to install SIGTERM handler: {}", e);
134 std::future::pending::<()>().await;
135 }
136 }
137 };
138
139 #[cfg(not(unix))]
140 let terminate = std::future::pending::<()>();
141
142 tokio::select! {
143 _ = ctrl_c => {},
144 _ = terminate => {},
145 }
146
147 info!("Shutdown signal received, stopping services...");
148 shutdown_tx.send(true).ok();
149}