AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 3eb43b6191a55f71f8327cf6d64290a168d60cff 206 lines 6.7 kB view raw
//! Entry point for the hydrant indexer: wires up configuration, the shared
//! `AppState`, and the long-running workers (backfill, firehose ingestion,
//! persistence, crawler, API servers), then blocks until any critical worker
//! exits.

use futures::{FutureExt, TryFutureExt, future::BoxFuture};
use hydrant::config::{Config, SignatureVerification};
use hydrant::crawler::Crawler;
use hydrant::db::{self, set_firehose_cursor};
use hydrant::ingest::firehose::FirehoseIngestor;
use hydrant::state::AppState;
use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
use miette::IntoDiagnostic;
use mimalloc::MiMalloc;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::{sync::mpsc, task::spawn_blocking};
use tracing::{error, info};

// Use mimalloc as the process-wide allocator (replaces the system allocator).
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

#[tokio::main]
async fn main() -> miette::Result<()> {
    // Configuration comes entirely from the environment; fails fast if invalid.
    let cfg = Config::from_env()?;

    // Logging filter is driven by the configured log level string.
    let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level);
    tracing_subscriber::fmt().with_env_filter(env_filter).init();

    info!("{cfg}");

    // Shared application state (wraps the fjall-backed DB), plus the channel
    // that carries buffered firehose/backfill events into the worker pool.
    let state = AppState::new(&cfg)?;
    let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
    let state = Arc::new(state);

    // Optional backfill worker: pulls repos and feeds events into `buffer_tx`.
    // Signature verification is enabled for Full as well as BackfillOnly modes.
    if !cfg.disable_backfill {
        tokio::spawn({
            let state = state.clone();
            let timeout = cfg.repo_fetch_timeout;
            BackfillWorker::new(
                state,
                buffer_tx.clone(),
                timeout,
                cfg.backfill_concurrency_limit,
                matches!(
                    cfg.verify_signatures,
                    SignatureVerification::Full | SignatureVerification::BackfillOnly
                ),
            )
            .run()
        });
    }

    // One-shot: re-queue backfills that previously went "gone". Runs on the
    // blocking pool because it hits the DB synchronously; a join error aborts
    // startup (`?`), a worker error is logged and checked for DB poisoning.
    if let Err(e) = spawn_blocking({
        let state = state.clone();
        move || hydrant::backfill::manager::queue_gone_backfills(&state)
    })
    .await
    .into_diagnostic()?
    {
        error!("failed to queue gone backfills: {e}");
        db::check_poisoned_report(&e);
    }

    // Dedicated OS thread for the backfill retry loop (synchronous worker).
    std::thread::spawn({
        let state = state.clone();
        move || hydrant::backfill::manager::retry_worker(state)
    });

    // Throughput reporter: once a minute, logs the event rate derived from the
    // monotonically increasing `next_event_id` counter.
    tokio::spawn({
        let state = state.clone();
        async move {
            let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
            let mut last_time = std::time::Instant::now();
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));

            loop {
                interval.tick().await;

                let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                let current_time = std::time::Instant::now();

                let delta = current_id.saturating_sub(last_id);
                let elapsed = current_time.duration_since(last_time).as_secs_f64();
                // Guard against a zero-duration tick (avoid div-by-zero).
                let rate = if elapsed > 0.0 {
                    delta as f64 / elapsed
                } else {
                    0.0
                };

                info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                last_id = current_id;
                last_time = current_time;
            }
        }
    });

    // Persistence worker on its own OS thread (uses blocking sleep + blocking
    // DB writes): periodically saves the firehose cursor, counts, and journal.
    // Errors are logged and inspected for poisoning, but never stop the loop.
    std::thread::spawn({
        let state = state.clone();
        let persist_interval = cfg.cursor_save_interval;

        move || {
            info!("persistence worker started");
            loop {
                std::thread::sleep(persist_interval);

                // persist firehose cursor
                let seq = state.cur_firehose.load(Ordering::SeqCst);
                if let Err(e) = set_firehose_cursor(&state.db, seq) {
                    error!("failed to save cursor: {e}");
                    db::check_poisoned_report(&e);
                }

                // persist counts
                // TODO: make this more durable
                if let Err(e) = db::persist_counts(&state.db) {
                    error!("failed to persist counts: {e}");
                    db::check_poisoned_report(&e);
                }

                // persist journal
                if let Err(e) = state.db.persist() {
                    error!("db persist failed: {e}");
                    db::check_poisoned_report(&e);
                }
            }
        }
    });

    // Full-network mode additionally runs the relay crawler; presumably it
    // discovers repos across the network — its failure is logged, not fatal.
    if cfg.full_network {
        tokio::spawn(
            Crawler::new(state.clone(), cfg.relay_host.clone())
                .run()
                .inspect_err(|e| {
                    error!("crawler died: {e}");
                    db::check_poisoned_report(&e);
                }),
        );
    }

    // Critical tasks whose exit should terminate the process. With the
    // firehose enabled that is: (1) the CPU-side FirehoseWorker thread
    // (consumes `buffer_rx`), wrapped so its join result surfaces as a future;
    // (2) the FirehoseIngestor (network side, owns `buffer_tx`).
    let mut tasks = if !cfg.disable_firehose {
        let firehose_worker = std::thread::spawn({
            let state = state.clone();
            // Capture the runtime handle so the worker thread can call back
            // into async context.
            let handle = tokio::runtime::Handle::current();
            move || {
                FirehoseWorker::new(
                    state,
                    buffer_rx,
                    matches!(cfg.verify_signatures, SignatureVerification::Full),
                    cfg.firehose_workers,
                )
                .run(handle)
            }
        });

        let ingestor = FirehoseIngestor::new(
            state.clone(),
            buffer_tx,
            cfg.relay_host,
            cfg.full_network,
            matches!(cfg.verify_signatures, SignatureVerification::Full),
        );

        vec![
            Box::pin(
                // spawn_blocking so joining the OS thread doesn't block the
                // async runtime; the double `flatten()` collapses the nested
                // JoinError / thread-panic / worker-result layers.
                tokio::task::spawn_blocking(move || {
                    firehose_worker
                        .join()
                        .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                })
                .map(|r| r.into_diagnostic().flatten().flatten()),
            ) as BoxFuture<_>,
            Box::pin(ingestor.run()),
        ]
    } else {
        info!("firehose ingestion disabled by config");
        // if firehose is disabled, we just wait indefinitely (or until signal)
        // essentially we just want to keep the main thread alive for the other components
        vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
    };

    // XRPC/API server is always a critical task.
    let state_api = state.clone();
    tasks.push(Box::pin(async move {
        api::serve(state_api, cfg.api_port)
            .await
            .map_err(|e| miette::miette!("API server failed: {e}"))
    }) as BoxFuture<_>);

    // Optional debug server on its own port.
    if cfg.enable_debug {
        let state_debug = state.clone();
        tasks.push(Box::pin(async move {
            api::serve_debug(state_debug, cfg.debug_port)
                .await
                .map_err(|e| miette::miette!("debug server failed: {e}"))
        }) as BoxFuture<_>);
    }

    // Block until the FIRST critical task completes; any error is logged and
    // checked for DB poisoning before shutdown proceeds.
    let res = futures::future::select_all(tasks);
    if let (Err(e), _, _) = res.await {
        error!("critical worker died: {e}");
        db::check_poisoned_report(&e);
    }

    // Final best-effort flush of the DB journal before exit; a failure here is
    // propagated as the process exit status.
    if let Err(e) = state.db.persist() {
        db::check_poisoned_report(&e);
        return Err(e);
    }

    Ok(())
}