at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 38c5d8b26077410afd982b90fa85bfd42190a7cb 202 lines 6.3 kB view raw
1use std::ops::Not; 2use std::sync::Arc; 3use std::sync::atomic::{AtomicUsize, Ordering}; 4use std::time::Duration; 5 6use jacquard_common::IntoStatic; 7use jacquard_common::types::crypto::PublicKey; 8use jacquard_common::types::ident::AtIdentifier; 9use jacquard_common::types::string::Did; 10use jacquard_common::types::string::Handle; 11use jacquard_identity::JacquardResolver; 12use jacquard_identity::resolver::{ 13 IdentityError, IdentityErrorKind, IdentityResolver, PlcSource, ResolverOptions, 14}; 15use miette::{Diagnostic, IntoDiagnostic}; 16use reqwest::StatusCode; 17use scc::HashCache; 18use smol_str::SmolStr; 19use thiserror::Error; 20use url::Url; 21 22#[derive(Debug, Diagnostic, Error)] 23pub enum ResolverError { 24 #[error("{0}")] 25 Generic(miette::Report), 26 #[error("too many requests")] 27 Ratelimited, 28 #[error("transport error: {0}")] 29 Transport(SmolStr), 30} 31 32impl From<IdentityError> for ResolverError { 33 fn from(e: IdentityError) -> Self { 34 match e.kind() { 35 IdentityErrorKind::HttpStatus(reqwest::StatusCode::TOO_MANY_REQUESTS) => { 36 Self::Ratelimited 37 } 38 IdentityErrorKind::Transport(msg) => Self::Transport(msg.clone()), 39 _ => Self::Generic(e.into()), 40 } 41 } 42} 43 44impl From<miette::Report> for ResolverError { 45 fn from(report: miette::Report) -> Self { 46 ResolverError::Generic(report) 47 } 48} 49 50#[derive(Clone)] 51pub struct MiniDoc { 52 pub pds: Url, 53 pub handle: Option<Handle<'static>>, 54 pub key: Option<PublicKey<'static>>, 55} 56 57struct ResolverInner { 58 jacquards: Vec<JacquardResolver>, 59 next_idx: AtomicUsize, 60 cache: HashCache<Did<'static>, MiniDoc>, 61} 62 63#[derive(Clone)] 64pub struct Resolver { 65 inner: Arc<ResolverInner>, 66} 67 68impl Resolver { 69 pub fn new(plc_urls: Vec<Url>, identity_cache_size: u64) -> Self { 70 let http = reqwest::Client::new(); 71 let mut jacquards = Vec::with_capacity(plc_urls.len()); 72 73 for url in plc_urls { 74 let mut opts = ResolverOptions::default(); 75 opts.plc_source = PlcSource::PlcDirectory { base: url }; 76 opts.request_timeout = Some(Duration::from_secs(3)); 77 78 jacquards.push(JacquardResolver::new(http.clone(), opts).with_system_dns()); 79 } 80 81 if jacquards.is_empty() { 82 panic!("at least one PLC URL must be provided"); 83 } 84 85 Self { 86 inner: Arc::new(ResolverInner { 87 jacquards, 88 next_idx: AtomicUsize::new(0), 89 cache: HashCache::with_capacity( 90 std::cmp::min(1000, (identity_cache_size / 100) as usize), 91 identity_cache_size as usize, 92 ), 93 }), 94 } 95 } 96 97 pub async fn invalidate(&self, did: &Did<'_>) { 98 self.inner 99 .cache 100 .remove_async(&did.clone().into_static()) 101 .await; 102 } 103 104 pub fn invalidate_sync(&self, did: &Did<'_>) { 105 self.inner.cache.remove_sync(&did.clone().into_static()); 106 } 107 108 async fn req<'r, T, Fut>( 109 &'r self, 110 is_plc: bool, 111 f: impl Fn(&'r JacquardResolver) -> Fut, 112 ) -> Result<T, ResolverError> 113 where 114 Fut: Future<Output = Result<T, IdentityError>>, 115 { 116 let mut idx = 117 self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len(); 118 let mut try_count = 0; 119 loop { 120 let res = f(&self.inner.jacquards[idx]).await; 121 try_count += 1; 122 // retry these with the different plc resolvers 123 if is_plc { 124 let is_retriable = matches!( 125 res.as_ref().map_err(|e| e.kind()), 126 Err(IdentityErrorKind::HttpStatus(StatusCode::TOO_MANY_REQUESTS) 127 | IdentityErrorKind::Transport(_)) 128 ); 129 // check if retriable and we haven't gone through all the plc resolvers 130 if is_retriable && try_count < self.inner.jacquards.len() { 131 idx = (idx + 1) % self.inner.jacquards.len(); 132 continue; 133 } 134 } 135 return res.map_err(Into::into); 136 } 137 } 138 139 pub async fn resolve_did( 140 &self, 141 identifier: &AtIdentifier<'_>, 142 ) -> Result<Did<'static>, ResolverError> { 143 match identifier { 144 AtIdentifier::Did(did) => Ok(did.clone().into_static()), 145 AtIdentifier::Handle(handle) => { 146 let did = self.req(false, |j| j.resolve_handle(handle)).await?; 147 Ok(did.into_static()) 148 } 149 } 150 } 151 152 pub async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> { 153 let did_static = did.clone().into_static(); 154 if let Some(entry) = self.inner.cache.get_async(&did_static).await { 155 return Ok(entry.get().clone()); 156 } 157 158 let doc_resp = self 159 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did)) 160 .await?; 161 let doc = doc_resp.parse()?; 162 163 let pds = doc 164 .pds_endpoint() 165 .ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?; 166 167 let mut handles = doc.handles(); 168 let handle = handles 169 .is_empty() 170 .not() 171 .then(|| handles.remove(0).into_static()); 172 let key = doc.atproto_public_key().ok().flatten(); 173 174 let mini = MiniDoc { pds, handle, key }; 175 let _ = self.inner.cache.put_async(did_static, mini.clone()).await; 176 Ok(mini) 177 } 178 179 pub async fn resolve_identity_info( 180 &self, 181 did: &Did<'_>, 182 ) -> Result<(Url, Option<Handle<'static>>), ResolverError> { 183 let mini = self.resolve_doc(did).await?; 184 Ok((mini.pds, mini.handle)) 185 } 186 187 pub async fn resolve_signing_key( 188 &self, 189 did: &Did<'_>, 190 ) -> Result<PublicKey<'static>, ResolverError> { 191 let did = did.clone().into_static(); 192 let mini = self.resolve_doc(&did).await?; 193 Ok(mini 194 .key 195 .ok_or_else(|| NoSigningKeyError(did)) 196 .into_diagnostic()?) 197 } 198} 199 200#[derive(Debug, Diagnostic, Error)] 201#[error("no atproto signing key in DID doc for {0}")] 202pub struct NoSigningKeyError(Did<'static>);