···49pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
5051/// Error type for streaming operations
52+#[derive(Debug, thiserror::Error, miette::Diagnostic)]
53pub struct StreamError {
54 kind: StreamErrorKind,
55+ #[source]
56 source: Option<BoxError>,
57}
58···157 }
158}
15900000000160use bytes::Bytes;
161use n0_future::stream::Boxed;
162···196 /// Convert into the inner boxed stream
197 pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> {
198 self.inner
199+ }
200+201+ /// Split this stream into two streams that both receive all chunks
202+ ///
203+ /// Chunks are cloned (cheaply via Bytes rc). Spawns a forwarder task.
204+ /// Both returned streams will receive all chunks from the original stream.
205+ /// The forwarder continues as long as at least one stream is alive.
206+ /// If the underlying stream errors, both teed streams will end.
207+ pub fn tee(self) -> (ByteStream, ByteStream) {
208+ use futures::channel::mpsc;
209+ use n0_future::StreamExt as _;
210+211+ let (tx1, rx1) = mpsc::unbounded();
212+ let (tx2, rx2) = mpsc::unbounded();
213+214+ n0_future::task::spawn(async move {
215+ let mut stream = self.inner;
216+ while let Some(result) = stream.next().await {
217+ match result {
218+ Ok(chunk) => {
219+ // Clone chunk (cheap - Bytes is rc'd)
220+ let chunk2 = chunk.clone();
221+222+ // Send to both channels, continue if at least one succeeds
223+ let send1 = tx1.unbounded_send(Ok(chunk));
224+ let send2 = tx2.unbounded_send(Ok(chunk2));
225+226+ // Only stop if both channels are closed
227+ if send1.is_err() && send2.is_err() {
228+ break;
229+ }
230+ }
231+ Err(_e) => {
232+ // Underlying stream errored, stop forwarding.
233+ // Both channels will close, ending both streams.
234+ break;
235+ }
236+ }
237+ }
238+ });
239+240+ (ByteStream::new(rx1), ByteStream::new(rx2))
241 }
242}
243
+5-2
crates/jacquard-common/src/types/blob.rs
···1-use crate::{CowStr, IntoStatic, types::cid::Cid};
0002#[allow(unused)]
3use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};
4use smol_str::ToSmolStr;
···24#[serde(rename_all = "camelCase")]
25pub struct Blob<'b> {
26 /// CID (Content Identifier) reference to the blob data
27- pub r#ref: Cid<'b>,
28 /// MIME type of the blob (e.g., "image/png", "video/mp4")
29 #[serde(borrow)]
30 pub mime_type: MimeType<'b>,
···1+use crate::{
2+ CowStr, IntoStatic,
3+ types::cid::{Cid, CidLink},
4+};
5#[allow(unused)]
6use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};
7use smol_str::ToSmolStr;
···27#[serde(rename_all = "camelCase")]
28pub struct Blob<'b> {
29 /// CID (Content Identifier) reference to the blob data
30+ pub r#ref: CidLink<'b>,
31 /// MIME type of the blob (e.g., "image/png", "video/mp4")
32 #[serde(borrow)]
33 pub mime_type: MimeType<'b>,
···1516use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18-pub use streaming::StreamingResponse;
001920#[cfg(feature = "websocket")]
21pub mod subscription;
···44use crate::{CowStr, error::XrpcResult};
45use crate::{IntoStatic, error::DecodeError};
46#[cfg(feature = "streaming")]
47-use crate::{
48- StreamError,
49- xrpc::streaming::{XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp},
50-};
51use crate::{error::TransportError, types::value::RawData};
5253/// Error type for encoding XRPC requests
···272#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
273pub trait XrpcClient: HttpClient {
274 /// Get the base URI for the client.
275- fn base_uri(&self) -> Url;
276277 /// Get the call options for the client.
278 fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
···316 where
317 R: XrpcRequest + Send + Sync,
318 <R as XrpcRequest>::Response: Send + Sync;
00000000000000000000000000000000000000000000000319}
320321/// Stateless XRPC call builder.
···947 /// Stream an XRPC procedure call and its response
948 ///
949 /// Useful for streaming upload of large payloads, or for "pipe-through" operations
950- /// where you processing a large payload.
951 pub async fn stream<S>(
952 self,
953 stream: XrpcProcedureSend<S::Frame<'static>>,
···1516use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18+pub use streaming::{
19+ StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
20+};
2122#[cfg(feature = "websocket")]
23pub mod subscription;
···46use crate::{CowStr, error::XrpcResult};
47use crate::{IntoStatic, error::DecodeError};
48#[cfg(feature = "streaming")]
49+use crate::StreamError;
00050use crate::{error::TransportError, types::value::RawData};
5152/// Error type for encoding XRPC requests
···271#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
272pub trait XrpcClient: HttpClient {
273 /// Get the base URI for the client.
274+ fn base_uri(&self) -> impl Future<Output = Url>;
275276 /// Get the call options for the client.
277 fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
···315 where
316 R: XrpcRequest + Send + Sync,
317 <R as XrpcRequest>::Response: Send + Sync;
318+319+}
320+321+/// Stateful XRPC streaming client trait
322+#[cfg(feature = "streaming")]
323+pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
324+ /// Send an XRPC request and stream the response
325+ #[cfg(not(target_arch = "wasm32"))]
326+ fn download<R>(
327+ &self,
328+ request: R,
329+ ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
330+ where
331+ R: XrpcRequest + Send + Sync,
332+ <R as XrpcRequest>::Response: Send + Sync,
333+ Self: Sync;
334+335+ /// Send an XRPC request and stream the response
336+ #[cfg(target_arch = "wasm32")]
337+ fn download<R>(
338+ &self,
339+ request: R,
340+ ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
341+ where
342+ R: XrpcRequest + Send + Sync,
343+ <R as XrpcRequest>::Response: Send + Sync;
344+345+ /// Stream an XRPC procedure call and its response
346+ #[cfg(not(target_arch = "wasm32"))]
347+ fn stream<S>(
348+ &self,
349+ stream: XrpcProcedureSend<S::Frame<'static>>,
350+ ) -> impl Future<Output = Result<XrpcResponseStream<<<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>>, StreamError>>
351+ where
352+ S: XrpcProcedureStream + 'static,
353+ <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
354+ Self: Sync;
355+356+ /// Stream an XRPC procedure call and its response
357+ #[cfg(target_arch = "wasm32")]
358+ fn stream<S>(
359+ &self,
360+ stream: XrpcProcedureSend<S::Frame<'static>>,
361+ ) -> impl Future<Output = Result<XrpcResponseStream<<<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>>, StreamError>>
362+ where
363+ S: XrpcProcedureStream + 'static,
364+ <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
365}
366367/// Stateless XRPC call builder.
···993 /// Stream an XRPC procedure call and its response
994 ///
995 /// Useful for streaming upload of large payloads, or for "pipe-through" operations
996+ /// where you are processing a large payload.
997 pub async fn stream<S>(
998 self,
999 stream: XrpcProcedureSend<S::Frame<'static>>,
+1-1
crates/jacquard-common/src/xrpc/streaming.rs
···208 }
209}
210211-/// XRPC streaming response
212///
213/// Similar to `Response<R>` but holds a streaming body instead of a buffer.
214pub struct StreamingResponse {
···208 }
209}
210211+/// HTTP streaming response
212///
213/// Similar to `Response<R>` but holds a streaming body instead of a buffer.
214pub struct StreamingResponse {
+3-3
crates/jacquard-common/src/xrpc/subscription.rs
···472#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
473pub trait SubscriptionClient: WebSocketClient {
474 /// Get the base URI for the client.
475- fn base_uri(&self) -> Url;
476477 /// Get the subscription options for the client.
478 fn subscription_opts(&self) -> impl Future<Output = SubscriptionOptions<'_>> {
···570}
571572impl<W: WebSocketClient> SubscriptionClient for BasicSubscriptionClient<W> {
573- fn base_uri(&self) -> Url {
574 self.base_uri.clone()
575 }
576···613 Sub: XrpcSubscription + Send + Sync,
614 Self: Sync,
615 {
616- let base = self.base_uri();
617 self.subscription(base)
618 .with_options(opts)
619 .subscribe(params)
···472#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
473pub trait SubscriptionClient: WebSocketClient {
474 /// Get the base URI for the client.
475+ fn base_uri(&self) -> impl Future<Output = Url>;
476477 /// Get the subscription options for the client.
478 fn subscription_opts(&self) -> impl Future<Output = SubscriptionOptions<'_>> {
···570}
571572impl<W: WebSocketClient> SubscriptionClient for BasicSubscriptionClient<W> {
573+ async fn base_uri(&self) -> Url {
574 self.base_uri.clone()
575 }
576···613 Sub: XrpcSubscription + Send + Sync,
614 Self: Sync,
615 {
616+ let base = self.base_uri().await;
617 self.subscription(base)
618 .with_options(opts)
619 .subscribe(params)
···433 T: HttpClient + XrpcExt + Send + Sync + 'static,
434 W: Send + Sync,
435{
436+ async fn base_uri(&self) -> Url {
437+ self.endpoint.read().await.clone().unwrap_or(
438+ Url::parse("https://public.bsky.app").expect("public appview should be valid url"),
439+ )
0000000000000000000440 }
441442 async fn send<R>(&self, request: R) -> XrpcResult<XrpcResponse<R>>
···457 R: XrpcRequest + Send + Sync,
458 <R as XrpcRequest>::Response: Send + Sync,
459 {
460+ let base_uri = self.base_uri().await;
461 let auth = self.access_token().await;
462 opts.auth = auth;
463 let resp = self
···493 }
494}
495496+#[cfg(feature = "streaming")]
497+impl<S, T, W> jacquard_common::http_client::HttpClientExt for CredentialSession<S, T, W>
498+where
499+ S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static,
500+ T: HttpClient + XrpcExt + jacquard_common::http_client::HttpClientExt + Send + Sync + 'static,
501+ W: Send + Sync,
502+{
503+ async fn send_http_streaming(
504+ &self,
505+ request: http::Request<Vec<u8>>,
506+ ) -> core::result::Result<http::Response<jacquard_common::stream::ByteStream>, Self::Error> {
507+ self.client.send_http_streaming(request).await
508+ }
509+510+ async fn send_http_bidirectional<Str>(
511+ &self,
512+ parts: http::request::Parts,
513+ body: Str,
514+ ) -> core::result::Result<http::Response<jacquard_common::stream::ByteStream>, Self::Error>
515+ where
516+ Str: n0_future::Stream<Item = core::result::Result<bytes::Bytes, jacquard_common::StreamError>>
517+ + Send
518+ + 'static,
519+ {
520+ self.client.send_http_bidirectional(parts, body).await
521+ }
522+}
523+524+#[cfg(feature = "streaming")]
525+impl<S, T, W> jacquard_common::xrpc::XrpcStreamingClient for CredentialSession<S, T, W>
526+where
527+ S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static,
528+ T: HttpClient + XrpcExt + jacquard_common::http_client::HttpClientExt + Send + Sync + 'static,
529+ W: Send + Sync,
530+{
531+ async fn download<R>(
532+ &self,
533+ request: R,
534+ ) -> core::result::Result<jacquard_common::xrpc::StreamingResponse, jacquard_common::StreamError>
535+ where
536+ R: XrpcRequest + Send + Sync,
537+ <R as XrpcRequest>::Response: Send + Sync,
538+ {
539+ use jacquard_common::{StreamError, xrpc::build_http_request};
540+541+ let base_uri = <Self as XrpcClient>::base_uri(self).await;
542+ let mut opts = self.options.read().await.clone();
543+ opts.auth = self.access_token().await;
544+545+ let http_request = build_http_request(&base_uri, &request, &opts)
546+ .map_err(|e| StreamError::protocol(e.to_string()))?;
547+548+ let response = self
549+ .client
550+ .send_http_streaming(http_request.clone())
551+ .await
552+ .map_err(StreamError::transport)?;
553+554+ let (parts, body) = response.into_parts();
555+ let status = parts.status;
556+557+ // Check if expired based on status code
558+ if status == http::StatusCode::UNAUTHORIZED || status == http::StatusCode::BAD_REQUEST {
559+ // Try to refresh
560+ let auth = self.refresh().await.map_err(StreamError::transport)?;
561+ opts.auth = Some(auth);
562+563+ let http_request = build_http_request(&base_uri, &request, &opts)
564+ .map_err(|e| StreamError::protocol(e.to_string()))?;
565+566+ let response = self
567+ .client
568+ .send_http_streaming(http_request)
569+ .await
570+ .map_err(StreamError::transport)?;
571+ let (parts, body) = response.into_parts();
572+ Ok(jacquard_common::xrpc::StreamingResponse::new(parts, body))
573+ } else {
574+ Ok(jacquard_common::xrpc::StreamingResponse::new(parts, body))
575+ }
576+ }
577+578+ async fn stream<Str>(
579+ &self,
580+ stream: jacquard_common::xrpc::streaming::XrpcProcedureSend<Str::Frame<'static>>,
581+ ) -> core::result::Result<
582+ jacquard_common::xrpc::streaming::XrpcResponseStream<
583+ <<Str as jacquard_common::xrpc::streaming::XrpcProcedureStream>::Response as jacquard_common::xrpc::streaming::XrpcStreamResp>::Frame<'static>,
584+ >,
585+ jacquard_common::StreamError,
586+ >
587+ where
588+ Str: jacquard_common::xrpc::streaming::XrpcProcedureStream + 'static,
589+ <<Str as jacquard_common::xrpc::streaming::XrpcProcedureStream>::Response as jacquard_common::xrpc::streaming::XrpcStreamResp>::Frame<'static>: jacquard_common::xrpc::streaming::XrpcStreamResp,
590+ {
591+ use jacquard_common::StreamError;
592+ use n0_future::{StreamExt, TryStreamExt};
593+594+ let base_uri = self.base_uri().await;
595+ let mut opts = self.options.read().await.clone();
596+ opts.auth = self.access_token().await;
597+598+ let mut url = base_uri;
599+ let mut path = url.path().trim_end_matches('/').to_owned();
600+ path.push_str("/xrpc/");
601+ path.push_str(<Str::Request as jacquard_common::xrpc::XrpcRequest>::NSID);
602+ url.set_path(&path);
603+604+ let mut builder = http::Request::post(url.to_string());
605+606+ if let Some(token) = &opts.auth {
607+ use jacquard_common::AuthorizationToken;
608+ let hv = match token {
609+ AuthorizationToken::Bearer(t) => {
610+ http::HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
611+ }
612+ AuthorizationToken::Dpop(t) => {
613+ http::HeaderValue::from_str(&format!("DPoP {}", t.as_ref()))
614+ }
615+ }
616+ .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?;
617+ builder = builder.header(http::header::AUTHORIZATION, hv);
618+ }
619+620+ if let Some(proxy) = &opts.atproto_proxy {
621+ builder = builder.header("atproto-proxy", proxy.as_ref());
622+ }
623+ if let Some(labelers) = &opts.atproto_accept_labelers {
624+ if !labelers.is_empty() {
625+ let joined = labelers
626+ .iter()
627+ .map(|s| s.as_ref())
628+ .collect::<Vec<_>>()
629+ .join(", ");
630+ builder = builder.header("atproto-accept-labelers", joined);
631+ }
632+ }
633+ for (name, value) in &opts.extra_headers {
634+ builder = builder.header(name, value);
635+ }
636+637+ let (parts, _) = builder
638+ .body(())
639+ .map_err(|e| StreamError::protocol(e.to_string()))?
640+ .into_parts();
641+642+ let body_stream =
643+ jacquard_common::stream::ByteStream::new(stream.0.map_ok(|f| f.buffer).boxed());
644+645+ let response = self
646+ .client
647+ .send_http_bidirectional(parts.clone(), body_stream.into_inner())
648+ .await
649+ .map_err(StreamError::transport)?;
650+651+ let (resp_parts, resp_body) = response.into_parts();
652+ let status = resp_parts.status;
653+654+ // Check if expired
655+ if status == http::StatusCode::UNAUTHORIZED || status == http::StatusCode::BAD_REQUEST {
656+ // Try to refresh
657+ let auth = self.refresh().await.map_err(StreamError::transport)?;
658+ opts.auth = Some(auth);
659+660+ // Rebuild request with new auth
661+ let mut builder = http::Request::post(url.to_string());
662+ if let Some(token) = &opts.auth {
663+ use jacquard_common::AuthorizationToken;
664+ let hv = match token {
665+ AuthorizationToken::Bearer(t) => {
666+ http::HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
667+ }
668+ AuthorizationToken::Dpop(t) => {
669+ http::HeaderValue::from_str(&format!("DPoP {}", t.as_ref()))
670+ }
671+ }
672+ .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?;
673+ builder = builder.header(http::header::AUTHORIZATION, hv);
674+ }
675+ if let Some(proxy) = &opts.atproto_proxy {
676+ builder = builder.header("atproto-proxy", proxy.as_ref());
677+ }
678+ if let Some(labelers) = &opts.atproto_accept_labelers {
679+ if !labelers.is_empty() {
680+ let joined = labelers
681+ .iter()
682+ .map(|s| s.as_ref())
683+ .collect::<Vec<_>>()
684+ .join(", ");
685+ builder = builder.header("atproto-accept-labelers", joined);
686+ }
687+ }
688+ for (name, value) in &opts.extra_headers {
689+ builder = builder.header(name, value);
690+ }
691+692+ let (parts, _) = builder
693+ .body(())
694+ .map_err(|e| StreamError::protocol(e.to_string()))?
695+ .into_parts();
696+697+ // Can't retry with the same stream - it's been consumed
698+ // This is a limitation of streaming upload with auth refresh
699+ return Err(StreamError::protocol("Authentication failed on streaming upload and stream cannot be retried".to_string()));
700+ }
701+702+ Ok(jacquard_common::xrpc::streaming::XrpcResponseStream::from_typed_parts(
703+ resp_parts, resp_body,
704+ ))
705+ }
706+}
707+708impl<S, T, W> IdentityResolver for CredentialSession<S, T, W>
709where
710 S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static,
···789 AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()),
790 AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()),
791 };
792+ opts.headers
793+ .push((CowStr::from("Authorization"), CowStr::from(auth_value)));
00794 }
795 opts
796 }
+3
crates/jacquard/src/lib.rs
···219220pub mod client;
221000222pub use common::*;
223#[cfg(feature = "api")]
224pub use jacquard_api as api;
···219220pub mod client;
221222+#[cfg(feature = "streaming")]
223+pub mod streaming;
224+225pub use common::*;
226#[cfg(feature = "api")]
227pub use jacquard_api as api;
+3
crates/jacquard/src/streaming.rs
···000
···1+pub mod blob;
2+pub mod repo;
3+pub mod video;