tangled
alpha
login
or
join now
ptr.pet
/
hydrant
29
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
29
fork
atom
overview
issues
6
pulls
pipelines
[resolver] implement failover for plc requests
ptr.pet
4 weeks ago
4ef5c943
8e6111ba
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+37
-6
1 changed file
expand all
collapse all
unified
split
src
resolver.rs
+37
-6
src/resolver.rs
···
13
13
IdentityError, IdentityErrorKind, IdentityResolver, PlcSource, ResolverOptions,
14
14
};
15
15
use miette::{Diagnostic, IntoDiagnostic};
16
16
+
use reqwest::StatusCode;
16
17
use scc::HashCache;
17
18
use smol_str::SmolStr;
18
19
use thiserror::Error;
···
87
88
}
88
89
}
89
90
90
90
-
fn get_jacquard(&self) -> &JacquardResolver {
91
91
-
let idx = self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len();
92
92
-
&self.inner.jacquards[idx]
91
91
+
async fn req<'r, T, Fut>(
92
92
+
&'r self,
93
93
+
is_plc: bool,
94
94
+
f: impl Fn(&'r JacquardResolver) -> Fut,
95
95
+
) -> Result<T, ResolverError>
96
96
+
where
97
97
+
Fut: Future<Output = Result<T, IdentityError>>,
98
98
+
{
99
99
+
let mut idx =
100
100
+
self.inner.next_idx.fetch_add(1, Ordering::Relaxed) % self.inner.jacquards.len();
101
101
+
let mut try_count = 0;
102
102
+
loop {
103
103
+
let res = f(&self.inner.jacquards[idx]).await;
104
104
+
try_count += 1;
105
105
+
// retry these with the different plc resolvers
106
106
+
if is_plc {
107
107
+
let is_retriable = matches!(
108
108
+
res.as_ref().map_err(|e| e.kind()),
109
109
+
Err(IdentityErrorKind::HttpStatus(StatusCode::TOO_MANY_REQUESTS)
110
110
+
| IdentityErrorKind::Transport(_))
111
111
+
);
112
112
+
// check if retriable and we haven't gone through all the plc resolvers
113
113
+
if is_retriable && try_count < self.inner.jacquards.len() {
114
114
+
idx = (idx + 1) % self.inner.jacquards.len();
115
115
+
continue;
116
116
+
}
117
117
+
}
118
118
+
return res.map_err(Into::into);
119
119
+
}
93
120
}
94
121
95
122
pub async fn resolve_did(
···
99
126
match identifier {
100
127
AtIdentifier::Did(did) => Ok(did.clone().into_static()),
101
128
AtIdentifier::Handle(handle) => {
102
102
-
let did = self.get_jacquard().resolve_handle(handle).await?;
129
129
+
let did = self.req(false, |j| j.resolve_handle(handle)).await?;
103
130
Ok(did.into_static())
104
131
}
105
132
}
···
109
136
&self,
110
137
did: &Did<'_>,
111
138
) -> Result<(Url, Option<Handle<'_>>), ResolverError> {
112
112
-
let doc_resp = self.get_jacquard().resolve_did_doc(did).await?;
139
139
+
let doc_resp = self
140
140
+
.req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(did))
141
141
+
.await?;
113
142
let doc = doc_resp.parse()?;
114
143
115
144
let pds = doc
···
131
160
return Ok(entry.get().clone());
132
161
}
133
162
134
134
-
let doc_resp = self.get_jacquard().resolve_did_doc(&did).await?;
163
163
+
let doc_resp = self
164
164
+
.req(did.starts_with("did:plc:"), |j| j.resolve_did_doc(&did))
165
165
+
.await?;
135
166
let doc = doc_resp.parse()?;
136
167
137
168
let key = doc