AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use futures::{FutureExt, future::BoxFuture};
2use hydrant::config::{Config, SignatureVerification};
3use hydrant::db::{self, set_firehose_cursor};
4use hydrant::ingest::firehose::FirehoseIngestor;
5use hydrant::state::AppState;
6use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
7use miette::IntoDiagnostic;
8use mimalloc::MiMalloc;
9use std::sync::Arc;
10use std::sync::atomic::Ordering;
11use tokio::{sync::mpsc, task::spawn_blocking};
12use tracing::{debug, error, info};
13
// Use mimalloc as the process-wide allocator; this service is allocation-heavy
// across many threads (ingest workers, backfill, API), where mimalloc's
// per-thread heaps typically outperform the system allocator.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
16
/// Process entrypoint: loads config, opens the database-backed [`AppState`],
/// applies any env-provided filter overrides, spawns the background workers
/// (backfill, retry, stats, persistence, crawler, firehose), then blocks until
/// any critical task exits. Performs a final DB persist before returning.
///
/// Worker spawn order matters: `queue_gone_backfills` is awaited to completion
/// before the retry worker and ingestion start, and the firehose decode thread
/// takes ownership of `buffer_rx` while producers hold clones of `buffer_tx`.
#[tokio::main]
async fn main() -> miette::Result<()> {
    // Install the process-wide rustls crypto provider. `.ok()` deliberately
    // ignores the error case (a provider was already installed).
    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .ok();

    let cfg = Config::from_env()?;

    // Logging is configured from `cfg.log_level`, so config parsing happens first;
    // errors before this point surface only through the returned miette report.
    let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level);
    tracing_subscriber::fmt().with_env_filter(env_filter).init();

    info!("{cfg}");

    let state = AppState::new(&cfg)?;

    // If the environment supplied any filter configuration, persist it into the
    // DB's filter keyspace as a patch, then reload the in-memory filter so the
    // rest of startup sees the updated mode/sets.
    if cfg.full_network
        || cfg.filter_signals.is_some()
        || cfg.filter_collections.is_some()
        || cfg.filter_excludes.is_some()
    {
        let filter_ks = state.db.filter.clone();
        let inner = state.db.inner.clone();
        let full_network = cfg.full_network;
        let signals = cfg.filter_signals.clone();
        let collections = cfg.filter_collections.clone();
        let excludes = cfg.filter_excludes.clone();

        // The patch is a blocking DB write batch, so it runs off the async runtime.
        tokio::task::spawn_blocking(move || {
            use hydrant::filter::{FilterMode, SetUpdate};
            let mut batch = inner.batch();

            // `full_network` forces Full mode; otherwise the stored mode is left as-is.
            let mode = if full_network {
                Some(FilterMode::Full)
            } else {
                None
            };

            // `None` fields mean "no change"; `Some` replaces the stored set wholesale.
            let signals_update = signals.map(SetUpdate::Set);
            let collections_update = collections.map(SetUpdate::Set);
            let excludes_update = excludes.map(SetUpdate::Set);

            hydrant::db::filter::apply_patch(
                &mut batch,
                &filter_ks,
                mode,
                signals_update,
                collections_update,
                excludes_update,
            )?;

            batch.commit().into_diagnostic()
        })
        .await
        // Double `?`: the outer unwraps the join result (task panic/cancel),
        // the inner unwraps the closure's own Result.
        .into_diagnostic()??;

        // Reload so the in-memory filter reflects the patch just committed.
        let new_filter = hydrant::db::filter::load(&state.db.filter)?;
        state.filter.store(new_filter.into());
    }

    // Channel feeding repo events from producers (backfill, firehose ingestor)
    // into the firehose worker; the receiver is moved into that worker below.
    let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
    let state = Arc::new(state);

    if cfg.enable_backfill {
        tokio::spawn({
            let state = state.clone();
            let timeout = cfg.repo_fetch_timeout;
            BackfillWorker::new(
                state,
                buffer_tx.clone(),
                timeout,
                cfg.backfill_concurrency_limit,
                // Backfill verifies signatures in Full and BackfillOnly modes.
                matches!(
                    cfg.verify_signatures,
                    SignatureVerification::Full | SignatureVerification::BackfillOnly
                ),
            )
            .run()
        });
    }

    // Re-queue backfills for repos marked gone. This is a blocking DB scan and
    // is awaited to completion before ingestion starts; failure is logged but
    // not fatal (aside from a poison check on the error).
    if let Err(e) = spawn_blocking({
        let state = state.clone();
        move || hydrant::backfill::manager::queue_gone_backfills(&state)
    })
    .await
    .into_diagnostic()?
    {
        error!(err = %e, "failed to queue gone backfills");
        db::check_poisoned_report(&e);
    }

    // Backfill retry loop runs on a dedicated OS thread (it is a blocking loop,
    // not a future); its handle is intentionally dropped — it lives for the
    // process lifetime.
    std::thread::spawn({
        let state = state.clone();
        move || hydrant::backfill::manager::retry_worker(state)
    });

    // Throughput logger: once a minute, report events/s derived from the
    // monotonically increasing `next_event_id` counter.
    tokio::spawn({
        let state = state.clone();
        let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
        let mut last_time = std::time::Instant::now();
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        async move {
            loop {
                // NOTE: tokio intervals fire their first tick immediately; the
                // `delta == 0` guard below makes that first iteration a no-op.
                interval.tick().await;

                let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                let current_time = std::time::Instant::now();

                // saturating_sub guards against the counter ever appearing to
                // move backwards (e.g. reload races) — TODO confirm whether
                // that can actually happen.
                let delta = current_id.saturating_sub(last_id);
                if delta == 0 {
                    // Keep last_id/last_time untouched so the next report
                    // averages over the full quiet span.
                    debug!("no new events in 60s");
                    continue;
                }

                let elapsed = current_time.duration_since(last_time).as_secs_f64();
                let rate = if elapsed > 0.0 {
                    delta as f64 / elapsed
                } else {
                    0.0
                };

                info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                last_id = current_id;
                last_time = current_time;
            }
        }
    });

    // Persistence worker: a dedicated OS thread that periodically flushes the
    // firehose cursor, event counts, and the DB journal. Each step is
    // best-effort: errors are logged and checked for DB poisoning, but the
    // loop keeps running.
    std::thread::spawn({
        let state = state.clone();
        let persist_interval = cfg.cursor_save_interval;

        move || {
            info!("persistence worker started");
            loop {
                std::thread::sleep(persist_interval);

                // persist firehose cursor (SeqCst to read the latest sequence
                // published by the ingest path)
                let seq = state.cur_firehose.load(Ordering::SeqCst);
                if let Err(e) = set_firehose_cursor(&state.db, seq) {
                    error!(err = %e, "failed to save cursor");
                    db::check_poisoned_report(&e);
                }

                // persist counts
                // TODO: make this more durable
                if let Err(e) = db::persist_counts(&state.db) {
                    error!(err = %e, "failed to persist counts");
                    db::check_poisoned_report(&e);
                }

                // persist journal
                if let Err(e) = state.db.persist() {
                    error!(err = %e, "db persist failed");
                    db::check_poisoned_report(&e);
                }
            }
        }
    });

    info!("starting crawler ({:?})", state.filter.load().mode);
    let state_clone = state.clone();
    let relay_host_clone = cfg.relay_host.clone();
    let crawler_max_pending = cfg.crawler_max_pending_repos;
    let crawler_resume_pending = cfg.crawler_resume_pending_repos;

    // Explicit config wins; when unset, the crawler runs only in full-network
    // filter mode.
    let should_run_crawler = match cfg.enable_crawler {
        Some(true) => true,
        Some(false) => false,
        None => state.filter.load().mode == hydrant::filter::FilterMode::Full,
    };

    if should_run_crawler {
        tokio::spawn(async move {
            // the crawler is responsible for finding new repos
            let crawler = hydrant::crawler::Crawler::new(
                state_clone,
                relay_host_clone,
                crawler_max_pending,
                crawler_resume_pending,
            );
            if let Err(e) = crawler.run().await {
                error!(err = %e, "crawler error");
                db::check_poisoned_report(&e);
            }
        });
    } else {
        info!("crawler disabled by config or filter mode");
    }

    // Critical tasks: if any of these futures resolves, main shuts down.
    let mut tasks = if cfg.enable_firehose {
        // Decode/processing worker runs on an OS thread; it gets a runtime
        // handle so it can schedule async work from blocking context.
        let firehose_worker = std::thread::spawn({
            let state = state.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                FirehoseWorker::new(
                    state,
                    buffer_rx,
                    matches!(cfg.verify_signatures, SignatureVerification::Full),
                    cfg.firehose_workers,
                )
                .run(handle)
            }
        });

        // The ingestor subscribes to the relay firehose and feeds the worker
        // through `buffer_tx`.
        let ingestor = FirehoseIngestor::new(
            state.clone(),
            buffer_tx,
            cfg.relay_host,
            state.filter.clone(),
            matches!(cfg.verify_signatures, SignatureVerification::Full),
        );

        vec![
            // Adapt the worker thread's JoinHandle into a future: spawn_blocking
            // waits on the join; the double `flatten` collapses
            // Result<Result<Result<..>>> — join error, thread panic, and the
            // worker's own result — into one miette::Result.
            Box::pin(
                tokio::task::spawn_blocking(move || {
                    firehose_worker
                        .join()
                        .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                })
                .map(|r| r.into_diagnostic().flatten().flatten()),
            ) as BoxFuture<_>,
            Box::pin(ingestor.run()),
        ]
    } else {
        info!("firehose ingestion disabled by config");
        // if firehose is disabled, we just wait indefinitely (or until signal)
        // essentially we just want to keep the main thread alive for the other components
        vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
    };

    // The XRPC/API server is always a critical task.
    let state_api = state.clone();
    tasks.push(Box::pin(async move {
        api::serve(state_api, cfg.api_port)
            .await
            .map_err(|e| miette::miette!("API server failed: {e}"))
    }) as BoxFuture<_>);

    if cfg.enable_debug {
        let state_debug = state.clone();
        tasks.push(Box::pin(async move {
            api::serve_debug(state_debug, cfg.debug_port)
                .await
                .map_err(|e| miette::miette!("debug server failed: {e}"))
        }) as BoxFuture<_>);
    }

    // Block until the first critical task finishes; any error is reported and
    // checked for DB poisoning, then we fall through to a final persist.
    let res = futures::future::select_all(tasks);
    if let (Err(e), _, _) = res.await {
        error!(err = %e, "critical worker died");
        db::check_poisoned_report(&e);
    }

    // Best-effort final flush on the way out; a persist failure is the one
    // error that turns into a non-zero exit here.
    if let Err(e) = state.db.persist() {
        db::check_poisoned_report(&e);
        return Err(e);
    }

    Ok(())
}