at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
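The description mentions a cursor-backed event stream. As a rough illustration of the consumer side, here is a minimal sketch, assuming the stream is exposed as a websocket that accepts a cursor query parameter for resumption; the endpoint path, port, and frame format are placeholder assumptions, not taken from this repository. The indexer's entry point follows the sketch.

// Minimal consumer sketch for the cursor-backed event stream.
//
// Assumptions (not taken from this repository): the stream is served as a
// websocket on the API port, the path is `/stream`, events arrive as JSON
// text frames, and a `cursor` query parameter resumes delivery after the
// last event id the consumer has already seen. Adjust to the real API.
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // resume from the last persisted event id (0 = start of the stream)
    let last_seen: u64 = 0;
    let url = format!("ws://localhost:3000/stream?cursor={last_seen}");

    let (mut ws, _response) = connect_async(url).await?;

    while let Some(frame) = ws.next().await {
        match frame? {
            // each text frame is assumed to carry one event, including the id
            // the consumer should persist so it can resume after a restart
            Message::Text(event) => println!("{event}"),
            Message::Close(_) => break,
            _ => {}
        }
    }

    Ok(())
}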
use futures::{FutureExt, future::BoxFuture};
use hydrant::config::{Config, SignatureVerification};
use hydrant::db::{self, set_firehose_cursor};
use hydrant::ingest::firehose::FirehoseIngestor;
use hydrant::state::AppState;
use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
use miette::IntoDiagnostic;
use mimalloc::MiMalloc;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::{sync::mpsc, task::spawn_blocking};
use tracing::{debug, error, info};

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

#[tokio::main]
async fn main() -> miette::Result<()> {
    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .ok();

    let cfg = Config::from_env()?;

    let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level);
    tracing_subscriber::fmt().with_env_filter(env_filter).init();

    info!("{cfg}");

    let state = AppState::new(&cfg)?;

    // apply filter overrides from config in a single batch, then reload the active filter
    if cfg.full_network
        || cfg.filter_signals.is_some()
        || cfg.filter_collections.is_some()
        || cfg.filter_excludes.is_some()
    {
        let filter_ks = state.db.filter.clone();
        let inner = state.db.inner.clone();
        let full_network = cfg.full_network;
        let signals = cfg.filter_signals.clone();
        let collections = cfg.filter_collections.clone();
        let excludes = cfg.filter_excludes.clone();

        tokio::task::spawn_blocking(move || {
            use hydrant::filter::{FilterMode, SetUpdate};
            let mut batch = inner.batch();

            let mode = if full_network {
                Some(FilterMode::Full)
            } else {
                None
            };

            let signals_update = signals.map(SetUpdate::Set);
            let collections_update = collections.map(SetUpdate::Set);
            let excludes_update = excludes.map(SetUpdate::Set);

            hydrant::db::filter::apply_patch(
                &mut batch,
                &filter_ks,
                mode,
                signals_update,
                collections_update,
                excludes_update,
            )?;

            batch.commit().into_diagnostic()
        })
        .await
        .into_diagnostic()??;

        let new_filter = hydrant::db::filter::load(&state.db.filter)?;
        state.filter.store(new_filter.into());
    }

    // events from the ingestor and backfill worker are buffered here for the firehose worker
    let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
    let state = Arc::new(state);

    if cfg.enable_backfill {
        tokio::spawn({
            let state = state.clone();
            let timeout = cfg.repo_fetch_timeout;
            BackfillWorker::new(
                state,
                buffer_tx.clone(),
                timeout,
                cfg.backfill_concurrency_limit,
                matches!(
                    cfg.verify_signatures,
                    SignatureVerification::Full | SignatureVerification::BackfillOnly
                ),
            )
            .run()
        });
    }

    if let Err(e) = spawn_blocking({
        let state = state.clone();
        move || hydrant::backfill::manager::queue_gone_backfills(&state)
    })
    .await
    .into_diagnostic()?
    {
        error!(err = %e, "failed to queue gone backfills");
        db::check_poisoned_report(&e);
    }

    std::thread::spawn({
        let state = state.clone();
        move || hydrant::backfill::manager::retry_worker(state)
    });

    // log ingest throughput once a minute
    tokio::spawn({
        let state = state.clone();
        let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
        let mut last_time = std::time::Instant::now();
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        async move {
            loop {
                interval.tick().await;

                let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                let current_time = std::time::Instant::now();

                let delta = current_id.saturating_sub(last_id);
                if delta == 0 {
                    debug!("no new events in 60s");
                    continue;
                }

                let elapsed = current_time.duration_since(last_time).as_secs_f64();
                let rate = if elapsed > 0.0 {
                    delta as f64 / elapsed
                } else {
                    0.0
                };

                info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                last_id = current_id;
                last_time = current_time;
            }
        }
    });

    std::thread::spawn({
        let state = state.clone();
        let persist_interval = cfg.cursor_save_interval;

        move || {
            info!("persistence worker started");
            loop {
                std::thread::sleep(persist_interval);

                // persist firehose cursor
                let seq = state.cur_firehose.load(Ordering::SeqCst);
                if let Err(e) = set_firehose_cursor(&state.db, seq) {
                    error!(err = %e, "failed to save cursor");
                    db::check_poisoned_report(&e);
                }

                // persist counts
                // TODO: make this more durable
                if let Err(e) = db::persist_counts(&state.db) {
                    error!(err = %e, "failed to persist counts");
                    db::check_poisoned_report(&e);
                }

                // persist journal
                if let Err(e) = state.db.persist() {
                    error!(err = %e, "db persist failed");
                    db::check_poisoned_report(&e);
                }
            }
        }
    });

    info!("starting crawler ({:?})", state.filter.load().mode);
    let state_clone = state.clone();
    let relay_host_clone = cfg.relay_host.clone();
    let crawler_max_pending = cfg.crawler_max_pending_repos;
    let crawler_resume_pending = cfg.crawler_resume_pending_repos;

    let should_run_crawler = match cfg.enable_crawler {
        Some(true) => true,
        Some(false) => false,
        None => state.filter.load().mode == hydrant::filter::FilterMode::Full,
    };

    if should_run_crawler {
        tokio::spawn(async move {
            // the crawler is responsible for finding new repos
            let crawler = hydrant::crawler::Crawler::new(
                state_clone,
                relay_host_clone,
                crawler_max_pending,
                crawler_resume_pending,
            );
            if let Err(e) = crawler.run().await {
                error!(err = %e, "crawler error");
                db::check_poisoned_report(&e);
            }
        });
    } else {
        info!("crawler disabled by config or filter mode");
    }

    // firehose ingestion: the ingestor streams events from the relay into the
    // buffer, and the worker drains it on a dedicated thread
    let mut tasks = if cfg.enable_firehose {
        let firehose_worker = std::thread::spawn({
            let state = state.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                FirehoseWorker::new(
                    state,
                    buffer_rx,
                    matches!(cfg.verify_signatures, SignatureVerification::Full),
                    cfg.firehose_workers,
                )
                .run(handle)
            }
        });

        let ingestor = FirehoseIngestor::new(
            state.clone(),
            buffer_tx,
            cfg.relay_host,
            state.filter.clone(),
            matches!(cfg.verify_signatures, SignatureVerification::Full),
        );

        vec![
            Box::pin(
                tokio::task::spawn_blocking(move || {
                    firehose_worker
                        .join()
                        .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                })
                .map(|r| r.into_diagnostic().flatten().flatten()),
            ) as BoxFuture<_>,
            Box::pin(ingestor.run()),
        ]
    } else {
        info!("firehose ingestion disabled by config");
        // if firehose is disabled, we just wait indefinitely (or until signal)
        // essentially we just want to keep the main thread alive for the other components
        vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
    };

    let state_api = state.clone();
    tasks.push(Box::pin(async move {
        api::serve(state_api, cfg.api_port)
            .await
            .map_err(|e| miette::miette!("API server failed: {e}"))
    }) as BoxFuture<_>);

    if cfg.enable_debug {
        let state_debug = state.clone();
        tasks.push(Box::pin(async move {
            api::serve_debug(state_debug, cfg.debug_port)
                .await
                .map_err(|e| miette::miette!("debug server failed: {e}"))
        }) as BoxFuture<_>);
    }

    // run until any critical task exits, then flush the database before returning
    let res = futures::future::select_all(tasks);
    if let (Err(e), _, _) = res.await {
        error!(err = %e, "critical worker died");
        db::check_poisoned_report(&e);
    }

    if let Err(e) = state.db.persist() {
        db::check_poisoned_report(&e);
        return Err(e);
    }

    Ok(())
}