A better Rust ATProto crate

WIP on typed http streaming infra

Orual 9714fd1f 0d9dc178

+420 -13
+79 -2
Cargo.lock
··· 1261 1261 ] 1262 1262 1263 1263 [[package]] 1264 + name = "genawaiter" 1265 + version = "0.99.1" 1266 + source = "registry+https://github.com/rust-lang/crates.io-index" 1267 + checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" 1268 + dependencies = [ 1269 + "futures-core", 1270 + "genawaiter-macro", 1271 + "genawaiter-proc-macro", 1272 + "proc-macro-hack", 1273 + ] 1274 + 1275 + [[package]] 1276 + name = "genawaiter-macro" 1277 + version = "0.99.1" 1278 + source = "registry+https://github.com/rust-lang/crates.io-index" 1279 + checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" 1280 + 1281 + [[package]] 1282 + name = "genawaiter-proc-macro" 1283 + version = "0.99.1" 1284 + source = "registry+https://github.com/rust-lang/crates.io-index" 1285 + checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" 1286 + dependencies = [ 1287 + "proc-macro-error 0.4.12", 1288 + "proc-macro-hack", 1289 + "proc-macro2", 1290 + "quote", 1291 + "syn 1.0.109", 1292 + ] 1293 + 1294 + [[package]] 1264 1295 name = "generator" 1265 1296 version = "0.8.7" 1266 1297 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1983 2014 "ed25519-dalek", 1984 2015 "futures", 1985 2016 "futures-lite", 2017 + "genawaiter", 1986 2018 "getrandom 0.3.4", 1987 2019 "http", 1988 2020 "ipld-core", ··· 2006 2038 "thiserror 2.0.17", 2007 2039 "tokio", 2008 2040 "tokio-tungstenite-wasm", 2041 + "tokio-util", 2009 2042 "tracing", 2010 2043 "trait-variant", 2011 2044 "url", ··· 2884 2917 2885 2918 [[package]] 2886 2919 name = "proc-macro-error" 2920 + version = "0.4.12" 2921 + source = "registry+https://github.com/rust-lang/crates.io-index" 2922 + checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" 2923 + dependencies = [ 2924 + "proc-macro-error-attr 0.4.12", 2925 + "proc-macro2", 2926 + "quote", 2927 + "syn 1.0.109", 2928 + "version_check", 2929 + ] 2930 + 2931 + [[package]] 2932 + name = "proc-macro-error" 2887 2933 version = "1.0.4" 2888 2934 source = "registry+https://github.com/rust-lang/crates.io-index" 2889 2935 checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" 2890 2936 dependencies = [ 2891 - "proc-macro-error-attr", 2937 + "proc-macro-error-attr 1.0.4", 2892 2938 "proc-macro2", 2893 2939 "quote", 2894 2940 "syn 1.0.109", ··· 2897 2943 2898 2944 [[package]] 2899 2945 name = "proc-macro-error-attr" 2946 + version = "0.4.12" 2947 + source = "registry+https://github.com/rust-lang/crates.io-index" 2948 + checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" 2949 + dependencies = [ 2950 + "proc-macro2", 2951 + "quote", 2952 + "syn 1.0.109", 2953 + "syn-mid", 2954 + "version_check", 2955 + ] 2956 + 2957 + [[package]] 2958 + name = "proc-macro-error-attr" 2900 2959 version = "1.0.4" 2901 2960 source = "registry+https://github.com/rust-lang/crates.io-index" 2902 2961 checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" ··· 2905 2964 "quote", 2906 2965 "version_check", 2907 2966 ] 2967 + 2968 + [[package]] 2969 + name = "proc-macro-hack" 2970 + version = "0.5.20+deprecated" 2971 + source = "registry+https://github.com/rust-lang/crates.io-index" 2972 + checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" 2908 2973 2909 2974 [[package]] 2910 2975 name = "proc-macro2" ··· 3720 3785 "ciborium", 3721 3786 "hex_fmt", 3722 3787 "indoc", 3723 - "proc-macro-error", 3788 + "proc-macro-error 1.0.4", 3724 3789 "proc-macro2", 3725 3790 "quote", 3726 3791 "serde", ··· 3775 3840 checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" 3776 3841 dependencies = [ 3777 3842 "proc-macro2", 3843 + "quote", 3778 3844 "unicode-ident", 3779 3845 ] 3780 3846 ··· 3787 3853 "proc-macro2", 3788 3854 "quote", 3789 3855 "unicode-ident", 3856 + ] 3857 + 3858 + [[package]] 3859 + name = "syn-mid" 3860 + version = "0.5.4" 3861 + source = "registry+https://github.com/rust-lang/crates.io-index" 3862 + checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" 3863 + dependencies = [ 3864 + "proc-macro2", 3865 + "quote", 3866 + "syn 1.0.109", 3790 3867 ] 3791 3868 3792 3869 [[package]]
+2
crates/jacquard-common/Cargo.toml
··· 45 45 n0-future = { version = "0.1", optional = true } 46 46 futures = { version = "0.3", optional = true } 47 47 tokio-tungstenite-wasm = { version = "0.4", optional = true } 48 + genawaiter = { version = "0.99.1", features = ["futures03"] } 48 49 49 50 [target.'cfg(target_family = "wasm")'.dependencies] 50 51 getrandom = { version = "0.3.4", features = ["wasm_js"] } 51 52 52 53 [target.'cfg(not(target_arch = "wasm32"))'.dependencies] 53 54 reqwest = { workspace = true, optional = true, features = [ "http2", "system-proxy", "rustls-tls"] } 55 + tokio-util = { version = "0.7.16", features = ["io"] } 54 56 55 57 [features] 56 58 default = ["service-auth", "reqwest-client", "crypto", "websocket"]
+3 -4
crates/jacquard-common/src/http_client.rs
··· 37 37 body: S, 38 38 ) -> impl Future<Output = Result<http::Response<ByteStream>, Self::Error>> 39 39 where 40 - S: n0_future::Stream<Item = bytes::Bytes> + Send + 'static; 40 + S: n0_future::Stream<Item = Result<bytes::Bytes, StreamError>> + Send + 'static; 41 41 } 42 42 43 43 #[cfg(feature = "reqwest-client")] ··· 145 145 body: S, 146 146 ) -> Result<http::Response<ByteStream>, Self::Error> 147 147 where 148 - S: n0_future::Stream<Item = bytes::Bytes> + Send + 'static, 148 + S: n0_future::Stream<Item = Result<bytes::Bytes, StreamError>> + Send + 'static, 149 149 { 150 150 // Convert stream to reqwest::Body 151 151 use futures::StreamExt; 152 - let ok_stream = body.map(Ok::<_, Self::Error>); 153 - let reqwest_body = reqwest::Body::wrap_stream(ok_stream); 152 + let reqwest_body = reqwest::Body::wrap_stream(body); 154 153 155 154 let mut req = self 156 155 .request(parts.method, parts.uri.to_string())
+27 -5
crates/jacquard-common/src/stream.rs
··· 66 66 Protocol, 67 67 /// Message deserialization failed 68 68 Decode, 69 + /// Message serialization failed 70 + Encode, 69 71 /// Wrong message format (e.g., text frame when expecting binary) 70 72 WrongMessageFormat, 71 73 } ··· 118 120 } 119 121 } 120 122 123 + /// Create an encode error with source 124 + pub fn encode(source: impl Error + Send + Sync + 'static) -> Self { 125 + Self { 126 + kind: StreamErrorKind::Encode, 127 + source: Some(Box::new(source)), 128 + } 129 + } 130 + 121 131 /// Create a wrong message format error 122 132 pub fn wrong_message_format(msg: impl Into<String>) -> Self { 123 133 Self { ··· 134 144 StreamErrorKind::Closed => write!(f, "Stream closed"), 135 145 StreamErrorKind::Protocol => write!(f, "Protocol error"), 136 146 StreamErrorKind::Decode => write!(f, "Decode error"), 147 + StreamErrorKind::Encode => write!(f, "Encode error"), 137 148 StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"), 138 149 }?; 139 150 ··· 154 165 } 155 166 156 167 use bytes::Bytes; 168 + use n0_future::stream::Boxed; 157 169 158 170 /// Platform-agnostic byte stream abstraction 159 171 pub struct ByteStream { 160 - inner: Box<dyn n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin>, 172 + inner: Boxed<Result<Bytes, StreamError>>, 161 173 } 162 174 163 175 impl ByteStream { 164 176 /// Create a new byte stream from any compatible stream 177 + #[cfg(not(target_arch = "wasm32"))] 178 + pub fn new<S>(stream: S) -> Self 179 + where 180 + S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static, 181 + { 182 + Self { 183 + inner: Box::pin(stream), 184 + } 185 + } 186 + 187 + /// Create a new byte stream from any compatible stream 188 + #[cfg(target_arch = "wasm32")] 165 189 pub fn new<S>(stream: S) -> Self 166 190 where 167 191 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static, 168 192 { 169 193 Self { 170 - inner: Box::new(stream), 194 + inner: Box::pin(stream), 171 195 } 172 196 } 173 197 ··· 177 201 } 178 202 179 203 /// Convert into the inner boxed stream 180 - pub fn into_inner( 181 - self, 182 - ) -> Box<dyn n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin> { 204 + pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> { 183 205 self.inner 184 206 } 185 207 }
+103 -1
crates/jacquard-common/src/xrpc.rs
··· 38 38 use std::{error::Error, marker::PhantomData}; 39 39 use url::Url; 40 40 41 - use crate::http_client::HttpClient; 41 + use crate::http_client::{HttpClient, HttpClientExt}; 42 42 use crate::types::value::Data; 43 43 use crate::{AuthorizationToken, error::AuthError}; 44 44 use crate::{CowStr, error::XrpcResult}; 45 45 use crate::{IntoStatic, error::DecodeError}; 46 + #[cfg(feature = "streaming")] 47 + use crate::{ 48 + StreamError, 49 + xrpc::streaming::{XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp}, 50 + }; 46 51 use crate::{error::TransportError, types::value::RawData}; 47 52 48 53 /// Error type for encoding XRPC requests ··· 913 918 XrpcError::Generic(e) => XrpcError::Generic(e), 914 919 XrpcError::Decode(e) => XrpcError::Decode(e), 915 920 } 921 + } 922 + } 923 + 924 + #[cfg(feature = "streaming")] 925 + impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> { 926 + /// Send an XRPC call and stream the binary response. 927 + /// 928 + /// Useful for downloading blobs and entire repository archives 929 + pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError> 930 + where 931 + R: XrpcRequest, 932 + <R as XrpcRequest>::Response: Send + Sync, 933 + { 934 + let http_request = 935 + build_http_request(&self.base, request, &self.opts).map_err(StreamError::transport)?; 936 + 937 + let http_response = self 938 + .client 939 + .send_http_streaming(http_request) 940 + .await 941 + .map_err(StreamError::transport)?; 942 + let (parts, body) = http_response.into_parts(); 943 + 944 + Ok(StreamingResponse::new(parts, body)) 945 + } 946 + 947 + /// Stream an XRPC procedure call and its response 948 + /// 949 + /// Useful for streaming upload of large payloads, or for "pipe-through" operations 950 + /// where you processing a large payload. 951 + pub async fn stream<S>( 952 + self, 953 + stream: XrpcProcedureSend<S::Frame<'static>>, 954 + ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError> 955 + where 956 + S: XrpcProcedureStream + 'static, 957 + <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp, 958 + { 959 + use futures::TryStreamExt; 960 + use n0_future::StreamExt; 961 + 962 + let mut url = self.base; 963 + let mut path = url.path().trim_end_matches('/').to_owned(); 964 + path.push_str("/xrpc/"); 965 + path.push_str(<S::Request as XrpcRequest>::NSID); 966 + url.set_path(&path); 967 + 968 + let mut builder = http::Request::post(url.to_string()); 969 + 970 + if let Some(token) = &self.opts.auth { 971 + let hv = match token { 972 + AuthorizationToken::Bearer(t) => { 973 + HeaderValue::from_str(&format!("Bearer {}", t.as_ref())) 974 + } 975 + AuthorizationToken::Dpop(t) => { 976 + HeaderValue::from_str(&format!("DPoP {}", t.as_ref())) 977 + } 978 + } 979 + .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?; 980 + builder = builder.header(Header::Authorization, hv); 981 + } 982 + 983 + if let Some(proxy) = &self.opts.atproto_proxy { 984 + builder = builder.header(Header::AtprotoProxy, proxy.as_ref()); 985 + } 986 + if let Some(labelers) = &self.opts.atproto_accept_labelers { 987 + if !labelers.is_empty() { 988 + let joined = labelers 989 + .iter() 990 + .map(|s| s.as_ref()) 991 + .collect::<Vec<_>>() 992 + .join(", "); 993 + builder = builder.header(Header::AtprotoAcceptLabelers, joined); 994 + } 995 + } 996 + for (name, value) in &self.opts.extra_headers { 997 + builder = builder.header(name, value); 998 + } 999 + 1000 + let (parts, _) = builder 1001 + .body(()) 1002 + .map_err(|e| StreamError::protocol(e.to_string()))? 1003 + .into_parts(); 1004 + 1005 + let body_stream = stream.0.map_ok(|f| f.buffer).boxed(); 1006 + 1007 + let resp = self 1008 + .client 1009 + .send_http_bidirectional(parts, body_stream) 1010 + .await 1011 + .map_err(StreamError::transport)?; 1012 + 1013 + let (parts, body) = resp.into_parts(); 1014 + 1015 + Ok(XrpcResponseStream::< 1016 + <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>, 1017 + >::from_typed_parts(parts, body)) 916 1018 } 917 1019 } 918 1020
+206 -1
crates/jacquard-common/src/xrpc/streaming.rs
··· 1 1 //! Streaming support for XRPC requests and responses 2 2 3 - use crate::stream::ByteStream; 3 + use crate::{IntoStatic, StreamError, stream::ByteStream, xrpc::XrpcRequest}; 4 + use bytes::Bytes; 4 5 use http::StatusCode; 6 + use n0_future::{StreamExt, TryStreamExt, stream::Boxed}; 7 + use serde::{Deserialize, Serialize}; 8 + #[cfg(not(target_arch = "wasm32"))] 9 + use std::path::Path; 10 + use std::{marker::PhantomData, pin::Pin}; 11 + 12 + pub trait XrpcProcedureStream { 13 + /// The NSID for this XRPC method 14 + const NSID: &'static str; 15 + /// The upload encoding 16 + const ENCODING: &'static str; 17 + 18 + type Frame<'de>; 19 + 20 + type Request: XrpcRequest; 21 + 22 + /// Response type returned from the XRPC call (marker struct) 23 + type Response: XrpcStreamResp; 24 + 25 + fn encode_frame<'de>(data: Self::Frame<'de>) -> Result<Bytes, StreamError> 26 + where 27 + Self::Frame<'de>: Serialize, 28 + { 29 + Ok(Bytes::from_owner( 30 + serde_ipld_dagcbor::to_vec(&data).map_err(StreamError::encode)?, 31 + )) 32 + } 33 + 34 + /// Decode the request body for procedures. 35 + /// 36 + /// Default implementation deserializes from CBOR. Override for non-CBOR encodings. 37 + fn decode_frame<'de>(frame: &'de [u8]) -> Result<Self::Frame<'de>, StreamError> 38 + where 39 + Self::Frame<'de>: Deserialize<'de>, 40 + { 41 + Ok(serde_ipld_dagcbor::from_slice(frame).map_err(StreamError::decode)?) 42 + } 43 + } 44 + 45 + /// Trait for XRPC Response types 46 + /// 47 + /// It mirrors the NSID and carries the encoding types as well as Output (success) and Err types 48 + pub trait XrpcStreamResp { 49 + /// The NSID for this XRPC method 50 + const NSID: &'static str; 51 + 52 + /// Output encoding (MIME type) 53 + const ENCODING: &'static str; 54 + 55 + /// Response output type 56 + type Frame<'de>: IntoStatic; 57 + 58 + fn encode_frame<'de>(data: Self::Frame<'de>) -> Result<Bytes, StreamError> 59 + where 60 + Self::Frame<'de>: Serialize, 61 + { 62 + Ok(Bytes::from_owner( 63 + serde_ipld_dagcbor::to_vec(&data).map_err(StreamError::encode)?, 64 + )) 65 + } 66 + 67 + /// Decode the request body for procedures. 68 + /// 69 + /// Default implementation deserializes from CBOR. Override for non-CBOR encodings. 70 + /// 71 + /// TODO: make this handle when frames are fragmented? 72 + fn decode_frame<'de>(frame: &'de [u8]) -> Result<Self::Frame<'de>, StreamError> 73 + where 74 + Self::Frame<'de>: Deserialize<'de>, 75 + { 76 + Ok(serde_ipld_dagcbor::from_slice(frame).map_err(StreamError::decode)?) 77 + } 78 + } 79 + 80 + #[repr(transparent)] 81 + pub struct XrpcStreamFrame<F = ()> { 82 + pub buffer: Bytes, 83 + _marker: PhantomData<F>, 84 + } 85 + 86 + impl XrpcStreamFrame { 87 + pub fn new(buffer: Bytes) -> Self { 88 + Self { 89 + buffer, 90 + _marker: PhantomData, 91 + } 92 + } 93 + } 94 + 95 + impl<F> XrpcStreamFrame<F> { 96 + pub fn new_typed<G>(buffer: Bytes) -> Self { 97 + Self { 98 + buffer, 99 + _marker: PhantomData, 100 + } 101 + } 102 + } 103 + 104 + /// Dumb file upload stream 105 + /// 106 + /// Unavailable on wasm due to use of tokio I/O 107 + #[cfg(not(target_arch = "wasm32"))] 108 + pub async fn upload_stream(file: impl AsRef<Path>) -> Result<XrpcProcedureSend, tokio::io::Error> { 109 + use tokio_util::io::ReaderStream; 110 + 111 + let file = tokio::fs::File::open(file).await?; 112 + let reader = ReaderStream::new(file); 113 + let stream = reader 114 + .map(|b| match b { 115 + Ok(bytes) => Ok(XrpcStreamFrame::new(bytes)), 116 + Err(err) => Err(StreamError::transport(err)), 117 + }) 118 + .boxed(); 119 + 120 + Ok(XrpcProcedureSend(stream)) 121 + } 122 + 123 + /// Encode a stream of items into the corresponding XRPC procedure stream. 124 + pub fn encode_stream<P: XrpcProcedureStream + 'static>( 125 + s: Boxed<P::Frame<'static>>, 126 + ) -> XrpcProcedureSend<P::Frame<'static>> 127 + where 128 + <P as XrpcProcedureStream>::Frame<'static>: Serialize, 129 + { 130 + let stream = s 131 + .map(|f| P::encode_frame(f).map(|b| XrpcStreamFrame::new_typed::<P::Frame<'_>>(b))) 132 + .boxed(); 133 + 134 + XrpcProcedureSend(stream) 135 + } 136 + 137 + /// Sending stream for streaming XRPC procedure uplink. 138 + pub struct XrpcProcedureSend<F = ()>(pub Boxed<Result<XrpcStreamFrame<F>, StreamError>>); 139 + 140 + /// Sink half of XRPC procedure uplink stream, for use in pipe scenarios. 141 + pub struct XrpcProcedureSink<F = ()>( 142 + pub Pin<Box<dyn n0_future::Sink<XrpcStreamFrame<F>, Error = StreamError> + Send>>, 143 + ); 144 + 145 + pub struct XrpcResponseStream<F = ()> { 146 + parts: http::response::Parts, 147 + body: Boxed<Result<XrpcStreamFrame<F>, StreamError>>, 148 + } 149 + 150 + impl XrpcResponseStream { 151 + pub fn from_bytestream(StreamingResponse { parts, body }: StreamingResponse) -> Self { 152 + Self { 153 + parts, 154 + body: body 155 + .into_inner() 156 + .map_ok(|b| XrpcStreamFrame::new(b)) 157 + .boxed(), 158 + } 159 + } 160 + 161 + pub fn from_parts(parts: http::response::Parts, body: ByteStream) -> Self { 162 + Self { 163 + parts, 164 + body: body 165 + .into_inner() 166 + .map_ok(|b| XrpcStreamFrame::new(b)) 167 + .boxed(), 168 + } 169 + } 170 + 171 + pub fn into_parts(self) -> (http::response::Parts, ByteStream) { 172 + ( 173 + self.parts, 174 + ByteStream::new(self.body.map_ok(|f| f.buffer).boxed()), 175 + ) 176 + } 177 + 178 + pub fn into_bytestream(self) -> ByteStream { 179 + ByteStream::new(self.body.map_ok(|f| f.buffer).boxed()) 180 + } 181 + } 182 + 183 + impl<F: XrpcStreamResp> XrpcResponseStream<F> { 184 + pub fn from_stream(StreamingResponse { parts, body }: StreamingResponse) -> Self { 185 + Self { 186 + parts, 187 + body: body 188 + .into_inner() 189 + .map_ok(|b| XrpcStreamFrame::new_typed::<F::Frame<'_>>(b)) 190 + .boxed(), 191 + } 192 + } 193 + 194 + pub fn from_typed_parts(parts: http::response::Parts, body: ByteStream) -> Self { 195 + Self { 196 + parts, 197 + body: body 198 + .into_inner() 199 + .map_ok(|b| XrpcStreamFrame::new_typed::<F::Frame<'_>>(b)) 200 + .boxed(), 201 + } 202 + } 203 + } 204 + 205 + impl<F: XrpcStreamResp + 'static> XrpcResponseStream<F> { 206 + pub fn into_bytestream(self) -> ByteStream { 207 + ByteStream::new(self.body.map_ok(|f| f.buffer).boxed()) 208 + } 209 + } 5 210 6 211 /// XRPC streaming response 7 212 ///