// Forked from ptr.pet/hydrant: kind of like tap, but different and in Rust.
1use miette::Result;
2use smol_str::SmolStr;
3use std::fmt;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::time::Duration;
7use url::Url;
8
/// Signature verification level.
///
/// In text form (see the `FromStr` and `Display` impls) the levels are spelled
/// `full`, `backfill-only` and `none`.
///
/// NOTE(review): what each level actually checks is decided by the code that
/// consumes this value, which is not visible in this file; the variant notes
/// below are inferred from the names and should be confirmed against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignatureVerification {
    /// Spelled `full`. Presumably verification is always performed; confirm.
    Full,
    /// Spelled `backfill-only`. Presumably verification is limited to backfill; confirm.
    BackfillOnly,
    /// Spelled `none`. Presumably verification is skipped entirely; confirm.
    None,
}
15
16impl FromStr for SignatureVerification {
17 type Err = miette::Error;
18 fn from_str(s: &str) -> Result<Self> {
19 match s {
20 "full" => Ok(Self::Full),
21 "backfill-only" => Ok(Self::BackfillOnly),
22 "none" => Ok(Self::None),
23 _ => Err(miette::miette!("invalid signature verification level")),
24 }
25 }
26}
27
28impl fmt::Display for SignatureVerification {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 Self::Full => write!(f, "full"),
32 Self::BackfillOnly => write!(f, "backfill-only"),
33 Self::None => write!(f, "none"),
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
39pub struct Config {
40 pub database_path: PathBuf,
41 pub relay_host: Url,
42 pub plc_urls: Vec<Url>,
43 pub full_network: bool,
44 pub cursor_save_interval: Duration,
45 pub repo_fetch_timeout: Duration,
46 pub log_level: SmolStr,
47 pub api_port: u16,
48 pub cache_size: u64,
49 pub backfill_concurrency_limit: usize,
50 pub disable_lz4_compression: bool,
51 pub debug_port: u16,
52 pub enable_debug: bool,
53 pub verify_signatures: SignatureVerification,
54 pub identity_cache_size: u64,
55 pub disable_firehose: bool,
56 pub disable_backfill: bool,
57 pub firehose_workers: usize,
58 pub db_worker_threads: usize,
59 pub db_max_journaling_size_mb: u64,
60 pub db_pending_memtable_size_mb: u64,
61 pub db_blocks_memtable_size_mb: u64,
62 pub db_repos_memtable_size_mb: u64,
63 pub db_events_memtable_size_mb: u64,
64 pub db_records_memtable_size_mb: u64,
65 pub crawler_max_pending_repos: usize,
66 pub crawler_resume_pending_repos: usize,
67}
68
69impl Config {
70 pub fn from_env() -> Result<Self> {
71 macro_rules! cfg {
72 (@val $key:expr) => {
73 std::env::var(concat!("HYDRANT_", $key))
74 };
75 ($key:expr, $default:expr, sec) => {
76 cfg!(@val $key)
77 .ok()
78 .and_then(|s| humantime::parse_duration(&s).ok())
79 .unwrap_or(Duration::from_secs($default))
80 };
81 ($key:expr, $default:expr) => {
82 cfg!(@val $key)
83 .ok()
84 .and_then(|s| s.parse().ok())
85 .unwrap_or($default.to_owned())
86 .into()
87 };
88 }
89
90 let log_level = cfg!("LOG_LEVEL", "info");
91
92 let relay_host = cfg!(
93 "RELAY_HOST",
94 Url::parse("wss://relay.fire.hose.cam").unwrap()
95 );
96 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
97 .ok()
98 .map(|s| {
99 s.split(',')
100 .map(|s| Url::parse(s.trim()))
101 .collect::<Result<Vec<_>, _>>()
102 .map_err(|e| miette::miette!("invalid PLC URL: {}", e))
103 })
104 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?;
105
106 let full_network: bool = cfg!("FULL_NETWORK", false);
107 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec);
108 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);
109
110 let database_path = cfg!("DATABASE_PATH", "./hydrant.db");
111 let cache_size = cfg!("CACHE_SIZE", 256u64);
112 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false);
113
114 let api_port = cfg!("API_PORT", 3000u16);
115 let enable_debug = cfg!("ENABLE_DEBUG", false);
116 let debug_port = cfg!("DEBUG_PORT", 3001u16);
117 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full);
118 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
119 let disable_firehose = cfg!("DISABLE_FIREHOSE", false);
120 let disable_backfill = cfg!("DISABLE_BACKFILL", false);
121
122 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize);
123 let firehose_workers = cfg!(
124 "FIREHOSE_WORKERS",
125 full_network.then_some(32usize).unwrap_or(8usize)
126 );
127
128 let (
129 default_db_worker_threads,
130 default_db_max_journaling_size_mb,
131 default_db_memtable_size_mb,
132 ): (usize, u64, u64) = full_network
133 .then_some((8usize, 1024u64, 192u64))
134 .unwrap_or((4usize, 512u64, 64u64));
135
136 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads);
137 let db_max_journaling_size_mb = cfg!(
138 "DB_MAX_JOURNALING_SIZE_MB",
139 default_db_max_journaling_size_mb
140 );
141 let db_pending_memtable_size_mb =
142 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
143 let db_blocks_memtable_size_mb =
144 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
145 let db_repos_memtable_size_mb =
146 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
147 let db_events_memtable_size_mb =
148 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
149 let db_records_memtable_size_mb =
150 cfg!("DB_RECORDS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
151
152 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize);
153 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize);
154
155 Ok(Self {
156 database_path,
157 relay_host,
158 plc_urls,
159 full_network,
160 cursor_save_interval,
161 repo_fetch_timeout,
162 log_level,
163 api_port,
164 cache_size,
165 backfill_concurrency_limit,
166 disable_lz4_compression,
167 debug_port,
168 enable_debug,
169 verify_signatures,
170 identity_cache_size,
171 disable_firehose,
172 disable_backfill,
173 firehose_workers,
174 db_worker_threads,
175 db_max_journaling_size_mb,
176 db_pending_memtable_size_mb,
177 db_blocks_memtable_size_mb,
178 db_repos_memtable_size_mb,
179 db_events_memtable_size_mb,
180 db_records_memtable_size_mb,
181 crawler_max_pending_repos,
182 crawler_resume_pending_repos,
183 })
184 }
185}
186
187impl fmt::Display for Config {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 writeln!(f, "hydrant configuration:")?;
190 writeln!(f, " log level: {}", self.log_level)?;
191 writeln!(f, " relay host: {}", self.relay_host)?;
192 writeln!(f, " plc urls: {:?}", self.plc_urls)?;
193 writeln!(f, " full network indexing: {}", self.full_network)?;
194 writeln!(f, " verify signatures: {}", self.verify_signatures)?;
195 writeln!(
196 f,
197 " backfill concurrency: {}",
198 self.backfill_concurrency_limit
199 )?;
200 writeln!(
201 f,
202 " identity cache size: {}",
203 self.identity_cache_size
204 )?;
205 writeln!(
206 f,
207 " cursor save interval: {}sec",
208 self.cursor_save_interval.as_secs()
209 )?;
210 writeln!(
211 f,
212 " repo fetch timeout: {}sec",
213 self.repo_fetch_timeout.as_secs()
214 )?;
215 writeln!(
216 f,
217 " database path: {}",
218 self.database_path.to_string_lossy()
219 )?;
220 writeln!(f, " cache size: {} mb", self.cache_size)?;
221 writeln!(
222 f,
223 " disable lz4 compression: {}",
224 self.disable_lz4_compression
225 )?;
226 writeln!(f, " api port: {}", self.api_port)?;
227 writeln!(f, " firehose workers: {}", self.firehose_workers)?;
228 writeln!(f, " db worker threads: {}", self.db_worker_threads)?;
229 writeln!(
230 f,
231 " db journal size: {} mb",
232 self.db_max_journaling_size_mb
233 )?;
234 writeln!(
235 f,
236 " db pending memtable: {} mb",
237 self.db_pending_memtable_size_mb
238 )?;
239 writeln!(
240 f,
241 " db blocks memtable: {} mb",
242 self.db_blocks_memtable_size_mb
243 )?;
244 writeln!(
245 f,
246 " db repos memtable: {} mb",
247 self.db_repos_memtable_size_mb
248 )?;
249 writeln!(
250 f,
251 " db events memtable: {} mb",
252 self.db_events_memtable_size_mb
253 )?;
254 writeln!(
255 f,
256 " db records memtable: {} mb",
257 self.db_records_memtable_size_mb
258 )?;
259
260 writeln!(
261 f,
262 " crawler max pending: {}",
263 self.crawler_max_pending_repos
264 )?;
265 writeln!(
266 f,
267 " crawler resume pending: {}",
268 self.crawler_resume_pending_repos
269 )?;
270 writeln!(f, " enable debug: {}", self.enable_debug)?;
271 if self.enable_debug {
272 writeln!(f, " debug port: {}", self.debug_port)?;
273 }
274 Ok(())
275 }
276}