AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use miette::Result;
2use smol_str::SmolStr;
3use std::fmt;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::time::Duration;
7use url::Url;
8
/// How strictly the indexer verifies repo commit signatures.
///
/// Variants map 1:1 to the strings accepted by the `FromStr` impl below
/// ("full", "backfill-only", "none"), which is how `HYDRANT_VERIFY_SIGNATURES`
/// is parsed in `Config::from_env`.
#[derive(Debug, Clone, Copy)]
pub enum SignatureVerification {
    /// Always verify signatures.
    Full,
    /// Verify only during backfill — presumably skipped for live firehose
    /// events; confirm at the use site.
    BackfillOnly,
    /// Never verify signatures.
    None,
}
15
16impl FromStr for SignatureVerification {
17 type Err = miette::Error;
18 fn from_str(s: &str) -> Result<Self> {
19 match s {
20 "full" => Ok(Self::Full),
21 "backfill-only" => Ok(Self::BackfillOnly),
22 "none" => Ok(Self::None),
23 _ => Err(miette::miette!("invalid signature verification level")),
24 }
25 }
26}
27
28impl fmt::Display for SignatureVerification {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 Self::Full => write!(f, "full"),
32 Self::BackfillOnly => write!(f, "backfill-only"),
33 Self::None => write!(f, "none"),
34 }
35 }
36}
37
/// Runtime configuration for the indexer, normally built from `HYDRANT_*`
/// environment variables via `Config::from_env`.
#[derive(Debug, Clone)]
pub struct Config {
    /// Filesystem path of the database directory (default "./hydrant.db").
    pub database_path: PathBuf,
    /// Relay endpoint to consume events from (default is a wss:// URL).
    pub relay_host: Url,
    /// One or more PLC directory endpoints, comma-separated in
    /// `HYDRANT_PLC_URL`.
    pub plc_urls: Vec<Url>,
    /// When true, several defaults below (worker counts, memtable sizes)
    /// are scaled up for full-network indexing.
    pub full_network: bool,
    /// How often the firehose cursor is persisted.
    pub cursor_save_interval: Duration,
    /// Timeout for fetching a single repo during backfill.
    pub repo_fetch_timeout: Duration,
    /// Log filter string (default "info").
    pub log_level: SmolStr,
    /// Port for the public API server.
    pub api_port: u16,
    /// Database cache size — displayed in MB by the `Display` impl.
    pub cache_size: u64,
    /// Max number of repos backfilled concurrently.
    pub backfill_concurrency_limit: usize,
    /// Disables LZ4 compression of stored data when set.
    pub disable_lz4_compression: bool,
    /// Port for the debug server; only meaningful when `enable_debug` is set.
    pub debug_port: u16,
    /// Whether the debug server is started.
    pub enable_debug: bool,
    /// Signature verification level (full / backfill-only / none).
    pub verify_signatures: SignatureVerification,
    /// Capacity of the identity (DID) cache, in entries — presumably; the
    /// unit is not shown here, confirm at the use site.
    pub identity_cache_size: u64,
    /// Whether live firehose consumption is enabled.
    pub enable_firehose: bool,
    /// Whether backfill is enabled.
    pub enable_backfill: bool,
    /// Crawler toggle; `None` when `HYDRANT_ENABLE_CRAWLER` is unset,
    /// letting downstream code pick its own default.
    pub enable_crawler: Option<bool>,
    /// Number of firehose processing workers.
    pub firehose_workers: usize,
    /// Database worker thread count.
    pub db_worker_threads: usize,
    /// Max journal size in MB.
    pub db_max_journaling_size_mb: u64,
    /// Memtable size in MB for the "pending" partition.
    pub db_pending_memtable_size_mb: u64,
    /// Memtable size in MB for the "blocks" partition.
    pub db_blocks_memtable_size_mb: u64,
    /// Memtable size in MB for the "repos" partition.
    pub db_repos_memtable_size_mb: u64,
    /// Memtable size in MB for the "events" partition.
    pub db_events_memtable_size_mb: u64,
    /// Memtable size in MB for the "records" partition.
    pub db_records_memtable_size_mb: u64,
    /// Crawler back-pressure: stop enqueueing above this many pending repos.
    pub crawler_max_pending_repos: usize,
    /// Crawler back-pressure: resume enqueueing below this many pending repos.
    pub crawler_resume_pending_repos: usize,
}
69
70impl Config {
71 pub fn from_env() -> Result<Self> {
72 macro_rules! cfg {
73 (@val $key:expr) => {
74 std::env::var(concat!("HYDRANT_", $key))
75 };
76 ($key:expr, $default:expr, sec) => {
77 cfg!(@val $key)
78 .ok()
79 .and_then(|s| humantime::parse_duration(&s).ok())
80 .unwrap_or(Duration::from_secs($default))
81 };
82 ($key:expr, $default:expr) => {
83 cfg!(@val $key)
84 .ok()
85 .and_then(|s| s.parse().ok())
86 .unwrap_or($default.to_owned())
87 .into()
88 };
89 }
90
91 let log_level = cfg!("LOG_LEVEL", "info");
92
93 let relay_host = cfg!(
94 "RELAY_HOST",
95 Url::parse("wss://relay.fire.hose.cam").unwrap()
96 );
97 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
98 .ok()
99 .map(|s| {
100 s.split(',')
101 .map(|s| Url::parse(s.trim()))
102 .collect::<Result<Vec<_>, _>>()
103 .map_err(|e| miette::miette!("invalid PLC URL: {}", e))
104 })
105 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?;
106
107 let full_network: bool = cfg!("FULL_NETWORK", false);
108 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec);
109 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);
110
111 let database_path = cfg!("DATABASE_PATH", "./hydrant.db");
112 let cache_size = cfg!("CACHE_SIZE", 256u64);
113 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false);
114
115 let api_port = cfg!("API_PORT", 3000u16);
116 let enable_debug = cfg!("ENABLE_DEBUG", false);
117 let debug_port = cfg!("DEBUG_PORT", 3001u16);
118 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full);
119 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
120 let enable_firehose = cfg!("ENABLE_FIREHOSE", true);
121 let enable_backfill = cfg!("ENABLE_BACKFILL", true);
122 let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER")
123 .ok()
124 .and_then(|s| s.parse().ok());
125
126 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize);
127 let firehose_workers = cfg!(
128 "FIREHOSE_WORKERS",
129 full_network.then_some(32usize).unwrap_or(8usize)
130 );
131
132 let (
133 default_db_worker_threads,
134 default_db_max_journaling_size_mb,
135 default_db_memtable_size_mb,
136 ): (usize, u64, u64) = full_network
137 .then_some((8usize, 1024u64, 192u64))
138 .unwrap_or((4usize, 512u64, 64u64));
139
140 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads);
141 let db_max_journaling_size_mb = cfg!(
142 "DB_MAX_JOURNALING_SIZE_MB",
143 default_db_max_journaling_size_mb
144 );
145 let db_pending_memtable_size_mb =
146 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
147 let db_blocks_memtable_size_mb =
148 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
149 let db_repos_memtable_size_mb =
150 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
151 let db_events_memtable_size_mb =
152 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
153 let db_records_memtable_size_mb =
154 cfg!("DB_RECORDS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
155
156 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize);
157 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize);
158
159 Ok(Self {
160 database_path,
161 relay_host,
162 plc_urls,
163 full_network,
164 cursor_save_interval,
165 repo_fetch_timeout,
166 log_level,
167 api_port,
168 cache_size,
169 backfill_concurrency_limit,
170 disable_lz4_compression,
171 debug_port,
172 enable_debug,
173 verify_signatures,
174 identity_cache_size,
175 enable_firehose,
176 enable_backfill,
177 enable_crawler,
178 firehose_workers,
179 db_worker_threads,
180 db_max_journaling_size_mb,
181 db_pending_memtable_size_mb,
182 db_blocks_memtable_size_mb,
183 db_repos_memtable_size_mb,
184 db_events_memtable_size_mb,
185 db_records_memtable_size_mb,
186 crawler_max_pending_repos,
187 crawler_resume_pending_repos,
188 })
189 }
190}
191
192impl fmt::Display for Config {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 writeln!(f, "hydrant configuration:")?;
195 writeln!(f, " log level: {}", self.log_level)?;
196 writeln!(f, " relay host: {}", self.relay_host)?;
197 writeln!(f, " plc urls: {:?}", self.plc_urls)?;
198 writeln!(f, " full network indexing: {}", self.full_network)?;
199 writeln!(f, " verify signatures: {}", self.verify_signatures)?;
200 writeln!(
201 f,
202 " backfill concurrency: {}",
203 self.backfill_concurrency_limit
204 )?;
205 writeln!(
206 f,
207 " identity cache size: {}",
208 self.identity_cache_size
209 )?;
210 writeln!(
211 f,
212 " cursor save interval: {}sec",
213 self.cursor_save_interval.as_secs()
214 )?;
215 writeln!(
216 f,
217 " repo fetch timeout: {}sec",
218 self.repo_fetch_timeout.as_secs()
219 )?;
220 writeln!(
221 f,
222 " database path: {}",
223 self.database_path.to_string_lossy()
224 )?;
225 writeln!(f, " cache size: {} mb", self.cache_size)?;
226 writeln!(
227 f,
228 " disable lz4 compression: {}",
229 self.disable_lz4_compression
230 )?;
231 writeln!(f, " api port: {}", self.api_port)?;
232 writeln!(f, " firehose workers: {}", self.firehose_workers)?;
233 writeln!(f, " db worker threads: {}", self.db_worker_threads)?;
234 writeln!(
235 f,
236 " db journal size: {} mb",
237 self.db_max_journaling_size_mb
238 )?;
239 writeln!(
240 f,
241 " db pending memtable: {} mb",
242 self.db_pending_memtable_size_mb
243 )?;
244 writeln!(
245 f,
246 " db blocks memtable: {} mb",
247 self.db_blocks_memtable_size_mb
248 )?;
249 writeln!(
250 f,
251 " db repos memtable: {} mb",
252 self.db_repos_memtable_size_mb
253 )?;
254 writeln!(
255 f,
256 " db events memtable: {} mb",
257 self.db_events_memtable_size_mb
258 )?;
259 writeln!(
260 f,
261 " db records memtable: {} mb",
262 self.db_records_memtable_size_mb
263 )?;
264
265 writeln!(
266 f,
267 " crawler max pending: {}",
268 self.crawler_max_pending_repos
269 )?;
270 writeln!(
271 f,
272 " crawler resume pending: {}",
273 self.crawler_resume_pending_repos
274 )?;
275 writeln!(f, " enable debug: {}", self.enable_debug)?;
276 if self.enable_debug {
277 writeln!(f, " debug port: {}", self.debug_port)?;
278 }
279 Ok(())
280 }
281}