A better Rust ATProto crate

moving toward a matched architecture with the http side

Orual 3d82b460 f91b11b4

+479 -94
+5 -1
crates/jacquard-common/src/xrpc.rs
··· 20 20 pub mod subscription; 21 21 22 22 #[cfg(feature = "websocket")] 23 - pub use subscription::{MessageEncoding, SubscriptionEndpoint, SubscriptionResp, XrpcSubscription}; 23 + pub use subscription::{ 24 + BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient, 25 + SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp, 26 + TungsteniteSubscriptionClient, XrpcSubscription, 27 + }; 24 28 25 29 use bytes::Bytes; 26 30 use http::{
+299 -1
crates/jacquard-common/src/xrpc/subscription.rs
··· 5 5 6 6 use serde::{Deserialize, Serialize}; 7 7 use std::error::Error; 8 + use std::future::Future; 9 + use url::Url; 8 10 9 - use crate::IntoStatic; 11 + use crate::websocket::{WebSocketClient, WebSocketConnection}; 12 + use crate::{CowStr, IntoStatic}; 10 13 11 14 /// Encoding format for subscription messages 12 15 #[derive(Debug, Clone, Copy, PartialEq, Eq)] ··· 93 96 /// Stream response type 94 97 type Stream: SubscriptionResp; 95 98 } 99 + 100 + /// Per-subscription options for WebSocket subscriptions. 101 + #[derive(Debug, Default, Clone)] 102 + pub struct SubscriptionOptions<'a> { 103 + /// Extra headers to attach to this subscription (e.g., Authorization). 104 + pub headers: Vec<(CowStr<'a>, CowStr<'a>)>, 105 + } 106 + 107 + impl IntoStatic for SubscriptionOptions<'_> { 108 + type Output = SubscriptionOptions<'static>; 109 + 110 + fn into_static(self) -> Self::Output { 111 + SubscriptionOptions { 112 + headers: self 113 + .headers 114 + .into_iter() 115 + .map(|(k, v)| (k.into_static(), v.into_static())) 116 + .collect(), 117 + } 118 + } 119 + } 120 + 121 + /// Extension for stateless subscription calls on any `WebSocketClient`. 122 + /// 123 + /// Provides a builder pattern for establishing WebSocket subscriptions with custom options. 124 + pub trait SubscriptionExt: WebSocketClient { 125 + /// Start building a subscription call for the given base URL. 126 + fn subscription<'a>(&'a self, base: Url) -> SubscriptionCall<'a, Self> 127 + where 128 + Self: Sized, 129 + { 130 + SubscriptionCall { 131 + client: self, 132 + base, 133 + opts: SubscriptionOptions::default(), 134 + } 135 + } 136 + } 137 + 138 + impl<T: WebSocketClient> SubscriptionExt for T {} 139 + 140 + /// Stateless subscription call builder. 141 + /// 142 + /// Provides methods for adding headers and establishing typed subscriptions. 143 + pub struct SubscriptionCall<'a, C: WebSocketClient> { 144 + pub(crate) client: &'a C, 145 + pub(crate) base: Url, 146 + pub(crate) opts: SubscriptionOptions<'a>, 147 + } 148 + 149 + impl<'a, C: WebSocketClient> SubscriptionCall<'a, C> { 150 + /// Add an extra header. 151 + pub fn header(mut self, name: impl Into<CowStr<'a>>, value: impl Into<CowStr<'a>>) -> Self { 152 + self.opts.headers.push((name.into(), value.into())); 153 + self 154 + } 155 + 156 + /// Replace the builder's options entirely. 157 + pub fn with_options(mut self, opts: SubscriptionOptions<'a>) -> Self { 158 + self.opts = opts; 159 + self 160 + } 161 + 162 + /// Subscribe to the given XRPC subscription endpoint. 163 + /// 164 + /// Builds a WebSocket URL from the base, appends the NSID path, 165 + /// encodes query parameters from the subscription type, and connects. 166 + pub async fn subscribe<Sub>(self, params: &Sub) -> Result<WebSocketConnection, C::Error> 167 + where 168 + Sub: XrpcSubscription, 169 + { 170 + let mut url = self.base.clone(); 171 + let mut path = url.path().trim_end_matches('/').to_owned(); 172 + path.push_str("/xrpc/"); 173 + path.push_str(Sub::NSID); 174 + url.set_path(&path); 175 + 176 + let query_params = params.query_params(); 177 + if !query_params.is_empty() { 178 + let qs = query_params 179 + .iter() 180 + .map(|(k, v)| format!("{}={}", k, v)) 181 + .collect::<Vec<_>>() 182 + .join("&"); 183 + url.set_query(Some(&qs)); 184 + } else { 185 + url.set_query(None); 186 + } 187 + 188 + self.client 189 + .connect_with_headers(url, self.opts.headers) 190 + .await 191 + } 192 + } 193 + 194 + /// Stateful subscription client trait. 195 + /// 196 + /// Analogous to `XrpcClient` but for WebSocket subscriptions. 197 + /// Provides a stateful interface for subscribing with configured base URI and options. 198 + #[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))] 199 + pub trait SubscriptionClient: WebSocketClient { 200 + /// Get the base URI for the client. 201 + fn base_uri(&self) -> Url; 202 + 203 + /// Get the subscription options for the client. 204 + fn subscription_opts(&self) -> impl Future<Output = SubscriptionOptions<'_>> { 205 + async { SubscriptionOptions::default() } 206 + } 207 + 208 + /// Subscribe to an XRPC subscription endpoint using the client's base URI and options. 209 + #[cfg(not(target_arch = "wasm32"))] 210 + fn subscribe<Sub>( 211 + &self, 212 + params: &Sub, 213 + ) -> impl Future<Output = Result<WebSocketConnection, Self::Error>> 214 + where 215 + Sub: XrpcSubscription + Send + Sync, 216 + Self: Sync; 217 + 218 + /// Subscribe to an XRPC subscription endpoint using the client's base URI and options. 219 + #[cfg(target_arch = "wasm32")] 220 + fn subscribe<Sub>( 221 + &self, 222 + params: &Sub, 223 + ) -> impl Future<Output = Result<WebSocketConnection, Self::Error>> 224 + where 225 + Sub: XrpcSubscription + Send + Sync; 226 + 227 + /// Subscribe with custom options. 228 + #[cfg(not(target_arch = "wasm32"))] 229 + fn subscribe_with_opts<Sub>( 230 + &self, 231 + params: &Sub, 232 + opts: SubscriptionOptions<'_>, 233 + ) -> impl Future<Output = Result<WebSocketConnection, Self::Error>> 234 + where 235 + Sub: XrpcSubscription + Send + Sync, 236 + Self: Sync; 237 + 238 + /// Subscribe with custom options. 239 + #[cfg(target_arch = "wasm32")] 240 + fn subscribe_with_opts<Sub>( 241 + &self, 242 + params: &Sub, 243 + opts: SubscriptionOptions<'_>, 244 + ) -> impl Future<Output = Result<WebSocketConnection, Self::Error>> 245 + where 246 + Sub: XrpcSubscription + Send + Sync; 247 + } 248 + 249 + /// Simple stateless subscription client wrapping a WebSocketClient. 250 + /// 251 + /// Analogous to a basic HTTP client but for WebSocket subscriptions. 252 + /// Does not manage sessions or authentication - useful for public subscriptions 253 + /// or when you want to handle auth manually via headers. 254 + pub struct BasicSubscriptionClient<W: WebSocketClient> { 255 + client: W, 256 + base_uri: Url, 257 + opts: SubscriptionOptions<'static>, 258 + } 259 + 260 + impl<W: WebSocketClient> BasicSubscriptionClient<W> { 261 + /// Create a new basic subscription client with the given WebSocket client and base URI. 262 + pub fn new(client: W, base_uri: Url) -> Self { 263 + Self { 264 + client, 265 + base_uri, 266 + opts: SubscriptionOptions::default(), 267 + } 268 + } 269 + 270 + /// Create with default options. 271 + pub fn with_options(mut self, opts: SubscriptionOptions<'_>) -> Self { 272 + self.opts = opts.into_static(); 273 + self 274 + } 275 + 276 + /// Get a reference to the inner WebSocket client. 277 + pub fn inner(&self) -> &W { 278 + &self.client 279 + } 280 + } 281 + 282 + impl<W: WebSocketClient> WebSocketClient for BasicSubscriptionClient<W> { 283 + type Error = W::Error; 284 + 285 + async fn connect(&self, url: Url) -> Result<WebSocketConnection, Self::Error> { 286 + self.client.connect(url).await 287 + } 288 + 289 + async fn connect_with_headers( 290 + &self, 291 + url: Url, 292 + headers: Vec<(CowStr<'_>, CowStr<'_>)>, 293 + ) -> Result<WebSocketConnection, Self::Error> { 294 + self.client.connect_with_headers(url, headers).await 295 + } 296 + } 297 + 298 + impl<W: WebSocketClient> SubscriptionClient for BasicSubscriptionClient<W> { 299 + fn base_uri(&self) -> Url { 300 + self.base_uri.clone() 301 + } 302 + 303 + async fn subscription_opts(&self) -> SubscriptionOptions<'_> { 304 + self.opts.clone() 305 + } 306 + 307 + #[cfg(not(target_arch = "wasm32"))] 308 + async fn subscribe<Sub>( 309 + &self, 310 + params: &Sub, 311 + ) -> Result<WebSocketConnection, Self::Error> 312 + where 313 + Sub: XrpcSubscription + Send + Sync, 314 + Self: Sync, 315 + { 316 + let opts = self.subscription_opts().await; 317 + self.subscribe_with_opts(params, opts).await 318 + } 319 + 320 + #[cfg(target_arch = "wasm32")] 321 + async fn subscribe<Sub>( 322 + &self, 323 + params: &Sub, 324 + ) -> Result<WebSocketConnection, Self::Error> 325 + where 326 + Sub: XrpcSubscription + Send + Sync, 327 + { 328 + let opts = self.subscription_opts().await; 329 + self.subscribe_with_opts(params, opts).await 330 + } 331 + 332 + #[cfg(not(target_arch = "wasm32"))] 333 + async fn subscribe_with_opts<Sub>( 334 + &self, 335 + params: &Sub, 336 + opts: SubscriptionOptions<'_>, 337 + ) -> Result<WebSocketConnection, Self::Error> 338 + where 339 + Sub: XrpcSubscription + Send + Sync, 340 + Self: Sync, 341 + { 342 + let base = self.base_uri(); 343 + self.subscription(base) 344 + .with_options(opts) 345 + .subscribe(params) 346 + .await 347 + } 348 + 349 + #[cfg(target_arch = "wasm32")] 350 + async fn subscribe_with_opts<Sub>( 351 + &self, 352 + params: &Sub, 353 + opts: SubscriptionOptions<'_>, 354 + ) -> Result<WebSocketConnection, Self::Error> 355 + where 356 + Sub: XrpcSubscription + Send + Sync, 357 + { 358 + let base = self.base_uri(); 359 + self.subscription(base) 360 + .with_options(opts) 361 + .subscribe(params) 362 + .await 363 + } 364 + } 365 + 366 + /// Type alias for a basic subscription client using the default TungsteniteClient. 367 + /// 368 + /// Provides a simple, stateless WebSocket subscription client without session management. 369 + /// Useful for public subscriptions or when handling authentication manually. 370 + /// 371 + /// # Example 372 + /// 373 + /// ```no_run 374 + /// # use jacquard_common::xrpc::{TungsteniteSubscriptionClient, SubscriptionClient}; 375 + /// # use url::Url; 376 + /// # #[tokio::main] 377 + /// # async fn main() -> Result<(), Box<dyn std::error::Error>> { 378 + /// let base = Url::parse("wss://bsky.network")?; 379 + /// let client = TungsteniteSubscriptionClient::from_base_uri(base); 380 + /// // let conn = client.subscribe(&params).await?; 381 + /// # Ok(()) 382 + /// # } 383 + /// ``` 384 + pub type TungsteniteSubscriptionClient = 385 + BasicSubscriptionClient<crate::websocket::tungstenite_client::TungsteniteClient>; 386 + 387 + impl TungsteniteSubscriptionClient { 388 + /// Create a new Tungstenite-backed subscription client with the given base URI. 389 + pub fn from_base_uri(base_uri: Url) -> Self { 390 + let client = crate::websocket::tungstenite_client::TungsteniteClient::new(); 391 + BasicSubscriptionClient::new(client, base_uri) 392 + } 393 + }
+80 -43
crates/jacquard-oauth/src/client.rs
··· 436 436 } 437 437 } 438 438 439 - #[cfg(feature = "websocket")] 440 - impl<T, S, W> OAuthSession<T, S, W> 441 - where 442 - S: ClientAuthStore, 443 - T: OAuthResolver, 444 - W: WebSocketClient, 445 - { 446 - /// Subscribe to an XRPC WebSocket subscription. 447 - /// 448 - /// Connects to the WebSocket endpoint and threads through DPoP authentication headers. 449 - pub async fn subscribe<Sub>(&self, params: &Sub) -> Result<WebSocketConnection, W::Error> 450 - where 451 - Sub: XrpcSubscription, 452 - { 453 - let base_uri = self.endpoint().await; 454 - 455 - // Build WebSocket URL 456 - let mut ws_url = base_uri.clone(); 457 - ws_url.set_scheme("wss").ok(); 458 - ws_url.set_path(&format!("/xrpc/{}", Sub::NSID)); 459 - 460 - // Add query params 461 - let query_params = params.query_params(); 462 - if !query_params.is_empty() { 463 - let query_string = serde_html_form::to_string(&query_params).unwrap_or_default(); 464 - ws_url.set_query(Some(&query_string)); 465 - } 466 - 467 - // Thread DPoP auth headers (even though tokio-tungstenite-wasm doesn't support them yet) 468 - let token = self.access_token().await; 469 - let auth_value = match token { 470 - AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()), 471 - AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()), 472 - }; 473 - let headers = vec![( 474 - CowStr::from("Authorization"), 475 - CowStr::from(auth_value), 476 - )]; 477 - 478 - self.ws_client.connect_with_headers(ws_url, headers).await 479 - } 480 - } 481 - 482 439 impl<T, S, W> HttpClient for OAuthSession<T, S, W> 483 440 where 484 441 S: ClientAuthStore + Send + Sync + 'static, ··· 605 562 async { self.client.resolve_did_doc(did).await } 606 563 } 607 564 } 565 + 566 + #[cfg(feature = "websocket")] 567 + impl<T, S, W> WebSocketClient for OAuthSession<T, S, W> 568 + where 569 + S: ClientAuthStore + Send + Sync + 'static, 570 + T: OAuthResolver + Send + Sync + 'static, 571 + W: WebSocketClient + Send + Sync, 572 + { 573 + type Error = W::Error; 574 + 575 + async fn connect(&self, url: Url) -> std::result::Result<WebSocketConnection, Self::Error> { 576 + self.ws_client.connect(url).await 577 + } 578 + 579 + async fn connect_with_headers( 580 + &self, 581 + url: Url, 582 + headers: Vec<(CowStr<'_>, CowStr<'_>)>, 583 + ) -> std::result::Result<WebSocketConnection, Self::Error> { 584 + self.ws_client.connect_with_headers(url, headers).await 585 + } 586 + } 587 + 588 + #[cfg(feature = "websocket")] 589 + impl<T, S, W> jacquard_common::xrpc::SubscriptionClient for OAuthSession<T, S, W> 590 + where 591 + S: ClientAuthStore + Send + Sync + 'static, 592 + T: OAuthResolver + Send + Sync + 'static, 593 + W: WebSocketClient + Send + Sync, 594 + { 595 + fn base_uri(&self) -> Url { 596 + #[cfg(not(target_arch = "wasm32"))] 597 + if tokio::runtime::Handle::try_current().is_ok() { 598 + return tokio::task::block_in_place(|| self.data.blocking_read().host_url.clone()); 599 + } 600 + 601 + self.data.blocking_read().host_url.clone() 602 + } 603 + 604 + async fn subscription_opts(&self) -> jacquard_common::xrpc::SubscriptionOptions<'_> { 605 + let mut opts = jacquard_common::xrpc::SubscriptionOptions::default(); 606 + let token = self.access_token().await; 607 + let auth_value = match token { 608 + AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()), 609 + AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()), 610 + }; 611 + opts.headers.push(( 612 + CowStr::from("Authorization"), 613 + CowStr::from(auth_value), 614 + )); 615 + opts 616 + } 617 + 618 + async fn subscribe<Sub>( 619 + &self, 620 + params: &Sub, 621 + ) -> std::result::Result<WebSocketConnection, Self::Error> 622 + where 623 + Sub: XrpcSubscription + Send + Sync, 624 + { 625 + let opts = self.subscription_opts().await; 626 + self.subscribe_with_opts(params, opts).await 627 + } 628 + 629 + async fn subscribe_with_opts<Sub>( 630 + &self, 631 + params: &Sub, 632 + opts: jacquard_common::xrpc::SubscriptionOptions<'_>, 633 + ) -> std::result::Result<WebSocketConnection, Self::Error> 634 + where 635 + Sub: XrpcSubscription + Send + Sync, 636 + { 637 + use jacquard_common::xrpc::SubscriptionExt; 638 + let base = self.base_uri(); 639 + self.subscription(base) 640 + .with_options(opts) 641 + .subscribe(params) 642 + .await 643 + } 644 + }
+95 -49
crates/jacquard/src/client/credential_session.rs
··· 411 411 } 412 412 } 413 413 414 - #[cfg(feature = "websocket")] 415 - impl<S, T, W> CredentialSession<S, T, W> 416 - where 417 - S: SessionStore<SessionKey, AtpSession>, 418 - W: WebSocketClient, 419 - { 420 - /// Subscribe to an XRPC WebSocket subscription. 421 - /// 422 - /// Connects to the WebSocket endpoint and threads through authentication headers. 423 - pub async fn subscribe<Sub>( 424 - &self, 425 - params: &Sub, 426 - ) -> Result<WebSocketConnection, W::Error> 427 - where 428 - Sub: XrpcSubscription, 429 - { 430 - let base_uri = self.endpoint().await; 431 - 432 - // Build WebSocket URL 433 - let mut ws_url = base_uri.clone(); 434 - ws_url.set_scheme("wss").ok(); 435 - ws_url.set_path(&format!("/xrpc/{}", Sub::NSID)); 436 - 437 - // Add query params 438 - let query_params = params.query_params(); 439 - if !query_params.is_empty() { 440 - let query_string = serde_html_form::to_string(&query_params) 441 - .unwrap_or_default(); 442 - ws_url.set_query(Some(&query_string)); 443 - } 444 - 445 - // Thread auth headers (even though tokio-tungstenite-wasm doesn't support them yet) 446 - let headers = if let Some(token) = self.access_token().await { 447 - let auth_value = match token { 448 - AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()), 449 - AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()), 450 - }; 451 - vec![( 452 - CowStr::from("Authorization"), 453 - CowStr::from(auth_value), 454 - )] 455 - } else { 456 - vec![] 457 - }; 458 - 459 - self.ws_client.connect_with_headers(ws_url, headers).await 460 - } 461 - } 462 - 463 414 impl<S, T, W> HttpClient for CredentialSession<S, T, W> 464 415 where 465 416 S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, ··· 585 536 async { self.client.resolve_did_doc(did).await } 586 537 } 587 538 } 539 + 540 + #[cfg(feature = "websocket")] 541 + impl<S, T, W> WebSocketClient for CredentialSession<S, T, W> 542 + where 543 + S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 544 + T: Send + Sync + 'static, 545 + W: WebSocketClient + Send + Sync, 546 + { 547 + type Error = W::Error; 548 + 549 + async fn connect(&self, url: Url) -> Result<WebSocketConnection, Self::Error> { 550 + self.ws_client.connect(url).await 551 + } 552 + 553 + async fn connect_with_headers( 554 + &self, 555 + url: Url, 556 + headers: Vec<(CowStr<'_>, CowStr<'_>)>, 557 + ) -> Result<WebSocketConnection, Self::Error> { 558 + self.ws_client.connect_with_headers(url, headers).await 559 + } 560 + } 561 + 562 + #[cfg(feature = "websocket")] 563 + impl<S, T, W> jacquard_common::xrpc::SubscriptionClient for CredentialSession<S, T, W> 564 + where 565 + S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 566 + T: Send + Sync + 'static, 567 + W: WebSocketClient + Send + Sync, 568 + { 569 + fn base_uri(&self) -> Url { 570 + #[cfg(not(target_arch = "wasm32"))] 571 + if tokio::runtime::Handle::try_current().is_ok() { 572 + tokio::task::block_in_place(|| { 573 + self.endpoint.blocking_read().clone().unwrap_or( 574 + Url::parse("https://public.bsky.app") 575 + .expect("public appview should be valid url"), 576 + ) 577 + }) 578 + } else { 579 + self.endpoint.blocking_read().clone().unwrap_or( 580 + Url::parse("https://public.bsky.app").expect("public appview should be valid url"), 581 + ) 582 + } 583 + 584 + #[cfg(target_arch = "wasm32")] 585 + { 586 + self.endpoint.blocking_read().clone().unwrap_or( 587 + Url::parse("https://public.bsky.app").expect("public appview should be valid url"), 588 + ) 589 + } 590 + } 591 + 592 + async fn subscription_opts(&self) -> jacquard_common::xrpc::SubscriptionOptions<'_> { 593 + let mut opts = jacquard_common::xrpc::SubscriptionOptions::default(); 594 + if let Some(token) = self.access_token().await { 595 + let auth_value = match token { 596 + AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()), 597 + AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()), 598 + }; 599 + opts.headers.push(( 600 + CowStr::from("Authorization"), 601 + CowStr::from(auth_value), 602 + )); 603 + } 604 + opts 605 + } 606 + 607 + async fn subscribe<Sub>( 608 + &self, 609 + params: &Sub, 610 + ) -> Result<WebSocketConnection, Self::Error> 611 + where 612 + Sub: XrpcSubscription + Send + Sync, 613 + { 614 + let opts = self.subscription_opts().await; 615 + self.subscribe_with_opts(params, opts).await 616 + } 617 + 618 + async fn subscribe_with_opts<Sub>( 619 + &self, 620 + params: &Sub, 621 + opts: jacquard_common::xrpc::SubscriptionOptions<'_>, 622 + ) -> Result<WebSocketConnection, Self::Error> 623 + where 624 + Sub: XrpcSubscription + Send + Sync, 625 + { 626 + use jacquard_common::xrpc::SubscriptionExt; 627 + let base = self.base_uri(); 628 + self.subscription(base) 629 + .with_options(opts) 630 + .subscribe(params) 631 + .await 632 + } 633 + }