Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

hydrating xrpc proxy (wip) #15

open opened by bad-example.com targeting main from slingshot-proxy-hydrate
Labels

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3me2sdlxbvy22
+926 -334
Interdiff #0 โ†’ #1
+2 -2
Cargo.lock
··· 803 804 [[package]] 805 name = "bytes" 806 - version = "1.11.1" 807 source = "registry+https://github.com/rust-lang/crates.io-index" 808 - checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" 809 810 [[package]] 811 name = "byteview"
··· 803 804 [[package]] 805 name = "bytes" 806 + version = "1.10.1" 807 source = "registry+https://github.com/rust-lang/crates.io-index" 808 + checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 809 810 [[package]] 811 name = "byteview"
slingshot/Cargo.toml

This file has not been changed.

+8
slingshot/src/error.rs
··· 100 UrlParseError(#[from] url::ParseError), 101 #[error(transparent)] 102 ReqwestError(#[from] reqwest::Error), 103 }
··· 100 UrlParseError(#[from] url::ParseError), 101 #[error(transparent)] 102 ReqwestError(#[from] reqwest::Error), 103 + #[error(transparent)] 104 + InvalidHeader(#[from] reqwest::header::InvalidHeaderValue), 105 + #[error(transparent)] 106 + IdentityError(#[from] IdentityError), 107 + #[error("upstream service could not be resolved")] 108 + ServiceNotFound, 109 + #[error("upstream service was found but no services matched")] 110 + ServiceNotMatched, 111 }
slingshot/src/lib.rs

This file has not been changed.

+20 -6
slingshot/src/main.rs
··· 1 - // use foyer::HybridCache; 2 - // use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3 use metrics_exporter_prometheus::PrometheusBuilder; 4 use slingshot::{ 5 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, ··· 9 10 use clap::Parser; 11 use tokio_util::sync::CancellationToken; 12 13 /// Slingshot record edge cache 14 #[derive(Parser, Debug, Clone)] ··· 48 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")] 49 #[clap(default_value_t = 1)] 50 identity_cache_disk_gb: usize, 51 /// the domain pointing to this server 52 /// 53 /// if present: ··· 101 102 let args = Args::parse(); 103 104 if args.collect_metrics { 105 log::trace!("installing metrics server..."); 106 if let Err(e) = install_metrics_server(args.bind_metrics) { ··· 152 log::info!("identity service ready."); 153 154 let repo = Repo::new(identity.clone()); 155 - let proxy = Proxy::new(repo.clone()); 156 157 let identity_for_server = identity.clone(); 158 let server_shutdown = shutdown.clone(); ··· 164 identity_for_server, 165 repo, 166 proxy, 167 args.acme_domain, 168 args.acme_contact, 169 args.acme_cache_path, ··· 236 ) -> Result<(), metrics_exporter_prometheus::BuildError> { 237 log::info!("installing metrics server..."); 238 PrometheusBuilder::new() 239 - .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 240 - .set_bucket_duration(std::time::Duration::from_secs(300))? 241 - .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 242 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 243 .with_http_listener(bind_metrics) 244 .install()?;
··· 1 use metrics_exporter_prometheus::PrometheusBuilder; 2 use slingshot::{ 3 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, ··· 7 8 use clap::Parser; 9 use tokio_util::sync::CancellationToken; 10 + use url::Url; 11 12 /// Slingshot record edge cache 13 #[derive(Parser, Debug, Clone)] ··· 47 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")] 48 #[clap(default_value_t = 1)] 49 identity_cache_disk_gb: usize, 50 + /// the address of this server 51 + /// 52 + /// used if --acme-domain is not set, defaulting to `--bind` 53 + #[arg(long, conflicts_with("acme_domain"), env = "SLINGSHOT_PUBLIC_HOST")] 54 + base_url: Option<Url>, 55 /// the domain pointing to this server 56 /// 57 /// if present: ··· 105 106 let args = Args::parse(); 107 108 + let base_url: Url = args 109 + .base_url 110 + .or_else(|| { 111 + args.acme_domain 112 + .as_ref() 113 + .map(|d| Url::parse(&format!("https://{d}")).unwrap()) 114 + }) 115 + .unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap()); 116 + 117 if args.collect_metrics { 118 log::trace!("installing metrics server..."); 119 if let Err(e) = install_metrics_server(args.bind_metrics) { ··· 165 log::info!("identity service ready."); 166 167 let repo = Repo::new(identity.clone()); 168 + let proxy = Proxy::new(identity.clone()); 169 170 let identity_for_server = identity.clone(); 171 let server_shutdown = shutdown.clone(); ··· 177 identity_for_server, 178 repo, 179 proxy, 180 + base_url, 181 args.acme_domain, 182 args.acme_contact, 183 args.acme_cache_path, ··· 250 ) -> Result<(), metrics_exporter_prometheus::BuildError> { 251 log::info!("installing metrics server..."); 252 PrometheusBuilder::new() 253 + .set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 45.656])? 254 + .set_bucket_duration(std::time::Duration::from_secs(15))? 255 + .set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = bucket lifetime 256 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 257 .with_http_listener(bind_metrics) 258 .install()?;
+247 -133
slingshot/src/proxy.rs
··· 1 - use serde::Deserialize; 2 - use url::Url; 3 - use std::{collections::HashMap, time::Duration}; 4 - use crate::{Repo, server::HydrationSource, error::ProxyError}; 5 use reqwest::Client; 6 use serde_json::{Map, Value}; 7 8 pub enum ParamValue { 9 String(Vec<String>), ··· 13 pub struct Params(HashMap<String, ParamValue>); 14 15 impl TryFrom<Map<String, Value>> for Params { 16 - type Error = (); // TODO 17 fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> { 18 let mut out = HashMap::new(); 19 for (k, v) in val { ··· 70 71 #[derive(Clone)] 72 pub struct Proxy { 73 - repo: Repo, 74 client: Client, 75 } 76 77 impl Proxy { 78 - pub fn new(repo: Repo) -> Self { 79 let client = Client::builder() 80 .user_agent(format!( 81 "microcosm slingshot v{} (contact: @bad-example.com)", ··· 85 .timeout(Duration::from_secs(6)) 86 .build() 87 .unwrap(); 88 - Self { repo, client } 89 } 90 91 pub async fn proxy( 92 &self, 93 - xrpc: String, 94 - service: String, 95 params: Option<Map<String, Value>>, 96 ) -> Result<Value, ProxyError> { 97 - 98 - // hackin it to start 99 - 100 - // 1. assume did-web (TODO) and get the did doc 101 - #[derive(Debug, Deserialize)] 102 - struct ServiceDoc { 103 - id: String, 104 - service: Vec<ServiceItem>, 105 - } 106 - #[derive(Debug, Deserialize)] 107 - struct ServiceItem { 108 - id: String, 109 - #[expect(unused)] 110 - r#type: String, 111 - #[serde(rename = "serviceEndpoint")] 112 - service_endpoint: Url, 113 - } 114 - let dw = service.strip_prefix("did:web:").expect("a did web"); 115 - let (dw, service_id) = dw.split_once("#").expect("whatever"); 116 - let mut dw_url = Url::parse(&format!("https://{dw}"))?; 117 - dw_url.set_path("/.well-known/did.json"); 118 - let doc: ServiceDoc = self.client 119 - .get(dw_url) 120 - .send() 121 .await? 122 - .error_for_status()? 123 - .json() 124 - .await?; 125 126 - assert_eq!(doc.id, format!("did:web:{}", dw)); 127 128 - let mut upstream = None; 129 - for ServiceItem { id, service_endpoint, .. } in doc.service { 130 - let Some((_, id)) = id.split_once("#") else { continue; }; 131 - if id != service_id { continue; }; 132 - upstream = Some(service_endpoint); 133 - break; 134 - } 135 - 136 - // 2. proxy the request forward 137 - let mut upstream = upstream.expect("to find it"); 138 - upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid 139 - 140 if let Some(params) = params { 141 let mut query = upstream.query_pairs_mut(); 142 let Params(ps) = params.try_into().expect("valid params"); ··· 161 } 162 } 163 164 - // TODO: other headers to proxy 165 - Ok(self.client 166 .get(upstream) 167 .send() 168 - .await? 169 - .error_for_status()? 170 - .json() 171 - .await?) 172 } 173 } 174 ··· 188 while let Some((i, c)) = chars.next() { 189 match c { 190 '[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")), 191 - '[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")), 192 '[' => in_bracket = true, 193 ']' if in_bracket => { 194 in_bracket = false; 195 let key = std::mem::take(&mut key_acc); 196 let r#type = std::mem::take(&mut type_acc); 197 - let t = if r#type.is_empty() { None } else { Some(r#type) }; 198 out.push(PathPart::Vector(key, t)); 199 // peek ahead because we need a dot after array if there's more and i don't want to add more loop state 200 let Some((i, c)) = chars.next() else { 201 break; 202 }; 203 if c != '.' { 204 - return Err(format!("expected dot after close bracket, found {c:?} at {i}")); 205 } 206 } 207 ']' => return Err(format!("unexpected close bracket at {i}")), 208 '.' if in_bracket => type_acc.push(c), 209 - '.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")), 210 '.' => { 211 let key = std::mem::take(&mut key_acc); 212 assert!(type_acc.is_empty()); ··· 251 } 252 253 #[derive(Debug, PartialEq)] 254 pub enum MatchedRef { 255 - AtUri { 256 - uri: String, 257 - cid: Option<String>, 258 - }, 259 - Identifier(String), 260 } 261 262 pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> { ··· 268 RefShape::StrongRef => { 269 let o = val.as_object()?; 270 let uri = o.get("uri")?.as_str()?.to_string(); 271 - let cid = o.get("cid")?.as_str()?.to_string(); 272 - Some(MatchedRef::AtUri { uri, cid: Some(cid) }) 273 } 274 RefShape::AtUri => { 275 let uri = val.as_str()?.to_string(); 276 - Some(MatchedRef::AtUri { uri, cid: None }) 277 } 278 RefShape::AtUriParts => { 279 let o = val.as_object()?; 280 - let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string(); 281 - let collection = o.get("collection")?.as_str()?.to_string(); 282 - let rkey = o.get("rkey")?.as_str()?.to_string(); 283 - let uri = format!("at://{identifier}/{collection}/{rkey}"); 284 - let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string); 285 - Some(MatchedRef::AtUri { uri, cid }) 286 } 287 RefShape::Did => { 288 - let id = val.as_str()?; 289 - if !id.starts_with("did:") { 290 - return None; 291 - } 292 - Some(MatchedRef::Identifier(id.to_string())) 293 } 294 RefShape::Handle => { 295 - let id = val.as_str()?; 296 - if id.contains(':') { 297 - return None; 298 - } 299 - Some(MatchedRef::Identifier(id.to_string())) 300 } 301 RefShape::AtIdentifier => { 302 - Some(MatchedRef::Identifier(val.as_str()?.to_string())) 303 } 304 } 305 } ··· 339 } 340 impl<'a> PathWalker<'a> { 341 fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self { 342 - Self { todo: vec![(path_parts, skeleton)] } 343 } 344 } 345 impl<'a> Iterator for PathWalker<'a> { ··· 364 let Some(a) = o.get(k).and_then(|v| v.as_array()) else { 365 continue; 366 }; 367 - for v in a 368 - .iter() 369 - .rev() 370 - .filter(|c| { 371 - let Some(t) = t else { return true }; 372 - c 373 - .as_object() 374 - .and_then(|o| o.get("$type")) 375 - .and_then(|v| v.as_str()) 376 - .map(|s| s == t) 377 - .unwrap_or(false) 378 - }) 379 - { 380 self.todo.push((rest, v)) 381 } 382 } ··· 385 } 386 } 387 388 - 389 #[cfg(test)] 390 mod tests { 391 use super::*; 392 use serde_json::json; 393 394 #[test] 395 fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> { 396 let cases = [ 397 ("", vec![]), 398 ("subject", vec![PathPart::Scalar("subject".into())]), 399 ("authorDid", vec![PathPart::Scalar("authorDid".into())]), 400 - ("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]), 401 ("members[]", vec![PathPart::Vector("members".into(), None)]), 402 - ("add[].key", vec![ 403 - PathPart::Vector("add".into(), None), 404 - PathPart::Scalar("key".into()), 405 - ]), 406 ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]), 407 - ("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]), 408 - ("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![ 409 - PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())), 410 - PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())), 411 - PathPart::Scalar("did".into()), 412 - ]), 413 ]; 414 415 for (path, expected) in cases { ··· 426 ("strong-ref", json!(""), None), 427 ("strong-ref", json!({}), None), 428 ("strong-ref", json!({ "uri": "abc" }), None), 429 - ("strong-ref", json!({ "cid": "def" }), None), 430 ( 431 "strong-ref", 432 - json!({ "uri": "abc", "cid": "def" }), 433 - Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }), 434 ), 435 ("at-uri", json!({ "uri": "abc" }), None), 436 - ("at-uri", json!({ "uri": "abc", "cid": "def" }), None), 437 ( 438 "at-uri", 439 - json!("abc"), 440 - Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }), 441 ), 442 ("at-uri-parts", json!("abc"), None), 443 ("at-uri-parts", json!({}), None), 444 ( 445 "at-uri-parts", 446 - json!({"repo": "a", "collection": "b", "rkey": "c"}), 447 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 448 ), 449 ( 450 "at-uri-parts", 451 - json!({"did": "a", "collection": "b", "rkey": "c"}), 452 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 453 ), 454 ( 455 "at-uri-parts", 456 // 'repo' takes precedence over 'did' 457 - json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}), 458 - Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }), 459 ), 460 ( 461 "at-uri-parts", 462 - json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}), 463 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }), 464 ), 465 ( 466 "at-uri-parts", 467 - json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}), 468 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 469 ), 470 ("did", json!({}), None), 471 ("did", json!(""), None), 472 ("did", json!("bad-example.com"), None), 473 - ("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 474 ("handle", json!({}), None), 475 - ("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 476 ("handle", json!("did:plc:xyz"), None), 477 ("at-identifier", json!({}), None), 478 - ("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 479 - ("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 480 ]; 481 - for (shape, val, expected) in cases { 482 let s = shape.try_into().unwrap(); 483 let matched = match_shape(s, &val); 484 - assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}"); 485 } 486 } 487 }
··· 1 + use crate::{Identity, error::ProxyError, server::HydrationSource}; 2 + use atrium_api::types::string::{AtIdentifier, Cid, Did, Nsid, RecordKey}; 3 use reqwest::Client; 4 use serde_json::{Map, Value}; 5 + use std::{collections::HashMap, time::Duration}; 6 + use url::Url; 7 8 pub enum ParamValue { 9 String(Vec<String>), ··· 13 pub struct Params(HashMap<String, ParamValue>); 14 15 impl TryFrom<Map<String, Value>> for Params { 16 + type Error = (); // TODO 17 fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> { 18 let mut out = HashMap::new(); 19 for (k, v) in val { ··· 70 71 #[derive(Clone)] 72 pub struct Proxy { 73 + identity: Identity, 74 client: Client, 75 } 76 77 impl Proxy { 78 + pub fn new(identity: Identity) -> Self { 79 let client = Client::builder() 80 .user_agent(format!( 81 "microcosm slingshot v{} (contact: @bad-example.com)", ··· 85 .timeout(Duration::from_secs(6)) 86 .build() 87 .unwrap(); 88 + Self { client, identity } 89 } 90 91 pub async fn proxy( 92 &self, 93 + service_did: &Did, 94 + service_id: &str, 95 + xrpc: &Nsid, 96 + authorization: Option<&str>, 97 + atproto_accept_labelers: Option<&str>, 98 params: Option<Map<String, Value>>, 99 ) -> Result<Value, ProxyError> { 100 + let mut upstream: Url = self 101 + .identity 102 + .did_to_mini_service_doc(service_did) 103 .await? 104 + .ok_or(ProxyError::ServiceNotFound)? 105 + .get(service_id, None) 106 + .ok_or(ProxyError::ServiceNotMatched)? 107 + .endpoint 108 + .parse()?; 109 110 + upstream.set_path(&format!("/xrpc/{}", xrpc.as_str())); 111 112 if let Some(params) = params { 113 let mut query = upstream.query_pairs_mut(); 114 let Params(ps) = params.try_into().expect("valid params"); ··· 133 } 134 } 135 136 + // TODO i mean maybe we should look for headers also in our headers but not obviously 137 + let mut headers = reqwest::header::HeaderMap::new(); 138 + // TODO: check the jwt aud against the upstream!!! 139 + if let Some(auth) = authorization { 140 + headers.insert("Authorization", auth.try_into()?); 141 + } 142 + if let Some(aal) = atproto_accept_labelers { 143 + headers.insert("atproto-accept-labelers", aal.try_into()?); 144 + } 145 + 146 + let t0 = std::time::Instant::now(); 147 + let res = self 148 + .client 149 .get(upstream) 150 + .headers(headers) 151 .send() 152 + .await 153 + .and_then(|r| r.error_for_status()); 154 + 155 + if res.is_ok() { 156 + metrics::histogram!("slingshot_proxy_upstream_request", "success" => "true") 157 + .record(t0.elapsed()); 158 + } else { 159 + metrics::histogram!("slingshot_proxy_upstream_request", "success" => "false") 160 + .record(t0.elapsed()); 161 + } 162 + 163 + Ok(res?.json().await?) 164 } 165 } 166 ··· 180 while let Some((i, c)) = chars.next() { 181 match c { 182 '[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")), 183 + '[' if key_acc.is_empty() => { 184 + return Err(format!("missing key before opening bracket, at {i}")); 185 + } 186 '[' => in_bracket = true, 187 ']' if in_bracket => { 188 in_bracket = false; 189 let key = std::mem::take(&mut key_acc); 190 let r#type = std::mem::take(&mut type_acc); 191 + let t = if r#type.is_empty() { 192 + None 193 + } else { 194 + Some(r#type) 195 + }; 196 out.push(PathPart::Vector(key, t)); 197 // peek ahead because we need a dot after array if there's more and i don't want to add more loop state 198 let Some((i, c)) = chars.next() else { 199 break; 200 }; 201 if c != '.' { 202 + return Err(format!( 203 + "expected dot after close bracket, found {c:?} at {i}" 204 + )); 205 } 206 } 207 ']' => return Err(format!("unexpected close bracket at {i}")), 208 '.' if in_bracket => type_acc.push(c), 209 + '.' if key_acc.is_empty() => { 210 + return Err(format!("missing key before next segment, at {i}")); 211 + } 212 '.' => { 213 let key = std::mem::take(&mut key_acc); 214 assert!(type_acc.is_empty()); ··· 253 } 254 255 #[derive(Debug, PartialEq)] 256 + pub struct FullAtUriParts { 257 + pub repo: AtIdentifier, 258 + pub collection: Nsid, 259 + pub rkey: RecordKey, 260 + pub cid: Option<Cid>, 261 + } 262 + 263 + impl FullAtUriParts { 264 + pub fn to_uri(&self) -> String { 265 + let repo: String = self.repo.clone().into(); // no as_str for AtIdentifier atrium??? 266 + let collection = self.collection.as_str(); 267 + let rkey = self.rkey.as_str(); 268 + format!("at://{repo}/{collection}/{rkey}") 269 + } 270 + } 271 + 272 + // TODO: move this to links 273 + pub fn split_uri(uri: &str) -> Option<(AtIdentifier, Nsid, RecordKey)> { 274 + let rest = uri.strip_prefix("at://")?; 275 + let (repo, rest) = rest.split_once("/")?; 276 + let repo = repo.parse().ok()?; 277 + let (collection, rkey) = rest.split_once("/")?; 278 + let collection = collection.parse().ok()?; 279 + let rkey = rkey.split_once('#').map(|(k, _)| k).unwrap_or(rkey); 280 + let rkey = rkey.split_once('?').map(|(k, _)| k).unwrap_or(rkey); 281 + let rkey = rkey.parse().ok()?; 282 + Some((repo, collection, rkey)) 283 + } 284 + 285 + #[derive(Debug, PartialEq)] 286 pub enum MatchedRef { 287 + AtUri(FullAtUriParts), 288 + Identifier(AtIdentifier), 289 } 290 291 pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> { ··· 297 RefShape::StrongRef => { 298 let o = val.as_object()?; 299 let uri = o.get("uri")?.as_str()?.to_string(); 300 + let cid = o.get("cid")?.as_str()?.parse().ok()?; 301 + let (repo, collection, rkey) = split_uri(&uri)?; 302 + Some(MatchedRef::AtUri(FullAtUriParts { 303 + repo, 304 + collection, 305 + rkey, 306 + cid: Some(cid), 307 + })) 308 } 309 RefShape::AtUri => { 310 let uri = val.as_str()?.to_string(); 311 + let (repo, collection, rkey) = split_uri(&uri)?; 312 + Some(MatchedRef::AtUri(FullAtUriParts { 313 + repo, 314 + collection, 315 + rkey, 316 + cid: None, 317 + })) 318 } 319 RefShape::AtUriParts => { 320 let o = val.as_object()?; 321 + let repo = o.get("repo").or(o.get("did"))?.as_str()?.parse().ok()?; 322 + let collection = o.get("collection")?.as_str()?.parse().ok()?; 323 + let rkey = o.get("rkey")?.as_str()?.parse().ok()?; 324 + let cid = o 325 + .get("cid") 326 + .and_then(|v| v.as_str()) 327 + .and_then(|s| s.parse().ok()); 328 + Some(MatchedRef::AtUri(FullAtUriParts { 329 + repo, 330 + collection, 331 + rkey, 332 + cid, 333 + })) 334 } 335 RefShape::Did => { 336 + let did = val.as_str()?.parse().ok()?; 337 + Some(MatchedRef::Identifier(AtIdentifier::Did(did))) 338 } 339 RefShape::Handle => { 340 + let handle = val.as_str()?.parse().ok()?; 341 + Some(MatchedRef::Identifier(AtIdentifier::Handle(handle))) 342 } 343 RefShape::AtIdentifier => { 344 + let identifier = val.as_str()?.parse().ok()?; 345 + Some(MatchedRef::Identifier(identifier)) 346 } 347 } 348 } ··· 382 } 383 impl<'a> PathWalker<'a> { 384 fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self { 385 + Self { 386 + todo: vec![(path_parts, skeleton)], 387 + } 388 } 389 } 390 impl<'a> Iterator for PathWalker<'a> { ··· 409 let Some(a) = o.get(k).and_then(|v| v.as_array()) else { 410 continue; 411 }; 412 + for v in a.iter().rev().filter(|c| { 413 + let Some(t) = t else { return true }; 414 + c.as_object() 415 + .and_then(|o| o.get("$type")) 416 + .and_then(|v| v.as_str()) 417 + .map(|s| s == t) 418 + .unwrap_or(false) 419 + }) { 420 self.todo.push((rest, v)) 421 } 422 } ··· 425 } 426 } 427 428 #[cfg(test)] 429 mod tests { 430 use super::*; 431 use serde_json::json; 432 433 + static TEST_CID: &str = "bafyreidffwk5wvh5l76yy7zefiqrovv6yaaegb4wg4zaq35w7nt3quix5a"; 434 + 435 #[test] 436 fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> { 437 let cases = [ 438 ("", vec![]), 439 ("subject", vec![PathPart::Scalar("subject".into())]), 440 ("authorDid", vec![PathPart::Scalar("authorDid".into())]), 441 + ( 442 + "subject.uri", 443 + vec![ 444 + PathPart::Scalar("subject".into()), 445 + PathPart::Scalar("uri".into()), 446 + ], 447 + ), 448 ("members[]", vec![PathPart::Vector("members".into(), None)]), 449 + ( 450 + "add[].key", 451 + vec![ 452 + PathPart::Vector("add".into(), None), 453 + PathPart::Scalar("key".into()), 454 + ], 455 + ), 456 ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]), 457 + ( 458 + "a[b.c]", 459 + vec![PathPart::Vector("a".into(), Some("b.c".into()))], 460 + ), 461 + ( 462 + "facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", 463 + vec![ 464 + PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())), 465 + PathPart::Vector( 466 + "features".into(), 467 + Some("app.bsky.richtext.facet#mention".into()), 468 + ), 469 + PathPart::Scalar("did".into()), 470 + ], 471 + ), 472 ]; 473 474 for (path, expected) in cases { ··· 485 ("strong-ref", json!(""), None), 486 ("strong-ref", json!({}), None), 487 ("strong-ref", json!({ "uri": "abc" }), None), 488 + ("strong-ref", json!({ "cid": TEST_CID }), None), 489 ( 490 "strong-ref", 491 + json!({ "uri": "at://a.com/xx.yy.zz/1", "cid": TEST_CID }), 492 + Some(MatchedRef::AtUri(FullAtUriParts { 493 + repo: "a.com".parse().unwrap(), 494 + collection: "xx.yy.zz".parse().unwrap(), 495 + rkey: "1".parse().unwrap(), 496 + cid: Some(TEST_CID.parse().unwrap()), 497 + })), 498 ), 499 ("at-uri", json!({ "uri": "abc" }), None), 500 ( 501 "at-uri", 502 + json!({ "uri": "at://did:web:y.com/xx.yy.zz/1", "cid": TEST_CID }), 503 + None, 504 ), 505 + ( 506 + "at-uri", 507 + json!("at://did:web:y.com/xx.yy.zz/1"), 508 + Some(MatchedRef::AtUri(FullAtUriParts { 509 + repo: "did:web:y.com".parse().unwrap(), 510 + collection: "xx.yy.zz".parse().unwrap(), 511 + rkey: "1".parse().unwrap(), 512 + cid: None, 513 + })), 514 + ), 515 ("at-uri-parts", json!("abc"), None), 516 ("at-uri-parts", json!({}), None), 517 ( 518 "at-uri-parts", 519 + json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}), 520 + Some(MatchedRef::AtUri(FullAtUriParts { 521 + repo: "a.com".parse().unwrap(), 522 + collection: "xx.yy.zz".parse().unwrap(), 523 + rkey: "1".parse().unwrap(), 524 + cid: Some(TEST_CID.parse().unwrap()), 525 + })), 526 ), 527 ( 528 "at-uri-parts", 529 + json!({"did": "a.com", "collection": "xx.yy.zz", "rkey": "1"}), 530 + Some(MatchedRef::AtUri(FullAtUriParts { 531 + repo: "a.com".parse().unwrap(), 532 + collection: "xx.yy.zz".parse().unwrap(), 533 + rkey: "1".parse().unwrap(), 534 + cid: None, 535 + })), 536 ), 537 ( 538 "at-uri-parts", 539 // 'repo' takes precedence over 'did' 540 + json!({"did": "did:web:a.com", "repo": "z.com", "collection": "xx.yy.zz", "rkey": "1"}), 541 + Some(MatchedRef::AtUri(FullAtUriParts { 542 + repo: "z.com".parse().unwrap(), 543 + collection: "xx.yy.zz".parse().unwrap(), 544 + rkey: "1".parse().unwrap(), 545 + cid: None, 546 + })), 547 ), 548 ( 549 "at-uri-parts", 550 + json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}), 551 + Some(MatchedRef::AtUri(FullAtUriParts { 552 + repo: "a.com".parse().unwrap(), 553 + collection: "xx.yy.zz".parse().unwrap(), 554 + rkey: "1".parse().unwrap(), 555 + cid: Some(TEST_CID.parse().unwrap()), 556 + })), 557 ), 558 ( 559 "at-uri-parts", 560 + json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": {}}), 561 + Some(MatchedRef::AtUri(FullAtUriParts { 562 + repo: "a.com".parse().unwrap(), 563 + collection: "xx.yy.zz".parse().unwrap(), 564 + rkey: "1".parse().unwrap(), 565 + cid: None, 566 + })), 567 ), 568 ("did", json!({}), None), 569 ("did", json!(""), None), 570 ("did", json!("bad-example.com"), None), 571 + ( 572 + "did", 573 + json!("did:plc:xyz"), 574 + Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())), 575 + ), 576 ("handle", json!({}), None), 577 + ( 578 + "handle", 579 + json!("bad-example.com"), 580 + Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())), 581 + ), 582 ("handle", json!("did:plc:xyz"), None), 583 ("at-identifier", json!({}), None), 584 + ( 585 + "at-identifier", 586 + json!("bad-example.com"), 587 + Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())), 588 + ), 589 + ( 590 + "at-identifier", 591 + json!("did:plc:xyz"), 592 + Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())), 593 + ), 594 ]; 595 + for (i, (shape, val, expected)) in cases.into_iter().enumerate() { 596 let s = shape.try_into().unwrap(); 597 let matched = match_shape(s, &val); 598 + assert_eq!(matched, expected, "{i}: shape: {shape:?}, val: {val:?}"); 599 } 600 } 601 }
slingshot/src/record.rs

This file has not been changed.

+433 -138
slingshot/src/server.rs
··· 1 use crate::{ 2 CachedRecord, ErrorResponseObject, Identity, Proxy, Repo, 3 error::{RecordError, ServerError}, 4 - proxy::{extract_links, MatchedRef}, 5 record::RawRecord, 6 }; 7 - use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 8 use foyer::HybridCache; 9 use links::at_uri::parse_at_uri as normalize_at_uri; 10 use serde::Serialize; 11 - use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap}; 12 use tokio::sync::mpsc; 13 use tokio_util::sync::CancellationToken; 14 ··· 24 }; 25 use poem_openapi::{ 26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags, 27 - Union, 28 - param::Query, payload::Json, types::Example, 29 }; 30 31 fn example_handle() -> String { ··· 34 fn example_did() -> String { 35 "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string() 36 } 37 fn example_collection() -> String { 38 "app.bsky.feed.like".to_string() 39 } 40 fn example_rkey() -> String { 41 "3lv4ouczo2b2a".to_string() 42 } 43 fn example_uri() -> String { 44 format!( 45 ··· 55 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string() 56 } 57 58 - #[derive(Object)] 59 #[oai(example = true)] 60 struct XrpcErrorResponseObject { 61 /// Should correspond an error `name` in the lexicon errors array ··· 86 })) 87 } 88 89 - fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse { 90 - ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject { 91 error: "InvalidRequest".to_string(), 92 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 93 })) 94 } 95 96 fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse { 97 ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject { 98 error: "InvalidRequest".to_string(), ··· 189 190 #[derive(ApiResponse)] 191 #[oai(bad_request_handler = "bad_request_handler_resolve_mini")] 192 - enum ResolveMiniIDResponse { 193 /// Identity resolved 194 #[oai(status = 200)] 195 Ok(Json<MiniDocResponseObject>), ··· 199 } 200 201 #[derive(Object)] 202 - struct ProxyHydrationError { 203 - reason: String, 204 } 205 206 - #[derive(Object)] 207 - struct ProxyHydrationPending { 208 - url: String, 209 } 210 211 #[derive(Object)] 212 - struct ProxyHydrationRecordFound { 213 - record: serde_json::Value, 214 } 215 216 #[derive(Object)] 217 - struct ProxyHydrationIdentifierFound { 218 - record: MiniDocResponseObject, 219 } 220 221 // todo: there's gotta be a supertrait that collects these? 222 - use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType}; 223 224 #[derive(Union)] 225 #[oai(discriminator_name = "status", rename_all = "camelCase")] ··· 235 /// The original upstream response content 236 output: serde_json::Value, 237 /// Any hydrated records 238 - records: HashMap<String, Hydration<ProxyHydrationRecordFound>>, 239 /// Any hydrated identifiers 240 - identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>, 241 } 242 impl Example for ProxyHydrateResponseObject { 243 fn example() -> Self { 244 Self { 245 output: serde_json::json!({}), 246 - records: HashMap::from([ 247 - ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })), 248 - ]), 249 identifiers: HashMap::new(), 250 } 251 } ··· 257 #[oai(status = 200)] 258 Ok(Json<ProxyHydrateResponseObject>), 259 #[oai(status = 400)] 260 - BadRequest(XrpcError) 261 } 262 263 #[derive(Object)] ··· 282 xrpc: String, 283 /// The destination service the request will be forwarded to 284 atproto_proxy: String, 285 /// The `params` for the destination service XRPC endpoint 286 /// 287 /// Currently this will be passed along unchecked, but a future version of ··· 290 params: Option<serde_json::Value>, 291 /// Paths within the response to look for at-uris that can be hydrated 292 hydration_sources: Vec<HydrationSource>, 293 - // todo: deadline thing 294 - 295 } 296 impl Example for ProxyQueryPayload { 297 fn example() -> Self { 298 Self { 299 xrpc: "app.bsky.feed.getFeedSkeleton".to_string(), 300 atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(), 301 params: Some(serde_json::json!({ 302 "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto", 303 })), 304 - hydration_sources: vec![ 305 - HydrationSource { 306 - path: "feed[].post".to_string(), 307 - shape: "at-uri".to_string(), 308 - } 309 - ], 310 } 311 } 312 } ··· 340 } 341 342 struct Xrpc { 343 cache: HybridCache<String, CachedRecord>, 344 identity: Identity, 345 proxy: Arc<Proxy>, ··· 408 /// only retains the most recent version of a record. 409 Query(cid): Query<Option<String>>, 410 ) -> GetRecordResponse { 411 - self.get_record_impl(repo, collection, rkey, cid).await 412 } 413 414 /// blue.microcosm.repo.getRecordByUri ··· 478 return bad_at_uri(); 479 }; 480 481 - // TODO: move this to links 482 - let Some(rest) = normalized.strip_prefix("at://") else { 483 return bad_at_uri(); 484 }; 485 - let Some((repo, rest)) = rest.split_once('/') else { 486 - return bad_at_uri(); 487 - }; 488 - let Some((collection, rest)) = rest.split_once('/') else { 489 - return bad_at_uri(); 490 - }; 491 - let rkey = if let Some((rkey, _rest)) = rest.split_once('?') { 492 - rkey 493 - } else { 494 - rest 495 - }; 496 497 self.get_record_impl( 498 - repo.to_string(), 499 - collection.to_string(), 500 - rkey.to_string(), 501 - cid, 502 ) 503 .await 504 } ··· 578 /// Handle or DID to resolve 579 #[oai(example = "example_handle")] 580 Query(identifier): Query<String>, 581 - ) -> ResolveMiniIDResponse { 582 self.resolve_mini_id(Query(identifier)).await 583 } 584 ··· 596 /// Handle or DID to resolve 597 #[oai(example = "example_handle")] 598 Query(identifier): Query<String>, 599 - ) -> ResolveMiniIDResponse { 600 let invalid = |reason: &'static str| { 601 - ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 602 }; 603 604 let mut unverified_handle = None; 605 - let did = match Did::new(identifier.clone()) { 606 Ok(did) => did, 607 Err(_) => { 608 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 609 return invalid("Identifier was not a valid DID or handle"); 610 }; 611 612 - match self.identity.handle_to_did(alleged_handle.clone()).await { 613 Ok(res) => { 614 if let Some(did) = res { 615 // we did it joe ··· 627 } 628 } 629 }; 630 - let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else { 631 return invalid("Failed to get DID doc"); 632 }; 633 let Some(partial_doc) = partial_doc else { ··· 647 "handle.invalid".to_string() 648 } 649 } else { 650 - let Ok(handle_did) = self 651 - .identity 652 .handle_to_did(partial_doc.unverified_handle.clone()) 653 .await 654 else { ··· 664 } 665 }; 666 667 - ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject { 668 did: did.to_string(), 669 handle, 670 pds: partial_doc.pds, ··· 672 })) 673 } 674 675 /// com.bad-example.proxy.hydrateQueryResponse 676 /// 677 /// > [!important] ··· 687 &self, 688 Json(payload): Json<ProxyQueryPayload>, 689 ) -> ProxyHydrateResponse { 690 - // TODO: the Accept request header, if present, gotta be json 691 - // TODO: find any Authorization header and verify it. TBD about `aud`. 692 - 693 let params = if let Some(p) = payload.params { 694 let serde_json::Value::Object(map) = p else { 695 panic!("params have to be an object"); 696 }; 697 Some(map) 698 - } else { None }; 699 700 - match self.proxy.proxy( 701 - payload.xrpc, 702 - payload.atproto_proxy, 703 - params, 704 - ).await { 705 Ok(skeleton) => { 706 let links = match extract_links(payload.hydration_sources, &skeleton) { 707 Ok(l) => l, 708 Err(e) => { 709 log::warn!("problem extracting: {e:?}"); 710 - return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting")) 711 } 712 }; 713 let mut records = HashMap::new(); 714 let mut identifiers = HashMap::new(); 715 716 enum GetThing { 717 - Record(String, Hydration<ProxyHydrationRecordFound>), 718 - Identifier(String, Hydration<ProxyHydrationIdentifierFound>), 719 } 720 721 let (tx, mut rx) = mpsc::channel(1); 722 723 - for link in links { 724 match link { 725 - MatchedRef::AtUri { uri, cid } => { 726 - if records.contains_key(&uri) { 727 log::warn!("skipping duplicate record without checking cid"); 728 continue; 729 } 730 - let mut u = url::Url::parse("https://example.com").unwrap(); 731 - u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo 732 - records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending { 733 - url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc. 734 - })); 735 let tx = tx.clone(); 736 let identity = self.identity.clone(); 737 let repo = self.repo.clone(); 738 tokio::task::spawn(async move { 739 - let rest = uri.strip_prefix("at://").unwrap(); 740 - let (identifier, rest) = rest.split_once('/').unwrap(); 741 - let (collection, rkey) = rest.split_once('/').unwrap(); 742 - 743 - let did = if identifier.starts_with("did:") { 744 - Did::new(identifier.to_string()).unwrap() 745 - } else { 746 - let handle = Handle::new(identifier.to_string()).unwrap(); 747 - identity.handle_to_did(handle).await.unwrap().unwrap() 748 }; 749 750 - let res = match repo.get_record( 751 - &did, 752 - &Nsid::new(collection.to_string()).unwrap(), 753 - &RecordKey::new(rkey.to_string()).unwrap(), 754 - &cid.as_ref().map(|s| Cid::from_str(s).unwrap()), 755 - ).await { 756 - Ok(CachedRecord::Deleted) => 757 - Hydration::Error(ProxyHydrationError { 758 - reason: "record deleted".to_string(), 759 - }), 760 - Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => { 761 - if let Some(c) = cid && found_cid.as_ref().to_string() != c { 762 - log::warn!("ignoring cid mismatch"); 763 } 764 - let value = serde_json::from_str(&record).unwrap(); 765 - Hydration::Found(ProxyHydrationRecordFound { 766 - record: value, 767 - }) 768 - } 769 - Err(e) => { 770 - log::warn!("finally oop {e:?}"); 771 - Hydration::Error(ProxyHydrationError { 772 - reason: "failed to fetch record".to_string(), 773 - }) 774 - } 775 - }; 776 - tx.send(GetThing::Record(uri, res)).await 777 }); 778 } 779 MatchedRef::Identifier(id) => { 780 - if identifiers.contains_key(&id) { 781 continue; 782 } 783 - let mut u = url::Url::parse("https://example.com").unwrap(); 784 - u.query_pairs_mut().append_pair("identifier", &id); 785 - identifiers.insert(id, Hydration::Pending(ProxyHydrationPending { 786 - url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross 787 - })); 788 - let tx = tx.clone(); 789 - // let doc_fut = self.resolve_mini_doc(); 790 - tokio::task::spawn(async { 791 792 }); 793 } 794 } ··· 797 // (we shoudl be doing a timeout...) 798 drop(tx); 799 800 - while let Some(hydration) = rx.recv().await { 801 - match hydration { 802 - GetThing::Record(uri, h) => { records.insert(uri, h); } 803 - GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); } 804 - }; 805 } 806 807 ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject { ··· 815 ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry")) 816 } 817 } 818 - 819 } 820 821 async fn get_record_impl( 822 &self, 823 - repo: String, 824 - collection: String, 825 - rkey: String, 826 - cid: Option<String>, 827 ) -> GetRecordResponse { 828 - let did = match Did::new(repo.clone()) { 829 Ok(did) => did, 830 Err(_) => { 831 let Ok(handle) = Handle::new(repo.to_lowercase()) else { ··· 856 } 857 }; 858 859 - let Ok(collection) = Nsid::new(collection) else { 860 return GetRecordResponse::BadRequest(xrpc_error( 861 "InvalidRequest", 862 "Invalid NSID for collection", 863 )); 864 }; 865 866 - let Ok(rkey) = RecordKey::new(rkey) else { 867 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey")); 868 }; 869 870 let cid: Option<Cid> = if let Some(cid) = cid { 871 - let Ok(cid) = Cid::from_str(&cid) else { 872 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID")); 873 }; 874 Some(cid) ··· 1017 identity: Identity, 1018 repo: Repo, 1019 proxy: Proxy, 1020 acme_domain: Option<String>, 1021 acme_contact: Option<String>, 1022 acme_cache_path: Option<PathBuf>, ··· 1028 let proxy = Arc::new(proxy); 1029 let api_service = OpenApiService::new( 1030 Xrpc { 1031 cache, 1032 identity, 1033 proxy,
··· 1 use crate::{ 2 CachedRecord, ErrorResponseObject, Identity, Proxy, Repo, 3 error::{RecordError, ServerError}, 4 + proxy::{FullAtUriParts, MatchedRef, extract_links, split_uri}, 5 record::RawRecord, 6 }; 7 + use atrium_api::types::string::{AtIdentifier, Cid, Did, Handle, Nsid, RecordKey}; 8 use foyer::HybridCache; 9 use links::at_uri::parse_at_uri as normalize_at_uri; 10 use serde::Serialize; 11 + use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc, time::Instant}; 12 use tokio::sync::mpsc; 13 use tokio_util::sync::CancellationToken; 14 ··· 24 }; 25 use poem_openapi::{ 26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags, 27 + Union, param::Query, payload::Json, types::Example, 28 }; 29 30 fn example_handle() -> String { ··· 33 fn example_did() -> String { 34 "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string() 35 } 36 + fn example_service_did() -> String { 37 + "did:web:constellation.microcosm.blue".to_string() 38 + } 39 fn example_collection() -> String { 40 "app.bsky.feed.like".to_string() 41 } 42 fn example_rkey() -> String { 43 "3lv4ouczo2b2a".to_string() 44 } 45 + fn example_id_fragment() -> String { 46 + "#constellation".to_string() 47 + } 48 fn example_uri() -> String { 49 format!( 50 ··· 60 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string() 61 } 62 63 + #[derive(Debug, Object)] 64 #[oai(example = true)] 65 struct XrpcErrorResponseObject { 66 /// Should correspond an error `name` in the lexicon errors array ··· 91 })) 92 } 93 94 + fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniDocResponse { 95 + ResolveMiniDocResponse::BadRequest(Json(XrpcErrorResponseObject { 96 error: "InvalidRequest".to_string(), 97 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 98 })) 99 } 100 101 + fn bad_request_handler_resolve_service(err: poem::Error) -> ResolveServiceResponse { 102 + ResolveServiceResponse::BadRequest(Json(XrpcErrorResponseObject { 103 + error: "InvalidRequest".to_string(), 104 + message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 105 + })) 106 + } 107 + 108 fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse { 109 ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject { 110 error: "InvalidRequest".to_string(), ··· 201 202 #[derive(ApiResponse)] 203 #[oai(bad_request_handler = "bad_request_handler_resolve_mini")] 204 + enum ResolveMiniDocResponse { 205 /// Identity resolved 206 #[oai(status = 200)] 207 Ok(Json<MiniDocResponseObject>), ··· 211 } 212 213 #[derive(Object)] 214 + #[oai(example = true)] 215 + struct ServiceResponseObject { 216 + /// The service endpoint URL, if found 217 + endpoint: String, 218 } 219 + impl Example for ServiceResponseObject { 220 + fn example() -> Self { 221 + Self { 222 + endpoint: "https://example.com".to_string(), 223 + } 224 + } 225 + } 226 227 + #[derive(ApiResponse)] 228 + #[oai(bad_request_handler = "bad_request_handler_resolve_service")] 229 + enum ResolveServiceResponse { 230 + /// Service resolved 231 + #[oai(status = 200)] 232 + Ok(Json<ServiceResponseObject>), 233 + /// Bad request or service not resolved 234 + #[oai(status = 400)] 235 + BadRequest(XrpcError), 236 } 237 238 #[derive(Object)] 239 + #[oai(rename_all = "camelCase")] 240 + struct ProxyHydrationError { 241 + /// Short description of why the hydration failed 242 + reason: String, 243 + /// Whether or not it's recommended to retry requesting this item 244 + should_retry: bool, 245 + /// URL to follow up at if retrying 246 + follow_up: String, 247 } 248 249 #[derive(Object)] 250 + #[oai(rename_all = "camelCase")] 251 + struct ProxyHydrationPending { 252 + /// URL you can request to finish hydrating this item 253 + follow_up: String, 254 + /// Why this item couldn't be hydrated: 'deadline' or 'limit' 255 + /// 256 + /// - `deadline`: the item fetch didn't complete before the response was 257 + /// due, but will continue on slingshot in the background -- `followUp` 258 + /// requests are coalesced into the original item fetch to be available as 259 + /// early as possible. 260 + /// 261 + /// - `limit`: slingshot only attempts to hydrate the first 100 items found 262 + /// in a proxied response, with the remaining marked `pending`. You can 263 + /// request `followUp` to fetch them. 264 + /// 265 + /// In the future, Slingshot may put pending links after `limit` into a low- 266 + /// priority fetch queue, so that these items become available sooner on 267 + /// follow-up request as well. 268 + reason: String, 269 } 270 271 // todo: there's gotta be a supertrait that collects these? 272 + use poem_openapi::types::{IsObjectType, ParseFromJSON, ToJSON, Type}; 273 274 #[derive(Union)] 275 #[oai(discriminator_name = "status", rename_all = "camelCase")] ··· 285 /// The original upstream response content 286 output: serde_json::Value, 287 /// Any hydrated records 288 + records: HashMap<String, Hydration<FoundRecordResponseObject>>, 289 /// Any hydrated identifiers 290 + identifiers: HashMap<String, Hydration<MiniDocResponseObject>>, 291 } 292 impl Example for ProxyHydrateResponseObject { 293 fn example() -> Self { 294 Self { 295 output: serde_json::json!({}), 296 + records: HashMap::from([( 297 + "asdf".into(), 298 + Hydration::Pending(ProxyHydrationPending { 299 + follow_up: "/xrpc/com.atproto.repo.getRecord?...".to_string(), 300 + reason: "deadline".to_string(), 301 + }), 302 + )]), 303 identifiers: HashMap::new(), 304 } 305 } ··· 311 #[oai(status = 200)] 312 Ok(Json<ProxyHydrateResponseObject>), 313 #[oai(status = 400)] 314 + BadRequest(XrpcError), 315 } 316 317 #[derive(Object)] ··· 336 xrpc: String, 337 /// The destination service the request will be forwarded to 338 atproto_proxy: String, 339 + /// An optional auth token to pass on 340 + /// 341 + /// the `aud` field must match the upstream atproto_proxy service 342 + authorization: Option<String>, 343 + /// An optional set of labelers to request be applied by the upstream 344 + atproto_accept_labelers: Option<String>, 345 /// The `params` for the destination service XRPC endpoint 346 /// 347 /// Currently this will be passed along unchecked, but a future version of ··· 350 params: Option<serde_json::Value>, 351 /// Paths within the response to look for at-uris that can be hydrated 352 hydration_sources: Vec<HydrationSource>, 353 + // todo: let clients pass a hydration deadline? 354 } 355 impl Example for ProxyQueryPayload { 356 fn example() -> Self { 357 Self { 358 xrpc: "app.bsky.feed.getFeedSkeleton".to_string(), 359 atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(), 360 + authorization: None, 361 + atproto_accept_labelers: None, 362 params: Some(serde_json::json!({ 363 "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto", 364 })), 365 + hydration_sources: vec![HydrationSource { 366 + path: "feed[].post".to_string(), 367 + shape: "at-uri".to_string(), 368 + }], 369 } 370 } 371 } ··· 399 } 400 401 struct Xrpc { 402 + base_url: url::Url, 403 cache: HybridCache<String, CachedRecord>, 404 identity: Identity, 405 proxy: Arc<Proxy>, ··· 468 /// only retains the most recent version of a record. 469 Query(cid): Query<Option<String>>, 470 ) -> GetRecordResponse { 471 + self.get_record_impl(&repo, &collection, &rkey, cid.as_deref()) 472 + .await 473 } 474 475 /// blue.microcosm.repo.getRecordByUri ··· 539 return bad_at_uri(); 540 }; 541 542 + let Some((repo, collection, rkey)) = split_uri(&normalized) else { 543 return bad_at_uri(); 544 }; 545 546 self.get_record_impl( 547 + Into::<String>::into(repo).as_str(), 548 + collection.as_str(), 549 + rkey.as_str(), 550 + cid.as_deref(), 551 ) 552 .await 553 } ··· 627 /// Handle or DID to resolve 628 #[oai(example = "example_handle")] 629 Query(identifier): Query<String>, 630 + ) -> ResolveMiniDocResponse { 631 self.resolve_mini_id(Query(identifier)).await 632 } 633 ··· 645 /// Handle or DID to resolve 646 #[oai(example = "example_handle")] 647 Query(identifier): Query<String>, 648 + ) -> ResolveMiniDocResponse { 649 + Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await 650 + } 651 + 652 + async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniDocResponse { 653 let invalid = |reason: &'static str| { 654 + ResolveMiniDocResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 655 }; 656 657 let mut unverified_handle = None; 658 + let did = match Did::new(identifier.to_string()) { 659 Ok(did) => did, 660 Err(_) => { 661 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 662 return invalid("Identifier was not a valid DID or handle"); 663 }; 664 665 + match identity.handle_to_did(alleged_handle.clone()).await { 666 Ok(res) => { 667 if let Some(did) = res { 668 // we did it joe ··· 680 } 681 } 682 }; 683 + let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else { 684 return invalid("Failed to get DID doc"); 685 }; 686 let Some(partial_doc) = partial_doc else { ··· 700 "handle.invalid".to_string() 701 } 702 } else { 703 + let Ok(handle_did) = identity 704 .handle_to_did(partial_doc.unverified_handle.clone()) 705 .await 706 else { ··· 716 } 717 }; 718 719 + ResolveMiniDocResponse::Ok(Json(MiniDocResponseObject { 720 did: did.to_string(), 721 handle, 722 pds: partial_doc.pds, ··· 724 })) 725 } 726 727 + /// com.bad-example.identity.resolveService 728 + /// 729 + /// resolve an atproto service did + id to its http endpoint 730 + /// 731 + /// > [!important] 732 + /// > this endpoint is experimental and may change 733 + #[oai( 734 + path = "/com.bad-example.identity.resolveService", 735 + method = "get", 736 + tag = "ApiTags::Custom" 737 + )] 738 + async fn resolve_service( 739 + &self, 740 + /// the service's did 741 + #[oai(example = "example_service_did")] 742 + Query(did): Query<String>, 743 + /// id fragment, starting with '#' 744 + /// 745 + /// must be url-encoded! 746 + #[oai(example = "example_id_fragment")] 747 + Query(id): Query<String>, 748 + /// optionally, the exact service type to filter 749 + /// 750 + /// resolving a pds requires matching the type as well as id. service 751 + /// proxying ignores the type. 752 + Query(r#type): Query<Option<String>>, 753 + ) -> ResolveServiceResponse { 754 + let Ok(did) = Did::new(did) else { 755 + return ResolveServiceResponse::BadRequest(xrpc_error( 756 + "InvalidRequest", 757 + "could not parse 'did' into a DID", 758 + )); 759 + }; 760 + let identity = self.identity.clone(); 761 + Self::resolve_service_impl(&did, &id, r#type.as_deref(), identity).await 762 + } 763 + 764 + async fn resolve_service_impl( 765 + did: &Did, 766 + id_fragment: &str, 767 + service_type: Option<&str>, 768 + identity: Identity, 769 + ) -> ResolveServiceResponse { 770 + let invalid = |reason: &'static str| { 771 + ResolveServiceResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 772 + }; 773 + let Ok(service_mini_doc) = identity.did_to_mini_service_doc(did).await else { 774 + return invalid("Failed to get DID doc"); 775 + }; 776 + let Some(service_mini_doc) = service_mini_doc else { 777 + return invalid("Failed to find DID doc"); 778 + }; 779 + 780 + let Some(matching) = service_mini_doc.get(id_fragment, service_type) else { 781 + return invalid("failed to match identity (and maybe type)"); 782 + }; 783 + 784 + ResolveServiceResponse::Ok(Json(ServiceResponseObject { 785 + endpoint: matching.endpoint.clone(), 786 + })) 787 + } 788 + 789 /// com.bad-example.proxy.hydrateQueryResponse 790 /// 791 /// > [!important] ··· 801 &self, 802 Json(payload): Json<ProxyQueryPayload>, 803 ) -> ProxyHydrateResponse { 804 let params = if let Some(p) = payload.params { 805 let serde_json::Value::Object(map) = p else { 806 panic!("params have to be an object"); 807 }; 808 Some(map) 809 + } else { 810 + None 811 + }; 812 813 + let Some((service_did, id_fragment)) = payload.atproto_proxy.rsplit_once("#") else { 814 + return ProxyHydrateResponse::BadRequest(xrpc_error( 815 + "BadParameter", 816 + "atproto_proxy could not be understood", 817 + )); 818 + }; 819 + 820 + let Ok(service_did) = service_did.parse() else { 821 + return ProxyHydrateResponse::BadRequest(xrpc_error( 822 + "BadParameter", 823 + "atproto_proxy service did could not be parsed", 824 + )); 825 + }; 826 + 827 + let Ok(xrpc) = payload.xrpc.parse() else { 828 + return ProxyHydrateResponse::BadRequest(xrpc_error( 829 + "BadParameter", 830 + "invalid NSID for xrpc param", 831 + )); 832 + }; 833 + 834 + match self 835 + .proxy 836 + .proxy( 837 + &service_did, 838 + &format!("#{id_fragment}"), 839 + &xrpc, 840 + payload.authorization.as_deref(), 841 + payload.atproto_accept_labelers.as_deref(), 842 + params, 843 + ) 844 + .await 845 + { 846 Ok(skeleton) => { 847 let links = match extract_links(payload.hydration_sources, &skeleton) { 848 Ok(l) => l, 849 Err(e) => { 850 log::warn!("problem extracting: {e:?}"); 851 + return ProxyHydrateResponse::BadRequest(xrpc_error( 852 + "oop", 853 + "sorry, error extracting", 854 + )); 855 } 856 }; 857 let mut records = HashMap::new(); 858 let mut identifiers = HashMap::new(); 859 860 enum GetThing { 861 + Record(String, Hydration<FoundRecordResponseObject>), 862 + Identifier(String, Hydration<MiniDocResponseObject>), 863 } 864 865 let (tx, mut rx) = mpsc::channel(1); 866 867 + let t0 = Instant::now(); 868 + 869 + for (i, link) in links.into_iter().enumerate() { 870 match link { 871 + MatchedRef::AtUri(parts) => { 872 + let non_canonical_url = parts.to_uri(); 873 + if records.contains_key(&non_canonical_url) { 874 log::warn!("skipping duplicate record without checking cid"); 875 continue; 876 } 877 + let mut follow_up = self.base_url.clone(); 878 + follow_up.set_path("/xrpc/com.atproto.repo.getRecord"); 879 + follow_up 880 + .query_pairs_mut() 881 + .append_pair("repo", &Into::<String>::into(parts.repo.clone())) 882 + .append_pair("collection", parts.collection.as_str()) 883 + .append_pair("rkey", parts.rkey.as_str()); 884 + if let Some(ref cid) = parts.cid { 885 + follow_up 886 + .query_pairs_mut() 887 + .append_pair("cid", &cid.as_ref().to_string()); 888 + } 889 + 890 + if i >= 100 { 891 + records.insert( 892 + non_canonical_url.clone(), 893 + Hydration::Pending(ProxyHydrationPending { 894 + reason: "limit".to_string(), 895 + follow_up: follow_up.to_string(), 896 + }), 897 + ); 898 + continue; 899 + } else { 900 + records.insert( 901 + non_canonical_url.clone(), 902 + Hydration::Pending(ProxyHydrationPending { 903 + reason: "deadline".to_string(), 904 + follow_up: follow_up.to_string(), 905 + }), 906 + ); 907 + } 908 + 909 let tx = tx.clone(); 910 let identity = self.identity.clone(); 911 let repo = self.repo.clone(); 912 tokio::task::spawn(async move { 913 + let FullAtUriParts { 914 + repo: ident, 915 + collection, 916 + rkey, 917 + cid, 918 + } = parts; 919 + let did = match ident { 920 + AtIdentifier::Did(did) => did, 921 + AtIdentifier::Handle(handle) => { 922 + let Ok(Some(did)) = identity.handle_to_did(handle).await 923 + else { 924 + let res = Hydration::Error(ProxyHydrationError { 925 + reason: "could not resolve handle".to_string(), 926 + should_retry: true, 927 + follow_up: follow_up.to_string(), 928 + }); 929 + return if tx 930 + .send(GetThing::Record(non_canonical_url, res)) 931 + .await 932 + .is_ok() 933 + { 934 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1); 935 + } else { 936 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1); 937 + }; 938 + }; 939 + did 940 + } 941 }; 942 943 + let res = 944 + match repo.get_record(&did, &collection, &rkey, &cid).await { 945 + Ok(CachedRecord::Deleted) => { 946 + Hydration::Error(ProxyHydrationError { 947 + reason: "record deleted".to_string(), 948 + should_retry: false, 949 + follow_up: follow_up.to_string(), 950 + }) 951 } 952 + Ok(CachedRecord::Found(RawRecord { 953 + cid: found_cid, 954 + record, 955 + })) => { 956 + if cid 957 + .as_ref() 958 + .map(|expected| *expected != found_cid) 959 + .unwrap_or(false) 960 + { 961 + Hydration::Error(ProxyHydrationError { 962 + reason: "not found".to_string(), 963 + should_retry: false, 964 + follow_up: follow_up.to_string(), 965 + }) 966 + } else if let Ok(value) = serde_json::from_str(&record) 967 + { 968 + let canonical_uri = FullAtUriParts { 969 + repo: AtIdentifier::Did(did), 970 + collection, 971 + rkey, 972 + cid: None, // not used for .to_uri 973 + } 974 + .to_uri(); 975 + Hydration::Found(FoundRecordResponseObject { 976 + cid: Some(found_cid.as_ref().to_string()), 977 + uri: canonical_uri, 978 + value, 979 + }) 980 + } else { 981 + Hydration::Error(ProxyHydrationError { 982 + reason: "could not parse upstream response" 983 + .to_string(), 984 + should_retry: false, 985 + follow_up: follow_up.to_string(), 986 + }) 987 + } 988 + } 989 + Err(e) => { 990 + log::warn!("finally oop {e:?}"); 991 + Hydration::Error(ProxyHydrationError { 992 + reason: "failed to fetch record".to_string(), 993 + should_retry: true, // TODO 994 + follow_up: follow_up.to_string(), 995 + }) 996 + } 997 + }; 998 + if tx 999 + .send(GetThing::Record(non_canonical_url, res)) 1000 + .await 1001 + .is_ok() 1002 + { 1003 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1); 1004 + } else { 1005 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1); 1006 + } 1007 }); 1008 } 1009 MatchedRef::Identifier(id) => { 1010 + let identifier: String = id.clone().into(); 1011 + if identifiers.contains_key(&identifier) { 1012 continue; 1013 } 1014 1015 + let mut follow_up = self.base_url.clone(); 1016 + follow_up.set_path("/xrpc/blue.microcosm.identity.resolveMiniDoc"); 1017 + 1018 + follow_up 1019 + .query_pairs_mut() 1020 + .append_pair("identifier", &identifier); 1021 + 1022 + if i >= 100 { 1023 + identifiers.insert( 1024 + identifier.clone(), 1025 + Hydration::Pending(ProxyHydrationPending { 1026 + reason: "limit".to_string(), 1027 + follow_up: follow_up.to_string(), 1028 + }), 1029 + ); 1030 + continue; 1031 + } else { 1032 + identifiers.insert( 1033 + identifier.clone(), 1034 + Hydration::Pending(ProxyHydrationPending { 1035 + reason: "deadline".to_string(), 1036 + follow_up: follow_up.to_string(), 1037 + }), 1038 + ); 1039 + } 1040 + 1041 + let tx = tx.clone(); 1042 + let identity = self.identity.clone(); 1043 + tokio::task::spawn(async move { 1044 + let res = match Self::resolve_mini_doc_impl(&identifier, identity) 1045 + .await 1046 + { 1047 + ResolveMiniDocResponse::Ok(Json(mini_doc)) => { 1048 + Hydration::Found(mini_doc) 1049 + } 1050 + ResolveMiniDocResponse::BadRequest(e) => { 1051 + log::warn!("minidoc fail: {:?}", e.0); 1052 + Hydration::Error(ProxyHydrationError { 1053 + reason: "failed to resolve mini doc".to_string(), 1054 + should_retry: false, 1055 + follow_up: follow_up.to_string(), 1056 + }) 1057 + } 1058 + }; 1059 + if tx.send(GetThing::Identifier(identifier, res)).await.is_ok() { 1060 + metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "true").increment(1); 1061 + } else { 1062 + metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "false").increment(1); 1063 + } 1064 }); 1065 } 1066 } ··· 1069 // (we shoudl be doing a timeout...) 1070 drop(tx); 1071 1072 + let deadline = t0 + std::time::Duration::from_secs_f64(1.6); 1073 + let res = tokio::time::timeout_at(deadline.into(), async { 1074 + while let Some(hydration) = rx.recv().await { 1075 + match hydration { 1076 + GetThing::Record(uri, h) => { 1077 + if let Some(r) = records.get_mut(&uri) { 1078 + match (&r, &h) { 1079 + (_, Hydration::Found(_)) => *r = h, // always replace if found 1080 + (Hydration::Pending(_), _) => *r = h, // or if it was pending 1081 + _ => {} // else leave it 1082 + } 1083 + } else { 1084 + records.insert(uri, h); 1085 + } 1086 + } 1087 + GetThing::Identifier(identifier, md) => { 1088 + identifiers.insert(identifier.to_string(), md); 1089 + } 1090 + }; 1091 + } 1092 + }) 1093 + .await; 1094 + 1095 + if res.is_ok() { 1096 + metrics::histogram!("slingshot_hydration_all_completed").record(t0.elapsed()); 1097 + } else { 1098 + metrics::counter!("slingshot_hydration_cut_off").increment(1); 1099 } 1100 1101 ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject { ··· 1109 ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry")) 1110 } 1111 } 1112 } 1113 1114 async fn get_record_impl( 1115 &self, 1116 + repo: &str, 1117 + collection: &str, 1118 + rkey: &str, 1119 + cid: Option<&str>, 1120 ) -> GetRecordResponse { 1121 + let did = match Did::new(repo.to_string()) { 1122 Ok(did) => did, 1123 Err(_) => { 1124 let Ok(handle) = Handle::new(repo.to_lowercase()) else { ··· 1149 } 1150 }; 1151 1152 + let Ok(collection) = Nsid::new(collection.to_string()) else { 1153 return GetRecordResponse::BadRequest(xrpc_error( 1154 "InvalidRequest", 1155 "Invalid NSID for collection", 1156 )); 1157 }; 1158 1159 + let Ok(rkey) = RecordKey::new(rkey.to_string()) else { 1160 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey")); 1161 }; 1162 1163 let cid: Option<Cid> = if let Some(cid) = cid { 1164 + let Ok(cid) = Cid::from_str(cid) else { 1165 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID")); 1166 }; 1167 Some(cid) ··· 1310 identity: Identity, 1311 repo: Repo, 1312 proxy: Proxy, 1313 + base_url: url::Url, 1314 acme_domain: Option<String>, 1315 acme_contact: Option<String>, 1316 acme_cache_path: Option<PathBuf>, ··· 1322 let proxy = Arc::new(proxy); 1323 let api_service = OpenApiService::new( 1324 Xrpc { 1325 + base_url, 1326 cache, 1327 identity, 1328 proxy,
+1 -7
constellation/src/bin/main.rs
··· 45 #[arg(short, long)] 46 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 47 backend: StorageBackend, 48 - /// Serve a did:web document for this domain 49 - #[arg(long)] 50 - did_web_domain: Option<String>, 51 /// Initiate a database backup into this dir, if supported by the storage 52 #[arg(long)] 53 backup: Option<PathBuf>, ··· 106 MemStorage::new(), 107 fixture, 108 None, 109 - args.did_web_domain, 110 stream, 111 bind, 112 metrics_bind, ··· 142 rocks, 143 fixture, 144 args.data, 145 - args.did_web_domain, 146 stream, 147 bind, 148 metrics_bind, ··· 164 mut storage: impl LinkStorage, 165 fixture: Option<PathBuf>, 166 data_dir: Option<PathBuf>, 167 - did_web_domain: Option<String>, 168 stream: String, 169 bind: SocketAddr, 170 metrics_bind: SocketAddr, ··· 217 if collect_metrics { 218 install_metrics_server(metrics_bind)?; 219 } 220 - serve(readable, bind, did_web_domain, staying_alive).await 221 }) 222 .unwrap(); 223 stay_alive.drop_guard();
··· 45 #[arg(short, long)] 46 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 47 backend: StorageBackend, 48 /// Initiate a database backup into this dir, if supported by the storage 49 #[arg(long)] 50 backup: Option<PathBuf>, ··· 103 MemStorage::new(), 104 fixture, 105 None, 106 stream, 107 bind, 108 metrics_bind, ··· 138 rocks, 139 fixture, 140 args.data, 141 stream, 142 bind, 143 metrics_bind, ··· 159 mut storage: impl LinkStorage, 160 fixture: Option<PathBuf>, 161 data_dir: Option<PathBuf>, 162 stream: String, 163 bind: SocketAddr, 164 metrics_bind: SocketAddr, ··· 211 if collect_metrics { 212 install_metrics_server(metrics_bind)?; 213 } 214 + serve(readable, bind, staying_alive).await 215 }) 216 .unwrap(); 217 stay_alive.drop_guard();
+7 -32
constellation/src/server/mod.rs
··· 3 extract::{Query, Request}, 4 http::{self, header}, 5 middleware::{self, Next}, 6 - response::{IntoResponse, Json, Response}, 7 routing::get, 8 Router, 9 }; ··· 37 http::StatusCode::INTERNAL_SERVER_ERROR 38 } 39 40 - pub async fn serve<S: LinkReader, A: ToSocketAddrs>( 41 - store: S, 42 - addr: A, 43 - did_web_domain: Option<String>, 44 - stay_alive: CancellationToken, 45 - ) -> anyhow::Result<()> { 46 - let mut app = Router::new(); 47 - 48 - if let Some(d) = did_web_domain { 49 - app = app.route( 50 - "/.well-known/did.json", 51 - get({ 52 - let domain = d.clone(); 53 - move || did_web(domain) 54 - }), 55 - ) 56 - } 57 - 58 - let app = app 59 .route("/robots.txt", get(robots)) 60 .route( 61 "/", ··· 217 User-agent: * 218 Disallow: /links 219 Disallow: /links/ 220 - Disallow: /xrpc/ 221 " 222 - } 223 - 224 - async fn did_web(domain: String) -> impl IntoResponse { 225 - Json(serde_json::json!({ 226 - "id": format!("did:web:{domain}"), 227 - "service": [{ 228 - "id": "#constellation", 229 - "type": "ConstellationGraphService", 230 - "serviceEndpoint": format!("https://{domain}") 231 - }] 232 - })) 233 } 234 235 #[derive(Template, Serialize, Deserialize)]
··· 3 extract::{Query, Request}, 4 http::{self, header}, 5 middleware::{self, Next}, 6 + response::{IntoResponse, Response}, 7 routing::get, 8 Router, 9 }; ··· 37 http::StatusCode::INTERNAL_SERVER_ERROR 38 } 39 40 + pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 41 + where 42 + S: LinkReader, 43 + A: ToSocketAddrs, 44 + { 45 + let app = Router::new() 46 .route("/robots.txt", get(robots)) 47 .route( 48 "/", ··· 204 User-agent: * 205 Disallow: /links 206 Disallow: /links/ 207 " 208 } 209 210 #[derive(Template, Serialize, Deserialize)]
+208 -16
slingshot/src/identity.rs
··· 17 18 use crate::error::IdentityError; 19 use atrium_api::{ 20 - did_doc::DidDocument, 21 types::string::{Did, Handle}, 22 }; 23 use atrium_common::resolver::Resolver; ··· 41 pub enum IdentityKey { 42 Handle(Handle), 43 Did(Did), 44 } 45 46 impl IdentityKey { ··· 48 let s = match self { 49 IdentityKey::Handle(h) => h.as_str(), 50 IdentityKey::Did(d) => d.as_str(), 51 }; 52 std::mem::size_of::<Self>() + std::mem::size_of_val(s) 53 } ··· 59 #[derive(Debug, Serialize, Deserialize)] 60 enum IdentityData { 61 NotFound, 62 - Did(Did), 63 - Doc(PartialMiniDoc), 64 } 65 66 impl IdentityVal { ··· 71 IdentityData::Did(d) => std::mem::size_of_val(d.as_str()), 72 IdentityData::Doc(d) => { 73 std::mem::size_of_val(d.unverified_handle.as_str()) 74 - + std::mem::size_of_val(d.pds.as_str()) 75 - + std::mem::size_of_val(d.signing_key.as_str()) 76 } 77 }; 78 wrapping + inner ··· 168 } 169 } 170 171 /// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz) 172 /// 173 /// the hashset allows testing for presense of items in the queue. ··· 296 let now = UtcDateTime::now(); 297 let IdentityVal(last_fetch, data) = entry.value(); 298 match data { 299 - IdentityData::Doc(_) => { 300 - log::error!("identity value mixup: got a doc from a handle key (should be a did)"); 301 - Err(IdentityError::IdentityValTypeMixup(handle.to_string())) 302 - } 303 IdentityData::NotFound => { 304 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 305 metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); ··· 313 self.queue_refresh(key).await; 314 } 315 Ok(Some(did.clone())) 316 } 317 } 318 } ··· 362 let now = UtcDateTime::now(); 363 let IdentityVal(last_fetch, data) = entry.value(); 364 match data { 365 - IdentityData::Did(_) => { 366 - log::error!("identity value mixup: got a did from a did key (should be a doc)"); 367 - Err(IdentityError::IdentityValTypeMixup(did.to_string())) 368 - } 369 IdentityData::NotFound => { 370 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 371 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); ··· 373 } 374 Ok(None) 375 } 376 - IdentityData::Doc(mini_did) => { 377 if (now - *last_fetch) >= MIN_TTL { 378 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 379 self.queue_refresh(key).await; 380 } 381 - Ok(Some(mini_did.clone())) 382 } 383 } 384 } ··· 519 log::warn!( 520 "refreshed did doc failed: wrong did doc id. dropping refresh." 521 ); 522 continue; 523 } 524 let mini_doc = match did_doc.try_into() { ··· 526 Err(e) => { 527 metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 528 log::warn!( 529 - "converting mini doc failed: {e:?}. dropping refresh." 530 ); 531 continue; 532 } 533 }; ··· 554 } 555 556 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop 557 } 558 } 559 }
··· 17 18 use crate::error::IdentityError; 19 use atrium_api::{ 20 + did_doc::{DidDocument, Service as DidDocServic}, 21 types::string::{Did, Handle}, 22 }; 23 use atrium_common::resolver::Resolver; ··· 41 pub enum IdentityKey { 42 Handle(Handle), 43 Did(Did), 44 + ServiceDid(Did), 45 } 46 47 impl IdentityKey { ··· 49 let s = match self { 50 IdentityKey::Handle(h) => h.as_str(), 51 IdentityKey::Did(d) => d.as_str(), 52 + IdentityKey::ServiceDid(d) => d.as_str(), 53 }; 54 std::mem::size_of::<Self>() + std::mem::size_of_val(s) 55 } ··· 61 #[derive(Debug, Serialize, Deserialize)] 62 enum IdentityData { 63 NotFound, 64 + Did(Did), // from handle 65 + Doc(PartialMiniDoc), // from did 66 + ServiceDoc(MiniServiceDoc), // from service did 67 } 68 69 impl IdentityVal { ··· 74 IdentityData::Did(d) => std::mem::size_of_val(d.as_str()), 75 IdentityData::Doc(d) => { 76 std::mem::size_of_val(d.unverified_handle.as_str()) 77 + + std::mem::size_of_val(&d.pds) 78 + + std::mem::size_of_val(&d.signing_key) 79 + } 80 + IdentityData::ServiceDoc(d) => { 81 + let mut s = std::mem::size_of::<MiniServiceDoc>(); 82 + s += std::mem::size_of_val(&d.services); 83 + for sv in &d.services { 84 + s += std::mem::size_of_val(&sv.full_id); 85 + s += std::mem::size_of_val(&sv.r#type); 86 + s += std::mem::size_of_val(&sv.endpoint); 87 + } 88 + s 89 } 90 }; 91 wrapping + inner ··· 181 } 182 } 183 184 + /// Simplified info from service DID docs 185 + #[derive(Debug, Clone, Serialize, Deserialize)] 186 + pub struct MiniServiceDoc { 187 + services: Vec<MiniService>, 188 + } 189 + 190 + impl MiniServiceDoc { 191 + pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> { 192 + self.services.iter().find(|ms| { 193 + ms.full_id.ends_with(id_fragment) 194 + && service_type.map(|t| t == ms.r#type).unwrap_or(true) 195 + }) 196 + } 197 + } 198 + 199 + /// The corresponding service info 200 + #[derive(Debug, Clone, Serialize, Deserialize)] 201 + pub struct MiniService { 202 + /// The full id 203 + /// 204 + /// for informational purposes only -- services are deduplicated by id fragment 205 + full_id: String, 206 + r#type: String, 207 + /// HTTP endpoint for the actual service 208 + pub endpoint: String, 209 + } 210 + 211 + impl TryFrom<DidDocument> for MiniServiceDoc { 212 + type Error = String; 213 + fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> { 214 + let mut services = Vec::new(); 215 + let mut seen = HashSet::new(); 216 + 217 + for DidDocServic { 218 + id, 219 + r#type, 220 + service_endpoint, 221 + } in did_doc.service.unwrap_or(vec![]) 222 + { 223 + let Some((_, id_fragment)) = id.rsplit_once('#') else { 224 + continue; 225 + }; 226 + if !seen.insert((id_fragment.to_string(), r#type.clone())) { 227 + continue; 228 + } 229 + services.push(MiniService { 230 + full_id: id, 231 + r#type, 232 + endpoint: service_endpoint, 233 + }); 234 + } 235 + 236 + Ok(Self { services }) 237 + } 238 + } 239 + 240 /// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz) 241 /// 242 /// the hashset allows testing for presense of items in the queue. ··· 365 let now = UtcDateTime::now(); 366 let IdentityVal(last_fetch, data) = entry.value(); 367 match data { 368 IdentityData::NotFound => { 369 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 370 metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); ··· 378 self.queue_refresh(key).await; 379 } 380 Ok(Some(did.clone())) 381 + } 382 + _ => { 383 + log::error!("identity value mixup: got a doc from a handle key (should be a did)"); 384 + Err(IdentityError::IdentityValTypeMixup(handle.to_string())) 385 } 386 } 387 } ··· 431 let now = UtcDateTime::now(); 432 let IdentityVal(last_fetch, data) = entry.value(); 433 match data { 434 IdentityData::NotFound => { 435 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 436 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); ··· 438 } 439 Ok(None) 440 } 441 + IdentityData::Doc(mini_doc) => { 442 if (now - *last_fetch) >= MIN_TTL { 443 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 444 self.queue_refresh(key).await; 445 } 446 + Ok(Some(mini_doc.clone())) 447 + } 448 + _ => { 449 + log::error!("identity value mixup: got a doc from a handle key (should be a did)"); 450 + Err(IdentityError::IdentityValTypeMixup(did.to_string())) 451 + } 452 + } 453 + } 454 + 455 + /// Fetch (and cache) a service mini doc from a did 456 + pub async fn did_to_mini_service_doc( 457 + &self, 458 + did: &Did, 459 + ) -> Result<Option<MiniServiceDoc>, IdentityError> { 460 + let key = IdentityKey::ServiceDid(did.clone()); 461 + metrics::counter!("slingshot_get_service_did_doc").increment(1); 462 + let entry = self 463 + .cache 464 + .get_or_fetch(&key, { 465 + let did = did.clone(); 466 + let resolver = self.did_resolver.clone(); 467 + || async move { 468 + let t0 = Instant::now(); 469 + let (res, success) = match resolver.resolve(&did).await { 470 + Ok(did_doc) if did_doc.id != did.to_string() => ( 471 + // TODO: fix in atrium: should verify id is did 472 + Err(IdentityError::BadDidDoc( 473 + "did doc's id did not match did".to_string(), 474 + )), 475 + "false", 476 + ), 477 + Ok(did_doc) => match did_doc.try_into() { 478 + Ok(mini_service_doc) => ( 479 + Ok(IdentityVal( 480 + UtcDateTime::now(), 481 + IdentityData::ServiceDoc(mini_service_doc), 482 + )), 483 + "true", 484 + ), 485 + Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"), 486 + }, 487 + Err(atrium_identity::Error::NotFound) => ( 488 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)), 489 + "false", 490 + ), 491 + Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"), 492 + }; 493 + metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success) 494 + .record(t0.elapsed()); 495 + res 496 + } 497 + }) 498 + .await?; 499 + 500 + let now = UtcDateTime::now(); 501 + let IdentityVal(last_fetch, data) = entry.value(); 502 + match data { 503 + IdentityData::NotFound => { 504 + if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 505 + metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 506 + self.queue_refresh(key).await; 507 + } 508 + Ok(None) 509 + } 510 + IdentityData::ServiceDoc(mini_service_doc) => { 511 + if (now - *last_fetch) >= MIN_TTL { 512 + metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 513 + self.queue_refresh(key).await; 514 + } 515 + Ok(Some(mini_service_doc.clone())) 516 + } 517 + _ => { 518 + log::error!( 519 + "identity value mixup: got a doc from a different key type (should be a service did)" 520 + ); 521 + Err(IdentityError::IdentityValTypeMixup(did.to_string())) 522 } 523 } 524 } ··· 659 log::warn!( 660 "refreshed did doc failed: wrong did doc id. dropping refresh." 661 ); 662 + self.complete_refresh(&task_key).await?; 663 continue; 664 } 665 let mini_doc = match did_doc.try_into() { ··· 667 Err(e) => { 668 metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 669 log::warn!( 670 + "converting mini doc for {did:?} failed: {e:?}. dropping refresh." 671 ); 672 + self.complete_refresh(&task_key).await?; 673 continue; 674 } 675 }; ··· 696 } 697 698 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop 699 + } 700 + IdentityKey::ServiceDid(ref did) => { 701 + log::trace!("refreshing service did doc: {did:?}"); 702 + 703 + match self.did_resolver.resolve(did).await { 704 + Ok(did_doc) => { 705 + // TODO: fix in atrium: should verify id is did 706 + if did_doc.id != did.to_string() { 707 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1); 708 + log::warn!( 709 + "refreshed did doc failed: wrong did doc id. dropping refresh." 710 + ); 711 + self.complete_refresh(&task_key).await?; 712 + continue; 713 + } 714 + let mini_service_doc = match did_doc.try_into() { 715 + Ok(md) => md, 716 + Err(e) => { 717 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 718 + log::warn!( 719 + "converting mini service doc failed: {e:?}. dropping refresh." 720 + ); 721 + self.complete_refresh(&task_key).await?; 722 + continue; 723 + } 724 + }; 725 + metrics::counter!("identity_service_did_refresh", "success" => "true") 726 + .increment(1); 727 + self.cache.insert( 728 + task_key.clone(), 729 + IdentityVal( 730 + UtcDateTime::now(), 731 + IdentityData::ServiceDoc(mini_service_doc), 732 + ), 733 + ); 734 + } 735 + Err(atrium_identity::Error::NotFound) => { 736 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1); 737 + self.cache.insert( 738 + task_key.clone(), 739 + IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 740 + ); 741 + } 742 + Err(err) => { 743 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1); 744 + log::warn!( 745 + "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 746 + ); 747 + } 748 + } 749 } 750 } 751 }

History

2 rounds 2 comments
sign up or login to add to the discussion
8 commits
expand
very hack proxy stuff (wip)
identities!!!!
service doc cache
proxy: use service cache
fmt
include headers, timeout hydration, measure times
base_url, typed url parts, api fixups,
only try to hydrate 100 records
no conflicts, ready to merge
expand 0 comments
1 commit
expand
very hack proxy stuff (wip)
expand 2 comments

the record contents from com.atproto.repo.getRecord are returned under a key called "value", so this should probably use that.