AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use miette::Result;
2use std::fmt;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use url::Url;
7
/// Commit-signature verification level, selected via the
/// `HYDRANT_VERIFY_SIGNATURES` environment variable.
///
/// String forms (see the `FromStr`/`Display` impls): `"full"`,
/// `"backfill-only"`, `"none"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignatureVerification {
    /// Verify signatures everywhere (the default).
    Full,
    /// Verify signatures during backfill only.
    BackfillOnly,
    /// Skip signature verification entirely.
    None,
}
14
15impl FromStr for SignatureVerification {
16 type Err = miette::Error;
17 fn from_str(s: &str) -> Result<Self> {
18 match s {
19 "full" => Ok(Self::Full),
20 "backfill-only" => Ok(Self::BackfillOnly),
21 "none" => Ok(Self::None),
22 _ => Err(miette::miette!("invalid signature verification level")),
23 }
24 }
25}
26
27impl fmt::Display for SignatureVerification {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 match self {
30 Self::Full => write!(f, "full"),
31 Self::BackfillOnly => write!(f, "backfill-only"),
32 Self::None => write!(f, "none"),
33 }
34 }
35}
36
/// Runtime configuration for the indexer, normally built from `HYDRANT_*`
/// environment variables via [`Config::from_env`] and dumped at startup
/// through its `Display` implementation.
#[derive(Debug, Clone)]
pub struct Config {
    // Filesystem location of the database (default "./hydrant.db").
    pub database_path: PathBuf,
    // WebSocket URL of the relay to consume (default wss://relay.fire.hose.cam).
    pub relay_host: Url,
    // PLC directory endpoints; comma-separated in HYDRANT_PLC_URL
    // (default https://plc.wtf).
    pub full_network: bool,
    pub plc_urls: Vec<Url>,
    // When true, `from_env` picks larger defaults for firehose workers and
    // database sizing.
    // NOTE(review): presumably keeps no data across restarts — confirm usage.
    pub ephemeral: bool,
    // How often the firehose cursor is persisted (default 5s).
    pub cursor_save_interval: Duration,
    // Timeout applied when fetching a repo (default 300s).
    pub repo_fetch_timeout: Duration,
    // TCP port the XRPC/API server listens on (default 3000).
    pub api_port: u16,
    // Cache size in megabytes (default 256; printed as "mb" by Display).
    pub cache_size: u64,
    // Maximum number of concurrent backfill operations (default 128).
    pub backfill_concurrency_limit: usize,
    // Set via HYDRANT_NO_LZ4_COMPRESSION; disables LZ4 in the database.
    pub disable_lz4_compression: bool,
    // TCP port for the debug server (default 3001; only relevant when
    // `enable_debug` is set).
    pub debug_port: u16,
    pub enable_debug: bool,
    // Commit-signature verification level (default: Full).
    pub verify_signatures: SignatureVerification,
    // Identity cache size (default 1_000_000; unlike the "_mb" fields this
    // is not printed in megabytes — presumably an entry count, confirm).
    pub identity_cache_size: u64,
    // Feature switches; both default to true.
    pub enable_firehose: bool,
    pub enable_backfill: bool,
    // Tri-state: `None` when HYDRANT_ENABLE_CRAWLER was unset (or
    // unparseable), leaving the decision to downstream code.
    pub enable_crawler: Option<bool>,
    // Number of firehose worker tasks (32 full-network, 8 otherwise).
    pub firehose_workers: usize,
    // Database tuning; all "_mb" values are megabytes. Defaults scale up
    // when `full_network` is set.
    pub db_worker_threads: usize,
    pub db_max_journaling_size_mb: u64,
    pub db_pending_memtable_size_mb: u64,
    pub db_blocks_memtable_size_mb: u64,
    pub db_repos_memtable_size_mb: u64,
    pub db_events_memtable_size_mb: u64,
    pub db_records_memtable_size_mb: u64,
    // NOTE(review): names suggest high/low watermarks on the crawler's
    // pending-repo queue (defaults 2000 / 1000) — confirm against crawler.
    pub crawler_max_pending_repos: usize,
    pub crawler_resume_pending_repos: usize,
    // Optional comma-separated filter lists; `None` means the corresponding
    // HYDRANT_FILTER_* variable was unset (no filtering configured).
    pub filter_signals: Option<Vec<String>>,
    pub filter_collections: Option<Vec<String>>,
    pub filter_excludes: Option<Vec<String>>,
}
71
72impl Config {
73 pub fn from_env() -> Result<Self> {
74 macro_rules! cfg {
75 (@val $key:expr) => {
76 std::env::var(concat!("HYDRANT_", $key))
77 };
78 ($key:expr, $default:expr, sec) => {
79 cfg!(@val $key)
80 .ok()
81 .and_then(|s| humantime::parse_duration(&s).ok())
82 .unwrap_or(Duration::from_secs($default))
83 };
84 ($key:expr, $default:expr) => {
85 cfg!(@val $key)
86 .ok()
87 .and_then(|s| s.parse().ok())
88 .unwrap_or($default.to_owned())
89 .into()
90 };
91 }
92
93 let relay_host = cfg!(
94 "RELAY_HOST",
95 Url::parse("wss://relay.fire.hose.cam").unwrap()
96 );
97 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
98 .ok()
99 .map(|s| {
100 s.split(',')
101 .map(|s| Url::parse(s.trim()))
102 .collect::<Result<Vec<_>, _>>()
103 .map_err(|e| miette::miette!("invalid PLC URL: {e}"))
104 })
105 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?;
106
107 let full_network: bool = cfg!("FULL_NETWORK", false);
108 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec);
109 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);
110
111 let ephemeral: bool = cfg!("EPHEMERAL", false);
112 let database_path = cfg!("DATABASE_PATH", "./hydrant.db");
113 let cache_size = cfg!("CACHE_SIZE", 256u64);
114 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false);
115
116 let api_port = cfg!("API_PORT", 3000u16);
117 let enable_debug = cfg!("ENABLE_DEBUG", false);
118 let debug_port = cfg!("DEBUG_PORT", 3001u16);
119 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full);
120 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
121 let enable_firehose = cfg!("ENABLE_FIREHOSE", true);
122 let enable_backfill = cfg!("ENABLE_BACKFILL", true);
123 let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER")
124 .ok()
125 .and_then(|s| s.parse().ok());
126
127 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize);
128 let firehose_workers = cfg!(
129 "FIREHOSE_WORKERS",
130 full_network.then_some(32usize).unwrap_or(8usize)
131 );
132
133 let (
134 default_db_worker_threads,
135 default_db_max_journaling_size_mb,
136 default_db_memtable_size_mb,
137 ): (usize, u64, u64) = full_network
138 .then_some((8usize, 1024u64, 192u64))
139 .unwrap_or((4usize, 400u64, 32u64));
140
141 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads);
142 let db_max_journaling_size_mb = cfg!(
143 "DB_MAX_JOURNALING_SIZE_MB",
144 default_db_max_journaling_size_mb
145 );
146 let db_pending_memtable_size_mb =
147 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
148 let db_blocks_memtable_size_mb =
149 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
150 let db_repos_memtable_size_mb =
151 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
152 let db_events_memtable_size_mb =
153 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
154 let db_records_memtable_size_mb =
155 cfg!("DB_RECORDS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
156
157 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize);
158 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize);
159
160 let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| {
161 s.split(',')
162 .map(|s| s.trim().to_string())
163 .filter(|s| !s.is_empty())
164 .collect()
165 });
166
167 let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| {
168 s.split(',')
169 .map(|s| s.trim().to_string())
170 .filter(|s| !s.is_empty())
171 .collect()
172 });
173
174 let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| {
175 s.split(',')
176 .map(|s| s.trim().to_string())
177 .filter(|s| !s.is_empty())
178 .collect()
179 });
180
181 Ok(Self {
182 database_path,
183 relay_host,
184 plc_urls,
185 ephemeral,
186 full_network,
187 cursor_save_interval,
188 repo_fetch_timeout,
189 api_port,
190 cache_size,
191 backfill_concurrency_limit,
192 disable_lz4_compression,
193 debug_port,
194 enable_debug,
195 verify_signatures,
196 identity_cache_size,
197 enable_firehose,
198 enable_backfill,
199 enable_crawler,
200 firehose_workers,
201 db_worker_threads,
202 db_max_journaling_size_mb,
203 db_pending_memtable_size_mb,
204 db_blocks_memtable_size_mb,
205 db_repos_memtable_size_mb,
206 db_events_memtable_size_mb,
207 db_records_memtable_size_mb,
208 crawler_max_pending_repos,
209 crawler_resume_pending_repos,
210 filter_signals,
211 filter_collections,
212 filter_excludes,
213 })
214 }
215}
216
// Writes one aligned "label value" row of the configuration dump.
// NOTE: this expansion names `LABEL_WIDTH`, which is resolved at the call
// site (macro_rules! is only hygienic for local variables it introduces),
// so a `const LABEL_WIDTH: usize` must be in scope wherever the macro is
// used — see `<Config as fmt::Display>::fmt`.
macro_rules! config_line {
    ($f:expr, $label:expr, $value:expr) => {
        writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH)
    };
}
222
impl fmt::Display for Config {
    /// Renders a multi-line, human-readable dump of every setting — one
    /// `config_line!` row per field — suitable for logging the effective
    /// configuration at startup.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Column width `config_line!` left-pads labels to (the macro reads
        // this constant from its expansion site).
        const LABEL_WIDTH: usize = 27;

