tangled
alpha
login
or
join now
nonbinary.computer
/
jacquard
80
fork
atom
A better Rust ATProto crate
80
fork
atom
overview
issues
9
pulls
pipelines
add WebSocketClient trait and connection type
Orual
5 months ago
92910c6f
57910273
+90
3 changed files
expand all
collapse all
unified
split
crates
jacquard-common
Cargo.toml
src
lib.rs
websocket.rs
+1
crates/jacquard-common/Cargo.toml
···
61
61
reqwest-client = ["dep:reqwest"]
62
62
tracing = ["dep:tracing"]
63
63
streaming = ["n0-future", "futures"]
64
64
+
websocket = ["streaming"]
64
65
65
66
[dependencies.ed25519-dalek]
66
67
version = "2"
+6
crates/jacquard-common/src/lib.rs
···
230
230
#[cfg(feature = "streaming")]
231
231
pub use xrpc::StreamingResponse;
232
232
233
233
+
#[cfg(feature = "websocket")]
234
234
+
pub mod websocket;
235
235
+
236
236
+
#[cfg(feature = "websocket")]
237
237
+
pub use websocket::{WebSocketClient, WebSocketConnection};
238
238
+
233
239
pub use types::value::*;
234
240
235
241
/// Authorization token types for XRPC requests.
+83
crates/jacquard-common/src/websocket.rs
···
1
1
+
//! WebSocket client abstraction
2
2
+
3
3
+
use crate::stream::{ByteStream, ByteSink};
4
4
+
use std::future::Future;
5
5
+
use url::Url;
6
6
+
7
7
+
/// WebSocket client trait
8
8
+
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
9
9
+
pub trait WebSocketClient {
10
10
+
/// Error type for WebSocket operations
11
11
+
type Error: std::error::Error + Send + Sync + 'static;
12
12
+
13
13
+
/// Connect to a WebSocket endpoint
14
14
+
fn connect(
15
15
+
&self,
16
16
+
url: Url,
17
17
+
) -> impl Future<Output = Result<WebSocketConnection, Self::Error>>;
18
18
+
}
19
19
+
20
20
+
/// WebSocket connection with bidirectional streams
21
21
+
pub struct WebSocketConnection {
22
22
+
tx: ByteSink,
23
23
+
rx: ByteStream,
24
24
+
}
25
25
+
26
26
+
impl WebSocketConnection {
27
27
+
/// Create a new WebSocket connection
28
28
+
pub fn new(tx: ByteSink, rx: ByteStream) -> Self {
29
29
+
Self { tx, rx }
30
30
+
}
31
31
+
32
32
+
/// Get mutable access to the sender
33
33
+
pub fn sender_mut(&mut self) -> &mut ByteSink {
34
34
+
&mut self.tx
35
35
+
}
36
36
+
37
37
+
/// Get mutable access to the receiver
38
38
+
pub fn receiver_mut(&mut self) -> &mut ByteStream {
39
39
+
&mut self.rx
40
40
+
}
41
41
+
42
42
+
/// Split into sender and receiver
43
43
+
pub fn split(self) -> (ByteSink, ByteStream) {
44
44
+
(self.tx, self.rx)
45
45
+
}
46
46
+
47
47
+
/// Check if connection is open (always true for this abstraction)
48
48
+
pub fn is_open(&self) -> bool {
49
49
+
true
50
50
+
}
51
51
+
}
52
52
+
53
53
+
impl std::fmt::Debug for WebSocketConnection {
54
54
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55
55
+
f.debug_struct("WebSocketConnection")
56
56
+
.finish_non_exhaustive()
57
57
+
}
58
58
+
}
59
59
+
60
60
+
#[cfg(test)]
61
61
+
mod tests {
62
62
+
use super::*;
63
63
+
use crate::stream::StreamError;
64
64
+
65
65
+
#[test]
66
66
+
fn websocket_connection_has_tx_and_rx() {
67
67
+
use futures::stream;
68
68
+
use futures::sink::SinkExt;
69
69
+
use bytes::Bytes;
70
70
+
71
71
+
let rx_stream = stream::iter(vec![Ok(Bytes::from("test"))]);
72
72
+
let rx = ByteStream::new(rx_stream);
73
73
+
74
74
+
// Create a sink that converts Infallible to StreamError
75
75
+
let drain_sink = futures::sink::drain().sink_map_err(|_: std::convert::Infallible| {
76
76
+
StreamError::closed()
77
77
+
});
78
78
+
let tx = ByteSink::new(drain_sink);
79
79
+
80
80
+
let conn = WebSocketConnection::new(tx, rx);
81
81
+
assert!(conn.is_open());
82
82
+
}
83
83
+
}