tangled
alpha
login
or
join now
microcosm.blue
/
microcosm-rs
65
fork
atom
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
65
fork
atom
overview
issues
8
pulls
2
pipelines
very hack proxy stuff (wip)
bad-example.com
1 month ago
50febb8b
b636480a
+784
-12
8 changed files
expand all
collapse all
unified
split
Cargo.lock
slingshot
Cargo.toml
src
error.rs
lib.rs
main.rs
proxy.rs
record.rs
server.rs
+1
Cargo.lock
···
5965
5965
"form_urlencoded",
5966
5966
"idna",
5967
5967
"percent-encoding",
5968
5968
+
"serde",
5968
5969
]
5969
5970
5970
5971
[[package]]
+1
-1
slingshot/Cargo.toml
···
28
28
tokio = { version = "1.47.0", features = ["full"] }
29
29
tokio-util = "0.7.15"
30
30
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
31
31
-
url = "2.5.4"
31
31
+
url = { version = "2.5.4", features = ["serde"] }
+10
slingshot/src/error.rs
···
91
91
#[error("upstream non-atproto bad request")]
92
92
UpstreamBadBadNotGoodRequest(reqwest::Error),
93
93
}
94
94
+
95
95
+
#[derive(Debug, Error)]
96
96
+
pub enum ProxyError {
97
97
+
#[error("failed to parse path: {0}")]
98
98
+
PathParseError(String),
99
99
+
#[error(transparent)]
100
100
+
UrlParseError(#[from] url::ParseError),
101
101
+
#[error(transparent)]
102
102
+
ReqwestError(#[from] reqwest::Error),
103
103
+
}
+2
slingshot/src/lib.rs
···
3
3
mod firehose_cache;
4
4
mod healthcheck;
5
5
mod identity;
6
6
+
mod proxy;
6
7
mod record;
7
8
mod server;
8
9
···
10
11
pub use firehose_cache::firehose_cache;
11
12
pub use healthcheck::healthcheck;
12
13
pub use identity::{Identity, IdentityKey};
14
14
+
pub use proxy::Proxy;
13
15
pub use record::{CachedRecord, ErrorResponseObject, Repo};
14
16
pub use server::serve;
+4
-3
slingshot/src/main.rs
···
2
2
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
4
use slingshot::{
5
5
-
Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
5
5
+
Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6
6
};
7
7
use std::net::SocketAddr;
8
8
use std::path::PathBuf;
···
143
143
)
144
144
.await
145
145
.map_err(|e| format!("identity setup failed: {e:?}"))?;
146
146
-
147
147
-
log::info!("identity service ready.");
148
146
let identity_refresher = identity.clone();
149
147
let identity_shutdown = shutdown.clone();
150
148
tasks.spawn(async move {
151
149
identity_refresher.run_refresher(identity_shutdown).await?;
152
150
Ok(())
153
151
});
152
152
+
log::info!("identity service ready.");
154
153
155
154
let repo = Repo::new(identity.clone());
155
155
+
let proxy = Proxy::new(repo.clone());
156
156
157
157
let identity_for_server = identity.clone();
158
158
let server_shutdown = shutdown.clone();
···
163
163
server_cache_handle,
164
164
identity_for_server,
165
165
repo,
166
166
+
proxy,
166
167
args.acme_domain,
167
168
args.acme_contact,
168
169
args.acme_cache_path,
+487
slingshot/src/proxy.rs
···
1
1
+
use serde::Deserialize;
2
2
+
use url::Url;
3
3
+
use std::{collections::HashMap, time::Duration};
4
4
+
use crate::{Repo, server::HydrationSource, error::ProxyError};
5
5
+
use reqwest::Client;
6
6
+
use serde_json::{Map, Value};
7
7
+
8
8
+
pub enum ParamValue {
9
9
+
String(Vec<String>),
10
10
+
Int(Vec<i64>),
11
11
+
Bool(Vec<bool>),
12
12
+
}
13
13
+
pub struct Params(HashMap<String, ParamValue>);
14
14
+
15
15
+
impl TryFrom<Map<String, Value>> for Params {
16
16
+
type Error = (); // TODO
17
17
+
fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18
18
+
let mut out = HashMap::new();
19
19
+
for (k, v) in val {
20
20
+
match v {
21
21
+
Value::String(s) => out.insert(k, ParamValue::String(vec![s])),
22
22
+
Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])),
23
23
+
Value::Number(n) => {
24
24
+
let Some(i) = n.as_i64() else {
25
25
+
return Err(());
26
26
+
};
27
27
+
out.insert(k, ParamValue::Int(vec![i]))
28
28
+
}
29
29
+
Value::Array(a) => {
30
30
+
let Some(first) = a.first() else {
31
31
+
continue;
32
32
+
};
33
33
+
if first.is_string() {
34
34
+
let mut vals = Vec::with_capacity(a.len());
35
35
+
for v in a {
36
36
+
let Some(v) = v.as_str() else {
37
37
+
return Err(());
38
38
+
};
39
39
+
vals.push(v.to_string());
40
40
+
}
41
41
+
out.insert(k, ParamValue::String(vals));
42
42
+
} else if first.is_i64() {
43
43
+
let mut vals = Vec::with_capacity(a.len());
44
44
+
for v in a {
45
45
+
let Some(v) = v.as_i64() else {
46
46
+
return Err(());
47
47
+
};
48
48
+
vals.push(v);
49
49
+
}
50
50
+
out.insert(k, ParamValue::Int(vals));
51
51
+
} else if first.is_boolean() {
52
52
+
let mut vals = Vec::with_capacity(a.len());
53
53
+
for v in a {
54
54
+
let Some(v) = v.as_bool() else {
55
55
+
return Err(());
56
56
+
};
57
57
+
vals.push(v);
58
58
+
}
59
59
+
out.insert(k, ParamValue::Bool(vals));
60
60
+
}
61
61
+
todo!();
62
62
+
}
63
63
+
_ => return Err(()),
64
64
+
};
65
65
+
}
66
66
+
67
67
+
Ok(Self(out))
68
68
+
}
69
69
+
}
70
70
+
71
71
+
#[derive(Clone)]
72
72
+
pub struct Proxy {
73
73
+
repo: Repo,
74
74
+
client: Client,
75
75
+
}
76
76
+
77
77
+
impl Proxy {
78
78
+
pub fn new(repo: Repo) -> Self {
79
79
+
let client = Client::builder()
80
80
+
.user_agent(format!(
81
81
+
"microcosm slingshot v{} (contact: @bad-example.com)",
82
82
+
env!("CARGO_PKG_VERSION")
83
83
+
))
84
84
+
.no_proxy()
85
85
+
.timeout(Duration::from_secs(6))
86
86
+
.build()
87
87
+
.unwrap();
88
88
+
Self { repo, client }
89
89
+
}
90
90
+
91
91
+
pub async fn proxy(
92
92
+
&self,
93
93
+
xrpc: String,
94
94
+
service: String,
95
95
+
params: Option<Map<String, Value>>,
96
96
+
) -> Result<Value, ProxyError> {
97
97
+
98
98
+
// hackin it to start
99
99
+
100
100
+
// 1. assume did-web (TODO) and get the did doc
101
101
+
#[derive(Debug, Deserialize)]
102
102
+
struct ServiceDoc {
103
103
+
id: String,
104
104
+
service: Vec<ServiceItem>,
105
105
+
}
106
106
+
#[derive(Debug, Deserialize)]
107
107
+
struct ServiceItem {
108
108
+
id: String,
109
109
+
#[expect(unused)]
110
110
+
r#type: String,
111
111
+
#[serde(rename = "serviceEndpoint")]
112
112
+
service_endpoint: Url,
113
113
+
}
114
114
+
let dw = service.strip_prefix("did:web:").expect("a did web");
115
115
+
let (dw, service_id) = dw.split_once("#").expect("whatever");
116
116
+
let mut dw_url = Url::parse(&format!("https://{dw}"))?;
117
117
+
dw_url.set_path("/.well-known/did.json");
118
118
+
let doc: ServiceDoc = self.client
119
119
+
.get(dw_url)
120
120
+
.send()
121
121
+
.await?
122
122
+
.error_for_status()?
123
123
+
.json()
124
124
+
.await?;
125
125
+
126
126
+
assert_eq!(doc.id, format!("did:web:{}", dw));
127
127
+
128
128
+
let mut upstream = None;
129
129
+
for ServiceItem { id, service_endpoint, .. } in doc.service {
130
130
+
let Some((_, id)) = id.split_once("#") else { continue; };
131
131
+
if id != service_id { continue; };
132
132
+
upstream = Some(service_endpoint);
133
133
+
break;
134
134
+
}
135
135
+
136
136
+
// 2. proxy the request forward
137
137
+
let mut upstream = upstream.expect("to find it");
138
138
+
upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid
139
139
+
140
140
+
if let Some(params) = params {
141
141
+
let mut query = upstream.query_pairs_mut();
142
142
+
let Params(ps) = params.try_into().expect("valid params");
143
143
+
for (k, pvs) in ps {
144
144
+
match pvs {
145
145
+
ParamValue::String(s) => {
146
146
+
for s in s {
147
147
+
query.append_pair(&k, &s);
148
148
+
}
149
149
+
}
150
150
+
ParamValue::Int(i) => {
151
151
+
for i in i {
152
152
+
query.append_pair(&k, &i.to_string());
153
153
+
}
154
154
+
}
155
155
+
ParamValue::Bool(b) => {
156
156
+
for b in b {
157
157
+
query.append_pair(&k, &b.to_string());
158
158
+
}
159
159
+
}
160
160
+
}
161
161
+
}
162
162
+
}
163
163
+
164
164
+
// TODO: other headers to proxy
165
165
+
Ok(self.client
166
166
+
.get(upstream)
167
167
+
.send()
168
168
+
.await?
169
169
+
.error_for_status()?
170
170
+
.json()
171
171
+
.await?)
172
172
+
}
173
173
+
}
174
174
+
175
175
+
/// One segment of a parsed record path.
#[derive(Debug, PartialEq)]
pub enum PathPart {
    /// A plain key, written `key`.
    Scalar(String),
    /// A key followed by brackets, written `key[]` or `key[type]`.
    Vector(String, Option<String>), // key, $type
}

