this repo has no description
1use std::net::SocketAddr; 2use std::process::ExitCode; 3use std::sync::Arc; 4use tokio::sync::watch; 5use tracing::{error, info, warn}; 6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8use tranquil_pds::scheduled::{ 9 backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks, 10 start_scheduled_tasks, 11}; 12use tranquil_pds::state::AppState; 13 14#[tokio::main] 15async fn main() -> ExitCode { 16 dotenvy::dotenv().ok(); 17 tracing_subscriber::fmt::init(); 18 tranquil_pds::metrics::init_metrics(); 19 20 match run().await { 21 Ok(()) => ExitCode::SUCCESS, 22 Err(e) => { 23 error!("Fatal error: {}", e); 24 ExitCode::FAILURE 25 } 26 } 27} 28 29async fn run() -> Result<(), Box<dyn std::error::Error>> { 30 let state = AppState::new().await?; 31 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 32 33 let (shutdown_tx, shutdown_rx) = watch::channel(false); 34 35 let backfill_db = state.db.clone(); 36 let backfill_block_store = state.block_store.clone(); 37 tokio::spawn(async move { 38 backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await; 39 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; 40 backfill_user_blocks(&backfill_db, backfill_block_store.clone()).await; 41 backfill_record_blobs(&backfill_db, backfill_block_store).await; 42 }); 43 44 let mut comms_service = CommsService::new(state.db.clone()); 45 46 if let Some(email_sender) = EmailSender::from_env() { 47 info!("Email comms enabled"); 48 comms_service = comms_service.register_sender(email_sender); 49 } else { 50 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 51 } 52 53 if let Some(discord_sender) = DiscordSender::from_env() { 54 info!("Discord comms enabled"); 55 comms_service = comms_service.register_sender(discord_sender); 56 } 57 58 if let Some(telegram_sender) = TelegramSender::from_env() { 59 info!("Telegram comms enabled"); 60 comms_service = comms_service.register_sender(telegram_sender); 61 } 62 63 if let Some(signal_sender) = SignalSender::from_env() { 64 info!("Signal comms enabled"); 65 comms_service = comms_service.register_sender(signal_sender); 66 } 67 68 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone())); 69 70 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 71 let crawlers = Arc::new( 72 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 73 ); 74 let firehose_rx = state.firehose_tx.subscribe(); 75 info!("Crawlers notification service enabled"); 76 Some(tokio::spawn(start_crawlers_service( 77 crawlers, 78 firehose_rx, 79 shutdown_rx.clone(), 80 ))) 81 } else { 82 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 83 None 84 }; 85 86 let scheduled_handle = tokio::spawn(start_scheduled_tasks( 87 state.db.clone(), 88 state.blob_store.clone(), 89 shutdown_rx, 90 )); 91 92 let app = tranquil_pds::app(state); 93 94 let host = std::env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); 95 let port: u16 = std::env::var("SERVER_PORT") 96 .ok() 97 .and_then(|p| p.parse().ok()) 98 .unwrap_or(3000); 99 100 let addr: SocketAddr = format!("{}:{}", host, port) 101 .parse() 102 .map_err(|e| format!("Invalid SERVER_HOST or SERVER_PORT: {}", e))?; 103 104 info!("listening on {}", addr); 105 106 let listener = tokio::net::TcpListener::bind(addr) 107 .await 108 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 109 110 let server_result = axum::serve(listener, app) 111 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 112 .await; 113 114 comms_handle.await.ok(); 115 116 if let Some(handle) = crawlers_handle { 117 handle.await.ok(); 118 } 119 120 scheduled_handle.await.ok(); 121 122 if let Err(e) = server_result { 123 return Err(format!("Server error: {}", e).into()); 124 } 125 126 Ok(()) 127} 128 129async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 130 let ctrl_c = async { 131 match tokio::signal::ctrl_c().await { 132 Ok(()) => {} 133 Err(e) => { 134 error!("Failed to install Ctrl+C handler: {}", e); 135 } 136 } 137 }; 138 139 #[cfg(unix)] 140 let terminate = async { 141 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 142 Ok(mut signal) => { 143 signal.recv().await; 144 } 145 Err(e) => { 146 error!("Failed to install SIGTERM handler: {}", e); 147 std::future::pending::<()>().await; 148 } 149 } 150 }; 151 152 #[cfg(not(unix))] 153 let terminate = std::future::pending::<()>(); 154 155 tokio::select! { 156 _ = ctrl_c => {}, 157 _ = terminate => {}, 158 } 159 160 info!("Shutdown signal received, stopping services..."); 161 shutdown_tx.send(true).ok(); 162}