this repo has no description
1use std::net::SocketAddr; 2use std::process::ExitCode; 3use std::sync::Arc; 4use tokio::sync::watch; 5use tracing::{error, info, warn}; 6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8use tranquil_pds::scheduled::{backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 9use tranquil_pds::state::AppState; 10 11#[tokio::main] 12async fn main() -> ExitCode { 13 dotenvy::dotenv().ok(); 14 tracing_subscriber::fmt::init(); 15 tranquil_pds::metrics::init_metrics(); 16 17 match run().await { 18 Ok(()) => ExitCode::SUCCESS, 19 Err(e) => { 20 error!("Fatal error: {}", e); 21 ExitCode::FAILURE 22 } 23 } 24} 25 26async fn run() -> Result<(), Box<dyn std::error::Error>> { 27 let state = AppState::new().await?; 28 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 29 30 let (shutdown_tx, shutdown_rx) = watch::channel(false); 31 32 let backfill_db = state.db.clone(); 33 let backfill_block_store = state.block_store.clone(); 34 tokio::spawn(async move { 35 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; 36 backfill_user_blocks(&backfill_db, backfill_block_store).await; 37 }); 38 39 let mut comms_service = CommsService::new(state.db.clone()); 40 41 if let Some(email_sender) = EmailSender::from_env() { 42 info!("Email comms enabled"); 43 comms_service = comms_service.register_sender(email_sender); 44 } else { 45 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 46 } 47 48 if let Some(discord_sender) = DiscordSender::from_env() { 49 info!("Discord comms enabled"); 50 comms_service = comms_service.register_sender(discord_sender); 51 } 52 53 if let Some(telegram_sender) = TelegramSender::from_env() { 54 info!("Telegram comms enabled"); 55 comms_service = comms_service.register_sender(telegram_sender); 56 } 57 58 if let Some(signal_sender) = SignalSender::from_env() { 59 info!("Signal comms enabled"); 60 comms_service = comms_service.register_sender(signal_sender); 61 } 62 63 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone())); 64 65 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 66 let crawlers = Arc::new( 67 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 68 ); 69 let firehose_rx = state.firehose_tx.subscribe(); 70 info!("Crawlers notification service enabled"); 71 Some(tokio::spawn(start_crawlers_service( 72 crawlers, 73 firehose_rx, 74 shutdown_rx.clone(), 75 ))) 76 } else { 77 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 78 None 79 }; 80 81 let scheduled_handle = tokio::spawn(start_scheduled_tasks( 82 state.db.clone(), 83 state.blob_store.clone(), 84 shutdown_rx, 85 )); 86 87 let app = tranquil_pds::app(state); 88 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 89 info!("listening on {}", addr); 90 91 let listener = tokio::net::TcpListener::bind(addr) 92 .await 93 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 94 95 let server_result = axum::serve(listener, app) 96 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 97 .await; 98 99 comms_handle.await.ok(); 100 101 if let Some(handle) = crawlers_handle { 102 handle.await.ok(); 103 } 104 105 scheduled_handle.await.ok(); 106 107 if let Err(e) = server_result { 108 return Err(format!("Server error: {}", e).into()); 109 } 110 111 Ok(()) 112} 113 114async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 115 let ctrl_c = async { 116 match tokio::signal::ctrl_c().await { 117 Ok(()) => {} 118 Err(e) => { 119 error!("Failed to install Ctrl+C handler: {}", e); 120 } 121 } 122 }; 123 124 #[cfg(unix)] 125 let terminate = async { 126 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 127 Ok(mut signal) => { 128 signal.recv().await; 129 } 130 Err(e) => { 131 error!("Failed to install SIGTERM handler: {}", e); 132 std::future::pending::<()>().await; 133 } 134 } 135 }; 136 137 #[cfg(not(unix))] 138 let terminate = std::future::pending::<()>(); 139 140 tokio::select! { 141 _ = ctrl_c => {}, 142 _ = terminate => {}, 143 } 144 145 info!("Shutdown signal received, stopping services..."); 146 shutdown_tx.send(true).ok(); 147}