tangled
alpha
login
or
join now
nonbinary.computer
/
jacquard
80
fork
atom
A better Rust ATProto crate
80
fork
atom
overview
issues
9
pulls
pipelines
add streaming examples and module documentation
Orual
5 months ago
fa087a22
92910c6f
+105
-1
5 changed files
expand all
collapse all
unified
split
Cargo.lock
crates
jacquard-common
Cargo.toml
examples
streaming_download.rs
streaming_upload.rs
src
stream.rs
+1
Cargo.lock
···
1982
1982
"cid",
1983
1983
"ed25519-dalek",
1984
1984
"futures",
1985
1985
+
"futures-lite",
1985
1986
"getrandom 0.3.4",
1986
1987
"http",
1987
1988
"ipld-core",
+2
-1
crates/jacquard-common/Cargo.toml
···
81
81
features = ["arithmetic"]
82
82
83
83
[dev-dependencies]
84
84
-
tokio = { version = "1", features = ["macros", "rt"] }
84
84
+
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread"] }
85
85
+
futures-lite = "2.6"
85
86
86
87
[package.metadata.docs.rs]
87
88
features = [ "crypto-k256", "crypto-k256", "crypto-p256"]
+27
crates/jacquard-common/examples/streaming_download.rs
···
1
1
+
//! Example: Download large file using streaming
2
2
+
#![cfg(all(feature = "streaming", feature = "reqwest-client"))]
3
3
+
4
4
+
use jacquard_common::http_client::HttpClientExt;
5
5
+
6
6
+
#[tokio::main]
7
7
+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
8
8
+
let client = reqwest::Client::new();
9
9
+
10
10
+
let request = http::Request::builder()
11
11
+
.uri("https://httpbin.org/bytes/1024")
12
12
+
.body(vec![])
13
13
+
.unwrap();
14
14
+
15
15
+
let response = client.send_http_streaming(request).await?;
16
16
+
println!("Status: {}", response.status());
17
17
+
println!("Headers: {:?}", response.headers());
18
18
+
19
19
+
let (_parts, _body) = response.into_parts();
20
20
+
println!("Received streaming response body (ByteStream)");
21
21
+
22
22
+
// Note: To iterate over chunks, use futures_lite::StreamExt on the pinned inner stream:
23
23
+
// let mut stream = Box::pin(body.into_inner());
24
24
+
// while let Some(chunk) = stream.as_mut().try_next().await? { ... }
25
25
+
26
26
+
Ok(())
27
27
+
}
+33
crates/jacquard-common/examples/streaming_upload.rs
···
1
1
+
//! Example: Upload data using streaming request body
2
2
+
#![cfg(all(feature = "streaming", feature = "reqwest-client"))]
3
3
+
4
4
+
use jacquard_common::http_client::HttpClientExt;
5
5
+
use futures::stream;
6
6
+
use bytes::Bytes;
7
7
+
8
8
+
#[tokio::main]
9
9
+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10
10
+
let client = reqwest::Client::new();
11
11
+
12
12
+
// Create a stream of data chunks
13
13
+
let chunks = vec![
14
14
+
Bytes::from("Hello, "),
15
15
+
Bytes::from("streaming "),
16
16
+
Bytes::from("world!"),
17
17
+
];
18
18
+
let body_stream = stream::iter(chunks);
19
19
+
20
20
+
// Build request and split into parts
21
21
+
let request = http::Request::builder()
22
22
+
.method(http::Method::POST)
23
23
+
.uri("https://httpbin.org/post")
24
24
+
.body(())
25
25
+
.unwrap();
26
26
+
27
27
+
let (parts, _) = request.into_parts();
28
28
+
29
29
+
let response = client.send_http_bidirectional(parts, body_stream).await?;
30
30
+
println!("Status: {}", response.status());
31
31
+
32
32
+
Ok(())
33
33
+
}
+42
crates/jacquard-common/src/stream.rs
···
1
1
//! Stream abstractions for HTTP request/response bodies
2
2
+
//!
3
3
+
//! This module provides platform-agnostic streaming types for handling large
4
4
+
//! payloads efficiently without loading everything into memory.
5
5
+
//!
6
6
+
//! # Features
7
7
+
//!
8
8
+
//! - [`ByteStream`]: Streaming response bodies
9
9
+
//! - [`ByteSink`]: Streaming request bodies
10
10
+
//! - [`StreamError`]: Concrete error type for streaming operations
11
11
+
//!
12
12
+
//! # Platform Support
13
13
+
//!
14
14
+
//! Uses `n0-future` for platform-agnostic async streams that work on both
15
15
+
//! native and WASM targets without requiring `Send` bounds on WASM.
16
16
+
//!
17
17
+
//! # Examples
18
18
+
//!
19
19
+
//! ## Streaming Download
20
20
+
//!
21
21
+
//! ```no_run
22
22
+
//! # #[cfg(all(feature = "streaming", feature = "reqwest-client"))]
23
23
+
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24
24
+
//! use jacquard_common::http_client::{HttpClient, HttpClientExt};
25
25
+
//! use futures_lite::StreamExt;
26
26
+
//!
27
27
+
//! let client = reqwest::Client::new();
28
28
+
//! let request = http::Request::builder()
29
29
+
//! .uri("https://example.com/large-file")
30
30
+
//! .body(vec![])
31
31
+
//! .unwrap();
32
32
+
//!
33
33
+
//! let response = client.send_http_streaming(request).await?;
34
34
+
//! let (_parts, body) = response.into_parts();
35
35
+
//! let mut stream = Box::pin(body.into_inner());
36
36
+
//!
37
37
+
//! // Use futures_lite::StreamExt for iteration
38
38
+
//! while let Some(chunk) = stream.as_mut().try_next().await? {
39
39
+
//! // Process chunk without loading entire file into memory
40
40
+
//! }
41
41
+
//! # Ok(())
42
42
+
//! # }
43
43
+
//! ```
2
44
3
45
use std::error::Error;
4
46
use std::fmt;