AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use futures::{FutureExt, TryFutureExt, future::BoxFuture};
2use hydrant::config::{Config, SignatureVerification};
3use hydrant::crawler::Crawler;
4use hydrant::db::{self, set_firehose_cursor};
5use hydrant::ingest::firehose::FirehoseIngestor;
6use hydrant::state::AppState;
7use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
8use miette::IntoDiagnostic;
9use mimalloc::MiMalloc;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use tokio::{sync::mpsc, task::spawn_blocking};
13use tracing::{error, info};
14
// Install mimalloc as the process-wide allocator (replaces the system
// allocator for every allocation in this binary).
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
17
#[tokio::main]
async fn main() -> miette::Result<()> {
    // Entry point: wires up all long-running components (backfill, firehose
    // ingestion/processing, crawler, API servers, persistence workers) from a
    // single environment-derived Config, then blocks until any critical worker
    // exits.
    let cfg = Config::from_env()?;

    // Initialize tracing with the configured log-level filter string.
    let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level);
    tracing_subscriber::fmt().with_env_filter(env_filter).init();

    info!("{cfg}");

    let state = AppState::new(&cfg)?;
    // Unbounded channel feeding fetched repo/event data into the
    // FirehoseWorker consumer started further down. `buffer_tx` is cloned for
    // the backfill worker and moved into the ingestor; `buffer_rx` is moved
    // into the worker thread.
    let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
    let state = Arc::new(state);

    // Optional backfill worker: runs as a detached tokio task. Signature
    // verification is enabled here for Full or BackfillOnly modes.
    if !cfg.disable_backfill {
        tokio::spawn({
            let state = state.clone();
            let timeout = cfg.repo_fetch_timeout;
            BackfillWorker::new(
                state,
                buffer_tx.clone(),
                timeout,
                cfg.backfill_concurrency_limit,
                matches!(
                    cfg.verify_signatures,
                    SignatureVerification::Full | SignatureVerification::BackfillOnly
                ),
            )
            .run()
        });
    }

    // Re-queue backfills for repos previously marked gone (blocking DB work,
    // so it runs on the blocking pool). A JoinError is fatal (`?`); an inner
    // error from the queueing itself is only logged — startup continues.
    if let Err(e) = spawn_blocking({
        let state = state.clone();
        move || hydrant::backfill::manager::queue_gone_backfills(&state)
    })
    .await
    .into_diagnostic()?
    {
        error!("failed to queue gone backfills: {e}");
        db::check_poisoned_report(&e);
    }

    // Backfill retry loop on a dedicated OS thread (presumably it blocks;
    // detached — its handle is intentionally dropped).
    std::thread::spawn({
        let state = state.clone();
        move || hydrant::backfill::manager::retry_worker(state)
    });

    // Throughput reporter: once a minute, log the event-id delta since the
    // last tick as an events/s rate. Relaxed ordering is fine here — the
    // counter is only read for an approximate statistic.
    tokio::spawn({
        let state = state.clone();
        async move {
            let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
            let mut last_time = std::time::Instant::now();
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));

            loop {
                interval.tick().await;

                let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                let current_time = std::time::Instant::now();

                let delta = current_id.saturating_sub(last_id);
                let elapsed = current_time.duration_since(last_time).as_secs_f64();
                // Guard against a zero-length interval (first tick fires
                // immediately) to avoid a division by zero.
                let rate = if elapsed > 0.0 {
                    delta as f64 / elapsed
                } else {
                    0.0
                };

                info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                last_id = current_id;
                last_time = current_time;
            }
        }
    });

    // Persistence worker: a plain OS thread (the work is blocking) that every
    // `cursor_save_interval` flushes the firehose cursor, counters, and the
    // DB journal. Failures are logged and checked for poisoning, never fatal.
    std::thread::spawn({
        let state = state.clone();
        let persist_interval = cfg.cursor_save_interval;

        move || {
            info!("persistence worker started");
            loop {
                std::thread::sleep(persist_interval);

                // persist firehose cursor
                let seq = state.cur_firehose.load(Ordering::SeqCst);
                if let Err(e) = set_firehose_cursor(&state.db, seq) {
                    error!("failed to save cursor: {e}");
                    db::check_poisoned_report(&e);
                }

                // persist counts
                // TODO: make this more durable
                if let Err(e) = db::persist_counts(&state.db) {
                    error!("failed to persist counts: {e}");
                    db::check_poisoned_report(&e);
                }

                // persist journal
                if let Err(e) = state.db.persist() {
                    error!("db persist failed: {e}");
                    db::check_poisoned_report(&e);
                }
            }
        }
    });

    // Full-network mode additionally runs a relay crawler; its death is
    // logged via inspect_err but does not bring the process down.
    if cfg.full_network {
        tokio::spawn(
            Crawler::new(state.clone(), cfg.relay_host.clone())
                .run()
                .inspect_err(|e| {
                    error!("crawler died: {e}");
                    db::check_poisoned_report(&e);
                }),
        );
    }

    // Critical tasks collected for select_all below: if any of these
    // completes, main shuts down.
    let mut tasks = if !cfg.disable_firehose {
        // Firehose event processor on a dedicated OS thread; it receives the
        // runtime handle so it can interact with tokio from blocking code.
        let firehose_worker = std::thread::spawn({
            let state = state.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                FirehoseWorker::new(
                    state,
                    buffer_rx,
                    matches!(cfg.verify_signatures, SignatureVerification::Full),
                    cfg.firehose_workers,
                )
                .run(handle)
            }
        });

        // Ingestor: subscribes to the relay firehose and pushes events into
        // the buffer channel consumed by the worker above.
        let ingestor = FirehoseIngestor::new(
            state.clone(),
            buffer_tx,
            cfg.relay_host,
            cfg.full_network,
            matches!(cfg.verify_signatures, SignatureVerification::Full),
        );

        vec![
            // Bridge the worker thread's JoinHandle into a future: join on
            // the blocking pool, convert the panic payload to a diagnostic,
            // then flatten the nested Results (JoinHandle -> closure ->
            // worker run result) down to a single miette::Result.
            Box::pin(
                tokio::task::spawn_blocking(move || {
                    firehose_worker
                        .join()
                        .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                })
                .map(|r| r.into_diagnostic().flatten().flatten()),
            ) as BoxFuture<_>,
            Box::pin(ingestor.run()),
        ]
    } else {
        info!("firehose ingestion disabled by config");
        // if firehose is disabled, we just wait indefinitely (or until signal)
        // essentially we just want to keep the main thread alive for the other components
        vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
    };

    // Public XRPC/API server — also critical: if it exits, main exits.
    let state_api = state.clone();
    tasks.push(Box::pin(async move {
        api::serve(state_api, cfg.api_port)
            .await
            .map_err(|e| miette::miette!("API server failed: {e}"))
    }) as BoxFuture<_>);

    // Optional debug server on its own port.
    if cfg.enable_debug {
        let state_debug = state.clone();
        tasks.push(Box::pin(async move {
            api::serve_debug(state_debug, cfg.debug_port)
                .await
                .map_err(|e| miette::miette!("debug server failed: {e}"))
        }) as BoxFuture<_>);
    }

    // Wait for the FIRST critical task to complete; select_all yields
    // (result, index, remaining_futures). Any completion — error or not —
    // falls through to shutdown below.
    let res = futures::future::select_all(tasks);
    if let (Err(e), _, _) = res.await {
        error!("critical worker died: {e}");
        db::check_poisoned_report(&e);
    }

    // Final best-effort flush of the DB before exiting; a failure here is
    // propagated as the process exit error.
    if let Err(e) = state.db.persist() {
        db::check_poisoned_report(&e);
        return Err(e);
    }

    Ok(())
}