AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
use miette::{IntoDiagnostic, Result, WrapErr};
use smol_str::SmolStr;
use std::fmt;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use url::Url;
8
/// Signature verification level, selected via `HYDRANT_VERIFY_SIGNATURES`
/// (default: [`SignatureVerification::Full`]).
///
/// The textual forms accepted by `FromStr` and produced by `Display` are
/// `full`, `backfill-only` and `none`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SignatureVerification {
    /// `full` — NOTE(review): presumably verifies on every ingest path; confirm at call sites.
    Full,
    /// `backfill-only` — NOTE(review): presumably verifies only repos fetched during backfill; confirm.
    BackfillOnly,
    /// `none` — NOTE(review): presumably disables verification entirely; confirm.
    None,
}
15
16impl FromStr for SignatureVerification {
17 type Err = miette::Error;
18 fn from_str(s: &str) -> Result<Self> {
19 match s {
20 "full" => Ok(Self::Full),
21 "backfill-only" => Ok(Self::BackfillOnly),
22 "none" => Ok(Self::None),
23 _ => Err(miette::miette!("invalid signature verification level")),
24 }
25 }
26}
27
28impl fmt::Display for SignatureVerification {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 Self::Full => write!(f, "full"),
32 Self::BackfillOnly => write!(f, "backfill-only"),
33 Self::None => write!(f, "none"),
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
39pub struct Config {
40 pub database_path: PathBuf,
41 pub relay_host: Url,
42 pub plc_urls: Vec<Url>,
43 pub full_network: bool,
44 pub cursor_save_interval: Duration,
45 pub repo_fetch_timeout: Duration,
46 pub log_level: SmolStr,
47 pub api_port: u16,
48 pub cache_size: u64,
49 pub backfill_concurrency_limit: usize,
50 pub disable_lz4_compression: bool,
51 pub debug_port: u16,
52 pub enable_debug: bool,
53 pub verify_signatures: SignatureVerification,
54 pub identity_cache_size: u64,
55 pub disable_firehose: bool,
56 pub disable_backfill: bool,
57 pub firehose_workers: usize,
58 pub db_worker_threads: usize,
59 pub db_max_journaling_size_mb: u64,
60 pub db_pending_memtable_size_mb: u64,
61 pub db_blocks_memtable_size_mb: u64,
62 pub db_repos_memtable_size_mb: u64,
63 pub db_events_memtable_size_mb: u64,
64 pub db_records_default_memtable_size_mb: u64,
65 pub db_records_partition_overrides: Vec<(glob::Pattern, u64)>,
66}
67
68impl Config {
69 pub fn from_env() -> Result<Self> {
70 macro_rules! cfg {
71 (@val $key:expr) => {
72 std::env::var(concat!("HYDRANT_", $key))
73 };
74 ($key:expr, $default:expr, sec) => {
75 cfg!(@val $key)
76 .ok()
77 .and_then(|s| humantime::parse_duration(&s).ok())
78 .unwrap_or(Duration::from_secs($default))
79 };
80 ($key:expr, $default:expr) => {
81 cfg!(@val $key)
82 .ok()
83 .and_then(|s| s.parse().ok())
84 .unwrap_or($default.to_owned())
85 .into()
86 };
87 }
88
89 let log_level = cfg!("LOG_LEVEL", "info");
90
91 let relay_host = cfg!(
92 "RELAY_HOST",
93 Url::parse("wss://relay.fire.hose.cam").unwrap()
94 );
95 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
96 .ok()
97 .map(|s| {
98 s.split(',')
99 .map(|s| Url::parse(s.trim()))
100 .collect::<Result<Vec<_>, _>>()
101 .map_err(|e| miette::miette!("invalid PLC URL: {}", e))
102 })
103 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?;
104
105 let full_network: bool = cfg!("FULL_NETWORK", false);
106 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize);
107 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec);
108 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);
109
110 let database_path = cfg!("DATABASE_PATH", "./hydrant.db");
111 let cache_size = cfg!("CACHE_SIZE", 256u64);
112 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false);
113
114 let api_port = cfg!("API_PORT", 3000u16);
115 let enable_debug = cfg!("ENABLE_DEBUG", false);
116 let debug_port = cfg!("DEBUG_PORT", 3001u16);
117 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full);
118 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
119 let disable_firehose = cfg!("DISABLE_FIREHOSE", false);
120 let disable_backfill = cfg!("DISABLE_BACKFILL", false);
121 let firehose_workers = cfg!("FIREHOSE_WORKERS", 32usize);
122
123 let (
124 default_db_worker_threads,
125 default_db_max_journaling_size_mb,
126 default_db_memtable_size_mb,
127 default_records_memtable_size_mb,
128 default_partition_overrides,
129 ): (usize, u64, u64, u64, &str) = full_network
130 .then_some((8usize, 1024u64, 192u64, 8u64, "app.bsky.*=64"))
131 .unwrap_or((4usize, 512u64, 64u64, 16u64, ""));
132
133 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads);
134 let db_max_journaling_size_mb = cfg!(
135 "DB_MAX_JOURNALING_SIZE_MB",
136 default_db_max_journaling_size_mb
137 );
138 let db_pending_memtable_size_mb =
139 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
140 let db_blocks_memtable_size_mb =
141 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
142 let db_repos_memtable_size_mb =
143 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
144 let db_events_memtable_size_mb =
145 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
146 let db_records_default_memtable_size_mb = cfg!(
147 "DB_RECORDS_DEFAULT_MEMTABLE_SIZE_MB",
148 default_records_memtable_size_mb
149 );
150
151 let db_records_partition_overrides: Vec<(glob::Pattern, u64)> =
152 std::env::var("HYDRANT_DB_RECORDS_PARTITION_OVERRIDES")
153 .unwrap_or_else(|_| default_partition_overrides.to_string())
154 .split(',')
155 .filter(|s| !s.is_empty())
156 .map(|s| {
157 let mut parts = s.split('=');
158 let pattern = parts
159 .next()
160 .ok_or_else(|| miette::miette!("invalid partition override format"))?;
161 let size = parts
162 .next()
163 .ok_or_else(|| miette::miette!("invalid partition override format"))?
164 .parse::<u64>()
165 .into_diagnostic()?;
166 Ok((glob::Pattern::new(pattern).into_diagnostic()?, size))
167 })
168 .collect::<Result<Vec<_>>>()?;
169
170 Ok(Self {
171 database_path,
172 relay_host,
173 plc_urls,
174 full_network,
175 cursor_save_interval,
176 repo_fetch_timeout,
177 log_level,
178 api_port,
179 cache_size,
180 backfill_concurrency_limit,
181 disable_lz4_compression,
182 debug_port,
183 enable_debug,
184 verify_signatures,
185 identity_cache_size,
186 disable_firehose,
187 disable_backfill,
188 firehose_workers,
189 db_worker_threads,
190 db_max_journaling_size_mb,
191 db_pending_memtable_size_mb,
192 db_blocks_memtable_size_mb,
193 db_repos_memtable_size_mb,
194 db_events_memtable_size_mb,
195 db_records_default_memtable_size_mb,
196 db_records_partition_overrides: db_records_partition_overrides,
197 })
198 }
199}
200
201impl fmt::Display for Config {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 writeln!(f, "hydrant configuration:")?;
204 writeln!(f, " log level: {}", self.log_level)?;
205 writeln!(f, " relay host: {}", self.relay_host)?;
206 writeln!(f, " plc urls: {:?}", self.plc_urls)?;
207 writeln!(f, " full network indexing: {}", self.full_network)?;
208 writeln!(f, " verify signatures: {}", self.verify_signatures)?;
209 writeln!(
210 f,
211 " backfill concurrency: {}",
212 self.backfill_concurrency_limit
213 )?;
214 writeln!(
215 f,
216 " identity cache size: {}",
217 self.identity_cache_size
218 )?;
219 writeln!(
220 f,
221 " cursor save interval: {}sec",
222 self.cursor_save_interval.as_secs()
223 )?;
224 writeln!(
225 f,
226 " repo fetch timeout: {}sec",
227 self.repo_fetch_timeout.as_secs()
228 )?;
229 writeln!(
230 f,
231 " database path: {}",
232 self.database_path.to_string_lossy()
233 )?;
234 writeln!(f, " cache size: {} mb", self.cache_size)?;
235 writeln!(
236 f,
237 " disable lz4 compression: {}",
238 self.disable_lz4_compression
239 )?;
240 writeln!(f, " api port: {}", self.api_port)?;
241 writeln!(f, " firehose workers: {}", self.firehose_workers)?;
242 writeln!(f, " db worker threads: {}", self.db_worker_threads)?;
243 writeln!(
244 f,
245 " db journal size: {} mb",
246 self.db_max_journaling_size_mb
247 )?;
248 writeln!(
249 f,
250 " db pending memtable: {} mb",
251 self.db_pending_memtable_size_mb
252 )?;
253 writeln!(
254 f,
255 " db blocks memtable: {} mb",
256 self.db_blocks_memtable_size_mb
257 )?;
258 writeln!(
259 f,
260 " db repos memtable: {} mb",
261 self.db_repos_memtable_size_mb
262 )?;
263 writeln!(
264 f,
265 " db events memtable: {} mb",
266 self.db_events_memtable_size_mb
267 )?;
268 writeln!(
269 f,
270 " db records def memtable: {} mb",
271 self.db_records_default_memtable_size_mb
272 )?;
273 writeln!(f, " enable debug: {}", self.enable_debug)?;
274 if self.enable_debug {
275 writeln!(f, " debug port: {}", self.debug_port)?;
276 }
277 Ok(())
278 }
279}