// AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
// Topics: at-protocol, atproto, indexer, rust, fjall.
// Snapshot of commit 4fe44f8a28620861fb772d40d55fbeb65716e5df (281 lines, 9.6 kB).
1use miette::Result; 2use smol_str::SmolStr; 3use std::fmt; 4use std::path::PathBuf; 5use std::str::FromStr; 6use std::time::Duration; 7use url::Url; 8 9#[derive(Debug, Clone, Copy)] 10pub enum SignatureVerification { 11 Full, 12 BackfillOnly, 13 None, 14} 15 16impl FromStr for SignatureVerification { 17 type Err = miette::Error; 18 fn from_str(s: &str) -> Result<Self> { 19 match s { 20 "full" => Ok(Self::Full), 21 "backfill-only" => Ok(Self::BackfillOnly), 22 "none" => Ok(Self::None), 23 _ => Err(miette::miette!("invalid signature verification level")), 24 } 25 } 26} 27 28impl fmt::Display for SignatureVerification { 29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 match self { 31 Self::Full => write!(f, "full"), 32 Self::BackfillOnly => write!(f, "backfill-only"), 33 Self::None => write!(f, "none"), 34 } 35 } 36} 37 38#[derive(Debug, Clone)] 39pub struct Config { 40 pub database_path: PathBuf, 41 pub relay_host: Url, 42 pub plc_urls: Vec<Url>, 43 pub full_network: bool, 44 pub cursor_save_interval: Duration, 45 pub repo_fetch_timeout: Duration, 46 pub log_level: SmolStr, 47 pub api_port: u16, 48 pub cache_size: u64, 49 pub backfill_concurrency_limit: usize, 50 pub disable_lz4_compression: bool, 51 pub debug_port: u16, 52 pub enable_debug: bool, 53 pub verify_signatures: SignatureVerification, 54 pub identity_cache_size: u64, 55 pub enable_firehose: bool, 56 pub enable_backfill: bool, 57 pub enable_crawler: Option<bool>, 58 pub firehose_workers: usize, 59 pub db_worker_threads: usize, 60 pub db_max_journaling_size_mb: u64, 61 pub db_pending_memtable_size_mb: u64, 62 pub db_blocks_memtable_size_mb: u64, 63 pub db_repos_memtable_size_mb: u64, 64 pub db_events_memtable_size_mb: u64, 65 pub db_records_memtable_size_mb: u64, 66 pub crawler_max_pending_repos: usize, 67 pub crawler_resume_pending_repos: usize, 68} 69 70impl Config { 71 pub fn from_env() -> Result<Self> { 72 macro_rules! 
cfg { 73 (@val $key:expr) => { 74 std::env::var(concat!("HYDRANT_", $key)) 75 }; 76 ($key:expr, $default:expr, sec) => { 77 cfg!(@val $key) 78 .ok() 79 .and_then(|s| humantime::parse_duration(&s).ok()) 80 .unwrap_or(Duration::from_secs($default)) 81 }; 82 ($key:expr, $default:expr) => { 83 cfg!(@val $key) 84 .ok() 85 .and_then(|s| s.parse().ok()) 86 .unwrap_or($default.to_owned()) 87 .into() 88 }; 89 } 90 91 let log_level = cfg!("LOG_LEVEL", "info"); 92 93 let relay_host = cfg!( 94 "RELAY_HOST", 95 Url::parse("wss://relay.fire.hose.cam").unwrap() 96 ); 97 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 98 .ok() 99 .map(|s| { 100 s.split(',') 101 .map(|s| Url::parse(s.trim())) 102 .collect::<Result<Vec<_>, _>>() 103 .map_err(|e| miette::miette!("invalid PLC URL: {}", e)) 104 }) 105 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?; 106 107 let full_network: bool = cfg!("FULL_NETWORK", false); 108 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec); 109 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 110 111 let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 112 let cache_size = cfg!("CACHE_SIZE", 256u64); 113 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false); 114 115 let api_port = cfg!("API_PORT", 3000u16); 116 let enable_debug = cfg!("ENABLE_DEBUG", false); 117 let debug_port = cfg!("DEBUG_PORT", 3001u16); 118 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 119 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64); 120 let enable_firehose = cfg!("ENABLE_FIREHOSE", true); 121 let enable_backfill = cfg!("ENABLE_BACKFILL", true); 122 let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER") 123 .ok() 124 .and_then(|s| s.parse().ok()); 125 126 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize); 127 let firehose_workers = cfg!( 128 "FIREHOSE_WORKERS", 129 full_network.then_some(32usize).unwrap_or(8usize) 130 ); 131 
132 let ( 133 default_db_worker_threads, 134 default_db_max_journaling_size_mb, 135 default_db_memtable_size_mb, 136 ): (usize, u64, u64) = full_network 137 .then_some((8usize, 1024u64, 192u64)) 138 .unwrap_or((4usize, 512u64, 64u64)); 139 140 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 141 let db_max_journaling_size_mb = cfg!( 142 "DB_MAX_JOURNALING_SIZE_MB", 143 default_db_max_journaling_size_mb 144 ); 145 let db_pending_memtable_size_mb = 146 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 147 let db_blocks_memtable_size_mb = 148 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 149 let db_repos_memtable_size_mb = 150 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 151 let db_events_memtable_size_mb = 152 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 153 let db_records_memtable_size_mb = 154 cfg!("DB_RECORDS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 155 156 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize); 157 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize); 158 159 Ok(Self { 160 database_path, 161 relay_host, 162 plc_urls, 163 full_network, 164 cursor_save_interval, 165 repo_fetch_timeout, 166 log_level, 167 api_port, 168 cache_size, 169 backfill_concurrency_limit, 170 disable_lz4_compression, 171 debug_port, 172 enable_debug, 173 verify_signatures, 174 identity_cache_size, 175 enable_firehose, 176 enable_backfill, 177 enable_crawler, 178 firehose_workers, 179 db_worker_threads, 180 db_max_journaling_size_mb, 181 db_pending_memtable_size_mb, 182 db_blocks_memtable_size_mb, 183 db_repos_memtable_size_mb, 184 db_events_memtable_size_mb, 185 db_records_memtable_size_mb, 186 crawler_max_pending_repos, 187 crawler_resume_pending_repos, 188 }) 189 } 190} 191 192impl fmt::Display for Config { 193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 194 writeln!(f, "hydrant configuration:")?; 
195 writeln!(f, " log level: {}", self.log_level)?; 196 writeln!(f, " relay host: {}", self.relay_host)?; 197 writeln!(f, " plc urls: {:?}", self.plc_urls)?; 198 writeln!(f, " full network indexing: {}", self.full_network)?; 199 writeln!(f, " verify signatures: {}", self.verify_signatures)?; 200 writeln!( 201 f, 202 " backfill concurrency: {}", 203 self.backfill_concurrency_limit 204 )?; 205 writeln!( 206 f, 207 " identity cache size: {}", 208 self.identity_cache_size 209 )?; 210 writeln!( 211 f, 212 " cursor save interval: {}sec", 213 self.cursor_save_interval.as_secs() 214 )?; 215 writeln!( 216 f, 217 " repo fetch timeout: {}sec", 218 self.repo_fetch_timeout.as_secs() 219 )?; 220 writeln!( 221 f, 222 " database path: {}", 223 self.database_path.to_string_lossy() 224 )?; 225 writeln!(f, " cache size: {} mb", self.cache_size)?; 226 writeln!( 227 f, 228 " disable lz4 compression: {}", 229 self.disable_lz4_compression 230 )?; 231 writeln!(f, " api port: {}", self.api_port)?; 232 writeln!(f, " firehose workers: {}", self.firehose_workers)?; 233 writeln!(f, " db worker threads: {}", self.db_worker_threads)?; 234 writeln!( 235 f, 236 " db journal size: {} mb", 237 self.db_max_journaling_size_mb 238 )?; 239 writeln!( 240 f, 241 " db pending memtable: {} mb", 242 self.db_pending_memtable_size_mb 243 )?; 244 writeln!( 245 f, 246 " db blocks memtable: {} mb", 247 self.db_blocks_memtable_size_mb 248 )?; 249 writeln!( 250 f, 251 " db repos memtable: {} mb", 252 self.db_repos_memtable_size_mb 253 )?; 254 writeln!( 255 f, 256 " db events memtable: {} mb", 257 self.db_events_memtable_size_mb 258 )?; 259 writeln!( 260 f, 261 " db records memtable: {} mb", 262 self.db_records_memtable_size_mb 263 )?; 264 265 writeln!( 266 f, 267 " crawler max pending: {}", 268 self.crawler_max_pending_repos 269 )?; 270 writeln!( 271 f, 272 " crawler resume pending: {}", 273 self.crawler_resume_pending_repos 274 )?; 275 writeln!(f, " enable debug: {}", self.enable_debug)?; 276 if self.enable_debug 
{ 277 writeln!(f, " debug port: {}", self.debug_port)?; 278 } 279 Ok(()) 280 } 281}