AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::ops::Not;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Duration;
5
6use jacquard::IntoStatic;
7use jacquard::types::string::Handle;
8use jacquard_common::types::crypto::PublicKey;
9use jacquard_common::types::ident::AtIdentifier;
10use jacquard_common::types::string::Did;
11use jacquard_identity::JacquardResolver;
12use jacquard_identity::resolver::{
13 IdentityError, IdentityErrorKind, IdentityResolver, PlcSource, ResolverOptions,
14};
15use miette::{Diagnostic, IntoDiagnostic};
16use reqwest::StatusCode;
17use scc::HashCache;
18use smol_str::SmolStr;
19use thiserror::Error;
20use url::Url;
21
22#[derive(Debug, Diagnostic, Error)]
23pub enum ResolverError {
24 #[error("{0}")]
25 Generic(miette::Report),
26 #[error("too many requests")]
27 Ratelimited,
28 #[error("transport error: {0}")]
29 Transport(SmolStr),
30}
31
32impl From<IdentityError> for ResolverError {
33 fn from(e: IdentityError) -> Self {
34 match e.kind() {
35 IdentityErrorKind::HttpStatus(reqwest::StatusCode::TOO_MANY_REQUESTS) => {
36 Self::Ratelimited
37 }
38 IdentityErrorKind::Transport(msg) => Self::Transport(msg.clone()),
39 _ => Self::Generic(e.into()),
40 }
41 }
42}
43
44impl From<miette::Report> for ResolverError {
45 fn from(report: miette::Report) -> Self {
46 ResolverError::Generic(report)
47 }
48}
49
50struct ResolverInner {
51 jacquards: Vec<JacquardResolver>,
52 next_idx: AtomicUsize,
53 key_cache: HashCache<Did<'static>, PublicKey<'static>>,
54}
55
56#[derive(Clone)]
57pub struct Resolver {
58 inner: Arc<ResolverInner>,
59}
60
61impl Resolver {
62 pub fn new(plc_urls: Vec<Url>, identity_cache_size: u64) -> Self {
63 let http = reqwest::Client::new();
64 let mut jacquards = Vec::with_capacity(plc_urls.len());
65
66 for url in plc_urls {
67 let mut opts = ResolverOptions::default();
68 opts.plc_source = PlcSource::PlcDirectory { base: url };
69 opts.request_timeout = Some(Duration::from_secs(3));
70
71 // no jacquard cache - we manage our own
72 jacquards.push(JacquardResolver::new(http.clone(), opts));
73 }
74
75 if jacquards.is_empty() {
76 panic!("at least one PLC URL must be provided");
77 }
78
79 Self {
80 inner: Arc::new(ResolverInner {
81 jacquards,
82 next_idx: AtomicUsize::new(0),
83 key_cache: HashCache::with_capacity(
84 std::cmp::min(1000, (identity_cache_size / 100) as usize),
85 identity_cache_size as usize,
86 ),
87 }),
88 }
89 }
90
91 async fn req<'r, T, Fut>(
92 &'r self,
93 is_plc: bool,
94 f: impl Fn(&'r JacquardResolver) -> Fut,
95 ) -> Result<T, ResolverError>
96 where
97 Fut: Future<Output = Result<T, IdentityError>>,
98 {
99 let mut idx =
100 self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len();
101 let mut try_count = 0;
102 loop {
103 let res = f(&self.inner.jacquards[idx]).await;
104 try_count += 1;
105 // retry these with the different plc resolvers
106 if is_plc {
107 let is_retriable = matches!(
108 res.as_ref().map_err(|e| e.kind()),
109 Err(IdentityErrorKind::HttpStatus(StatusCode::TOO_MANY_REQUESTS)
110 | IdentityErrorKind::Transport(_))
111 );
112 // check if retriable and we haven't gone through all the plc resolvers
113 if is_retriable && try_count < self.inner.jacquards.len() {
114 idx = (idx + 1) % self.inner.jacquards.len();
115 continue;
116 }
117 }
118 return res.map_err(Into::into);
119 }
120 }
121
122 pub async fn resolve_did(
123 &self,
124 identifier: &AtIdentifier<'_>,
125 ) -> Result<Did<'static>, ResolverError> {
126 match identifier {
127 AtIdentifier::Did(did) => Ok(did.clone().into_static()),
128 AtIdentifier::Handle(handle) => {
129 let did = self.req(false, |j| j.resolve_handle(handle)).await?;
130 Ok(did.into_static())
131 }
132 }
133 }
134
135 pub async fn resolve_identity_info(
136 &self,
137 did: &Did<'_>,
138 ) -> Result<(Url, Option<Handle<'_>>), ResolverError> {
139 let doc_resp = self
140 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did))
141 .await?;
142 let doc = doc_resp.parse()?;
143
144 let pds = doc
145 .pds_endpoint()
146 .ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?;
147
148 let mut handles = doc.handles();
149 let handle = handles.is_empty().not().then(|| handles.remove(0));
150
151 Ok((pds, handle))
152 }
153
154 pub async fn resolve_signing_key(
155 &self,
156 did: &Did<'_>,
157 ) -> Result<PublicKey<'static>, ResolverError> {
158 let did = did.clone().into_static();
159 if let Some(entry) = self.inner.key_cache.get_async(&did).await {
160 return Ok(entry.get().clone());
161 }
162
163 let doc_resp = self
164 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(&did))
165 .await?;
166 let doc = doc_resp.parse()?;
167
168 let key = doc
169 .atproto_public_key()
170 .into_diagnostic()?
171 .ok_or_else(|| NoSigningKeyError(did.clone()))
172 .into_diagnostic()?;
173
174 let _ = self.inner.key_cache.put_async(did, key.clone()).await;
175
176 Ok(key)
177 }
178}
179
180#[derive(Debug, Diagnostic, Error)]
181#[error("no atproto signing key in DID doc for {0}")]
182pub struct NoSigningKeyError(Did<'static>);