/// Parses a dotted record path such as `a.b`, `items[].uri` or
/// `facets[app.bsky.richtext.facet].features[].did` into its segments.
///
/// Segments are separated by `.`. A segment followed by `[...]` becomes a
/// [`PathPart::Vector`]; the bracket content (which may itself contain dots)
/// is the optional type, with empty brackets meaning `None`. The empty input
/// parses to an empty path.
///
/// # Errors
///
/// Returns a message describing the first problem found: an empty key
/// (leading, doubled or trailing dot), a bracket without a preceding key,
/// nested, unmatched or unclosed brackets, or anything other than `.`
/// directly after a closing bracket. Positions in messages are character
/// indices.
pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> {
    let mut out = Vec::new();

    let mut key_acc = String::new();
    let mut type_acc = String::new();
    let mut in_bracket = false;
    let mut chars = input.chars().enumerate();
    while let Some((i, c)) = chars.next() {
        match c {
            '[' if in_bracket => {
                return Err(format!("nested opening bracket not allowed, at {i}"));
            }
            '[' if key_acc.is_empty() => {
                return Err(format!("missing key before opening bracket, at {i}"));
            }
            '[' => in_bracket = true,
            ']' if in_bracket => {
                in_bracket = false;
                let key = std::mem::take(&mut key_acc);
                let ty = std::mem::take(&mut type_acc);
                out.push(PathPart::Vector(key, (!ty.is_empty()).then_some(ty)));
                // A closing bracket must end the path or be followed by the
                // separator; consume it here so no extra loop state is needed.
                match chars.next() {
                    None => break,
                    Some((_, '.')) => {}
                    Some((i, c)) => {
                        return Err(format!(
                            "expected dot after close bracket, found {c:?} at {i}"
                        ));
                    }
                }
            }
            ']' => return Err(format!("unexpected close bracket at {i}")),
            // Dots inside brackets belong to the type name.
            '.' if in_bracket => type_acc.push(c),
            '.' if key_acc.is_empty() => {
                return Err(format!("missing key before next segment, at {i}"));
            }
            '.' => {
                let key = std::mem::take(&mut key_acc);
                assert!(type_acc.is_empty());
                out.push(PathPart::Scalar(key));
            }
            _ if in_bracket => type_acc.push(c),
            _ => key_acc.push(c),
        }
    }
    if in_bracket {
        return Err("unclosed bracket".into());
    }
    // Outside brackets a final '.' is always a separator, and a separator
    // with nothing after it denotes an empty key.
    if input.ends_with('.') {
        return Err(format!(
            "missing key after trailing dot, at {}",
            input.chars().count() - 1
        ));
    }
    if !key_acc.is_empty() {
        out.push(PathPart::Scalar(key_acc));
    }
    Ok(out)
}
227
227
+
228
228
+
/// The kind of reference expected at a hydration path.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RefShape {
    /// `"strong-ref"`
    StrongRef,
    /// `"at-uri"`
    AtUri,
    /// `"at-uri-parts"`
    AtUriParts,
    /// `"did"`
    Did,
    /// `"handle"`
    Handle,
    /// `"at-identifier"`
    AtIdentifier,
}

