AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 4fe44f8a28620861fb772d40d55fbeb65716e5df 246 lines 8.1 kB view raw
1use futures::{FutureExt, future::BoxFuture}; 2use hydrant::config::{Config, SignatureVerification}; 3use hydrant::db::{self, set_firehose_cursor}; 4use hydrant::ingest::firehose::FirehoseIngestor; 5use hydrant::state::AppState; 6use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker}; 7use miette::IntoDiagnostic; 8use mimalloc::MiMalloc; 9use std::sync::Arc; 10use std::sync::atomic::Ordering; 11use tokio::{sync::mpsc, task::spawn_blocking}; 12use tracing::{error, info}; 13 14#[global_allocator] 15static GLOBAL: MiMalloc = MiMalloc; 16 17#[tokio::main] 18async fn main() -> miette::Result<()> { 19 let cfg = Config::from_env()?; 20 21 let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level); 22 tracing_subscriber::fmt().with_env_filter(env_filter).init(); 23 24 info!("{cfg}"); 25 26 let state = AppState::new(&cfg)?; 27 28 if cfg.full_network { 29 let filter_ks = state.db.filter.clone(); 30 let inner = state.db.inner.clone(); 31 tokio::task::spawn_blocking(move || { 32 use hydrant::db::filter::MODE_KEY; 33 use hydrant::filter::FilterMode; 34 let mut batch = inner.batch(); 35 batch.insert( 36 &filter_ks, 37 MODE_KEY, 38 rmp_serde::to_vec(&FilterMode::Full).into_diagnostic()?, 39 ); 40 batch.commit().into_diagnostic() 41 }) 42 .await 43 .into_diagnostic()??; 44 45 let new_filter = hydrant::db::filter::load(&state.db.filter)?; 46 state.filter.store(new_filter.into()); 47 } 48 49 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 50 let state = Arc::new(state); 51 52 if cfg.enable_backfill { 53 tokio::spawn({ 54 let state = state.clone(); 55 let timeout = cfg.repo_fetch_timeout; 56 BackfillWorker::new( 57 state, 58 buffer_tx.clone(), 59 timeout, 60 cfg.backfill_concurrency_limit, 61 matches!( 62 cfg.verify_signatures, 63 SignatureVerification::Full | SignatureVerification::BackfillOnly 64 ), 65 ) 66 .run() 67 }); 68 } 69 70 if let Err(e) = spawn_blocking({ 71 let state = state.clone(); 72 move || 
hydrant::backfill::manager::queue_gone_backfills(&state) 73 }) 74 .await 75 .into_diagnostic()? 76 { 77 error!("failed to queue gone backfills: {e}"); 78 db::check_poisoned_report(&e); 79 } 80 81 std::thread::spawn({ 82 let state = state.clone(); 83 move || hydrant::backfill::manager::retry_worker(state) 84 }); 85 86 tokio::spawn({ 87 let state = state.clone(); 88 async move { 89 let mut last_id = state.db.next_event_id.load(Ordering::Relaxed); 90 let mut last_time = std::time::Instant::now(); 91 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); 92 93 loop { 94 interval.tick().await; 95 96 let current_id = state.db.next_event_id.load(Ordering::Relaxed); 97 let current_time = std::time::Instant::now(); 98 99 let delta = current_id.saturating_sub(last_id); 100 let elapsed = current_time.duration_since(last_time).as_secs_f64(); 101 let rate = if elapsed > 0.0 { 102 delta as f64 / elapsed 103 } else { 104 0.0 105 }; 106 107 info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)"); 108 109 last_id = current_id; 110 last_time = current_time; 111 } 112 } 113 }); 114 115 std::thread::spawn({ 116 let state = state.clone(); 117 let persist_interval = cfg.cursor_save_interval; 118 119 move || { 120 info!("persistence worker started"); 121 loop { 122 std::thread::sleep(persist_interval); 123 124 // persist firehose cursor 125 let seq = state.cur_firehose.load(Ordering::SeqCst); 126 if let Err(e) = set_firehose_cursor(&state.db, seq) { 127 error!("failed to save cursor: {e}"); 128 db::check_poisoned_report(&e); 129 } 130 131 // persist counts 132 // TODO: make this more durable 133 if let Err(e) = db::persist_counts(&state.db) { 134 error!("failed to persist counts: {e}"); 135 db::check_poisoned_report(&e); 136 } 137 138 // persist journal 139 if let Err(e) = state.db.persist() { 140 error!("db persist failed: {e}"); 141 db::check_poisoned_report(&e); 142 } 143 } 144 } 145 }); 146 147 info!("starting crawler ({:?})", state.filter.load().mode); 
148 let state_clone = state.clone(); 149 let relay_host_clone = cfg.relay_host.clone(); 150 let crawler_max_pending = cfg.crawler_max_pending_repos; 151 let crawler_resume_pending = cfg.crawler_resume_pending_repos; 152 153 let should_run_crawler = match cfg.enable_crawler { 154 Some(true) => true, 155 Some(false) => false, 156 None => state.filter.load().mode == hydrant::filter::FilterMode::Full, 157 }; 158 159 if should_run_crawler { 160 tokio::spawn(async move { 161 // the crawler is responsible for finding new repos 162 let crawler = hydrant::crawler::Crawler::new( 163 state_clone, 164 relay_host_clone, 165 crawler_max_pending, 166 crawler_resume_pending, 167 ); 168 if let Err(e) = crawler.run().await { 169 error!("crawler error: {e}"); 170 db::check_poisoned_report(&e); 171 } 172 }); 173 } else { 174 info!("crawler disabled by config or filter mode"); 175 } 176 177 let mut tasks = if cfg.enable_firehose { 178 let firehose_worker = std::thread::spawn({ 179 let state = state.clone(); 180 let handle = tokio::runtime::Handle::current(); 181 move || { 182 FirehoseWorker::new( 183 state, 184 buffer_rx, 185 matches!(cfg.verify_signatures, SignatureVerification::Full), 186 cfg.firehose_workers, 187 ) 188 .run(handle) 189 } 190 }); 191 192 let ingestor = FirehoseIngestor::new( 193 state.clone(), 194 buffer_tx, 195 cfg.relay_host, 196 state.filter.clone(), 197 matches!(cfg.verify_signatures, SignatureVerification::Full), 198 ); 199 200 vec![ 201 Box::pin( 202 tokio::task::spawn_blocking(move || { 203 firehose_worker 204 .join() 205 .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 206 }) 207 .map(|r| r.into_diagnostic().flatten().flatten()), 208 ) as BoxFuture<_>, 209 Box::pin(ingestor.run()), 210 ] 211 } else { 212 info!("firehose ingestion disabled by config"); 213 // if firehose is disabled, we just wait indefinitely (or until signal) 214 // essentially we just want to keep the main thread alive for the other components 215 
vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>] 216 }; 217 218 let state_api = state.clone(); 219 tasks.push(Box::pin(async move { 220 api::serve(state_api, cfg.api_port) 221 .await 222 .map_err(|e| miette::miette!("API server failed: {e}")) 223 }) as BoxFuture<_>); 224 225 if cfg.enable_debug { 226 let state_debug = state.clone(); 227 tasks.push(Box::pin(async move { 228 api::serve_debug(state_debug, cfg.debug_port) 229 .await 230 .map_err(|e| miette::miette!("debug server failed: {e}")) 231 }) as BoxFuture<_>); 232 } 233 234 let res = futures::future::select_all(tasks); 235 if let (Err(e), _, _) = res.await { 236 error!("critical worker died: {e}"); 237 db::check_poisoned_report(&e); 238 } 239 240 if let Err(e) = state.db.persist() { 241 db::check_poisoned_report(&e); 242 return Err(e); 243 } 244 245 Ok(()) 246}