A better Rust ATProto crate

integrating optional websocket into agent struct

Orual 99e47dfb 96bc86e1

+198 -26
+13 -1
crates/jacquard-common/src/websocket.rs
··· 475 475 476 476 /// WebSocket client trait 477 477 #[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))] 478 - pub trait WebSocketClient { 478 + pub trait WebSocketClient: Sync { 479 479 /// Error type for WebSocket operations 480 480 type Error: std::error::Error + Send + Sync + 'static; 481 481 482 482 /// Connect to a WebSocket endpoint 483 483 fn connect(&self, url: Url) -> impl Future<Output = Result<WebSocketConnection, Self::Error>>; 484 + 485 + /// Connect to a WebSocket endpoint with custom headers 486 + /// 487 + /// Default implementation ignores headers and calls `connect()`. 488 + /// Override this method to support authentication headers for subscriptions. 489 + fn connect_with_headers( 490 + &self, 491 + url: Url, 492 + _headers: Vec<(CowStr<'_>, CowStr<'_>)>, 493 + ) -> impl Future<Output = Result<WebSocketConnection, Self::Error>> { 494 + async move { self.connect(url).await } 495 + } 484 496 } 485 497 486 498 /// WebSocket connection with bidirectional streams
+1
crates/jacquard-oauth/Cargo.toml
··· 49 49 loopback = ["dep:rouille"] 50 50 browser-open = ["dep:webbrowser"] 51 51 tracing = ["dep:tracing"] 52 + websocket = ["jacquard-common/websocket"]
+88 -8
crates/jacquard-oauth/src/client.rs
··· 19 19 build_http_request, process_response, 20 20 }, 21 21 }; 22 + 23 + #[cfg(feature = "websocket")] 24 + use jacquard_common::websocket::{WebSocketClient, WebSocketConnection}; 25 + #[cfg(feature = "websocket")] 26 + use jacquard_common::xrpc::XrpcSubscription; 22 27 use jacquard_identity::{ 23 28 JacquardResolver, 24 29 resolver::{DidDocResponse, IdentityError, IdentityResolver, ResolverOptions}, ··· 279 284 } 280 285 } 281 286 282 - pub struct OAuthSession<T, S> 287 + pub struct OAuthSession<T, S, W = ()> 283 288 where 284 289 T: OAuthResolver, 285 290 S: ClientAuthStore, 286 291 { 287 292 pub registry: Arc<SessionRegistry<T, S>>, 288 293 pub client: Arc<T>, 294 + pub ws_client: W, 289 295 pub data: RwLock<ClientSessionData<'static>>, 290 296 pub options: RwLock<CallOptions<'static>>, 291 297 } 292 298 293 - impl<T, S> OAuthSession<T, S> 299 + impl<T, S> OAuthSession<T, S, ()> 294 300 where 295 301 T: OAuthResolver, 296 302 S: ClientAuthStore, ··· 303 309 Self { 304 310 registry, 305 311 client, 312 + ws_client: (), 313 + data: RwLock::new(data), 314 + options: RwLock::new(CallOptions::default()), 315 + } 316 + } 317 + } 318 + 319 + impl<T, S, W> OAuthSession<T, S, W> 320 + where 321 + T: OAuthResolver, 322 + S: ClientAuthStore, 323 + { 324 + pub fn new_with_ws( 325 + registry: Arc<SessionRegistry<T, S>>, 326 + client: Arc<T>, 327 + ws_client: W, 328 + data: ClientSessionData<'static>, 329 + ) -> Self { 330 + Self { 331 + registry, 332 + client, 333 + ws_client, 306 334 data: RwLock::new(data), 307 335 options: RwLock::new(CallOptions::default()), 308 336 } ··· 312 340 Self { 313 341 registry: self.registry, 314 342 client: self.client, 343 + ws_client: self.ws_client, 315 344 data: self.data, 316 345 options: RwLock::new(options.into_static()), 317 346 } 347 + } 348 + 349 + /// Get a reference to the WebSocket client. 350 + pub fn ws_client(&self) -> &W { 351 + &self.ws_client 318 352 } 319 353 320 354 pub async fn set_options(&self, options: CallOptions<'_>) { ··· 344 378 .map(|t| AuthorizationToken::Dpop(t.clone())) 345 379 } 346 380 } 347 - impl<T, S> OAuthSession<T, S> 381 + impl<T, S, W> OAuthSession<T, S, W> 348 382 where 349 383 S: ClientAuthStore + Send + Sync + 'static, 350 384 T: OAuthResolver + DpopExt + Send + Sync + 'static, ··· 373 407 T: OAuthResolver, 374 408 S: ClientAuthStore, 375 409 { 376 - pub fn from_session(session: &OAuthSession<T, S>) -> Self { 410 + pub fn from_session<W>(session: &OAuthSession<T, S, W>) -> Self { 377 411 Self { 378 412 registry: session.registry.clone(), 379 413 client: session.client.clone(), 380 414 } 381 415 } 382 416 } 383 - impl<T, S> OAuthSession<T, S> 417 + impl<T, S, W> OAuthSession<T, S, W> 384 418 where 385 419 S: ClientAuthStore + Send + Sync + 'static, 386 420 T: OAuthResolver + DpopExt + Send + Sync + 'static, ··· 402 436 } 403 437 } 404 438 405 - impl<T, S> HttpClient for OAuthSession<T, S> 439 + #[cfg(feature = "websocket")] 440 + impl<T, S, W> OAuthSession<T, S, W> 441 + where 442 + S: ClientAuthStore, 443 + T: OAuthResolver, 444 + W: WebSocketClient, 445 + { 446 + /// Subscribe to an XRPC WebSocket subscription. 447 + /// 448 + /// Connects to the WebSocket endpoint and threads through DPoP authentication headers. 449 + pub async fn subscribe<Sub>(&self, params: &Sub) -> Result<WebSocketConnection, W::Error> 450 + where 451 + Sub: XrpcSubscription, 452 + { 453 + let base_uri = self.endpoint().await; 454 + 455 + // Build WebSocket URL 456 + let mut ws_url = base_uri.clone(); 457 + ws_url.set_scheme("wss").ok(); 458 + ws_url.set_path(&format!("/xrpc/{}", Sub::NSID)); 459 + 460 + // Add query params 461 + let query_params = params.query_params(); 462 + if !query_params.is_empty() { 463 + let query_string = serde_html_form::to_string(&query_params).unwrap_or_default(); 464 + ws_url.set_query(Some(&query_string)); 465 + } 466 + 467 + // Thread DPoP auth headers (even though tokio-tungstenite-wasm doesn't support them yet) 468 + let token = self.access_token().await; 469 + let auth_value = match token { 470 + AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()), 471 + AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()), 472 + }; 473 + let headers = vec![( 474 + CowStr::from("Authorization"), 475 + CowStr::from(auth_value), 476 + )]; 477 + 478 + self.ws_client.connect_with_headers(ws_url, headers).await 479 + } 480 + } 481 + 482 + impl<T, S, W> HttpClient for OAuthSession<T, S, W> 406 483 where 407 484 S: ClientAuthStore + Send + Sync + 'static, 408 485 T: OAuthResolver + DpopExt + Send + Sync + 'static, 486 + W: Send + Sync, 409 487 { 410 488 type Error = T::Error; 411 489 ··· 417 495 } 418 496 } 419 497 420 - impl<T, S> XrpcClient for OAuthSession<T, S> 498 + impl<T, S, W> XrpcClient for OAuthSession<T, S, W> 421 499 where 422 500 S: ClientAuthStore + Send + Sync + 'static, 423 501 T: OAuthResolver + DpopExt + XrpcExt + Send + Sync + 'static, 502 + W: Send + Sync, 424 503 { 425 504 fn base_uri(&self) -> Url { 426 505 // base_uri is a synchronous trait method; we must avoid async `.read().await`. ··· 502 581 } 503 582 } 504 583 505 - impl<T, S> IdentityResolver for OAuthSession<T, S> 584 + impl<T, S, W> IdentityResolver for OAuthSession<T, S, W> 506 585 where 507 586 S: ClientAuthStore + Send + Sync + 'static, 508 587 T: OAuthResolver + IdentityResolver + XrpcExt + Send + Sync + 'static, 588 + W: Send + Sync, 509 589 { 510 590 fn options(&self) -> &ResolverOptions { 511 591 self.client.options()
+9 -8
crates/jacquard/src/client.rs
··· 49 49 use jacquard_oauth::client::OAuthSession; 50 50 use jacquard_oauth::dpop::DpopExt; 51 51 use jacquard_oauth::resolver::OAuthResolver; 52 - use std::marker::PhantomData; 53 52 54 53 use serde::Serialize; 55 54 pub use token::FileAuthStore; ··· 210 209 fn refresh(&self) -> impl Future<Output = Result<AuthorizationToken<'static>, ClientError>>; 211 210 } 212 211 213 - impl<S, T> AgentSession for CredentialSession<S, T> 212 + impl<S, T, W> AgentSession for CredentialSession<S, T, W> 214 213 where 215 214 S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 216 215 T: IdentityResolver + HttpClient + XrpcExt + Send + Sync + 'static, 216 + W: Send + Sync, 217 217 { 218 218 fn session_kind(&self) -> AgentKind { 219 219 AgentKind::AppPassword ··· 227 227 )>, 228 228 > { 229 229 async move { 230 - CredentialSession::<S, T>::session_info(self) 230 + CredentialSession::<S, T, W>::session_info(self) 231 231 .await 232 232 .map(|(did, sid)| (did, Some(sid))) 233 233 } 234 234 } 235 235 fn endpoint(&self) -> impl Future<Output = url::Url> { 236 - async move { CredentialSession::<S, T>::endpoint(self).await } 236 + async move { CredentialSession::<S, T, W>::endpoint(self).await } 237 237 } 238 238 fn set_options<'a>(&'a self, opts: CallOptions<'a>) -> impl Future<Output = ()> { 239 - async move { CredentialSession::<S, T>::set_options(self, opts).await } 239 + async move { CredentialSession::<S, T, W>::set_options(self, opts).await } 240 240 } 241 241 fn refresh(&self) -> impl Future<Output = Result<AuthorizationToken<'static>, ClientError>> { 242 242 async move { 243 - Ok(CredentialSession::<S, T>::refresh(self) 243 + Ok(CredentialSession::<S, T, W>::refresh(self) 244 244 .await? 245 245 .into_static()) 246 246 } 247 247 } 248 248 } 249 249 250 - impl<T, S> AgentSession for OAuthSession<T, S> 250 + impl<T, S, W> AgentSession for OAuthSession<T, S, W> 251 251 where 252 252 S: ClientAuthStore + Send + Sync + 'static, 253 253 T: OAuthResolver + DpopExt + XrpcExt + Send + Sync + 'static, 254 + W: Send + Sync, 254 255 { 255 256 fn session_kind(&self) -> AgentKind { 256 257 AgentKind::OAuth ··· 264 265 )>, 265 266 > { 266 267 async { 267 - let (did, sid) = OAuthSession::<T, S>::session_info(self).await; 268 + let (did, sid) = OAuthSession::<T, S, W>::session_info(self).await; 268 269 Some((did.into_static(), Some(sid.into_static()))) 269 270 } 270 271 }
+87 -9
crates/jacquard/src/client/credential_session.rs
··· 22 22 }; 23 23 use std::any::Any; 24 24 25 + #[cfg(feature = "websocket")] 26 + use jacquard_common::websocket::{WebSocketClient, WebSocketConnection}; 27 + #[cfg(feature = "websocket")] 28 + use jacquard_common::xrpc::XrpcSubscription; 29 + 25 30 /// Storage key for app‑password sessions: `(account DID, session id)`. 26 31 pub type SessionKey = (Did<'static>, CowStr<'static>); 27 32 ··· 30 35 /// - Persists sessions via a pluggable `SessionStore`. 31 36 /// - Automatically refreshes on token expiry. 32 37 /// - Tracks a base endpoint, defaulting to the public appview until login/restore. 33 - pub struct CredentialSession<S, T> 38 + /// - Optional WebSocket client for subscription support. 39 + pub struct CredentialSession<S, T, W = ()> 34 40 where 35 41 S: SessionStore<SessionKey, AtpSession>, 36 42 { 37 43 store: Arc<S>, 38 44 client: Arc<T>, 45 + ws_client: W, 39 46 /// Default call options applied to each request (auth/headers/labelers). 40 47 pub options: RwLock<CallOptions<'static>>, 41 48 /// Active session key, if any. ··· 44 51 pub endpoint: RwLock<Option<Url>>, 45 52 } 46 53 47 - impl<S, T> CredentialSession<S, T> 54 + impl<S, T> CredentialSession<S, T, ()> 48 55 where 49 56 S: SessionStore<SessionKey, AtpSession>, 50 57 { 51 - /// Create a new credential session using the given store and client. 58 + /// Create a new credential session using the given store and client (no WebSocket support). 52 59 pub fn new(store: Arc<S>, client: Arc<T>) -> Self { 53 60 Self { 54 61 store, 55 62 client, 63 + ws_client: (), 56 64 options: RwLock::new(CallOptions::default()), 57 65 key: RwLock::new(None), 58 66 endpoint: RwLock::new(None), ··· 60 68 } 61 69 } 62 70 63 - impl<S, T> CredentialSession<S, T> 71 + impl<S, T, W> CredentialSession<S, T, W> 64 72 where 65 73 S: SessionStore<SessionKey, AtpSession>, 66 74 { 75 + /// Create a new credential session with WebSocket client support. 76 + pub fn new_with_ws(store: Arc<S>, client: Arc<T>, ws_client: W) -> Self { 77 + Self { 78 + store, 79 + client, 80 + ws_client, 81 + options: RwLock::new(CallOptions::default()), 82 + key: RwLock::new(None), 83 + endpoint: RwLock::new(None), 84 + } 85 + } 86 + 87 + /// Get a reference to the WebSocket client. 88 + pub fn ws_client(&self) -> &W { 89 + &self.ws_client 90 + } 91 + 67 92 /// Return a copy configured with the provided default call options. 68 93 pub fn with_options(self, options: CallOptions<'_>) -> Self { 69 94 Self { 70 95 client: self.client, 71 96 store: self.store, 97 + ws_client: self.ws_client, 72 98 options: RwLock::new(options.into_static()), 73 99 key: self.key, 74 100 endpoint: self.endpoint, ··· 112 138 } 113 139 } 114 140 115 - impl<S, T> CredentialSession<S, T> 141 + impl<S, T, W> CredentialSession<S, T, W> 116 142 where 117 143 S: SessionStore<SessionKey, AtpSession>, 118 144 T: HttpClient, ··· 150 176 } 151 177 } 152 178 153 - impl<S, T> CredentialSession<S, T> 179 + impl<S, T, W> CredentialSession<S, T, W> 154 180 where 155 181 S: SessionStore<SessionKey, AtpSession>, 156 182 T: HttpClient + IdentityResolver + XrpcExt + Sync + Send, ··· 385 411 } 386 412 } 387 413 388 - impl<S, T> HttpClient for CredentialSession<S, T> 414 + #[cfg(feature = "websocket")] 415 + impl<S, T, W> CredentialSession<S, T, W> 416 + where 417 + S: SessionStore<SessionKey, AtpSession>, 418 + W: WebSocketClient, 419 + { 420 + /// Subscribe to an XRPC WebSocket subscription. 421 + /// 422 + /// Connects to the WebSocket endpoint and threads through authentication headers. 423 + pub async fn subscribe<Sub>( 424 + &self, 425 + params: &Sub, 426 + ) -> Result<WebSocketConnection, W::Error> 427 + where 428 + Sub: XrpcSubscription, 429 + { 430 + let base_uri = self.endpoint().await; 431 + 432 + // Build WebSocket URL 433 + let mut ws_url = base_uri.clone(); 434 + ws_url.set_scheme("wss").ok(); 435 + ws_url.set_path(&format!("/xrpc/{}", Sub::NSID)); 436 + 437 + // Add query params 438 + let query_params = params.query_params(); 439 + if !query_params.is_empty() { 440 + let query_string = serde_html_form::to_string(&query_params) 441 + .unwrap_or_default(); 442 + ws_url.set_query(Some(&query_string)); 443 + } 444 + 445 + // Thread auth headers (even though tokio-tungstenite-wasm doesn't support them yet) 446 + let headers = if let Some(token) = self.access_token().await { 447 + let auth_value = match token { 448 + AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()), 449 + AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()), 450 + }; 451 + vec![( 452 + CowStr::from("Authorization"), 453 + CowStr::from(auth_value), 454 + )] 455 + } else { 456 + vec![] 457 + }; 458 + 459 + self.ws_client.connect_with_headers(ws_url, headers).await 460 + } 461 + } 462 + 463 + impl<S, T, W> HttpClient for CredentialSession<S, T, W> 389 464 where 390 465 S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 391 466 T: HttpClient + XrpcExt + Send + Sync + 'static, 467 + W: Send + Sync, 392 468 { 393 469 type Error = T::Error; 394 470 ··· 400 476 } 401 477 } 402 478 403 - impl<S, T> XrpcClient for CredentialSession<S, T> 479 + impl<S, T, W> XrpcClient for CredentialSession<S, T, W> 404 480 where 405 481 S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 406 482 T: HttpClient + XrpcExt + Send + Sync + 'static, 483 + W: Send + Sync, 407 484 { 408 485 fn base_uri(&self) -> Url { 409 486 // base_uri is a synchronous trait method; avoid `.await` here. ··· 484 561 } 485 562 } 486 563 487 - impl<S, T> IdentityResolver for CredentialSession<S, T> 564 + impl<S, T, W> IdentityResolver for CredentialSession<S, T, W> 488 565 where 489 566 S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 490 567 T: HttpClient + IdentityResolver + Send + Sync + 'static, 568 + W: Send + Sync, 491 569 { 492 570 fn options(&self) -> &ResolverOptions { 493 571 self.client.options()