AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use futures::{FutureExt, future::BoxFuture};
2use hydrant::config::{Config, SignatureVerification};
3use hydrant::db::{self, set_firehose_cursor};
4use hydrant::ingest::firehose::FirehoseIngestor;
5use hydrant::state::AppState;
6use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
7use miette::IntoDiagnostic;
8use mimalloc::MiMalloc;
9use std::sync::Arc;
10use std::sync::atomic::Ordering;
11use tokio::{sync::mpsc, task::spawn_blocking};
12use tracing::{error, info};
13
// Replace the default system allocator with mimalloc for the whole process.
// This is a common choice for allocation-heavy indexing workloads; no other
// code needs to reference GLOBAL directly.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
16
#[tokio::main]
/// Process entrypoint: loads configuration, opens the database-backed
/// `AppState`, and wires up the long-running components — backfill worker,
/// retry worker, event-rate logger, persistence worker, crawler, firehose
/// ingestor/worker, and the API (plus optional debug) server — then waits
/// until any critical task exits.
///
/// Returns an error if configuration/state setup fails or the final DB
/// persist on shutdown fails; individual worker errors are logged and
/// checked for DB poisoning instead of aborting startup.
async fn main() -> miette::Result<()> {
    let cfg = Config::from_env()?;

    // Logging filter comes from config (e.g. RUST_LOG-style directives).
    let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level);
    tracing_subscriber::fmt().with_env_filter(env_filter).init();

    // Log the effective configuration at startup (Config implements Display).
    info!("{cfg}");

    let state = AppState::new(&cfg)?;

    if cfg.full_network {
        // full_network forces the persisted filter mode to `Full`.
        // The write is done on a blocking thread because fjall batch
        // commit is synchronous I/O.
        let filter_ks = state.db.filter.clone();
        let inner = state.db.inner.clone();
        tokio::task::spawn_blocking(move || {
            use hydrant::db::filter::MODE_KEY;
            use hydrant::filter::FilterMode;
            let mut batch = inner.batch();
            batch.insert(
                &filter_ks,
                MODE_KEY,
                rmp_serde::to_vec(&FilterMode::Full).into_diagnostic()?,
            );
            batch.commit().into_diagnostic()
        })
        .await
        .into_diagnostic()??;

        // Reload the filter from the DB so the in-memory copy reflects the
        // mode we just persisted; must happen before workers read it below.
        let new_filter = hydrant::db::filter::load(&state.db.filter)?;
        state.filter.store(new_filter.into());
    }

    // Channel feeding repo events into the firehose worker. NOTE(review):
    // unbounded — if the firehose worker is disabled or stalls while backfill
    // still sends, the buffer can grow without limit; confirm acceptable.
    let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
    let state = Arc::new(state);

    if cfg.enable_backfill {
        // Backfill worker runs detached; its output is pushed into buffer_tx.
        tokio::spawn({
            let state = state.clone();
            let timeout = cfg.repo_fetch_timeout;
            BackfillWorker::new(
                state,
                buffer_tx.clone(),
                timeout,
                cfg.backfill_concurrency_limit,
                // Signatures are verified during backfill in both Full and
                // BackfillOnly modes.
                matches!(
                    cfg.verify_signatures,
                    SignatureVerification::Full | SignatureVerification::BackfillOnly
                ),
            )
            .run()
        });
    }

    // One-shot startup pass; synchronous DB work, so run on a blocking
    // thread. Failure is logged (and checked for DB poisoning) but does
    // not abort startup.
    if let Err(e) = spawn_blocking({
        let state = state.clone();
        move || hydrant::backfill::manager::queue_gone_backfills(&state)
    })
    .await
    .into_diagnostic()?
    {
        error!("failed to queue gone backfills: {e}");
        db::check_poisoned_report(&e);
    }

    // Retry worker is a plain OS thread (presumably it blocks/sleeps; it is
    // deliberately not a tokio task).
    std::thread::spawn({
        let state = state.clone();
        move || hydrant::backfill::manager::retry_worker(state)
    });

    // Detached task: log the event throughput once a minute, derived from
    // the monotonically increasing next_event_id counter.
    tokio::spawn({
        let state = state.clone();
        async move {
            let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
            let mut last_time = std::time::Instant::now();
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));

            loop {
                interval.tick().await;

                let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                let current_time = std::time::Instant::now();

                // saturating_sub guards against the counter ever moving
                // backwards (e.g. after a reset).
                let delta = current_id.saturating_sub(last_id);
                let elapsed = current_time.duration_since(last_time).as_secs_f64();
                let rate = if elapsed > 0.0 {
                    delta as f64 / elapsed
                } else {
                    0.0
                };

                info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                last_id = current_id;
                last_time = current_time;
            }
        }
    });

    // Persistence worker: a dedicated OS thread (all three persist steps are
    // blocking DB calls) that periodically flushes durable state. Errors are
    // logged per step so one failure doesn't skip the others.
    std::thread::spawn({
        let state = state.clone();
        let persist_interval = cfg.cursor_save_interval;

        move || {
            info!("persistence worker started");
            loop {
                std::thread::sleep(persist_interval);

                // persist firehose cursor
                let seq = state.cur_firehose.load(Ordering::SeqCst);
                if let Err(e) = set_firehose_cursor(&state.db, seq) {
                    error!("failed to save cursor: {e}");
                    db::check_poisoned_report(&e);
                }

                // persist counts
                // TODO: make this more durable
                if let Err(e) = db::persist_counts(&state.db) {
                    error!("failed to persist counts: {e}");
                    db::check_poisoned_report(&e);
                }

                // persist journal
                if let Err(e) = state.db.persist() {
                    error!("db persist failed: {e}");
                    db::check_poisoned_report(&e);
                }
            }
        }
    });

    info!("starting crawler ({:?})", state.filter.load().mode);
    let state_clone = state.clone();
    let relay_host_clone = cfg.relay_host.clone();
    let crawler_max_pending = cfg.crawler_max_pending_repos;
    let crawler_resume_pending = cfg.crawler_resume_pending_repos;

    // Explicit config wins; otherwise the crawler runs only when the filter
    // is in Full (whole-network) mode.
    let should_run_crawler = match cfg.enable_crawler {
        Some(true) => true,
        Some(false) => false,
        None => state.filter.load().mode == hydrant::filter::FilterMode::Full,
    };

    if should_run_crawler {
        tokio::spawn(async move {
            // the crawler is responsible for finding new repos
            let crawler = hydrant::crawler::Crawler::new(
                state_clone,
                relay_host_clone,
                crawler_max_pending,
                crawler_resume_pending,
            );
            if let Err(e) = crawler.run().await {
                error!("crawler error: {e}");
                db::check_poisoned_report(&e);
            }
        });
    } else {
        info!("crawler disabled by config or filter mode");
    }

    // Assemble the set of "critical" futures; if any of them finishes, main
    // begins shutdown (see select_all below).
    let mut tasks = if cfg.enable_firehose {
        // The firehose worker consumes buffer_rx on a dedicated OS thread;
        // it is handed the tokio runtime handle so it can re-enter async
        // code as needed.
        let firehose_worker = std::thread::spawn({
            let state = state.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                FirehoseWorker::new(
                    state,
                    buffer_rx,
                    matches!(cfg.verify_signatures, SignatureVerification::Full),
                    cfg.firehose_workers,
                )
                .run(handle)
            }
        });

        let ingestor = FirehoseIngestor::new(
            state.clone(),
            buffer_tx,
            cfg.relay_host,
            state.filter.clone(),
            matches!(cfg.verify_signatures, SignatureVerification::Full),
        );

        vec![
            // Bridge the worker thread into the async world: spawn_blocking
            // waits on join(), then the nested Results (JoinError / thread
            // panic / worker result) are collapsed with flatten().
            Box::pin(
                tokio::task::spawn_blocking(move || {
                    firehose_worker
                        .join()
                        .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                })
                .map(|r| r.into_diagnostic().flatten().flatten()),
            ) as BoxFuture<_>,
            Box::pin(ingestor.run()),
        ]
    } else {
        info!("firehose ingestion disabled by config");
        // if firehose is disabled, we just wait indefinitely (or until signal)
        // essentially we just want to keep the main thread alive for the other components
        vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
    };

    // The public API server is always a critical task.
    let state_api = state.clone();
    tasks.push(Box::pin(async move {
        api::serve(state_api, cfg.api_port)
            .await
            .map_err(|e| miette::miette!("API server failed: {e}"))
    }) as BoxFuture<_>);

    if cfg.enable_debug {
        let state_debug = state.clone();
        tasks.push(Box::pin(async move {
            api::serve_debug(state_debug, cfg.debug_port)
                .await
                .map_err(|e| miette::miette!("debug server failed: {e}"))
        }) as BoxFuture<_>);
    }

    // Wait for the FIRST critical task to complete; select_all yields
    // (result, index, remaining futures). The remaining tasks are dropped.
    let res = futures::future::select_all(tasks);
    if let (Err(e), _, _) = res.await {
        error!("critical worker died: {e}");
        db::check_poisoned_report(&e);
    }

    // Final best-effort flush before exit; a failure here is surfaced as
    // the process exit error.
    if let Err(e) = state.db.persist() {
        db::check_poisoned_report(&e);
        return Err(e);
    }

    Ok(())
}