this repo has no description
1use std::net::SocketAddr; 2use std::process::ExitCode; 3use std::sync::Arc; 4use tokio::sync::watch; 5use tracing::{error, info, warn}; 6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 9use tranquil_pds::state::AppState; 10 11#[tokio::main] 12async fn main() -> ExitCode { 13 dotenvy::dotenv().ok(); 14 tracing_subscriber::fmt::init(); 15 tranquil_pds::metrics::init_metrics(); 16 17 match run().await { 18 Ok(()) => ExitCode::SUCCESS, 19 Err(e) => { 20 error!("Fatal error: {}", e); 21 ExitCode::FAILURE 22 } 23 } 24} 25 26async fn run() -> Result<(), Box<dyn std::error::Error>> { 27 let state = AppState::new().await?; 28 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 29 30 let (shutdown_tx, shutdown_rx) = watch::channel(false); 31 32 let backfill_db = state.db.clone(); 33 let backfill_block_store = state.block_store.clone(); 34 tokio::spawn(async move { 35 backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await; 36 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; 37 backfill_user_blocks(&backfill_db, backfill_block_store).await; 38 }); 39 40 let mut comms_service = CommsService::new(state.db.clone()); 41 42 if let Some(email_sender) = EmailSender::from_env() { 43 info!("Email comms enabled"); 44 comms_service = comms_service.register_sender(email_sender); 45 } else { 46 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 47 } 48 49 if let Some(discord_sender) = DiscordSender::from_env() { 50 info!("Discord comms enabled"); 51 comms_service = comms_service.register_sender(discord_sender); 52 } 53 54 if let Some(telegram_sender) = TelegramSender::from_env() { 55 info!("Telegram comms enabled"); 56 comms_service = comms_service.register_sender(telegram_sender); 57 } 58 59 if let Some(signal_sender) = SignalSender::from_env() { 60 info!("Signal comms enabled"); 61 comms_service = comms_service.register_sender(signal_sender); 62 } 63 64 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone())); 65 66 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 67 let crawlers = Arc::new( 68 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 69 ); 70 let firehose_rx = state.firehose_tx.subscribe(); 71 info!("Crawlers notification service enabled"); 72 Some(tokio::spawn(start_crawlers_service( 73 crawlers, 74 firehose_rx, 75 shutdown_rx.clone(), 76 ))) 77 } else { 78 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 79 None 80 }; 81 82 let scheduled_handle = tokio::spawn(start_scheduled_tasks( 83 state.db.clone(), 84 state.blob_store.clone(), 85 shutdown_rx, 86 )); 87 88 let app = tranquil_pds::app(state); 89 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 90 info!("listening on {}", addr); 91 92 let listener = tokio::net::TcpListener::bind(addr) 93 .await 94 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 95 96 let server_result = axum::serve(listener, app) 97 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 98 .await; 99 100 comms_handle.await.ok(); 101 102 if let Some(handle) = crawlers_handle { 103 handle.await.ok(); 104 } 105 106 scheduled_handle.await.ok(); 107 108 if let Err(e) = server_result { 109 return Err(format!("Server error: {}", e).into()); 110 } 111 112 Ok(()) 113} 114 115async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 116 let ctrl_c = async { 117 match tokio::signal::ctrl_c().await { 118 Ok(()) => {} 119 Err(e) => { 120 error!("Failed to install Ctrl+C handler: {}", e); 121 } 122 } 123 }; 124 125 #[cfg(unix)] 126 let terminate = async { 127 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 128 Ok(mut signal) => { 129 signal.recv().await; 130 } 131 Err(e) => { 132 error!("Failed to install SIGTERM handler: {}", e); 133 std::future::pending::<()>().await; 134 } 135 } 136 }; 137 138 #[cfg(not(unix))] 139 let terminate = std::future::pending::<()>(); 140 141 tokio::select! { 142 _ = ctrl_c => {}, 143 _ = terminate => {}, 144 } 145 146 info!("Shutdown signal received, stopping services..."); 147 shutdown_tx.send(true).ok(); 148}