+2
-2
Cargo.lock
+2
-2
Cargo.lock
···
803
803
804
804
[[package]]
805
805
name = "bytes"
806
-
version = "1.11.1"
806
+
version = "1.10.1"
807
807
source = "registry+https://github.com/rust-lang/crates.io-index"
808
-
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
808
+
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
809
809
810
810
[[package]]
811
811
name = "byteview"
slingshot/Cargo.toml
slingshot/Cargo.toml
This file has not been changed.
+8
slingshot/src/error.rs
+8
slingshot/src/error.rs
···
100
100
UrlParseError(#[from] url::ParseError),
101
101
#[error(transparent)]
102
102
ReqwestError(#[from] reqwest::Error),
103
+
#[error(transparent)]
104
+
InvalidHeader(#[from] reqwest::header::InvalidHeaderValue),
105
+
#[error(transparent)]
106
+
IdentityError(#[from] IdentityError),
107
+
#[error("upstream service could not be resolved")]
108
+
ServiceNotFound,
109
+
#[error("upstream service was found but no services matched")]
110
+
ServiceNotMatched,
103
111
}
slingshot/src/lib.rs
slingshot/src/lib.rs
This file has not been changed.
+20
-6
slingshot/src/main.rs
+20
-6
slingshot/src/main.rs
···
1
-
// use foyer::HybridCache;
2
-
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
1
use metrics_exporter_prometheus::PrometheusBuilder;
4
2
use slingshot::{
5
3
Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
···
9
7
10
8
use clap::Parser;
11
9
use tokio_util::sync::CancellationToken;
10
+
use url::Url;
12
11
13
12
/// Slingshot record edge cache
14
13
#[derive(Parser, Debug, Clone)]
···
48
47
#[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
49
48
#[clap(default_value_t = 1)]
50
49
identity_cache_disk_gb: usize,
50
+
/// the address of this server
51
+
///
52
+
/// used if --acme-domain is not set, defaulting to `--bind`
53
+
#[arg(long, conflicts_with("acme_domain"), env = "SLINGSHOT_PUBLIC_HOST")]
54
+
base_url: Option<Url>,
51
55
/// the domain pointing to this server
52
56
///
53
57
/// if present:
···
101
105
102
106
let args = Args::parse();
103
107
108
+
let base_url: Url = args
109
+
.base_url
110
+
.or_else(|| {
111
+
args.acme_domain
112
+
.as_ref()
113
+
.map(|d| Url::parse(&format!("https://{d}")).unwrap())
114
+
})
115
+
.unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap());
116
+
104
117
if args.collect_metrics {
105
118
log::trace!("installing metrics server...");
106
119
if let Err(e) = install_metrics_server(args.bind_metrics) {
···
152
165
log::info!("identity service ready.");
153
166
154
167
let repo = Repo::new(identity.clone());
155
-
let proxy = Proxy::new(repo.clone());
168
+
let proxy = Proxy::new(identity.clone());
156
169
157
170
let identity_for_server = identity.clone();
158
171
let server_shutdown = shutdown.clone();
···
164
177
identity_for_server,
165
178
repo,
166
179
proxy,
180
+
base_url,
167
181
args.acme_domain,
168
182
args.acme_contact,
169
183
args.acme_cache_path,
···
236
250
) -> Result<(), metrics_exporter_prometheus::BuildError> {
237
251
log::info!("installing metrics server...");
238
252
PrometheusBuilder::new()
239
-
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
240
-
.set_bucket_duration(std::time::Duration::from_secs(300))?
241
-
.set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
253
+
.set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 45.656])?
254
+
.set_bucket_duration(std::time::Duration::from_secs(15))?
255
+
.set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = bucket lifetime
242
256
.set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
243
257
.with_http_listener(bind_metrics)
244
258
.install()?;
+247
-133
slingshot/src/proxy.rs
+247
-133
slingshot/src/proxy.rs
···
1
-
use serde::Deserialize;
2
-
use url::Url;
3
-
use std::{collections::HashMap, time::Duration};
4
-
use crate::{Repo, server::HydrationSource, error::ProxyError};
1
+
use crate::{Identity, error::ProxyError, server::HydrationSource};
2
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Nsid, RecordKey};
5
3
use reqwest::Client;
6
4
use serde_json::{Map, Value};
5
+
use std::{collections::HashMap, time::Duration};
6
+
use url::Url;
7
7
8
8
pub enum ParamValue {
9
9
String(Vec<String>),
···
13
13
pub struct Params(HashMap<String, ParamValue>);
14
14
15
15
impl TryFrom<Map<String, Value>> for Params {
16
-
type Error = (); // TODO
16
+
type Error = (); // TODO
17
17
fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18
18
let mut out = HashMap::new();
19
19
for (k, v) in val {
···
70
70
71
71
#[derive(Clone)]
72
72
pub struct Proxy {
73
-
repo: Repo,
73
+
identity: Identity,
74
74
client: Client,
75
75
}
76
76
77
77
impl Proxy {
78
-
pub fn new(repo: Repo) -> Self {
78
+
pub fn new(identity: Identity) -> Self {
79
79
let client = Client::builder()
80
80
.user_agent(format!(
81
81
"microcosm slingshot v{} (contact: @bad-example.com)",
···
85
85
.timeout(Duration::from_secs(6))
86
86
.build()
87
87
.unwrap();
88
-
Self { repo, client }
88
+
Self { client, identity }
89
89
}
90
90
91
91
pub async fn proxy(
92
92
&self,
93
-
xrpc: String,
94
-
service: String,
93
+
service_did: &Did,
94
+
service_id: &str,
95
+
xrpc: &Nsid,
96
+
authorization: Option<&str>,
97
+
atproto_accept_labelers: Option<&str>,
95
98
params: Option<Map<String, Value>>,
96
99
) -> Result<Value, ProxyError> {
97
-
98
-
// hackin it to start
99
-
100
-
// 1. assume did-web (TODO) and get the did doc
101
-
#[derive(Debug, Deserialize)]
102
-
struct ServiceDoc {
103
-
id: String,
104
-
service: Vec<ServiceItem>,
105
-
}
106
-
#[derive(Debug, Deserialize)]
107
-
struct ServiceItem {
108
-
id: String,
109
-
#[expect(unused)]
110
-
r#type: String,
111
-
#[serde(rename = "serviceEndpoint")]
112
-
service_endpoint: Url,
113
-
}
114
-
let dw = service.strip_prefix("did:web:").expect("a did web");
115
-
let (dw, service_id) = dw.split_once("#").expect("whatever");
116
-
let mut dw_url = Url::parse(&format!("https://{dw}"))?;
117
-
dw_url.set_path("/.well-known/did.json");
118
-
let doc: ServiceDoc = self.client
119
-
.get(dw_url)
120
-
.send()
100
+
let mut upstream: Url = self
101
+
.identity
102
+
.did_to_mini_service_doc(service_did)
121
103
.await?
122
-
.error_for_status()?
123
-
.json()
124
-
.await?;
104
+
.ok_or(ProxyError::ServiceNotFound)?
105
+
.get(service_id, None)
106
+
.ok_or(ProxyError::ServiceNotMatched)?
107
+
.endpoint
108
+
.parse()?;
125
109
126
-
assert_eq!(doc.id, format!("did:web:{}", dw));
110
+
upstream.set_path(&format!("/xrpc/{}", xrpc.as_str()));
127
111
128
-
let mut upstream = None;
129
-
for ServiceItem { id, service_endpoint, .. } in doc.service {
130
-
let Some((_, id)) = id.split_once("#") else { continue; };
131
-
if id != service_id { continue; };
132
-
upstream = Some(service_endpoint);
133
-
break;
134
-
}
135
-
136
-
// 2. proxy the request forward
137
-
let mut upstream = upstream.expect("to find it");
138
-
upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid
139
-
140
112
if let Some(params) = params {
141
113
let mut query = upstream.query_pairs_mut();
142
114
let Params(ps) = params.try_into().expect("valid params");
···
161
133
}
162
134
}
163
135
164
-
// TODO: other headers to proxy
165
-
Ok(self.client
136
+
// TODO i mean maybe we should look for headers also in our headers but not obviously
137
+
let mut headers = reqwest::header::HeaderMap::new();
138
+
// TODO: check the jwt aud against the upstream!!!
139
+
if let Some(auth) = authorization {
140
+
headers.insert("Authorization", auth.try_into()?);
141
+
}
142
+
if let Some(aal) = atproto_accept_labelers {
143
+
headers.insert("atproto-accept-labelers", aal.try_into()?);
144
+
}
145
+
146
+
let t0 = std::time::Instant::now();
147
+
let res = self
148
+
.client
166
149
.get(upstream)
150
+
.headers(headers)
167
151
.send()
168
-
.await?
169
-
.error_for_status()?
170
-
.json()
171
-
.await?)
152
+
.await
153
+
.and_then(|r| r.error_for_status());
154
+
155
+
if res.is_ok() {
156
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "true")
157
+
.record(t0.elapsed());
158
+
} else {
159
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "false")
160
+
.record(t0.elapsed());
161
+
}
162
+
163
+
Ok(res?.json().await?)
172
164
}
173
165
}
174
166
···
188
180
while let Some((i, c)) = chars.next() {
189
181
match c {
190
182
'[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
191
-
'[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")),
183
+
'[' if key_acc.is_empty() => {
184
+
return Err(format!("missing key before opening bracket, at {i}"));
185
+
}
192
186
'[' => in_bracket = true,
193
187
']' if in_bracket => {
194
188
in_bracket = false;
195
189
let key = std::mem::take(&mut key_acc);
196
190
let r#type = std::mem::take(&mut type_acc);
197
-
let t = if r#type.is_empty() { None } else { Some(r#type) };
191
+
let t = if r#type.is_empty() {
192
+
None
193
+
} else {
194
+
Some(r#type)
195
+
};
198
196
out.push(PathPart::Vector(key, t));
199
197
// peek ahead because we need a dot after array if there's more and i don't want to add more loop state
200
198
let Some((i, c)) = chars.next() else {
201
199
break;
202
200
};
203
201
if c != '.' {
204
-
return Err(format!("expected dot after close bracket, found {c:?} at {i}"));
202
+
return Err(format!(
203
+
"expected dot after close bracket, found {c:?} at {i}"
204
+
));
205
205
}
206
206
}
207
207
']' => return Err(format!("unexpected close bracket at {i}")),
208
208
'.' if in_bracket => type_acc.push(c),
209
-
'.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")),
209
+
'.' if key_acc.is_empty() => {
210
+
return Err(format!("missing key before next segment, at {i}"));
211
+
}
210
212
'.' => {
211
213
let key = std::mem::take(&mut key_acc);
212
214
assert!(type_acc.is_empty());
···
251
253
}
252
254
253
255
#[derive(Debug, PartialEq)]
256
+
pub struct FullAtUriParts {
257
+
pub repo: AtIdentifier,
258
+
pub collection: Nsid,
259
+
pub rkey: RecordKey,
260
+
pub cid: Option<Cid>,
261
+
}
262
+
263
+
impl FullAtUriParts {
264
+
pub fn to_uri(&self) -> String {
265
+
let repo: String = self.repo.clone().into(); // no as_str for AtIdentifier atrium???
266
+
let collection = self.collection.as_str();
267
+
let rkey = self.rkey.as_str();
268
+
format!("at://{repo}/{collection}/{rkey}")
269
+
}
270
+
}
271
+
272
+
// TODO: move this to links
273
+
pub fn split_uri(uri: &str) -> Option<(AtIdentifier, Nsid, RecordKey)> {
274
+
let rest = uri.strip_prefix("at://")?;
275
+
let (repo, rest) = rest.split_once("/")?;
276
+
let repo = repo.parse().ok()?;
277
+
let (collection, rkey) = rest.split_once("/")?;
278
+
let collection = collection.parse().ok()?;
279
+
let rkey = rkey.split_once('#').map(|(k, _)| k).unwrap_or(rkey);
280
+
let rkey = rkey.split_once('?').map(|(k, _)| k).unwrap_or(rkey);
281
+
let rkey = rkey.parse().ok()?;
282
+
Some((repo, collection, rkey))
283
+
}
284
+
285
+
#[derive(Debug, PartialEq)]
254
286
pub enum MatchedRef {
255
-
AtUri {
256
-
uri: String,
257
-
cid: Option<String>,
258
-
},
259
-
Identifier(String),
287
+
AtUri(FullAtUriParts),
288
+
Identifier(AtIdentifier),
260
289
}
261
290
262
291
pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> {
···
268
297
RefShape::StrongRef => {
269
298
let o = val.as_object()?;
270
299
let uri = o.get("uri")?.as_str()?.to_string();
271
-
let cid = o.get("cid")?.as_str()?.to_string();
272
-
Some(MatchedRef::AtUri { uri, cid: Some(cid) })
300
+
let cid = o.get("cid")?.as_str()?.parse().ok()?;
301
+
let (repo, collection, rkey) = split_uri(&uri)?;
302
+
Some(MatchedRef::AtUri(FullAtUriParts {
303
+
repo,
304
+
collection,
305
+
rkey,
306
+
cid: Some(cid),
307
+
}))
273
308
}
274
309
RefShape::AtUri => {
275
310
let uri = val.as_str()?.to_string();
276
-
Some(MatchedRef::AtUri { uri, cid: None })
311
+
let (repo, collection, rkey) = split_uri(&uri)?;
312
+
Some(MatchedRef::AtUri(FullAtUriParts {
313
+
repo,
314
+
collection,
315
+
rkey,
316
+
cid: None,
317
+
}))
277
318
}
278
319
RefShape::AtUriParts => {
279
320
let o = val.as_object()?;
280
-
let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string();
281
-
let collection = o.get("collection")?.as_str()?.to_string();
282
-
let rkey = o.get("rkey")?.as_str()?.to_string();
283
-
let uri = format!("at://{identifier}/{collection}/{rkey}");
284
-
let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string);
285
-
Some(MatchedRef::AtUri { uri, cid })
321
+
let repo = o.get("repo").or(o.get("did"))?.as_str()?.parse().ok()?;
322
+
let collection = o.get("collection")?.as_str()?.parse().ok()?;
323
+
let rkey = o.get("rkey")?.as_str()?.parse().ok()?;
324
+
let cid = o
325
+
.get("cid")
326
+
.and_then(|v| v.as_str())
327
+
.and_then(|s| s.parse().ok());
328
+
Some(MatchedRef::AtUri(FullAtUriParts {
329
+
repo,
330
+
collection,
331
+
rkey,
332
+
cid,
333
+
}))
286
334
}
287
335
RefShape::Did => {
288
-
let id = val.as_str()?;
289
-
if !id.starts_with("did:") {
290
-
return None;
291
-
}
292
-
Some(MatchedRef::Identifier(id.to_string()))
336
+
let did = val.as_str()?.parse().ok()?;
337
+
Some(MatchedRef::Identifier(AtIdentifier::Did(did)))
293
338
}
294
339
RefShape::Handle => {
295
-
let id = val.as_str()?;
296
-
if id.contains(':') {
297
-
return None;
298
-
}
299
-
Some(MatchedRef::Identifier(id.to_string()))
340
+
let handle = val.as_str()?.parse().ok()?;
341
+
Some(MatchedRef::Identifier(AtIdentifier::Handle(handle)))
300
342
}
301
343
RefShape::AtIdentifier => {
302
-
Some(MatchedRef::Identifier(val.as_str()?.to_string()))
344
+
let identifier = val.as_str()?.parse().ok()?;
345
+
Some(MatchedRef::Identifier(identifier))
303
346
}
304
347
}
305
348
}
···
339
382
}
340
383
impl<'a> PathWalker<'a> {
341
384
fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
342
-
Self { todo: vec![(path_parts, skeleton)] }
385
+
Self {
386
+
todo: vec![(path_parts, skeleton)],
387
+
}
343
388
}
344
389
}
345
390
impl<'a> Iterator for PathWalker<'a> {
···
364
409
let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
365
410
continue;
366
411
};
367
-
for v in a
368
-
.iter()
369
-
.rev()
370
-
.filter(|c| {
371
-
let Some(t) = t else { return true };
372
-
c
373
-
.as_object()
374
-
.and_then(|o| o.get("$type"))
375
-
.and_then(|v| v.as_str())
376
-
.map(|s| s == t)
377
-
.unwrap_or(false)
378
-
})
379
-
{
412
+
for v in a.iter().rev().filter(|c| {
413
+
let Some(t) = t else { return true };
414
+
c.as_object()
415
+
.and_then(|o| o.get("$type"))
416
+
.and_then(|v| v.as_str())
417
+
.map(|s| s == t)
418
+
.unwrap_or(false)
419
+
}) {
380
420
self.todo.push((rest, v))
381
421
}
382
422
}
···
385
425
}
386
426
}
387
427
388
-
389
428
#[cfg(test)]
390
429
mod tests {
391
430
use super::*;
392
431
use serde_json::json;
393
432
433
+
static TEST_CID: &str = "bafyreidffwk5wvh5l76yy7zefiqrovv6yaaegb4wg4zaq35w7nt3quix5a";
434
+
394
435
#[test]
395
436
fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
396
437
let cases = [
397
438
("", vec![]),
398
439
("subject", vec![PathPart::Scalar("subject".into())]),
399
440
("authorDid", vec![PathPart::Scalar("authorDid".into())]),
400
-
("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]),
441
+
(
442
+
"subject.uri",
443
+
vec![
444
+
PathPart::Scalar("subject".into()),
445
+
PathPart::Scalar("uri".into()),
446
+
],
447
+
),
401
448
("members[]", vec![PathPart::Vector("members".into(), None)]),
402
-
("add[].key", vec![
403
-
PathPart::Vector("add".into(), None),
404
-
PathPart::Scalar("key".into()),
405
-
]),
449
+
(
450
+
"add[].key",
451
+
vec![
452
+
PathPart::Vector("add".into(), None),
453
+
PathPart::Scalar("key".into()),
454
+
],
455
+
),
406
456
("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
407
-
("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]),
408
-
("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![
409
-
PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
410
-
PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())),
411
-
PathPart::Scalar("did".into()),
412
-
]),
457
+
(
458
+
"a[b.c]",
459
+
vec![PathPart::Vector("a".into(), Some("b.c".into()))],
460
+
),
461
+
(
462
+
"facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did",
463
+
vec![
464
+
PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
465
+
PathPart::Vector(
466
+
"features".into(),
467
+
Some("app.bsky.richtext.facet#mention".into()),
468
+
),
469
+
PathPart::Scalar("did".into()),
470
+
],
471
+
),
413
472
];
414
473
415
474
for (path, expected) in cases {
···
426
485
("strong-ref", json!(""), None),
427
486
("strong-ref", json!({}), None),
428
487
("strong-ref", json!({ "uri": "abc" }), None),
429
-
("strong-ref", json!({ "cid": "def" }), None),
488
+
("strong-ref", json!({ "cid": TEST_CID }), None),
430
489
(
431
490
"strong-ref",
432
-
json!({ "uri": "abc", "cid": "def" }),
433
-
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }),
491
+
json!({ "uri": "at://a.com/xx.yy.zz/1", "cid": TEST_CID }),
492
+
Some(MatchedRef::AtUri(FullAtUriParts {
493
+
repo: "a.com".parse().unwrap(),
494
+
collection: "xx.yy.zz".parse().unwrap(),
495
+
rkey: "1".parse().unwrap(),
496
+
cid: Some(TEST_CID.parse().unwrap()),
497
+
})),
434
498
),
435
499
("at-uri", json!({ "uri": "abc" }), None),
436
-
("at-uri", json!({ "uri": "abc", "cid": "def" }), None),
437
500
(
438
501
"at-uri",
439
-
json!("abc"),
440
-
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }),
502
+
json!({ "uri": "at://did:web:y.com/xx.yy.zz/1", "cid": TEST_CID }),
503
+
None,
441
504
),
505
+
(
506
+
"at-uri",
507
+
json!("at://did:web:y.com/xx.yy.zz/1"),
508
+
Some(MatchedRef::AtUri(FullAtUriParts {
509
+
repo: "did:web:y.com".parse().unwrap(),
510
+
collection: "xx.yy.zz".parse().unwrap(),
511
+
rkey: "1".parse().unwrap(),
512
+
cid: None,
513
+
})),
514
+
),
442
515
("at-uri-parts", json!("abc"), None),
443
516
("at-uri-parts", json!({}), None),
444
517
(
445
518
"at-uri-parts",
446
-
json!({"repo": "a", "collection": "b", "rkey": "c"}),
447
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
519
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
520
+
Some(MatchedRef::AtUri(FullAtUriParts {
521
+
repo: "a.com".parse().unwrap(),
522
+
collection: "xx.yy.zz".parse().unwrap(),
523
+
rkey: "1".parse().unwrap(),
524
+
cid: Some(TEST_CID.parse().unwrap()),
525
+
})),
448
526
),
449
527
(
450
528
"at-uri-parts",
451
-
json!({"did": "a", "collection": "b", "rkey": "c"}),
452
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
529
+
json!({"did": "a.com", "collection": "xx.yy.zz", "rkey": "1"}),
530
+
Some(MatchedRef::AtUri(FullAtUriParts {
531
+
repo: "a.com".parse().unwrap(),
532
+
collection: "xx.yy.zz".parse().unwrap(),
533
+
rkey: "1".parse().unwrap(),
534
+
cid: None,
535
+
})),
453
536
),
454
537
(
455
538
"at-uri-parts",
456
539
// 'repo' takes precedence over 'did'
457
-
json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}),
458
-
Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }),
540
+
json!({"did": "did:web:a.com", "repo": "z.com", "collection": "xx.yy.zz", "rkey": "1"}),
541
+
Some(MatchedRef::AtUri(FullAtUriParts {
542
+
repo: "z.com".parse().unwrap(),
543
+
collection: "xx.yy.zz".parse().unwrap(),
544
+
rkey: "1".parse().unwrap(),
545
+
cid: None,
546
+
})),
459
547
),
460
548
(
461
549
"at-uri-parts",
462
-
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}),
463
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }),
550
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
551
+
Some(MatchedRef::AtUri(FullAtUriParts {
552
+
repo: "a.com".parse().unwrap(),
553
+
collection: "xx.yy.zz".parse().unwrap(),
554
+
rkey: "1".parse().unwrap(),
555
+
cid: Some(TEST_CID.parse().unwrap()),
556
+
})),
464
557
),
465
558
(
466
559
"at-uri-parts",
467
-
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}),
468
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
560
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": {}}),
561
+
Some(MatchedRef::AtUri(FullAtUriParts {
562
+
repo: "a.com".parse().unwrap(),
563
+
collection: "xx.yy.zz".parse().unwrap(),
564
+
rkey: "1".parse().unwrap(),
565
+
cid: None,
566
+
})),
469
567
),
470
568
("did", json!({}), None),
471
569
("did", json!(""), None),
472
570
("did", json!("bad-example.com"), None),
473
-
("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
571
+
(
572
+
"did",
573
+
json!("did:plc:xyz"),
574
+
Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
575
+
),
474
576
("handle", json!({}), None),
475
-
("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
577
+
(
578
+
"handle",
579
+
json!("bad-example.com"),
580
+
Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
581
+
),
476
582
("handle", json!("did:plc:xyz"), None),
477
583
("at-identifier", json!({}), None),
478
-
("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
479
-
("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
584
+
(
585
+
"at-identifier",
586
+
json!("bad-example.com"),
587
+
Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
588
+
),
589
+
(
590
+
"at-identifier",
591
+
json!("did:plc:xyz"),
592
+
Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
593
+
),
480
594
];
481
-
for (shape, val, expected) in cases {
595
+
for (i, (shape, val, expected)) in cases.into_iter().enumerate() {
482
596
let s = shape.try_into().unwrap();
483
597
let matched = match_shape(s, &val);
484
-
assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}");
598
+
assert_eq!(matched, expected, "{i}: shape: {shape:?}, val: {val:?}");
485
599
}
486
600
}
487
601
}
slingshot/src/record.rs
slingshot/src/record.rs
This file has not been changed.
+433
-138
slingshot/src/server.rs
+433
-138
slingshot/src/server.rs
···
1
1
use crate::{
2
2
CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3
3
error::{RecordError, ServerError},
4
-
proxy::{extract_links, MatchedRef},
4
+
proxy::{FullAtUriParts, MatchedRef, extract_links, split_uri},
5
5
record::RawRecord,
6
6
};
7
-
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
7
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Handle, Nsid, RecordKey};
8
8
use foyer::HybridCache;
9
9
use links::at_uri::parse_at_uri as normalize_at_uri;
10
10
use serde::Serialize;
11
-
use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap};
11
+
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc, time::Instant};
12
12
use tokio::sync::mpsc;
13
13
use tokio_util::sync::CancellationToken;
14
14
···
24
24
};
25
25
use poem_openapi::{
26
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
-
Union,
28
-
param::Query, payload::Json, types::Example,
27
+
Union, param::Query, payload::Json, types::Example,
29
28
};
30
29
31
30
fn example_handle() -> String {
···
34
33
fn example_did() -> String {
35
34
"did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
36
35
}
36
+
fn example_service_did() -> String {
37
+
"did:web:constellation.microcosm.blue".to_string()
38
+
}
37
39
fn example_collection() -> String {
38
40
"app.bsky.feed.like".to_string()
39
41
}
40
42
fn example_rkey() -> String {
41
43
"3lv4ouczo2b2a".to_string()
42
44
}
45
+
fn example_id_fragment() -> String {
46
+
"#constellation".to_string()
47
+
}
43
48
fn example_uri() -> String {
44
49
format!(
45
50
···
55
60
"zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
56
61
}
57
62
58
-
#[derive(Object)]
63
+
#[derive(Debug, Object)]
59
64
#[oai(example = true)]
60
65
struct XrpcErrorResponseObject {
61
66
/// Should correspond an error `name` in the lexicon errors array
···
86
91
}))
87
92
}
88
93
89
-
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
90
-
ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
94
+
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniDocResponse {
95
+
ResolveMiniDocResponse::BadRequest(Json(XrpcErrorResponseObject {
91
96
error: "InvalidRequest".to_string(),
92
97
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
93
98
}))
94
99
}
95
100
101
+
fn bad_request_handler_resolve_service(err: poem::Error) -> ResolveServiceResponse {
102
+
ResolveServiceResponse::BadRequest(Json(XrpcErrorResponseObject {
103
+
error: "InvalidRequest".to_string(),
104
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
105
+
}))
106
+
}
107
+
96
108
fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
97
109
ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
98
110
error: "InvalidRequest".to_string(),
···
189
201
190
202
#[derive(ApiResponse)]
191
203
#[oai(bad_request_handler = "bad_request_handler_resolve_mini")]
192
-
enum ResolveMiniIDResponse {
204
+
enum ResolveMiniDocResponse {
193
205
/// Identity resolved
194
206
#[oai(status = 200)]
195
207
Ok(Json<MiniDocResponseObject>),
···
199
211
}
200
212
201
213
#[derive(Object)]
202
-
struct ProxyHydrationError {
203
-
reason: String,
214
+
#[oai(example = true)]
215
+
struct ServiceResponseObject {
216
+
/// The service endpoint URL, if found
217
+
endpoint: String,
204
218
}
219
+
impl Example for ServiceResponseObject {
220
+
fn example() -> Self {
221
+
Self {
222
+
endpoint: "https://example.com".to_string(),
223
+
}
224
+
}
225
+
}
205
226
206
-
#[derive(Object)]
207
-
struct ProxyHydrationPending {
208
-
url: String,
227
+
#[derive(ApiResponse)]
228
+
#[oai(bad_request_handler = "bad_request_handler_resolve_service")]
229
+
enum ResolveServiceResponse {
230
+
/// Service resolved
231
+
#[oai(status = 200)]
232
+
Ok(Json<ServiceResponseObject>),
233
+
/// Bad request or service not resolved
234
+
#[oai(status = 400)]
235
+
BadRequest(XrpcError),
209
236
}
210
237
211
238
#[derive(Object)]
212
-
struct ProxyHydrationRecordFound {
213
-
record: serde_json::Value,
239
+
#[oai(rename_all = "camelCase")]
240
+
struct ProxyHydrationError {
241
+
/// Short description of why the hydration failed
242
+
reason: String,
243
+
/// Whether or not it's recommended to retry requesting this item
244
+
should_retry: bool,
245
+
/// URL to follow up at if retrying
246
+
follow_up: String,
214
247
}
215
248
216
249
#[derive(Object)]
217
-
struct ProxyHydrationIdentifierFound {
218
-
record: MiniDocResponseObject,
250
+
#[oai(rename_all = "camelCase")]
251
+
struct ProxyHydrationPending {
252
+
/// URL you can request to finish hydrating this item
253
+
follow_up: String,
254
+
/// Why this item couldn't be hydrated: 'deadline' or 'limit'
255
+
///
256
+
/// - `deadline`: the item fetch didn't complete before the response was
257
+
/// due, but will continue on slingshot in the background -- `followUp`
258
+
/// requests are coalesced into the original item fetch to be available as
259
+
/// early as possible.
260
+
///
261
+
/// - `limit`: slingshot only attempts to hydrate the first 100 items found
262
+
/// in a proxied response, with the remaining marked `pending`. You can
263
+
/// request `followUp` to fetch them.
264
+
///
265
+
/// In the future, Slingshot may put pending links after `limit` into a low-
266
+
/// priority fetch queue, so that these items become available sooner on
267
+
/// follow-up request as well.
268
+
reason: String,
219
269
}
220
270
221
271
// todo: there's gotta be a supertrait that collects these?
222
-
use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType};
272
+
use poem_openapi::types::{IsObjectType, ParseFromJSON, ToJSON, Type};
223
273
224
274
#[derive(Union)]
225
275
#[oai(discriminator_name = "status", rename_all = "camelCase")]
···
235
285
/// The original upstream response content
236
286
output: serde_json::Value,
237
287
/// Any hydrated records
238
-
records: HashMap<String, Hydration<ProxyHydrationRecordFound>>,
288
+
records: HashMap<String, Hydration<FoundRecordResponseObject>>,
239
289
/// Any hydrated identifiers
240
-
identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>,
290
+
identifiers: HashMap<String, Hydration<MiniDocResponseObject>>,
241
291
}
242
292
impl Example for ProxyHydrateResponseObject {
243
293
fn example() -> Self {
244
294
Self {
245
295
output: serde_json::json!({}),
246
-
records: HashMap::from([
247
-
("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })),
248
-
]),
296
+
records: HashMap::from([(
297
+
"asdf".into(),
298
+
Hydration::Pending(ProxyHydrationPending {
299
+
follow_up: "/xrpc/com.atproto.repo.getRecord?...".to_string(),
300
+
reason: "deadline".to_string(),
301
+
}),
302
+
)]),
249
303
identifiers: HashMap::new(),
250
304
}
251
305
}
···
257
311
#[oai(status = 200)]
258
312
Ok(Json<ProxyHydrateResponseObject>),
259
313
#[oai(status = 400)]
260
-
BadRequest(XrpcError)
314
+
BadRequest(XrpcError),
261
315
}
262
316
263
317
#[derive(Object)]
···
282
336
xrpc: String,
283
337
/// The destination service the request will be forwarded to
284
338
atproto_proxy: String,
339
+
/// An optional auth token to pass on
340
+
///
341
+
/// the `aud` field must match the upstream atproto_proxy service
342
+
authorization: Option<String>,
343
+
/// An optional set of labelers to request be applied by the upstream
344
+
atproto_accept_labelers: Option<String>,
285
345
/// The `params` for the destination service XRPC endpoint
286
346
///
287
347
/// Currently this will be passed along unchecked, but a future version of
···
290
350
params: Option<serde_json::Value>,
291
351
/// Paths within the response to look for at-uris that can be hydrated
292
352
hydration_sources: Vec<HydrationSource>,
293
-
// todo: deadline thing
294
-
353
+
// todo: let clients pass a hydration deadline?
295
354
}
296
355
impl Example for ProxyQueryPayload {
297
356
fn example() -> Self {
298
357
Self {
299
358
xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
300
359
atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
360
+
authorization: None,
361
+
atproto_accept_labelers: None,
301
362
params: Some(serde_json::json!({
302
363
"feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
303
364
})),
304
-
hydration_sources: vec![
305
-
HydrationSource {
306
-
path: "feed[].post".to_string(),
307
-
shape: "at-uri".to_string(),
308
-
}
309
-
],
365
+
hydration_sources: vec![HydrationSource {
366
+
path: "feed[].post".to_string(),
367
+
shape: "at-uri".to_string(),
368
+
}],
310
369
}
311
370
}
312
371
}
···
340
399
}
341
400
342
401
struct Xrpc {
402
+
base_url: url::Url,
343
403
cache: HybridCache<String, CachedRecord>,
344
404
identity: Identity,
345
405
proxy: Arc<Proxy>,
···
408
468
/// only retains the most recent version of a record.
409
469
Query(cid): Query<Option<String>>,
410
470
) -> GetRecordResponse {
411
-
self.get_record_impl(repo, collection, rkey, cid).await
471
+
self.get_record_impl(&repo, &collection, &rkey, cid.as_deref())
472
+
.await
412
473
}
413
474
414
475
/// blue.microcosm.repo.getRecordByUri
···
478
539
return bad_at_uri();
479
540
};
480
541
481
-
// TODO: move this to links
482
-
let Some(rest) = normalized.strip_prefix("at://") else {
542
+
let Some((repo, collection, rkey)) = split_uri(&normalized) else {
483
543
return bad_at_uri();
484
544
};
485
-
let Some((repo, rest)) = rest.split_once('/') else {
486
-
return bad_at_uri();
487
-
};
488
-
let Some((collection, rest)) = rest.split_once('/') else {
489
-
return bad_at_uri();
490
-
};
491
-
let rkey = if let Some((rkey, _rest)) = rest.split_once('?') {
492
-
rkey
493
-
} else {
494
-
rest
495
-
};
496
545
497
546
self.get_record_impl(
498
-
repo.to_string(),
499
-
collection.to_string(),
500
-
rkey.to_string(),
501
-
cid,
547
+
Into::<String>::into(repo).as_str(),
548
+
collection.as_str(),
549
+
rkey.as_str(),
550
+
cid.as_deref(),
502
551
)
503
552
.await
504
553
}
···
578
627
/// Handle or DID to resolve
579
628
#[oai(example = "example_handle")]
580
629
Query(identifier): Query<String>,
581
-
) -> ResolveMiniIDResponse {
630
+
) -> ResolveMiniDocResponse {
582
631
self.resolve_mini_id(Query(identifier)).await
583
632
}
584
633
···
596
645
/// Handle or DID to resolve
597
646
#[oai(example = "example_handle")]
598
647
Query(identifier): Query<String>,
599
-
) -> ResolveMiniIDResponse {
648
+
) -> ResolveMiniDocResponse {
649
+
Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await
650
+
}
651
+
652
+
async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniDocResponse {
600
653
let invalid = |reason: &'static str| {
601
-
ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
654
+
ResolveMiniDocResponse::BadRequest(xrpc_error("InvalidRequest", reason))
602
655
};
603
656
604
657
let mut unverified_handle = None;
605
-
let did = match Did::new(identifier.clone()) {
658
+
let did = match Did::new(identifier.to_string()) {
606
659
Ok(did) => did,
607
660
Err(_) => {
608
661
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
609
662
return invalid("Identifier was not a valid DID or handle");
610
663
};
611
664
612
-
match self.identity.handle_to_did(alleged_handle.clone()).await {
665
+
match identity.handle_to_did(alleged_handle.clone()).await {
613
666
Ok(res) => {
614
667
if let Some(did) = res {
615
668
// we did it joe
···
627
680
}
628
681
}
629
682
};
630
-
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
683
+
let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else {
631
684
return invalid("Failed to get DID doc");
632
685
};
633
686
let Some(partial_doc) = partial_doc else {
···
647
700
"handle.invalid".to_string()
648
701
}
649
702
} else {
650
-
let Ok(handle_did) = self
651
-
.identity
703
+
let Ok(handle_did) = identity
652
704
.handle_to_did(partial_doc.unverified_handle.clone())
653
705
.await
654
706
else {
···
664
716
}
665
717
};
666
718
667
-
ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject {
719
+
ResolveMiniDocResponse::Ok(Json(MiniDocResponseObject {
668
720
did: did.to_string(),
669
721
handle,
670
722
pds: partial_doc.pds,
···
672
724
}))
673
725
}
674
726
727
+
/// com.bad-example.identity.resolveService
728
+
///
729
+
/// resolve an atproto service did + id to its http endpoint
730
+
///
731
+
/// > [!important]
732
+
/// > this endpoint is experimental and may change
733
+
#[oai(
734
+
path = "/com.bad-example.identity.resolveService",
735
+
method = "get",
736
+
tag = "ApiTags::Custom"
737
+
)]
738
+
async fn resolve_service(
739
+
&self,
740
+
/// the service's did
741
+
#[oai(example = "example_service_did")]
742
+
Query(did): Query<String>,
743
+
/// id fragment, starting with '#'
744
+
///
745
+
/// must be url-encoded!
746
+
#[oai(example = "example_id_fragment")]
747
+
Query(id): Query<String>,
748
+
/// optionally, the exact service type to filter
749
+
///
750
+
/// resolving a pds requires matching the type as well as id. service
751
+
/// proxying ignores the type.
752
+
Query(r#type): Query<Option<String>>,
753
+
) -> ResolveServiceResponse {
754
+
let Ok(did) = Did::new(did) else {
755
+
return ResolveServiceResponse::BadRequest(xrpc_error(
756
+
"InvalidRequest",
757
+
"could not parse 'did' into a DID",
758
+
));
759
+
};
760
+
let identity = self.identity.clone();
761
+
Self::resolve_service_impl(&did, &id, r#type.as_deref(), identity).await
762
+
}
763
+
764
+
async fn resolve_service_impl(
765
+
did: &Did,
766
+
id_fragment: &str,
767
+
service_type: Option<&str>,
768
+
identity: Identity,
769
+
) -> ResolveServiceResponse {
770
+
let invalid = |reason: &'static str| {
771
+
ResolveServiceResponse::BadRequest(xrpc_error("InvalidRequest", reason))
772
+
};
773
+
let Ok(service_mini_doc) = identity.did_to_mini_service_doc(did).await else {
774
+
return invalid("Failed to get DID doc");
775
+
};
776
+
let Some(service_mini_doc) = service_mini_doc else {
777
+
return invalid("Failed to find DID doc");
778
+
};
779
+
780
+
let Some(matching) = service_mini_doc.get(id_fragment, service_type) else {
781
+
return invalid("failed to match identity (and maybe type)");
782
+
};
783
+
784
+
ResolveServiceResponse::Ok(Json(ServiceResponseObject {
785
+
endpoint: matching.endpoint.clone(),
786
+
}))
787
+
}
788
+
675
789
/// com.bad-example.proxy.hydrateQueryResponse
676
790
///
677
791
/// > [!important]
···
687
801
&self,
688
802
Json(payload): Json<ProxyQueryPayload>,
689
803
) -> ProxyHydrateResponse {
690
-
// TODO: the Accept request header, if present, gotta be json
691
-
// TODO: find any Authorization header and verify it. TBD about `aud`.
692
-
693
804
let params = if let Some(p) = payload.params {
694
805
let serde_json::Value::Object(map) = p else {
695
806
panic!("params have to be an object");
696
807
};
697
808
Some(map)
698
-
} else { None };
809
+
} else {
810
+
None
811
+
};
699
812
700
-
match self.proxy.proxy(
701
-
payload.xrpc,
702
-
payload.atproto_proxy,
703
-
params,
704
-
).await {
813
+
let Some((service_did, id_fragment)) = payload.atproto_proxy.rsplit_once("#") else {
814
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
815
+
"BadParameter",
816
+
"atproto_proxy could not be understood",
817
+
));
818
+
};
819
+
820
+
let Ok(service_did) = service_did.parse() else {
821
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
822
+
"BadParameter",
823
+
"atproto_proxy service did could not be parsed",
824
+
));
825
+
};
826
+
827
+
let Ok(xrpc) = payload.xrpc.parse() else {
828
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
829
+
"BadParameter",
830
+
"invalid NSID for xrpc param",
831
+
));
832
+
};
833
+
834
+
match self
835
+
.proxy
836
+
.proxy(
837
+
&service_did,
838
+
&format!("#{id_fragment}"),
839
+
&xrpc,
840
+
payload.authorization.as_deref(),
841
+
payload.atproto_accept_labelers.as_deref(),
842
+
params,
843
+
)
844
+
.await
845
+
{
705
846
Ok(skeleton) => {
706
847
let links = match extract_links(payload.hydration_sources, &skeleton) {
707
848
Ok(l) => l,
708
849
Err(e) => {
709
850
log::warn!("problem extracting: {e:?}");
710
-
return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting"))
851
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
852
+
"oop",
853
+
"sorry, error extracting",
854
+
));
711
855
}
712
856
};
713
857
let mut records = HashMap::new();
714
858
let mut identifiers = HashMap::new();
715
859
716
860
enum GetThing {
717
-
Record(String, Hydration<ProxyHydrationRecordFound>),
718
-
Identifier(String, Hydration<ProxyHydrationIdentifierFound>),
861
+
Record(String, Hydration<FoundRecordResponseObject>),
862
+
Identifier(String, Hydration<MiniDocResponseObject>),
719
863
}
720
864
721
865
let (tx, mut rx) = mpsc::channel(1);
722
866
723
-
for link in links {
867
+
let t0 = Instant::now();
868
+
869
+
for (i, link) in links.into_iter().enumerate() {
724
870
match link {
725
-
MatchedRef::AtUri { uri, cid } => {
726
-
if records.contains_key(&uri) {
871
+
MatchedRef::AtUri(parts) => {
872
+
let non_canonical_url = parts.to_uri();
873
+
if records.contains_key(&non_canonical_url) {
727
874
log::warn!("skipping duplicate record without checking cid");
728
875
continue;
729
876
}
730
-
let mut u = url::Url::parse("https://example.com").unwrap();
731
-
u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo
732
-
records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending {
733
-
url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc.
734
-
}));
877
+
let mut follow_up = self.base_url.clone();
878
+
follow_up.set_path("/xrpc/com.atproto.repo.getRecord");
879
+
follow_up
880
+
.query_pairs_mut()
881
+
.append_pair("repo", &Into::<String>::into(parts.repo.clone()))
882
+
.append_pair("collection", parts.collection.as_str())
883
+
.append_pair("rkey", parts.rkey.as_str());
884
+
if let Some(ref cid) = parts.cid {
885
+
follow_up
886
+
.query_pairs_mut()
887
+
.append_pair("cid", &cid.as_ref().to_string());
888
+
}
889
+
890
+
if i >= 100 {
891
+
records.insert(
892
+
non_canonical_url.clone(),
893
+
Hydration::Pending(ProxyHydrationPending {
894
+
reason: "limit".to_string(),
895
+
follow_up: follow_up.to_string(),
896
+
}),
897
+
);
898
+
continue;
899
+
} else {
900
+
records.insert(
901
+
non_canonical_url.clone(),
902
+
Hydration::Pending(ProxyHydrationPending {
903
+
reason: "deadline".to_string(),
904
+
follow_up: follow_up.to_string(),
905
+
}),
906
+
);
907
+
}
908
+
735
909
let tx = tx.clone();
736
910
let identity = self.identity.clone();
737
911
let repo = self.repo.clone();
738
912
tokio::task::spawn(async move {
739
-
let rest = uri.strip_prefix("at://").unwrap();
740
-
let (identifier, rest) = rest.split_once('/').unwrap();
741
-
let (collection, rkey) = rest.split_once('/').unwrap();
742
-
743
-
let did = if identifier.starts_with("did:") {
744
-
Did::new(identifier.to_string()).unwrap()
745
-
} else {
746
-
let handle = Handle::new(identifier.to_string()).unwrap();
747
-
identity.handle_to_did(handle).await.unwrap().unwrap()
913
+
let FullAtUriParts {
914
+
repo: ident,
915
+
collection,
916
+
rkey,
917
+
cid,
918
+
} = parts;
919
+
let did = match ident {
920
+
AtIdentifier::Did(did) => did,
921
+
AtIdentifier::Handle(handle) => {
922
+
let Ok(Some(did)) = identity.handle_to_did(handle).await
923
+
else {
924
+
let res = Hydration::Error(ProxyHydrationError {
925
+
reason: "could not resolve handle".to_string(),
926
+
should_retry: true,
927
+
follow_up: follow_up.to_string(),
928
+
});
929
+
return if tx
930
+
.send(GetThing::Record(non_canonical_url, res))
931
+
.await
932
+
.is_ok()
933
+
{
934
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
935
+
} else {
936
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
937
+
};
938
+
};
939
+
did
940
+
}
748
941
};
749
942
750
-
let res = match repo.get_record(
751
-
&did,
752
-
&Nsid::new(collection.to_string()).unwrap(),
753
-
&RecordKey::new(rkey.to_string()).unwrap(),
754
-
&cid.as_ref().map(|s| Cid::from_str(s).unwrap()),
755
-
).await {
756
-
Ok(CachedRecord::Deleted) =>
757
-
Hydration::Error(ProxyHydrationError {
758
-
reason: "record deleted".to_string(),
759
-
}),
760
-
Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => {
761
-
if let Some(c) = cid && found_cid.as_ref().to_string() != c {
762
-
log::warn!("ignoring cid mismatch");
943
+
let res =
944
+
match repo.get_record(&did, &collection, &rkey, &cid).await {
945
+
Ok(CachedRecord::Deleted) => {
946
+
Hydration::Error(ProxyHydrationError {
947
+
reason: "record deleted".to_string(),
948
+
should_retry: false,
949
+
follow_up: follow_up.to_string(),
950
+
})
763
951
}
764
-
let value = serde_json::from_str(&record).unwrap();
765
-
Hydration::Found(ProxyHydrationRecordFound {
766
-
record: value,
767
-
})
768
-
}
769
-
Err(e) => {
770
-
log::warn!("finally oop {e:?}");
771
-
Hydration::Error(ProxyHydrationError {
772
-
reason: "failed to fetch record".to_string(),
773
-
})
774
-
}
775
-
};
776
-
tx.send(GetThing::Record(uri, res)).await
952
+
Ok(CachedRecord::Found(RawRecord {
953
+
cid: found_cid,
954
+
record,
955
+
})) => {
956
+
if cid
957
+
.as_ref()
958
+
.map(|expected| *expected != found_cid)
959
+
.unwrap_or(false)
960
+
{
961
+
Hydration::Error(ProxyHydrationError {
962
+
reason: "not found".to_string(),
963
+
should_retry: false,
964
+
follow_up: follow_up.to_string(),
965
+
})
966
+
} else if let Ok(value) = serde_json::from_str(&record)
967
+
{
968
+
let canonical_uri = FullAtUriParts {
969
+
repo: AtIdentifier::Did(did),
970
+
collection,
971
+
rkey,
972
+
cid: None, // not used for .to_uri
973
+
}
974
+
.to_uri();
975
+
Hydration::Found(FoundRecordResponseObject {
976
+
cid: Some(found_cid.as_ref().to_string()),
977
+
uri: canonical_uri,
978
+
value,
979
+
})
980
+
} else {
981
+
Hydration::Error(ProxyHydrationError {
982
+
reason: "could not parse upstream response"
983
+
.to_string(),
984
+
should_retry: false,
985
+
follow_up: follow_up.to_string(),
986
+
})
987
+
}
988
+
}
989
+
Err(e) => {
990
+
log::warn!("finally oop {e:?}");
991
+
Hydration::Error(ProxyHydrationError {
992
+
reason: "failed to fetch record".to_string(),
993
+
should_retry: true, // TODO
994
+
follow_up: follow_up.to_string(),
995
+
})
996
+
}
997
+
};
998
+
if tx
999
+
.send(GetThing::Record(non_canonical_url, res))
1000
+
.await
1001
+
.is_ok()
1002
+
{
1003
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
1004
+
} else {
1005
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
1006
+
}
777
1007
});
778
1008
}
779
1009
MatchedRef::Identifier(id) => {
780
-
if identifiers.contains_key(&id) {
1010
+
let identifier: String = id.clone().into();
1011
+
if identifiers.contains_key(&identifier) {
781
1012
continue;
782
1013
}
783
-
let mut u = url::Url::parse("https://example.com").unwrap();
784
-
u.query_pairs_mut().append_pair("identifier", &id);
785
-
identifiers.insert(id, Hydration::Pending(ProxyHydrationPending {
786
-
url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross
787
-
}));
788
-
let tx = tx.clone();
789
-
// let doc_fut = self.resolve_mini_doc();
790
-
tokio::task::spawn(async {
791
1014
1015
+
let mut follow_up = self.base_url.clone();
1016
+
follow_up.set_path("/xrpc/blue.microcosm.identity.resolveMiniDoc");
1017
+
1018
+
follow_up
1019
+
.query_pairs_mut()
1020
+
.append_pair("identifier", &identifier);
1021
+
1022
+
if i >= 100 {
1023
+
identifiers.insert(
1024
+
identifier.clone(),
1025
+
Hydration::Pending(ProxyHydrationPending {
1026
+
reason: "limit".to_string(),
1027
+
follow_up: follow_up.to_string(),
1028
+
}),
1029
+
);
1030
+
continue;
1031
+
} else {
1032
+
identifiers.insert(
1033
+
identifier.clone(),
1034
+
Hydration::Pending(ProxyHydrationPending {
1035
+
reason: "deadline".to_string(),
1036
+
follow_up: follow_up.to_string(),
1037
+
}),
1038
+
);
1039
+
}
1040
+
1041
+
let tx = tx.clone();
1042
+
let identity = self.identity.clone();
1043
+
tokio::task::spawn(async move {
1044
+
let res = match Self::resolve_mini_doc_impl(&identifier, identity)
1045
+
.await
1046
+
{
1047
+
ResolveMiniDocResponse::Ok(Json(mini_doc)) => {
1048
+
Hydration::Found(mini_doc)
1049
+
}
1050
+
ResolveMiniDocResponse::BadRequest(e) => {
1051
+
log::warn!("minidoc fail: {:?}", e.0);
1052
+
Hydration::Error(ProxyHydrationError {
1053
+
reason: "failed to resolve mini doc".to_string(),
1054
+
should_retry: false,
1055
+
follow_up: follow_up.to_string(),
1056
+
})
1057
+
}
1058
+
};
1059
+
if tx.send(GetThing::Identifier(identifier, res)).await.is_ok() {
1060
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "true").increment(1);
1061
+
} else {
1062
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "false").increment(1);
1063
+
}
792
1064
});
793
1065
}
794
1066
}
···
797
1069
// (we shoudl be doing a timeout...)
798
1070
drop(tx);
799
1071
800
-
while let Some(hydration) = rx.recv().await {
801
-
match hydration {
802
-
GetThing::Record(uri, h) => { records.insert(uri, h); }
803
-
GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); }
804
-
};
1072
+
let deadline = t0 + std::time::Duration::from_secs_f64(1.6);
1073
+
let res = tokio::time::timeout_at(deadline.into(), async {
1074
+
while let Some(hydration) = rx.recv().await {
1075
+
match hydration {
1076
+
GetThing::Record(uri, h) => {
1077
+
if let Some(r) = records.get_mut(&uri) {
1078
+
match (&r, &h) {
1079
+
(_, Hydration::Found(_)) => *r = h, // always replace if found
1080
+
(Hydration::Pending(_), _) => *r = h, // or if it was pending
1081
+
_ => {} // else leave it
1082
+
}
1083
+
} else {
1084
+
records.insert(uri, h);
1085
+
}
1086
+
}
1087
+
GetThing::Identifier(identifier, md) => {
1088
+
identifiers.insert(identifier.to_string(), md);
1089
+
}
1090
+
};
1091
+
}
1092
+
})
1093
+
.await;
1094
+
1095
+
if res.is_ok() {
1096
+
metrics::histogram!("slingshot_hydration_all_completed").record(t0.elapsed());
1097
+
} else {
1098
+
metrics::counter!("slingshot_hydration_cut_off").increment(1);
805
1099
}
806
1100
807
1101
ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
···
815
1109
ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
816
1110
}
817
1111
}
818
-
819
1112
}
820
1113
821
1114
async fn get_record_impl(
822
1115
&self,
823
-
repo: String,
824
-
collection: String,
825
-
rkey: String,
826
-
cid: Option<String>,
1116
+
repo: &str,
1117
+
collection: &str,
1118
+
rkey: &str,
1119
+
cid: Option<&str>,
827
1120
) -> GetRecordResponse {
828
-
let did = match Did::new(repo.clone()) {
1121
+
let did = match Did::new(repo.to_string()) {
829
1122
Ok(did) => did,
830
1123
Err(_) => {
831
1124
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
···
856
1149
}
857
1150
};
858
1151
859
-
let Ok(collection) = Nsid::new(collection) else {
1152
+
let Ok(collection) = Nsid::new(collection.to_string()) else {
860
1153
return GetRecordResponse::BadRequest(xrpc_error(
861
1154
"InvalidRequest",
862
1155
"Invalid NSID for collection",
863
1156
));
864
1157
};
865
1158
866
-
let Ok(rkey) = RecordKey::new(rkey) else {
1159
+
let Ok(rkey) = RecordKey::new(rkey.to_string()) else {
867
1160
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
868
1161
};
869
1162
870
1163
let cid: Option<Cid> = if let Some(cid) = cid {
871
-
let Ok(cid) = Cid::from_str(&cid) else {
1164
+
let Ok(cid) = Cid::from_str(cid) else {
872
1165
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
873
1166
};
874
1167
Some(cid)
···
1017
1310
identity: Identity,
1018
1311
repo: Repo,
1019
1312
proxy: Proxy,
1313
+
base_url: url::Url,
1020
1314
acme_domain: Option<String>,
1021
1315
acme_contact: Option<String>,
1022
1316
acme_cache_path: Option<PathBuf>,
···
1028
1322
let proxy = Arc::new(proxy);
1029
1323
let api_service = OpenApiService::new(
1030
1324
Xrpc {
1325
+
base_url,
1031
1326
cache,
1032
1327
identity,
1033
1328
proxy,
+1
-7
constellation/src/bin/main.rs
+1
-7
constellation/src/bin/main.rs
···
45
45
#[arg(short, long)]
46
46
#[clap(value_enum, default_value_t = StorageBackend::Memory)]
47
47
backend: StorageBackend,
48
-
/// Serve a did:web document for this domain
49
-
#[arg(long)]
50
-
did_web_domain: Option<String>,
51
48
/// Initiate a database backup into this dir, if supported by the storage
52
49
#[arg(long)]
53
50
backup: Option<PathBuf>,
···
106
103
MemStorage::new(),
107
104
fixture,
108
105
None,
109
-
args.did_web_domain,
110
106
stream,
111
107
bind,
112
108
metrics_bind,
···
142
138
rocks,
143
139
fixture,
144
140
args.data,
145
-
args.did_web_domain,
146
141
stream,
147
142
bind,
148
143
metrics_bind,
···
164
159
mut storage: impl LinkStorage,
165
160
fixture: Option<PathBuf>,
166
161
data_dir: Option<PathBuf>,
167
-
did_web_domain: Option<String>,
168
162
stream: String,
169
163
bind: SocketAddr,
170
164
metrics_bind: SocketAddr,
···
217
211
if collect_metrics {
218
212
install_metrics_server(metrics_bind)?;
219
213
}
220
-
serve(readable, bind, did_web_domain, staying_alive).await
214
+
serve(readable, bind, staying_alive).await
221
215
})
222
216
.unwrap();
223
217
stay_alive.drop_guard();
+7
-32
constellation/src/server/mod.rs
+7
-32
constellation/src/server/mod.rs
···
3
3
extract::{Query, Request},
4
4
http::{self, header},
5
5
middleware::{self, Next},
6
-
response::{IntoResponse, Json, Response},
6
+
response::{IntoResponse, Response},
7
7
routing::get,
8
8
Router,
9
9
};
···
37
37
http::StatusCode::INTERNAL_SERVER_ERROR
38
38
}
39
39
40
-
pub async fn serve<S: LinkReader, A: ToSocketAddrs>(
41
-
store: S,
42
-
addr: A,
43
-
did_web_domain: Option<String>,
44
-
stay_alive: CancellationToken,
45
-
) -> anyhow::Result<()> {
46
-
let mut app = Router::new();
47
-
48
-
if let Some(d) = did_web_domain {
49
-
app = app.route(
50
-
"/.well-known/did.json",
51
-
get({
52
-
let domain = d.clone();
53
-
move || did_web(domain)
54
-
}),
55
-
)
56
-
}
57
-
58
-
let app = app
40
+
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
41
+
where
42
+
S: LinkReader,
43
+
A: ToSocketAddrs,
44
+
{
45
+
let app = Router::new()
59
46
.route("/robots.txt", get(robots))
60
47
.route(
61
48
"/",
···
217
204
User-agent: *
218
205
Disallow: /links
219
206
Disallow: /links/
220
-
Disallow: /xrpc/
221
207
"
222
-
}
223
-
224
-
async fn did_web(domain: String) -> impl IntoResponse {
225
-
Json(serde_json::json!({
226
-
"id": format!("did:web:{domain}"),
227
-
"service": [{
228
-
"id": "#constellation",
229
-
"type": "ConstellationGraphService",
230
-
"serviceEndpoint": format!("https://{domain}")
231
-
}]
232
-
}))
233
208
}
234
209
235
210
#[derive(Template, Serialize, Deserialize)]
+208
-16
slingshot/src/identity.rs
+208
-16
slingshot/src/identity.rs
···
17
17
18
18
use crate::error::IdentityError;
19
19
use atrium_api::{
20
-
did_doc::DidDocument,
20
+
did_doc::{DidDocument, Service as DidDocServic},
21
21
types::string::{Did, Handle},
22
22
};
23
23
use atrium_common::resolver::Resolver;
···
41
41
pub enum IdentityKey {
42
42
Handle(Handle),
43
43
Did(Did),
44
+
ServiceDid(Did),
44
45
}
45
46
46
47
impl IdentityKey {
···
48
49
let s = match self {
49
50
IdentityKey::Handle(h) => h.as_str(),
50
51
IdentityKey::Did(d) => d.as_str(),
52
+
IdentityKey::ServiceDid(d) => d.as_str(),
51
53
};
52
54
std::mem::size_of::<Self>() + std::mem::size_of_val(s)
53
55
}
···
59
61
#[derive(Debug, Serialize, Deserialize)]
60
62
enum IdentityData {
61
63
NotFound,
62
-
Did(Did),
63
-
Doc(PartialMiniDoc),
64
+
Did(Did), // from handle
65
+
Doc(PartialMiniDoc), // from did
66
+
ServiceDoc(MiniServiceDoc), // from service did
64
67
}
65
68
66
69
impl IdentityVal {
···
71
74
IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
72
75
IdentityData::Doc(d) => {
73
76
std::mem::size_of_val(d.unverified_handle.as_str())
74
-
+ std::mem::size_of_val(d.pds.as_str())
75
-
+ std::mem::size_of_val(d.signing_key.as_str())
77
+
+ std::mem::size_of_val(&d.pds)
78
+
+ std::mem::size_of_val(&d.signing_key)
79
+
}
80
+
IdentityData::ServiceDoc(d) => {
81
+
let mut s = std::mem::size_of::<MiniServiceDoc>();
82
+
s += std::mem::size_of_val(&d.services);
83
+
for sv in &d.services {
84
+
s += std::mem::size_of_val(&sv.full_id);
85
+
s += std::mem::size_of_val(&sv.r#type);
86
+
s += std::mem::size_of_val(&sv.endpoint);
87
+
}
88
+
s
76
89
}
77
90
};
78
91
wrapping + inner
···
168
181
}
169
182
}
170
183
184
+
/// Simplified info from service DID docs
185
+
#[derive(Debug, Clone, Serialize, Deserialize)]
186
+
pub struct MiniServiceDoc {
187
+
services: Vec<MiniService>,
188
+
}
189
+
190
+
impl MiniServiceDoc {
191
+
pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> {
192
+
self.services.iter().find(|ms| {
193
+
ms.full_id.ends_with(id_fragment)
194
+
&& service_type.map(|t| t == ms.r#type).unwrap_or(true)
195
+
})
196
+
}
197
+
}
198
+
199
+
/// The corresponding service info
200
+
#[derive(Debug, Clone, Serialize, Deserialize)]
201
+
pub struct MiniService {
202
+
/// The full id
203
+
///
204
+
/// for informational purposes only -- services are deduplicated by id fragment
205
+
full_id: String,
206
+
r#type: String,
207
+
/// HTTP endpoint for the actual service
208
+
pub endpoint: String,
209
+
}
210
+
211
+
impl TryFrom<DidDocument> for MiniServiceDoc {
212
+
type Error = String;
213
+
fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
214
+
let mut services = Vec::new();
215
+
let mut seen = HashSet::new();
216
+
217
+
for DidDocServic {
218
+
id,
219
+
r#type,
220
+
service_endpoint,
221
+
} in did_doc.service.unwrap_or(vec![])
222
+
{
223
+
let Some((_, id_fragment)) = id.rsplit_once('#') else {
224
+
continue;
225
+
};
226
+
if !seen.insert((id_fragment.to_string(), r#type.clone())) {
227
+
continue;
228
+
}
229
+
services.push(MiniService {
230
+
full_id: id,
231
+
r#type,
232
+
endpoint: service_endpoint,
233
+
});
234
+
}
235
+
236
+
Ok(Self { services })
237
+
}
238
+
}
239
+
171
240
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
172
241
///
173
242
/// the hashset allows testing for presense of items in the queue.
···
296
365
let now = UtcDateTime::now();
297
366
let IdentityVal(last_fetch, data) = entry.value();
298
367
match data {
299
-
IdentityData::Doc(_) => {
300
-
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
301
-
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
302
-
}
303
368
IdentityData::NotFound => {
304
369
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
305
370
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
313
378
self.queue_refresh(key).await;
314
379
}
315
380
Ok(Some(did.clone()))
381
+
}
382
+
_ => {
383
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
384
+
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
316
385
}
317
386
}
318
387
}
···
362
431
let now = UtcDateTime::now();
363
432
let IdentityVal(last_fetch, data) = entry.value();
364
433
match data {
365
-
IdentityData::Did(_) => {
366
-
log::error!("identity value mixup: got a did from a did key (should be a doc)");
367
-
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
368
-
}
369
434
IdentityData::NotFound => {
370
435
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
371
436
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
373
438
}
374
439
Ok(None)
375
440
}
376
-
IdentityData::Doc(mini_did) => {
441
+
IdentityData::Doc(mini_doc) => {
377
442
if (now - *last_fetch) >= MIN_TTL {
378
443
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
379
444
self.queue_refresh(key).await;
380
445
}
381
-
Ok(Some(mini_did.clone()))
446
+
Ok(Some(mini_doc.clone()))
447
+
}
448
+
_ => {
449
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
450
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
451
+
}
452
+
}
453
+
}
454
+
455
+
/// Fetch (and cache) a service mini doc from a did
456
+
pub async fn did_to_mini_service_doc(
457
+
&self,
458
+
did: &Did,
459
+
) -> Result<Option<MiniServiceDoc>, IdentityError> {
460
+
let key = IdentityKey::ServiceDid(did.clone());
461
+
metrics::counter!("slingshot_get_service_did_doc").increment(1);
462
+
let entry = self
463
+
.cache
464
+
.get_or_fetch(&key, {
465
+
let did = did.clone();
466
+
let resolver = self.did_resolver.clone();
467
+
|| async move {
468
+
let t0 = Instant::now();
469
+
let (res, success) = match resolver.resolve(&did).await {
470
+
Ok(did_doc) if did_doc.id != did.to_string() => (
471
+
// TODO: fix in atrium: should verify id is did
472
+
Err(IdentityError::BadDidDoc(
473
+
"did doc's id did not match did".to_string(),
474
+
)),
475
+
"false",
476
+
),
477
+
Ok(did_doc) => match did_doc.try_into() {
478
+
Ok(mini_service_doc) => (
479
+
Ok(IdentityVal(
480
+
UtcDateTime::now(),
481
+
IdentityData::ServiceDoc(mini_service_doc),
482
+
)),
483
+
"true",
484
+
),
485
+
Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"),
486
+
},
487
+
Err(atrium_identity::Error::NotFound) => (
488
+
Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
489
+
"false",
490
+
),
491
+
Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"),
492
+
};
493
+
metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success)
494
+
.record(t0.elapsed());
495
+
res
496
+
}
497
+
})
498
+
.await?;
499
+
500
+
let now = UtcDateTime::now();
501
+
let IdentityVal(last_fetch, data) = entry.value();
502
+
match data {
503
+
IdentityData::NotFound => {
504
+
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
505
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
506
+
self.queue_refresh(key).await;
507
+
}
508
+
Ok(None)
509
+
}
510
+
IdentityData::ServiceDoc(mini_service_doc) => {
511
+
if (now - *last_fetch) >= MIN_TTL {
512
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
513
+
self.queue_refresh(key).await;
514
+
}
515
+
Ok(Some(mini_service_doc.clone()))
516
+
}
517
+
_ => {
518
+
log::error!(
519
+
"identity value mixup: got a doc from a different key type (should be a service did)"
520
+
);
521
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
382
522
}
383
523
}
384
524
}
···
519
659
log::warn!(
520
660
"refreshed did doc failed: wrong did doc id. dropping refresh."
521
661
);
662
+
self.complete_refresh(&task_key).await?;
522
663
continue;
523
664
}
524
665
let mini_doc = match did_doc.try_into() {
···
526
667
Err(e) => {
527
668
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
528
669
log::warn!(
529
-
"converting mini doc failed: {e:?}. dropping refresh."
670
+
"converting mini doc for {did:?} failed: {e:?}. dropping refresh."
530
671
);
672
+
self.complete_refresh(&task_key).await?;
531
673
continue;
532
674
}
533
675
};
···
554
696
}
555
697
556
698
self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
699
+
}
700
+
IdentityKey::ServiceDid(ref did) => {
701
+
log::trace!("refreshing service did doc: {did:?}");
702
+
703
+
match self.did_resolver.resolve(did).await {
704
+
Ok(did_doc) => {
705
+
// TODO: fix in atrium: should verify id is did
706
+
if did_doc.id != did.to_string() {
707
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
708
+
log::warn!(
709
+
"refreshed did doc failed: wrong did doc id. dropping refresh."
710
+
);
711
+
self.complete_refresh(&task_key).await?;
712
+
continue;
713
+
}
714
+
let mini_service_doc = match did_doc.try_into() {
715
+
Ok(md) => md,
716
+
Err(e) => {
717
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
718
+
log::warn!(
719
+
"converting mini service doc failed: {e:?}. dropping refresh."
720
+
);
721
+
self.complete_refresh(&task_key).await?;
722
+
continue;
723
+
}
724
+
};
725
+
metrics::counter!("identity_service_did_refresh", "success" => "true")
726
+
.increment(1);
727
+
self.cache.insert(
728
+
task_key.clone(),
729
+
IdentityVal(
730
+
UtcDateTime::now(),
731
+
IdentityData::ServiceDoc(mini_service_doc),
732
+
),
733
+
);
734
+
}
735
+
Err(atrium_identity::Error::NotFound) => {
736
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1);
737
+
self.cache.insert(
738
+
task_key.clone(),
739
+
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
740
+
);
741
+
}
742
+
Err(err) => {
743
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1);
744
+
log::warn!(
745
+
"failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
746
+
);
747
+
}
748
+
}
557
749
}
558
750
}
559
751
}
History
2 rounds
2 comments
bad-example.com
submitted
#1
8 commits
expand
collapse
very hack proxy stuff (wip)
identities!!!!
service doc cache
proxy: use service cache
fmt
include headers, timeout hydration, measure times
base_url, typed url parts, api fixups,
only try to hydrate 100 records
no conflicts, ready to merge
expand 0 comments
bad-example.com
submitted
#0
1 commit
expand
collapse
very hack proxy stuff (wip)
the record contents from
com.atproto.repo.getRecordare returned under a key called "value", so this should probably use that.