tangled
alpha
login
or
join now
nonbinary.computer
/
jacquard
80
fork
atom
A better Rust ATProto crate
80
fork
atom
overview
issues
9
pulls
pipelines
raw and loosely typed data decode options
Orual
5 months ago
3fc5713e
0f033cfb
+132
-29
1 changed file
expand all
collapse all
unified
split
crates
jacquard-common
src
xrpc
subscription.rs
+132
-29
crates/jacquard-common/src/xrpc/subscription.rs
···
3
3
//! This module defines traits and types for typed WebSocket subscriptions,
4
4
//! mirroring the request/response pattern used for HTTP XRPC endpoints.
5
5
6
6
+
use n0_future::stream::Boxed;
6
7
use serde::{Deserialize, Serialize};
7
8
use std::error::Error;
8
9
use std::future::Future;
···
10
11
use url::Url;
11
12
12
13
use crate::stream::StreamError;
13
13
-
use crate::websocket::{WebSocketClient, WebSocketConnection};
14
14
-
use crate::{CowStr, IntoStatic};
14
14
+
use crate::websocket::{WebSocketClient, WebSocketConnection, WsSink, WsStream};
15
15
+
use crate::{CowStr, Data, IntoStatic, RawData, WsMessage};
15
16
16
17
/// Encoding format for subscription messages
17
18
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
···
173
174
pub fn into_stream(
174
175
self,
175
176
) -> (
176
176
-
crate::websocket::WsSink,
177
177
-
n0_future::stream::Boxed<Result<StreamMessage<'static, S>, StreamError>>,
177
177
+
WsSink,
178
178
+
Boxed<Result<StreamMessage<'static, S>, StreamError>>,
178
179
)
179
180
where
180
181
for<'a> StreamMessage<'a, S>: IntoStatic<Output = StreamMessage<'static, S>>,
···
183
184
184
185
let (tx, rx) = self.connection.split();
185
186
186
186
-
let stream: n0_future::stream::Boxed<_> = match S::ENCODING {
187
187
-
MessageEncoding::Json => {
188
188
-
Box::pin(rx.into_inner().filter_map(|msg| decode_json_msg::<S>(msg)))
189
189
-
}
190
190
-
MessageEncoding::DagCbor => {
191
191
-
Box::pin(rx.into_inner().filter_map(|msg| decode_cbor_msg::<S>(msg)))
192
192
-
}
187
187
+
let stream = match S::ENCODING {
188
188
+
MessageEncoding::Json => rx
189
189
+
.into_inner()
190
190
+
.filter_map(|msg| decode_json_msg::<S>(msg))
191
191
+
.boxed(),
192
192
+
MessageEncoding::DagCbor => rx
193
193
+
.into_inner()
194
194
+
.filter_map(|msg| decode_cbor_msg::<S>(msg))
195
195
+
.boxed(),
196
196
+
};
197
197
+
198
198
+
(tx, stream)
199
199
+
}
200
200
+
201
201
+
/// Converts the subscription into a stream of raw atproto data.
202
202
+
pub fn into_raw_data_stream(self) -> (WsSink, Boxed<Result<RawData<'static>, StreamError>>) {
203
203
+
use n0_future::StreamExt as _;
204
204
+
205
205
+
let (tx, rx) = self.connection.split();
206
206
+
207
207
+
fn parse_msg<'a>(bytes: &'a [u8]) -> Result<RawData<'a>, serde_json::Error> {
208
208
+
serde_json::from_slice(bytes)
209
209
+
}
210
210
+
fn parse_cbor<'a>(
211
211
+
bytes: &'a [u8],
212
212
+
) -> Result<RawData<'a>, serde_ipld_dagcbor::DecodeError<std::convert::Infallible>>
213
213
+
{
214
214
+
serde_ipld_dagcbor::from_slice(bytes)
215
215
+
}
216
216
+
217
217
+
let stream = match S::ENCODING {
218
218
+
MessageEncoding::Json => rx
219
219
+
.into_inner()
220
220
+
.filter_map(|msg_result| match msg_result {
221
221
+
Ok(WsMessage::Text(text)) => Some(
222
222
+
parse_msg(text.as_ref())
223
223
+
.map(|v| v.into_static())
224
224
+
.map_err(StreamError::decode),
225
225
+
),
226
226
+
Ok(WsMessage::Binary(bytes)) => Some(
227
227
+
parse_msg(&bytes)
228
228
+
.map(|v| v.into_static())
229
229
+
.map_err(StreamError::decode),
230
230
+
),
231
231
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
232
232
+
Err(e) => Some(Err(e)),
233
233
+
})
234
234
+
.boxed(),
235
235
+
MessageEncoding::DagCbor => rx
236
236
+
.into_inner()
237
237
+
.filter_map(|msg_result| match msg_result {
238
238
+
Ok(WsMessage::Binary(bytes)) => Some(
239
239
+
parse_cbor(&bytes)
240
240
+
.map(|v| v.into_static())
241
241
+
.map_err(|e| StreamError::decode(crate::error::DecodeError::from(e))),
242
242
+
),
243
243
+
Ok(WsMessage::Text(_)) => Some(Err(StreamError::wrong_message_format(
244
244
+
"expected binary frame for CBOR, got text",
245
245
+
))),
246
246
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
247
247
+
Err(e) => Some(Err(e)),
248
248
+
})
249
249
+
.boxed(),
250
250
+
};
251
251
+
252
252
+
(tx, stream)
253
253
+
}
254
254
+
255
255
+
/// Converts the subscription into a stream of loosely-typed atproto data.
256
256
+
pub fn into_data_stream(self) -> (WsSink, Boxed<Result<Data<'static>, StreamError>>) {
257
257
+
use n0_future::StreamExt as _;
258
258
+
259
259
+
let (tx, rx) = self.connection.split();
260
260
+
261
261
+
fn parse_msg<'a>(bytes: &'a [u8]) -> Result<Data<'a>, serde_json::Error> {
262
262
+
serde_json::from_slice(bytes)
263
263
+
}
264
264
+
fn parse_cbor<'a>(
265
265
+
bytes: &'a [u8],
266
266
+
) -> Result<Data<'a>, serde_ipld_dagcbor::DecodeError<std::convert::Infallible>> {
267
267
+
serde_ipld_dagcbor::from_slice(bytes)
268
268
+
}
269
269
+
270
270
+
let stream = match S::ENCODING {
271
271
+
MessageEncoding::Json => rx
272
272
+
.into_inner()
273
273
+
.filter_map(|msg_result| match msg_result {
274
274
+
Ok(WsMessage::Text(text)) => Some(
275
275
+
parse_msg(text.as_ref())
276
276
+
.map(|v| v.into_static())
277
277
+
.map_err(StreamError::decode),
278
278
+
),
279
279
+
Ok(WsMessage::Binary(bytes)) => Some(
280
280
+
parse_msg(&bytes)
281
281
+
.map(|v| v.into_static())
282
282
+
.map_err(StreamError::decode),
283
283
+
),
284
284
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
285
285
+
Err(e) => Some(Err(e)),
286
286
+
})
287
287
+
.boxed(),
288
288
+
MessageEncoding::DagCbor => rx
289
289
+
.into_inner()
290
290
+
.filter_map(|msg_result| match msg_result {
291
291
+
Ok(WsMessage::Binary(bytes)) => Some(
292
292
+
parse_cbor(&bytes)
293
293
+
.map(|v| v.into_static())
294
294
+
.map_err(|e| StreamError::decode(crate::error::DecodeError::from(e))),
295
295
+
),
296
296
+
Ok(WsMessage::Text(_)) => Some(Err(StreamError::wrong_message_format(
297
297
+
"expected binary frame for CBOR, got text",
298
298
+
))),
299
299
+
Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())),
300
300
+
Err(e) => Some(Err(e)),
301
301
+
})
302
302
+
.boxed(),
193
303
};
194
304
195
305
(tx, stream)
···
205
315
/// Replaces the internal WebSocket stream with one copy and returns a typed decoded
206
316
/// stream. Both streams receive all messages. Useful for observing raw messages
207
317
/// while also processing typed messages.
208
208
-
pub fn tee(
209
209
-
&mut self,
210
210
-
) -> n0_future::stream::Boxed<Result<StreamMessage<'static, S>, StreamError>>
318
318
+
pub fn tee(&mut self) -> Boxed<Result<StreamMessage<'static, S>, StreamError>>
211
319
where
212
320
for<'a> StreamMessage<'a, S>: IntoStatic<Output = StreamMessage<'static, S>>,
213
321
{
214
322
use n0_future::StreamExt as _;
215
323
216
324
let rx = self.connection.receiver_mut();
217
217
-
let (raw_rx, typed_rx_source) = std::mem::replace(
218
218
-
rx,
219
219
-
crate::websocket::WsStream::new(futures::stream::empty()),
220
220
-
)
221
221
-
.tee();
325
325
+
let (raw_rx, typed_rx_source) =
326
326
+
std::mem::replace(rx, WsStream::new(n0_future::stream::empty())).tee();
222
327
223
328
// Put the raw stream back
224
329
*rx = raw_rx;
225
330
226
331
match S::ENCODING {
227
227
-
MessageEncoding::Json => Box::pin(
228
228
-
typed_rx_source
229
229
-
.into_inner()
230
230
-
.filter_map(|msg| decode_json_msg::<S>(msg)),
231
231
-
),
232
232
-
MessageEncoding::DagCbor => Box::pin(
233
233
-
typed_rx_source
234
234
-
.into_inner()
235
235
-
.filter_map(|msg| decode_cbor_msg::<S>(msg)),
236
236
-
),
332
332
+
MessageEncoding::Json => typed_rx_source
333
333
+
.into_inner()
334
334
+
.filter_map(|msg| decode_json_msg::<S>(msg))
335
335
+
.boxed(),
336
336
+
MessageEncoding::DagCbor => typed_rx_source
337
337
+
.into_inner()
338
338
+
.filter_map(|msg| decode_cbor_msg::<S>(msg))
339
339
+
.boxed(),
237
340
}
238
341
}
239
342
}