+2
-2
Cargo.lock
+2
-2
Cargo.lock
slingshot/Cargo.toml
slingshot/Cargo.toml
This file has not been changed.
+8
slingshot/src/error.rs
+8
slingshot/src/error.rs
···
100
UrlParseError(#[from] url::ParseError),
101
#[error(transparent)]
102
ReqwestError(#[from] reqwest::Error),
103
+
#[error(transparent)]
104
+
InvalidHeader(#[from] reqwest::header::InvalidHeaderValue),
105
+
#[error(transparent)]
106
+
IdentityError(#[from] IdentityError),
107
+
#[error("upstream service could not be resolved")]
108
+
ServiceNotFound,
109
+
#[error("upstream service was found but no services matched")]
110
+
ServiceNotMatched,
111
}
slingshot/src/lib.rs
slingshot/src/lib.rs
This file has not been changed.
+20
-6
slingshot/src/main.rs
+20
-6
slingshot/src/main.rs
···
1
-
// use foyer::HybridCache;
2
-
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
use slingshot::{
5
Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
···
9
10
use clap::Parser;
11
use tokio_util::sync::CancellationToken;
12
13
/// Slingshot record edge cache
14
#[derive(Parser, Debug, Clone)]
···
48
#[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
49
#[clap(default_value_t = 1)]
50
identity_cache_disk_gb: usize,
51
/// the domain pointing to this server
52
///
53
/// if present:
···
101
102
let args = Args::parse();
103
104
if args.collect_metrics {
105
log::trace!("installing metrics server...");
106
if let Err(e) = install_metrics_server(args.bind_metrics) {
···
152
log::info!("identity service ready.");
153
154
let repo = Repo::new(identity.clone());
155
-
let proxy = Proxy::new(repo.clone());
156
157
let identity_for_server = identity.clone();
158
let server_shutdown = shutdown.clone();
···
164
identity_for_server,
165
repo,
166
proxy,
167
args.acme_domain,
168
args.acme_contact,
169
args.acme_cache_path,
···
236
) -> Result<(), metrics_exporter_prometheus::BuildError> {
237
log::info!("installing metrics server...");
238
PrometheusBuilder::new()
239
-
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
240
-
.set_bucket_duration(std::time::Duration::from_secs(300))?
241
-
.set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
242
.set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
243
.with_http_listener(bind_metrics)
244
.install()?;
···
1
use metrics_exporter_prometheus::PrometheusBuilder;
2
use slingshot::{
3
Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
···
7
8
use clap::Parser;
9
use tokio_util::sync::CancellationToken;
10
+
use url::Url;
11
12
/// Slingshot record edge cache
13
#[derive(Parser, Debug, Clone)]
···
47
#[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
48
#[clap(default_value_t = 1)]
49
identity_cache_disk_gb: usize,
50
+
/// the address of this server
51
+
///
52
+
/// used if --acme-domain is not set, defaulting to `--bind`
53
+
#[arg(long, conflicts_with("acme_domain"), env = "SLINGSHOT_PUBLIC_HOST")]
54
+
base_url: Option<Url>,
55
/// the domain pointing to this server
56
///
57
/// if present:
···
105
106
let args = Args::parse();
107
108
+
let base_url: Url = args
109
+
.base_url
110
+
.or_else(|| {
111
+
args.acme_domain
112
+
.as_ref()
113
+
.map(|d| Url::parse(&format!("https://{d}")).unwrap())
114
+
})
115
+
.unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap());
116
+
117
if args.collect_metrics {
118
log::trace!("installing metrics server...");
119
if let Err(e) = install_metrics_server(args.bind_metrics) {
···
165
log::info!("identity service ready.");
166
167
let repo = Repo::new(identity.clone());
168
+
let proxy = Proxy::new(identity.clone());
169
170
let identity_for_server = identity.clone();
171
let server_shutdown = shutdown.clone();
···
177
identity_for_server,
178
repo,
179
proxy,
180
+
base_url,
181
args.acme_domain,
182
args.acme_contact,
183
args.acme_cache_path,
···
250
) -> Result<(), metrics_exporter_prometheus::BuildError> {
251
log::info!("installing metrics server...");
252
PrometheusBuilder::new()
253
+
.set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 45.656])?
254
+
.set_bucket_duration(std::time::Duration::from_secs(15))?
255
+
.set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = bucket lifetime
256
.set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
257
.with_http_listener(bind_metrics)
258
.install()?;
+247
-133
slingshot/src/proxy.rs
+247
-133
slingshot/src/proxy.rs
···
1
-
use serde::Deserialize;
2
-
use url::Url;
3
-
use std::{collections::HashMap, time::Duration};
4
-
use crate::{Repo, server::HydrationSource, error::ProxyError};
5
use reqwest::Client;
6
use serde_json::{Map, Value};
7
8
pub enum ParamValue {
9
String(Vec<String>),
···
13
pub struct Params(HashMap<String, ParamValue>);
14
15
impl TryFrom<Map<String, Value>> for Params {
16
-
type Error = (); // TODO
17
fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18
let mut out = HashMap::new();
19
for (k, v) in val {
···
70
71
#[derive(Clone)]
72
pub struct Proxy {
73
-
repo: Repo,
74
client: Client,
75
}
76
77
impl Proxy {
78
-
pub fn new(repo: Repo) -> Self {
79
let client = Client::builder()
80
.user_agent(format!(
81
"microcosm slingshot v{} (contact: @bad-example.com)",
···
85
.timeout(Duration::from_secs(6))
86
.build()
87
.unwrap();
88
-
Self { repo, client }
89
}
90
91
pub async fn proxy(
92
&self,
93
-
xrpc: String,
94
-
service: String,
95
params: Option<Map<String, Value>>,
96
) -> Result<Value, ProxyError> {
97
-
98
-
// hackin it to start
99
-
100
-
// 1. assume did-web (TODO) and get the did doc
101
-
#[derive(Debug, Deserialize)]
102
-
struct ServiceDoc {
103
-
id: String,
104
-
service: Vec<ServiceItem>,
105
-
}
106
-
#[derive(Debug, Deserialize)]
107
-
struct ServiceItem {
108
-
id: String,
109
-
#[expect(unused)]
110
-
r#type: String,
111
-
#[serde(rename = "serviceEndpoint")]
112
-
service_endpoint: Url,
113
-
}
114
-
let dw = service.strip_prefix("did:web:").expect("a did web");
115
-
let (dw, service_id) = dw.split_once("#").expect("whatever");
116
-
let mut dw_url = Url::parse(&format!("https://{dw}"))?;
117
-
dw_url.set_path("/.well-known/did.json");
118
-
let doc: ServiceDoc = self.client
119
-
.get(dw_url)
120
-
.send()
121
.await?
122
-
.error_for_status()?
123
-
.json()
124
-
.await?;
125
126
-
assert_eq!(doc.id, format!("did:web:{}", dw));
127
128
-
let mut upstream = None;
129
-
for ServiceItem { id, service_endpoint, .. } in doc.service {
130
-
let Some((_, id)) = id.split_once("#") else { continue; };
131
-
if id != service_id { continue; };
132
-
upstream = Some(service_endpoint);
133
-
break;
134
-
}
135
-
136
-
// 2. proxy the request forward
137
-
let mut upstream = upstream.expect("to find it");
138
-
upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid
139
-
140
if let Some(params) = params {
141
let mut query = upstream.query_pairs_mut();
142
let Params(ps) = params.try_into().expect("valid params");
···
161
}
162
}
163
164
-
// TODO: other headers to proxy
165
-
Ok(self.client
166
.get(upstream)
167
.send()
168
-
.await?
169
-
.error_for_status()?
170
-
.json()
171
-
.await?)
172
}
173
}
174
···
188
while let Some((i, c)) = chars.next() {
189
match c {
190
'[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
191
-
'[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")),
192
'[' => in_bracket = true,
193
']' if in_bracket => {
194
in_bracket = false;
195
let key = std::mem::take(&mut key_acc);
196
let r#type = std::mem::take(&mut type_acc);
197
-
let t = if r#type.is_empty() { None } else { Some(r#type) };
198
out.push(PathPart::Vector(key, t));
199
// peek ahead because we need a dot after array if there's more and i don't want to add more loop state
200
let Some((i, c)) = chars.next() else {
201
break;
202
};
203
if c != '.' {
204
-
return Err(format!("expected dot after close bracket, found {c:?} at {i}"));
205
}
206
}
207
']' => return Err(format!("unexpected close bracket at {i}")),
208
'.' if in_bracket => type_acc.push(c),
209
-
'.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")),
210
'.' => {
211
let key = std::mem::take(&mut key_acc);
212
assert!(type_acc.is_empty());
···
251
}
252
253
#[derive(Debug, PartialEq)]
254
pub enum MatchedRef {
255
-
AtUri {
256
-
uri: String,
257
-
cid: Option<String>,
258
-
},
259
-
Identifier(String),
260
}
261
262
pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> {
···
268
RefShape::StrongRef => {
269
let o = val.as_object()?;
270
let uri = o.get("uri")?.as_str()?.to_string();
271
-
let cid = o.get("cid")?.as_str()?.to_string();
272
-
Some(MatchedRef::AtUri { uri, cid: Some(cid) })
273
}
274
RefShape::AtUri => {
275
let uri = val.as_str()?.to_string();
276
-
Some(MatchedRef::AtUri { uri, cid: None })
277
}
278
RefShape::AtUriParts => {
279
let o = val.as_object()?;
280
-
let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string();
281
-
let collection = o.get("collection")?.as_str()?.to_string();
282
-
let rkey = o.get("rkey")?.as_str()?.to_string();
283
-
let uri = format!("at://{identifier}/{collection}/{rkey}");
284
-
let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string);
285
-
Some(MatchedRef::AtUri { uri, cid })
286
}
287
RefShape::Did => {
288
-
let id = val.as_str()?;
289
-
if !id.starts_with("did:") {
290
-
return None;
291
-
}
292
-
Some(MatchedRef::Identifier(id.to_string()))
293
}
294
RefShape::Handle => {
295
-
let id = val.as_str()?;
296
-
if id.contains(':') {
297
-
return None;
298
-
}
299
-
Some(MatchedRef::Identifier(id.to_string()))
300
}
301
RefShape::AtIdentifier => {
302
-
Some(MatchedRef::Identifier(val.as_str()?.to_string()))
303
}
304
}
305
}
···
339
}
340
impl<'a> PathWalker<'a> {
341
fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
342
-
Self { todo: vec![(path_parts, skeleton)] }
343
}
344
}
345
impl<'a> Iterator for PathWalker<'a> {
···
364
let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
365
continue;
366
};
367
-
for v in a
368
-
.iter()
369
-
.rev()
370
-
.filter(|c| {
371
-
let Some(t) = t else { return true };
372
-
c
373
-
.as_object()
374
-
.and_then(|o| o.get("$type"))
375
-
.and_then(|v| v.as_str())
376
-
.map(|s| s == t)
377
-
.unwrap_or(false)
378
-
})
379
-
{
380
self.todo.push((rest, v))
381
}
382
}
···
385
}
386
}
387
388
-
389
#[cfg(test)]
390
mod tests {
391
use super::*;
392
use serde_json::json;
393
394
#[test]
395
fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
396
let cases = [
397
("", vec![]),
398
("subject", vec![PathPart::Scalar("subject".into())]),
399
("authorDid", vec![PathPart::Scalar("authorDid".into())]),
400
-
("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]),
401
("members[]", vec![PathPart::Vector("members".into(), None)]),
402
-
("add[].key", vec![
403
-
PathPart::Vector("add".into(), None),
404
-
PathPart::Scalar("key".into()),
405
-
]),
406
("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
407
-
("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]),
408
-
("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![
409
-
PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
410
-
PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())),
411
-
PathPart::Scalar("did".into()),
412
-
]),
413
];
414
415
for (path, expected) in cases {
···
426
("strong-ref", json!(""), None),
427
("strong-ref", json!({}), None),
428
("strong-ref", json!({ "uri": "abc" }), None),
429
-
("strong-ref", json!({ "cid": "def" }), None),
430
(
431
"strong-ref",
432
-
json!({ "uri": "abc", "cid": "def" }),
433
-
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }),
434
),
435
("at-uri", json!({ "uri": "abc" }), None),
436
-
("at-uri", json!({ "uri": "abc", "cid": "def" }), None),
437
(
438
"at-uri",
439
-
json!("abc"),
440
-
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }),
441
),
442
("at-uri-parts", json!("abc"), None),
443
("at-uri-parts", json!({}), None),
444
(
445
"at-uri-parts",
446
-
json!({"repo": "a", "collection": "b", "rkey": "c"}),
447
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
448
),
449
(
450
"at-uri-parts",
451
-
json!({"did": "a", "collection": "b", "rkey": "c"}),
452
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
453
),
454
(
455
"at-uri-parts",
456
// 'repo' takes precedence over 'did'
457
-
json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}),
458
-
Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }),
459
),
460
(
461
"at-uri-parts",
462
-
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}),
463
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }),
464
),
465
(
466
"at-uri-parts",
467
-
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}),
468
-
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
469
),
470
("did", json!({}), None),
471
("did", json!(""), None),
472
("did", json!("bad-example.com"), None),
473
-
("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
474
("handle", json!({}), None),
475
-
("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
476
("handle", json!("did:plc:xyz"), None),
477
("at-identifier", json!({}), None),
478
-
("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
479
-
("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
480
];
481
-
for (shape, val, expected) in cases {
482
let s = shape.try_into().unwrap();
483
let matched = match_shape(s, &val);
484
-
assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}");
485
}
486
}
487
}
···
1
+
use crate::{Identity, error::ProxyError, server::HydrationSource};
2
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Nsid, RecordKey};
3
use reqwest::Client;
4
use serde_json::{Map, Value};
5
+
use std::{collections::HashMap, time::Duration};
6
+
use url::Url;
7
8
pub enum ParamValue {
9
String(Vec<String>),
···
13
pub struct Params(HashMap<String, ParamValue>);
14
15
impl TryFrom<Map<String, Value>> for Params {
16
+
type Error = (); // TODO
17
fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18
let mut out = HashMap::new();
19
for (k, v) in val {
···
70
71
#[derive(Clone)]
72
pub struct Proxy {
73
+
identity: Identity,
74
client: Client,
75
}
76
77
impl Proxy {
78
+
pub fn new(identity: Identity) -> Self {
79
let client = Client::builder()
80
.user_agent(format!(
81
"microcosm slingshot v{} (contact: @bad-example.com)",
···
85
.timeout(Duration::from_secs(6))
86
.build()
87
.unwrap();
88
+
Self { client, identity }
89
}
90
91
pub async fn proxy(
92
&self,
93
+
service_did: &Did,
94
+
service_id: &str,
95
+
xrpc: &Nsid,
96
+
authorization: Option<&str>,
97
+
atproto_accept_labelers: Option<&str>,
98
params: Option<Map<String, Value>>,
99
) -> Result<Value, ProxyError> {
100
+
let mut upstream: Url = self
101
+
.identity
102
+
.did_to_mini_service_doc(service_did)
103
.await?
104
+
.ok_or(ProxyError::ServiceNotFound)?
105
+
.get(service_id, None)
106
+
.ok_or(ProxyError::ServiceNotMatched)?
107
+
.endpoint
108
+
.parse()?;
109
110
+
upstream.set_path(&format!("/xrpc/{}", xrpc.as_str()));
111
112
if let Some(params) = params {
113
let mut query = upstream.query_pairs_mut();
114
let Params(ps) = params.try_into().expect("valid params");
···
133
}
134
}
135
136
+
// TODO i mean maybe we should look for headers also in our headers but not obviously
137
+
let mut headers = reqwest::header::HeaderMap::new();
138
+
// TODO: check the jwt aud against the upstream!!!
139
+
if let Some(auth) = authorization {
140
+
headers.insert("Authorization", auth.try_into()?);
141
+
}
142
+
if let Some(aal) = atproto_accept_labelers {
143
+
headers.insert("atproto-accept-labelers", aal.try_into()?);
144
+
}
145
+
146
+
let t0 = std::time::Instant::now();
147
+
let res = self
148
+
.client
149
.get(upstream)
150
+
.headers(headers)
151
.send()
152
+
.await
153
+
.and_then(|r| r.error_for_status());
154
+
155
+
if res.is_ok() {
156
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "true")
157
+
.record(t0.elapsed());
158
+
} else {
159
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "false")
160
+
.record(t0.elapsed());
161
+
}
162
+
163
+
Ok(res?.json().await?)
164
}
165
}
166
···
180
while let Some((i, c)) = chars.next() {
181
match c {
182
'[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
183
+
'[' if key_acc.is_empty() => {
184
+
return Err(format!("missing key before opening bracket, at {i}"));
185
+
}
186
'[' => in_bracket = true,
187
']' if in_bracket => {
188
in_bracket = false;
189
let key = std::mem::take(&mut key_acc);
190
let r#type = std::mem::take(&mut type_acc);
191
+
let t = if r#type.is_empty() {
192
+
None
193
+
} else {
194
+
Some(r#type)
195
+
};
196
out.push(PathPart::Vector(key, t));
197
// peek ahead because we need a dot after array if there's more and i don't want to add more loop state
198
let Some((i, c)) = chars.next() else {
199
break;
200
};
201
if c != '.' {
202
+
return Err(format!(
203
+
"expected dot after close bracket, found {c:?} at {i}"
204
+
));
205
}
206
}
207
']' => return Err(format!("unexpected close bracket at {i}")),
208
'.' if in_bracket => type_acc.push(c),
209
+
'.' if key_acc.is_empty() => {
210
+
return Err(format!("missing key before next segment, at {i}"));
211
+
}
212
'.' => {
213
let key = std::mem::take(&mut key_acc);
214
assert!(type_acc.is_empty());
···
253
}
254
255
#[derive(Debug, PartialEq)]
256
+
pub struct FullAtUriParts {
257
+
pub repo: AtIdentifier,
258
+
pub collection: Nsid,
259
+
pub rkey: RecordKey,
260
+
pub cid: Option<Cid>,
261
+
}
262
+
263
+
impl FullAtUriParts {
264
+
pub fn to_uri(&self) -> String {
265
+
let repo: String = self.repo.clone().into(); // no as_str for AtIdentifier atrium???
266
+
let collection = self.collection.as_str();
267
+
let rkey = self.rkey.as_str();
268
+
format!("at://{repo}/{collection}/{rkey}")
269
+
}
270
+
}
271
+
272
+
// TODO: move this to links
273
+
pub fn split_uri(uri: &str) -> Option<(AtIdentifier, Nsid, RecordKey)> {
274
+
let rest = uri.strip_prefix("at://")?;
275
+
let (repo, rest) = rest.split_once("/")?;
276
+
let repo = repo.parse().ok()?;
277
+
let (collection, rkey) = rest.split_once("/")?;
278
+
let collection = collection.parse().ok()?;
279
+
let rkey = rkey.split_once('#').map(|(k, _)| k).unwrap_or(rkey);
280
+
let rkey = rkey.split_once('?').map(|(k, _)| k).unwrap_or(rkey);
281
+
let rkey = rkey.parse().ok()?;
282
+
Some((repo, collection, rkey))
283
+
}
284
+
285
+
#[derive(Debug, PartialEq)]
286
pub enum MatchedRef {
287
+
AtUri(FullAtUriParts),
288
+
Identifier(AtIdentifier),
289
}
290
291
pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> {
···
297
RefShape::StrongRef => {
298
let o = val.as_object()?;
299
let uri = o.get("uri")?.as_str()?.to_string();
300
+
let cid = o.get("cid")?.as_str()?.parse().ok()?;
301
+
let (repo, collection, rkey) = split_uri(&uri)?;
302
+
Some(MatchedRef::AtUri(FullAtUriParts {
303
+
repo,
304
+
collection,
305
+
rkey,
306
+
cid: Some(cid),
307
+
}))
308
}
309
RefShape::AtUri => {
310
let uri = val.as_str()?.to_string();
311
+
let (repo, collection, rkey) = split_uri(&uri)?;
312
+
Some(MatchedRef::AtUri(FullAtUriParts {
313
+
repo,
314
+
collection,
315
+
rkey,
316
+
cid: None,
317
+
}))
318
}
319
RefShape::AtUriParts => {
320
let o = val.as_object()?;
321
+
let repo = o.get("repo").or(o.get("did"))?.as_str()?.parse().ok()?;
322
+
let collection = o.get("collection")?.as_str()?.parse().ok()?;
323
+
let rkey = o.get("rkey")?.as_str()?.parse().ok()?;
324
+
let cid = o
325
+
.get("cid")
326
+
.and_then(|v| v.as_str())
327
+
.and_then(|s| s.parse().ok());
328
+
Some(MatchedRef::AtUri(FullAtUriParts {
329
+
repo,
330
+
collection,
331
+
rkey,
332
+
cid,
333
+
}))
334
}
335
RefShape::Did => {
336
+
let did = val.as_str()?.parse().ok()?;
337
+
Some(MatchedRef::Identifier(AtIdentifier::Did(did)))
338
}
339
RefShape::Handle => {
340
+
let handle = val.as_str()?.parse().ok()?;
341
+
Some(MatchedRef::Identifier(AtIdentifier::Handle(handle)))
342
}
343
RefShape::AtIdentifier => {
344
+
let identifier = val.as_str()?.parse().ok()?;
345
+
Some(MatchedRef::Identifier(identifier))
346
}
347
}
348
}
···
382
}
383
impl<'a> PathWalker<'a> {
384
fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
385
+
Self {
386
+
todo: vec![(path_parts, skeleton)],
387
+
}
388
}
389
}
390
impl<'a> Iterator for PathWalker<'a> {
···
409
let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
410
continue;
411
};
412
+
for v in a.iter().rev().filter(|c| {
413
+
let Some(t) = t else { return true };
414
+
c.as_object()
415
+
.and_then(|o| o.get("$type"))
416
+
.and_then(|v| v.as_str())
417
+
.map(|s| s == t)
418
+
.unwrap_or(false)
419
+
}) {
420
self.todo.push((rest, v))
421
}
422
}
···
425
}
426
}
427
428
#[cfg(test)]
429
mod tests {
430
use super::*;
431
use serde_json::json;
432
433
+
static TEST_CID: &str = "bafyreidffwk5wvh5l76yy7zefiqrovv6yaaegb4wg4zaq35w7nt3quix5a";
434
+
435
#[test]
436
fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
437
let cases = [
438
("", vec![]),
439
("subject", vec![PathPart::Scalar("subject".into())]),
440
("authorDid", vec![PathPart::Scalar("authorDid".into())]),
441
+
(
442
+
"subject.uri",
443
+
vec![
444
+
PathPart::Scalar("subject".into()),
445
+
PathPart::Scalar("uri".into()),
446
+
],
447
+
),
448
("members[]", vec![PathPart::Vector("members".into(), None)]),
449
+
(
450
+
"add[].key",
451
+
vec![
452
+
PathPart::Vector("add".into(), None),
453
+
PathPart::Scalar("key".into()),
454
+
],
455
+
),
456
("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
457
+
(
458
+
"a[b.c]",
459
+
vec![PathPart::Vector("a".into(), Some("b.c".into()))],
460
+
),
461
+
(
462
+
"facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did",
463
+
vec![
464
+
PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
465
+
PathPart::Vector(
466
+
"features".into(),
467
+
Some("app.bsky.richtext.facet#mention".into()),
468
+
),
469
+
PathPart::Scalar("did".into()),
470
+
],
471
+
),
472
];
473
474
for (path, expected) in cases {
···
485
("strong-ref", json!(""), None),
486
("strong-ref", json!({}), None),
487
("strong-ref", json!({ "uri": "abc" }), None),
488
+
("strong-ref", json!({ "cid": TEST_CID }), None),
489
(
490
"strong-ref",
491
+
json!({ "uri": "at://a.com/xx.yy.zz/1", "cid": TEST_CID }),
492
+
Some(MatchedRef::AtUri(FullAtUriParts {
493
+
repo: "a.com".parse().unwrap(),
494
+
collection: "xx.yy.zz".parse().unwrap(),
495
+
rkey: "1".parse().unwrap(),
496
+
cid: Some(TEST_CID.parse().unwrap()),
497
+
})),
498
),
499
("at-uri", json!({ "uri": "abc" }), None),
500
(
501
"at-uri",
502
+
json!({ "uri": "at://did:web:y.com/xx.yy.zz/1", "cid": TEST_CID }),
503
+
None,
504
),
505
+
(
506
+
"at-uri",
507
+
json!("at://did:web:y.com/xx.yy.zz/1"),
508
+
Some(MatchedRef::AtUri(FullAtUriParts {
509
+
repo: "did:web:y.com".parse().unwrap(),
510
+
collection: "xx.yy.zz".parse().unwrap(),
511
+
rkey: "1".parse().unwrap(),
512
+
cid: None,
513
+
})),
514
+
),
515
("at-uri-parts", json!("abc"), None),
516
("at-uri-parts", json!({}), None),
517
(
518
"at-uri-parts",
519
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
520
+
Some(MatchedRef::AtUri(FullAtUriParts {
521
+
repo: "a.com".parse().unwrap(),
522
+
collection: "xx.yy.zz".parse().unwrap(),
523
+
rkey: "1".parse().unwrap(),
524
+
cid: Some(TEST_CID.parse().unwrap()),
525
+
})),
526
),
527
(
528
"at-uri-parts",
529
+
json!({"did": "a.com", "collection": "xx.yy.zz", "rkey": "1"}),
530
+
Some(MatchedRef::AtUri(FullAtUriParts {
531
+
repo: "a.com".parse().unwrap(),
532
+
collection: "xx.yy.zz".parse().unwrap(),
533
+
rkey: "1".parse().unwrap(),
534
+
cid: None,
535
+
})),
536
),
537
(
538
"at-uri-parts",
539
// 'repo' takes precedence over 'did'
540
+
json!({"did": "did:web:a.com", "repo": "z.com", "collection": "xx.yy.zz", "rkey": "1"}),
541
+
Some(MatchedRef::AtUri(FullAtUriParts {
542
+
repo: "z.com".parse().unwrap(),
543
+
collection: "xx.yy.zz".parse().unwrap(),
544
+
rkey: "1".parse().unwrap(),
545
+
cid: None,
546
+
})),
547
),
548
(
549
"at-uri-parts",
550
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
551
+
Some(MatchedRef::AtUri(FullAtUriParts {
552
+
repo: "a.com".parse().unwrap(),
553
+
collection: "xx.yy.zz".parse().unwrap(),
554
+
rkey: "1".parse().unwrap(),
555
+
cid: Some(TEST_CID.parse().unwrap()),
556
+
})),
557
),
558
(
559
"at-uri-parts",
560
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": {}}),
561
+
Some(MatchedRef::AtUri(FullAtUriParts {
562
+
repo: "a.com".parse().unwrap(),
563
+
collection: "xx.yy.zz".parse().unwrap(),
564
+
rkey: "1".parse().unwrap(),
565
+
cid: None,
566
+
})),
567
),
568
("did", json!({}), None),
569
("did", json!(""), None),
570
("did", json!("bad-example.com"), None),
571
+
(
572
+
"did",
573
+
json!("did:plc:xyz"),
574
+
Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
575
+
),
576
("handle", json!({}), None),
577
+
(
578
+
"handle",
579
+
json!("bad-example.com"),
580
+
Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
581
+
),
582
("handle", json!("did:plc:xyz"), None),
583
("at-identifier", json!({}), None),
584
+
(
585
+
"at-identifier",
586
+
json!("bad-example.com"),
587
+
Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
588
+
),
589
+
(
590
+
"at-identifier",
591
+
json!("did:plc:xyz"),
592
+
Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
593
+
),
594
];
595
+
for (i, (shape, val, expected)) in cases.into_iter().enumerate() {
596
let s = shape.try_into().unwrap();
597
let matched = match_shape(s, &val);
598
+
assert_eq!(matched, expected, "{i}: shape: {shape:?}, val: {val:?}");
599
}
600
}
601
}
slingshot/src/record.rs
slingshot/src/record.rs
This file has not been changed.
+433
-138
slingshot/src/server.rs
+433
-138
slingshot/src/server.rs
···
1
use crate::{
2
CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3
error::{RecordError, ServerError},
4
-
proxy::{extract_links, MatchedRef},
5
record::RawRecord,
6
};
7
-
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
8
use foyer::HybridCache;
9
use links::at_uri::parse_at_uri as normalize_at_uri;
10
use serde::Serialize;
11
-
use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap};
12
use tokio::sync::mpsc;
13
use tokio_util::sync::CancellationToken;
14
···
24
};
25
use poem_openapi::{
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
-
Union,
28
-
param::Query, payload::Json, types::Example,
29
};
30
31
fn example_handle() -> String {
···
34
fn example_did() -> String {
35
"did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
36
}
37
fn example_collection() -> String {
38
"app.bsky.feed.like".to_string()
39
}
40
fn example_rkey() -> String {
41
"3lv4ouczo2b2a".to_string()
42
}
43
fn example_uri() -> String {
44
format!(
45
···
55
"zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
56
}
57
58
-
#[derive(Object)]
59
#[oai(example = true)]
60
struct XrpcErrorResponseObject {
61
/// Should correspond an error `name` in the lexicon errors array
···
86
}))
87
}
88
89
-
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
90
-
ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
91
error: "InvalidRequest".to_string(),
92
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
93
}))
94
}
95
96
fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
97
ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
98
error: "InvalidRequest".to_string(),
···
189
190
#[derive(ApiResponse)]
191
#[oai(bad_request_handler = "bad_request_handler_resolve_mini")]
192
-
enum ResolveMiniIDResponse {
193
/// Identity resolved
194
#[oai(status = 200)]
195
Ok(Json<MiniDocResponseObject>),
···
199
}
200
201
#[derive(Object)]
202
-
struct ProxyHydrationError {
203
-
reason: String,
204
}
205
206
-
#[derive(Object)]
207
-
struct ProxyHydrationPending {
208
-
url: String,
209
}
210
211
#[derive(Object)]
212
-
struct ProxyHydrationRecordFound {
213
-
record: serde_json::Value,
214
}
215
216
#[derive(Object)]
217
-
struct ProxyHydrationIdentifierFound {
218
-
record: MiniDocResponseObject,
219
}
220
221
// todo: there's gotta be a supertrait that collects these?
222
-
use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType};
223
224
#[derive(Union)]
225
#[oai(discriminator_name = "status", rename_all = "camelCase")]
···
235
/// The original upstream response content
236
output: serde_json::Value,
237
/// Any hydrated records
238
-
records: HashMap<String, Hydration<ProxyHydrationRecordFound>>,
239
/// Any hydrated identifiers
240
-
identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>,
241
}
242
impl Example for ProxyHydrateResponseObject {
243
fn example() -> Self {
244
Self {
245
output: serde_json::json!({}),
246
-
records: HashMap::from([
247
-
("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })),
248
-
]),
249
identifiers: HashMap::new(),
250
}
251
}
···
257
#[oai(status = 200)]
258
Ok(Json<ProxyHydrateResponseObject>),
259
#[oai(status = 400)]
260
-
BadRequest(XrpcError)
261
}
262
263
#[derive(Object)]
···
282
xrpc: String,
283
/// The destination service the request will be forwarded to
284
atproto_proxy: String,
285
/// The `params` for the destination service XRPC endpoint
286
///
287
/// Currently this will be passed along unchecked, but a future version of
···
290
params: Option<serde_json::Value>,
291
/// Paths within the response to look for at-uris that can be hydrated
292
hydration_sources: Vec<HydrationSource>,
293
-
// todo: deadline thing
294
-
295
}
296
impl Example for ProxyQueryPayload {
297
fn example() -> Self {
298
Self {
299
xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
300
atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
301
params: Some(serde_json::json!({
302
"feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
303
})),
304
-
hydration_sources: vec![
305
-
HydrationSource {
306
-
path: "feed[].post".to_string(),
307
-
shape: "at-uri".to_string(),
308
-
}
309
-
],
310
}
311
}
312
}
···
340
}
341
342
struct Xrpc {
343
cache: HybridCache<String, CachedRecord>,
344
identity: Identity,
345
proxy: Arc<Proxy>,
···
408
/// only retains the most recent version of a record.
409
Query(cid): Query<Option<String>>,
410
) -> GetRecordResponse {
411
-
self.get_record_impl(repo, collection, rkey, cid).await
412
}
413
414
/// blue.microcosm.repo.getRecordByUri
···
478
return bad_at_uri();
479
};
480
481
-
// TODO: move this to links
482
-
let Some(rest) = normalized.strip_prefix("at://") else {
483
return bad_at_uri();
484
};
485
-
let Some((repo, rest)) = rest.split_once('/') else {
486
-
return bad_at_uri();
487
-
};
488
-
let Some((collection, rest)) = rest.split_once('/') else {
489
-
return bad_at_uri();
490
-
};
491
-
let rkey = if let Some((rkey, _rest)) = rest.split_once('?') {
492
-
rkey
493
-
} else {
494
-
rest
495
-
};
496
497
self.get_record_impl(
498
-
repo.to_string(),
499
-
collection.to_string(),
500
-
rkey.to_string(),
501
-
cid,
502
)
503
.await
504
}
···
578
/// Handle or DID to resolve
579
#[oai(example = "example_handle")]
580
Query(identifier): Query<String>,
581
-
) -> ResolveMiniIDResponse {
582
self.resolve_mini_id(Query(identifier)).await
583
}
584
···
596
/// Handle or DID to resolve
597
#[oai(example = "example_handle")]
598
Query(identifier): Query<String>,
599
-
) -> ResolveMiniIDResponse {
600
let invalid = |reason: &'static str| {
601
-
ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
602
};
603
604
let mut unverified_handle = None;
605
-
let did = match Did::new(identifier.clone()) {
606
Ok(did) => did,
607
Err(_) => {
608
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
609
return invalid("Identifier was not a valid DID or handle");
610
};
611
612
-
match self.identity.handle_to_did(alleged_handle.clone()).await {
613
Ok(res) => {
614
if let Some(did) = res {
615
// we did it joe
···
627
}
628
}
629
};
630
-
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
631
return invalid("Failed to get DID doc");
632
};
633
let Some(partial_doc) = partial_doc else {
···
647
"handle.invalid".to_string()
648
}
649
} else {
650
-
let Ok(handle_did) = self
651
-
.identity
652
.handle_to_did(partial_doc.unverified_handle.clone())
653
.await
654
else {
···
664
}
665
};
666
667
-
ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject {
668
did: did.to_string(),
669
handle,
670
pds: partial_doc.pds,
···
672
}))
673
}
674
675
/// com.bad-example.proxy.hydrateQueryResponse
676
///
677
/// > [!important]
···
687
&self,
688
Json(payload): Json<ProxyQueryPayload>,
689
) -> ProxyHydrateResponse {
690
-
// TODO: the Accept request header, if present, gotta be json
691
-
// TODO: find any Authorization header and verify it. TBD about `aud`.
692
-
693
let params = if let Some(p) = payload.params {
694
let serde_json::Value::Object(map) = p else {
695
panic!("params have to be an object");
696
};
697
Some(map)
698
-
} else { None };
699
700
-
match self.proxy.proxy(
701
-
payload.xrpc,
702
-
payload.atproto_proxy,
703
-
params,
704
-
).await {
705
Ok(skeleton) => {
706
let links = match extract_links(payload.hydration_sources, &skeleton) {
707
Ok(l) => l,
708
Err(e) => {
709
log::warn!("problem extracting: {e:?}");
710
-
return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting"))
711
}
712
};
713
let mut records = HashMap::new();
714
let mut identifiers = HashMap::new();
715
716
enum GetThing {
717
-
Record(String, Hydration<ProxyHydrationRecordFound>),
718
-
Identifier(String, Hydration<ProxyHydrationIdentifierFound>),
719
}
720
721
let (tx, mut rx) = mpsc::channel(1);
722
723
-
for link in links {
724
match link {
725
-
MatchedRef::AtUri { uri, cid } => {
726
-
if records.contains_key(&uri) {
727
log::warn!("skipping duplicate record without checking cid");
728
continue;
729
}
730
-
let mut u = url::Url::parse("https://example.com").unwrap();
731
-
u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo
732
-
records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending {
733
-
url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc.
734
-
}));
735
let tx = tx.clone();
736
let identity = self.identity.clone();
737
let repo = self.repo.clone();
738
tokio::task::spawn(async move {
739
-
let rest = uri.strip_prefix("at://").unwrap();
740
-
let (identifier, rest) = rest.split_once('/').unwrap();
741
-
let (collection, rkey) = rest.split_once('/').unwrap();
742
-
743
-
let did = if identifier.starts_with("did:") {
744
-
Did::new(identifier.to_string()).unwrap()
745
-
} else {
746
-
let handle = Handle::new(identifier.to_string()).unwrap();
747
-
identity.handle_to_did(handle).await.unwrap().unwrap()
748
};
749
750
-
let res = match repo.get_record(
751
-
&did,
752
-
&Nsid::new(collection.to_string()).unwrap(),
753
-
&RecordKey::new(rkey.to_string()).unwrap(),
754
-
&cid.as_ref().map(|s| Cid::from_str(s).unwrap()),
755
-
).await {
756
-
Ok(CachedRecord::Deleted) =>
757
-
Hydration::Error(ProxyHydrationError {
758
-
reason: "record deleted".to_string(),
759
-
}),
760
-
Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => {
761
-
if let Some(c) = cid && found_cid.as_ref().to_string() != c {
762
-
log::warn!("ignoring cid mismatch");
763
}
764
-
let value = serde_json::from_str(&record).unwrap();
765
-
Hydration::Found(ProxyHydrationRecordFound {
766
-
record: value,
767
-
})
768
-
}
769
-
Err(e) => {
770
-
log::warn!("finally oop {e:?}");
771
-
Hydration::Error(ProxyHydrationError {
772
-
reason: "failed to fetch record".to_string(),
773
-
})
774
-
}
775
-
};
776
-
tx.send(GetThing::Record(uri, res)).await
777
});
778
}
779
MatchedRef::Identifier(id) => {
780
-
if identifiers.contains_key(&id) {
781
continue;
782
}
783
-
let mut u = url::Url::parse("https://example.com").unwrap();
784
-
u.query_pairs_mut().append_pair("identifier", &id);
785
-
identifiers.insert(id, Hydration::Pending(ProxyHydrationPending {
786
-
url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross
787
-
}));
788
-
let tx = tx.clone();
789
-
// let doc_fut = self.resolve_mini_doc();
790
-
tokio::task::spawn(async {
791
792
});
793
}
794
}
···
797
// (we shoudl be doing a timeout...)
798
drop(tx);
799
800
-
while let Some(hydration) = rx.recv().await {
801
-
match hydration {
802
-
GetThing::Record(uri, h) => { records.insert(uri, h); }
803
-
GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); }
804
-
};
805
}
806
807
ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
···
815
ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
816
}
817
}
818
-
819
}
820
821
async fn get_record_impl(
822
&self,
823
-
repo: String,
824
-
collection: String,
825
-
rkey: String,
826
-
cid: Option<String>,
827
) -> GetRecordResponse {
828
-
let did = match Did::new(repo.clone()) {
829
Ok(did) => did,
830
Err(_) => {
831
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
···
856
}
857
};
858
859
-
let Ok(collection) = Nsid::new(collection) else {
860
return GetRecordResponse::BadRequest(xrpc_error(
861
"InvalidRequest",
862
"Invalid NSID for collection",
863
));
864
};
865
866
-
let Ok(rkey) = RecordKey::new(rkey) else {
867
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
868
};
869
870
let cid: Option<Cid> = if let Some(cid) = cid {
871
-
let Ok(cid) = Cid::from_str(&cid) else {
872
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
873
};
874
Some(cid)
···
1017
identity: Identity,
1018
repo: Repo,
1019
proxy: Proxy,
1020
acme_domain: Option<String>,
1021
acme_contact: Option<String>,
1022
acme_cache_path: Option<PathBuf>,
···
1028
let proxy = Arc::new(proxy);
1029
let api_service = OpenApiService::new(
1030
Xrpc {
1031
cache,
1032
identity,
1033
proxy,
···
1
use crate::{
2
CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3
error::{RecordError, ServerError},
4
+
proxy::{FullAtUriParts, MatchedRef, extract_links, split_uri},
5
record::RawRecord,
6
};
7
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Handle, Nsid, RecordKey};
8
use foyer::HybridCache;
9
use links::at_uri::parse_at_uri as normalize_at_uri;
10
use serde::Serialize;
11
+
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc, time::Instant};
12
use tokio::sync::mpsc;
13
use tokio_util::sync::CancellationToken;
14
···
24
};
25
use poem_openapi::{
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
+
Union, param::Query, payload::Json, types::Example,
28
};
29
30
fn example_handle() -> String {
···
33
fn example_did() -> String {
34
"did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
35
}
36
+
fn example_service_did() -> String {
37
+
"did:web:constellation.microcosm.blue".to_string()
38
+
}
39
fn example_collection() -> String {
40
"app.bsky.feed.like".to_string()
41
}
42
fn example_rkey() -> String {
43
"3lv4ouczo2b2a".to_string()
44
}
45
+
fn example_id_fragment() -> String {
46
+
"#constellation".to_string()
47
+
}
48
fn example_uri() -> String {
49
format!(
50
···
60
"zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
61
}
62
63
+
#[derive(Debug, Object)]
64
#[oai(example = true)]
65
struct XrpcErrorResponseObject {
66
/// Should correspond an error `name` in the lexicon errors array
···
91
}))
92
}
93
94
+
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniDocResponse {
95
+
ResolveMiniDocResponse::BadRequest(Json(XrpcErrorResponseObject {
96
error: "InvalidRequest".to_string(),
97
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
98
}))
99
}
100
101
+
fn bad_request_handler_resolve_service(err: poem::Error) -> ResolveServiceResponse {
102
+
ResolveServiceResponse::BadRequest(Json(XrpcErrorResponseObject {
103
+
error: "InvalidRequest".to_string(),
104
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
105
+
}))
106
+
}
107
+
108
fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
109
ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
110
error: "InvalidRequest".to_string(),
···
201
202
#[derive(ApiResponse)]
203
#[oai(bad_request_handler = "bad_request_handler_resolve_mini")]
204
+
enum ResolveMiniDocResponse {
205
/// Identity resolved
206
#[oai(status = 200)]
207
Ok(Json<MiniDocResponseObject>),
···
211
}
212
213
#[derive(Object)]
214
+
#[oai(example = true)]
215
+
struct ServiceResponseObject {
216
+
/// The service endpoint URL, if found
217
+
endpoint: String,
218
}
219
+
impl Example for ServiceResponseObject {
220
+
fn example() -> Self {
221
+
Self {
222
+
endpoint: "https://example.com".to_string(),
223
+
}
224
+
}
225
+
}
226
227
+
#[derive(ApiResponse)]
228
+
#[oai(bad_request_handler = "bad_request_handler_resolve_service")]
229
+
enum ResolveServiceResponse {
230
+
/// Service resolved
231
+
#[oai(status = 200)]
232
+
Ok(Json<ServiceResponseObject>),
233
+
/// Bad request or service not resolved
234
+
#[oai(status = 400)]
235
+
BadRequest(XrpcError),
236
}
237
238
#[derive(Object)]
239
+
#[oai(rename_all = "camelCase")]
240
+
struct ProxyHydrationError {
241
+
/// Short description of why the hydration failed
242
+
reason: String,
243
+
/// Whether or not it's recommended to retry requesting this item
244
+
should_retry: bool,
245
+
/// URL to follow up at if retrying
246
+
follow_up: String,
247
}
248
249
#[derive(Object)]
250
+
#[oai(rename_all = "camelCase")]
251
+
struct ProxyHydrationPending {
252
+
/// URL you can request to finish hydrating this item
253
+
follow_up: String,
254
+
/// Why this item couldn't be hydrated: 'deadline' or 'limit'
255
+
///
256
+
/// - `deadline`: the item fetch didn't complete before the response was
257
+
/// due, but will continue on slingshot in the background -- `followUp`
258
+
/// requests are coalesced into the original item fetch to be available as
259
+
/// early as possible.
260
+
///
261
+
/// - `limit`: slingshot only attempts to hydrate the first 100 items found
262
+
/// in a proxied response, with the remaining marked `pending`. You can
263
+
/// request `followUp` to fetch them.
264
+
///
265
+
/// In the future, Slingshot may put pending links after `limit` into a low-
266
+
/// priority fetch queue, so that these items become available sooner on
267
+
/// follow-up request as well.
268
+
reason: String,
269
}
270
271
// todo: there's gotta be a supertrait that collects these?
272
+
use poem_openapi::types::{IsObjectType, ParseFromJSON, ToJSON, Type};
273
274
#[derive(Union)]
275
#[oai(discriminator_name = "status", rename_all = "camelCase")]
···
285
/// The original upstream response content
286
output: serde_json::Value,
287
/// Any hydrated records
288
+
records: HashMap<String, Hydration<FoundRecordResponseObject>>,
289
/// Any hydrated identifiers
290
+
identifiers: HashMap<String, Hydration<MiniDocResponseObject>>,
291
}
292
impl Example for ProxyHydrateResponseObject {
293
fn example() -> Self {
294
Self {
295
output: serde_json::json!({}),
296
+
records: HashMap::from([(
297
+
"asdf".into(),
298
+
Hydration::Pending(ProxyHydrationPending {
299
+
follow_up: "/xrpc/com.atproto.repo.getRecord?...".to_string(),
300
+
reason: "deadline".to_string(),
301
+
}),
302
+
)]),
303
identifiers: HashMap::new(),
304
}
305
}
···
311
#[oai(status = 200)]
312
Ok(Json<ProxyHydrateResponseObject>),
313
#[oai(status = 400)]
314
+
BadRequest(XrpcError),
315
}
316
317
#[derive(Object)]
···
336
xrpc: String,
337
/// The destination service the request will be forwarded to
338
atproto_proxy: String,
339
+
/// An optional auth token to pass on
340
+
///
341
+
/// the `aud` field must match the upstream atproto_proxy service
342
+
authorization: Option<String>,
343
+
/// An optional set of labelers to request be applied by the upstream
344
+
atproto_accept_labelers: Option<String>,
345
/// The `params` for the destination service XRPC endpoint
346
///
347
/// Currently this will be passed along unchecked, but a future version of
···
350
params: Option<serde_json::Value>,
351
/// Paths within the response to look for at-uris that can be hydrated
352
hydration_sources: Vec<HydrationSource>,
353
+
// todo: let clients pass a hydration deadline?
354
}
355
impl Example for ProxyQueryPayload {
356
fn example() -> Self {
357
Self {
358
xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
359
atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
360
+
authorization: None,
361
+
atproto_accept_labelers: None,
362
params: Some(serde_json::json!({
363
"feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
364
})),
365
+
hydration_sources: vec![HydrationSource {
366
+
path: "feed[].post".to_string(),
367
+
shape: "at-uri".to_string(),
368
+
}],
369
}
370
}
371
}
···
399
}
400
401
struct Xrpc {
402
+
base_url: url::Url,
403
cache: HybridCache<String, CachedRecord>,
404
identity: Identity,
405
proxy: Arc<Proxy>,
···
468
/// only retains the most recent version of a record.
469
Query(cid): Query<Option<String>>,
470
) -> GetRecordResponse {
471
+
self.get_record_impl(&repo, &collection, &rkey, cid.as_deref())
472
+
.await
473
}
474
475
/// blue.microcosm.repo.getRecordByUri
···
539
return bad_at_uri();
540
};
541
542
+
let Some((repo, collection, rkey)) = split_uri(&normalized) else {
543
return bad_at_uri();
544
};
545
546
self.get_record_impl(
547
+
Into::<String>::into(repo).as_str(),
548
+
collection.as_str(),
549
+
rkey.as_str(),
550
+
cid.as_deref(),
551
)
552
.await
553
}
···
627
/// Handle or DID to resolve
628
#[oai(example = "example_handle")]
629
Query(identifier): Query<String>,
630
+
) -> ResolveMiniDocResponse {
631
self.resolve_mini_id(Query(identifier)).await
632
}
633
···
645
/// Handle or DID to resolve
646
#[oai(example = "example_handle")]
647
Query(identifier): Query<String>,
648
+
) -> ResolveMiniDocResponse {
649
+
Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await
650
+
}
651
+
652
+
async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniDocResponse {
653
let invalid = |reason: &'static str| {
654
+
ResolveMiniDocResponse::BadRequest(xrpc_error("InvalidRequest", reason))
655
};
656
657
let mut unverified_handle = None;
658
+
let did = match Did::new(identifier.to_string()) {
659
Ok(did) => did,
660
Err(_) => {
661
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
662
return invalid("Identifier was not a valid DID or handle");
663
};
664
665
+
match identity.handle_to_did(alleged_handle.clone()).await {
666
Ok(res) => {
667
if let Some(did) = res {
668
// we did it joe
···
680
}
681
}
682
};
683
+
let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else {
684
return invalid("Failed to get DID doc");
685
};
686
let Some(partial_doc) = partial_doc else {
···
700
"handle.invalid".to_string()
701
}
702
} else {
703
+
let Ok(handle_did) = identity
704
.handle_to_did(partial_doc.unverified_handle.clone())
705
.await
706
else {
···
716
}
717
};
718
719
+
ResolveMiniDocResponse::Ok(Json(MiniDocResponseObject {
720
did: did.to_string(),
721
handle,
722
pds: partial_doc.pds,
···
724
}))
725
}
726
727
+
/// com.bad-example.identity.resolveService
728
+
///
729
+
/// resolve an atproto service did + id to its http endpoint
730
+
///
731
+
/// > [!important]
732
+
/// > this endpoint is experimental and may change
733
+
#[oai(
734
+
path = "/com.bad-example.identity.resolveService",
735
+
method = "get",
736
+
tag = "ApiTags::Custom"
737
+
)]
738
+
async fn resolve_service(
739
+
&self,
740
+
/// the service's did
741
+
#[oai(example = "example_service_did")]
742
+
Query(did): Query<String>,
743
+
/// id fragment, starting with '#'
744
+
///
745
+
/// must be url-encoded!
746
+
#[oai(example = "example_id_fragment")]
747
+
Query(id): Query<String>,
748
+
/// optionally, the exact service type to filter
749
+
///
750
+
/// resolving a pds requires matching the type as well as id. service
751
+
/// proxying ignores the type.
752
+
Query(r#type): Query<Option<String>>,
753
+
) -> ResolveServiceResponse {
754
+
let Ok(did) = Did::new(did) else {
755
+
return ResolveServiceResponse::BadRequest(xrpc_error(
756
+
"InvalidRequest",
757
+
"could not parse 'did' into a DID",
758
+
));
759
+
};
760
+
let identity = self.identity.clone();
761
+
Self::resolve_service_impl(&did, &id, r#type.as_deref(), identity).await
762
+
}
763
+
764
+
async fn resolve_service_impl(
765
+
did: &Did,
766
+
id_fragment: &str,
767
+
service_type: Option<&str>,
768
+
identity: Identity,
769
+
) -> ResolveServiceResponse {
770
+
let invalid = |reason: &'static str| {
771
+
ResolveServiceResponse::BadRequest(xrpc_error("InvalidRequest", reason))
772
+
};
773
+
let Ok(service_mini_doc) = identity.did_to_mini_service_doc(did).await else {
774
+
return invalid("Failed to get DID doc");
775
+
};
776
+
let Some(service_mini_doc) = service_mini_doc else {
777
+
return invalid("Failed to find DID doc");
778
+
};
779
+
780
+
let Some(matching) = service_mini_doc.get(id_fragment, service_type) else {
781
+
return invalid("failed to match identity (and maybe type)");
782
+
};
783
+
784
+
ResolveServiceResponse::Ok(Json(ServiceResponseObject {
785
+
endpoint: matching.endpoint.clone(),
786
+
}))
787
+
}
788
+
789
/// com.bad-example.proxy.hydrateQueryResponse
790
///
791
/// > [!important]
···
801
&self,
802
Json(payload): Json<ProxyQueryPayload>,
803
) -> ProxyHydrateResponse {
804
let params = if let Some(p) = payload.params {
805
let serde_json::Value::Object(map) = p else {
806
panic!("params have to be an object");
807
};
808
Some(map)
809
+
} else {
810
+
None
811
+
};
812
813
+
let Some((service_did, id_fragment)) = payload.atproto_proxy.rsplit_once("#") else {
814
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
815
+
"BadParameter",
816
+
"atproto_proxy could not be understood",
817
+
));
818
+
};
819
+
820
+
let Ok(service_did) = service_did.parse() else {
821
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
822
+
"BadParameter",
823
+
"atproto_proxy service did could not be parsed",
824
+
));
825
+
};
826
+
827
+
let Ok(xrpc) = payload.xrpc.parse() else {
828
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
829
+
"BadParameter",
830
+
"invalid NSID for xrpc param",
831
+
));
832
+
};
833
+
834
+
match self
835
+
.proxy
836
+
.proxy(
837
+
&service_did,
838
+
&format!("#{id_fragment}"),
839
+
&xrpc,
840
+
payload.authorization.as_deref(),
841
+
payload.atproto_accept_labelers.as_deref(),
842
+
params,
843
+
)
844
+
.await
845
+
{
846
Ok(skeleton) => {
847
let links = match extract_links(payload.hydration_sources, &skeleton) {
848
Ok(l) => l,
849
Err(e) => {
850
log::warn!("problem extracting: {e:?}");
851
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
852
+
"oop",
853
+
"sorry, error extracting",
854
+
));
855
}
856
};
857
let mut records = HashMap::new();
858
let mut identifiers = HashMap::new();
859
860
enum GetThing {
861
+
Record(String, Hydration<FoundRecordResponseObject>),
862
+
Identifier(String, Hydration<MiniDocResponseObject>),
863
}
864
865
let (tx, mut rx) = mpsc::channel(1);
866
867
+
let t0 = Instant::now();
868
+
869
+
for (i, link) in links.into_iter().enumerate() {
870
match link {
871
+
MatchedRef::AtUri(parts) => {
872
+
let non_canonical_url = parts.to_uri();
873
+
if records.contains_key(&non_canonical_url) {
874
log::warn!("skipping duplicate record without checking cid");
875
continue;
876
}
877
+
let mut follow_up = self.base_url.clone();
878
+
follow_up.set_path("/xrpc/com.atproto.repo.getRecord");
879
+
follow_up
880
+
.query_pairs_mut()
881
+
.append_pair("repo", &Into::<String>::into(parts.repo.clone()))
882
+
.append_pair("collection", parts.collection.as_str())
883
+
.append_pair("rkey", parts.rkey.as_str());
884
+
if let Some(ref cid) = parts.cid {
885
+
follow_up
886
+
.query_pairs_mut()
887
+
.append_pair("cid", &cid.as_ref().to_string());
888
+
}
889
+
890
+
if i >= 100 {
891
+
records.insert(
892
+
non_canonical_url.clone(),
893
+
Hydration::Pending(ProxyHydrationPending {
894
+
reason: "limit".to_string(),
895
+
follow_up: follow_up.to_string(),
896
+
}),
897
+
);
898
+
continue;
899
+
} else {
900
+
records.insert(
901
+
non_canonical_url.clone(),
902
+
Hydration::Pending(ProxyHydrationPending {
903
+
reason: "deadline".to_string(),
904
+
follow_up: follow_up.to_string(),
905
+
}),
906
+
);
907
+
}
908
+
909
let tx = tx.clone();
910
let identity = self.identity.clone();
911
let repo = self.repo.clone();
912
tokio::task::spawn(async move {
913
+
let FullAtUriParts {
914
+
repo: ident,
915
+
collection,
916
+
rkey,
917
+
cid,
918
+
} = parts;
919
+
let did = match ident {
920
+
AtIdentifier::Did(did) => did,
921
+
AtIdentifier::Handle(handle) => {
922
+
let Ok(Some(did)) = identity.handle_to_did(handle).await
923
+
else {
924
+
let res = Hydration::Error(ProxyHydrationError {
925
+
reason: "could not resolve handle".to_string(),
926
+
should_retry: true,
927
+
follow_up: follow_up.to_string(),
928
+
});
929
+
return if tx
930
+
.send(GetThing::Record(non_canonical_url, res))
931
+
.await
932
+
.is_ok()
933
+
{
934
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
935
+
} else {
936
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
937
+
};
938
+
};
939
+
did
940
+
}
941
};
942
943
+
let res =
944
+
match repo.get_record(&did, &collection, &rkey, &cid).await {
945
+
Ok(CachedRecord::Deleted) => {
946
+
Hydration::Error(ProxyHydrationError {
947
+
reason: "record deleted".to_string(),
948
+
should_retry: false,
949
+
follow_up: follow_up.to_string(),
950
+
})
951
}
952
+
Ok(CachedRecord::Found(RawRecord {
953
+
cid: found_cid,
954
+
record,
955
+
})) => {
956
+
if cid
957
+
.as_ref()
958
+
.map(|expected| *expected != found_cid)
959
+
.unwrap_or(false)
960
+
{
961
+
Hydration::Error(ProxyHydrationError {
962
+
reason: "not found".to_string(),
963
+
should_retry: false,
964
+
follow_up: follow_up.to_string(),
965
+
})
966
+
} else if let Ok(value) = serde_json::from_str(&record)
967
+
{
968
+
let canonical_uri = FullAtUriParts {
969
+
repo: AtIdentifier::Did(did),
970
+
collection,
971
+
rkey,
972
+
cid: None, // not used for .to_uri
973
+
}
974
+
.to_uri();
975
+
Hydration::Found(FoundRecordResponseObject {
976
+
cid: Some(found_cid.as_ref().to_string()),
977
+
uri: canonical_uri,
978
+
value,
979
+
})
980
+
} else {
981
+
Hydration::Error(ProxyHydrationError {
982
+
reason: "could not parse upstream response"
983
+
.to_string(),
984
+
should_retry: false,
985
+
follow_up: follow_up.to_string(),
986
+
})
987
+
}
988
+
}
989
+
Err(e) => {
990
+
log::warn!("finally oop {e:?}");
991
+
Hydration::Error(ProxyHydrationError {
992
+
reason: "failed to fetch record".to_string(),
993
+
should_retry: true, // TODO
994
+
follow_up: follow_up.to_string(),
995
+
})
996
+
}
997
+
};
998
+
if tx
999
+
.send(GetThing::Record(non_canonical_url, res))
1000
+
.await
1001
+
.is_ok()
1002
+
{
1003
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
1004
+
} else {
1005
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
1006
+
}
1007
});
1008
}
1009
MatchedRef::Identifier(id) => {
1010
+
let identifier: String = id.clone().into();
1011
+
if identifiers.contains_key(&identifier) {
1012
continue;
1013
}
1014
1015
+
let mut follow_up = self.base_url.clone();
1016
+
follow_up.set_path("/xrpc/blue.microcosm.identity.resolveMiniDoc");
1017
+
1018
+
follow_up
1019
+
.query_pairs_mut()
1020
+
.append_pair("identifier", &identifier);
1021
+
1022
+
if i >= 100 {
1023
+
identifiers.insert(
1024
+
identifier.clone(),
1025
+
Hydration::Pending(ProxyHydrationPending {
1026
+
reason: "limit".to_string(),
1027
+
follow_up: follow_up.to_string(),
1028
+
}),
1029
+
);
1030
+
continue;
1031
+
} else {
1032
+
identifiers.insert(
1033
+
identifier.clone(),
1034
+
Hydration::Pending(ProxyHydrationPending {
1035
+
reason: "deadline".to_string(),
1036
+
follow_up: follow_up.to_string(),
1037
+
}),
1038
+
);
1039
+
}
1040
+
1041
+
let tx = tx.clone();
1042
+
let identity = self.identity.clone();
1043
+
tokio::task::spawn(async move {
1044
+
let res = match Self::resolve_mini_doc_impl(&identifier, identity)
1045
+
.await
1046
+
{
1047
+
ResolveMiniDocResponse::Ok(Json(mini_doc)) => {
1048
+
Hydration::Found(mini_doc)
1049
+
}
1050
+
ResolveMiniDocResponse::BadRequest(e) => {
1051
+
log::warn!("minidoc fail: {:?}", e.0);
1052
+
Hydration::Error(ProxyHydrationError {
1053
+
reason: "failed to resolve mini doc".to_string(),
1054
+
should_retry: false,
1055
+
follow_up: follow_up.to_string(),
1056
+
})
1057
+
}
1058
+
};
1059
+
if tx.send(GetThing::Identifier(identifier, res)).await.is_ok() {
1060
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "true").increment(1);
1061
+
} else {
1062
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "false").increment(1);
1063
+
}
1064
});
1065
}
1066
}
···
1069
// (we shoudl be doing a timeout...)
1070
drop(tx);
1071
1072
+
let deadline = t0 + std::time::Duration::from_secs_f64(1.6);
1073
+
let res = tokio::time::timeout_at(deadline.into(), async {
1074
+
while let Some(hydration) = rx.recv().await {
1075
+
match hydration {
1076
+
GetThing::Record(uri, h) => {
1077
+
if let Some(r) = records.get_mut(&uri) {
1078
+
match (&r, &h) {
1079
+
(_, Hydration::Found(_)) => *r = h, // always replace if found
1080
+
(Hydration::Pending(_), _) => *r = h, // or if it was pending
1081
+
_ => {} // else leave it
1082
+
}
1083
+
} else {
1084
+
records.insert(uri, h);
1085
+
}
1086
+
}
1087
+
GetThing::Identifier(identifier, md) => {
1088
+
identifiers.insert(identifier.to_string(), md);
1089
+
}
1090
+
};
1091
+
}
1092
+
})
1093
+
.await;
1094
+
1095
+
if res.is_ok() {
1096
+
metrics::histogram!("slingshot_hydration_all_completed").record(t0.elapsed());
1097
+
} else {
1098
+
metrics::counter!("slingshot_hydration_cut_off").increment(1);
1099
}
1100
1101
ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
···
1109
ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
1110
}
1111
}
1112
}
1113
1114
async fn get_record_impl(
1115
&self,
1116
+
repo: &str,
1117
+
collection: &str,
1118
+
rkey: &str,
1119
+
cid: Option<&str>,
1120
) -> GetRecordResponse {
1121
+
let did = match Did::new(repo.to_string()) {
1122
Ok(did) => did,
1123
Err(_) => {
1124
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
···
1149
}
1150
};
1151
1152
+
let Ok(collection) = Nsid::new(collection.to_string()) else {
1153
return GetRecordResponse::BadRequest(xrpc_error(
1154
"InvalidRequest",
1155
"Invalid NSID for collection",
1156
));
1157
};
1158
1159
+
let Ok(rkey) = RecordKey::new(rkey.to_string()) else {
1160
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
1161
};
1162
1163
let cid: Option<Cid> = if let Some(cid) = cid {
1164
+
let Ok(cid) = Cid::from_str(cid) else {
1165
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
1166
};
1167
Some(cid)
···
1310
identity: Identity,
1311
repo: Repo,
1312
proxy: Proxy,
1313
+
base_url: url::Url,
1314
acme_domain: Option<String>,
1315
acme_contact: Option<String>,
1316
acme_cache_path: Option<PathBuf>,
···
1322
let proxy = Arc::new(proxy);
1323
let api_service = OpenApiService::new(
1324
Xrpc {
1325
+
base_url,
1326
cache,
1327
identity,
1328
proxy,
+1
-7
constellation/src/bin/main.rs
+1
-7
constellation/src/bin/main.rs
···
45
#[arg(short, long)]
46
#[clap(value_enum, default_value_t = StorageBackend::Memory)]
47
backend: StorageBackend,
48
-
/// Serve a did:web document for this domain
49
-
#[arg(long)]
50
-
did_web_domain: Option<String>,
51
/// Initiate a database backup into this dir, if supported by the storage
52
#[arg(long)]
53
backup: Option<PathBuf>,
···
106
MemStorage::new(),
107
fixture,
108
None,
109
-
args.did_web_domain,
110
stream,
111
bind,
112
metrics_bind,
···
142
rocks,
143
fixture,
144
args.data,
145
-
args.did_web_domain,
146
stream,
147
bind,
148
metrics_bind,
···
164
mut storage: impl LinkStorage,
165
fixture: Option<PathBuf>,
166
data_dir: Option<PathBuf>,
167
-
did_web_domain: Option<String>,
168
stream: String,
169
bind: SocketAddr,
170
metrics_bind: SocketAddr,
···
217
if collect_metrics {
218
install_metrics_server(metrics_bind)?;
219
}
220
-
serve(readable, bind, did_web_domain, staying_alive).await
221
})
222
.unwrap();
223
stay_alive.drop_guard();
···
45
#[arg(short, long)]
46
#[clap(value_enum, default_value_t = StorageBackend::Memory)]
47
backend: StorageBackend,
48
/// Initiate a database backup into this dir, if supported by the storage
49
#[arg(long)]
50
backup: Option<PathBuf>,
···
103
MemStorage::new(),
104
fixture,
105
None,
106
stream,
107
bind,
108
metrics_bind,
···
138
rocks,
139
fixture,
140
args.data,
141
stream,
142
bind,
143
metrics_bind,
···
159
mut storage: impl LinkStorage,
160
fixture: Option<PathBuf>,
161
data_dir: Option<PathBuf>,
162
stream: String,
163
bind: SocketAddr,
164
metrics_bind: SocketAddr,
···
211
if collect_metrics {
212
install_metrics_server(metrics_bind)?;
213
}
214
+
serve(readable, bind, staying_alive).await
215
})
216
.unwrap();
217
stay_alive.drop_guard();
+7
-32
constellation/src/server/mod.rs
+7
-32
constellation/src/server/mod.rs
···
3
extract::{Query, Request},
4
http::{self, header},
5
middleware::{self, Next},
6
-
response::{IntoResponse, Json, Response},
7
routing::get,
8
Router,
9
};
···
37
http::StatusCode::INTERNAL_SERVER_ERROR
38
}
39
40
-
pub async fn serve<S: LinkReader, A: ToSocketAddrs>(
41
-
store: S,
42
-
addr: A,
43
-
did_web_domain: Option<String>,
44
-
stay_alive: CancellationToken,
45
-
) -> anyhow::Result<()> {
46
-
let mut app = Router::new();
47
-
48
-
if let Some(d) = did_web_domain {
49
-
app = app.route(
50
-
"/.well-known/did.json",
51
-
get({
52
-
let domain = d.clone();
53
-
move || did_web(domain)
54
-
}),
55
-
)
56
-
}
57
-
58
-
let app = app
59
.route("/robots.txt", get(robots))
60
.route(
61
"/",
···
217
User-agent: *
218
Disallow: /links
219
Disallow: /links/
220
-
Disallow: /xrpc/
221
"
222
-
}
223
-
224
-
async fn did_web(domain: String) -> impl IntoResponse {
225
-
Json(serde_json::json!({
226
-
"id": format!("did:web:{domain}"),
227
-
"service": [{
228
-
"id": "#constellation",
229
-
"type": "ConstellationGraphService",
230
-
"serviceEndpoint": format!("https://{domain}")
231
-
}]
232
-
}))
233
}
234
235
#[derive(Template, Serialize, Deserialize)]
···
3
extract::{Query, Request},
4
http::{self, header},
5
middleware::{self, Next},
6
+
response::{IntoResponse, Response},
7
routing::get,
8
Router,
9
};
···
37
http::StatusCode::INTERNAL_SERVER_ERROR
38
}
39
40
+
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
41
+
where
42
+
S: LinkReader,
43
+
A: ToSocketAddrs,
44
+
{
45
+
let app = Router::new()
46
.route("/robots.txt", get(robots))
47
.route(
48
"/",
···
204
User-agent: *
205
Disallow: /links
206
Disallow: /links/
207
"
208
}
209
210
#[derive(Template, Serialize, Deserialize)]
+208
-16
slingshot/src/identity.rs
+208
-16
slingshot/src/identity.rs
···
17
18
use crate::error::IdentityError;
19
use atrium_api::{
20
-
did_doc::DidDocument,
21
types::string::{Did, Handle},
22
};
23
use atrium_common::resolver::Resolver;
···
41
pub enum IdentityKey {
42
Handle(Handle),
43
Did(Did),
44
}
45
46
impl IdentityKey {
···
48
let s = match self {
49
IdentityKey::Handle(h) => h.as_str(),
50
IdentityKey::Did(d) => d.as_str(),
51
};
52
std::mem::size_of::<Self>() + std::mem::size_of_val(s)
53
}
···
59
#[derive(Debug, Serialize, Deserialize)]
60
enum IdentityData {
61
NotFound,
62
-
Did(Did),
63
-
Doc(PartialMiniDoc),
64
}
65
66
impl IdentityVal {
···
71
IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
72
IdentityData::Doc(d) => {
73
std::mem::size_of_val(d.unverified_handle.as_str())
74
-
+ std::mem::size_of_val(d.pds.as_str())
75
-
+ std::mem::size_of_val(d.signing_key.as_str())
76
}
77
};
78
wrapping + inner
···
168
}
169
}
170
171
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
172
///
173
/// the hashset allows testing for presense of items in the queue.
···
296
let now = UtcDateTime::now();
297
let IdentityVal(last_fetch, data) = entry.value();
298
match data {
299
-
IdentityData::Doc(_) => {
300
-
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
301
-
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
302
-
}
303
IdentityData::NotFound => {
304
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
305
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
313
self.queue_refresh(key).await;
314
}
315
Ok(Some(did.clone()))
316
}
317
}
318
}
···
362
let now = UtcDateTime::now();
363
let IdentityVal(last_fetch, data) = entry.value();
364
match data {
365
-
IdentityData::Did(_) => {
366
-
log::error!("identity value mixup: got a did from a did key (should be a doc)");
367
-
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
368
-
}
369
IdentityData::NotFound => {
370
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
371
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
373
}
374
Ok(None)
375
}
376
-
IdentityData::Doc(mini_did) => {
377
if (now - *last_fetch) >= MIN_TTL {
378
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
379
self.queue_refresh(key).await;
380
}
381
-
Ok(Some(mini_did.clone()))
382
}
383
}
384
}
···
519
log::warn!(
520
"refreshed did doc failed: wrong did doc id. dropping refresh."
521
);
522
continue;
523
}
524
let mini_doc = match did_doc.try_into() {
···
526
Err(e) => {
527
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
528
log::warn!(
529
-
"converting mini doc failed: {e:?}. dropping refresh."
530
);
531
continue;
532
}
533
};
···
554
}
555
556
self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
557
}
558
}
559
}
···
17
18
use crate::error::IdentityError;
19
use atrium_api::{
20
+
did_doc::{DidDocument, Service as DidDocServic},
21
types::string::{Did, Handle},
22
};
23
use atrium_common::resolver::Resolver;
···
41
pub enum IdentityKey {
42
Handle(Handle),
43
Did(Did),
44
+
ServiceDid(Did),
45
}
46
47
impl IdentityKey {
···
49
let s = match self {
50
IdentityKey::Handle(h) => h.as_str(),
51
IdentityKey::Did(d) => d.as_str(),
52
+
IdentityKey::ServiceDid(d) => d.as_str(),
53
};
54
std::mem::size_of::<Self>() + std::mem::size_of_val(s)
55
}
···
61
#[derive(Debug, Serialize, Deserialize)]
62
enum IdentityData {
63
NotFound,
64
+
Did(Did), // from handle
65
+
Doc(PartialMiniDoc), // from did
66
+
ServiceDoc(MiniServiceDoc), // from service did
67
}
68
69
impl IdentityVal {
···
74
IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
75
IdentityData::Doc(d) => {
76
std::mem::size_of_val(d.unverified_handle.as_str())
77
+
+ std::mem::size_of_val(&d.pds)
78
+
+ std::mem::size_of_val(&d.signing_key)
79
+
}
80
+
IdentityData::ServiceDoc(d) => {
81
+
let mut s = std::mem::size_of::<MiniServiceDoc>();
82
+
s += std::mem::size_of_val(&d.services);
83
+
for sv in &d.services {
84
+
s += std::mem::size_of_val(&sv.full_id);
85
+
s += std::mem::size_of_val(&sv.r#type);
86
+
s += std::mem::size_of_val(&sv.endpoint);
87
+
}
88
+
s
89
}
90
};
91
wrapping + inner
···
181
}
182
}
183
184
+
/// Simplified info from service DID docs
185
+
#[derive(Debug, Clone, Serialize, Deserialize)]
186
+
pub struct MiniServiceDoc {
187
+
services: Vec<MiniService>,
188
+
}
189
+
190
+
impl MiniServiceDoc {
191
+
pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> {
192
+
self.services.iter().find(|ms| {
193
+
ms.full_id.ends_with(id_fragment)
194
+
&& service_type.map(|t| t == ms.r#type).unwrap_or(true)
195
+
})
196
+
}
197
+
}
198
+
199
+
/// The corresponding service info
200
+
#[derive(Debug, Clone, Serialize, Deserialize)]
201
+
pub struct MiniService {
202
+
/// The full id
203
+
///
204
+
/// for informational purposes only -- services are deduplicated by id fragment
205
+
full_id: String,
206
+
r#type: String,
207
+
/// HTTP endpoint for the actual service
208
+
pub endpoint: String,
209
+
}
210
+
211
+
impl TryFrom<DidDocument> for MiniServiceDoc {
212
+
type Error = String;
213
+
fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
214
+
let mut services = Vec::new();
215
+
let mut seen = HashSet::new();
216
+
217
+
for DidDocServic {
218
+
id,
219
+
r#type,
220
+
service_endpoint,
221
+
} in did_doc.service.unwrap_or(vec![])
222
+
{
223
+
let Some((_, id_fragment)) = id.rsplit_once('#') else {
224
+
continue;
225
+
};
226
+
if !seen.insert((id_fragment.to_string(), r#type.clone())) {
227
+
continue;
228
+
}
229
+
services.push(MiniService {
230
+
full_id: id,
231
+
r#type,
232
+
endpoint: service_endpoint,
233
+
});
234
+
}
235
+
236
+
Ok(Self { services })
237
+
}
238
+
}
239
+
240
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
241
///
242
/// the hashset allows testing for presense of items in the queue.
···
365
let now = UtcDateTime::now();
366
let IdentityVal(last_fetch, data) = entry.value();
367
match data {
368
IdentityData::NotFound => {
369
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
370
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
378
self.queue_refresh(key).await;
379
}
380
Ok(Some(did.clone()))
381
+
}
382
+
_ => {
383
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
384
+
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
385
}
386
}
387
}
···
431
let now = UtcDateTime::now();
432
let IdentityVal(last_fetch, data) = entry.value();
433
match data {
434
IdentityData::NotFound => {
435
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
436
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
438
}
439
Ok(None)
440
}
441
+
IdentityData::Doc(mini_doc) => {
442
if (now - *last_fetch) >= MIN_TTL {
443
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
444
self.queue_refresh(key).await;
445
}
446
+
Ok(Some(mini_doc.clone()))
447
+
}
448
+
_ => {
449
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
450
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
451
+
}
452
+
}
453
+
}
454
+
455
+
/// Fetch (and cache) a service mini doc from a did
456
+
pub async fn did_to_mini_service_doc(
457
+
&self,
458
+
did: &Did,
459
+
) -> Result<Option<MiniServiceDoc>, IdentityError> {
460
+
let key = IdentityKey::ServiceDid(did.clone());
461
+
metrics::counter!("slingshot_get_service_did_doc").increment(1);
462
+
let entry = self
463
+
.cache
464
+
.get_or_fetch(&key, {
465
+
let did = did.clone();
466
+
let resolver = self.did_resolver.clone();
467
+
|| async move {
468
+
let t0 = Instant::now();
469
+
let (res, success) = match resolver.resolve(&did).await {
470
+
Ok(did_doc) if did_doc.id != did.to_string() => (
471
+
// TODO: fix in atrium: should verify id is did
472
+
Err(IdentityError::BadDidDoc(
473
+
"did doc's id did not match did".to_string(),
474
+
)),
475
+
"false",
476
+
),
477
+
Ok(did_doc) => match did_doc.try_into() {
478
+
Ok(mini_service_doc) => (
479
+
Ok(IdentityVal(
480
+
UtcDateTime::now(),
481
+
IdentityData::ServiceDoc(mini_service_doc),
482
+
)),
483
+
"true",
484
+
),
485
+
Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"),
486
+
},
487
+
Err(atrium_identity::Error::NotFound) => (
488
+
Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
489
+
"false",
490
+
),
491
+
Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"),
492
+
};
493
+
metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success)
494
+
.record(t0.elapsed());
495
+
res
496
+
}
497
+
})
498
+
.await?;
499
+
500
+
let now = UtcDateTime::now();
501
+
let IdentityVal(last_fetch, data) = entry.value();
502
+
match data {
503
+
IdentityData::NotFound => {
504
+
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
505
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
506
+
self.queue_refresh(key).await;
507
+
}
508
+
Ok(None)
509
+
}
510
+
IdentityData::ServiceDoc(mini_service_doc) => {
511
+
if (now - *last_fetch) >= MIN_TTL {
512
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
513
+
self.queue_refresh(key).await;
514
+
}
515
+
Ok(Some(mini_service_doc.clone()))
516
+
}
517
+
_ => {
518
+
log::error!(
519
+
"identity value mixup: got a doc from a different key type (should be a service did)"
520
+
);
521
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
522
}
523
}
524
}
···
659
log::warn!(
660
"refreshed did doc failed: wrong did doc id. dropping refresh."
661
);
662
+
self.complete_refresh(&task_key).await?;
663
continue;
664
}
665
let mini_doc = match did_doc.try_into() {
···
667
Err(e) => {
668
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
669
log::warn!(
670
+
"converting mini doc for {did:?} failed: {e:?}. dropping refresh."
671
);
672
+
self.complete_refresh(&task_key).await?;
673
continue;
674
}
675
};
···
696
}
697
698
self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
699
+
}
700
+
IdentityKey::ServiceDid(ref did) => {
701
+
log::trace!("refreshing service did doc: {did:?}");
702
+
703
+
match self.did_resolver.resolve(did).await {
704
+
Ok(did_doc) => {
705
+
// TODO: fix in atrium: should verify id is did
706
+
if did_doc.id != did.to_string() {
707
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
708
+
log::warn!(
709
+
"refreshed did doc failed: wrong did doc id. dropping refresh."
710
+
);
711
+
self.complete_refresh(&task_key).await?;
712
+
continue;
713
+
}
714
+
let mini_service_doc = match did_doc.try_into() {
715
+
Ok(md) => md,
716
+
Err(e) => {
717
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
718
+
log::warn!(
719
+
"converting mini service doc failed: {e:?}. dropping refresh."
720
+
);
721
+
self.complete_refresh(&task_key).await?;
722
+
continue;
723
+
}
724
+
};
725
+
metrics::counter!("identity_service_did_refresh", "success" => "true")
726
+
.increment(1);
727
+
self.cache.insert(
728
+
task_key.clone(),
729
+
IdentityVal(
730
+
UtcDateTime::now(),
731
+
IdentityData::ServiceDoc(mini_service_doc),
732
+
),
733
+
);
734
+
}
735
+
Err(atrium_identity::Error::NotFound) => {
736
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1);
737
+
self.cache.insert(
738
+
task_key.clone(),
739
+
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
740
+
);
741
+
}
742
+
Err(err) => {
743
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1);
744
+
log::warn!(
745
+
"failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
746
+
);
747
+
}
748
+
}
749
}
750
}
751
}
History
2 rounds
2 comments
bad-example.com
submitted
#1
8 commits
expand
collapse
very hack proxy stuff (wip)
identities!!!!
service doc cache
proxy: use service cache
fmt
include headers, timeout hydration, measure times
base_url, typed url parts, api fixups,
only try to hydrate 100 records
no conflicts, ready to merge
expand 0 comments
bad-example.com
submitted
#0
1 commit
expand
collapse
very hack proxy stuff (wip)
the record contents from
com.atproto.repo.getRecordare returned under a key called "value", so this should probably use that.