+3
-2
Cargo.lock
+3
-2
Cargo.lock
···
803
804
[[package]]
805
name = "bytes"
806
-
version = "1.11.1"
807
source = "registry+https://github.com/rust-lang/crates.io-index"
808
-
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
809
810
[[package]]
811
name = "byteview"
···
5965
"form_urlencoded",
5966
"idna",
5967
"percent-encoding",
5968
]
5969
5970
[[package]]
···
803
804
[[package]]
805
name = "bytes"
806
+
version = "1.10.1"
807
source = "registry+https://github.com/rust-lang/crates.io-index"
808
+
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
809
810
[[package]]
811
name = "byteview"
···
5965
"form_urlencoded",
5966
"idna",
5967
"percent-encoding",
5968
+
"serde",
5969
]
5970
5971
[[package]]
+1
-7
constellation/src/bin/main.rs
+1
-7
constellation/src/bin/main.rs
···
45
#[arg(short, long)]
46
#[clap(value_enum, default_value_t = StorageBackend::Memory)]
47
backend: StorageBackend,
48
-
/// Serve a did:web document for this domain
49
-
#[arg(long)]
50
-
did_web_domain: Option<String>,
51
/// Initiate a database backup into this dir, if supported by the storage
52
#[arg(long)]
53
backup: Option<PathBuf>,
···
106
MemStorage::new(),
107
fixture,
108
None,
109
-
args.did_web_domain,
110
stream,
111
bind,
112
metrics_bind,
···
142
rocks,
143
fixture,
144
args.data,
145
-
args.did_web_domain,
146
stream,
147
bind,
148
metrics_bind,
···
164
mut storage: impl LinkStorage,
165
fixture: Option<PathBuf>,
166
data_dir: Option<PathBuf>,
167
-
did_web_domain: Option<String>,
168
stream: String,
169
bind: SocketAddr,
170
metrics_bind: SocketAddr,
···
217
if collect_metrics {
218
install_metrics_server(metrics_bind)?;
219
}
220
-
serve(readable, bind, did_web_domain, staying_alive).await
221
})
222
.unwrap();
223
stay_alive.drop_guard();
···
45
#[arg(short, long)]
46
#[clap(value_enum, default_value_t = StorageBackend::Memory)]
47
backend: StorageBackend,
48
/// Initiate a database backup into this dir, if supported by the storage
49
#[arg(long)]
50
backup: Option<PathBuf>,
···
103
MemStorage::new(),
104
fixture,
105
None,
106
stream,
107
bind,
108
metrics_bind,
···
138
rocks,
139
fixture,
140
args.data,
141
stream,
142
bind,
143
metrics_bind,
···
159
mut storage: impl LinkStorage,
160
fixture: Option<PathBuf>,
161
data_dir: Option<PathBuf>,
162
stream: String,
163
bind: SocketAddr,
164
metrics_bind: SocketAddr,
···
211
if collect_metrics {
212
install_metrics_server(metrics_bind)?;
213
}
214
+
serve(readable, bind, staying_alive).await
215
})
216
.unwrap();
217
stay_alive.drop_guard();
+7
-32
constellation/src/server/mod.rs
+7
-32
constellation/src/server/mod.rs
···
3
extract::{Query, Request},
4
http::{self, header},
5
middleware::{self, Next},
6
-
response::{IntoResponse, Json, Response},
7
routing::get,
8
Router,
9
};
···
37
http::StatusCode::INTERNAL_SERVER_ERROR
38
}
39
40
-
pub async fn serve<S: LinkReader, A: ToSocketAddrs>(
41
-
store: S,
42
-
addr: A,
43
-
did_web_domain: Option<String>,
44
-
stay_alive: CancellationToken,
45
-
) -> anyhow::Result<()> {
46
-
let mut app = Router::new();
47
-
48
-
if let Some(d) = did_web_domain {
49
-
app = app.route(
50
-
"/.well-known/did.json",
51
-
get({
52
-
let domain = d.clone();
53
-
move || did_web(domain)
54
-
}),
55
-
)
56
-
}
57
-
58
-
let app = app
59
.route("/robots.txt", get(robots))
60
.route(
61
"/",
···
217
User-agent: *
218
Disallow: /links
219
Disallow: /links/
220
-
Disallow: /xrpc/
221
"
222
-
}
223
-
224
-
async fn did_web(domain: String) -> impl IntoResponse {
225
-
Json(serde_json::json!({
226
-
"id": format!("did:web:{domain}"),
227
-
"service": [{
228
-
"id": "#constellation",
229
-
"type": "ConstellationGraphService",
230
-
"serviceEndpoint": format!("https://{domain}")
231
-
}]
232
-
}))
233
}
234
235
#[derive(Template, Serialize, Deserialize)]
···
3
extract::{Query, Request},
4
http::{self, header},
5
middleware::{self, Next},
6
+
response::{IntoResponse, Response},
7
routing::get,
8
Router,
9
};
···
37
http::StatusCode::INTERNAL_SERVER_ERROR
38
}
39
40
+
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
41
+
where
42
+
S: LinkReader,
43
+
A: ToSocketAddrs,
44
+
{
45
+
let app = Router::new()
46
.route("/robots.txt", get(robots))
47
.route(
48
"/",
···
204
User-agent: *
205
Disallow: /links
206
Disallow: /links/
207
"
208
}
209
210
#[derive(Template, Serialize, Deserialize)]
+1
-1
slingshot/Cargo.toml
+1
-1
slingshot/Cargo.toml
+18
slingshot/src/error.rs
+18
slingshot/src/error.rs
···
91
#[error("upstream non-atproto bad request")]
92
UpstreamBadBadNotGoodRequest(reqwest::Error),
93
}
94
+
95
+
#[derive(Debug, Error)]
96
+
pub enum ProxyError {
97
+
#[error("failed to parse path: {0}")]
98
+
PathParseError(String),
99
+
#[error(transparent)]
100
+
UrlParseError(#[from] url::ParseError),
101
+
#[error(transparent)]
102
+
ReqwestError(#[from] reqwest::Error),
103
+
#[error(transparent)]
104
+
InvalidHeader(#[from] reqwest::header::InvalidHeaderValue),
105
+
#[error(transparent)]
106
+
IdentityError(#[from] IdentityError),
107
+
#[error("upstream service could not be resolved")]
108
+
ServiceNotFound,
109
+
#[error("upstream service was found but no services matched")]
110
+
ServiceNotMatched,
111
+
}
+208
-16
slingshot/src/identity.rs
+208
-16
slingshot/src/identity.rs
···
17
18
use crate::error::IdentityError;
19
use atrium_api::{
20
-
did_doc::DidDocument,
21
types::string::{Did, Handle},
22
};
23
use atrium_common::resolver::Resolver;
···
41
pub enum IdentityKey {
42
Handle(Handle),
43
Did(Did),
44
}
45
46
impl IdentityKey {
···
48
let s = match self {
49
IdentityKey::Handle(h) => h.as_str(),
50
IdentityKey::Did(d) => d.as_str(),
51
};
52
std::mem::size_of::<Self>() + std::mem::size_of_val(s)
53
}
···
59
#[derive(Debug, Serialize, Deserialize)]
60
enum IdentityData {
61
NotFound,
62
-
Did(Did),
63
-
Doc(PartialMiniDoc),
64
}
65
66
impl IdentityVal {
···
71
IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
72
IdentityData::Doc(d) => {
73
std::mem::size_of_val(d.unverified_handle.as_str())
74
-
+ std::mem::size_of_val(d.pds.as_str())
75
-
+ std::mem::size_of_val(d.signing_key.as_str())
76
}
77
};
78
wrapping + inner
···
168
}
169
}
170
171
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
172
///
173
/// the hashset allows testing for presense of items in the queue.
···
296
let now = UtcDateTime::now();
297
let IdentityVal(last_fetch, data) = entry.value();
298
match data {
299
-
IdentityData::Doc(_) => {
300
-
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
301
-
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
302
-
}
303
IdentityData::NotFound => {
304
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
305
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
313
self.queue_refresh(key).await;
314
}
315
Ok(Some(did.clone()))
316
}
317
}
318
}
···
362
let now = UtcDateTime::now();
363
let IdentityVal(last_fetch, data) = entry.value();
364
match data {
365
-
IdentityData::Did(_) => {
366
-
log::error!("identity value mixup: got a did from a did key (should be a doc)");
367
-
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
368
-
}
369
IdentityData::NotFound => {
370
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
371
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
373
}
374
Ok(None)
375
}
376
-
IdentityData::Doc(mini_did) => {
377
if (now - *last_fetch) >= MIN_TTL {
378
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
379
self.queue_refresh(key).await;
380
}
381
-
Ok(Some(mini_did.clone()))
382
}
383
}
384
}
···
519
log::warn!(
520
"refreshed did doc failed: wrong did doc id. dropping refresh."
521
);
522
continue;
523
}
524
let mini_doc = match did_doc.try_into() {
···
526
Err(e) => {
527
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
528
log::warn!(
529
-
"converting mini doc failed: {e:?}. dropping refresh."
530
);
531
continue;
532
}
533
};
···
554
}
555
556
self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
557
}
558
}
559
}
···
17
18
use crate::error::IdentityError;
19
use atrium_api::{
20
+
did_doc::{DidDocument, Service as DidDocServic},
21
types::string::{Did, Handle},
22
};
23
use atrium_common::resolver::Resolver;
···
41
pub enum IdentityKey {
42
Handle(Handle),
43
Did(Did),
44
+
ServiceDid(Did),
45
}
46
47
impl IdentityKey {
···
49
let s = match self {
50
IdentityKey::Handle(h) => h.as_str(),
51
IdentityKey::Did(d) => d.as_str(),
52
+
IdentityKey::ServiceDid(d) => d.as_str(),
53
};
54
std::mem::size_of::<Self>() + std::mem::size_of_val(s)
55
}
···
61
#[derive(Debug, Serialize, Deserialize)]
62
enum IdentityData {
63
NotFound,
64
+
Did(Did), // from handle
65
+
Doc(PartialMiniDoc), // from did
66
+
ServiceDoc(MiniServiceDoc), // from service did
67
}
68
69
impl IdentityVal {
···
74
IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
75
IdentityData::Doc(d) => {
76
std::mem::size_of_val(d.unverified_handle.as_str())
77
+
+ std::mem::size_of_val(&d.pds)
78
+
+ std::mem::size_of_val(&d.signing_key)
79
+
}
80
+
IdentityData::ServiceDoc(d) => {
81
+
let mut s = std::mem::size_of::<MiniServiceDoc>();
82
+
s += std::mem::size_of_val(&d.services);
83
+
for sv in &d.services {
84
+
s += std::mem::size_of_val(&sv.full_id);
85
+
s += std::mem::size_of_val(&sv.r#type);
86
+
s += std::mem::size_of_val(&sv.endpoint);
87
+
}
88
+
s
89
}
90
};
91
wrapping + inner
···
181
}
182
}
183
184
+
/// Simplified info from service DID docs
185
+
#[derive(Debug, Clone, Serialize, Deserialize)]
186
+
pub struct MiniServiceDoc {
187
+
services: Vec<MiniService>,
188
+
}
189
+
190
+
impl MiniServiceDoc {
191
+
pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> {
192
+
self.services.iter().find(|ms| {
193
+
ms.full_id.ends_with(id_fragment)
194
+
&& service_type.map(|t| t == ms.r#type).unwrap_or(true)
195
+
})
196
+
}
197
+
}
198
+
199
+
/// The corresponding service info
200
+
#[derive(Debug, Clone, Serialize, Deserialize)]
201
+
pub struct MiniService {
202
+
/// The full id
203
+
///
204
+
/// for informational purposes only -- services are deduplicated by id fragment
205
+
full_id: String,
206
+
r#type: String,
207
+
/// HTTP endpoint for the actual service
208
+
pub endpoint: String,
209
+
}
210
+
211
+
impl TryFrom<DidDocument> for MiniServiceDoc {
212
+
type Error = String;
213
+
fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
214
+
let mut services = Vec::new();
215
+
let mut seen = HashSet::new();
216
+
217
+
for DidDocServic {
218
+
id,
219
+
r#type,
220
+
service_endpoint,
221
+
} in did_doc.service.unwrap_or(vec![])
222
+
{
223
+
let Some((_, id_fragment)) = id.rsplit_once('#') else {
224
+
continue;
225
+
};
226
+
if !seen.insert((id_fragment.to_string(), r#type.clone())) {
227
+
continue;
228
+
}
229
+
services.push(MiniService {
230
+
full_id: id,
231
+
r#type,
232
+
endpoint: service_endpoint,
233
+
});
234
+
}
235
+
236
+
Ok(Self { services })
237
+
}
238
+
}
239
+
240
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
241
///
242
/// the hashset allows testing for presense of items in the queue.
···
365
let now = UtcDateTime::now();
366
let IdentityVal(last_fetch, data) = entry.value();
367
match data {
368
IdentityData::NotFound => {
369
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
370
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
378
self.queue_refresh(key).await;
379
}
380
Ok(Some(did.clone()))
381
+
}
382
+
_ => {
383
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
384
+
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
385
}
386
}
387
}
···
431
let now = UtcDateTime::now();
432
let IdentityVal(last_fetch, data) = entry.value();
433
match data {
434
IdentityData::NotFound => {
435
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
436
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
438
}
439
Ok(None)
440
}
441
+
IdentityData::Doc(mini_doc) => {
442
if (now - *last_fetch) >= MIN_TTL {
443
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
444
self.queue_refresh(key).await;
445
}
446
+
Ok(Some(mini_doc.clone()))
447
+
}
448
+
_ => {
449
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
450
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
451
+
}
452
+
}
453
+
}
454
+
455
+
/// Fetch (and cache) a service mini doc from a did
456
+
pub async fn did_to_mini_service_doc(
457
+
&self,
458
+
did: &Did,
459
+
) -> Result<Option<MiniServiceDoc>, IdentityError> {
460
+
let key = IdentityKey::ServiceDid(did.clone());
461
+
metrics::counter!("slingshot_get_service_did_doc").increment(1);
462
+
let entry = self
463
+
.cache
464
+
.get_or_fetch(&key, {
465
+
let did = did.clone();
466
+
let resolver = self.did_resolver.clone();
467
+
|| async move {
468
+
let t0 = Instant::now();
469
+
let (res, success) = match resolver.resolve(&did).await {
470
+
Ok(did_doc) if did_doc.id != did.to_string() => (
471
+
// TODO: fix in atrium: should verify id is did
472
+
Err(IdentityError::BadDidDoc(
473
+
"did doc's id did not match did".to_string(),
474
+
)),
475
+
"false",
476
+
),
477
+
Ok(did_doc) => match did_doc.try_into() {
478
+
Ok(mini_service_doc) => (
479
+
Ok(IdentityVal(
480
+
UtcDateTime::now(),
481
+
IdentityData::ServiceDoc(mini_service_doc),
482
+
)),
483
+
"true",
484
+
),
485
+
Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"),
486
+
},
487
+
Err(atrium_identity::Error::NotFound) => (
488
+
Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
489
+
"false",
490
+
),
491
+
Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"),
492
+
};
493
+
metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success)
494
+
.record(t0.elapsed());
495
+
res
496
+
}
497
+
})
498
+
.await?;
499
+
500
+
let now = UtcDateTime::now();
501
+
let IdentityVal(last_fetch, data) = entry.value();
502
+
match data {
503
+
IdentityData::NotFound => {
504
+
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
505
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
506
+
self.queue_refresh(key).await;
507
+
}
508
+
Ok(None)
509
+
}
510
+
IdentityData::ServiceDoc(mini_service_doc) => {
511
+
if (now - *last_fetch) >= MIN_TTL {
512
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
513
+
self.queue_refresh(key).await;
514
+
}
515
+
Ok(Some(mini_service_doc.clone()))
516
+
}
517
+
_ => {
518
+
log::error!(
519
+
"identity value mixup: got a doc from a different key type (should be a service did)"
520
+
);
521
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
522
}
523
}
524
}
···
659
log::warn!(
660
"refreshed did doc failed: wrong did doc id. dropping refresh."
661
);
662
+
self.complete_refresh(&task_key).await?;
663
continue;
664
}
665
let mini_doc = match did_doc.try_into() {
···
667
Err(e) => {
668
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
669
log::warn!(
670
+
"converting mini doc for {did:?} failed: {e:?}. dropping refresh."
671
);
672
+
self.complete_refresh(&task_key).await?;
673
continue;
674
}
675
};
···
696
}
697
698
self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
699
+
}
700
+
IdentityKey::ServiceDid(ref did) => {
701
+
log::trace!("refreshing service did doc: {did:?}");
702
+
703
+
match self.did_resolver.resolve(did).await {
704
+
Ok(did_doc) => {
705
+
// TODO: fix in atrium: should verify id is did
706
+
if did_doc.id != did.to_string() {
707
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
708
+
log::warn!(
709
+
"refreshed did doc failed: wrong did doc id. dropping refresh."
710
+
);
711
+
self.complete_refresh(&task_key).await?;
712
+
continue;
713
+
}
714
+
let mini_service_doc = match did_doc.try_into() {
715
+
Ok(md) => md,
716
+
Err(e) => {
717
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
718
+
log::warn!(
719
+
"converting mini service doc failed: {e:?}. dropping refresh."
720
+
);
721
+
self.complete_refresh(&task_key).await?;
722
+
continue;
723
+
}
724
+
};
725
+
metrics::counter!("identity_service_did_refresh", "success" => "true")
726
+
.increment(1);
727
+
self.cache.insert(
728
+
task_key.clone(),
729
+
IdentityVal(
730
+
UtcDateTime::now(),
731
+
IdentityData::ServiceDoc(mini_service_doc),
732
+
),
733
+
);
734
+
}
735
+
Err(atrium_identity::Error::NotFound) => {
736
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1);
737
+
self.cache.insert(
738
+
task_key.clone(),
739
+
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
740
+
);
741
+
}
742
+
Err(err) => {
743
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1);
744
+
log::warn!(
745
+
"failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
746
+
);
747
+
}
748
+
}
749
}
750
}
751
}
+2
slingshot/src/lib.rs
+2
slingshot/src/lib.rs
···
3
mod firehose_cache;
4
mod healthcheck;
5
mod identity;
6
mod record;
7
mod server;
8
···
10
pub use firehose_cache::firehose_cache;
11
pub use healthcheck::healthcheck;
12
pub use identity::{Identity, IdentityKey};
13
pub use record::{CachedRecord, ErrorResponseObject, Repo};
14
pub use server::serve;
···
3
mod firehose_cache;
4
mod healthcheck;
5
mod identity;
6
+
mod proxy;
7
mod record;
8
mod server;
9
···
11
pub use firehose_cache::firehose_cache;
12
pub use healthcheck::healthcheck;
13
pub use identity::{Identity, IdentityKey};
14
+
pub use proxy::Proxy;
15
pub use record::{CachedRecord, ErrorResponseObject, Repo};
16
pub use server::serve;
+23
-8
slingshot/src/main.rs
+23
-8
slingshot/src/main.rs
···
1
-
// use foyer::HybridCache;
2
-
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
use slingshot::{
5
-
Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6
};
7
use std::net::SocketAddr;
8
use std::path::PathBuf;
9
10
use clap::Parser;
11
use tokio_util::sync::CancellationToken;
12
13
/// Slingshot record edge cache
14
#[derive(Parser, Debug, Clone)]
···
48
#[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
49
#[clap(default_value_t = 1)]
50
identity_cache_disk_gb: usize,
51
/// the domain pointing to this server
52
///
53
/// if present:
···
101
102
let args = Args::parse();
103
104
if args.collect_metrics {
105
log::trace!("installing metrics server...");
106
if let Err(e) = install_metrics_server(args.bind_metrics) {
···
143
)
144
.await
145
.map_err(|e| format!("identity setup failed: {e:?}"))?;
146
-
147
-
log::info!("identity service ready.");
148
let identity_refresher = identity.clone();
149
let identity_shutdown = shutdown.clone();
150
tasks.spawn(async move {
151
identity_refresher.run_refresher(identity_shutdown).await?;
152
Ok(())
153
});
154
155
let repo = Repo::new(identity.clone());
156
157
let identity_for_server = identity.clone();
158
let server_shutdown = shutdown.clone();
···
163
server_cache_handle,
164
identity_for_server,
165
repo,
166
args.acme_domain,
167
args.acme_contact,
168
args.acme_cache_path,
···
235
) -> Result<(), metrics_exporter_prometheus::BuildError> {
236
log::info!("installing metrics server...");
237
PrometheusBuilder::new()
238
-
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
239
-
.set_bucket_duration(std::time::Duration::from_secs(300))?
240
-
.set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
241
.set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
242
.with_http_listener(bind_metrics)
243
.install()?;
···
1
use metrics_exporter_prometheus::PrometheusBuilder;
2
use slingshot::{
3
+
Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
4
};
5
use std::net::SocketAddr;
6
use std::path::PathBuf;
7
8
use clap::Parser;
9
use tokio_util::sync::CancellationToken;
10
+
use url::Url;
11
12
/// Slingshot record edge cache
13
#[derive(Parser, Debug, Clone)]
···
47
#[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
48
#[clap(default_value_t = 1)]
49
identity_cache_disk_gb: usize,
50
+
/// the address of this server
51
+
///
52
+
/// used if --acme-domain is not set, defaulting to `--bind`
53
+
#[arg(long, conflicts_with("acme_domain"), env = "SLINGSHOT_PUBLIC_HOST")]
54
+
base_url: Option<Url>,
55
/// the domain pointing to this server
56
///
57
/// if present:
···
105
106
let args = Args::parse();
107
108
+
let base_url: Url = args
109
+
.base_url
110
+
.or_else(|| {
111
+
args.acme_domain
112
+
.as_ref()
113
+
.map(|d| Url::parse(&format!("https://{d}")).unwrap())
114
+
})
115
+
.unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap());
116
+
117
if args.collect_metrics {
118
log::trace!("installing metrics server...");
119
if let Err(e) = install_metrics_server(args.bind_metrics) {
···
156
)
157
.await
158
.map_err(|e| format!("identity setup failed: {e:?}"))?;
159
let identity_refresher = identity.clone();
160
let identity_shutdown = shutdown.clone();
161
tasks.spawn(async move {
162
identity_refresher.run_refresher(identity_shutdown).await?;
163
Ok(())
164
});
165
+
log::info!("identity service ready.");
166
167
let repo = Repo::new(identity.clone());
168
+
let proxy = Proxy::new(identity.clone());
169
170
let identity_for_server = identity.clone();
171
let server_shutdown = shutdown.clone();
···
176
server_cache_handle,
177
identity_for_server,
178
repo,
179
+
proxy,
180
+
base_url,
181
args.acme_domain,
182
args.acme_contact,
183
args.acme_cache_path,
···
250
) -> Result<(), metrics_exporter_prometheus::BuildError> {
251
log::info!("installing metrics server...");
252
PrometheusBuilder::new()
253
+
.set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 45.656])?
254
+
.set_bucket_duration(std::time::Duration::from_secs(15))?
255
+
.set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = bucket lifetime
256
.set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
257
.with_http_listener(bind_metrics)
258
.install()?;
+601
slingshot/src/proxy.rs
+601
slingshot/src/proxy.rs
···
···
1
+
use crate::{Identity, error::ProxyError, server::HydrationSource};
2
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Nsid, RecordKey};
3
+
use reqwest::Client;
4
+
use serde_json::{Map, Value};
5
+
use std::{collections::HashMap, time::Duration};
6
+
use url::Url;
7
+
8
+
pub enum ParamValue {
9
+
String(Vec<String>),
10
+
Int(Vec<i64>),
11
+
Bool(Vec<bool>),
12
+
}
13
+
pub struct Params(HashMap<String, ParamValue>);
14
+
15
+
impl TryFrom<Map<String, Value>> for Params {
16
+
type Error = (); // TODO
17
+
fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18
+
let mut out = HashMap::new();
19
+
for (k, v) in val {
20
+
match v {
21
+
Value::String(s) => out.insert(k, ParamValue::String(vec![s])),
22
+
Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])),
23
+
Value::Number(n) => {
24
+
let Some(i) = n.as_i64() else {
25
+
return Err(());
26
+
};
27
+
out.insert(k, ParamValue::Int(vec![i]))
28
+
}
29
+
Value::Array(a) => {
30
+
let Some(first) = a.first() else {
31
+
continue;
32
+
};
33
+
if first.is_string() {
34
+
let mut vals = Vec::with_capacity(a.len());
35
+
for v in a {
36
+
let Some(v) = v.as_str() else {
37
+
return Err(());
38
+
};
39
+
vals.push(v.to_string());
40
+
}
41
+
out.insert(k, ParamValue::String(vals));
42
+
} else if first.is_i64() {
43
+
let mut vals = Vec::with_capacity(a.len());
44
+
for v in a {
45
+
let Some(v) = v.as_i64() else {
46
+
return Err(());
47
+
};
48
+
vals.push(v);
49
+
}
50
+
out.insert(k, ParamValue::Int(vals));
51
+
} else if first.is_boolean() {
52
+
let mut vals = Vec::with_capacity(a.len());
53
+
for v in a {
54
+
let Some(v) = v.as_bool() else {
55
+
return Err(());
56
+
};
57
+
vals.push(v);
58
+
}
59
+
out.insert(k, ParamValue::Bool(vals));
60
+
}
61
+
todo!();
62
+
}
63
+
_ => return Err(()),
64
+
};
65
+
}
66
+
67
+
Ok(Self(out))
68
+
}
69
+
}
70
+
71
+
#[derive(Clone)]
72
+
pub struct Proxy {
73
+
identity: Identity,
74
+
client: Client,
75
+
}
76
+
77
+
impl Proxy {
78
+
pub fn new(identity: Identity) -> Self {
79
+
let client = Client::builder()
80
+
.user_agent(format!(
81
+
"microcosm slingshot v{} (contact: @bad-example.com)",
82
+
env!("CARGO_PKG_VERSION")
83
+
))
84
+
.no_proxy()
85
+
.timeout(Duration::from_secs(6))
86
+
.build()
87
+
.unwrap();
88
+
Self { client, identity }
89
+
}
90
+
91
+
pub async fn proxy(
92
+
&self,
93
+
service_did: &Did,
94
+
service_id: &str,
95
+
xrpc: &Nsid,
96
+
authorization: Option<&str>,
97
+
atproto_accept_labelers: Option<&str>,
98
+
params: Option<Map<String, Value>>,
99
+
) -> Result<Value, ProxyError> {
100
+
let mut upstream: Url = self
101
+
.identity
102
+
.did_to_mini_service_doc(service_did)
103
+
.await?
104
+
.ok_or(ProxyError::ServiceNotFound)?
105
+
.get(service_id, None)
106
+
.ok_or(ProxyError::ServiceNotMatched)?
107
+
.endpoint
108
+
.parse()?;
109
+
110
+
upstream.set_path(&format!("/xrpc/{}", xrpc.as_str()));
111
+
112
+
if let Some(params) = params {
113
+
let mut query = upstream.query_pairs_mut();
114
+
let Params(ps) = params.try_into().expect("valid params");
115
+
for (k, pvs) in ps {
116
+
match pvs {
117
+
ParamValue::String(s) => {
118
+
for s in s {
119
+
query.append_pair(&k, &s);
120
+
}
121
+
}
122
+
ParamValue::Int(i) => {
123
+
for i in i {
124
+
query.append_pair(&k, &i.to_string());
125
+
}
126
+
}
127
+
ParamValue::Bool(b) => {
128
+
for b in b {
129
+
query.append_pair(&k, &b.to_string());
130
+
}
131
+
}
132
+
}
133
+
}
134
+
}
135
+
136
+
// TODO i mean maybe we should look for headers also in our headers but not obviously
137
+
let mut headers = reqwest::header::HeaderMap::new();
138
+
// TODO: check the jwt aud against the upstream!!!
139
+
if let Some(auth) = authorization {
140
+
headers.insert("Authorization", auth.try_into()?);
141
+
}
142
+
if let Some(aal) = atproto_accept_labelers {
143
+
headers.insert("atproto-accept-labelers", aal.try_into()?);
144
+
}
145
+
146
+
let t0 = std::time::Instant::now();
147
+
let res = self
148
+
.client
149
+
.get(upstream)
150
+
.headers(headers)
151
+
.send()
152
+
.await
153
+
.and_then(|r| r.error_for_status());
154
+
155
+
if res.is_ok() {
156
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "true")
157
+
.record(t0.elapsed());
158
+
} else {
159
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "false")
160
+
.record(t0.elapsed());
161
+
}
162
+
163
+
Ok(res?.json().await?)
164
+
}
165
+
}
166
+
167
+
#[derive(Debug, PartialEq)]
168
+
pub enum PathPart {
169
+
Scalar(String),
170
+
Vector(String, Option<String>), // key, $type
171
+
}
172
+
173
+
pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> {
174
+
let mut out = Vec::new();
175
+
176
+
let mut key_acc = String::new();
177
+
let mut type_acc = String::new();
178
+
let mut in_bracket = false;
179
+
let mut chars = input.chars().enumerate();
180
+
while let Some((i, c)) = chars.next() {
181
+
match c {
182
+
'[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
183
+
'[' if key_acc.is_empty() => {
184
+
return Err(format!("missing key before opening bracket, at {i}"));
185
+
}
186
+
'[' => in_bracket = true,
187
+
']' if in_bracket => {
188
+
in_bracket = false;
189
+
let key = std::mem::take(&mut key_acc);
190
+
let r#type = std::mem::take(&mut type_acc);
191
+
let t = if r#type.is_empty() {
192
+
None
193
+
} else {
194
+
Some(r#type)
195
+
};
196
+
out.push(PathPart::Vector(key, t));
197
+
// peek ahead because we need a dot after array if there's more and i don't want to add more loop state
198
+
let Some((i, c)) = chars.next() else {
199
+
break;
200
+
};
201
+
if c != '.' {
202
+
return Err(format!(
203
+
"expected dot after close bracket, found {c:?} at {i}"
204
+
));
205
+
}
206
+
}
207
+
']' => return Err(format!("unexpected close bracket at {i}")),
208
+
'.' if in_bracket => type_acc.push(c),
209
+
'.' if key_acc.is_empty() => {
210
+
return Err(format!("missing key before next segment, at {i}"));
211
+
}
212
+
'.' => {
213
+
let key = std::mem::take(&mut key_acc);
214
+
assert!(type_acc.is_empty());
215
+
out.push(PathPart::Scalar(key));
216
+
}
217
+
_ if in_bracket => type_acc.push(c),
218
+
_ => key_acc.push(c),
219
+
}
220
+
}
221
+
if in_bracket {
222
+
return Err("unclosed bracket".into());
223
+
}
224
+
if !key_acc.is_empty() {
225
+
out.push(PathPart::Scalar(key_acc));
226
+
}
227
+
Ok(out)
228
+
}
229
+
230
+
#[derive(Debug, Clone, Copy, PartialEq)]
231
+
pub enum RefShape {
232
+
StrongRef,
233
+
AtUri,
234
+
AtUriParts,
235
+
Did,
236
+
Handle,
237
+
AtIdentifier,
238
+
}
239
+
240
+
impl TryFrom<&str> for RefShape {
241
+
type Error = String;
242
+
fn try_from(s: &str) -> Result<Self, Self::Error> {
243
+
match s {
244
+
"strong-ref" => Ok(Self::StrongRef),
245
+
"at-uri" => Ok(Self::AtUri),
246
+
"at-uri-parts" => Ok(Self::AtUriParts),
247
+
"did" => Ok(Self::Did),
248
+
"handle" => Ok(Self::Handle),
249
+
"at-identifier" => Ok(Self::AtIdentifier),
250
+
_ => Err(format!("unknown shape: {s}")),
251
+
}
252
+
}
253
+
}
254
+
255
+
#[derive(Debug, PartialEq)]
256
+
pub struct FullAtUriParts {
257
+
pub repo: AtIdentifier,
258
+
pub collection: Nsid,
259
+
pub rkey: RecordKey,
260
+
pub cid: Option<Cid>,
261
+
}
262
+
263
+
impl FullAtUriParts {
264
+
pub fn to_uri(&self) -> String {
265
+
let repo: String = self.repo.clone().into(); // no as_str for AtIdentifier atrium???
266
+
let collection = self.collection.as_str();
267
+
let rkey = self.rkey.as_str();
268
+
format!("at://{repo}/{collection}/{rkey}")
269
+
}
270
+
}
271
+
272
+
// TODO: move this to links
273
+
pub fn split_uri(uri: &str) -> Option<(AtIdentifier, Nsid, RecordKey)> {
274
+
let rest = uri.strip_prefix("at://")?;
275
+
let (repo, rest) = rest.split_once("/")?;
276
+
let repo = repo.parse().ok()?;
277
+
let (collection, rkey) = rest.split_once("/")?;
278
+
let collection = collection.parse().ok()?;
279
+
let rkey = rkey.split_once('#').map(|(k, _)| k).unwrap_or(rkey);
280
+
let rkey = rkey.split_once('?').map(|(k, _)| k).unwrap_or(rkey);
281
+
let rkey = rkey.parse().ok()?;
282
+
Some((repo, collection, rkey))
283
+
}
284
+
285
+
#[derive(Debug, PartialEq)]
286
+
pub enum MatchedRef {
287
+
AtUri(FullAtUriParts),
288
+
Identifier(AtIdentifier),
289
+
}
290
+
291
+
pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> {
292
+
// TODO: actually validate at-uri format
293
+
// TODO: actually validate everything else also
294
+
// TODO: should this function normalize identifiers to DIDs probably?
295
+
// or just return at-uri parts so the caller can resolve and reassemble
296
+
match shape {
297
+
RefShape::StrongRef => {
298
+
let o = val.as_object()?;
299
+
let uri = o.get("uri")?.as_str()?.to_string();
300
+
let cid = o.get("cid")?.as_str()?.parse().ok()?;
301
+
let (repo, collection, rkey) = split_uri(&uri)?;
302
+
Some(MatchedRef::AtUri(FullAtUriParts {
303
+
repo,
304
+
collection,
305
+
rkey,
306
+
cid: Some(cid),
307
+
}))
308
+
}
309
+
RefShape::AtUri => {
310
+
let uri = val.as_str()?.to_string();
311
+
let (repo, collection, rkey) = split_uri(&uri)?;
312
+
Some(MatchedRef::AtUri(FullAtUriParts {
313
+
repo,
314
+
collection,
315
+
rkey,
316
+
cid: None,
317
+
}))
318
+
}
319
+
RefShape::AtUriParts => {
320
+
let o = val.as_object()?;
321
+
let repo = o.get("repo").or(o.get("did"))?.as_str()?.parse().ok()?;
322
+
let collection = o.get("collection")?.as_str()?.parse().ok()?;
323
+
let rkey = o.get("rkey")?.as_str()?.parse().ok()?;
324
+
let cid = o
325
+
.get("cid")
326
+
.and_then(|v| v.as_str())
327
+
.and_then(|s| s.parse().ok());
328
+
Some(MatchedRef::AtUri(FullAtUriParts {
329
+
repo,
330
+
collection,
331
+
rkey,
332
+
cid,
333
+
}))
334
+
}
335
+
RefShape::Did => {
336
+
let did = val.as_str()?.parse().ok()?;
337
+
Some(MatchedRef::Identifier(AtIdentifier::Did(did)))
338
+
}
339
+
RefShape::Handle => {
340
+
let handle = val.as_str()?.parse().ok()?;
341
+
Some(MatchedRef::Identifier(AtIdentifier::Handle(handle)))
342
+
}
343
+
RefShape::AtIdentifier => {
344
+
let identifier = val.as_str()?.parse().ok()?;
345
+
Some(MatchedRef::Identifier(identifier))
346
+
}
347
+
}
348
+
}
349
+
350
+
// TODO: send back metadata about the matching
351
+
pub fn extract_links(
352
+
sources: Vec<HydrationSource>,
353
+
skeleton: &Value,
354
+
) -> Result<Vec<MatchedRef>, String> {
355
+
// collect early to catch errors from the client
356
+
// (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah)
357
+
let sources = sources
358
+
.into_iter()
359
+
.map(|HydrationSource { path, shape }| {
360
+
let path_parts = parse_record_path(&path)?;
361
+
let shape: RefShape = shape.as_str().try_into()?;
362
+
Ok((path_parts, shape))
363
+
})
364
+
.collect::<Result<Vec<_>, String>>()?;
365
+
366
+
// lazy first impl, just re-walk the skeleton as many times as needed
367
+
// not deduplicating for now
368
+
let mut out = Vec::new();
369
+
for (path_parts, shape) in sources {
370
+
for val in PathWalker::new(&path_parts, skeleton) {
371
+
if let Some(matched) = match_shape(shape, val) {
372
+
out.push(matched);
373
+
}
374
+
}
375
+
}
376
+
377
+
Ok(out)
378
+
}
379
+
380
+
/// Iterator over all JSON values reached by following a parsed record path
/// from a root value. Uses an explicit work stack rather than recursion.
struct PathWalker<'a> {
    // pending (remaining path, subtree) pairs still to be explored
    todo: Vec<(&'a [PathPart], &'a Value)>,
}
383
+
impl<'a> PathWalker<'a> {
384
+
fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
385
+
Self {
386
+
todo: vec![(path_parts, skeleton)],
387
+
}
388
+
}
389
+
}
390
+
impl<'a> Iterator for PathWalker<'a> {
    type Item = &'a Value;

