thing to allow reconfiguration using the original sub parameters
additionally carried through the actual subscription parameters and drove the trait that way
allows easier reconnect logic, etc.
···409409 pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<WsMessage, Error = StreamError>>> {
410410 self.0
411411 }
412412+413413+ /// get a mutable reference to the inner boxed sink
414414+ #[cfg(not(target_arch = "wasm32"))]
415415+ pub fn get_mut(
416416+ &mut self,
417417+ ) -> &mut Pin<Box<dyn n0_future::Sink<WsMessage, Error = StreamError> + Send>> {
418418+ use std::borrow::BorrowMut;
419419+420420+ self.0.borrow_mut()
421421+ }
422422+423423+ /// get a mutable reference to the inner boxed sink
424424+ #[cfg(target_arch = "wasm32")]
425425+ pub fn get_mut(
426426+ &mut self,
427427+ ) -> &mut Pin<Box<dyn n0_future::Sink<WsMessage, Error = StreamError> + 'static>> {
428428+ use std::borrow::BorrowMut;
429429+430430+ self.0.borrow_mut()
431431+ }
412432}
413433414434impl fmt::Debug for WsSink {
+56
crates/jacquard-common/src/xrpc/subscription.rs
···216216 }
217217}
218218219219+/// Websocket subscriber-sent control message
220220+///
221221+/// Note: this is not meaningful for atproto event stream endpoints as
222222+/// those do not support control after the fact. Jetstream does, however.
223223+///
224224+/// If you wish to control an ongoing Jetstream connection, wrap the [`WsSink`]
225225+/// returned from one of the `into_*` methods of the [`SubscriptionStream`]
226226+/// in a [`SubscriptionController`] with the corresponding message implementing
227227+/// this trait as a generic parameter.
228228+pub trait SubscriptionControlMessage: Serialize {
229229+ /// The subscription this is associated with
230230+ type Subscription: XrpcSubscription;
231231+232232+ /// Encode the control message for transmission
233233+ ///
234234+ /// Defaults to json text (matches Jetstream)
235235+ fn encode(&self) -> Result<WsMessage, StreamError> {
236236+ Ok(WsMessage::from(
237237+ serde_json::to_string(&self).map_err(StreamError::encode)?,
238238+ ))
239239+ }
240240+241241+ /// Decode the control message
242242+ fn decode<'de>(frame: &'de [u8]) -> Result<Self, StreamError>
243243+ where
244244+ Self: Deserialize<'de>,
245245+ {
246246+ Ok(serde_json::from_slice(frame).map_err(StreamError::decode)?)
247247+ }
248248+}
249249+250250+/// Control a websocket stream with a given subscription control message
251251+pub struct SubscriptionController<S: SubscriptionControlMessage> {
252252+ controller: WsSink,
253253+ _marker: PhantomData<fn() -> S>,
254254+}
255255+256256+impl<S: SubscriptionControlMessage> SubscriptionController<S> {
257257+ /// Create a new subscription controller from a WebSocket sink.
258258+ pub fn new(controller: WsSink) -> Self {
259259+ Self {
260260+ controller,
261261+ _marker: PhantomData,
262262+ }
263263+ }
264264+265265+ /// Configure the upstream connection via the websocket
266266+ pub async fn configure(&mut self, params: &S) -> Result<(), StreamError> {
267267+ let message = params.encode()?;
268268+269269+ n0_future::SinkExt::send(self.controller.get_mut(), message)
270270+ .await
271271+ .map_err(StreamError::transport)
272272+ }
273273+}
274274+219275/// Typed subscription stream wrapping a WebSocket connection.
220276///
221277/// Analogous to `Response<R>` for XRPC but for subscription streams.