// AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed
// event stream, built on fjall.
// Topics: at-protocol, atproto, indexer, rust, fjall.
// Snapshot: commit 3eb43b6191a55f71f8327cf6d64290a168d60cff (279 lines, 10 kB).
1use miette::{IntoDiagnostic, Result}; 2use smol_str::SmolStr; 3use std::fmt; 4use std::path::PathBuf; 5use std::str::FromStr; 6use std::time::Duration; 7use url::Url; 8 9#[derive(Debug, Clone, Copy)] 10pub enum SignatureVerification { 11 Full, 12 BackfillOnly, 13 None, 14} 15 16impl FromStr for SignatureVerification { 17 type Err = miette::Error; 18 fn from_str(s: &str) -> Result<Self> { 19 match s { 20 "full" => Ok(Self::Full), 21 "backfill-only" => Ok(Self::BackfillOnly), 22 "none" => Ok(Self::None), 23 _ => Err(miette::miette!("invalid signature verification level")), 24 } 25 } 26} 27 28impl fmt::Display for SignatureVerification { 29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 match self { 31 Self::Full => write!(f, "full"), 32 Self::BackfillOnly => write!(f, "backfill-only"), 33 Self::None => write!(f, "none"), 34 } 35 } 36} 37 38#[derive(Debug, Clone)] 39pub struct Config { 40 pub database_path: PathBuf, 41 pub relay_host: Url, 42 pub plc_urls: Vec<Url>, 43 pub full_network: bool, 44 pub cursor_save_interval: Duration, 45 pub repo_fetch_timeout: Duration, 46 pub log_level: SmolStr, 47 pub api_port: u16, 48 pub cache_size: u64, 49 pub backfill_concurrency_limit: usize, 50 pub disable_lz4_compression: bool, 51 pub debug_port: u16, 52 pub enable_debug: bool, 53 pub verify_signatures: SignatureVerification, 54 pub identity_cache_size: u64, 55 pub disable_firehose: bool, 56 pub disable_backfill: bool, 57 pub firehose_workers: usize, 58 pub db_worker_threads: usize, 59 pub db_max_journaling_size_mb: u64, 60 pub db_pending_memtable_size_mb: u64, 61 pub db_blocks_memtable_size_mb: u64, 62 pub db_repos_memtable_size_mb: u64, 63 pub db_events_memtable_size_mb: u64, 64 pub db_records_default_memtable_size_mb: u64, 65 pub db_records_partition_overrides: Vec<(glob::Pattern, u64)>, 66} 67 68impl Config { 69 pub fn from_env() -> Result<Self> { 70 macro_rules! 
cfg { 71 (@val $key:expr) => { 72 std::env::var(concat!("HYDRANT_", $key)) 73 }; 74 ($key:expr, $default:expr, sec) => { 75 cfg!(@val $key) 76 .ok() 77 .and_then(|s| humantime::parse_duration(&s).ok()) 78 .unwrap_or(Duration::from_secs($default)) 79 }; 80 ($key:expr, $default:expr) => { 81 cfg!(@val $key) 82 .ok() 83 .and_then(|s| s.parse().ok()) 84 .unwrap_or($default.to_owned()) 85 .into() 86 }; 87 } 88 89 let log_level = cfg!("LOG_LEVEL", "info"); 90 91 let relay_host = cfg!( 92 "RELAY_HOST", 93 Url::parse("wss://relay.fire.hose.cam").unwrap() 94 ); 95 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 96 .ok() 97 .map(|s| { 98 s.split(',') 99 .map(|s| Url::parse(s.trim())) 100 .collect::<Result<Vec<_>, _>>() 101 .map_err(|e| miette::miette!("invalid PLC URL: {}", e)) 102 }) 103 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?; 104 105 let full_network: bool = cfg!("FULL_NETWORK", false); 106 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize); 107 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec); 108 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 109 110 let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 111 let cache_size = cfg!("CACHE_SIZE", 256u64); 112 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false); 113 114 let api_port = cfg!("API_PORT", 3000u16); 115 let enable_debug = cfg!("ENABLE_DEBUG", false); 116 let debug_port = cfg!("DEBUG_PORT", 3001u16); 117 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 118 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64); 119 let disable_firehose = cfg!("DISABLE_FIREHOSE", false); 120 let disable_backfill = cfg!("DISABLE_BACKFILL", false); 121 let firehose_workers = cfg!("FIREHOSE_WORKERS", 32usize); 122 123 let ( 124 default_db_worker_threads, 125 default_db_max_journaling_size_mb, 126 default_db_memtable_size_mb, 127 default_records_memtable_size_mb, 128 
default_partition_overrides, 129 ): (usize, u64, u64, u64, &str) = full_network 130 .then_some((8usize, 1024u64, 192u64, 8u64, "app.bsky.*=64")) 131 .unwrap_or((4usize, 512u64, 64u64, 16u64, "")); 132 133 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 134 let db_max_journaling_size_mb = cfg!( 135 "DB_MAX_JOURNALING_SIZE_MB", 136 default_db_max_journaling_size_mb 137 ); 138 let db_pending_memtable_size_mb = 139 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 140 let db_blocks_memtable_size_mb = 141 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 142 let db_repos_memtable_size_mb = 143 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 144 let db_events_memtable_size_mb = 145 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 146 let db_records_default_memtable_size_mb = cfg!( 147 "DB_RECORDS_DEFAULT_MEMTABLE_SIZE_MB", 148 default_records_memtable_size_mb 149 ); 150 151 let db_records_partition_overrides: Vec<(glob::Pattern, u64)> = 152 std::env::var("HYDRANT_DB_RECORDS_PARTITION_OVERRIDES") 153 .unwrap_or_else(|_| default_partition_overrides.to_string()) 154 .split(',') 155 .filter(|s| !s.is_empty()) 156 .map(|s| { 157 let mut parts = s.split('='); 158 let pattern = parts 159 .next() 160 .ok_or_else(|| miette::miette!("invalid partition override format"))?; 161 let size = parts 162 .next() 163 .ok_or_else(|| miette::miette!("invalid partition override format"))? 
164 .parse::<u64>() 165 .into_diagnostic()?; 166 Ok((glob::Pattern::new(pattern).into_diagnostic()?, size)) 167 }) 168 .collect::<Result<Vec<_>>>()?; 169 170 Ok(Self { 171 database_path, 172 relay_host, 173 plc_urls, 174 full_network, 175 cursor_save_interval, 176 repo_fetch_timeout, 177 log_level, 178 api_port, 179 cache_size, 180 backfill_concurrency_limit, 181 disable_lz4_compression, 182 debug_port, 183 enable_debug, 184 verify_signatures, 185 identity_cache_size, 186 disable_firehose, 187 disable_backfill, 188 firehose_workers, 189 db_worker_threads, 190 db_max_journaling_size_mb, 191 db_pending_memtable_size_mb, 192 db_blocks_memtable_size_mb, 193 db_repos_memtable_size_mb, 194 db_events_memtable_size_mb, 195 db_records_default_memtable_size_mb, 196 db_records_partition_overrides: db_records_partition_overrides, 197 }) 198 } 199} 200 201impl fmt::Display for Config { 202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 203 writeln!(f, "hydrant configuration:")?; 204 writeln!(f, " log level: {}", self.log_level)?; 205 writeln!(f, " relay host: {}", self.relay_host)?; 206 writeln!(f, " plc urls: {:?}", self.plc_urls)?; 207 writeln!(f, " full network indexing: {}", self.full_network)?; 208 writeln!(f, " verify signatures: {}", self.verify_signatures)?; 209 writeln!( 210 f, 211 " backfill concurrency: {}", 212 self.backfill_concurrency_limit 213 )?; 214 writeln!( 215 f, 216 " identity cache size: {}", 217 self.identity_cache_size 218 )?; 219 writeln!( 220 f, 221 " cursor save interval: {}sec", 222 self.cursor_save_interval.as_secs() 223 )?; 224 writeln!( 225 f, 226 " repo fetch timeout: {}sec", 227 self.repo_fetch_timeout.as_secs() 228 )?; 229 writeln!( 230 f, 231 " database path: {}", 232 self.database_path.to_string_lossy() 233 )?; 234 writeln!(f, " cache size: {} mb", self.cache_size)?; 235 writeln!( 236 f, 237 " disable lz4 compression: {}", 238 self.disable_lz4_compression 239 )?; 240 writeln!(f, " api port: {}", self.api_port)?; 241 writeln!(f, " 
firehose workers: {}", self.firehose_workers)?; 242 writeln!(f, " db worker threads: {}", self.db_worker_threads)?; 243 writeln!( 244 f, 245 " db journal size: {} mb", 246 self.db_max_journaling_size_mb 247 )?; 248 writeln!( 249 f, 250 " db pending memtable: {} mb", 251 self.db_pending_memtable_size_mb 252 )?; 253 writeln!( 254 f, 255 " db blocks memtable: {} mb", 256 self.db_blocks_memtable_size_mb 257 )?; 258 writeln!( 259 f, 260 " db repos memtable: {} mb", 261 self.db_repos_memtable_size_mb 262 )?; 263 writeln!( 264 f, 265 " db events memtable: {} mb", 266 self.db_events_memtable_size_mb 267 )?; 268 writeln!( 269 f, 270 " db records def memtable: {} mb", 271 self.db_records_default_memtable_size_mb 272 )?; 273 writeln!(f, " enable debug: {}", self.enable_debug)?; 274 if self.enable_debug { 275 writeln!(f, " debug port: {}", self.debug_port)?; 276 } 277 Ok(()) 278 } 279}