        writeln!(f, "hydrant configuration:")?;
        config_line!(f, "relay host", self.relay_host)?;
        config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?;
        config_line!(f, "full network indexing", self.full_network)?;
        config_line!(f, "verify signatures", self.verify_signatures)?;
        config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?;
        config_line!(f, "identity cache size", self.identity_cache_size)?;
        config_line!(
            f,
            "cursor save interval",
            format_args!("{}sec", self.cursor_save_interval.as_secs())
        )?;
        config_line!(
            f,
            "repo fetch timeout",
            format_args!("{}sec", self.repo_fetch_timeout.as_secs())
        )?;
        config_line!(f, "ephemeral", self.ephemeral)?;
        config_line!(f, "database path", self.database_path.to_string_lossy())?;
        config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?;
        config_line!(f, "disable lz4 compression", self.disable_lz4_compression)?;
        config_line!(f, "api port", self.api_port)?;
        config_line!(f, "firehose workers", self.firehose_workers)?;
        config_line!(f, "db worker threads", self.db_worker_threads)?;
        config_line!(
            f,
            "db journal size",
            format_args!("{} mb", self.db_max_journaling_size_mb)
        )?;
        config_line!(
            f,
            "db pending memtable",
            format_args!("{} mb", self.db_pending_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db blocks memtable",
            format_args!("{} mb", self.db_blocks_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db repos memtable",
            format_args!("{} mb", self.db_repos_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db events memtable",
            format_args!("{} mb", self.db_events_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db records memtable",
            format_args!("{} mb", self.db_records_memtable_size_mb)
        )?;
        config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?;
        config_line!(
            f,
            "crawler resume pending",
            self.crawler_resume_pending_repos
        )?;
        // Filter lists are only printed when actually configured (Some).
        if let Some(signals) = &self.filter_signals {
            config_line!(f, "filter signals", format_args!("{:?}", signals))?;
        }
        if let Some(collections) = &self.filter_collections {
            config_line!(f, "filter collections", format_args!("{:?}", collections))?;
        }
        if let Some(excludes) = &self.filter_excludes {
            config_line!(f, "filter excludes", format_args!("{:?}", excludes))?;
        }
        config_line!(f, "enable debug", self.enable_debug)?;
        // The debug port is only meaningful when the debug server is on.
        if self.enable_debug {
            config_line!(f, "debug port", self.debug_port)?;
        }
        Ok(())
    }
}