AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 302 lines 11 kB view raw
1use miette::Result; 2use std::fmt; 3use std::path::PathBuf; 4use std::str::FromStr; 5use std::time::Duration; 6use url::Url; 7 8#[derive(Debug, Clone, Copy)] 9pub enum SignatureVerification { 10 Full, 11 BackfillOnly, 12 None, 13} 14 15impl FromStr for SignatureVerification { 16 type Err = miette::Error; 17 fn from_str(s: &str) -> Result<Self> { 18 match s { 19 "full" => Ok(Self::Full), 20 "backfill-only" => Ok(Self::BackfillOnly), 21 "none" => Ok(Self::None), 22 _ => Err(miette::miette!("invalid signature verification level")), 23 } 24 } 25} 26 27impl fmt::Display for SignatureVerification { 28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 match self { 30 Self::Full => write!(f, "full"), 31 Self::BackfillOnly => write!(f, "backfill-only"), 32 Self::None => write!(f, "none"), 33 } 34 } 35} 36 37#[derive(Debug, Clone)] 38pub struct Config { 39 pub database_path: PathBuf, 40 pub relay_host: Url, 41 pub plc_urls: Vec<Url>, 42 pub full_network: bool, 43 pub ephemeral: bool, 44 pub cursor_save_interval: Duration, 45 pub repo_fetch_timeout: Duration, 46 pub api_port: u16, 47 pub cache_size: u64, 48 pub backfill_concurrency_limit: usize, 49 pub disable_lz4_compression: bool, 50 pub debug_port: u16, 51 pub enable_debug: bool, 52 pub verify_signatures: SignatureVerification, 53 pub identity_cache_size: u64, 54 pub enable_firehose: bool, 55 pub enable_backfill: bool, 56 pub enable_crawler: Option<bool>, 57 pub firehose_workers: usize, 58 pub db_worker_threads: usize, 59 pub db_max_journaling_size_mb: u64, 60 pub db_pending_memtable_size_mb: u64, 61 pub db_blocks_memtable_size_mb: u64, 62 pub db_repos_memtable_size_mb: u64, 63 pub db_events_memtable_size_mb: u64, 64 pub db_records_memtable_size_mb: u64, 65 pub crawler_max_pending_repos: usize, 66 pub crawler_resume_pending_repos: usize, 67 pub filter_signals: Option<Vec<String>>, 68 pub filter_collections: Option<Vec<String>>, 69 pub filter_excludes: Option<Vec<String>>, 70} 71 72impl Config { 73 pub fn 
from_env() -> Result<Self> { 74 macro_rules! cfg { 75 (@val $key:expr) => { 76 std::env::var(concat!("HYDRANT_", $key)) 77 }; 78 ($key:expr, $default:expr, sec) => { 79 cfg!(@val $key) 80 .ok() 81 .and_then(|s| humantime::parse_duration(&s).ok()) 82 .unwrap_or(Duration::from_secs($default)) 83 }; 84 ($key:expr, $default:expr) => { 85 cfg!(@val $key) 86 .ok() 87 .and_then(|s| s.parse().ok()) 88 .unwrap_or($default.to_owned()) 89 .into() 90 }; 91 } 92 93 let relay_host = cfg!( 94 "RELAY_HOST", 95 Url::parse("wss://relay.fire.hose.cam").unwrap() 96 ); 97 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 98 .ok() 99 .map(|s| { 100 s.split(',') 101 .map(|s| Url::parse(s.trim())) 102 .collect::<Result<Vec<_>, _>>() 103 .map_err(|e| miette::miette!("invalid PLC URL: {e}")) 104 }) 105 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?; 106 107 let full_network: bool = cfg!("FULL_NETWORK", false); 108 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec); 109 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 110 111 let ephemeral: bool = cfg!("EPHEMERAL", false); 112 let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 113 let cache_size = cfg!("CACHE_SIZE", 256u64); 114 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false); 115 116 let api_port = cfg!("API_PORT", 3000u16); 117 let enable_debug = cfg!("ENABLE_DEBUG", false); 118 let debug_port = cfg!("DEBUG_PORT", 3001u16); 119 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 120 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64); 121 let enable_firehose = cfg!("ENABLE_FIREHOSE", true); 122 let enable_backfill = cfg!("ENABLE_BACKFILL", true); 123 let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER") 124 .ok() 125 .and_then(|s| s.parse().ok()); 126 127 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize); 128 let firehose_workers = cfg!( 129 "FIREHOSE_WORKERS", 130 
full_network.then_some(32usize).unwrap_or(8usize) 131 ); 132 133 let ( 134 default_db_worker_threads, 135 default_db_max_journaling_size_mb, 136 default_db_memtable_size_mb, 137 ): (usize, u64, u64) = full_network 138 .then_some((8usize, 1024u64, 192u64)) 139 .unwrap_or((4usize, 400u64, 32u64)); 140 141 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 142 let db_max_journaling_size_mb = cfg!( 143 "DB_MAX_JOURNALING_SIZE_MB", 144 default_db_max_journaling_size_mb 145 ); 146 let db_pending_memtable_size_mb = 147 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 148 let db_blocks_memtable_size_mb = 149 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 150 let db_repos_memtable_size_mb = 151 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 152 let db_events_memtable_size_mb = 153 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 154 let db_records_memtable_size_mb = 155 cfg!("DB_RECORDS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 156 157 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize); 158 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize); 159 160 let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| { 161 s.split(',') 162 .map(|s| s.trim().to_string()) 163 .filter(|s| !s.is_empty()) 164 .collect() 165 }); 166 167 let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| { 168 s.split(',') 169 .map(|s| s.trim().to_string()) 170 .filter(|s| !s.is_empty()) 171 .collect() 172 }); 173 174 let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| { 175 s.split(',') 176 .map(|s| s.trim().to_string()) 177 .filter(|s| !s.is_empty()) 178 .collect() 179 }); 180 181 Ok(Self { 182 database_path, 183 relay_host, 184 plc_urls, 185 ephemeral, 186 full_network, 187 cursor_save_interval, 188 repo_fetch_timeout, 189 api_port, 190 cache_size, 191 
backfill_concurrency_limit, 192 disable_lz4_compression, 193 debug_port, 194 enable_debug, 195 verify_signatures, 196 identity_cache_size, 197 enable_firehose, 198 enable_backfill, 199 enable_crawler, 200 firehose_workers, 201 db_worker_threads, 202 db_max_journaling_size_mb, 203 db_pending_memtable_size_mb, 204 db_blocks_memtable_size_mb, 205 db_repos_memtable_size_mb, 206 db_events_memtable_size_mb, 207 db_records_memtable_size_mb, 208 crawler_max_pending_repos, 209 crawler_resume_pending_repos, 210 filter_signals, 211 filter_collections, 212 filter_excludes, 213 }) 214 } 215} 216 217macro_rules! config_line { 218 ($f:expr, $label:expr, $value:expr) => { 219 writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH) 220 }; 221} 222 223impl fmt::Display for Config { 224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 225 const LABEL_WIDTH: usize = 27; 226 227 writeln!(f, "hydrant configuration:")?; 228 config_line!(f, "relay host", self.relay_host)?; 229 config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?; 230 config_line!(f, "full network indexing", self.full_network)?; 231 config_line!(f, "verify signatures", self.verify_signatures)?; 232 config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?; 233 config_line!(f, "identity cache size", self.identity_cache_size)?; 234 config_line!( 235 f, 236 "cursor save interval", 237 format_args!("{}sec", self.cursor_save_interval.as_secs()) 238 )?; 239 config_line!( 240 f, 241 "repo fetch timeout", 242 format_args!("{}sec", self.repo_fetch_timeout.as_secs()) 243 )?; 244 config_line!(f, "ephemeral", self.ephemeral)?; 245 config_line!(f, "database path", self.database_path.to_string_lossy())?; 246 config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?; 247 config_line!(f, "disable lz4 compression", self.disable_lz4_compression)?; 248 config_line!(f, "api port", self.api_port)?; 249 config_line!(f, "firehose workers", self.firehose_workers)?; 250 config_line!(f, 
"db worker threads", self.db_worker_threads)?; 251 config_line!( 252 f, 253 "db journal size", 254 format_args!("{} mb", self.db_max_journaling_size_mb) 255 )?; 256 config_line!( 257 f, 258 "db pending memtable", 259 format_args!("{} mb", self.db_pending_memtable_size_mb) 260 )?; 261 config_line!( 262 f, 263 "db blocks memtable", 264 format_args!("{} mb", self.db_blocks_memtable_size_mb) 265 )?; 266 config_line!( 267 f, 268 "db repos memtable", 269 format_args!("{} mb", self.db_repos_memtable_size_mb) 270 )?; 271 config_line!( 272 f, 273 "db events memtable", 274 format_args!("{} mb", self.db_events_memtable_size_mb) 275 )?; 276 config_line!( 277 f, 278 "db records memtable", 279 format_args!("{} mb", self.db_records_memtable_size_mb) 280 )?; 281 config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?; 282 config_line!( 283 f, 284 "crawler resume pending", 285 self.crawler_resume_pending_repos 286 )?; 287 if let Some(signals) = &self.filter_signals { 288 config_line!(f, "filter signals", format_args!("{:?}", signals))?; 289 } 290 if let Some(collections) = &self.filter_collections { 291 config_line!(f, "filter collections", format_args!("{:?}", collections))?; 292 } 293 if let Some(excludes) = &self.filter_excludes { 294 config_line!(f, "filter excludes", format_args!("{:?}", excludes))?; 295 } 296 config_line!(f, "enable debug", self.enable_debug)?; 297 if self.enable_debug { 298 config_line!(f, "debug port", self.debug_port)?; 299 } 300 Ok(()) 301 } 302}