forked from
ptr.pet/hydrant
kind of like tap, but different and in Rust
1use futures::{FutureExt, TryFutureExt, future::BoxFuture};
2use hydrant::config::{Config, SignatureVerification};
3use hydrant::crawler::Crawler;
4use hydrant::db::{self, set_firehose_cursor};
5use hydrant::ingest::firehose::FirehoseIngestor;
6use hydrant::state::AppState;
7use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
8use miette::IntoDiagnostic;
9use mimalloc::MiMalloc;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use tokio::{sync::mpsc, task::spawn_blocking};
13use tracing::{error, info};
14
15#[global_allocator]
16static GLOBAL: MiMalloc = MiMalloc;
17
18#[tokio::main]
19async fn main() -> miette::Result<()> {
20 let cfg = Config::from_env()?;
21
22 let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level);
23 tracing_subscriber::fmt().with_env_filter(env_filter).init();
24
25 info!("{cfg}");
26
27 let state = AppState::new(&cfg)?;
28
29 if cfg.full_network {
30 let filter_ks = state.db.filter.clone();
31 let inner = state.db.inner.clone();
32 tokio::task::spawn_blocking(move || {
33 use hydrant::filter::{FilterMode, MODE_KEY};
34 let mut batch = inner.batch();
35 batch.insert(
36 &filter_ks,
37 MODE_KEY,
38 rmp_serde::to_vec(&FilterMode::Full).into_diagnostic()?,
39 );
40 batch.commit().into_diagnostic()
41 })
42 .await
43 .into_diagnostic()??;
44
45 let new_filter = hydrant::db::filter::load(&state.db.filter)?;
46 state.filter.store(new_filter.into());
47 }
48
49 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
50 let state = Arc::new(state);
51
52 if !cfg.disable_backfill {
53 tokio::spawn({
54 let state = state.clone();
55 let timeout = cfg.repo_fetch_timeout;
56 BackfillWorker::new(
57 state,
58 buffer_tx.clone(),
59 timeout,
60 cfg.backfill_concurrency_limit,
61 matches!(
62 cfg.verify_signatures,
63 SignatureVerification::Full | SignatureVerification::BackfillOnly
64 ),
65 )
66 .run()
67 });
68 }
69
70 if let Err(e) = spawn_blocking({
71 let state = state.clone();
72 move || hydrant::backfill::manager::queue_gone_backfills(&state)
73 })
74 .await
75 .into_diagnostic()?
76 {
77 error!("failed to queue gone backfills: {e}");
78 db::check_poisoned_report(&e);
79 }
80
81 std::thread::spawn({
82 let state = state.clone();
83 move || hydrant::backfill::manager::retry_worker(state)
84 });
85
86 tokio::spawn({
87 let state = state.clone();
88 async move {
89 let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
90 let mut last_time = std::time::Instant::now();
91 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
92
93 loop {
94 interval.tick().await;
95
96 let current_id = state.db.next_event_id.load(Ordering::Relaxed);
97 let current_time = std::time::Instant::now();
98
99 let delta = current_id.saturating_sub(last_id);
100 let elapsed = current_time.duration_since(last_time).as_secs_f64();
101 let rate = if elapsed > 0.0 {
102 delta as f64 / elapsed
103 } else {
104 0.0
105 };
106
107 info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");
108
109 last_id = current_id;
110 last_time = current_time;
111 }
112 }
113 });
114
115 std::thread::spawn({
116 let state = state.clone();
117 let persist_interval = cfg.cursor_save_interval;
118
119 move || {
120 info!("persistence worker started");
121 loop {
122 std::thread::sleep(persist_interval);
123
124 // persist firehose cursor
125 let seq = state.cur_firehose.load(Ordering::SeqCst);
126 if let Err(e) = set_firehose_cursor(&state.db, seq) {
127 error!("failed to save cursor: {e}");
128 db::check_poisoned_report(&e);
129 }
130
131 // persist counts
132 // TODO: make this more durable
133 if let Err(e) = db::persist_counts(&state.db) {
134 error!("failed to persist counts: {e}");
135 db::check_poisoned_report(&e);
136 }
137
138 // persist journal
139 if let Err(e) = state.db.persist() {
140 error!("db persist failed: {e}");
141 db::check_poisoned_report(&e);
142 }
143 }
144 }
145 });
146
147 if let hydrant::filter::FilterMode::Full | hydrant::filter::FilterMode::Signal =
148 state.filter.load().mode
149 {
150 tokio::spawn(
151 Crawler::new(
152 state.clone(),
153 cfg.relay_host.clone(),
154 cfg.crawler_max_pending_repos,
155 cfg.crawler_resume_pending_repos,
156 )
157 .run()
158 .inspect_err(|e| {
159 error!("crawler died: {e}");
160 db::check_poisoned_report(&e);
161 }),
162 );
163 }
164
165 let mut tasks = if !cfg.disable_firehose {
166 let firehose_worker = std::thread::spawn({
167 let state = state.clone();
168 let handle = tokio::runtime::Handle::current();
169 move || {
170 FirehoseWorker::new(
171 state,
172 buffer_rx,
173 matches!(cfg.verify_signatures, SignatureVerification::Full),
174 cfg.firehose_workers,
175 )
176 .run(handle)
177 }
178 });
179
180 let ingestor = FirehoseIngestor::new(
181 state.clone(),
182 buffer_tx,
183 cfg.relay_host,
184 state.filter.clone(),
185 matches!(cfg.verify_signatures, SignatureVerification::Full),
186 );
187
188 vec![
189 Box::pin(
190 tokio::task::spawn_blocking(move || {
191 firehose_worker
192 .join()
193 .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
194 })
195 .map(|r| r.into_diagnostic().flatten().flatten()),
196 ) as BoxFuture<_>,
197 Box::pin(ingestor.run()),
198 ]
199 } else {
200 info!("firehose ingestion disabled by config");
201 // if firehose is disabled, we just wait indefinitely (or until signal)
202 // essentially we just want to keep the main thread alive for the other components
203 vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
204 };
205
206 let state_api = state.clone();
207 tasks.push(Box::pin(async move {
208 api::serve(state_api, cfg.api_port)
209 .await
210 .map_err(|e| miette::miette!("API server failed: {e}"))
211 }) as BoxFuture<_>);
212
213 if cfg.enable_debug {
214 let state_debug = state.clone();
215 tasks.push(Box::pin(async move {
216 api::serve_debug(state_debug, cfg.debug_port)
217 .await
218 .map_err(|e| miette::miette!("debug server failed: {e}"))
219 }) as BoxFuture<_>);
220 }
221
222 let res = futures::future::select_all(tasks);
223 if let (Err(e), _, _) = res.await {
224 error!("critical worker died: {e}");
225 db::check_poisoned_report(&e);
226 }
227
228 if let Err(e) = state.db.persist() {
229 db::check_poisoned_report(&e);
230 return Err(e);
231 }
232
233 Ok(())
234}