Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

hydrating xrpc proxy (wip) #15

open opened by bad-example.com targeting main from slingshot-proxy-hydrate
Labels

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3me2sdlxbvy22
+784 -12
Diff #0
+1
Cargo.lock
··· 5965 5965 "form_urlencoded", 5966 5966 "idna", 5967 5967 "percent-encoding", 5968 + "serde", 5968 5969 ] 5969 5970 5970 5971 [[package]]
+1 -1
slingshot/Cargo.toml
··· 28 28 tokio = { version = "1.47.0", features = ["full"] } 29 29 tokio-util = "0.7.15" 30 30 tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } 31 - url = "2.5.4" 31 + url = { version = "2.5.4", features = ["serde"] }
+10
slingshot/src/error.rs
··· 91 91 #[error("upstream non-atproto bad request")] 92 92 UpstreamBadBadNotGoodRequest(reqwest::Error), 93 93 } 94 + 95 + #[derive(Debug, Error)] 96 + pub enum ProxyError { 97 + #[error("failed to parse path: {0}")] 98 + PathParseError(String), 99 + #[error(transparent)] 100 + UrlParseError(#[from] url::ParseError), 101 + #[error(transparent)] 102 + ReqwestError(#[from] reqwest::Error), 103 + }
+2
slingshot/src/lib.rs
··· 3 3 mod firehose_cache; 4 4 mod healthcheck; 5 5 mod identity; 6 + mod proxy; 6 7 mod record; 7 8 mod server; 8 9 ··· 10 11 pub use firehose_cache::firehose_cache; 11 12 pub use healthcheck::healthcheck; 12 13 pub use identity::{Identity, IdentityKey}; 14 + pub use proxy::Proxy; 13 15 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 14 16 pub use server::serve;
+4 -3
slingshot/src/main.rs
··· 2 2 // use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3 3 use metrics_exporter_prometheus::PrometheusBuilder; 4 4 use slingshot::{ 5 - Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 5 + Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 6 6 }; 7 7 use std::net::SocketAddr; 8 8 use std::path::PathBuf; ··· 143 143 ) 144 144 .await 145 145 .map_err(|e| format!("identity setup failed: {e:?}"))?; 146 - 147 - log::info!("identity service ready."); 148 146 let identity_refresher = identity.clone(); 149 147 let identity_shutdown = shutdown.clone(); 150 148 tasks.spawn(async move { 151 149 identity_refresher.run_refresher(identity_shutdown).await?; 152 150 Ok(()) 153 151 }); 152 + log::info!("identity service ready."); 154 153 155 154 let repo = Repo::new(identity.clone()); 155 + let proxy = Proxy::new(repo.clone()); 156 156 157 157 let identity_for_server = identity.clone(); 158 158 let server_shutdown = shutdown.clone(); ··· 163 163 server_cache_handle, 164 164 identity_for_server, 165 165 repo, 166 + proxy, 166 167 args.acme_domain, 167 168 args.acme_contact, 168 169 args.acme_cache_path,
+487
slingshot/src/proxy.rs
··· 1 + use serde::Deserialize; 2 + use url::Url; 3 + use std::{collections::HashMap, time::Duration}; 4 + use crate::{Repo, server::HydrationSource, error::ProxyError}; 5 + use reqwest::Client; 6 + use serde_json::{Map, Value}; 7 + 8 + pub enum ParamValue { 9 + String(Vec<String>), 10 + Int(Vec<i64>), 11 + Bool(Vec<bool>), 12 + } 13 + pub struct Params(HashMap<String, ParamValue>); 14 + 15 + impl TryFrom<Map<String, Value>> for Params { 16 + type Error = (); // TODO 17 + fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> { 18 + let mut out = HashMap::new(); 19 + for (k, v) in val { 20 + match v { 21 + Value::String(s) => out.insert(k, ParamValue::String(vec![s])), 22 + Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])), 23 + Value::Number(n) => { 24 + let Some(i) = n.as_i64() else { 25 + return Err(()); 26 + }; 27 + out.insert(k, ParamValue::Int(vec![i])) 28 + } 29 + Value::Array(a) => { 30 + let Some(first) = a.first() else { 31 + continue; 32 + }; 33 + if first.is_string() { 34 + let mut vals = Vec::with_capacity(a.len()); 35 + for v in a { 36 + let Some(v) = v.as_str() else { 37 + return Err(()); 38 + }; 39 + vals.push(v.to_string()); 40 + } 41 + out.insert(k, ParamValue::String(vals)); 42 + } else if first.is_i64() { 43 + let mut vals = Vec::with_capacity(a.len()); 44 + for v in a { 45 + let Some(v) = v.as_i64() else { 46 + return Err(()); 47 + }; 48 + vals.push(v); 49 + } 50 + out.insert(k, ParamValue::Int(vals)); 51 + } else if first.is_boolean() { 52 + let mut vals = Vec::with_capacity(a.len()); 53 + for v in a { 54 + let Some(v) = v.as_bool() else { 55 + return Err(()); 56 + }; 57 + vals.push(v); 58 + } 59 + out.insert(k, ParamValue::Bool(vals)); 60 + } 61 + todo!(); 62 + } 63 + _ => return Err(()), 64 + }; 65 + } 66 + 67 + Ok(Self(out)) 68 + } 69 + } 70 + 71 + #[derive(Clone)] 72 + pub struct Proxy { 73 + repo: Repo, 74 + client: Client, 75 + } 76 + 77 + impl Proxy { 78 + pub fn new(repo: Repo) -> Self { 79 + let client = Client::builder() 80 + .user_agent(format!( 81 + "microcosm slingshot v{} (contact: @bad-example.com)", 82 + env!("CARGO_PKG_VERSION") 83 + )) 84 + .no_proxy() 85 + .timeout(Duration::from_secs(6)) 86 + .build() 87 + .unwrap(); 88 + Self { repo, client } 89 + } 90 + 91 + pub async fn proxy( 92 + &self, 93 + xrpc: String, 94 + service: String, 95 + params: Option<Map<String, Value>>, 96 + ) -> Result<Value, ProxyError> { 97 + 98 + // hackin it to start 99 + 100 + // 1. assume did-web (TODO) and get the did doc 101 + #[derive(Debug, Deserialize)] 102 + struct ServiceDoc { 103 + id: String, 104 + service: Vec<ServiceItem>, 105 + } 106 + #[derive(Debug, Deserialize)] 107 + struct ServiceItem { 108 + id: String, 109 + #[expect(unused)] 110 + r#type: String, 111 + #[serde(rename = "serviceEndpoint")] 112 + service_endpoint: Url, 113 + } 114 + let dw = service.strip_prefix("did:web:").expect("a did web"); 115 + let (dw, service_id) = dw.split_once("#").expect("whatever"); 116 + let mut dw_url = Url::parse(&format!("https://{dw}"))?; 117 + dw_url.set_path("/.well-known/did.json"); 118 + let doc: ServiceDoc = self.client 119 + .get(dw_url) 120 + .send() 121 + .await? 122 + .error_for_status()? 123 + .json() 124 + .await?; 125 + 126 + assert_eq!(doc.id, format!("did:web:{}", dw)); 127 + 128 + let mut upstream = None; 129 + for ServiceItem { id, service_endpoint, .. } in doc.service { 130 + let Some((_, id)) = id.split_once("#") else { continue; }; 131 + if id != service_id { continue; }; 132 + upstream = Some(service_endpoint); 133 + break; 134 + } 135 + 136 + // 2. proxy the request forward 137 + let mut upstream = upstream.expect("to find it"); 138 + upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid 139 + 140 + if let Some(params) = params { 141 + let mut query = upstream.query_pairs_mut(); 142 + let Params(ps) = params.try_into().expect("valid params"); 143 + for (k, pvs) in ps { 144 + match pvs { 145 + ParamValue::String(s) => { 146 + for s in s { 147 + query.append_pair(&k, &s); 148 + } 149 + } 150 + ParamValue::Int(i) => { 151 + for i in i { 152 + query.append_pair(&k, &i.to_string()); 153 + } 154 + } 155 + ParamValue::Bool(b) => { 156 + for b in b { 157 + query.append_pair(&k, &b.to_string()); 158 + } 159 + } 160 + } 161 + } 162 + } 163 + 164 + // TODO: other headers to proxy 165 + Ok(self.client 166 + .get(upstream) 167 + .send() 168 + .await? 169 + .error_for_status()? 170 + .json() 171 + .await?) 172 + } 173 + } 174 + 175 + #[derive(Debug, PartialEq)] 176 + pub enum PathPart { 177 + Scalar(String), 178 + Vector(String, Option<String>), // key, $type 179 + } 180 + 181 + pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> { 182 + let mut out = Vec::new(); 183 + 184 + let mut key_acc = String::new(); 185 + let mut type_acc = String::new(); 186 + let mut in_bracket = false; 187 + let mut chars = input.chars().enumerate(); 188 + while let Some((i, c)) = chars.next() { 189 + match c { 190 + '[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")), 191 + '[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")), 192 + '[' => in_bracket = true, 193 + ']' if in_bracket => { 194 + in_bracket = false; 195 + let key = std::mem::take(&mut key_acc); 196 + let r#type = std::mem::take(&mut type_acc); 197 + let t = if r#type.is_empty() { None } else { Some(r#type) }; 198 + out.push(PathPart::Vector(key, t)); 199 + // peek ahead because we need a dot after array if there's more and i don't want to add more loop state 200 + let Some((i, c)) = chars.next() else { 201 + break; 202 + }; 203 + if c != '.' { 204 + return Err(format!("expected dot after close bracket, found {c:?} at {i}")); 205 + } 206 + } 207 + ']' => return Err(format!("unexpected close bracket at {i}")), 208 + '.' if in_bracket => type_acc.push(c), 209 + '.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")), 210 + '.' => { 211 + let key = std::mem::take(&mut key_acc); 212 + assert!(type_acc.is_empty()); 213 + out.push(PathPart::Scalar(key)); 214 + } 215 + _ if in_bracket => type_acc.push(c), 216 + _ => key_acc.push(c), 217 + } 218 + } 219 + if in_bracket { 220 + return Err("unclosed bracket".into()); 221 + } 222 + if !key_acc.is_empty() { 223 + out.push(PathPart::Scalar(key_acc)); 224 + } 225 + Ok(out) 226 + } 227 + 228 + #[derive(Debug, Clone, Copy, PartialEq)] 229 + pub enum RefShape { 230 + StrongRef, 231 + AtUri, 232 + AtUriParts, 233 + Did, 234 + Handle, 235 + AtIdentifier, 236 + } 237 + 238 + impl TryFrom<&str> for RefShape { 239 + type Error = String; 240 + fn try_from(s: &str) -> Result<Self, Self::Error> { 241 + match s { 242 + "strong-ref" => Ok(Self::StrongRef), 243 + "at-uri" => Ok(Self::AtUri), 244 + "at-uri-parts" => Ok(Self::AtUriParts), 245 + "did" => Ok(Self::Did), 246 + "handle" => Ok(Self::Handle), 247 + "at-identifier" => Ok(Self::AtIdentifier), 248 + _ => Err(format!("unknown shape: {s}")), 249 + } 250 + } 251 + } 252 + 253 + #[derive(Debug, PartialEq)] 254 + pub enum MatchedRef { 255 + AtUri { 256 + uri: String, 257 + cid: Option<String>, 258 + }, 259 + Identifier(String), 260 + } 261 + 262 + pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> { 263 + // TODO: actually validate at-uri format 264 + // TODO: actually validate everything else also 265 + // TODO: should this function normalize identifiers to DIDs probably? 266 + // or just return at-uri parts so the caller can resolve and reassemble 267 + match shape { 268 + RefShape::StrongRef => { 269 + let o = val.as_object()?; 270 + let uri = o.get("uri")?.as_str()?.to_string(); 271 + let cid = o.get("cid")?.as_str()?.to_string(); 272 + Some(MatchedRef::AtUri { uri, cid: Some(cid) }) 273 + } 274 + RefShape::AtUri => { 275 + let uri = val.as_str()?.to_string(); 276 + Some(MatchedRef::AtUri { uri, cid: None }) 277 + } 278 + RefShape::AtUriParts => { 279 + let o = val.as_object()?; 280 + let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string(); 281 + let collection = o.get("collection")?.as_str()?.to_string(); 282 + let rkey = o.get("rkey")?.as_str()?.to_string(); 283 + let uri = format!("at://{identifier}/{collection}/{rkey}"); 284 + let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string); 285 + Some(MatchedRef::AtUri { uri, cid }) 286 + } 287 + RefShape::Did => { 288 + let id = val.as_str()?; 289 + if !id.starts_with("did:") { 290 + return None; 291 + } 292 + Some(MatchedRef::Identifier(id.to_string())) 293 + } 294 + RefShape::Handle => { 295 + let id = val.as_str()?; 296 + if id.contains(':') { 297 + return None; 298 + } 299 + Some(MatchedRef::Identifier(id.to_string())) 300 + } 301 + RefShape::AtIdentifier => { 302 + Some(MatchedRef::Identifier(val.as_str()?.to_string())) 303 + } 304 + } 305 + } 306 + 307 + // TODO: send back metadata about the matching 308 + pub fn extract_links( 309 + sources: Vec<HydrationSource>, 310 + skeleton: &Value, 311 + ) -> Result<Vec<MatchedRef>, String> { 312 + // collect early to catch errors from the client 313 + // (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah) 314 + let sources = sources 315 + .into_iter() 316 + .map(|HydrationSource { path, shape }| { 317 + let path_parts = parse_record_path(&path)?; 318 + let shape: RefShape = shape.as_str().try_into()?; 319 + Ok((path_parts, shape)) 320 + }) 321 + .collect::<Result<Vec<_>, String>>()?; 322 + 323 + // lazy first impl, just re-walk the skeleton as many times as needed 324 + // not deduplicating for now 325 + let mut out = Vec::new(); 326 + for (path_parts, shape) in sources { 327 + for val in PathWalker::new(&path_parts, skeleton) { 328 + if let Some(matched) = match_shape(shape, val) { 329 + out.push(matched); 330 + } 331 + } 332 + } 333 + 334 + Ok(out) 335 + } 336 + 337 + struct PathWalker<'a> { 338 + todo: Vec<(&'a [PathPart], &'a Value)>, 339 + } 340 + impl<'a> PathWalker<'a> { 341 + fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self { 342 + Self { todo: vec![(path_parts, skeleton)] } 343 + } 344 + } 345 + impl<'a> Iterator for PathWalker<'a> { 346 + type Item = &'a Value; 347 + fn next(&mut self) -> Option<Self::Item> { 348 + loop { 349 + let (parts, val) = self.todo.pop()?; 350 + let Some((part, rest)) = parts.split_first() else { 351 + return Some(val); 352 + }; 353 + let Some(o) = val.as_object() else { 354 + continue; 355 + }; 356 + match part { 357 + PathPart::Scalar(k) => { 358 + let Some(v) = o.get(k) else { 359 + continue; 360 + }; 361 + self.todo.push((rest, v)); 362 + } 363 + PathPart::Vector(k, t) => { 364 + let Some(a) = o.get(k).and_then(|v| v.as_array()) else { 365 + continue; 366 + }; 367 + for v in a 368 + .iter() 369 + .rev() 370 + .filter(|c| { 371 + let Some(t) = t else { return true }; 372 + c 373 + .as_object() 374 + .and_then(|o| o.get("$type")) 375 + .and_then(|v| v.as_str()) 376 + .map(|s| s == t) 377 + .unwrap_or(false) 378 + }) 379 + { 380 + self.todo.push((rest, v)) 381 + } 382 + } 383 + } 384 + } 385 + } 386 + } 387 + 388 + 389 + #[cfg(test)] 390 + mod tests { 391 + use super::*; 392 + use serde_json::json; 393 + 394 + #[test] 395 + fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> { 396 + let cases = [ 397 + ("", vec![]), 398 + ("subject", vec![PathPart::Scalar("subject".into())]), 399 + ("authorDid", vec![PathPart::Scalar("authorDid".into())]), 400 + ("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]), 401 + ("members[]", vec![PathPart::Vector("members".into(), None)]), 402 + ("add[].key", vec![ 403 + PathPart::Vector("add".into(), None), 404 + PathPart::Scalar("key".into()), 405 + ]), 406 + ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]), 407 + ("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]), 408 + ("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![ 409 + PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())), 410 + PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())), 411 + PathPart::Scalar("did".into()), 412 + ]), 413 + ]; 414 + 415 + for (path, expected) in cases { 416 + let parsed = parse_record_path(path)?; 417 + assert_eq!(parsed, expected, "path: {path:?}"); 418 + } 419 + 420 + Ok(()) 421 + } 422 + 423 + #[test] 424 + fn test_match_shape() { 425 + let cases = [ 426 + ("strong-ref", json!(""), None), 427 + ("strong-ref", json!({}), None), 428 + ("strong-ref", json!({ "uri": "abc" }), None), 429 + ("strong-ref", json!({ "cid": "def" }), None), 430 + ( 431 + "strong-ref", 432 + json!({ "uri": "abc", "cid": "def" }), 433 + Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }), 434 + ), 435 + ("at-uri", json!({ "uri": "abc" }), None), 436 + ("at-uri", json!({ "uri": "abc", "cid": "def" }), None), 437 + ( 438 + "at-uri", 439 + json!("abc"), 440 + Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }), 441 + ), 442 + ("at-uri-parts", json!("abc"), None), 443 + ("at-uri-parts", json!({}), None), 444 + ( 445 + "at-uri-parts", 446 + json!({"repo": "a", "collection": "b", "rkey": "c"}), 447 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 448 + ), 449 + ( 450 + "at-uri-parts", 451 + json!({"did": "a", "collection": "b", "rkey": "c"}), 452 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 453 + ), 454 + ( 455 + "at-uri-parts", 456 + // 'repo' takes precedence over 'did' 457 + json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}), 458 + Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }), 459 + ), 460 + ( 461 + "at-uri-parts", 462 + json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}), 463 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }), 464 + ), 465 + ( 466 + "at-uri-parts", 467 + json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}), 468 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 469 + ), 470 + ("did", json!({}), None), 471 + ("did", json!(""), None), 472 + ("did", json!("bad-example.com"), None), 473 + ("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 474 + ("handle", json!({}), None), 475 + ("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 476 + ("handle", json!("did:plc:xyz"), None), 477 + ("at-identifier", json!({}), None), 478 + ("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 479 + ("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 480 + ]; 481 + for (shape, val, expected) in cases { 482 + let s = shape.try_into().unwrap(); 483 + let matched = match_shape(s, &val); 484 + assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}"); 485 + } 486 + } 487 + }
+2 -2
slingshot/src/record.rs
··· 11 11 12 12 #[derive(Debug, Serialize, Deserialize)] 13 13 pub struct RawRecord { 14 - cid: Cid, 15 - record: String, 14 + pub cid: Cid, 15 + pub record: String, 16 16 } 17 17 18 18 // TODO: should be able to do typed CID
+277 -6
slingshot/src/server.rs
··· 1 1 use crate::{ 2 - CachedRecord, ErrorResponseObject, Identity, Repo, 2 + CachedRecord, ErrorResponseObject, Identity, Proxy, Repo, 3 3 error::{RecordError, ServerError}, 4 + proxy::{extract_links, MatchedRef}, 5 + record::RawRecord, 4 6 }; 5 7 use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 6 8 use foyer::HybridCache; 7 9 use links::at_uri::parse_at_uri as normalize_at_uri; 8 10 use serde::Serialize; 9 - use std::path::PathBuf; 10 - use std::str::FromStr; 11 - use std::sync::Arc; 12 - use std::time::Instant; 11 + use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap}; 12 + use tokio::sync::mpsc; 13 13 use tokio_util::sync::CancellationToken; 14 14 15 15 use poem::{ ··· 24 24 }; 25 25 use poem_openapi::{ 26 26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags, 27 + Union, 27 28 param::Query, payload::Json, types::Example, 28 29 }; 29 30 ··· 92 93 })) 93 94 } 94 95 96 + fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse { 97 + ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject { 98 + error: "InvalidRequest".to_string(), 99 + message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 100 + })) 101 + } 102 + 95 103 fn bad_request_handler_resolve_handle(err: poem::Error) -> JustDidResponse { 96 104 JustDidResponse::BadRequest(Json(XrpcErrorResponseObject { 97 105 error: "InvalidRequest".to_string(), ··· 190 198 BadRequest(XrpcError), 191 199 } 192 200 201 + #[derive(Object)] 202 + struct ProxyHydrationError { 203 + reason: String, 204 + } 205 + 206 + #[derive(Object)] 207 + struct ProxyHydrationPending { 208 + url: String, 209 + } 210 + 211 + #[derive(Object)] 212 + struct ProxyHydrationRecordFound { 213 + record: serde_json::Value, 214 + } 215 + 216 + #[derive(Object)] 217 + struct ProxyHydrationIdentifierFound { 218 + record: MiniDocResponseObject, 219 + } 220 + 221 + // todo: there's gotta be a supertrait that collects these? 222 + use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType}; 223 + 224 + #[derive(Union)] 225 + #[oai(discriminator_name = "status", rename_all = "camelCase")] 226 + enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> { 227 + Error(ProxyHydrationError), 228 + Pending(ProxyHydrationPending), 229 + Found(T), 230 + } 231 + 232 + #[derive(Object)] 233 + #[oai(example = true)] 234 + struct ProxyHydrateResponseObject { 235 + /// The original upstream response content 236 + output: serde_json::Value, 237 + /// Any hydrated records 238 + records: HashMap<String, Hydration<ProxyHydrationRecordFound>>, 239 + /// Any hydrated identifiers 240 + identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>, 241 + } 242 + impl Example for ProxyHydrateResponseObject { 243 + fn example() -> Self { 244 + Self { 245 + output: serde_json::json!({}), 246 + records: HashMap::from([ 247 + ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })), 248 + ]), 249 + identifiers: HashMap::new(), 250 + } 251 + } 252 + } 253 + 254 + #[derive(ApiResponse)] 255 + #[oai(bad_request_handler = "bad_request_handler_proxy_query")] 256 + enum ProxyHydrateResponse { 257 + #[oai(status = 200)] 258 + Ok(Json<ProxyHydrateResponseObject>), 259 + #[oai(status = 400)] 260 + BadRequest(XrpcError) 261 + } 262 + 263 + #[derive(Object)] 264 + pub struct HydrationSource { 265 + /// Record Path syntax for locating fields 266 + pub path: String, 267 + /// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'. 268 + /// 269 + /// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys. 270 + /// - `at-uri`: string, must have all segments present (identifier, collection, rkey) 271 + /// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored. 272 + /// - `did`: string, `did` format 273 + /// - `handle`: string, `handle` format 274 + /// - `at-identifier`: string, `did` or `handle` format 275 + pub shape: String, 276 + } 277 + 278 + #[derive(Object)] 279 + #[oai(example = true)] 280 + struct ProxyQueryPayload { 281 + /// The NSID of the XRPC you wish to forward 282 + xrpc: String, 283 + /// The destination service the request will be forwarded to 284 + atproto_proxy: String, 285 + /// The `params` for the destination service XRPC endpoint 286 + /// 287 + /// Currently this will be passed along unchecked, but a future version of 288 + /// slingshot may attempt to do lexicon resolution to validate `params` 289 + /// based on the upstream service 290 + params: Option<serde_json::Value>, 291 + /// Paths within the response to look for at-uris that can be hydrated 292 + hydration_sources: Vec<HydrationSource>, 293 + // todo: deadline thing 294 + 295 + } 296 + impl Example for ProxyQueryPayload { 297 + fn example() -> Self { 298 + Self { 299 + xrpc: "app.bsky.feed.getFeedSkeleton".to_string(), 300 + atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(), 301 + params: Some(serde_json::json!({ 302 + "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto", 303 + })), 304 + hydration_sources: vec![ 305 + HydrationSource { 306 + path: "feed[].post".to_string(), 307 + shape: "at-uri".to_string(), 308 + } 309 + ], 310 + } 311 + } 312 + } 313 + 193 314 #[derive(Object)] 194 315 #[oai(example = true)] 195 316 struct FoundDidResponseObject { ··· 221 342 struct Xrpc { 222 343 cache: HybridCache<String, CachedRecord>, 223 344 identity: Identity, 345 + proxy: Arc<Proxy>, 224 346 repo: Arc<Repo>, 225 347 } 226 348 ··· 550 672 })) 551 673 } 552 674 675 + /// com.bad-example.proxy.hydrateQueryResponse 676 + /// 677 + /// > [!important] 678 + /// > Unstable! This endpoint is experimental and may change. 679 + /// 680 + /// Fetch + include records referenced from an upstream xrpc query response 681 + #[oai( 682 + path = "/com.bad-example.proxy.hydrateQueryResponse", 683 + method = "post", 684 + tag = "ApiTags::Custom" 685 + )] 686 + async fn proxy_hydrate_query( 687 + &self, 688 + Json(payload): Json<ProxyQueryPayload>, 689 + ) -> ProxyHydrateResponse { 690 + // TODO: the Accept request header, if present, gotta be json 691 + // TODO: find any Authorization header and verify it. TBD about `aud`. 692 + 693 + let params = if let Some(p) = payload.params { 694 + let serde_json::Value::Object(map) = p else { 695 + panic!("params have to be an object"); 696 + }; 697 + Some(map) 698 + } else { None }; 699 + 700 + match self.proxy.proxy( 701 + payload.xrpc, 702 + payload.atproto_proxy, 703 + params, 704 + ).await { 705 + Ok(skeleton) => { 706 + let links = match extract_links(payload.hydration_sources, &skeleton) { 707 + Ok(l) => l, 708 + Err(e) => { 709 + log::warn!("problem extracting: {e:?}"); 710 + return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting")) 711 + } 712 + }; 713 + let mut records = HashMap::new(); 714 + let mut identifiers = HashMap::new(); 715 + 716 + enum GetThing { 717 + Record(String, Hydration<ProxyHydrationRecordFound>), 718 + Identifier(String, Hydration<ProxyHydrationIdentifierFound>), 719 + } 720 + 721 + let (tx, mut rx) = mpsc::channel(1); 722 + 723 + for link in links { 724 + match link { 725 + MatchedRef::AtUri { uri, cid } => { 726 + if records.contains_key(&uri) { 727 + log::warn!("skipping duplicate record without checking cid"); 728 + continue; 729 + } 730 + let mut u = url::Url::parse("https://example.com").unwrap(); 731 + u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo 732 + records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending { 733 + url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc. 734 + })); 735 + let tx = tx.clone(); 736 + let identity = self.identity.clone(); 737 + let repo = self.repo.clone(); 738 + tokio::task::spawn(async move { 739 + let rest = uri.strip_prefix("at://").unwrap(); 740 + let (identifier, rest) = rest.split_once('/').unwrap(); 741 + let (collection, rkey) = rest.split_once('/').unwrap(); 742 + 743 + let did = if identifier.starts_with("did:") { 744 + Did::new(identifier.to_string()).unwrap() 745 + } else { 746 + let handle = Handle::new(identifier.to_string()).unwrap(); 747 + identity.handle_to_did(handle).await.unwrap().unwrap() 748 + }; 749 + 750 + let res = match repo.get_record( 751 + &did, 752 + &Nsid::new(collection.to_string()).unwrap(), 753 + &RecordKey::new(rkey.to_string()).unwrap(), 754 + &cid.as_ref().map(|s| Cid::from_str(s).unwrap()), 755 + ).await { 756 + Ok(CachedRecord::Deleted) => 757 + Hydration::Error(ProxyHydrationError { 758 + reason: "record deleted".to_string(), 759 + }), 760 + Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => { 761 + if let Some(c) = cid && found_cid.as_ref().to_string() != c { 762 + log::warn!("ignoring cid mismatch"); 763 + } 764 + let value = serde_json::from_str(&record).unwrap(); 765 + Hydration::Found(ProxyHydrationRecordFound { 766 + record: value, 767 + }) 768 + } 769 + Err(e) => { 770 + log::warn!("finally oop {e:?}"); 771 + Hydration::Error(ProxyHydrationError { 772 + reason: "failed to fetch record".to_string(), 773 + }) 774 + } 775 + }; 776 + tx.send(GetThing::Record(uri, res)).await 777 + }); 778 + } 779 + MatchedRef::Identifier(id) => { 780 + if identifiers.contains_key(&id) { 781 + continue; 782 + } 783 + let mut u = url::Url::parse("https://example.com").unwrap(); 784 + u.query_pairs_mut().append_pair("identifier", &id); 785 + identifiers.insert(id, Hydration::Pending(ProxyHydrationPending { 786 + url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross 787 + })); 788 + let tx = tx.clone(); 789 + // let doc_fut = self.resolve_mini_doc(); 790 + tokio::task::spawn(async { 791 + 792 + }); 793 + } 794 + } 795 + } 796 + // so the channel can close when all are completed 797 + // (we shoudl be doing a timeout...) 798 + drop(tx); 799 + 800 + while let Some(hydration) = rx.recv().await { 801 + match hydration { 802 + GetThing::Record(uri, h) => { records.insert(uri, h); } 803 + GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); } 804 + }; 805 + } 806 + 807 + ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject { 808 + output: skeleton, 809 + records, 810 + identifiers, 811 + })) 812 + } 813 + Err(e) => { 814 + log::warn!("oh no: {e:?}"); 815 + ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry")) 816 + } 817 + } 818 + 819 + } 820 + 553 821 async fn get_record_impl( 554 822 &self, 555 823 repo: String, ··· 748 1016 cache: HybridCache<String, CachedRecord>, 749 1017 identity: Identity, 750 1018 repo: Repo, 1019 + proxy: Proxy, 751 1020 acme_domain: Option<String>, 752 1021 acme_contact: Option<String>, 753 1022 acme_cache_path: Option<PathBuf>, ··· 756 1025 bind: std::net::SocketAddr, 757 1026 ) -> Result<(), ServerError> { 758 1027 let repo = Arc::new(repo); 1028 + let proxy = Arc::new(proxy); 759 1029 let api_service = OpenApiService::new( 760 1030 Xrpc { 761 1031 cache, 762 1032 identity, 1033 + proxy, 763 1034 repo, 764 1035 }, 765 1036 "Slingshot", ··· 823 1094 .with( 824 1095 Cors::new() 825 1096 .allow_origin_regex("*") 826 - .allow_methods([Method::GET]) 1097 + .allow_methods([Method::GET, Method::POST]) 827 1098 .allow_credentials(false), 828 1099 ) 829 1100 .with(CatchPanic::new())

History

2 rounds 2 comments
sign up or login to add to the discussion
8 commits
expand
very hack proxy stuff (wip)
identities!!!!
service doc cache
proxy: use service cache
fmt
include headers, timeout hydration, measure times
base_url, typed url parts, api fixups,
only try to hydrate 100 records
no conflicts, ready to merge
expand 0 comments
bad-example.com submitted #0
1 commit
expand
very hack proxy stuff (wip)
expand 2 comments

the record contents from com.atproto.repo.getRecord are returned under a key called "value", so this should probably use that.