at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::ops::Not;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Duration;
5
6use jacquard_common::IntoStatic;
7use jacquard_common::types::crypto::PublicKey;
8use jacquard_common::types::ident::AtIdentifier;
9use jacquard_common::types::string::Did;
10use jacquard_common::types::string::Handle;
11use jacquard_identity::JacquardResolver;
12use jacquard_identity::resolver::{
13 IdentityError, IdentityErrorKind, IdentityResolver, PlcSource, ResolverOptions,
14};
15use miette::{Diagnostic, IntoDiagnostic};
16use reqwest::StatusCode;
17use scc::HashCache;
18use smol_str::SmolStr;
19use thiserror::Error;
20use url::Url;
21
22#[derive(Debug, Diagnostic, Error)]
23pub enum ResolverError {
24 #[error("{0}")]
25 Generic(miette::Report),
26 #[error("too many requests")]
27 Ratelimited,
28 #[error("transport error: {0}")]
29 Transport(SmolStr),
30}
31
32impl From<IdentityError> for ResolverError {
33 fn from(e: IdentityError) -> Self {
34 match e.kind() {
35 IdentityErrorKind::HttpStatus(reqwest::StatusCode::TOO_MANY_REQUESTS) => {
36 Self::Ratelimited
37 }
38 IdentityErrorKind::Transport(msg) => Self::Transport(msg.clone()),
39 _ => Self::Generic(e.into()),
40 }
41 }
42}
43
44impl From<miette::Report> for ResolverError {
45 fn from(report: miette::Report) -> Self {
46 ResolverError::Generic(report)
47 }
48}
49
50#[derive(Clone)]
51struct MiniDoc {
52 pds: Url,
53 handle: Option<Handle<'static>>,
54 key: Option<PublicKey<'static>>,
55}
56
57struct ResolverInner {
58 jacquards: Vec<JacquardResolver>,
59 next_idx: AtomicUsize,
60 cache: HashCache<Did<'static>, MiniDoc>,
61}
62
63#[derive(Clone)]
64pub struct Resolver {
65 inner: Arc<ResolverInner>,
66}
67
68impl Resolver {
69 pub fn new(plc_urls: Vec<Url>, identity_cache_size: u64) -> Self {
70 let http = reqwest::Client::new();
71 let mut jacquards = Vec::with_capacity(plc_urls.len());
72
73 for url in plc_urls {
74 let mut opts = ResolverOptions::default();
75 opts.plc_source = PlcSource::PlcDirectory { base: url };
76 opts.request_timeout = Some(Duration::from_secs(3));
77
78 jacquards.push(JacquardResolver::new(http.clone(), opts).with_system_dns());
79 }
80
81 if jacquards.is_empty() {
82 panic!("at least one PLC URL must be provided");
83 }
84
85 Self {
86 inner: Arc::new(ResolverInner {
87 jacquards,
88 next_idx: AtomicUsize::new(0),
89 cache: HashCache::with_capacity(
90 std::cmp::min(1000, (identity_cache_size / 100) as usize),
91 identity_cache_size as usize,
92 ),
93 }),
94 }
95 }
96
97 async fn req<'r, T, Fut>(
98 &'r self,
99 is_plc: bool,
100 f: impl Fn(&'r JacquardResolver) -> Fut,
101 ) -> Result<T, ResolverError>
102 where
103 Fut: Future<Output = Result<T, IdentityError>>,
104 {
105 let mut idx =
106 self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len();
107 let mut try_count = 0;
108 loop {
109 let res = f(&self.inner.jacquards[idx]).await;
110 try_count += 1;
111 // retry these with the different plc resolvers
112 if is_plc {
113 let is_retriable = matches!(
114 res.as_ref().map_err(|e| e.kind()),
115 Err(IdentityErrorKind::HttpStatus(StatusCode::TOO_MANY_REQUESTS)
116 | IdentityErrorKind::Transport(_))
117 );
118 // check if retriable and we haven't gone through all the plc resolvers
119 if is_retriable && try_count < self.inner.jacquards.len() {
120 idx = (idx + 1) % self.inner.jacquards.len();
121 continue;
122 }
123 }
124 return res.map_err(Into::into);
125 }
126 }
127
128 pub async fn resolve_did(
129 &self,
130 identifier: &AtIdentifier<'_>,
131 ) -> Result<Did<'static>, ResolverError> {
132 match identifier {
133 AtIdentifier::Did(did) => Ok(did.clone().into_static()),
134 AtIdentifier::Handle(handle) => {
135 let did = self.req(false, |j| j.resolve_handle(handle)).await?;
136 Ok(did.into_static())
137 }
138 }
139 }
140
141 #[inline]
142 async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> {
143 let did_static = did.clone().into_static();
144 if let Some(entry) = self.inner.cache.get_async(&did_static).await {
145 return Ok(entry.get().clone());
146 }
147
148 let doc_resp = self
149 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did))
150 .await?;
151 let doc = doc_resp.parse()?;
152
153 let pds = doc
154 .pds_endpoint()
155 .ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?;
156
157 let mut handles = doc.handles();
158 let handle = handles
159 .is_empty()
160 .not()
161 .then(|| handles.remove(0).into_static());
162 let key = doc.atproto_public_key().ok().flatten();
163
164 let mini = MiniDoc { pds, handle, key };
165 let _ = self.inner.cache.put_async(did_static, mini.clone()).await;
166 Ok(mini)
167 }
168
169 pub async fn resolve_identity_info(
170 &self,
171 did: &Did<'_>,
172 ) -> Result<(Url, Option<Handle<'static>>), ResolverError> {
173 let mini = self.resolve_doc(did).await?;
174 Ok((mini.pds, mini.handle))
175 }
176
177 pub async fn resolve_signing_key(
178 &self,
179 did: &Did<'_>,
180 ) -> Result<PublicKey<'static>, ResolverError> {
181 let did = did.clone().into_static();
182 let mini = self.resolve_doc(&did).await?;
183 Ok(mini
184 .key
185 .ok_or_else(|| NoSigningKeyError(did))
186 .into_diagnostic()?)
187 }
188}
189
190#[derive(Debug, Diagnostic, Error)]
191#[error("no atproto signing key in DID doc for {0}")]
192pub struct NoSigningKeyError(Did<'static>);