···11+[package]
22+authors = ["videah <videah@selfish.systems>"]
33+name = "jetstream-oxide"
44+version = "0.1.1"
55+edition = "2021"
66+license = "MIT"
77+description = "Library for easily interacting with and consuming the Bluesky Jetstream service."
88+repository = "https://github.com/videah/jetstream-oxide"
99+readme = "README.md"
1010+1111+[dependencies]
1212+async-trait = "0.1.83"
1313+atrium-api = { version = "0.24.7", default-features = false, features = [
1414+ "namespace-appbsky",
1515+] }
1616+tokio = { version = "1.41.1", features = ["full", "sync", "time"] }
1717+tokio-tungstenite = { version = "0.24.0", features = [
1818+ "connect",
1919+ "native-tls-vendored",
2020+ "url",
2121+] }
2222+futures-util = "0.3.31"
2323+url = "2.5.4"
2424+serde = { version = "1.0.215", features = ["derive"] }
2525+serde_json = "1.0.132"
2626+chrono = "0.4.38"
2727+zstd = "0.13.2"
2828+thiserror = "2.0.3"
2929+flume = "0.11.1"
3030+log = "0.4.22"
3131+tokio-util = "0.7.13"
3232+3333+[dev-dependencies]
3434+anyhow = "1.0.93"
3535+clap = { version = "4.5.20", features = ["derive"] }
+21
jetstream/LICENSE
···11+MIT License
22+33+Copyright (c) 2024 videah
44+55+Permission is hereby granted, free of charge, to any person obtaining a copy
66+of this software and associated documentation files (the "Software"), to deal
77+in the Software without restriction, including without limitation the rights
88+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
99+copies of the Software, and to permit persons to whom the Software is
1010+furnished to do so, subject to the following conditions:
1111+1212+The above copyright notice and this permission notice shall be included in all
1313+copies or substantial portions of the Software.
1414+1515+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1616+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1717+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1818+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1919+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2020+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121+SOFTWARE.
+59
jetstream/README.md
···11+# jetstream-oxide
22+33+[](https://crates.io/crates/jetstream-oxide)
44+[](https://docs.rs/jetstream-oxide/latest/jetstream_oxide)
55+66+A typed Rust library for easily interacting with and consuming the
77+Bluesky [Jetstream](https://github.com/bluesky-social/jetstream)
88+service.
99+1010+```rust
1111+let config = JetstreamConfig {
1212+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
1313+ compression: JetstreamCompression::Zstd,
1414+ ..Default::default()
1515+};
1616+1717+let jetstream = JetstreamConnector::new(config).unwrap();
1818+let receiver = jetstream.connect().await?;
1919+2020+while let Ok(event) = receiver.recv_async().await {
2121+ if let Commit(commit) = event {
2222+ match commit {
2323+ CommitEvent::Create { info, commit } => {
2424+ println!("Received create event: {:#?}", info);
2525+ }
2626+ CommitEvent::Update { info, commit } => {
2727+ println!("Received update event: {:#?}", info);
2828+ }
2929+ CommitEvent::Delete { info, commit } => {
3030+ println!("Received delete event: {:#?}", info);
3131+ }
3232+ }
3333+ }
3434+}
3535+```
3636+3737+## Example
3838+3939+A small example CLI utility to show how to use this crate can be found in the `examples` directory. To run it, use the
4040+following command:
4141+4242+```sh
4343+cargo run --example basic -- --nsid "app.bsky.feed.post"
4444+```
4545+4646+This will display a real-time feed of every single post that is being made or deleted in the entire Bluesky network,
4747+right in your terminal!
4848+4949+You can filter it down to just specific accounts like this:
5050+5151+```sh
5252+cargo run --example basic -- \
5353+--nsid "app.bsky.feed.post" \
5454+--did "did:plc:inze6wrmsm7pjl7yta3oig77"
5555+```
5656+5757+This listens for posts that *I personally make*. You can substitute your own DID and make a few test posts yourself if
5858+you'd
5959+like of course!
+63
jetstream/examples/basic.rs
···11+//! A very basic example of how to listen for create/delete events on a specific DID and NSID.
22+33+use atrium_api::{record::KnownRecord::AppBskyFeedPost, types::string};
44+use clap::Parser;
55+use jetstream_oxide::{
66+ events::{commit::CommitEvent, JetstreamEvent::Commit},
77+ DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
88+};
99+1010+#[derive(Parser, Debug)]
1111+#[command(version, about, long_about = None)]
1212+struct Args {
1313+ /// The DIDs to listen for events on, if not provided we will listen for all DIDs.
1414+ #[arg(short, long)]
1515+ did: Option<Vec<string::Did>>,
1616+ /// The NSID for the collection to listen for (e.g. `app.bsky.feed.post`).
1717+ #[arg(short, long)]
1818+ nsid: string::Nsid,
1919+}
2020+2121+#[tokio::main]
2222+async fn main() -> anyhow::Result<()> {
2323+ let args = Args::parse();
2424+2525+ let dids = args.did.unwrap_or_default();
2626+ let config = JetstreamConfig {
2727+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
2828+ wanted_collections: vec![args.nsid.clone()],
2929+ wanted_dids: dids.clone(),
3030+ compression: JetstreamCompression::Zstd,
3131+ cursor: None,
3232+ };
3333+3434+ let jetstream = JetstreamConnector::new(config)?;
3535+ let receiver = jetstream.connect().await?;
3636+3737+ println!(
3838+ "Listening for '{}' events on DIDs: {:?}",
3939+ args.nsid.to_string(),
4040+ dids,
4141+ );
4242+4343+ while let Ok(event) = receiver.recv_async().await {
4444+ if let Commit(commit) = event {
4545+ match commit {
4646+ CommitEvent::Create { info: _, commit } => {
4747+ if let AppBskyFeedPost(record) = commit.record {
4848+ println!(
4949+ "New post created! ({})\n\n'{}'",
5050+ commit.info.rkey, record.text
5151+ );
5252+ }
5353+ }
5454+ CommitEvent::Delete { info: _, commit } => {
5555+ println!("A post has been deleted. ({})", commit.rkey);
5656+ }
5757+ _ => {}
5858+ }
5959+ }
6060+ }
6161+6262+ Ok(())
6363+}
···11+//! Various error types.
22+use std::io;
33+44+use thiserror::Error;
55+66+/// Possible errors that can occur when a [JetstreamConfig](crate::JetstreamConfig) that is passed
77+/// to a [JetstreamConnector](crate::JetstreamConnector) is invalid.
88+#[derive(Error, Debug)]
99+pub enum ConfigValidationError {
1010+ #[error("too many wanted collections: {0} > 100")]
1111+ TooManyWantedCollections(usize),
1212+ #[error("too many wanted DIDs: {0} > 10,000")]
1313+ TooManyDids(usize),
1414+}
1515+1616+/// Possible errors that can occur in the process of connecting to a Jetstream instance over
1717+/// WebSockets.
1818+///
1919+/// See [JetstreamConnector::connect](crate::JetstreamConnector::connect).
2020+#[derive(Error, Debug)]
2121+pub enum ConnectionError {
2222+ #[error("invalid endpoint: {0}")]
2323+ InvalidEndpoint(#[from] url::ParseError),
2424+ #[error("failed to connect to Jetstream instance: {0}")]
2525+ WebSocketFailure(#[from] tokio_tungstenite::tungstenite::Error),
2626+ #[error("the Jetstream config is invalid (this really should not happen here): {0}")]
2727+ InvalidConfig(#[from] ConfigValidationError),
2828+}
2929+3030+/// Possible errors that can occur when receiving events from a Jetstream instance over WebSockets.
3131+///
3232+/// See [websocket_task](crate::websocket_task).
3333+#[derive(Error, Debug)]
3434+pub enum JetstreamEventError {
3535+ #[error("received websocket message that could not be deserialized as JSON: {0}")]
3636+ ReceivedMalformedJSON(#[from] serde_json::Error),
3737+ #[error("failed to load built-in zstd dictionary for decoding: {0}")]
3838+ CompressionDictionaryError(io::Error),
3939+ #[error("failed to decode zstd-compressed message: {0}")]
4040+ CompressionDecoderError(io::Error),
4141+ #[error("all receivers were dropped but the websocket connection failed to close cleanly")]
4242+ WebSocketCloseFailure,
4343+}
+40
jetstream/src/events/account.rs
···11+use chrono::Utc;
22+use serde::Deserialize;
33+44+use crate::{
55+ events::EventInfo,
66+ exports,
77+};
88+99+/// An event representing a change to an account.
1010+#[derive(Deserialize, Debug)]
1111+pub struct AccountEvent {
1212+ /// Basic metadata included with every event.
1313+ #[serde(flatten)]
1414+ pub info: EventInfo,
1515+ /// Account specific data bundled with this event.
1616+ pub account: AccountData,
1717+}
1818+1919+/// Account specific data bundled with an account event.
2020+#[derive(Deserialize, Debug)]
2121+pub struct AccountData {
2222+ /// Whether the account is currently active.
2323+ pub active: bool,
2424+ /// The DID of the account.
2525+ pub did: exports::Did,
2626+ pub seq: u64,
2727+ pub time: chrono::DateTime<Utc>,
2828+ /// If `active` is `false` this will be present to explain why the account is inactive.
2929+ pub status: Option<AccountStatus>,
3030+}
3131+3232+/// The possible reasons an account might be listed as inactive.
3333+#[derive(Deserialize, Debug)]
3434+#[serde(rename_all = "lowercase")]
3535+pub enum AccountStatus {
3636+ Deactivated,
3737+ Deleted,
3838+ Suspended,
3939+ TakenDown,
4040+}
+61
jetstream/src/events/commit.rs
···11+use atrium_api::record::KnownRecord;
22+use serde::Deserialize;
33+44+use crate::{
55+ events::EventInfo,
66+ exports,
77+};
88+99+/// An event representing a repo commit, which can be a `create`, `update`, or `delete` operation.
1010+#[derive(Deserialize, Debug)]
1111+#[serde(untagged, rename_all = "snake_case")]
1212+pub enum CommitEvent {
1313+ Create {
1414+ #[serde(flatten)]
1515+ info: EventInfo,
1616+ commit: CommitData,
1717+ },
1818+ Update {
1919+ #[serde(flatten)]
2020+ info: EventInfo,
2121+ commit: CommitData,
2222+ },
2323+ Delete {
2424+ #[serde(flatten)]
2525+ info: EventInfo,
2626+ commit: CommitInfo,
2727+ },
2828+}
2929+3030+/// The type of commit operation that was performed.
3131+#[derive(Deserialize, Debug)]
3232+#[serde(rename_all = "snake_case")]
3333+pub enum CommitType {
3434+ Create,
3535+ Update,
3636+ Delete,
3737+}
3838+3939+/// Basic commit specific info bundled with every event, also the only data included with a `delete`
4040+/// operation.
4141+#[derive(Deserialize, Debug)]
4242+pub struct CommitInfo {
4343+ /// The type of commit operation that was performed.
4444+ pub operation: CommitType,
4545+ pub rev: String,
4646+ pub rkey: String,
4747+ /// The NSID of the record type that this commit is associated with.
4848+ pub collection: exports::Nsid,
4949+}
5050+5151+/// Detailed data bundled with a commit event. This data is only included when the event is
5252+/// `create` or `update`.
5353+#[derive(Deserialize, Debug)]
5454+pub struct CommitData {
5555+ #[serde(flatten)]
5656+ pub info: CommitInfo,
5757+ /// The CID of the record that was operated on.
5858+ pub cid: exports::Cid,
5959+ /// The record that was operated on.
6060+ pub record: KnownRecord,
6161+}
+28
jetstream/src/events/identity.rs
···11+use chrono::Utc;
22+use serde::Deserialize;
33+44+use crate::{
55+ events::EventInfo,
66+ exports,
77+};
88+99+/// An event representing a change to an identity.
1010+#[derive(Deserialize, Debug)]
1111+pub struct IdentityEvent {
1212+ /// Basic metadata included with every event.
1313+ #[serde(flatten)]
1414+ pub info: EventInfo,
1515+ /// Identity specific data bundled with this event.
1616+ pub identity: IdentityData,
1717+}
1818+1919+/// Identity specific data bundled with an identity event.
2020+#[derive(Deserialize, Debug)]
2121+pub struct IdentityData {
2222+ /// The DID of the identity.
2323+ pub did: exports::Did,
2424+ /// The handle associated with the identity.
2525+ pub handle: Option<exports::Handle>,
2626+ pub seq: u64,
2727+ pub time: chrono::DateTime<Utc>,
2828+}
+31
jetstream/src/events/mod.rs
···11+pub mod account;
22+pub mod commit;
33+pub mod identity;
44+55+use serde::Deserialize;
66+77+use crate::exports;
88+99+/// Basic data that is included with every event.
1010+#[derive(Deserialize, Debug)]
1111+pub struct EventInfo {
1212+ pub did: exports::Did,
1313+ pub time_us: u64,
1414+ pub kind: EventKind,
1515+}
1616+1717+#[derive(Deserialize, Debug)]
1818+#[serde(untagged)]
1919+pub enum JetstreamEvent {
2020+ Commit(commit::CommitEvent),
2121+ Identity(identity::IdentityEvent),
2222+ Account(account::AccountEvent),
2323+}
2424+2525+#[derive(Deserialize, Debug)]
2626+#[serde(rename_all = "snake_case")]
2727+pub enum EventKind {
2828+ Commit,
2929+ Identity,
3030+ Account,
3131+}
+8
jetstream/src/exports.rs
···11+//! Useful exports for third-party crates used by this project.
22+33+pub use atrium_api::types::string::{
44+ Cid,
55+ Did,
66+ Handle,
77+ Nsid,
88+};
+367
jetstream/src/lib.rs
···11+pub mod error;
22+pub mod events;
33+pub mod exports;
44+55+use std::{
66+ io::{Cursor, Read},
77+ sync::Arc,
88+ time::Duration,
99+};
1010+1111+use chrono::Utc;
1212+use futures_util::{stream::StreamExt, SinkExt};
1313+use tokio::{net::TcpStream, sync::Mutex};
1414+use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
1515+use tokio_util::sync::CancellationToken;
1616+use url::Url;
1717+use zstd::dict::DecoderDictionary;
1818+1919+use crate::{
2020+ error::{ConfigValidationError, ConnectionError, JetstreamEventError},
2121+ events::JetstreamEvent,
2222+};
2323+2424+/// The Jetstream endpoints officially provided by Bluesky themselves.
2525+///
2626+/// There are no guarantees that these endpoints will always be available, but you are free
2727+/// to run your own Jetstream instance in any case.
2828+pub enum DefaultJetstreamEndpoints {
2929+ /// `jetstream1.us-east.bsky.network`
3030+ USEastOne,
3131+ /// `jetstream2.us-east.bsky.network`
3232+ USEastTwo,
3333+ /// `jetstream1.us-west.bsky.network`
3434+ USWestOne,
3535+ /// `jetstream2.us-west.bsky.network`
3636+ USWestTwo,
3737+}
3838+3939+impl From<DefaultJetstreamEndpoints> for String {
4040+ fn from(endpoint: DefaultJetstreamEndpoints) -> Self {
4141+ match endpoint {
4242+ DefaultJetstreamEndpoints::USEastOne => {
4343+ "wss://jetstream1.us-east.bsky.network/subscribe".to_owned()
4444+ }
4545+ DefaultJetstreamEndpoints::USEastTwo => {
4646+ "wss://jetstream2.us-east.bsky.network/subscribe".to_owned()
4747+ }
4848+ DefaultJetstreamEndpoints::USWestOne => {
4949+ "wss://jetstream1.us-west.bsky.network/subscribe".to_owned()
5050+ }
5151+ DefaultJetstreamEndpoints::USWestTwo => {
5252+ "wss://jetstream2.us-west.bsky.network/subscribe".to_owned()
5353+ }
5454+ }
5555+ }
5656+}
5757+5858+/// The maximum number of wanted collections that can be requested on a single Jetstream connection.
5959+const MAX_WANTED_COLLECTIONS: usize = 100;
6060+/// The maximum number of wanted DIDs that can be requested on a single Jetstream connection.
6161+const MAX_WANTED_DIDS: usize = 10_000;
6262+6363+/// The custom `zstd` dictionary used for decoding compressed Jetstream messages.
6464+///
6565+/// Sourced from the [official Bluesky Jetstream repo.](https://github.com/bluesky-social/jetstream/tree/main/pkg/models)
6666+const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");
6767+6868+/// A receiver channel for consuming Jetstream events.
6969+pub type JetstreamReceiver = flume::Receiver<JetstreamEvent>;
7070+7171+/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
7272+type JetstreamSender = flume::Sender<JetstreamEvent>;
7373+7474+/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
7575+/// receive and consume events. See [JetstreamConnector::connect] for more info.
7676+pub struct JetstreamConnector {
7777+ /// The configuration for the Jetstream connection.
7878+ config: JetstreamConfig,
7979+}
8080+8181+pub enum JetstreamCompression {
8282+ /// No compression, just raw plaintext JSON.
8383+ None,
8484+ /// Use the `zstd` compression algorithm, which can result in a ~56% smaller messages on
8585+ /// average. See [here](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#compression) for more info.
8686+ Zstd,
8787+}
8888+8989+impl From<JetstreamCompression> for bool {
9090+ fn from(compression: JetstreamCompression) -> Self {
9191+ match compression {
9292+ JetstreamCompression::None => false,
9393+ JetstreamCompression::Zstd => true,
9494+ }
9595+ }
9696+}
9797+9898+pub struct JetstreamConfig {
9999+ /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
100100+ /// `wss://jetstream1.us-east.bsky.network/subscribe`.
101101+ pub endpoint: String,
102102+ /// A list of collection [NSIDs](https://atproto.com/specs/nsid) to filter events for.
103103+ ///
104104+ /// An empty list will receive events for *all* collections.
105105+ ///
106106+ /// Regardless of desired collections, all subscribers receive
107107+ /// [AccountEvent](events::account::AccountEvent) and
108108+ /// [IdentityEvent](events::identity::Identity) events.
109109+ pub wanted_collections: Vec<exports::Nsid>,
110110+ /// A list of repo [DIDs](https://atproto.com/specs/did) to filter events for.
111111+ ///
112112+ /// An empty list will receive events for *all* repos, which is a lot of events!
113113+ pub wanted_dids: Vec<exports::Did>,
114114+ /// The compression algorithm to request and use for the WebSocket connection (if any).
115115+ pub compression: JetstreamCompression,
116116+ /// An optional timestamp to begin playback from.
117117+ ///
118118+ /// An absent cursor or a cursor from the future will result in live-tail operation.
119119+ ///
120120+ /// When reconnecting, use the time_us from your most recently processed event and maybe
121121+ /// provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback.
122122+ pub cursor: Option<chrono::DateTime<Utc>>,
123123+}
124124+125125+impl Default for JetstreamConfig {
126126+ fn default() -> Self {
127127+ JetstreamConfig {
128128+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
129129+ wanted_collections: Vec::new(),
130130+ wanted_dids: Vec::new(),
131131+ compression: JetstreamCompression::None,
132132+ cursor: None,
133133+ }
134134+ }
135135+}
136136+137137+impl JetstreamConfig {
138138+ /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
139139+ pub fn construct_endpoint(&self, endpoint: &str) -> Result<Url, url::ParseError> {
140140+ let did_search_query = self
141141+ .wanted_dids
142142+ .iter()
143143+ .map(|s| ("wantedDids", s.to_string()));
144144+145145+ let collection_search_query = self
146146+ .wanted_collections
147147+ .iter()
148148+ .map(|s| ("wantedCollections", s.to_string()));
149149+150150+ let compression = (
151151+ "compress",
152152+ match self.compression {
153153+ JetstreamCompression::None => "false".to_owned(),
154154+ JetstreamCompression::Zstd => "true".to_owned(),
155155+ },
156156+ );
157157+158158+ let cursor = self
159159+ .cursor
160160+ .map(|c| ("cursor", c.timestamp_micros().to_string()));
161161+162162+ let params = did_search_query
163163+ .chain(collection_search_query)
164164+ .chain(std::iter::once(compression))
165165+ .chain(cursor)
166166+ .collect::<Vec<(&str, String)>>();
167167+168168+ Url::parse_with_params(endpoint, params)
169169+ }
170170+171171+ /// Validates the configuration to make sure it is within the limits of the Jetstream API.
172172+ ///
173173+ /// # Constants
174174+ /// The following constants are used to validate the configuration and should only be changed
175175+ /// if the Jetstream API has itself changed.
176176+ /// - [MAX_WANTED_COLLECTIONS]
177177+ /// - [MAX_WANTED_DIDS]
178178+ pub fn validate(&self) -> Result<(), ConfigValidationError> {
179179+ let collections = self.wanted_collections.len();
180180+ let dids = self.wanted_dids.len();
181181+182182+ if collections > MAX_WANTED_COLLECTIONS {
183183+ return Err(ConfigValidationError::TooManyWantedCollections(collections));
184184+ }
185185+186186+ if dids > MAX_WANTED_DIDS {
187187+ return Err(ConfigValidationError::TooManyDids(dids));
188188+ }
189189+190190+ Ok(())
191191+ }
192192+}
193193+194194+impl JetstreamConnector {
195195+ /// Create a Jetstream connector with a valid [JetstreamConfig].
196196+ ///
197197+ /// After creation, you can call [connect] to connect to the provided Jetstream instance.
198198+ pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
199199+ // We validate the configuration here so any issues are caught early.
200200+ config.validate()?;
201201+ Ok(JetstreamConnector { config })
202202+ }
203203+204204+ /// Connects to a Jetstream instance as defined in the [JetstreamConfig].
205205+ ///
206206+ /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
207207+ /// of this receiver are dropped, the connection and task are automatically closed.
208208+ pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> {
209209+ // We validate the config again for good measure. Probably not necessary but it can't hurt.
210210+ self.config
211211+ .validate()
212212+ .map_err(ConnectionError::InvalidConfig)?;
213213+214214+ // TODO: Run some benchmarks and look into using a bounded channel instead.
215215+ let (send_channel, receive_channel) = flume::unbounded();
216216+217217+ let configured_endpoint = self
218218+ .config
219219+ .construct_endpoint(&self.config.endpoint)
220220+ .map_err(ConnectionError::InvalidEndpoint)?;
221221+222222+ tokio::task::spawn(async move {
223223+ let max_retries = 10;
224224+ let base_delay_ms = 1_000; // 1 second
225225+ let max_delay_ms = 30_000; // 30 seconds
226226+227227+ for retry_attempt in 0..max_retries {
228228+ let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
229229+230230+ if let Ok((ws_stream, _)) = connect_async(&configured_endpoint).await {
231231+ let _ = websocket_task(dict, ws_stream, send_channel.clone()).await;
232232+ }
233233+234234+ // Exponential backoff
235235+ let delay_ms = base_delay_ms * (2_u64.pow(retry_attempt));
236236+237237+ log::error!("Connection failed, retrying in {delay_ms}ms...");
238238+ tokio::time::sleep(Duration::from_millis(delay_ms.min(max_delay_ms))).await;
239239+ log::info!("Attempting to reconnect...")
240240+ }
241241+ log::error!("Connection retries exhausted. Jetstream is disconnected.");
242242+ });
243243+244244+ Ok(receive_channel)
245245+ }
246246+}
247247+248248+/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
249249+/// receivers that are listening for them.
250250+async fn websocket_task(
251251+ dictionary: DecoderDictionary<'_>,
252252+ ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
253253+ send_channel: JetstreamSender,
254254+) -> Result<(), JetstreamEventError> {
255255+ // TODO: Use the write half to allow the user to change configuration settings on the fly.
256256+ let (socket_write, mut socket_read) = ws.split();
257257+ let shared_socket_write = Arc::new(Mutex::new(socket_write));
258258+259259+ let ping_cancellation_token = CancellationToken::new();
260260+ let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
261261+ let ping_cancelled = ping_cancellation_token.clone();
262262+ let ping_shared_socket_write = shared_socket_write.clone();
263263+ tokio::spawn(async move {
264264+ loop {
265265+ ping_interval.tick().await;
266266+ let false = ping_cancelled.is_cancelled() else {
267267+ break;
268268+ };
269269+ log::trace!("Sending ping");
270270+ match ping_shared_socket_write
271271+ .lock()
272272+ .await
273273+ .send(Message::Ping("ping".as_bytes().to_vec()))
274274+ .await
275275+ {
276276+ Ok(_) => (),
277277+ Err(error) => {
278278+ log::error!("Ping failed: {error}");
279279+ break;
280280+ }
281281+ }
282282+ }
283283+ });
284284+285285+ let mut closing_connection = false;
286286+ loop {
287287+ match socket_read.next().await {
288288+ Some(Ok(message)) => {
289289+ match message {
290290+ Message::Text(json) => {
291291+ let event = serde_json::from_str::<JetstreamEvent>(&json)
292292+ .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
293293+294294+ if send_channel.send(event).is_err() {
295295+ // We can assume that all receivers have been dropped, so we can close the
296296+ // connection and exit the task.
297297+ log::info!(
298298+ "All receivers for the Jetstream connection have been dropped, closing connection."
299299+ );
300300+ closing_connection = true;
301301+ }
302302+ }
303303+ Message::Binary(zstd_json) => {
304304+ let mut cursor = Cursor::new(zstd_json);
305305+ let mut decoder = zstd::stream::Decoder::with_prepared_dictionary(
306306+ &mut cursor,
307307+ &dictionary,
308308+ )
309309+ .map_err(JetstreamEventError::CompressionDictionaryError)?;
310310+311311+ let mut json = String::new();
312312+ decoder
313313+ .read_to_string(&mut json)
314314+ .map_err(JetstreamEventError::CompressionDecoderError)?;
315315+316316+ let event = serde_json::from_str::<JetstreamEvent>(&json)
317317+ .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
318318+319319+ if send_channel.send(event).is_err() {
320320+ // We can assume that all receivers have been dropped, so we can close the
321321+ // connection and exit the task.
322322+ log::info!(
323323+ "All receivers for the Jetstream connection have been dropped, closing connection..."
324324+ );
325325+ closing_connection = true;
326326+ }
327327+ }
328328+ Message::Ping(vec) => {
329329+ log::trace!("Ping recieved, responding");
330330+ _ = shared_socket_write
331331+ .lock()
332332+ .await
333333+ .send(Message::Pong(vec))
334334+ .await;
335335+ }
336336+ Message::Close(close_frame) => {
337337+ if let Some(close_frame) = close_frame {
338338+ let reason = close_frame.reason;
339339+ let code = close_frame.code;
340340+ log::trace!("Connection closed. Reason: {reason}, Code: {code}");
341341+ }
342342+ }
343343+ Message::Pong(pong) => {
344344+ let pong_payload =
345345+ String::from_utf8(pong).unwrap_or("Invalid payload".to_string());
346346+ log::trace!("Pong recieved. Payload: {pong_payload}");
347347+ }
348348+ Message::Frame(_) => (),
349349+ }
350350+ }
351351+ Some(Err(error)) => {
352352+ log::error!("Web socket error: {error}");
353353+ ping_cancellation_token.cancel();
354354+ closing_connection = true;
355355+ }
356356+ None => {
357357+ log::error!("No web socket result");
358358+ ping_cancellation_token.cancel();
359359+ closing_connection = true;
360360+ }
361361+ }
362362+ if closing_connection {
363363+ _ = shared_socket_write.lock().await.close().await;
364364+ return Ok(());
365365+ }
366366+ }
367367+}