at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 25e6d3c7400a40dfbadcae56258603ec3353c476 182 lines 5.7 kB view raw
1use std::ops::Not; 2use std::sync::Arc; 3use std::sync::atomic::{AtomicUsize, Ordering}; 4use std::time::Duration; 5 6use jacquard::IntoStatic; 7use jacquard::types::string::Handle; 8use jacquard_common::types::crypto::PublicKey; 9use jacquard_common::types::ident::AtIdentifier; 10use jacquard_common::types::string::Did; 11use jacquard_identity::JacquardResolver; 12use jacquard_identity::resolver::{ 13 IdentityError, IdentityErrorKind, IdentityResolver, PlcSource, ResolverOptions, 14}; 15use miette::{Diagnostic, IntoDiagnostic}; 16use reqwest::StatusCode; 17use scc::HashCache; 18use smol_str::SmolStr; 19use thiserror::Error; 20use url::Url; 21 22#[derive(Debug, Diagnostic, Error)] 23pub enum ResolverError { 24 #[error("{0}")] 25 Generic(miette::Report), 26 #[error("too many requests")] 27 Ratelimited, 28 #[error("transport error: {0}")] 29 Transport(SmolStr), 30} 31 32impl From<IdentityError> for ResolverError { 33 fn from(e: IdentityError) -> Self { 34 match e.kind() { 35 IdentityErrorKind::HttpStatus(reqwest::StatusCode::TOO_MANY_REQUESTS) => { 36 Self::Ratelimited 37 } 38 IdentityErrorKind::Transport(msg) => Self::Transport(msg.clone()), 39 _ => Self::Generic(e.into()), 40 } 41 } 42} 43 44impl From<miette::Report> for ResolverError { 45 fn from(report: miette::Report) -> Self { 46 ResolverError::Generic(report) 47 } 48} 49 50struct ResolverInner { 51 jacquards: Vec<JacquardResolver>, 52 next_idx: AtomicUsize, 53 key_cache: HashCache<Did<'static>, PublicKey<'static>>, 54} 55 56#[derive(Clone)] 57pub struct Resolver { 58 inner: Arc<ResolverInner>, 59} 60 61impl Resolver { 62 pub fn new(plc_urls: Vec<Url>, identity_cache_size: u64) -> Self { 63 let http = reqwest::Client::new(); 64 let mut jacquards = Vec::with_capacity(plc_urls.len()); 65 66 for url in plc_urls { 67 let mut opts = ResolverOptions::default(); 68 opts.plc_source = PlcSource::PlcDirectory { base: url }; 69 opts.request_timeout = Some(Duration::from_secs(3)); 70 71 // no jacquard cache - we manage our own 72 jacquards.push(JacquardResolver::new(http.clone(), opts)); 73 } 74 75 if jacquards.is_empty() { 76 panic!("at least one PLC URL must be provided"); 77 } 78 79 Self { 80 inner: Arc::new(ResolverInner { 81 jacquards, 82 next_idx: AtomicUsize::new(0), 83 key_cache: HashCache::with_capacity( 84 std::cmp::min(1000, (identity_cache_size / 100) as usize), 85 identity_cache_size as usize, 86 ), 87 }), 88 } 89 } 90 91 async fn req<'r, T, Fut>( 92 &'r self, 93 is_plc: bool, 94 f: impl Fn(&'r JacquardResolver) -> Fut, 95 ) -> Result<T, ResolverError> 96 where 97 Fut: Future<Output = Result<T, IdentityError>>, 98 { 99 let mut idx = 100 self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len(); 101 let mut try_count = 0; 102 loop { 103 let res = f(&self.inner.jacquards[idx]).await; 104 try_count += 1; 105 // retry these with the different plc resolvers 106 if is_plc { 107 let is_retriable = matches!( 108 res.as_ref().map_err(|e| e.kind()), 109 Err(IdentityErrorKind::HttpStatus(StatusCode::TOO_MANY_REQUESTS) 110 | IdentityErrorKind::Transport(_)) 111 ); 112 // check if retriable and we haven't gone through all the plc resolvers 113 if is_retriable && try_count < self.inner.jacquards.len() { 114 idx = (idx + 1) % self.inner.jacquards.len(); 115 continue; 116 } 117 } 118 return res.map_err(Into::into); 119 } 120 } 121 122 pub async fn resolve_did( 123 &self, 124 identifier: &AtIdentifier<'_>, 125 ) -> Result<Did<'static>, ResolverError> { 126 match identifier { 127 AtIdentifier::Did(did) => Ok(did.clone().into_static()), 128 AtIdentifier::Handle(handle) => { 129 let did = self.req(false, |j| j.resolve_handle(handle)).await?; 130 Ok(did.into_static()) 131 } 132 } 133 } 134 135 pub async fn resolve_identity_info( 136 &self, 137 did: &Did<'_>, 138 ) -> Result<(Url, Option<Handle<'_>>), ResolverError> { 139 let doc_resp = self 140 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did)) 141 .await?; 142 let doc = doc_resp.parse()?; 143 144 let pds = doc 145 .pds_endpoint() 146 .ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?; 147 148 let mut handles = doc.handles(); 149 let handle = handles.is_empty().not().then(|| handles.remove(0)); 150 151 Ok((pds, handle)) 152 } 153 154 pub async fn resolve_signing_key( 155 &self, 156 did: &Did<'_>, 157 ) -> Result<PublicKey<'static>, ResolverError> { 158 let did = did.clone().into_static(); 159 if let Some(entry) = self.inner.key_cache.get_async(&did).await { 160 return Ok(entry.get().clone()); 161 } 162 163 let doc_resp = self 164 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(&did)) 165 .await?; 166 let doc = doc_resp.parse()?; 167 168 let key = doc 169 .atproto_public_key() 170 .into_diagnostic()? 171 .ok_or_else(|| NoSigningKeyError(did.clone())) 172 .into_diagnostic()?; 173 174 let _ = self.inner.key_cache.put_async(did, key.clone()).await; 175 176 Ok(key) 177 } 178} 179 180#[derive(Debug, Diagnostic, Error)] 181#[error("no atproto signing key in DID doc for {0}")] 182pub struct NoSigningKeyError(Did<'static>);