tangled
alpha
login
or
join now
nonbinary.computer
/
jacquard
80
fork
atom
A better Rust ATProto crate
80
fork
atom
overview
issues
9
pulls
pipelines
add streaming methods to HttpClient
Orual
5 months ago
0b3a649c
cce91535
+141
-3
5 changed files
expand all
collapse all
unified
split
Cargo.lock
crates
jacquard-common
Cargo.toml
src
http_client.rs
lib.rs
tests
http_streaming.rs
+14
Cargo.lock
···
3171
3171
"url",
3172
3172
"wasm-bindgen",
3173
3173
"wasm-bindgen-futures",
3174
3174
+
"wasm-streams",
3174
3175
"web-sys",
3175
3176
"webpki-roots",
3176
3177
]
···
4408
4409
checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1"
4409
4410
dependencies = [
4410
4411
"unicode-ident",
4412
4412
+
]
4413
4413
+
4414
4414
+
[[package]]
4415
4415
+
name = "wasm-streams"
4416
4416
+
version = "0.4.2"
4417
4417
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4418
4418
+
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
4419
4419
+
dependencies = [
4420
4420
+
"futures-util",
4421
4421
+
"js-sys",
4422
4422
+
"wasm-bindgen",
4423
4423
+
"wasm-bindgen-futures",
4424
4424
+
"web-sys",
4411
4425
]
4412
4426
4413
4427
[[package]]
+3
-3
crates/jacquard-common/Cargo.toml
···
35
35
url.workspace = true
36
36
http.workspace = true
37
37
38
38
-
reqwest = { workspace = true, optional = true, features = ["charset", "gzip"] }
38
38
+
reqwest = { workspace = true, optional = true, features = ["charset", "gzip", "stream"] }
39
39
serde_ipld_dagcbor.workspace = true
40
40
signature = { version = "2", optional = true }
41
41
tracing = { workspace = true, optional = true }
···
43
43
44
44
# Streaming support (optional)
45
45
n0-future = { version = "0.1", optional = true }
46
46
+
futures = { version = "0.3", optional = true }
46
47
47
48
[target.'cfg(target_family = "wasm")'.dependencies]
48
49
getrandom = { version = "0.3.4", features = ["wasm_js"] }
···
59
60
service-auth = ["crypto-k256", "crypto-p256", "dep:signature"]
60
61
reqwest-client = ["dep:reqwest"]
61
62
tracing = ["dep:tracing"]
62
62
-
streaming = ["n0-future"]
63
63
+
streaming = ["n0-future", "futures"]
63
64
64
65
[dependencies.ed25519-dalek]
65
66
version = "2"
···
80
81
81
82
[dev-dependencies]
82
83
tokio = { version = "1", features = ["macros", "rt"] }
83
83
-
futures = "0.3"
84
84
85
85
[package.metadata.docs.rs]
86
86
features = [ "crypto-k256", "crypto-k256", "crypto-p256"]
+100
crates/jacquard-common/src/http_client.rs
···
17
17
) -> impl Future<Output = core::result::Result<http::Response<Vec<u8>>, Self::Error>>;
18
18
}
19
19
20
20
+
#[cfg(feature = "streaming")]
21
21
+
use crate::stream::{ByteStream, StreamError};
22
22
+
23
23
+
/// Extension trait for HTTP client with streaming support
24
24
+
#[cfg(feature = "streaming")]
25
25
+
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
26
26
+
pub trait HttpClientExt: HttpClient {
27
27
+
/// Send HTTP request and return streaming response
28
28
+
fn send_http_streaming(
29
29
+
&self,
30
30
+
request: http::Request<Vec<u8>>,
31
31
+
) -> impl Future<Output = Result<http::Response<ByteStream>, Self::Error>>;
32
32
+
33
33
+
/// Send HTTP request with streaming body and receive streaming response
34
34
+
fn send_http_bidirectional<S>(
35
35
+
&self,
36
36
+
parts: http::request::Parts,
37
37
+
body: S,
38
38
+
) -> impl Future<Output = Result<http::Response<ByteStream>, Self::Error>>
39
39
+
where
40
40
+
S: n0_future::Stream<Item = bytes::Bytes> + Send + 'static;
41
41
+
}
42
42
+
20
43
#[cfg(feature = "reqwest-client")]
21
44
impl HttpClient for reqwest::Client {
22
45
type Error = reqwest::Error;
···
77
100
self.as_ref().send_http(request)
78
101
}
79
102
}
103
103
+
104
104
+
#[cfg(all(feature = "streaming", feature = "reqwest-client"))]
105
105
+
impl HttpClientExt for reqwest::Client {
106
106
+
async fn send_http_streaming(
107
107
+
&self,
108
108
+
request: http::Request<Vec<u8>>,
109
109
+
) -> Result<http::Response<ByteStream>, Self::Error> {
110
110
+
// Convert http::Request to reqwest::Request
111
111
+
let (parts, body) = request.into_parts();
112
112
+
113
113
+
let mut req = self.request(parts.method, parts.uri.to_string()).body(body);
114
114
+
115
115
+
// Copy headers
116
116
+
for (name, value) in parts.headers.iter() {
117
117
+
req = req.header(name.as_str(), value.as_bytes());
118
118
+
}
119
119
+
120
120
+
// Send request and get streaming response
121
121
+
let resp = req.send().await?;
122
122
+
123
123
+
// Convert reqwest::Response to http::Response with ByteStream
124
124
+
let mut builder = http::Response::builder().status(resp.status());
125
125
+
126
126
+
// Copy headers
127
127
+
for (name, value) in resp.headers().iter() {
128
128
+
builder = builder.header(name.as_str(), value.as_bytes());
129
129
+
}
130
130
+
131
131
+
// Convert bytes_stream to ByteStream
132
132
+
use futures::StreamExt;
133
133
+
let stream = resp.bytes_stream().map(|result| {
134
134
+
result.map_err(|e| StreamError::transport(e))
135
135
+
});
136
136
+
let byte_stream = ByteStream::new(stream);
137
137
+
138
138
+
Ok(builder.body(byte_stream).expect("Failed to build response"))
139
139
+
}
140
140
+
141
141
+
async fn send_http_bidirectional<S>(
142
142
+
&self,
143
143
+
parts: http::request::Parts,
144
144
+
body: S,
145
145
+
) -> Result<http::Response<ByteStream>, Self::Error>
146
146
+
where
147
147
+
S: n0_future::Stream<Item = bytes::Bytes> + Send + 'static,
148
148
+
{
149
149
+
// Convert stream to reqwest::Body
150
150
+
use futures::StreamExt;
151
151
+
let ok_stream = body.map(Ok::<_, Self::Error>);
152
152
+
let reqwest_body = reqwest::Body::wrap_stream(ok_stream);
153
153
+
154
154
+
let mut req = self
155
155
+
.request(parts.method, parts.uri.to_string())
156
156
+
.body(reqwest_body);
157
157
+
158
158
+
// Copy headers
159
159
+
for (name, value) in parts.headers.iter() {
160
160
+
req = req.header(name.as_str(), value.as_bytes());
161
161
+
}
162
162
+
163
163
+
// Send and convert response
164
164
+
let resp = req.send().await?;
165
165
+
166
166
+
let mut builder = http::Response::builder().status(resp.status());
167
167
+
168
168
+
for (name, value) in resp.headers().iter() {
169
169
+
builder = builder.header(name.as_str(), value.as_bytes());
170
170
+
}
171
171
+
172
172
+
let stream = resp.bytes_stream().map(|result| {
173
173
+
result.map_err(|e| StreamError::transport(e))
174
174
+
});
175
175
+
let byte_stream = ByteStream::new(stream);
176
176
+
177
177
+
Ok(builder.body(byte_stream).expect("Failed to build response"))
178
178
+
}
179
179
+
}
+3
crates/jacquard-common/src/lib.rs
···
224
224
#[cfg(feature = "streaming")]
225
225
pub mod stream;
226
226
227
227
+
#[cfg(feature = "streaming")]
228
228
+
pub use stream::{ByteStream, ByteSink, StreamError, StreamErrorKind};
229
229
+
227
230
pub use types::value::*;
228
231
229
232
/// Authorization token types for XRPC requests.
+21
crates/jacquard-common/tests/http_streaming.rs
···
1
1
+
#![cfg(all(feature = "streaming", feature = "reqwest-client"))]
2
2
+
3
3
+
use jacquard_common::http_client::HttpClientExt;
4
4
+
5
5
+
#[tokio::test]
6
6
+
async fn reqwest_client_can_stream_response() {
7
7
+
let client = reqwest::Client::new();
8
8
+
9
9
+
let request = http::Request::builder()
10
10
+
.uri("https://www.rust-lang.org/")
11
11
+
.body(vec![])
12
12
+
.unwrap();
13
13
+
14
14
+
let response = client.send_http_streaming(request).await.unwrap();
15
15
+
// Just verify we got a response - the fact that it didn't error means the streaming works
16
16
+
assert!(
17
17
+
response.status().is_success() || response.status().is_redirection(),
18
18
+
"Status: {}",
19
19
+
response.status()
20
20
+
);
21
21
+
}