at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at ffa704fdf0884cad263253b2cc44d87c8fb5f4fe 192 lines 6.0 kB view raw
1use std::ops::Not; 2use std::sync::Arc; 3use std::sync::atomic::{AtomicUsize, Ordering}; 4use std::time::Duration; 5 6use jacquard_common::IntoStatic; 7use jacquard_common::types::crypto::PublicKey; 8use jacquard_common::types::ident::AtIdentifier; 9use jacquard_common::types::string::Did; 10use jacquard_common::types::string::Handle; 11use jacquard_identity::JacquardResolver; 12use jacquard_identity::resolver::{ 13 IdentityError, IdentityErrorKind, IdentityResolver, PlcSource, ResolverOptions, 14}; 15use miette::{Diagnostic, IntoDiagnostic}; 16use reqwest::StatusCode; 17use scc::HashCache; 18use smol_str::SmolStr; 19use thiserror::Error; 20use url::Url; 21 22#[derive(Debug, Diagnostic, Error)] 23pub enum ResolverError { 24 #[error("{0}")] 25 Generic(miette::Report), 26 #[error("too many requests")] 27 Ratelimited, 28 #[error("transport error: {0}")] 29 Transport(SmolStr), 30} 31 32impl From<IdentityError> for ResolverError { 33 fn from(e: IdentityError) -> Self { 34 match e.kind() { 35 IdentityErrorKind::HttpStatus(reqwest::StatusCode::TOO_MANY_REQUESTS) => { 36 Self::Ratelimited 37 } 38 IdentityErrorKind::Transport(msg) => Self::Transport(msg.clone()), 39 _ => Self::Generic(e.into()), 40 } 41 } 42} 43 44impl From<miette::Report> for ResolverError { 45 fn from(report: miette::Report) -> Self { 46 ResolverError::Generic(report) 47 } 48} 49 50#[derive(Clone)] 51struct MiniDoc { 52 pds: Url, 53 handle: Option<Handle<'static>>, 54 key: Option<PublicKey<'static>>, 55} 56 57struct ResolverInner { 58 jacquards: Vec<JacquardResolver>, 59 next_idx: AtomicUsize, 60 cache: HashCache<Did<'static>, MiniDoc>, 61} 62 63#[derive(Clone)] 64pub struct Resolver { 65 inner: Arc<ResolverInner>, 66} 67 68impl Resolver { 69 pub fn new(plc_urls: Vec<Url>, identity_cache_size: u64) -> Self { 70 let http = reqwest::Client::new(); 71 let mut jacquards = Vec::with_capacity(plc_urls.len()); 72 73 for url in plc_urls { 74 let mut opts = ResolverOptions::default(); 75 opts.plc_source = PlcSource::PlcDirectory { base: url }; 76 opts.request_timeout = Some(Duration::from_secs(3)); 77 78 jacquards.push(JacquardResolver::new(http.clone(), opts).with_system_dns()); 79 } 80 81 if jacquards.is_empty() { 82 panic!("at least one PLC URL must be provided"); 83 } 84 85 Self { 86 inner: Arc::new(ResolverInner { 87 jacquards, 88 next_idx: AtomicUsize::new(0), 89 cache: HashCache::with_capacity( 90 std::cmp::min(1000, (identity_cache_size / 100) as usize), 91 identity_cache_size as usize, 92 ), 93 }), 94 } 95 } 96 97 async fn req<'r, T, Fut>( 98 &'r self, 99 is_plc: bool, 100 f: impl Fn(&'r JacquardResolver) -> Fut, 101 ) -> Result<T, ResolverError> 102 where 103 Fut: Future<Output = Result<T, IdentityError>>, 104 { 105 let mut idx = 106 self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len(); 107 let mut try_count = 0; 108 loop { 109 let res = f(&self.inner.jacquards[idx]).await; 110 try_count += 1; 111 // retry these with the different plc resolvers 112 if is_plc { 113 let is_retriable = matches!( 114 res.as_ref().map_err(|e| e.kind()), 115 Err(IdentityErrorKind::HttpStatus(StatusCode::TOO_MANY_REQUESTS) 116 | IdentityErrorKind::Transport(_)) 117 ); 118 // check if retriable and we haven't gone through all the plc resolvers 119 if is_retriable && try_count < self.inner.jacquards.len() { 120 idx = (idx + 1) % self.inner.jacquards.len(); 121 continue; 122 } 123 } 124 return res.map_err(Into::into); 125 } 126 } 127 128 pub async fn resolve_did( 129 &self, 130 identifier: &AtIdentifier<'_>, 131 ) -> Result<Did<'static>, ResolverError> { 132 match identifier { 133 AtIdentifier::Did(did) => Ok(did.clone().into_static()), 134 AtIdentifier::Handle(handle) => { 135 let did = self.req(false, |j| j.resolve_handle(handle)).await?; 136 Ok(did.into_static()) 137 } 138 } 139 } 140 141 #[inline] 142 async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> { 143 let did_static = did.clone().into_static(); 144 if let Some(entry) = self.inner.cache.get_async(&did_static).await { 145 return Ok(entry.get().clone()); 146 } 147 148 let doc_resp = self 149 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did)) 150 .await?; 151 let doc = doc_resp.parse()?; 152 153 let pds = doc 154 .pds_endpoint() 155 .ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?; 156 157 let mut handles = doc.handles(); 158 let handle = handles 159 .is_empty() 160 .not() 161 .then(|| handles.remove(0).into_static()); 162 let key = doc.atproto_public_key().ok().flatten(); 163 164 let mini = MiniDoc { pds, handle, key }; 165 let _ = self.inner.cache.put_async(did_static, mini.clone()).await; 166 Ok(mini) 167 } 168 169 pub async fn resolve_identity_info( 170 &self, 171 did: &Did<'_>, 172 ) -> Result<(Url, Option<Handle<'static>>), ResolverError> { 173 let mini = self.resolve_doc(did).await?; 174 Ok((mini.pds, mini.handle)) 175 } 176 177 pub async fn resolve_signing_key( 178 &self, 179 did: &Did<'_>, 180 ) -> Result<PublicKey<'static>, ResolverError> { 181 let did = did.clone().into_static(); 182 let mini = self.resolve_doc(&did).await?; 183 Ok(mini 184 .key 185 .ok_or_else(|| NoSigningKeyError(did)) 186 .into_diagnostic()?) 187 } 188} 189 190#[derive(Debug, Diagnostic, Error)] 191#[error("no atproto signing key in DID doc for {0}")] 192pub struct NoSigningKeyError(Did<'static>);