at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[config] refactor the parsing and display prettier config on startup

ptr.pet 61539b51 02165cb8

verified
+101 -79
+5
.agent/rules/read-agents.md
··· 1 + --- 2 + trigger: always_on 3 + --- 4 + 5 + ALWAYS read AGENTS.md at the start of a conversation, and keep it in mind throughout.
+79 -56
src/config.rs
··· 1 - use miette::{IntoDiagnostic, Result}; 1 + use miette::Result; 2 2 use smol_str::SmolStr; 3 - use std::env; 3 + use std::fmt; 4 4 use std::path::PathBuf; 5 5 use std::time::Duration; 6 6 use url::Url; ··· 8 8 #[derive(Debug, Clone)] 9 9 pub struct Config { 10 10 pub database_path: PathBuf, 11 - pub relay_host: SmolStr, 11 + pub relay_host: Url, 12 12 pub plc_url: Url, 13 13 pub full_network: bool, 14 14 pub cursor_save_interval: Duration, ··· 24 24 25 25 impl Config { 26 26 pub fn from_env() -> Result<Self> { 27 - let database_path = env::var("HYDRANT_DATABASE_PATH") 28 - .unwrap_or_else(|_| "./hydrant.db".to_string()) 29 - .into(); 30 - 31 - let relay_host = env::var("HYDRANT_RELAY_HOST") 32 - .unwrap_or_else(|_| "wss://relay.fire.hose.cam".to_string()) 33 - .into(); 27 + macro_rules! cfg { 28 + (@val $key:expr) => { 29 + std::env::var(concat!("HYDRANT_", $key)) 30 + }; 31 + ($key:expr, $default:expr, sec) => { 32 + cfg!(@val $key) 33 + .ok() 34 + .and_then(|s| humantime::parse_duration(&s).ok()) 35 + .unwrap_or(Duration::from_secs($default)) 36 + }; 37 + ($key:expr, $default:expr) => { 38 + cfg!(@val $key) 39 + .ok() 40 + .and_then(|s| s.parse().ok()) 41 + .unwrap_or($default.to_owned()) 42 + .into() 43 + }; 44 + } 34 45 35 - let plc_url = env::var("HYDRANT_PLC_URL") 36 - .unwrap_or_else(|_| "https://plc.wtf".to_string()) 37 - .parse() 38 - .into_diagnostic()?; 46 + let log_level = cfg!("LOG_LEVEL", "info"); 39 47 40 - let full_network = env::var("HYDRANT_FULL_NETWORK") 41 - .map(|v| v == "true") 42 - .unwrap_or(false); 48 + let relay_host = cfg!( 49 + "RELAY_HOST", 50 + Url::parse("wss://relay.fire.hose.cam").unwrap() 51 + ); 52 + let plc_url = cfg!("PLC_URL", Url::parse("https://plc.wtf").unwrap()); 43 53 44 - let cursor_save_interval = env::var("HYDRANT_CURSOR_SAVE_INTERVAL") 45 - .ok() 46 - .and_then(|s| humantime::parse_duration(&s).ok()) 47 - .unwrap_or(Duration::from_secs(10)); 54 + let full_network = cfg!("FULL_NETWORK", false); 55 + let 
backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 32usize); 56 + let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 10, sec); 57 + let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 48 58 49 - let repo_fetch_timeout = env::var("HYDRANT_REPO_FETCH_TIMEOUT") 50 - .ok() 51 - .and_then(|s| humantime::parse_duration(&s).ok()) 52 - .unwrap_or(Duration::from_secs(300)); 59 + let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 60 + let cache_size = cfg!("CACHE_SIZE", 256u64); 61 + let disable_lz4_compression = cfg!("NO_LZ4_COMPRESSION", false); 53 62 54 - let log_level = env::var("HYDRANT_LOG_LEVEL") 55 - .unwrap_or_else(|_| "info".to_string()) 56 - .into(); 57 - 58 - let api_port = env::var("HYDRANT_API_PORT") 59 - .ok() 60 - .and_then(|s| s.parse().ok()) 61 - .unwrap_or(3000); 62 - 63 - let cache_size = env::var("HYDRANT_CACHE_SIZE") 64 - .ok() 65 - .and_then(|s| s.parse().ok()) 66 - .unwrap_or(256); 67 - 68 - let backfill_concurrency_limit = env::var("HYDRANT_BACKFILL_CONCURRENCY_LIMIT") 69 - .ok() 70 - .and_then(|s| s.parse().ok()) 71 - .unwrap_or(32); 72 - 73 - let disable_lz4_compression = env::var("HYDRANT_NO_LZ4_COMPRESSION") 74 - .map(|v| v == "true") 75 - .unwrap_or(false); 76 - 77 - let debug_port = env::var("HYDRANT_DEBUG_PORT") 78 - .ok() 79 - .and_then(|s| s.parse().ok()) 80 - .unwrap_or(3001); 81 - 82 - let enable_debug = env::var("HYDRANT_ENABLE_DEBUG") 83 - .map(|v| v == "true") 84 - .unwrap_or(false); 63 + let api_port = cfg!("API_PORT", 3000u16); 64 + let enable_debug = cfg!("ENABLE_DEBUG", false); 65 + let debug_port = cfg!("DEBUG_PORT", 3001u16); 85 66 86 67 Ok(Self { 87 68 database_path, ··· 100 81 }) 101 82 } 102 83 } 84 + 85 + impl fmt::Display for Config { 86 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 87 + writeln!(f, "hydrant configuration:")?; 88 + writeln!(f, " log level: {}", self.log_level)?; 89 + writeln!(f, " relay host: {}", self.relay_host)?; 90 + writeln!(f, " plc url: {}", 
self.plc_url)?; 91 + writeln!(f, " full network indexing: {}", self.full_network)?; 92 + writeln!( 93 + f, 94 + " backfill concurrency: {}", 95 + self.backfill_concurrency_limit 96 + )?; 97 + writeln!( 98 + f, 99 + " cursor save interval: {}sec", 100 + self.cursor_save_interval.as_secs() 101 + )?; 102 + writeln!( 103 + f, 104 + " repo fetch timeout: {}sec", 105 + self.repo_fetch_timeout.as_secs() 106 + )?; 107 + writeln!( 108 + f, 109 + " database path: {}", 110 + self.database_path.to_string_lossy() 111 + )?; 112 + writeln!(f, " cache size: {} mb", self.cache_size)?; 113 + writeln!( 114 + f, 115 + " disable lz4 compression: {}", 116 + self.disable_lz4_compression 117 + )?; 118 + writeln!(f, " api port: {}", self.api_port)?; 119 + writeln!(f, " enable debug: {}", self.enable_debug)?; 120 + if self.enable_debug { 121 + writeln!(f, " debug port: {}", self.debug_port)?; 122 + } 123 + Ok(()) 124 + } 125 + }
+3 -5
src/crawler/mod.rs
··· 14 14 15 15 pub struct Crawler { 16 16 state: Arc<AppState>, 17 - relay_host: SmolStr, 17 + relay_host: Url, 18 18 http: reqwest::Client, 19 19 } 20 20 21 21 impl Crawler { 22 - pub fn new(state: Arc<AppState>, relay_host: SmolStr) -> Self { 22 + pub fn new(state: Arc<AppState>, relay_host: Url) -> Self { 23 23 Self { 24 24 state, 25 25 relay_host, ··· 32 32 33 33 let db = &self.state.db; 34 34 35 - let relay_url = Url::parse(&self.relay_host).into_diagnostic()?; 36 - 37 35 // 1. load cursor 38 36 let cursor_key = b"crawler_cursor"; 39 37 let mut cursor: Option<SmolStr> = ··· 52 50 .maybe_cursor(cursor.clone().map(|c| CowStr::from(c.to_string()))) 53 51 .build(); 54 52 55 - let res_result = self.http.xrpc(relay_url.clone()).send(&req).await; 53 + let res_result = self.http.xrpc(self.relay_host.clone()).send(&req).await; 56 54 57 55 let output: ListReposOutput = match res_result { 58 56 Ok(res) => res.into_output().into_diagnostic()?,
+4 -7
src/ingest/mod.rs
··· 4 4 use jacquard::types::did::Did; 5 5 use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 6 6 use jacquard_common::IntoStatic; 7 - use miette::{IntoDiagnostic, Result}; 7 + use miette::Result; 8 8 use n0_future::StreamExt; 9 - use smol_str::SmolStr; 10 9 use std::sync::atomic::Ordering; 11 10 use std::sync::Arc; 12 11 use tracing::{debug, error, info}; ··· 14 13 15 14 pub struct Ingestor { 16 15 state: Arc<AppState>, 17 - relay_host: SmolStr, 16 + relay_host: Url, 18 17 full_network: bool, 19 18 } 20 19 21 20 impl Ingestor { 22 - pub fn new(state: Arc<AppState>, relay_host: SmolStr, full_network: bool) -> Self { 21 + pub fn new(state: Arc<AppState>, relay_host: Url, full_network: bool) -> Self { 23 22 Self { 24 23 state, 25 24 relay_host, ··· 28 27 } 29 28 30 29 pub async fn run(mut self) -> Result<()> { 31 - let base_url = Url::parse(&self.relay_host).into_diagnostic()?; 32 - 33 30 loop { 34 31 // 1. load cursor 35 32 let current_cursor = self.state.cur_firehose.load(Ordering::SeqCst); ··· 54 51 } 55 52 56 53 // 2. connect 57 - let client = TungsteniteSubscriptionClient::from_base_uri(base_url.clone()); 54 + let client = TungsteniteSubscriptionClient::from_base_uri(self.relay_host.clone()); 58 55 let params = if let Some(c) = start_cursor { 59 56 SubscribeRepos::new().cursor(c).build() 60 57 } else {
+10 -11
src/main.rs
··· 35 35 let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level); 36 36 tracing_subscriber::fmt().with_env_filter(env_filter).init(); 37 37 38 - info!("starting hydrant with config: {cfg:#?}"); 38 + info!("{cfg}"); 39 39 40 40 let (state, backfill_rx, buffer_rx) = AppState::new(&cfg)?; 41 41 let state = Arc::new(state); ··· 147 147 }); 148 148 149 149 if cfg.full_network { 150 - tokio::spawn({ 151 - let state = state.clone(); 152 - let crawler_host = cfg.relay_host.clone(); 153 - 154 - Crawler::new(state, crawler_host).run().inspect_err(|e| { 155 - error!("crawler died: {e}"); 156 - Db::check_poisoned_report(&e); 157 - }) 158 - }); 150 + tokio::spawn( 151 + Crawler::new(state.clone(), cfg.relay_host.clone()) 152 + .run() 153 + .inspect_err(|e| { 154 + error!("crawler died: {e}"); 155 + Db::check_poisoned_report(&e); 156 + }), 157 + ); 159 158 } 160 159 161 - let ingestor = Ingestor::new(state.clone(), cfg.relay_host.clone(), cfg.full_network); 160 + let ingestor = Ingestor::new(state.clone(), cfg.relay_host, cfg.full_network); 162 161 163 162 let res = futures::future::try_join_all::<[BoxFuture<_>; _]>([ 164 163 Box::pin(buffer_processor_task.map(|r| r.into_diagnostic().flatten())),