Kind of like tap, but different and written in Rust.
at main 234 lines 7.5 kB view raw
1use futures::{FutureExt, TryFutureExt, future::BoxFuture}; 2use hydrant::config::{Config, SignatureVerification}; 3use hydrant::crawler::Crawler; 4use hydrant::db::{self, set_firehose_cursor}; 5use hydrant::ingest::firehose::FirehoseIngestor; 6use hydrant::state::AppState; 7use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker}; 8use miette::IntoDiagnostic; 9use mimalloc::MiMalloc; 10use std::sync::Arc; 11use std::sync::atomic::Ordering; 12use tokio::{sync::mpsc, task::spawn_blocking}; 13use tracing::{error, info}; 14 15#[global_allocator] 16static GLOBAL: MiMalloc = MiMalloc; 17 18#[tokio::main] 19async fn main() -> miette::Result<()> { 20 let cfg = Config::from_env()?; 21 22 let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level); 23 tracing_subscriber::fmt().with_env_filter(env_filter).init(); 24 25 info!("{cfg}"); 26 27 let state = AppState::new(&cfg)?; 28 29 if cfg.full_network { 30 let filter_ks = state.db.filter.clone(); 31 let inner = state.db.inner.clone(); 32 tokio::task::spawn_blocking(move || { 33 use hydrant::filter::{FilterMode, MODE_KEY}; 34 let mut batch = inner.batch(); 35 batch.insert( 36 &filter_ks, 37 MODE_KEY, 38 rmp_serde::to_vec(&FilterMode::Full).into_diagnostic()?, 39 ); 40 batch.commit().into_diagnostic() 41 }) 42 .await 43 .into_diagnostic()??; 44 45 let new_filter = hydrant::db::filter::load(&state.db.filter)?; 46 state.filter.store(new_filter.into()); 47 } 48 49 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 50 let state = Arc::new(state); 51 52 if !cfg.disable_backfill { 53 tokio::spawn({ 54 let state = state.clone(); 55 let timeout = cfg.repo_fetch_timeout; 56 BackfillWorker::new( 57 state, 58 buffer_tx.clone(), 59 timeout, 60 cfg.backfill_concurrency_limit, 61 matches!( 62 cfg.verify_signatures, 63 SignatureVerification::Full | SignatureVerification::BackfillOnly 64 ), 65 ) 66 .run() 67 }); 68 } 69 70 if let Err(e) = spawn_blocking({ 71 let state = state.clone(); 72 move || 
hydrant::backfill::manager::queue_gone_backfills(&state) 73 }) 74 .await 75 .into_diagnostic()? 76 { 77 error!("failed to queue gone backfills: {e}"); 78 db::check_poisoned_report(&e); 79 } 80 81 std::thread::spawn({ 82 let state = state.clone(); 83 move || hydrant::backfill::manager::retry_worker(state) 84 }); 85 86 tokio::spawn({ 87 let state = state.clone(); 88 async move { 89 let mut last_id = state.db.next_event_id.load(Ordering::Relaxed); 90 let mut last_time = std::time::Instant::now(); 91 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); 92 93 loop { 94 interval.tick().await; 95 96 let current_id = state.db.next_event_id.load(Ordering::Relaxed); 97 let current_time = std::time::Instant::now(); 98 99 let delta = current_id.saturating_sub(last_id); 100 let elapsed = current_time.duration_since(last_time).as_secs_f64(); 101 let rate = if elapsed > 0.0 { 102 delta as f64 / elapsed 103 } else { 104 0.0 105 }; 106 107 info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)"); 108 109 last_id = current_id; 110 last_time = current_time; 111 } 112 } 113 }); 114 115 std::thread::spawn({ 116 let state = state.clone(); 117 let persist_interval = cfg.cursor_save_interval; 118 119 move || { 120 info!("persistence worker started"); 121 loop { 122 std::thread::sleep(persist_interval); 123 124 // persist firehose cursor 125 let seq = state.cur_firehose.load(Ordering::SeqCst); 126 if let Err(e) = set_firehose_cursor(&state.db, seq) { 127 error!("failed to save cursor: {e}"); 128 db::check_poisoned_report(&e); 129 } 130 131 // persist counts 132 // TODO: make this more durable 133 if let Err(e) = db::persist_counts(&state.db) { 134 error!("failed to persist counts: {e}"); 135 db::check_poisoned_report(&e); 136 } 137 138 // persist journal 139 if let Err(e) = state.db.persist() { 140 error!("db persist failed: {e}"); 141 db::check_poisoned_report(&e); 142 } 143 } 144 } 145 }); 146 147 if let hydrant::filter::FilterMode::Full | 
hydrant::filter::FilterMode::Signal = 148 state.filter.load().mode 149 { 150 tokio::spawn( 151 Crawler::new( 152 state.clone(), 153 cfg.relay_host.clone(), 154 cfg.crawler_max_pending_repos, 155 cfg.crawler_resume_pending_repos, 156 ) 157 .run() 158 .inspect_err(|e| { 159 error!("crawler died: {e}"); 160 db::check_poisoned_report(&e); 161 }), 162 ); 163 } 164 165 let mut tasks = if !cfg.disable_firehose { 166 let firehose_worker = std::thread::spawn({ 167 let state = state.clone(); 168 let handle = tokio::runtime::Handle::current(); 169 move || { 170 FirehoseWorker::new( 171 state, 172 buffer_rx, 173 matches!(cfg.verify_signatures, SignatureVerification::Full), 174 cfg.firehose_workers, 175 ) 176 .run(handle) 177 } 178 }); 179 180 let ingestor = FirehoseIngestor::new( 181 state.clone(), 182 buffer_tx, 183 cfg.relay_host, 184 state.filter.clone(), 185 matches!(cfg.verify_signatures, SignatureVerification::Full), 186 ); 187 188 vec![ 189 Box::pin( 190 tokio::task::spawn_blocking(move || { 191 firehose_worker 192 .join() 193 .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 194 }) 195 .map(|r| r.into_diagnostic().flatten().flatten()), 196 ) as BoxFuture<_>, 197 Box::pin(ingestor.run()), 198 ] 199 } else { 200 info!("firehose ingestion disabled by config"); 201 // if firehose is disabled, we just wait indefinitely (or until signal) 202 // essentially we just want to keep the main thread alive for the other components 203 vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>] 204 }; 205 206 let state_api = state.clone(); 207 tasks.push(Box::pin(async move { 208 api::serve(state_api, cfg.api_port) 209 .await 210 .map_err(|e| miette::miette!("API server failed: {e}")) 211 }) as BoxFuture<_>); 212 213 if cfg.enable_debug { 214 let state_debug = state.clone(); 215 tasks.push(Box::pin(async move { 216 api::serve_debug(state_debug, cfg.debug_port) 217 .await 218 .map_err(|e| miette::miette!("debug server failed: {e}")) 219 }) as 
BoxFuture<_>); 220 } 221 222 let res = futures::future::select_all(tasks); 223 if let (Err(e), _, _) = res.await { 224 error!("critical worker died: {e}"); 225 db::check_poisoned_report(&e); 226 } 227 228 if let Err(e) = state.db.persist() { 229 db::check_poisoned_report(&e); 230 return Err(e); 231 } 232 233 Ok(()) 234}