this repo has no description
1use std::net::SocketAddr; 2use std::process::ExitCode; 3use std::sync::Arc; 4use tokio::sync::watch; 5use tracing::{error, info, warn}; 6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 9use tranquil_pds::state::AppState; 10 11#[tokio::main] 12async fn main() -> ExitCode { 13 dotenvy::dotenv().ok(); 14 tracing_subscriber::fmt::init(); 15 tranquil_pds::metrics::init_metrics(); 16 17 match run().await { 18 Ok(()) => ExitCode::SUCCESS, 19 Err(e) => { 20 error!("Fatal error: {}", e); 21 ExitCode::FAILURE 22 } 23 } 24} 25 26async fn run() -> Result<(), Box<dyn std::error::Error>> { 27 let state = AppState::new().await?; 28 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 29 30 let (shutdown_tx, shutdown_rx) = watch::channel(false); 31 32 let backfill_db = state.db.clone(); 33 let backfill_block_store = state.block_store.clone(); 34 tokio::spawn(async move { 35 backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await; 36 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; 37 backfill_user_blocks(&backfill_db, backfill_block_store.clone()).await; 38 backfill_record_blobs(&backfill_db, backfill_block_store).await; 39 }); 40 41 let mut comms_service = CommsService::new(state.db.clone()); 42 43 if let Some(email_sender) = EmailSender::from_env() { 44 info!("Email comms enabled"); 45 comms_service = comms_service.register_sender(email_sender); 46 } else { 47 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 48 } 49 50 if let Some(discord_sender) = DiscordSender::from_env() { 51 info!("Discord comms enabled"); 52 comms_service = comms_service.register_sender(discord_sender); 53 } 54 55 if let Some(telegram_sender) = TelegramSender::from_env() { 56 info!("Telegram comms enabled"); 57 comms_service = comms_service.register_sender(telegram_sender); 58 } 59 60 if let Some(signal_sender) = SignalSender::from_env() { 61 info!("Signal comms enabled"); 62 comms_service = comms_service.register_sender(signal_sender); 63 } 64 65 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone())); 66 67 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 68 let crawlers = Arc::new( 69 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 70 ); 71 let firehose_rx = state.firehose_tx.subscribe(); 72 info!("Crawlers notification service enabled"); 73 Some(tokio::spawn(start_crawlers_service( 74 crawlers, 75 firehose_rx, 76 shutdown_rx.clone(), 77 ))) 78 } else { 79 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 80 None 81 }; 82 83 let scheduled_handle = tokio::spawn(start_scheduled_tasks( 84 state.db.clone(), 85 state.blob_store.clone(), 86 shutdown_rx, 87 )); 88 89 let app = tranquil_pds::app(state); 90 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); 91 info!("listening on {}", addr); 92 93 let listener = tokio::net::TcpListener::bind(addr) 94 .await 95 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 96 97 let server_result = axum::serve(listener, app) 98 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 99 .await; 100 101 comms_handle.await.ok(); 102 103 if let Some(handle) = crawlers_handle { 104 handle.await.ok(); 105 } 106 107 scheduled_handle.await.ok(); 108 109 if let Err(e) = server_result { 110 return Err(format!("Server error: {}", e).into()); 111 } 112 113 Ok(()) 114} 115 116async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 117 let ctrl_c = async { 118 match tokio::signal::ctrl_c().await { 119 Ok(()) => {} 120 Err(e) => { 121 error!("Failed to install Ctrl+C handler: {}", e); 122 } 123 } 124 }; 125 126 #[cfg(unix)] 127 let terminate = async { 128 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 129 Ok(mut signal) => { 130 signal.recv().await; 131 } 132 Err(e) => { 133 error!("Failed to install SIGTERM handler: {}", e); 134 std::future::pending::<()>().await; 135 } 136 } 137 }; 138 139 #[cfg(not(unix))] 140 let terminate = std::future::pending::<()>(); 141 142 tokio::select! { 143 _ = ctrl_c => {}, 144 _ = terminate => {}, 145 } 146 147 info!("Shutdown signal received, stopping services..."); 148 shutdown_tx.send(true).ok(); 149}