+1
Cargo.lock
+1
Cargo.lock
+1
-1
slingshot/Cargo.toml
+1
-1
slingshot/Cargo.toml
+10
slingshot/src/error.rs
+10
slingshot/src/error.rs
···
91
#[error("upstream non-atproto bad request")]
92
UpstreamBadBadNotGoodRequest(reqwest::Error),
93
}
94
+
95
+
#[derive(Debug, Error)]
96
+
pub enum ProxyError {
97
+
#[error("failed to parse path: {0}")]
98
+
PathParseError(String),
99
+
#[error(transparent)]
100
+
UrlParseError(#[from] url::ParseError),
101
+
#[error(transparent)]
102
+
ReqwestError(#[from] reqwest::Error),
103
+
}
+2
slingshot/src/lib.rs
+2
slingshot/src/lib.rs
···
3
mod firehose_cache;
4
mod healthcheck;
5
mod identity;
6
mod record;
7
mod server;
8
···
10
pub use firehose_cache::firehose_cache;
11
pub use healthcheck::healthcheck;
12
pub use identity::{Identity, IdentityKey};
13
pub use record::{CachedRecord, ErrorResponseObject, Repo};
14
pub use server::serve;
···
3
mod firehose_cache;
4
mod healthcheck;
5
mod identity;
6
+
mod proxy;
7
mod record;
8
mod server;
9
···
11
pub use firehose_cache::firehose_cache;
12
pub use healthcheck::healthcheck;
13
pub use identity::{Identity, IdentityKey};
14
+
pub use proxy::Proxy;
15
pub use record::{CachedRecord, ErrorResponseObject, Repo};
16
pub use server::serve;
+4
-3
slingshot/src/main.rs
+4
-3
slingshot/src/main.rs
···
2
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
use slingshot::{
5
-
Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6
};
7
use std::net::SocketAddr;
8
use std::path::PathBuf;
···
143
)
144
.await
145
.map_err(|e| format!("identity setup failed: {e:?}"))?;
146
-
147
-
log::info!("identity service ready.");
148
let identity_refresher = identity.clone();
149
let identity_shutdown = shutdown.clone();
150
tasks.spawn(async move {
151
identity_refresher.run_refresher(identity_shutdown).await?;
152
Ok(())
153
});
154
155
let repo = Repo::new(identity.clone());
156
157
let identity_for_server = identity.clone();
158
let server_shutdown = shutdown.clone();
···
163
server_cache_handle,
164
identity_for_server,
165
repo,
166
args.acme_domain,
167
args.acme_contact,
168
args.acme_cache_path,
···
2
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
use slingshot::{
5
+
Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6
};
7
use std::net::SocketAddr;
8
use std::path::PathBuf;
···
143
)
144
.await
145
.map_err(|e| format!("identity setup failed: {e:?}"))?;
146
let identity_refresher = identity.clone();
147
let identity_shutdown = shutdown.clone();
148
tasks.spawn(async move {
149
identity_refresher.run_refresher(identity_shutdown).await?;
150
Ok(())
151
});
152
+
log::info!("identity service ready.");
153
154
let repo = Repo::new(identity.clone());
155
+
let proxy = Proxy::new(repo.clone());
156
157
let identity_for_server = identity.clone();
158
let server_shutdown = shutdown.clone();
···
163
server_cache_handle,
164
identity_for_server,
165
repo,
166
+
proxy,
167
args.acme_domain,
168
args.acme_contact,
169
args.acme_cache_path,
+487
slingshot/src/proxy.rs
+487
slingshot/src/proxy.rs
···
···
1
+
use serde::Deserialize;
2
+
use url::Url;
3
+
use std::{collections::HashMap, time::Duration};
4
+
use crate::{Repo, server::HydrationSource, error::ProxyError};
5
+
use reqwest::Client;
6
+
use serde_json::{Map, Value};
7
+
8
+
pub enum ParamValue {
9
+
String(Vec<String>),
10
+
Int(Vec<i64>),
11
+
Bool(Vec<bool>),
12
+
}
13
+
pub struct Params(HashMap<String, ParamValue>);
14
+
15
+
impl TryFrom<Map<String, Value>> for Params {
16
+
type Error = (); // TODO
17
+
fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18
+
let mut out = HashMap::new();
19
+
for (k, v) in val {
20
+
match v {
21
+
Value::String(s) => out.insert(k, ParamValue::String(vec![s])),
22
+
Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])),
23
+
Value::Number(n) => {
24
+
let Some(i) = n.as_i64() else {
25
+
return Err(());
26
+
};
27
+
out.insert(k, ParamValue::Int(vec![i]))
28
+
}
29
+
Value::Array(a) => {
30
+
let Some(first) = a.first() else {
31
+
continue;
32
+
};
33
+
if first.is_string() {
34
+
let mut vals = Vec::with_capacity(a.len());
35
+
for v in a {
36
+
let Some(v) = v.as_str() else {
37
+
return Err(());
38
+
};
39
+
vals.push(v.to_string());
40
+
}
41
+
out.insert(k, ParamValue::String(vals));
42
+
} else if first.is_i64() {
43
+
let mut vals = Vec::with_capacity(a.len());
44
+
for v in a {
45
+
let Some(v) = v.as_i64() else {
46
+
return Err(());
47
+
};
48
+
vals.push(v);
49
+
}
50
+
out.insert(k, ParamValue::Int(vals));
51
+
} else if first.is_boolean() {
52
+
let mut vals = Vec::with_capacity(a.len());
53
+
for v in a {
54
+
let Some(v) = v.as_bool() else {
55
+
return Err(());
56
+
};
57
+
vals.push(v);
58
+
}
59
+
out.insert(k, ParamValue::Bool(vals));
60
+
}
61
+
todo!();
62
+
}
63
+
_ => return Err(()),
64
+
};
65
+
}
66
+
67
+
Ok(Self(out))
68
+
}
69
+
}
70
+
71
+
#[derive(Clone)]
72
+
pub struct Proxy {
73
+
repo: Repo,
74
+
client: Client,
75
+
}
76
+
77
+
impl Proxy {
78
+
pub fn new(repo: Repo) -> Self {
79
+
let client = Client::builder()
80
+
.user_agent(format!(
81
+
"microcosm slingshot v{} (contact: @bad-example.com)",
82
+
env!("CARGO_PKG_VERSION")
83
+
))
84
+
.no_proxy()
85
+
.timeout(Duration::from_secs(6))
86
+
.build()
87
+
.unwrap();
88
+
Self { repo, client }
89
+
}
90
+
91
+
pub async fn proxy(
92
+
&self,
93
+
xrpc: String,
94
+
service: String,
95
+
params: Option<Map<String, Value>>,
96
+
) -> Result<Value, ProxyError> {
97
+
98
+
// hackin it to start
99
+
100
+
// 1. assume did-web (TODO) and get the did doc
101
+
#[derive(Debug, Deserialize)]
102
+
struct ServiceDoc {
103
+
id: String,
104
+
service: Vec<ServiceItem>,
105
+
}
106
+
#[derive(Debug, Deserialize)]
107
+
struct ServiceItem {
108
+
id: String,
109
+
#[expect(unused)]
110
+
r#type: String,
111
+
#[serde(rename = "serviceEndpoint")]
112
+
service_endpoint: Url,
113
+
}
114
+
let dw = service.strip_prefix("did:web:").expect("a did web");
115
+
let (dw, service_id) = dw.split_once("#").expect("whatever");
116
+
let mut dw_url = Url::parse(&format!("https://{dw}"))?;
117
+
dw_url.set_path("/.well-known/did.json");
118
+
let doc: ServiceDoc = self.client
119
+
.get(dw_url)
120
+
.send()
121
+
.await?
122
+
.error_for_status()?
123
+
.json()
124
+
.await?;
125
+
126
+
assert_eq!(doc.id, format!("did:web:{}", dw));
127
+
128
+
let mut upstream = None;
129
+
for ServiceItem { id, service_endpoint, .. } in doc.service {
130
+
let Some((_, id)) = id.split_once("#") else { continue; };
131
+
if id != service_id { continue; };
132
+
upstream = Some(service_endpoint);
133
+
break;
134
+
}
135
+
136
+
// 2. proxy the request forward
137
+
let mut upstream = upstream.expect("to find it");
138
+
upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid
139
+
140
+
if let Some(params) = params {
141
+
let mut query = upstream.query_pairs_mut();
142
+
let Params(ps) = params.try_into().expect("valid params");
143
+
for (k, pvs) in ps {
144
+
match pvs {
145
+
ParamValue::String(s) => {
146
+
for s in s {
147
+
query.append_pair(&k, &s);
148
+
}
149
+
}
150
+
ParamValue::Int(i) => {
151
+
for i in i {
152
+
query.append_pair(&k, &i.to_string());
153
+
}
154
+
}
155
+
ParamValue::Bool(b) => {
156
+
for b in b {
157
+
query.append_pair(&k, &b.to_string());
158
+
}
159
+
}
160
+
}
161
+
}
162
+
}
163
+
164
+
// TODO: other headers to proxy
165
+
Ok(self.client
166
+
.get(upstream)
167
+
.send()
168
+
.await?
169
+
.error_for_status()?
170
+
.json()
171
+
.await?)
172
+
}
173
+
}
174
+
175
+
#[derive(Debug, PartialEq)]
176
+
pub enum PathPart {
177
+
Scalar(String),
178
+
Vector(String, Option<String>), // key, $type
179
+
}
180
+
181
+
pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> {
182
+
let mut out = Vec::new();
183
+
184
+
let mut key_acc = String::new();
185
+
let mut type_acc = String::new();
186
+
let mut in_bracket = false;
187
+
let mut chars = input.chars().enumerate();
188
+
while let Some((i, c)) = chars.next() {
189
+
match c {
190
+
'[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
191
+
'[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")),
192
+
'[' => in_bracket = true,
193
+
']' if in_bracket => {
194
+
in_bracket = false;
195
+
let key = std::mem::take(&mut key_acc);
196
+
let r#type = std::mem::take(&mut type_acc);
197
+
let t = if r#type.is_empty() { None } else { Some(r#type) };
198
+
out.push(PathPart::Vector(key, t));
199
+
// peek ahead because we need a dot after array if there's more and i don't want to add more loop state
200
+
let Some((i, c)) = chars.next() else {
201
+
break;
202
+
};
203
+
if c != '.' {
204
+
return Err(format!("expected dot after close bracket, found {c:?} at {i}"));
205
+
}
206
+
}
207
+
']' => return Err(format!("unexpected close bracket at {i}")),
208
+
'.' if in_bracket => type_acc.push(c),
209
+
'.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")),
210
+
'.' => {
211
+
let key = std::mem::take(&mut key_acc);
212
+
assert!(type_acc.is_empty());
213
+
out.push(PathPart::Scalar(key));
214
+
}
215
+
_ if in_bracket => type_acc.push(c),
216
+
_ => key_acc.push(c),
217
+
}
218
+
}
219
+
if in_bracket {
220
+
return Err("unclosed bracket".into());
221
+
}
222
+
if !key_acc.is_empty() {
223
+
out.push(PathPart::Scalar(key_acc));
224
+
}
225
+
Ok(out)
226
+
}
227
+
228
+
#[derive(Debug, Clone, Copy, PartialEq)]
229
+
pub enum RefShape {
230
+
StrongRef,
231
+
AtUri,
232
+
AtUriParts,
233
+
Did,
234
+
Handle,
235
+
AtIdentifier,
236
+
}
237
+
238
+
impl TryFrom<&str> for RefShape {
239
+
type Error = String;
240
+
fn try_from(s: &str) -> Result<Self, Self::Error> {
241
+
match s {
242
+
"strong-ref" => Ok(Self::StrongRef),
243
+
"at-uri" => Ok(Self::AtUri),
244
+
"at-uri-parts" => Ok(Self::AtUriParts),
245
+
"did" => Ok(Self::Did),
246
+
"handle" => Ok(Self::Handle),
247
+
"at-identifier" => Ok(Self::AtIdentifier),
248
+
_ => Err(format!("unknown shape: {s}")),
249
+
}
250
+
}
251
+
}
252
+
253
+
#[derive(Debug, PartialEq)]
254
+
pub enum MatchedRef {
255
+
AtUri {
256
+
uri: String,
257
+
cid: Option<String>,
258
+
},
259
+
Identifier(String),
260
+
}
261
+
262
+
pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> {
263
+
// TODO: actually validate at-uri format
264
+
// TODO: actually validate everything else also
265
+
// TODO: should this function normalize identifiers to DIDs probably?
266
+
// or just return at-uri parts so the caller can resolve and reassemble
267
+
match shape {
268
+
RefShape::StrongRef => {
269
+
let o = val.as_object()?;
270
+
let uri = o.get("uri")?.as_str()?.to_string();
271
+
let cid = o.get("cid")?.as_str()?.to_string();
272
+
Some(MatchedRef::AtUri { uri, cid: Some(cid) })
273
+
}
274
+
RefShape::AtUri => {
275
+
let uri = val.as_str()?.to_string();
276
+
Some(MatchedRef::AtUri { uri, cid: None })
277
+
}
278
+
RefShape::AtUriParts => {
279
+
let o = val.as_object()?;
280
+
let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string();
281
+
let collection = o.get("collection")?.as_str()?.to_string();
282
+
let rkey = o.get("rkey")?.as_str()?.to_string();
283
+
let uri = format!("at://{identifier}/{collection}/{rkey}");
284
+
let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string);
285
+
Some(MatchedRef::AtUri { uri, cid })
286
+
}
287
+
RefShape::Did => {
288
+
let id = val.as_str()?;
289
+
if !id.starts_with("did:") {
290
+
return None;
291
+
}
292
+
Some(MatchedRef::Identifier(id.to_string()))
293
+
}
294
+
RefShape::Handle => {
295
+
let id = val.as_str()?;
296
+
if id.contains(':') {
297
+
return None;
298
+
}
299
+
Some(MatchedRef::Identifier(id.to_string()))
300
+
}
301
+
RefShape::AtIdentifier => {
302
+
Some(MatchedRef::Identifier(val.as_str()?.to_string()))
303
+
}
304
+
}
305
+
}
306
+
307
+
// TODO: send back metadata about the matching
308
+
pub fn extract_links(
309
+
sources: Vec<HydrationSource>,
310
+
skeleton: &Value,
311
+
) -> Result<Vec<MatchedRef>, String> {
312
+
// collect early to catch errors from the client
313
+
// (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah)
314
+
let sources = sources
315
+
.into_iter()
316
+
.map(|HydrationSource { path, shape }| {
317
+
let path_parts = parse_record_path(&path)?;
318
+
let shape: RefShape = shape.as_str().try_into()?;
319
+
Ok((path_parts, shape))
320
+
})
321
+
.collect::<Result<Vec<_>, String>>()?;
322
+
323
+
// lazy first impl, just re-walk the skeleton as many times as needed
324
+
// not deduplicating for now
325
+
let mut out = Vec::new();
326
+
for (path_parts, shape) in sources {
327
+
for val in PathWalker::new(&path_parts, skeleton) {
328
+
if let Some(matched) = match_shape(shape, val) {
329
+
out.push(matched);
330
+
}
331
+
}
332
+
}
333
+
334
+
Ok(out)
335
+
}
336
+
337
+
struct PathWalker<'a> {
338
+
todo: Vec<(&'a [PathPart], &'a Value)>,
339
+
}
340
+
impl<'a> PathWalker<'a> {
341
+
fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
342
+
Self { todo: vec![(path_parts, skeleton)] }
343
+
}
344
+
}
345
+
impl<'a> Iterator for PathWalker<'a> {
346
+
type Item = &'a Value;
347
+
fn next(&mut self) -> Option<Self::Item> {
348
+
loop {
349
+
let (parts, val) = self.todo.pop()?;
350
+
let Some((part, rest)) = parts.split_first() else {
351
+
return Some(val);
352
+
};
353
+
let Some(o) = val.as_object() else {
354
+
continue;
355
+
};
356
+
match part {
357
+
PathPart::Scalar(k) => {
358
+
let Some(v) = o.get(k) else {
359
+
continue;
360
+
};
361
+
self.todo.push((rest, v));
362
+
}
363
+
PathPart::Vector(k, t) => {
364
+
let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
365
+
continue;
366
+
};
367
+
for v in a
368
+
.iter()
369
+
.rev()
370
+
.filter(|c| {
371
+
let Some(t) = t else { return true };
372
+
c
373
+
.as_object()
374
+
.and_then(|o| o.get("$type"))
375
+
.and_then(|v| v.as_str())
376
+
.map(|s| s == t)
377
+
.unwrap_or(false)
378
+
})
379
+
{
380
+
self.todo.push((rest, v))
381
+
}
382
+
}
383
+
}
384
+
}
385
+
}
386
+
}
387
+
388
+
389
+
#[cfg(test)]
390
+
mod tests {
391
+
use super::*;
392
+
use serde_json::json;
393
+
394
+
#[test]
395
+
fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
396
+
let cases = [
397
+
("", vec![]),
398
+
("subject", vec![PathPart::Scalar("subject".into())]),
399
+
("authorDid", vec![PathPart::Scalar("authorDid".into())]),
400
+
("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]),
401
+
("members[]", vec![PathPart::Vector("members".into(), None)]),
402
+
("add[].key", vec![
403
+
PathPart::Vector("add".into(), None),
404
+
PathPart::Scalar("key".into()),
405
+
]),
406
+
("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
407
+
("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]),
408
+
("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![
409
+
PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
410
+
PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())),
411
+
PathPart::Scalar("did".into()),
412
+
]),
413
+
];
414
+
415
+
for (path, expected) in cases {
416
+
let parsed = parse_record_path(path)?;
417
+
assert_eq!(parsed, expected, "path: {path:?}");
418
+
}
419
+
420
+
Ok(())
421
+
}
422
+
423
+
#[test]
424
+
fn test_match_shape() {
425
+
let cases = [
426
+
("strong-ref", json!(""), None),
427
+
("strong-ref", json!({}), None),
428
+
("strong-ref", json!({ "uri": "abc" }), None),
429
+
("strong-ref", json!({ "cid": "def" }), None),
430
+
(
431
+
"strong-ref",
432
+
json!({ "uri": "abc", "cid": "def" }),
433
+
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }),
434
+
),
435
+
("at-uri", json!({ "uri": "abc" }), None),
436
+
("at-uri", json!({ "uri": "abc", "cid": "def" }), None),
437
+
(
438
+
"at-uri",
439
+
json!("abc"),
440
+
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }),
441
+
),
442
+
("at-uri-parts", json!("abc"), None),
443
+
("at-uri-parts", json!({}), None),
444
+
(
445
+
"at-uri-parts",
446
+
json!({"repo": "a", "collection": "b", "rkey": "c"}),
447
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
448
+
),
449
+
(
450
+
"at-uri-parts",
451
+
json!({"did": "a", "collection": "b", "rkey": "c"}),
452
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
453
+
),
454
+
(
455
+
"at-uri-parts",
456
+
// 'repo' takes precedence over 'did'
457
+
json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}),
458
+
Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }),
459
+
),
460
+
(
461
+
"at-uri-parts",
462
+
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}),
463
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }),
464
+
),
465
+
(
466
+
"at-uri-parts",
467
+
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}),
468
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
469
+
),
470
+
("did", json!({}), None),
471
+
("did", json!(""), None),
472
+
("did", json!("bad-example.com"), None),
473
+
("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
474
+
("handle", json!({}), None),
475
+
("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
476
+
("handle", json!("did:plc:xyz"), None),
477
+
("at-identifier", json!({}), None),
478
+
("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
479
+
("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
480
+
];
481
+
for (shape, val, expected) in cases {
482
+
let s = shape.try_into().unwrap();
483
+
let matched = match_shape(s, &val);
484
+
assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}");
485
+
}
486
+
}
487
+
}
+2
-2
slingshot/src/record.rs
+2
-2
slingshot/src/record.rs
+277
-6
slingshot/src/server.rs
+277
-6
slingshot/src/server.rs
···
1
use crate::{
2
-
CachedRecord, ErrorResponseObject, Identity, Repo,
3
error::{RecordError, ServerError},
4
};
5
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
6
use foyer::HybridCache;
7
use links::at_uri::parse_at_uri as normalize_at_uri;
8
use serde::Serialize;
9
-
use std::path::PathBuf;
10
-
use std::str::FromStr;
11
-
use std::sync::Arc;
12
-
use std::time::Instant;
13
use tokio_util::sync::CancellationToken;
14
15
use poem::{
···
24
};
25
use poem_openapi::{
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
param::Query, payload::Json, types::Example,
28
};
29
···
92
}))
93
}
94
95
fn bad_request_handler_resolve_handle(err: poem::Error) -> JustDidResponse {
96
JustDidResponse::BadRequest(Json(XrpcErrorResponseObject {
97
error: "InvalidRequest".to_string(),
···
190
BadRequest(XrpcError),
191
}
192
193
#[derive(Object)]
194
#[oai(example = true)]
195
struct FoundDidResponseObject {
···
221
struct Xrpc {
222
cache: HybridCache<String, CachedRecord>,
223
identity: Identity,
224
repo: Arc<Repo>,
225
}
226
···
550
}))
551
}
552
553
async fn get_record_impl(
554
&self,
555
repo: String,
···
748
cache: HybridCache<String, CachedRecord>,
749
identity: Identity,
750
repo: Repo,
751
acme_domain: Option<String>,
752
acme_contact: Option<String>,
753
acme_cache_path: Option<PathBuf>,
···
756
bind: std::net::SocketAddr,
757
) -> Result<(), ServerError> {
758
let repo = Arc::new(repo);
759
let api_service = OpenApiService::new(
760
Xrpc {
761
cache,
762
identity,
763
repo,
764
},
765
"Slingshot",
···
823
.with(
824
Cors::new()
825
.allow_origin_regex("*")
826
-
.allow_methods([Method::GET])
827
.allow_credentials(false),
828
)
829
.with(CatchPanic::new())
···
1
use crate::{
2
+
CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3
error::{RecordError, ServerError},
4
+
proxy::{extract_links, MatchedRef},
5
+
record::RawRecord,
6
};
7
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
8
use foyer::HybridCache;
9
use links::at_uri::parse_at_uri as normalize_at_uri;
10
use serde::Serialize;
11
+
use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap};
12
+
use tokio::sync::mpsc;
13
use tokio_util::sync::CancellationToken;
14
15
use poem::{
···
24
};
25
use poem_openapi::{
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
+
Union,
28
param::Query, payload::Json, types::Example,
29
};
30
···
93
}))
94
}
95
96
+
fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
97
+
ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
98
+
error: "InvalidRequest".to_string(),
99
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
100
+
}))
101
+
}
102
+
103
fn bad_request_handler_resolve_handle(err: poem::Error) -> JustDidResponse {
104
JustDidResponse::BadRequest(Json(XrpcErrorResponseObject {
105
error: "InvalidRequest".to_string(),
···
198
BadRequest(XrpcError),
199
}
200
201
+
#[derive(Object)]
202
+
struct ProxyHydrationError {
203
+
reason: String,
204
+
}
205
+
206
+
#[derive(Object)]
207
+
struct ProxyHydrationPending {
208
+
url: String,
209
+
}
210
+
211
+
#[derive(Object)]
212
+
struct ProxyHydrationRecordFound {
213
+
record: serde_json::Value,
214
+
}
215
+
216
+
#[derive(Object)]
217
+
struct ProxyHydrationIdentifierFound {
218
+
record: MiniDocResponseObject,
219
+
}
220
+
221
+
// todo: there's gotta be a supertrait that collects these?
222
+
use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType};
223
+
224
+
#[derive(Union)]
225
+
#[oai(discriminator_name = "status", rename_all = "camelCase")]
226
+
enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> {
227
+
Error(ProxyHydrationError),
228
+
Pending(ProxyHydrationPending),
229
+
Found(T),
230
+
}
231
+
232
+
#[derive(Object)]
233
+
#[oai(example = true)]
234
+
struct ProxyHydrateResponseObject {
235
+
/// The original upstream response content
236
+
output: serde_json::Value,
237
+
/// Any hydrated records
238
+
records: HashMap<String, Hydration<ProxyHydrationRecordFound>>,
239
+
/// Any hydrated identifiers
240
+
identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>,
241
+
}
242
+
impl Example for ProxyHydrateResponseObject {
243
+
fn example() -> Self {
244
+
Self {
245
+
output: serde_json::json!({}),
246
+
records: HashMap::from([
247
+
("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })),
248
+
]),
249
+
identifiers: HashMap::new(),
250
+
}
251
+
}
252
+
}
253
+
254
+
#[derive(ApiResponse)]
255
+
#[oai(bad_request_handler = "bad_request_handler_proxy_query")]
256
+
enum ProxyHydrateResponse {
257
+
#[oai(status = 200)]
258
+
Ok(Json<ProxyHydrateResponseObject>),
259
+
#[oai(status = 400)]
260
+
BadRequest(XrpcError)
261
+
}
262
+
263
+
#[derive(Object)]
264
+
pub struct HydrationSource {
265
+
/// Record Path syntax for locating fields
266
+
pub path: String,
267
+
/// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'.
268
+
///
269
+
/// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys.
270
+
/// - `at-uri`: string, must have all segments present (identifier, collection, rkey)
271
+
/// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored.
272
+
/// - `did`: string, `did` format
273
+
/// - `handle`: string, `handle` format
274
+
/// - `at-identifier`: string, `did` or `handle` format
275
+
pub shape: String,
276
+
}
277
+
278
+
#[derive(Object)]
279
+
#[oai(example = true)]
280
+
struct ProxyQueryPayload {
281
+
/// The NSID of the XRPC you wish to forward
282
+
xrpc: String,
283
+
/// The destination service the request will be forwarded to
284
+
atproto_proxy: String,
285
+
/// The `params` for the destination service XRPC endpoint
286
+
///
287
+
/// Currently this will be passed along unchecked, but a future version of
288
+
/// slingshot may attempt to do lexicon resolution to validate `params`
289
+
/// based on the upstream service
290
+
params: Option<serde_json::Value>,
291
+
/// Paths within the response to look for at-uris that can be hydrated
292
+
hydration_sources: Vec<HydrationSource>,
293
+
// todo: deadline thing
294
+
295
+
}
296
+
impl Example for ProxyQueryPayload {
297
+
fn example() -> Self {
298
+
Self {
299
+
xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
300
+
atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
301
+
params: Some(serde_json::json!({
302
+
"feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
303
+
})),
304
+
hydration_sources: vec![
305
+
HydrationSource {
306
+
path: "feed[].post".to_string(),
307
+
shape: "at-uri".to_string(),
308
+
}
309
+
],
310
+
}
311
+
}
312
+
}
313
+
314
#[derive(Object)]
315
#[oai(example = true)]
316
struct FoundDidResponseObject {
···
342
struct Xrpc {
343
cache: HybridCache<String, CachedRecord>,
344
identity: Identity,
345
+
proxy: Arc<Proxy>,
346
repo: Arc<Repo>,
347
}
348
···
672
}))
673
}
674
675
+
/// com.bad-example.proxy.hydrateQueryResponse
676
+
///
677
+
/// > [!important]
678
+
/// > Unstable! This endpoint is experimental and may change.
679
+
///
680
+
/// Fetch + include records referenced from an upstream xrpc query response
681
+
#[oai(
682
+
path = "/com.bad-example.proxy.hydrateQueryResponse",
683
+
method = "post",
684
+
tag = "ApiTags::Custom"
685
+
)]
686
+
async fn proxy_hydrate_query(
687
+
&self,
688
+
Json(payload): Json<ProxyQueryPayload>,
689
+
) -> ProxyHydrateResponse {
690
+
// TODO: the Accept request header, if present, gotta be json
691
+
// TODO: find any Authorization header and verify it. TBD about `aud`.
692
+
693
+
let params = if let Some(p) = payload.params {
694
+
let serde_json::Value::Object(map) = p else {
695
+
panic!("params have to be an object");
696
+
};
697
+
Some(map)
698
+
} else { None };
699
+
700
+
match self.proxy.proxy(
701
+
payload.xrpc,
702
+
payload.atproto_proxy,
703
+
params,
704
+
).await {
705
+
Ok(skeleton) => {
706
+
let links = match extract_links(payload.hydration_sources, &skeleton) {
707
+
Ok(l) => l,
708
+
Err(e) => {
709
+
log::warn!("problem extracting: {e:?}");
710
+
return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting"))
711
+
}
712
+
};
713
+
let mut records = HashMap::new();
714
+
let mut identifiers = HashMap::new();
715
+
716
+
enum GetThing {
717
+
Record(String, Hydration<ProxyHydrationRecordFound>),
718
+
Identifier(String, Hydration<ProxyHydrationIdentifierFound>),
719
+
}
720
+
721
+
let (tx, mut rx) = mpsc::channel(1);
722
+
723
+
for link in links {
724
+
match link {
725
+
MatchedRef::AtUri { uri, cid } => {
726
+
if records.contains_key(&uri) {
727
+
log::warn!("skipping duplicate record without checking cid");
728
+
continue;
729
+
}
730
+
let mut u = url::Url::parse("https://example.com").unwrap();
731
+
u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo
732
+
records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending {
733
+
url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc.
734
+
}));
735
+
let tx = tx.clone();
736
+
let identity = self.identity.clone();
737
+
let repo = self.repo.clone();
738
+
tokio::task::spawn(async move {
739
+
let rest = uri.strip_prefix("at://").unwrap();
740
+
let (identifier, rest) = rest.split_once('/').unwrap();
741
+
let (collection, rkey) = rest.split_once('/').unwrap();
742
+
743
+
let did = if identifier.starts_with("did:") {
744
+
Did::new(identifier.to_string()).unwrap()
745
+
} else {
746
+
let handle = Handle::new(identifier.to_string()).unwrap();
747
+
identity.handle_to_did(handle).await.unwrap().unwrap()
748
+
};
749
+
750
+
let res = match repo.get_record(
751
+
&did,
752
+
&Nsid::new(collection.to_string()).unwrap(),
753
+
&RecordKey::new(rkey.to_string()).unwrap(),
754
+
&cid.as_ref().map(|s| Cid::from_str(s).unwrap()),
755
+
).await {
756
+
Ok(CachedRecord::Deleted) =>
757
+
Hydration::Error(ProxyHydrationError {
758
+
reason: "record deleted".to_string(),
759
+
}),
760
+
Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => {
761
+
if let Some(c) = cid && found_cid.as_ref().to_string() != c {
762
+
log::warn!("ignoring cid mismatch");
763
+
}
764
+
let value = serde_json::from_str(&record).unwrap();
765
+
Hydration::Found(ProxyHydrationRecordFound {
766
+
record: value,
767
+
})
768
+
}
769
+
Err(e) => {
770
+
log::warn!("finally oop {e:?}");
771
+
Hydration::Error(ProxyHydrationError {
772
+
reason: "failed to fetch record".to_string(),
773
+
})
774
+
}
775
+
};
776
+
tx.send(GetThing::Record(uri, res)).await
777
+
});
778
+
}
779
+
MatchedRef::Identifier(id) => {
780
+
if identifiers.contains_key(&id) {
781
+
continue;
782
+
}
783
+
let mut u = url::Url::parse("https://example.com").unwrap();
784
+
u.query_pairs_mut().append_pair("identifier", &id);
785
+
identifiers.insert(id, Hydration::Pending(ProxyHydrationPending {
786
+
url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross
787
+
}));
788
+
let tx = tx.clone();
789
+
// let doc_fut = self.resolve_mini_doc();
790
+
tokio::task::spawn(async {
791
+
792
+
});
793
+
}
794
+
}
795
+
}
796
+
// so the channel can close when all are completed
797
+
// (we shoudl be doing a timeout...)
798
+
drop(tx);
799
+
800
+
while let Some(hydration) = rx.recv().await {
801
+
match hydration {
802
+
GetThing::Record(uri, h) => { records.insert(uri, h); }
803
+
GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); }
804
+
};
805
+
}
806
+
807
+
ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
808
+
output: skeleton,
809
+
records,
810
+
identifiers,
811
+
}))
812
+
}
813
+
Err(e) => {
814
+
log::warn!("oh no: {e:?}");
815
+
ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
816
+
}
817
+
}
818
+
819
+
}
820
+
821
async fn get_record_impl(
822
&self,
823
repo: String,
···
1016
cache: HybridCache<String, CachedRecord>,
1017
identity: Identity,
1018
repo: Repo,
1019
+
proxy: Proxy,
1020
acme_domain: Option<String>,
1021
acme_contact: Option<String>,
1022
acme_cache_path: Option<PathBuf>,
···
1025
bind: std::net::SocketAddr,
1026
) -> Result<(), ServerError> {
1027
let repo = Arc::new(repo);
1028
+
let proxy = Arc::new(proxy);
1029
let api_service = OpenApiService::new(
1030
Xrpc {
1031
cache,
1032
identity,
1033
+
proxy,
1034
repo,
1035
},
1036
"Slingshot",
···
1094
.with(
1095
Cors::new()
1096
.allow_origin_regex("*")
1097
+
.allow_methods([Method::GET, Method::POST])
1098
.allow_credentials(false),
1099
)
1100
.with(CatchPanic::new())
History
2 rounds
2 comments
bad-example.com
submitted
#1
8 commits
expand
collapse
very hack proxy stuff (wip)
identities!!!!
service doc cache
proxy: use service cache
fmt
include headers, timeout hydration, measure times
base_url, typed url parts, api fixups,
only try to hydrate 100 records
no conflicts, ready to merge
expand 0 comments
bad-example.com
submitted
#0
1 commit
expand
collapse
very hack proxy stuff (wip)
the record contents from
com.atproto.repo.getRecordare returned under a key called "value", so this should probably use that.