/// Parses the kebab-case shape name; unknown names yield `unknown shape: <name>`.
impl TryFrom<&str> for RefShape {
    type Error = String;
    fn try_from(s: &str) -> Result<Self, Self::Error> {
        // Name-to-variant table; matching is exact and case-sensitive.
        const SHAPES: [(&str, RefShape); 6] = [
            ("strong-ref", RefShape::StrongRef),
            ("at-uri", RefShape::AtUri),
            ("at-uri-parts", RefShape::AtUriParts),
            ("did", RefShape::Did),
            ("handle", RefShape::Handle),
            ("at-identifier", RefShape::AtIdentifier),
        ];
        SHAPES
            .iter()
            .find(|(name, _)| *name == s)
            .map(|&(_, shape)| shape)
            .ok_or_else(|| format!("unknown shape: {s}"))
    }
}
252
252
+
253
253
+
#[derive(Debug, PartialEq)]
254
254
+
pub enum MatchedRef {
255
255
+
AtUri {
256
256
+
uri: String,
257
257
+
cid: Option<String>,
258
258
+
},
259
259
+
Identifier(String),
260
260
+
}
261
261
+
262
262
+
pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> {
263
263
+
// TODO: actually validate at-uri format
264
264
+
// TODO: actually validate everything else also
265
265
+
// TODO: should this function normalize identifiers to DIDs probably?
266
266
+
// or just return at-uri parts so the caller can resolve and reassemble
267
267
+
match shape {
268
268
+
RefShape::StrongRef => {
269
269
+
let o = val.as_object()?;
270
270
+
let uri = o.get("uri")?.as_str()?.to_string();
271
271
+
let cid = o.get("cid")?.as_str()?.to_string();
272
272
+
Some(MatchedRef::AtUri { uri, cid: Some(cid) })
273
273
+
}
274
274
+
RefShape::AtUri => {
275
275
+
let uri = val.as_str()?.to_string();
276
276
+
Some(MatchedRef::AtUri { uri, cid: None })
277
277
+
}
278
278
+
RefShape::AtUriParts => {
279
279
+
let o = val.as_object()?;
280
280
+
let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string();
281
281
+
let collection = o.get("collection")?.as_str()?.to_string();
282
282
+
let rkey = o.get("rkey")?.as_str()?.to_string();
283
283
+
let uri = format!("at://{identifier}/{collection}/{rkey}");
284
284
+
let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string);
285
285
+
Some(MatchedRef::AtUri { uri, cid })
286
286
+
}
287
287
+
RefShape::Did => {
288
288
+
let id = val.as_str()?;
289
289
+
if !id.starts_with("did:") {
290
290
+
return None;
291
291
+
}
292
292
+
Some(MatchedRef::Identifier(id.to_string()))
293
293
+
}
294
294
+
RefShape::Handle => {
295
295
+
let id = val.as_str()?;
296
296
+
if id.contains(':') {
297
297
+
return None;
298
298
+
}
299
299
+
Some(MatchedRef::Identifier(id.to_string()))
300
300
+
}
301
301
+
RefShape::AtIdentifier => {
302
302
+
Some(MatchedRef::Identifier(val.as_str()?.to_string()))
303
303
+
}
304
304
+
}
305
305
+
}
306
306
+
307
307
+
// TODO: send back metadata about the matching
308
308
+
pub fn extract_links(
309
309
+
sources: Vec<HydrationSource>,
310
310
+
skeleton: &Value,
311
311
+
) -> Result<Vec<MatchedRef>, String> {
312
312
+
// collect early to catch errors from the client
313
313
+
// (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah)
314
314
+
let sources = sources
315
315
+
.into_iter()
316
316
+
.map(|HydrationSource { path, shape }| {
317
317
+
let path_parts = parse_record_path(&path)?;
318
318
+
let shape: RefShape = shape.as_str().try_into()?;
319
319
+
Ok((path_parts, shape))
320
320
+
})
321
321
+
.collect::<Result<Vec<_>, String>>()?;
322
322
+
323
323
+
// lazy first impl, just re-walk the skeleton as many times as needed
324
324
+
// not deduplicating for now
325
325
+
let mut out = Vec::new();
326
326
+
for (path_parts, shape) in sources {
327
327
+
for val in PathWalker::new(&path_parts, skeleton) {
328
328
+
if let Some(matched) = match_shape(shape, val) {
329
329
+
out.push(matched);
330
330
+
}
331
331
+
}
332
332
+
}
333
333
+
334
334
+
Ok(out)
335
335
+
}
336
336
+
337
337
+
struct PathWalker<'a> {
338
338
+
todo: Vec<(&'a [PathPart], &'a Value)>,
339
339
+
}
340
340
+
impl<'a> PathWalker<'a> {
341
341
+
fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
342
342
+
Self { todo: vec![(path_parts, skeleton)] }
343
343
+
}
344
344
+
}
345
345
+
impl<'a> Iterator for PathWalker<'a> {
346
346
+
type Item = &'a Value;
347
347
+
fn next(&mut self) -> Option<Self::Item> {
348
348
+
loop {
349
349
+
let (parts, val) = self.todo.pop()?;
350
350
+
let Some((part, rest)) = parts.split_first() else {
351
351
+
return Some(val);
352
352
+
};
353
353
+
let Some(o) = val.as_object() else {
354
354
+
continue;
355
355
+
};
356
356
+
match part {
357
357
+
PathPart::Scalar(k) => {
358
358
+
let Some(v) = o.get(k) else {
359
359
+
continue;
360
360
+
};
361
361
+
self.todo.push((rest, v));
362
362
+
}
363
363
+
PathPart::Vector(k, t) => {
364
364
+
let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
365
365
+
continue;
366
366
+
};
367
367
+
for v in a
368
368
+
.iter()
369
369
+
.rev()
370
370
+
.filter(|c| {
371
371
+
let Some(t) = t else { return true };
372
372
+
c
373
373
+
.as_object()
374
374
+
.and_then(|o| o.get("$type"))
375
375
+
.and_then(|v| v.as_str())
376
376
+
.map(|s| s == t)
377
377
+
.unwrap_or(false)
378
378
+
})
379
379
+
{
380
380
+
self.todo.push((rest, v))
381
381
+
}
382
382
+
}
383
383
+
}
384
384
+
}
385
385
+
}
386
386
+
}
387
387
+
388
388
+
389
389
+
#[cfg(test)]
390
390
+
mod tests {
391
391
+
use super::*;
392
392
+
use serde_json::json;
393
393
+
394
394
+
#[test]
395
395
+
fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
396
396
+
let cases = [
397
397
+
("", vec![]),
398
398
+
("subject", vec![PathPart::Scalar("subject".into())]),
399
399
+
("authorDid", vec![PathPart::Scalar("authorDid".into())]),
400
400
+
("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]),
401
401
+
("members[]", vec![PathPart::Vector("members".into(), None)]),
402
402
+
("add[].key", vec![
403
403
+
PathPart::Vector("add".into(), None),
404
404
+
PathPart::Scalar("key".into()),
405
405
+
]),
406
406
+
("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
407
407
+
("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]),
408
408
+
("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![
409
409
+
PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
410
410
+
PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())),
411
411
+
PathPart::Scalar("did".into()),
412
412
+
]),
413
413
+
];
414
414
+
415
415
+
for (path, expected) in cases {
416
416
+
let parsed = parse_record_path(path)?;
417
417
+
assert_eq!(parsed, expected, "path: {path:?}");
418
418
+
}
419
419
+
420
420
+
Ok(())
421
421
+
}
422
422
+
423
423
+
#[test]
424
424
+
fn test_match_shape() {
425
425
+
let cases = [
426
426
+
("strong-ref", json!(""), None),
427
427
+
("strong-ref", json!({}), None),
428
428
+
("strong-ref", json!({ "uri": "abc" }), None),
429
429
+
("strong-ref", json!({ "cid": "def" }), None),
430
430
+
(
431
431
+
"strong-ref",
432
432
+
json!({ "uri": "abc", "cid": "def" }),
433
433
+
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }),
434
434
+
),
435
435
+
("at-uri", json!({ "uri": "abc" }), None),
436
436
+
("at-uri", json!({ "uri": "abc", "cid": "def" }), None),
437
437
+
(
438
438
+
"at-uri",
439
439
+
json!("abc"),
440
440
+
Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }),
441
441
+
),
442
442
+
("at-uri-parts", json!("abc"), None),
443
443
+
("at-uri-parts", json!({}), None),
444
444
+
(
445
445
+
"at-uri-parts",
446
446
+
json!({"repo": "a", "collection": "b", "rkey": "c"}),
447
447
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
448
448
+
),
449
449
+
(
450
450
+
"at-uri-parts",
451
451
+
json!({"did": "a", "collection": "b", "rkey": "c"}),
452
452
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
453
453
+
),
454
454
+
(
455
455
+
"at-uri-parts",
456
456
+
// 'repo' takes precedence over 'did'
457
457
+
json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}),
458
458
+
Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }),
459
459
+
),
460
460
+
(
461
461
+
"at-uri-parts",
462
462
+
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}),
463
463
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }),
464
464
+
),
465
465
+
(
466
466
+
"at-uri-parts",
467
467
+
json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}),
468
468
+
Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
469
469
+
),
470
470
+
("did", json!({}), None),
471
471
+
("did", json!(""), None),
472
472
+
("did", json!("bad-example.com"), None),
473
473
+
("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
474
474
+
("handle", json!({}), None),
475
475
+
("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
476
476
+
("handle", json!("did:plc:xyz"), None),
477
477
+
("at-identifier", json!({}), None),
478
478
+
("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
479
479
+
("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
480
480
+
];
481
481
+
for (shape, val, expected) in cases {
482
482
+
let s = shape.try_into().unwrap();
483
483
+
let matched = match_shape(s, &val);
484
484
+
assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}");
485
485
+
}
486
486
+
}
487
487
+
}
+2
-2
slingshot/src/record.rs
···
11
11
12
12
#[derive(Debug, Serialize, Deserialize)]
13
13
pub struct RawRecord {
14
14
-
cid: Cid,
15
15
-
record: String,
14
14
+
pub cid: Cid,
15
15
+
pub record: String,
16
16
}
17
17
18
18
// TODO: should be able to do typed CID
+277
-6
slingshot/src/server.rs
···
1
1
use crate::{
2
2
-
CachedRecord, ErrorResponseObject, Identity, Repo,
2
2
+
CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3
3
error::{RecordError, ServerError},
4
4
+
proxy::{extract_links, MatchedRef},
5
5
+
record::RawRecord,
4
6
};
5
7
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
6
8
use foyer::HybridCache;
7
9
use links::at_uri::parse_at_uri as normalize_at_uri;
8
10
use serde::Serialize;
9
9
-
use std::path::PathBuf;
10
10
-
use std::str::FromStr;
11
11
-
use std::sync::Arc;
12
12
-
use std::time::Instant;
11
11
+
use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap};
12
12
+
use tokio::sync::mpsc;
13
13
use tokio_util::sync::CancellationToken;
14
14
15
15
use poem::{
···
24
24
};
25
25
use poem_openapi::{
26
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
27
+
Union,
27
28
param::Query, payload::Json, types::Example,
28
29
};
29
30
···
87
88
88
89
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
89
90
ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
91
91
+
error: "InvalidRequest".to_string(),
92
92
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
93
93
+
}))
94
94
+
}
95
95
+
96
96
+
fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
97
97
+
ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
90
98
error: "InvalidRequest".to_string(),
91
99
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
92
100
}))
···
191
199
}
192
200
193
201
#[derive(Object)]
202
202
+
struct ProxyHydrationError {
203
203
+
reason: String,
204
204
+
}
205
205
+
206
206
+
#[derive(Object)]
207
207
+
struct ProxyHydrationPending {
208
208
+
url: String,
209
209
+
}
210
210
+
211
211
+
#[derive(Object)]
212
212
+
struct ProxyHydrationRecordFound {
213
213
+
record: serde_json::Value,
214
214
+
}
215
215
+
216
216
+
#[derive(Object)]
217
217
+
struct ProxyHydrationIdentifierFound {
218
218
+
record: MiniDocResponseObject,
219
219
+
}
220
220
+
221
221
+
// todo: there's gotta be a supertrait that collects these?
222
222
+
use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType};
223
223
+
224
224
+
#[derive(Union)]
225
225
+
#[oai(discriminator_name = "status", rename_all = "camelCase")]
226
226
+
enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> {
227
227
+
Error(ProxyHydrationError),
228
228
+
Pending(ProxyHydrationPending),
229
229
+
Found(T),
230
230
+
}
231
231
+
232
232
+
#[derive(Object)]
233
233
+
#[oai(example = true)]
234
234
+
struct ProxyHydrateResponseObject {
235
235
+
/// The original upstream response content
236
236
+
output: serde_json::Value,
237
237
+
/// Any hydrated records
238
238
+
records: HashMap<String, Hydration<ProxyHydrationRecordFound>>,
239
239
+
/// Any hydrated identifiers
240
240
+
identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>,
241
241
+
}
242
242
+
impl Example for ProxyHydrateResponseObject {
243
243
+
fn example() -> Self {
244
244
+
Self {
245
245
+
output: serde_json::json!({}),
246
246
+
records: HashMap::from([
247
247
+
("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })),
248
248
+
]),
249
249
+
identifiers: HashMap::new(),
250
250
+
}
251
251
+
}
252
252
+
}
253
253
+
254
254
+
#[derive(ApiResponse)]
255
255
+
#[oai(bad_request_handler = "bad_request_handler_proxy_query")]
256
256
+
enum ProxyHydrateResponse {
257
257
+
#[oai(status = 200)]
258
258
+
Ok(Json<ProxyHydrateResponseObject>),
259
259
+
#[oai(status = 400)]
260
260
+
BadRequest(XrpcError)
261
261
+
}
262
262
+
263
263
+
#[derive(Object)]
264
264
+
pub struct HydrationSource {
265
265
+
/// Record Path syntax for locating fields
266
266
+
pub path: String,
267
267
+
/// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'.
268
268
+
///
269
269
+
/// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys.
270
270
+
/// - `at-uri`: string, must have all segments present (identifier, collection, rkey)
271
271
+
/// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored.
272
272
+
/// - `did`: string, `did` format
273
273
+
/// - `handle`: string, `handle` format
274
274
+
/// - `at-identifier`: string, `did` or `handle` format
275
275
+
pub shape: String,
276
276
+
}
277
277
+
278
278
+
#[derive(Object)]
279
279
+
#[oai(example = true)]
280
280
+
struct ProxyQueryPayload {
281
281
+
/// The NSID of the XRPC you wish to forward
282
282
+
xrpc: String,
283
283
+
/// The destination service the request will be forwarded to
284
284
+
atproto_proxy: String,
285
285
+
/// The `params` for the destination service XRPC endpoint
286
286
+
///
287
287
+
/// Currently this will be passed along unchecked, but a future version of
288
288
+
/// slingshot may attempt to do lexicon resolution to validate `params`
289
289
+
/// based on the upstream service
290
290
+
params: Option<serde_json::Value>,
291
291
+
/// Paths within the response to look for at-uris that can be hydrated
292
292
+
hydration_sources: Vec<HydrationSource>,
293
293
+
// todo: deadline thing
294
294
+
295
295
+
}
296
296
+
impl Example for ProxyQueryPayload {
297
297
+
fn example() -> Self {
298
298
+
Self {
299
299
+
xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
300
300
+
atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
301
301
+
params: Some(serde_json::json!({
302
302
+
"feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
303
303
+
})),
304
304
+
hydration_sources: vec![
305
305
+
HydrationSource {
306
306
+
path: "feed[].post".to_string(),
307
307
+
shape: "at-uri".to_string(),
308
308
+
}
309
309
+
],
310
310
+
}
311
311
+
}
312
312
+
}
313
313
+
314
314
+
#[derive(Object)]
194
315
#[oai(example = true)]
195
316
struct FoundDidResponseObject {
196
317
/// the DID, bi-directionally verified if using Slingshot
···
221
342
struct Xrpc {
222
343
cache: HybridCache<String, CachedRecord>,
223
344
identity: Identity,
345
345
+
proxy: Arc<Proxy>,
224
346
repo: Arc<Repo>,
225
347
}
226
348
···
550
672
}))
551
673
}
552
674
675
675
+
/// com.bad-example.proxy.hydrateQueryResponse
676
676
+
///
677
677
+
/// > [!important]
678
678
+
/// > Unstable! This endpoint is experimental and may change.
679
679
+
///
680
680
+
/// Fetch + include records referenced from an upstream xrpc query response
681
681
+
#[oai(
682
682
+
path = "/com.bad-example.proxy.hydrateQueryResponse",
683
683
+
method = "post",
684
684
+
tag = "ApiTags::Custom"
685
685
+
)]
686
686
+
async fn proxy_hydrate_query(
687
687
+
&self,
688
688
+
Json(payload): Json<ProxyQueryPayload>,
689
689
+
) -> ProxyHydrateResponse {
690
690
+
// TODO: the Accept request header, if present, gotta be json
691
691
+
// TODO: find any Authorization header and verify it. TBD about `aud`.
692
692
+
693
693
+
let params = if let Some(p) = payload.params {
694
694
+
let serde_json::Value::Object(map) = p else {
695
695
+
panic!("params have to be an object");
696
696
+
};
697
697
+
Some(map)
698
698
+
} else { None };
699
699
+
700
700
+
match self.proxy.proxy(
701
701
+
payload.xrpc,
702
702
+
payload.atproto_proxy,
703
703
+
params,
704
704
+
).await {
705
705
+
Ok(skeleton) => {
706
706
+
let links = match extract_links(payload.hydration_sources, &skeleton) {
707
707
+
Ok(l) => l,
708
708
+
Err(e) => {
709
709
+
log::warn!("problem extracting: {e:?}");
710
710
+
return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting"))
711
711
+
}
712
712
+
};
713
713
+
let mut records = HashMap::new();
714
714
+
let mut identifiers = HashMap::new();
715
715
+
716
716
+
enum GetThing {
717
717
+
Record(String, Hydration<ProxyHydrationRecordFound>),
718
718
+
Identifier(String, Hydration<ProxyHydrationIdentifierFound>),
719
719
+
}
720
720
+
721
721
+
let (tx, mut rx) = mpsc::channel(1);
722
722
+
723
723
+
for link in links {
724
724
+
match link {
725
725
+
MatchedRef::AtUri { uri, cid } => {
726
726
+
if records.contains_key(&uri) {
727
727
+
log::warn!("skipping duplicate record without checking cid");
728
728
+
continue;
729
729
+
}
730
730
+
let mut u = url::Url::parse("https://example.com").unwrap();
731
731
+
u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo
732
732
+
records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending {
733
733
+
url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc.
734
734
+
}));
735
735
+
let tx = tx.clone();
736
736
+
let identity = self.identity.clone();
737
737
+
let repo = self.repo.clone();
738
738
+
tokio::task::spawn(async move {
739
739
+
let rest = uri.strip_prefix("at://").unwrap();
740
740
+
let (identifier, rest) = rest.split_once('/').unwrap();
741
741
+
let (collection, rkey) = rest.split_once('/').unwrap();
742
742
+
743
743
+
let did = if identifier.starts_with("did:") {
744
744
+
Did::new(identifier.to_string()).unwrap()
745
745
+
} else {
746
746
+
let handle = Handle::new(identifier.to_string()).unwrap();
747
747
+
identity.handle_to_did(handle).await.unwrap().unwrap()
748
748
+
};
749
749
+
750
750
+
let res = match repo.get_record(
751
751
+
&did,
752
752
+
&Nsid::new(collection.to_string()).unwrap(),
753
753
+
&RecordKey::new(rkey.to_string()).unwrap(),
754
754
+
&cid.as_ref().map(|s| Cid::from_str(s).unwrap()),
755
755
+
).await {
756
756
+
Ok(CachedRecord::Deleted) =>
757
757
+
Hydration::Error(ProxyHydrationError {
758
758
+
reason: "record deleted".to_string(),
759
759
+
}),
760
760
+
Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => {
761
761
+
if let Some(c) = cid && found_cid.as_ref().to_string() != c {
762
762
+
log::warn!("ignoring cid mismatch");
763
763
+
}
764
764
+
let value = serde_json::from_str(&record).unwrap();
765
765
+
Hydration::Found(ProxyHydrationRecordFound {
766
766
+
record: value,
767
767
+
})
768
768
+
}
769
769
+
Err(e) => {
770
770
+
log::warn!("finally oop {e:?}");
771
771
+
Hydration::Error(ProxyHydrationError {
772
772
+
reason: "failed to fetch record".to_string(),
773
773
+
})
774
774
+
}
775
775
+
};
776
776
+
tx.send(GetThing::Record(uri, res)).await
777
777
+
});
778
778
+
}
779
779
+
MatchedRef::Identifier(id) => {
780
780
+
if identifiers.contains_key(&id) {
781
781
+
continue;
782
782
+
}
783
783
+
let mut u = url::Url::parse("https://example.com").unwrap();
784
784
+
u.query_pairs_mut().append_pair("identifier", &id);
785
785
+
identifiers.insert(id, Hydration::Pending(ProxyHydrationPending {
786
786
+
url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross
787
787
+
}));
788
788
+
let tx = tx.clone();
789
789
+
// let doc_fut = self.resolve_mini_doc();
790
790
+
tokio::task::spawn(async {
791
791
+
792
792
+
});
793
793
+
}
794
794
+
}
795
795
+
}
796
796
+
// so the channel can close when all are completed
797
797
+
// (we shoudl be doing a timeout...)
798
798
+
drop(tx);
799
799
+
800
800
+
while let Some(hydration) = rx.recv().await {
801
801
+
match hydration {
802
802
+
GetThing::Record(uri, h) => { records.insert(uri, h); }
803
803
+
GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); }
804
804
+
};
805
805
+
}
806
806
+
807
807
+
ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
808
808
+
output: skeleton,
809
809
+
records,
810
810
+
identifiers,
811
811
+
}))
812
812
+
}
813
813
+
Err(e) => {
814
814
+
log::warn!("oh no: {e:?}");
815
815
+
ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
816
816
+
}
817
817
+
}
818
818
+
819
819
+
}
820
820
+
553
821
async fn get_record_impl(
554
822
&self,
555
823
repo: String,
···
748
1016
cache: HybridCache<String, CachedRecord>,
749
1017
identity: Identity,
750
1018
repo: Repo,
1019
1019
+
proxy: Proxy,
751
1020
acme_domain: Option<String>,
752
1021
acme_contact: Option<String>,
753
1022
acme_cache_path: Option<PathBuf>,
···
756
1025
bind: std::net::SocketAddr,
757
1026
) -> Result<(), ServerError> {
758
1027
let repo = Arc::new(repo);
1028
1028
+
let proxy = Arc::new(proxy);
759
1029
let api_service = OpenApiService::new(
760
1030
Xrpc {
761
1031
cache,
762
1032
identity,
1033
1033
+
proxy,
763
1034
repo,
764
1035
},
765
1036
"Slingshot",
···
823
1094
.with(
824
1095
Cors::new()
825
1096
.allow_origin_regex("*")
826
826
-
.allow_methods([Method::GET])
1097
1097
+
.allow_methods([Method::GET, Method::POST])
827
1098
.allow_credentials(false),
828
1099
)
829
1100
.with(CatchPanic::new())