    /// Pop work items until one has an exhausted path (yield it) or the
    /// stack is empty (end of iteration). Non-matching subtrees are
    /// silently skipped rather than treated as errors.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let (parts, val) = self.todo.pop()?;
            // path fully consumed: this value is a match
            let Some((part, rest)) = parts.split_first() else {
                return Some(val);
            };
            // any remaining path step requires an object to descend into
            let Some(o) = val.as_object() else {
                continue;
            };
            match part {
                PathPart::Scalar(k) => {
                    // descend into a single named field, if present
                    let Some(v) = o.get(k) else {
                        continue;
                    };
                    self.todo.push((rest, v));
                }
                PathPart::Vector(k, t) => {
                    // the field must hold an array; fan out over its elements
                    let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
                        continue;
                    };
                    // rev() because todo is a LIFO stack: pushing in reverse
                    // makes elements come back out in original array order
                    for v in a.iter().rev().filter(|c| {
                        // with no type filter, every element passes
                        let Some(t) = t else { return true };
                        // otherwise the element must be an object whose
                        // "$type" string equals the filter exactly
                        c.as_object()
                            .and_then(|o| o.get("$type"))
                            .and_then(|v| v.as_str())
                            .map(|s| s == t)
                            .unwrap_or(false)
                    }) {
                        self.todo.push((rest, v))
                    }
                }
            }
        }
    }
}
427
+
428
+
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // a valid CIDv1 used wherever a parseable cid is needed
    static TEST_CID: &str = "bafyreidffwk5wvh5l76yy7zefiqrovv6yaaegb4wg4zaq35w7nt3quix5a";

    // table-driven check of record-path parsing: scalar steps, vector ([])
    // steps, and vector steps with a $type filter (including filters that
    // contain dots and '#' fragments)
    #[test]
    fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
        let cases = [
            ("", vec![]),
            ("subject", vec![PathPart::Scalar("subject".into())]),
            ("authorDid", vec![PathPart::Scalar("authorDid".into())]),
            (
                "subject.uri",
                vec![
                    PathPart::Scalar("subject".into()),
                    PathPart::Scalar("uri".into()),
                ],
            ),
            ("members[]", vec![PathPart::Vector("members".into(), None)]),
            (
                "add[].key",
                vec![
                    PathPart::Vector("add".into(), None),
                    PathPart::Scalar("key".into()),
                ],
            ),
            ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
            (
                "a[b.c]",
                vec![PathPart::Vector("a".into(), Some("b.c".into()))],
            ),
            (
                "facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did",
                vec![
                    PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
                    PathPart::Vector(
                        "features".into(),
                        Some("app.bsky.richtext.facet#mention".into()),
                    ),
                    PathPart::Scalar("did".into()),
                ],
            ),
        ];

        for (path, expected) in cases {
            let parsed = parse_record_path(path)?;
            assert_eq!(parsed, expected, "path: {path:?}");
        }

        Ok(())
    }

    // table-driven check of match_shape: each row is (shape name, input
    // value, expected match). None means the value must be rejected.
    #[test]
    fn test_match_shape() {
        let cases = [
            ("strong-ref", json!(""), None),
            ("strong-ref", json!({}), None),
            ("strong-ref", json!({ "uri": "abc" }), None),
            ("strong-ref", json!({ "cid": TEST_CID }), None),
            (
                "strong-ref",
                json!({ "uri": "at://a.com/xx.yy.zz/1", "cid": TEST_CID }),
                Some(MatchedRef::AtUri(FullAtUriParts {
                    repo: "a.com".parse().unwrap(),
                    collection: "xx.yy.zz".parse().unwrap(),
                    rkey: "1".parse().unwrap(),
                    cid: Some(TEST_CID.parse().unwrap()),
                })),
            ),
            ("at-uri", json!({ "uri": "abc" }), None),
            (
                // at-uri expects a plain string, not a strongRef-style object
                "at-uri",
                json!({ "uri": "at://did:web:y.com/xx.yy.zz/1", "cid": TEST_CID }),
                None,
            ),
            (
                "at-uri",
                json!("at://did:web:y.com/xx.yy.zz/1"),
                Some(MatchedRef::AtUri(FullAtUriParts {
                    repo: "did:web:y.com".parse().unwrap(),
                    collection: "xx.yy.zz".parse().unwrap(),
                    rkey: "1".parse().unwrap(),
                    cid: None,
                })),
            ),
            ("at-uri-parts", json!("abc"), None),
            ("at-uri-parts", json!({}), None),
            (
                "at-uri-parts",
                json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
                Some(MatchedRef::AtUri(FullAtUriParts {
                    repo: "a.com".parse().unwrap(),
                    collection: "xx.yy.zz".parse().unwrap(),
                    rkey: "1".parse().unwrap(),
                    cid: Some(TEST_CID.parse().unwrap()),
                })),
            ),
            (
                "at-uri-parts",
                json!({"did": "a.com", "collection": "xx.yy.zz", "rkey": "1"}),
                Some(MatchedRef::AtUri(FullAtUriParts {
                    repo: "a.com".parse().unwrap(),
                    collection: "xx.yy.zz".parse().unwrap(),
                    rkey: "1".parse().unwrap(),
                    cid: None,
                })),
            ),
            (
                "at-uri-parts",
                // 'repo' takes precedence over 'did'
                json!({"did": "did:web:a.com", "repo": "z.com", "collection": "xx.yy.zz", "rkey": "1"}),
                Some(MatchedRef::AtUri(FullAtUriParts {
                    repo: "z.com".parse().unwrap(),
                    collection: "xx.yy.zz".parse().unwrap(),
                    rkey: "1".parse().unwrap(),
                    cid: None,
                })),
            ),
            (
                "at-uri-parts",
                json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
                Some(MatchedRef::AtUri(FullAtUriParts {
                    repo: "a.com".parse().unwrap(),
                    collection: "xx.yy.zz".parse().unwrap(),
                    rkey: "1".parse().unwrap(),
                    cid: Some(TEST_CID.parse().unwrap()),
                })),
            ),
            (
                // a non-string cid is dropped (best-effort), not a rejection
                "at-uri-parts",
                json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": {}}),
                Some(MatchedRef::AtUri(FullAtUriParts {
                    repo: "a.com".parse().unwrap(),
                    collection: "xx.yy.zz".parse().unwrap(),
                    rkey: "1".parse().unwrap(),
                    cid: None,
                })),
            ),
            ("did", json!({}), None),
            ("did", json!(""), None),
            ("did", json!("bad-example.com"), None),
            (
                "did",
                json!("did:plc:xyz"),
                Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
            ),
            ("handle", json!({}), None),
            (
                "handle",
                json!("bad-example.com"),
                Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
            ),
            ("handle", json!("did:plc:xyz"), None),
            ("at-identifier", json!({}), None),
            (
                "at-identifier",
                json!("bad-example.com"),
                Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
            ),
            (
                "at-identifier",
                json!("did:plc:xyz"),
                Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
            ),
        ];
        for (i, (shape, val, expected)) in cases.into_iter().enumerate() {
            let s = shape.try_into().unwrap();
            let matched = match_shape(s, &val);
            assert_eq!(matched, expected, "{i}: shape: {shape:?}, val: {val:?}");
        }
    }
}
+2
-2
slingshot/src/record.rs
+2
-2
slingshot/src/record.rs
+613
-47
slingshot/src/server.rs
+613
-47
slingshot/src/server.rs
···
1
use crate::{
2
-
CachedRecord, ErrorResponseObject, Identity, Repo,
3
error::{RecordError, ServerError},
4
};
5
-
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
6
use foyer::HybridCache;
7
use links::at_uri::parse_at_uri as normalize_at_uri;
8
use serde::Serialize;
9
-
use std::path::PathBuf;
10
-
use std::str::FromStr;
11
-
use std::sync::Arc;
12
-
use std::time::Instant;
13
use tokio_util::sync::CancellationToken;
14
15
use poem::{
···
24
};
25
use poem_openapi::{
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
-
param::Query, payload::Json, types::Example,
28
};
29
30
fn example_handle() -> String {
···
33
fn example_did() -> String {
34
"did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
35
}
36
fn example_collection() -> String {
37
"app.bsky.feed.like".to_string()
38
}
39
fn example_rkey() -> String {
40
"3lv4ouczo2b2a".to_string()
41
}
42
fn example_uri() -> String {
43
format!(
···
54
"zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
55
}
56
57
-
#[derive(Object)]
58
#[oai(example = true)]
59
struct XrpcErrorResponseObject {
60
/// Should correspond an error `name` in the lexicon errors array
···
85
}))
86
}
87
88
-
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
89
-
ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
90
error: "InvalidRequest".to_string(),
91
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
92
}))
···
181
182
#[derive(ApiResponse)]
183
#[oai(bad_request_handler = "bad_request_handler_resolve_mini")]
184
-
enum ResolveMiniIDResponse {
185
/// Identity resolved
186
#[oai(status = 200)]
187
Ok(Json<MiniDocResponseObject>),
···
192
193
#[derive(Object)]
194
#[oai(example = true)]
195
struct FoundDidResponseObject {
196
/// the DID, bi-directionally verified if using Slingshot
197
did: String,
···
219
}
220
221
struct Xrpc {
222
cache: HybridCache<String, CachedRecord>,
223
identity: Identity,
224
repo: Arc<Repo>,
225
}
226
···
286
/// only retains the most recent version of a record.
287
Query(cid): Query<Option<String>>,
288
) -> GetRecordResponse {
289
-
self.get_record_impl(repo, collection, rkey, cid).await
290
}
291
292
/// blue.microcosm.repo.getRecordByUri
···
356
return bad_at_uri();
357
};
358
359
-
// TODO: move this to links
360
-
let Some(rest) = normalized.strip_prefix("at://") else {
361
-
return bad_at_uri();
362
-
};
363
-
let Some((repo, rest)) = rest.split_once('/') else {
364
return bad_at_uri();
365
};
366
-
let Some((collection, rest)) = rest.split_once('/') else {
367
-
return bad_at_uri();
368
-
};
369
-
let rkey = if let Some((rkey, _rest)) = rest.split_once('?') {
370
-
rkey
371
-
} else {
372
-
rest
373
-
};
374
375
self.get_record_impl(
376
-
repo.to_string(),
377
-
collection.to_string(),
378
-
rkey.to_string(),
379
-
cid,
380
)
381
.await
382
}
···
456
/// Handle or DID to resolve
457
#[oai(example = "example_handle")]
458
Query(identifier): Query<String>,
459
-
) -> ResolveMiniIDResponse {
460
self.resolve_mini_id(Query(identifier)).await
461
}
462
···
474
/// Handle or DID to resolve
475
#[oai(example = "example_handle")]
476
Query(identifier): Query<String>,
477
-
) -> ResolveMiniIDResponse {
478
let invalid = |reason: &'static str| {
479
-
ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
480
};
481
482
let mut unverified_handle = None;
483
-
let did = match Did::new(identifier.clone()) {
484
Ok(did) => did,
485
Err(_) => {
486
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
487
return invalid("Identifier was not a valid DID or handle");
488
};
489
490
-
match self.identity.handle_to_did(alleged_handle.clone()).await {
491
Ok(res) => {
492
if let Some(did) = res {
493
// we did it joe
···
505
}
506
}
507
};
508
-
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
509
return invalid("Failed to get DID doc");
510
};
511
let Some(partial_doc) = partial_doc else {
···
525
"handle.invalid".to_string()
526
}
527
} else {
528
-
let Ok(handle_did) = self
529
-
.identity
530
.handle_to_did(partial_doc.unverified_handle.clone())
531
.await
532
else {
···
542
}
543
};
544
545
-
ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject {
546
did: did.to_string(),
547
handle,
548
pds: partial_doc.pds,
···
550
}))
551
}
552
553
async fn get_record_impl(
554
&self,
555
-
repo: String,
556
-
collection: String,
557
-
rkey: String,
558
-
cid: Option<String>,
559
) -> GetRecordResponse {
560
-
let did = match Did::new(repo.clone()) {
561
Ok(did) => did,
562
Err(_) => {
563
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
···
588
}
589
};
590
591
-
let Ok(collection) = Nsid::new(collection) else {
592
return GetRecordResponse::BadRequest(xrpc_error(
593
"InvalidRequest",
594
"Invalid NSID for collection",
595
));
596
};
597
598
-
let Ok(rkey) = RecordKey::new(rkey) else {
599
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
600
};
601
602
let cid: Option<Cid> = if let Some(cid) = cid {
603
-
let Ok(cid) = Cid::from_str(&cid) else {
604
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
605
};
606
Some(cid)
···
748
cache: HybridCache<String, CachedRecord>,
749
identity: Identity,
750
repo: Repo,
751
acme_domain: Option<String>,
752
acme_contact: Option<String>,
753
acme_cache_path: Option<PathBuf>,
···
756
bind: std::net::SocketAddr,
757
) -> Result<(), ServerError> {
758
let repo = Arc::new(repo);
759
let api_service = OpenApiService::new(
760
Xrpc {
761
cache,
762
identity,
763
repo,
764
},
765
"Slingshot",
···
823
.with(
824
Cors::new()
825
.allow_origin_regex("*")
826
-
.allow_methods([Method::GET])
827
.allow_credentials(false),
828
)
829
.with(CatchPanic::new())
···
1
use crate::{
2
+
CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3
error::{RecordError, ServerError},
4
+
proxy::{FullAtUriParts, MatchedRef, extract_links, split_uri},
5
+
record::RawRecord,
6
};
7
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Handle, Nsid, RecordKey};
8
use foyer::HybridCache;
9
use links::at_uri::parse_at_uri as normalize_at_uri;
10
use serde::Serialize;
11
+
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc, time::Instant};
12
+
use tokio::sync::mpsc;
13
use tokio_util::sync::CancellationToken;
14
15
use poem::{
···
24
};
25
use poem_openapi::{
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
+
Union, param::Query, payload::Json, types::Example,
28
};
29
30
fn example_handle() -> String {
···
33
fn example_did() -> String {
34
"did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
35
}
36
+
fn example_service_did() -> String {
37
+
"did:web:constellation.microcosm.blue".to_string()
38
+
}
39
fn example_collection() -> String {
40
"app.bsky.feed.like".to_string()
41
}
42
fn example_rkey() -> String {
43
"3lv4ouczo2b2a".to_string()
44
+
}
45
+
fn example_id_fragment() -> String {
46
+
"#constellation".to_string()
47
}
48
fn example_uri() -> String {
49
format!(
···
60
"zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
61
}
62
63
+
#[derive(Debug, Object)]
64
#[oai(example = true)]
65
struct XrpcErrorResponseObject {
66
/// Should correspond an error `name` in the lexicon errors array
···
91
}))
92
}
93
94
+
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniDocResponse {
95
+
ResolveMiniDocResponse::BadRequest(Json(XrpcErrorResponseObject {
96
+
error: "InvalidRequest".to_string(),
97
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
98
+
}))
99
+
}
100
+
101
+
fn bad_request_handler_resolve_service(err: poem::Error) -> ResolveServiceResponse {
102
+
ResolveServiceResponse::BadRequest(Json(XrpcErrorResponseObject {
103
+
error: "InvalidRequest".to_string(),
104
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
105
+
}))
106
+
}
107
+
108
+
fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
109
+
ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
110
error: "InvalidRequest".to_string(),
111
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
112
}))
···
201
202
#[derive(ApiResponse)]
203
#[oai(bad_request_handler = "bad_request_handler_resolve_mini")]
204
+
enum ResolveMiniDocResponse {
205
/// Identity resolved
206
#[oai(status = 200)]
207
Ok(Json<MiniDocResponseObject>),
···
212
213
#[derive(Object)]
214
#[oai(example = true)]
215
+
struct ServiceResponseObject {
216
+
/// The service endpoint URL, if found
217
+
endpoint: String,
218
+
}
219
+
impl Example for ServiceResponseObject {
220
+
fn example() -> Self {
221
+
Self {
222
+
endpoint: "https://example.com".to_string(),
223
+
}
224
+
}
225
+
}
226
+
227
+
#[derive(ApiResponse)]
228
+
#[oai(bad_request_handler = "bad_request_handler_resolve_service")]
229
+
enum ResolveServiceResponse {
230
+
/// Service resolved
231
+
#[oai(status = 200)]
232
+
Ok(Json<ServiceResponseObject>),
233
+
/// Bad request or service not resolved
234
+
#[oai(status = 400)]
235
+
BadRequest(XrpcError),
236
+
}
237
+
238
+
#[derive(Object)]
239
+
#[oai(rename_all = "camelCase")]
240
+
struct ProxyHydrationError {
241
+
/// Short description of why the hydration failed
242
+
reason: String,
243
+
/// Whether or not it's recommended to retry requesting this item
244
+
should_retry: bool,
245
+
/// URL to follow up at if retrying
246
+
follow_up: String,
247
+
}
248
+
249
+
#[derive(Object)]
250
+
#[oai(rename_all = "camelCase")]
251
+
struct ProxyHydrationPending {
252
+
/// URL you can request to finish hydrating this item
253
+
follow_up: String,
254
+
/// Why this item couldn't be hydrated: 'deadline' or 'limit'
255
+
///
256
+
/// - `deadline`: the item fetch didn't complete before the response was
257
+
/// due, but will continue on slingshot in the background -- `followUp`
258
+
/// requests are coalesced into the original item fetch to be available as
259
+
/// early as possible.
260
+
///
261
+
/// - `limit`: slingshot only attempts to hydrate the first 100 items found
262
+
/// in a proxied response, with the remaining marked `pending`. You can
263
+
/// request `followUp` to fetch them.
264
+
///
265
+
/// In the future, Slingshot may put pending links after `limit` into a low-
266
+
/// priority fetch queue, so that these items become available sooner on
267
+
/// follow-up request as well.
268
+
reason: String,
269
+
}
270
+
271
+
// todo: there's gotta be a supertrait that collects these?
272
+
use poem_openapi::types::{IsObjectType, ParseFromJSON, ToJSON, Type};
273
+
274
+
#[derive(Union)]
275
+
#[oai(discriminator_name = "status", rename_all = "camelCase")]
276
+
enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> {
277
+
Error(ProxyHydrationError),
278
+
Pending(ProxyHydrationPending),
279
+
Found(T),
280
+
}
281
+
282
+
#[derive(Object)]
283
+
#[oai(example = true)]
284
+
struct ProxyHydrateResponseObject {
285
+
/// The original upstream response content
286
+
output: serde_json::Value,
287
+
/// Any hydrated records
288
+
records: HashMap<String, Hydration<FoundRecordResponseObject>>,
289
+
/// Any hydrated identifiers
290
+
identifiers: HashMap<String, Hydration<MiniDocResponseObject>>,
291
+
}
292
+
impl Example for ProxyHydrateResponseObject {
293
+
fn example() -> Self {
294
+
Self {
295
+
output: serde_json::json!({}),
296
+
records: HashMap::from([(
297
+
"asdf".into(),
298
+
Hydration::Pending(ProxyHydrationPending {
299
+
follow_up: "/xrpc/com.atproto.repo.getRecord?...".to_string(),
300
+
reason: "deadline".to_string(),
301
+
}),
302
+
)]),
303
+
identifiers: HashMap::new(),
304
+
}
305
+
}
306
+
}
307
+
308
+
#[derive(ApiResponse)]
309
+
#[oai(bad_request_handler = "bad_request_handler_proxy_query")]
310
+
enum ProxyHydrateResponse {
311
+
#[oai(status = 200)]
312
+
Ok(Json<ProxyHydrateResponseObject>),
313
+
#[oai(status = 400)]
314
+
BadRequest(XrpcError),
315
+
}
316
+
317
+
#[derive(Object)]
318
+
pub struct HydrationSource {
319
+
/// Record Path syntax for locating fields
320
+
pub path: String,
321
+
/// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'.
322
+
///
323
+
/// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys.
324
+
/// - `at-uri`: string, must have all segments present (identifier, collection, rkey)
325
+
/// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored.
326
+
/// - `did`: string, `did` format
327
+
/// - `handle`: string, `handle` format
328
+
/// - `at-identifier`: string, `did` or `handle` format
329
+
pub shape: String,
330
+
}
331
+
332
+
#[derive(Object)]
333
+
#[oai(example = true)]
334
+
struct ProxyQueryPayload {
335
+
/// The NSID of the XRPC you wish to forward
336
+
xrpc: String,
337
+
/// The destination service the request will be forwarded to
338
+
atproto_proxy: String,
339
+
/// An optional auth token to pass on
340
+
///
341
+
/// the `aud` field must match the upstream atproto_proxy service
342
+
authorization: Option<String>,
343
+
/// An optional set of labelers to request be applied by the upstream
344
+
atproto_accept_labelers: Option<String>,
345
+
/// The `params` for the destination service XRPC endpoint
346
+
///
347
+
/// Currently this will be passed along unchecked, but a future version of
348
+
/// slingshot may attempt to do lexicon resolution to validate `params`
349
+
/// based on the upstream service
350
+
params: Option<serde_json::Value>,
351
+
/// Paths within the response to look for at-uris that can be hydrated
352
+
hydration_sources: Vec<HydrationSource>,
353
+
// todo: let clients pass a hydration deadline?
354
+
}
355
+
impl Example for ProxyQueryPayload {
356
+
fn example() -> Self {
357
+
Self {
358
+
xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
359
+
atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
360
+
authorization: None,
361
+
atproto_accept_labelers: None,
362
+
params: Some(serde_json::json!({
363
+
"feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
364
+
})),
365
+
hydration_sources: vec![HydrationSource {
366
+
path: "feed[].post".to_string(),
367
+
shape: "at-uri".to_string(),
368
+
}],
369
+
}
370
+
}
371
+
}
372
+
373
+
#[derive(Object)]
374
+
#[oai(example = true)]
375
struct FoundDidResponseObject {
376
/// the DID, bi-directionally verified if using Slingshot
377
did: String,
···
399
}
400
401
struct Xrpc {
402
+
base_url: url::Url,
403
cache: HybridCache<String, CachedRecord>,
404
identity: Identity,
405
+
proxy: Arc<Proxy>,
406
repo: Arc<Repo>,
407
}
408
···
468
/// only retains the most recent version of a record.
469
Query(cid): Query<Option<String>>,
470
) -> GetRecordResponse {
471
+
self.get_record_impl(&repo, &collection, &rkey, cid.as_deref())
472
+
.await
473
}
474
475
/// blue.microcosm.repo.getRecordByUri
···
539
return bad_at_uri();
540
};
541
542
+
let Some((repo, collection, rkey)) = split_uri(&normalized) else {
543
return bad_at_uri();
544
};
545
546
self.get_record_impl(
547
+
Into::<String>::into(repo).as_str(),
548
+
collection.as_str(),
549
+
rkey.as_str(),
550
+
cid.as_deref(),
551
)
552
.await
553
}
···
627
/// Handle or DID to resolve
628
#[oai(example = "example_handle")]
629
Query(identifier): Query<String>,
630
+
) -> ResolveMiniDocResponse {
631
self.resolve_mini_id(Query(identifier)).await
632
}
633
···
645
/// Handle or DID to resolve
646
#[oai(example = "example_handle")]
647
Query(identifier): Query<String>,
648
+
) -> ResolveMiniDocResponse {
649
+
Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await
650
+
}
651
+
652
+
async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniDocResponse {
653
let invalid = |reason: &'static str| {
654
+
ResolveMiniDocResponse::BadRequest(xrpc_error("InvalidRequest", reason))
655
};
656
657
let mut unverified_handle = None;
658
+
let did = match Did::new(identifier.to_string()) {
659
Ok(did) => did,
660
Err(_) => {
661
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
662
return invalid("Identifier was not a valid DID or handle");
663
};
664
665
+
match identity.handle_to_did(alleged_handle.clone()).await {
666
Ok(res) => {
667
if let Some(did) = res {
668
// we did it joe
···
680
}
681
}
682
};
683
+
let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else {
684
return invalid("Failed to get DID doc");
685
};
686
let Some(partial_doc) = partial_doc else {
···
700
"handle.invalid".to_string()
701
}
702
} else {
703
+
let Ok(handle_did) = identity
704
.handle_to_did(partial_doc.unverified_handle.clone())
705
.await
706
else {
···
716
}
717
};
718
719
+
ResolveMiniDocResponse::Ok(Json(MiniDocResponseObject {
720
did: did.to_string(),
721
handle,
722
pds: partial_doc.pds,
···
724
}))
725
}
726
727
+
/// com.bad-example.identity.resolveService
728
+
///
729
+
/// resolve an atproto service did + id to its http endpoint
730
+
///
731
+
/// > [!important]
732
+
/// > this endpoint is experimental and may change
733
+
#[oai(
734
+
path = "/com.bad-example.identity.resolveService",
735
+
method = "get",
736
+
tag = "ApiTags::Custom"
737
+
)]
738
+
async fn resolve_service(
739
+
&self,
740
+
/// the service's did
741
+
#[oai(example = "example_service_did")]
742
+
Query(did): Query<String>,
743
+
/// id fragment, starting with '#'
744
+
///
745
+
/// must be url-encoded!
746
+
#[oai(example = "example_id_fragment")]
747
+
Query(id): Query<String>,
748
+
/// optionally, the exact service type to filter
749
+
///
750
+
/// resolving a pds requires matching the type as well as id. service
751
+
/// proxying ignores the type.
752
+
Query(r#type): Query<Option<String>>,
753
+
) -> ResolveServiceResponse {
754
+
let Ok(did) = Did::new(did) else {
755
+
return ResolveServiceResponse::BadRequest(xrpc_error(
756
+
"InvalidRequest",
757
+
"could not parse 'did' into a DID",
758
+
));
759
+
};
760
+
let identity = self.identity.clone();
761
+
Self::resolve_service_impl(&did, &id, r#type.as_deref(), identity).await
762
+
}
763
+
764
+
async fn resolve_service_impl(
765
+
did: &Did,
766
+
id_fragment: &str,
767
+
service_type: Option<&str>,
768
+
identity: Identity,
769
+
) -> ResolveServiceResponse {
770
+
let invalid = |reason: &'static str| {
771
+
ResolveServiceResponse::BadRequest(xrpc_error("InvalidRequest", reason))
772
+
};
773
+
let Ok(service_mini_doc) = identity.did_to_mini_service_doc(did).await else {
774
+
return invalid("Failed to get DID doc");
775
+
};
776
+
let Some(service_mini_doc) = service_mini_doc else {
777
+
return invalid("Failed to find DID doc");
778
+
};
779
+
780
+
let Some(matching) = service_mini_doc.get(id_fragment, service_type) else {
781
+
return invalid("failed to match identity (and maybe type)");
782
+
};
783
+
784
+
ResolveServiceResponse::Ok(Json(ServiceResponseObject {
785
+
endpoint: matching.endpoint.clone(),
786
+
}))
787
+
}
788
+
789
+
/// com.bad-example.proxy.hydrateQueryResponse
790
+
///
791
+
/// > [!important]
792
+
/// > Unstable! This endpoint is experimental and may change.
793
+
///
794
+
/// Fetch + include records referenced from an upstream xrpc query response
795
+
#[oai(
796
+
path = "/com.bad-example.proxy.hydrateQueryResponse",
797
+
method = "post",
798
+
tag = "ApiTags::Custom"
799
+
)]
800
+
async fn proxy_hydrate_query(
801
+
&self,
802
+
Json(payload): Json<ProxyQueryPayload>,
803
+
) -> ProxyHydrateResponse {
804
+
let params = if let Some(p) = payload.params {
805
+
let serde_json::Value::Object(map) = p else {
806
+
panic!("params have to be an object");
807
+
};
808
+
Some(map)
809
+
} else {
810
+
None
811
+
};
812
+
813
+
let Some((service_did, id_fragment)) = payload.atproto_proxy.rsplit_once("#") else {
814
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
815
+
"BadParameter",
816
+
"atproto_proxy could not be understood",
817
+
));
818
+
};
819
+
820
+
let Ok(service_did) = service_did.parse() else {
821
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
822
+
"BadParameter",
823
+
"atproto_proxy service did could not be parsed",
824
+
));
825
+
};
826
+
827
+
let Ok(xrpc) = payload.xrpc.parse() else {
828
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
829
+
"BadParameter",
830
+
"invalid NSID for xrpc param",
831
+
));
832
+
};
833
+
834
+
match self
835
+
.proxy
836
+
.proxy(
837
+
&service_did,
838
+
&format!("#{id_fragment}"),
839
+
&xrpc,
840
+
payload.authorization.as_deref(),
841
+
payload.atproto_accept_labelers.as_deref(),
842
+
params,
843
+
)
844
+
.await
845
+
{
846
+
Ok(skeleton) => {
847
+
let links = match extract_links(payload.hydration_sources, &skeleton) {
848
+
Ok(l) => l,
849
+
Err(e) => {
850
+
log::warn!("problem extracting: {e:?}");
851
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
852
+
"oop",
853
+
"sorry, error extracting",
854
+
));
855
+
}
856
+
};
857
+
let mut records = HashMap::new();
858
+
let mut identifiers = HashMap::new();
859
+
860
+
enum GetThing {
861
+
Record(String, Hydration<FoundRecordResponseObject>),
862
+
Identifier(String, Hydration<MiniDocResponseObject>),
863
+
}
864
+
865
+
let (tx, mut rx) = mpsc::channel(1);
866
+
867
+
let t0 = Instant::now();
868
+
869
+
for (i, link) in links.into_iter().enumerate() {
870
+
match link {
871
+
MatchedRef::AtUri(parts) => {
872
+
let non_canonical_url = parts.to_uri();
873
+
if records.contains_key(&non_canonical_url) {
874
+
log::warn!("skipping duplicate record without checking cid");
875
+
continue;
876
+
}
877
+
let mut follow_up = self.base_url.clone();
878
+
follow_up.set_path("/xrpc/com.atproto.repo.getRecord");
879
+
follow_up
880
+
.query_pairs_mut()
881
+
.append_pair("repo", &Into::<String>::into(parts.repo.clone()))
882
+
.append_pair("collection", parts.collection.as_str())
883
+
.append_pair("rkey", parts.rkey.as_str());
884
+
if let Some(ref cid) = parts.cid {
885
+
follow_up
886
+
.query_pairs_mut()
887
+
.append_pair("cid", &cid.as_ref().to_string());
888
+
}
889
+
890
+
if i >= 100 {
891
+
records.insert(
892
+
non_canonical_url.clone(),
893
+
Hydration::Pending(ProxyHydrationPending {
894
+
reason: "limit".to_string(),
895
+
follow_up: follow_up.to_string(),
896
+
}),
897
+
);
898
+
continue;
899
+
} else {
900
+
records.insert(
901
+
non_canonical_url.clone(),
902
+
Hydration::Pending(ProxyHydrationPending {
903
+
reason: "deadline".to_string(),
904
+
follow_up: follow_up.to_string(),
905
+
}),
906
+
);
907
+
}
908
+
909
+
let tx = tx.clone();
910
+
let identity = self.identity.clone();
911
+
let repo = self.repo.clone();
912
+
tokio::task::spawn(async move {
913
+
let FullAtUriParts {
914
+
repo: ident,
915
+
collection,
916
+
rkey,
917
+
cid,
918
+
} = parts;
919
+
let did = match ident {
920
+
AtIdentifier::Did(did) => did,
921
+
AtIdentifier::Handle(handle) => {
922
+
let Ok(Some(did)) = identity.handle_to_did(handle).await
923
+
else {
924
+
let res = Hydration::Error(ProxyHydrationError {
925
+
reason: "could not resolve handle".to_string(),
926
+
should_retry: true,
927
+
follow_up: follow_up.to_string(),
928
+
});
929
+
return if tx
930
+
.send(GetThing::Record(non_canonical_url, res))
931
+
.await
932
+
.is_ok()
933
+
{
934
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
935
+
} else {
936
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
937
+
};
938
+
};
939
+
did
940
+
}
941
+
};
942
+
943
+
let res =
944
+
match repo.get_record(&did, &collection, &rkey, &cid).await {
945
+
Ok(CachedRecord::Deleted) => {
946
+
Hydration::Error(ProxyHydrationError {
947
+
reason: "record deleted".to_string(),
948
+
should_retry: false,
949
+
follow_up: follow_up.to_string(),
950
+
})
951
+
}
952
+
Ok(CachedRecord::Found(RawRecord {
953
+
cid: found_cid,
954
+
record,
955
+
})) => {
956
+
if cid
957
+
.as_ref()
958
+
.map(|expected| *expected != found_cid)
959
+
.unwrap_or(false)
960
+
{
961
+
Hydration::Error(ProxyHydrationError {
962
+
reason: "not found".to_string(),
963
+
should_retry: false,
964
+
follow_up: follow_up.to_string(),
965
+
})
966
+
} else if let Ok(value) = serde_json::from_str(&record)
967
+
{
968
+
let canonical_uri = FullAtUriParts {
969
+
repo: AtIdentifier::Did(did),
970
+
collection,
971
+
rkey,
972
+
cid: None, // not used for .to_uri
973
+
}
974
+
.to_uri();
975
+
Hydration::Found(FoundRecordResponseObject {
976
+
cid: Some(found_cid.as_ref().to_string()),
977
+
uri: canonical_uri,
978
+
value,
979
+
})
980
+
} else {
981
+
Hydration::Error(ProxyHydrationError {
982
+
reason: "could not parse upstream response"
983
+
.to_string(),
984
+
should_retry: false,
985
+
follow_up: follow_up.to_string(),
986
+
})
987
+
}
988
+
}
989
+
Err(e) => {
990
+
log::warn!("finally oop {e:?}");
991
+
Hydration::Error(ProxyHydrationError {
992
+
reason: "failed to fetch record".to_string(),
993
+
should_retry: true, // TODO
994
+
follow_up: follow_up.to_string(),
995
+
})
996
+
}
997
+
};
998
+
if tx
999
+
.send(GetThing::Record(non_canonical_url, res))
1000
+
.await
1001
+
.is_ok()
1002
+
{
1003
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
1004
+
} else {
1005
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
1006
+
}
1007
+
});
1008
+
}
1009
+
MatchedRef::Identifier(id) => {
1010
+
let identifier: String = id.clone().into();
1011
+
if identifiers.contains_key(&identifier) {
1012
+
continue;
1013
+
}
1014
+
1015
+
let mut follow_up = self.base_url.clone();
1016
+
follow_up.set_path("/xrpc/blue.microcosm.identity.resolveMiniDoc");
1017
+
1018
+
follow_up
1019
+
.query_pairs_mut()
1020
+
.append_pair("identifier", &identifier);
1021
+
1022
+
if i >= 100 {
1023
+
identifiers.insert(
1024
+
identifier.clone(),
1025
+
Hydration::Pending(ProxyHydrationPending {
1026
+
reason: "limit".to_string(),
1027
+
follow_up: follow_up.to_string(),
1028
+
}),
1029
+
);
1030
+
continue;
1031
+
} else {
1032
+
identifiers.insert(
1033
+
identifier.clone(),
1034
+
Hydration::Pending(ProxyHydrationPending {
1035
+
reason: "deadline".to_string(),
1036
+
follow_up: follow_up.to_string(),
1037
+
}),
1038
+
);
1039
+
}
1040
+
1041
+
let tx = tx.clone();
1042
+
let identity = self.identity.clone();
1043
+
tokio::task::spawn(async move {
1044
+
let res = match Self::resolve_mini_doc_impl(&identifier, identity)
1045
+
.await
1046
+
{
1047
+
ResolveMiniDocResponse::Ok(Json(mini_doc)) => {
1048
+
Hydration::Found(mini_doc)
1049
+
}
1050
+
ResolveMiniDocResponse::BadRequest(e) => {
1051
+
log::warn!("minidoc fail: {:?}", e.0);
1052
+
Hydration::Error(ProxyHydrationError {
1053
+
reason: "failed to resolve mini doc".to_string(),
1054
+
should_retry: false,
1055
+
follow_up: follow_up.to_string(),
1056
+
})
1057
+
}
1058
+
};
1059
+
if tx.send(GetThing::Identifier(identifier, res)).await.is_ok() {
1060
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "true").increment(1);
1061
+
} else {
1062
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "false").increment(1);
1063
+
}
1064
+
});
1065
+
}
1066
+
}
1067
+
}
1068
+
// so the channel can close when all are completed
1069
+
// (we should be doing a timeout...)
1070
+
drop(tx);
1071
+
1072
+
let deadline = t0 + std::time::Duration::from_secs_f64(1.6);
1073
+
let res = tokio::time::timeout_at(deadline.into(), async {
1074
+
while let Some(hydration) = rx.recv().await {
1075
+
match hydration {
1076
+
GetThing::Record(uri, h) => {
1077
+
if let Some(r) = records.get_mut(&uri) {
1078
+
match (&r, &h) {
1079
+
(_, Hydration::Found(_)) => *r = h, // always replace if found
1080
+
(Hydration::Pending(_), _) => *r = h, // or if it was pending
1081
+
_ => {} // else leave it
1082
+
}
1083
+
} else {
1084
+
records.insert(uri, h);
1085
+
}
1086
+
}
1087
+
GetThing::Identifier(identifier, md) => {
1088
+
identifiers.insert(identifier.to_string(), md);
1089
+
}
1090
+
};
1091
+
}
1092
+
})
1093
+
.await;
1094
+
1095
+
if res.is_ok() {
1096
+
metrics::histogram!("slingshot_hydration_all_completed").record(t0.elapsed());
1097
+
} else {
1098
+
metrics::counter!("slingshot_hydration_cut_off").increment(1);
1099
+
}
1100
+
1101
+
ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
1102
+
output: skeleton,
1103
+
records,
1104
+
identifiers,
1105
+
}))
1106
+
}
1107
+
Err(e) => {
1108
+
log::warn!("oh no: {e:?}");
1109
+
ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
1110
+
}
1111
+
}
1112
+
}
1113
+
1114
async fn get_record_impl(
1115
&self,
1116
+
repo: &str,
1117
+
collection: &str,
1118
+
rkey: &str,
1119
+
cid: Option<&str>,
1120
) -> GetRecordResponse {
1121
+
let did = match Did::new(repo.to_string()) {
1122
Ok(did) => did,
1123
Err(_) => {
1124
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
···
1149
}
1150
};
1151
1152
+
let Ok(collection) = Nsid::new(collection.to_string()) else {
1153
return GetRecordResponse::BadRequest(xrpc_error(
1154
"InvalidRequest",
1155
"Invalid NSID for collection",
1156
));
1157
};
1158
1159
+
let Ok(rkey) = RecordKey::new(rkey.to_string()) else {
1160
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
1161
};
1162
1163
let cid: Option<Cid> = if let Some(cid) = cid {
1164
+
let Ok(cid) = Cid::from_str(cid) else {
1165
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
1166
};
1167
Some(cid)
···
1309
cache: HybridCache<String, CachedRecord>,
1310
identity: Identity,
1311
repo: Repo,
1312
+
proxy: Proxy,
1313
+
base_url: url::Url,
1314
acme_domain: Option<String>,
1315
acme_contact: Option<String>,
1316
acme_cache_path: Option<PathBuf>,
···
1319
bind: std::net::SocketAddr,
1320
) -> Result<(), ServerError> {
1321
let repo = Arc::new(repo);
1322
+
let proxy = Arc::new(proxy);
1323
let api_service = OpenApiService::new(
1324
Xrpc {
1325
+
base_url,
1326
cache,
1327
identity,
1328
+
proxy,
1329
repo,
1330
},
1331
"Slingshot",
···
1389
.with(
1390
Cors::new()
1391
.allow_origin_regex("*")
1392
+
.allow_methods([Method::GET, Method::POST])
1393
.allow_credentials(false),
1394
)
1395
.with(CatchPanic::new())
History
2 rounds
2 comments
bad-example.com
submitted
#1
8 commits
expand
collapse
very hack proxy stuff (wip)
identities!!!!
service doc cache
proxy: use service cache
fmt
include headers, timeout hydration, measure times
base_url, typed url parts, api fixups,
only try to hydrate 100 records
no conflicts, ready to merge
expand 0 comments
bad-example.com
submitted
#0
1 commit
expand
collapse
very hack proxy stuff (wip)
the record contents from
com.atproto.repo.getRecord are returned under a key called "value", so this should probably use that.