at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::ops::Not;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Duration;
5
6use jacquard_common::IntoStatic;
7use jacquard_common::types::crypto::PublicKey;
8use jacquard_common::types::ident::AtIdentifier;
9use jacquard_common::types::string::Did;
10use jacquard_common::types::string::Handle;
11use jacquard_identity::JacquardResolver;
12use jacquard_identity::resolver::{
13 IdentityError, IdentityErrorKind, IdentityResolver, PlcSource, ResolverOptions,
14};
15use miette::{Diagnostic, IntoDiagnostic};
16use reqwest::StatusCode;
17use scc::HashCache;
18use smol_str::SmolStr;
19use thiserror::Error;
20use url::Url;
21
22#[derive(Debug, Diagnostic, Error)]
23pub enum ResolverError {
24 #[error("{0}")]
25 Generic(miette::Report),
26 #[error("too many requests")]
27 Ratelimited,
28 #[error("transport error: {0}")]
29 Transport(SmolStr),
30}
31
32impl From<IdentityError> for ResolverError {
33 fn from(e: IdentityError) -> Self {
34 match e.kind() {
35 IdentityErrorKind::HttpStatus(reqwest::StatusCode::TOO_MANY_REQUESTS) => {
36 Self::Ratelimited
37 }
38 IdentityErrorKind::Transport(msg) => Self::Transport(msg.clone()),
39 _ => Self::Generic(e.into()),
40 }
41 }
42}
43
44impl From<miette::Report> for ResolverError {
45 fn from(report: miette::Report) -> Self {
46 ResolverError::Generic(report)
47 }
48}
49
50#[derive(Clone)]
51pub struct MiniDoc {
52 pub pds: Url,
53 pub handle: Option<Handle<'static>>,
54 pub key: Option<PublicKey<'static>>,
55}
56
57struct ResolverInner {
58 jacquards: Vec<JacquardResolver>,
59 next_idx: AtomicUsize,
60 cache: HashCache<Did<'static>, MiniDoc>,
61}
62
63#[derive(Clone)]
64pub struct Resolver {
65 inner: Arc<ResolverInner>,
66}
67
68impl Resolver {
69 pub fn new(plc_urls: Vec<Url>, identity_cache_size: u64) -> Self {
70 let http = reqwest::Client::new();
71 let mut jacquards = Vec::with_capacity(plc_urls.len());
72
73 for url in plc_urls {
74 let mut opts = ResolverOptions::default();
75 opts.plc_source = PlcSource::PlcDirectory { base: url };
76 opts.request_timeout = Some(Duration::from_secs(3));
77
78 jacquards.push(JacquardResolver::new(http.clone(), opts).with_system_dns());
79 }
80
81 if jacquards.is_empty() {
82 panic!("at least one PLC URL must be provided");
83 }
84
85 Self {
86 inner: Arc::new(ResolverInner {
87 jacquards,
88 next_idx: AtomicUsize::new(0),
89 cache: HashCache::with_capacity(
90 std::cmp::min(1000, (identity_cache_size / 100) as usize),
91 identity_cache_size as usize,
92 ),
93 }),
94 }
95 }
96
97 pub async fn invalidate(&self, did: &Did<'_>) {
98 self.inner
99 .cache
100 .remove_async(&did.clone().into_static())
101 .await;
102 }
103
104 pub fn invalidate_sync(&self, did: &Did<'_>) {
105 self.inner.cache.remove_sync(&did.clone().into_static());
106 }
107
108 async fn req<'r, T, Fut>(
109 &'r self,
110 is_plc: bool,
111 f: impl Fn(&'r JacquardResolver) -> Fut,
112 ) -> Result<T, ResolverError>
113 where
114 Fut: Future<Output = Result<T, IdentityError>>,
115 {
116 let mut idx =
117 self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len();
118 let mut try_count = 0;
119 loop {
120 let res = f(&self.inner.jacquards[idx]).await;
121 try_count += 1;
122 // retry these with the different plc resolvers
123 if is_plc {
124 let is_retriable = matches!(
125 res.as_ref().map_err(|e| e.kind()),
126 Err(IdentityErrorKind::HttpStatus(StatusCode::TOO_MANY_REQUESTS)
127 | IdentityErrorKind::Transport(_))
128 );
129 // check if retriable and we haven't gone through all the plc resolvers
130 if is_retriable && try_count < self.inner.jacquards.len() {
131 idx = (idx + 1) % self.inner.jacquards.len();
132 continue;
133 }
134 }
135 return res.map_err(Into::into);
136 }
137 }
138
139 pub async fn resolve_did(
140 &self,
141 identifier: &AtIdentifier<'_>,
142 ) -> Result<Did<'static>, ResolverError> {
143 match identifier {
144 AtIdentifier::Did(did) => Ok(did.clone().into_static()),
145 AtIdentifier::Handle(handle) => {
146 let did = self.req(false, |j| j.resolve_handle(handle)).await?;
147 Ok(did.into_static())
148 }
149 }
150 }
151
152 pub async fn resolve_doc(&self, did: &Did<'_>) -> Result<MiniDoc, ResolverError> {
153 let did_static = did.clone().into_static();
154 if let Some(entry) = self.inner.cache.get_async(&did_static).await {
155 return Ok(entry.get().clone());
156 }
157
158 let doc_resp = self
159 .req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did))
160 .await?;
161 let doc = doc_resp.parse()?;
162
163 let pds = doc
164 .pds_endpoint()
165 .ok_or_else(|| miette::miette!("no PDS service found in DID Doc for {did}"))?;
166
167 let mut handles = doc.handles();
168 let handle = handles
169 .is_empty()
170 .not()
171 .then(|| handles.remove(0).into_static());
172 let key = doc.atproto_public_key().ok().flatten();
173
174 let mini = MiniDoc { pds, handle, key };
175 let _ = self.inner.cache.put_async(did_static, mini.clone()).await;
176 Ok(mini)
177 }
178
179 pub async fn resolve_identity_info(
180 &self,
181 did: &Did<'_>,
182 ) -> Result<(Url, Option<Handle<'static>>), ResolverError> {
183 let mini = self.resolve_doc(did).await?;
184 Ok((mini.pds, mini.handle))
185 }
186
187 pub async fn resolve_signing_key(
188 &self,
189 did: &Did<'_>,
190 ) -> Result<PublicKey<'static>, ResolverError> {
191 let did = did.clone().into_static();
192 let mini = self.resolve_doc(&did).await?;
193 Ok(mini
194 .key
195 .ok_or_else(|| NoSigningKeyError(did))
196 .into_diagnostic()?)
197 }
198}
199
200#[derive(Debug, Diagnostic, Error)]
201#[error("no atproto signing key in DID doc for {0}")]
202pub struct NoSigningKeyError(Did<'static>);