kind of like tap but different and in rust
at main 276 lines 9.4 kB view raw
1use miette::Result; 2use smol_str::SmolStr; 3use std::fmt; 4use std::path::PathBuf; 5use std::str::FromStr; 6use std::time::Duration; 7use url::Url; 8 9#[derive(Debug, Clone, Copy)] 10pub enum SignatureVerification { 11 Full, 12 BackfillOnly, 13 None, 14} 15 16impl FromStr for SignatureVerification { 17 type Err = miette::Error; 18 fn from_str(s: &str) -> Result<Self> { 19 match s { 20 "full" => Ok(Self::Full), 21 "backfill-only" => Ok(Self::BackfillOnly), 22 "none" => Ok(Self::None), 23 _ => Err(miette::miette!("invalid signature verification level")), 24 } 25 } 26} 27 28impl fmt::Display for SignatureVerification { 29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 match self { 31 Self::Full => write!(f, "full"), 32 Self::BackfillOnly => write!(f, "backfill-only"), 33 Self::None => write!(f, "none"), 34 } 35 } 36} 37 38#[derive(Debug, Clone)] 39pub struct Config { 40 pub database_path: PathBuf, 41 pub relay_host: Url, 42 pub plc_urls: Vec<Url>, 43 pub full_network: bool, 44 pub cursor_save_interval: Duration, 45 pub repo_fetch_timeout: Duration, 46 pub log_level: SmolStr, 47 pub api_port: u16, 48 pub cache_size: u64, 49 pub backfill_concurrency_limit: usize, 50 pub disable_lz4_compression: bool, 51 pub debug_port: u16, 52 pub enable_debug: bool, 53 pub verify_signatures: SignatureVerification, 54 pub identity_cache_size: u64, 55 pub disable_firehose: bool, 56 pub disable_backfill: bool, 57 pub firehose_workers: usize, 58 pub db_worker_threads: usize, 59 pub db_max_journaling_size_mb: u64, 60 pub db_pending_memtable_size_mb: u64, 61 pub db_blocks_memtable_size_mb: u64, 62 pub db_repos_memtable_size_mb: u64, 63 pub db_events_memtable_size_mb: u64, 64 pub db_records_memtable_size_mb: u64, 65 pub crawler_max_pending_repos: usize, 66 pub crawler_resume_pending_repos: usize, 67} 68 69impl Config { 70 pub fn from_env() -> Result<Self> { 71 macro_rules! cfg { 72 (@val $key:expr) => { 73 std::env::var(concat!("HYDRANT_", $key)) 74 }; 75 ($key:expr, $default:expr, sec) => { 76 cfg!(@val $key) 77 .ok() 78 .and_then(|s| humantime::parse_duration(&s).ok()) 79 .unwrap_or(Duration::from_secs($default)) 80 }; 81 ($key:expr, $default:expr) => { 82 cfg!(@val $key) 83 .ok() 84 .and_then(|s| s.parse().ok()) 85 .unwrap_or($default.to_owned()) 86 .into() 87 }; 88 } 89 90 let log_level = cfg!("LOG_LEVEL", "info"); 91 92 let relay_host = cfg!( 93 "RELAY_HOST", 94 Url::parse("wss://relay.fire.hose.cam").unwrap() 95 ); 96 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 97 .ok() 98 .map(|s| { 99 s.split(',') 100 .map(|s| Url::parse(s.trim())) 101 .collect::<Result<Vec<_>, _>>() 102 .map_err(|e| miette::miette!("invalid PLC URL: {}", e)) 103 }) 104 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?; 105 106 let full_network: bool = cfg!("FULL_NETWORK", false); 107 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec); 108 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 109 110 let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 111 let cache_size = cfg!("CACHE_SIZE", 256u64); 112 let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false); 113 114 let api_port = cfg!("API_PORT", 3000u16); 115 let enable_debug = cfg!("ENABLE_DEBUG", false); 116 let debug_port = cfg!("DEBUG_PORT", 3001u16); 117 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 118 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64); 119 let disable_firehose = cfg!("DISABLE_FIREHOSE", false); 120 let disable_backfill = cfg!("DISABLE_BACKFILL", false); 121 122 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize); 123 let firehose_workers = cfg!( 124 "FIREHOSE_WORKERS", 125 full_network.then_some(32usize).unwrap_or(8usize) 126 ); 127 128 let ( 129 default_db_worker_threads, 130 default_db_max_journaling_size_mb, 131 default_db_memtable_size_mb, 132 ): (usize, u64, u64) = full_network 133 .then_some((8usize, 1024u64, 192u64)) 134 .unwrap_or((4usize, 512u64, 64u64)); 135 136 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 137 let db_max_journaling_size_mb = cfg!( 138 "DB_MAX_JOURNALING_SIZE_MB", 139 default_db_max_journaling_size_mb 140 ); 141 let db_pending_memtable_size_mb = 142 cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 143 let db_blocks_memtable_size_mb = 144 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 145 let db_repos_memtable_size_mb = 146 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 147 let db_events_memtable_size_mb = 148 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 149 let db_records_memtable_size_mb = 150 cfg!("DB_RECORDS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 151 152 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize); 153 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize); 154 155 Ok(Self { 156 database_path, 157 relay_host, 158 plc_urls, 159 full_network, 160 cursor_save_interval, 161 repo_fetch_timeout, 162 log_level, 163 api_port, 164 cache_size, 165 backfill_concurrency_limit, 166 disable_lz4_compression, 167 debug_port, 168 enable_debug, 169 verify_signatures, 170 identity_cache_size, 171 disable_firehose, 172 disable_backfill, 173 firehose_workers, 174 db_worker_threads, 175 db_max_journaling_size_mb, 176 db_pending_memtable_size_mb, 177 db_blocks_memtable_size_mb, 178 db_repos_memtable_size_mb, 179 db_events_memtable_size_mb, 180 db_records_memtable_size_mb, 181 crawler_max_pending_repos, 182 crawler_resume_pending_repos, 183 }) 184 } 185} 186 187impl fmt::Display for Config { 188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 189 writeln!(f, "hydrant configuration:")?; 190 writeln!(f, " log level: {}", self.log_level)?; 191 writeln!(f, " relay host: {}", self.relay_host)?; 192 writeln!(f, " plc urls: {:?}", self.plc_urls)?; 193 writeln!(f, " full network indexing: {}", self.full_network)?; 194 writeln!(f, " verify signatures: {}", self.verify_signatures)?; 195 writeln!( 196 f, 197 " backfill concurrency: {}", 198 self.backfill_concurrency_limit 199 )?; 200 writeln!( 201 f, 202 " identity cache size: {}", 203 self.identity_cache_size 204 )?; 205 writeln!( 206 f, 207 " cursor save interval: {}sec", 208 self.cursor_save_interval.as_secs() 209 )?; 210 writeln!( 211 f, 212 " repo fetch timeout: {}sec", 213 self.repo_fetch_timeout.as_secs() 214 )?; 215 writeln!( 216 f, 217 " database path: {}", 218 self.database_path.to_string_lossy() 219 )?; 220 writeln!(f, " cache size: {} mb", self.cache_size)?; 221 writeln!( 222 f, 223 " disable lz4 compression: {}", 224 self.disable_lz4_compression 225 )?; 226 writeln!(f, " api port: {}", self.api_port)?; 227 writeln!(f, " firehose workers: {}", self.firehose_workers)?; 228 writeln!(f, " db worker threads: {}", self.db_worker_threads)?; 229 writeln!( 230 f, 231 " db journal size: {} mb", 232 self.db_max_journaling_size_mb 233 )?; 234 writeln!( 235 f, 236 " db pending memtable: {} mb", 237 self.db_pending_memtable_size_mb 238 )?; 239 writeln!( 240 f, 241 " db blocks memtable: {} mb", 242 self.db_blocks_memtable_size_mb 243 )?; 244 writeln!( 245 f, 246 " db repos memtable: {} mb", 247 self.db_repos_memtable_size_mb 248 )?; 249 writeln!( 250 f, 251 " db events memtable: {} mb", 252 self.db_events_memtable_size_mb 253 )?; 254 writeln!( 255 f, 256 " db records memtable: {} mb", 257 self.db_records_memtable_size_mb 258 )?; 259 260 writeln!( 261 f, 262 " crawler max pending: {}", 263 self.crawler_max_pending_repos 264 )?; 265 writeln!( 266 f, 267 " crawler resume pending: {}", 268 self.crawler_resume_pending_repos 269 )?; 270 writeln!(f, " enable debug: {}", self.enable_debug)?; 271 if self.enable_debug { 272 writeln!(f, " debug port: {}", self.debug_port)?; 273 } 274 Ok(()) 275 } 276}