at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[resolver] support multiple PLC URLs

ptr.pet df488e88 5ffd5f9a

verified
+41 -17
+1 -1
src/bin/mst_dump.rs
··· 33 33 34 34 // Init resolver 35 35 let plc_url = Url::parse("https://plc.directory").into_diagnostic()?; 36 - let resolver = Resolver::new(plc_url, 100); 36 + let resolver = Resolver::new(vec![plc_url], 100); 37 37 38 38 // Resolve identity 39 39 info!("Resolving {}...", identifier_str);
+12 -4
src/config.rs
··· 39 39 pub struct Config { 40 40 pub database_path: PathBuf, 41 41 pub relay_host: Url, 42 - pub plc_url: Url, 42 + pub plc_urls: Vec<Url>, 43 43 pub full_network: bool, 44 44 pub cursor_save_interval: Duration, 45 45 pub repo_fetch_timeout: Duration, ··· 84 84 "RELAY_HOST", 85 85 Url::parse("wss://relay.fire.hose.cam").unwrap() 86 86 ); 87 - let plc_url = cfg!("PLC_URL", Url::parse("https://plc.wtf").unwrap()); 87 + let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 88 + .ok() 89 + .map(|s| { 90 + s.split(',') 91 + .map(|s| Url::parse(s.trim())) 92 + .collect::<Result<Vec<_>, _>>() 93 + .map_err(|e| miette::miette!("invalid PLC URL: {}", e)) 94 + }) 95 + .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?; 88 96 89 97 let full_network = cfg!("FULL_NETWORK", false); 90 98 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 32usize); ··· 107 115 Ok(Self { 108 116 database_path, 109 117 relay_host, 110 - plc_url, 118 + plc_urls, 111 119 full_network, 112 120 cursor_save_interval, 113 121 repo_fetch_timeout, ··· 132 140 writeln!(f, "hydrant configuration:")?; 133 141 writeln!(f, " log level: {}", self.log_level)?; 134 142 writeln!(f, " relay host: {}", self.relay_host)?; 135 - writeln!(f, " plc url: {}", self.plc_url)?; 143 + writeln!(f, " plc urls: {:?}", self.plc_urls)?; 136 144 writeln!(f, " full network indexing: {}", self.full_network)?; 137 145 writeln!(f, " verify signatures: {}", self.verify_signatures)?; 138 146 writeln!(
+27 -11
src/resolver.rs
··· 1 1 use std::ops::Not; 2 2 use std::sync::Arc; 3 + use std::sync::atomic::{AtomicUsize, Ordering}; 3 4 use std::time::Duration; 4 5 5 6 use jacquard::IntoStatic; ··· 46 47 } 47 48 48 49 struct ResolverInner { 49 - jacquard: JacquardResolver, 50 + jacquards: Vec<JacquardResolver>, 51 + next_idx: AtomicUsize, 50 52 key_cache: HashCache<Did<'static>, PublicKey<'static>>, 51 53 } 52 54 ··· 56 58 } 57 59 58 60 impl Resolver { 59 - pub fn new(plc_url: Url, identity_cache_size: u64) -> Self { 61 + pub fn new(plc_urls: Vec<Url>, identity_cache_size: u64) -> Self { 60 62 let http = reqwest::Client::new(); 61 - let mut opts = ResolverOptions::default(); 62 - opts.plc_source = PlcSource::PlcDirectory { base: plc_url }; 63 - opts.request_timeout = Some(Duration::from_secs(3)); 63 + let mut jacquards = Vec::with_capacity(plc_urls.len()); 64 64 65 - // no jacquard cache - we manage our own 66 - let jacquard = JacquardResolver::new(http, opts); 65 + for url in plc_urls { 66 + let mut opts = ResolverOptions::default(); 67 + opts.plc_source = PlcSource::PlcDirectory { base: url }; 68 + opts.request_timeout = Some(Duration::from_secs(3)); 69 + 70 + // no jacquard cache - we manage our own 71 + jacquards.push(JacquardResolver::new(http.clone(), opts)); 72 + } 73 + 74 + if jacquards.is_empty() { 75 + panic!("at least one PLC URL must be provided"); 76 + } 67 77 68 78 Self { 69 79 inner: Arc::new(ResolverInner { 70 - jacquard, 80 + jacquards, 81 + next_idx: AtomicUsize::new(0), 71 82 key_cache: HashCache::with_capacity( 72 83 std::cmp::min(1000, (identity_cache_size / 100) as usize), 73 84 identity_cache_size as usize, ··· 76 87 } 77 88 } 78 89 90 + fn get_jacquard(&self) -> &JacquardResolver { 91 + let idx = self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len(); 92 + &self.inner.jacquards[idx] 93 + } 94 + 79 95 pub async fn resolve_did( 80 96 &self, 81 97 identifier: &AtIdentifier<'_>, ··· 83 99 match identifier { 84 100 AtIdentifier::Did(did) => Ok(did.clone().into_static()), 85 101 AtIdentifier::Handle(handle) => { 86 - let did = self.inner.jacquard.resolve_handle(handle).await?; 102 + let did = self.get_jacquard().resolve_handle(handle).await?; 87 103 Ok(did.into_static()) 88 104 } 89 105 } ··· 93 109 &self, 94 110 did: &Did<'_>, 95 111 ) -> Result<(Url, Option<Handle<'_>>), ResolverError> { 96 - let doc_resp = self.inner.jacquard.resolve_did_doc(did).await?; 112 + let doc_resp = self.get_jacquard().resolve_did_doc(did).await?; 97 113 let doc = doc_resp.parse()?; 98 114 99 115 let pds = doc ··· 115 131 return Ok(entry.get().clone()); 116 132 } 117 133 118 - let doc_resp = self.inner.jacquard.resolve_did_doc(&did).await?; 134 + let doc_resp = self.get_jacquard().resolve_did_doc(&did).await?; 119 135 let doc = doc_resp.parse()?; 120 136 121 137 let key = doc
+1 -1
src/state.rs
··· 19 19 config.cache_size, 20 20 config.disable_lz4_compression, 21 21 )?; 22 - let resolver = Resolver::new(config.plc_url.clone(), config.identity_cache_size); 22 + let resolver = Resolver::new(config.plc_urls.clone(), config.identity_cache_size); 23 23 24 24 Ok(Self { 25 25 db,