···1+use std::time::{
2+ Duration,
3+ SystemTime,
4+ SystemTimeError,
5+ UNIX_EPOCH,
6+};
7+8+use chrono::Utc;
9+use serde::{
10+ Deserialize,
11+ Serialize,
12+};
13+use serde_json::value::RawValue;
14+15+use crate::exports;
16+17+/// Opaque wrapper for the time_us cursor used by jetstream
18+#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, PartialOrd)]
19+pub struct Cursor(u64);
/// A single event from a jetstream websocket feed.
///
/// Exactly one of `commit`, `identity`, or `account` is expected to be
/// populated, matching `kind` — this is not enforced by the type, so callers
/// should check `kind` (or the relevant `Option`) before unwrapping.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct JetstreamEvent {
    /// The jetstream cursor for this event (the wire field `time_us`).
    #[serde(rename = "time_us")]
    pub cursor: Cursor,
    /// The DID of the repo this event concerns.
    pub did: exports::Did,
    /// Which kind of event this is; indicates which payload field below is set.
    pub kind: EventKind,
    /// Commit payload; present when `kind` is [EventKind::Commit].
    pub commit: Option<CommitEvent>,
    /// Identity payload; present when `kind` is [EventKind::Identity].
    pub identity: Option<IdentityEvent>,
    /// Account payload; present when `kind` is [EventKind::Account].
    pub account: Option<AccountEvent>,
}
/// The kind of a [JetstreamEvent], deserialized from the wire `kind` field
/// (`"commit"`, `"identity"`, or `"account"` via `rename_all = "snake_case"`).
#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum EventKind {
    /// A repo commit (create/update/delete of a record).
    Commit,
    /// An identity change (e.g. handle update).
    Identity,
    /// An account status change.
    Account,
}
/// The commit payload bundled with a [JetstreamEvent] of kind `commit`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct CommitEvent {
    /// The NSID of the record collection this commit touches.
    pub collection: exports::Nsid,
    /// The record key within the collection.
    pub rkey: exports::RecordKey,
    /// The repo revision string for this commit.
    pub rev: String,
    /// Whether this commit is a create, update, or delete.
    pub operation: CommitOp,
    /// The record as raw, unparsed JSON. Kept as [RawValue] so callers can
    /// defer (or skip) full deserialization. Optional — presumably absent for
    /// delete operations; TODO confirm against the jetstream wire format.
    pub record: Option<Box<RawValue>>,
    /// The CID of the record. Optional — presumably absent for deletes;
    /// TODO confirm against the jetstream wire format.
    pub cid: Option<exports::Cid>,
}
/// The type of operation a [CommitEvent] performed, deserialized from the
/// wire `operation` field (`"create"`, `"update"`, or `"delete"`).
#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum CommitOp {
    /// A new record was created.
    Create,
    /// An existing record was replaced.
    Update,
    /// A record was deleted.
    Delete,
}
/// The identity payload bundled with a [JetstreamEvent] of kind `identity`.
#[derive(Debug, Deserialize, PartialEq)]
pub struct IdentityEvent {
    /// The DID of the identity.
    pub did: exports::Did,
    /// The handle associated with the identity, if any.
    pub handle: Option<exports::Handle>,
    /// Upstream sequence number for this event.
    pub seq: u64,
    /// Wall-clock timestamp attached to the event by upstream.
    pub time: chrono::DateTime<Utc>,
}
/// The account payload bundled with a [JetstreamEvent] of kind `account`.
#[derive(Debug, Deserialize, PartialEq)]
pub struct AccountEvent {
    /// Whether the account is currently active.
    pub active: bool,
    /// The DID of the account.
    pub did: exports::Did,
    /// Upstream sequence number for this event.
    pub seq: u64,
    /// Wall-clock timestamp attached to the event by upstream.
    pub time: chrono::DateTime<Utc>,
    /// If `active` is `false`, presumably explains why the account is inactive
    /// (e.g. deactivated, deleted, suspended) — TODO confirm possible values.
    pub status: Option<String>,
}
76+77+impl Cursor {
78+ /// Get a cursor that will consume all available jetstream replay
79+ ///
80+ /// This sets the cursor to zero.
81+ ///
82+ /// Jetstream instances typically only have a few days of replay.
83+ pub fn from_start() -> Self {
84+ Self(0)
85+ }
86+ /// Get a cursor for a specific time
87+ ///
88+ /// Panics: if t is older than the unix epoch: Jan 1, 1970.
89+ ///
90+ /// If you want to receive all available jetstream replay (typically a few days), use
91+ /// .from_start()
92+ ///
93+ /// Warning: this exploits the internal implementation detail of jetstream cursors
94+ /// being ~microsecond timestamps.
95+ pub fn at(t: SystemTime) -> Self {
96+ let unix_dt = t
97+ .duration_since(UNIX_EPOCH)
98+ .expect("cannot set jetstream cursor earlier than unix epoch");
99+ Self(unix_dt.as_micros() as u64)
100+ }
101+ /// Get a cursor rewound from now by this amount
102+ ///
103+ /// Panics: if d is greater than the time since the unix epoch: Jan 1, 1970.
104+ ///
105+ /// Jetstream instances typically only have a few days of replay.
106+ ///
107+ /// Warning: this exploits the internal implementation detail of jetstream cursors
108+ /// being ~microsecond timestamps.
109+ pub fn back_by(d: Duration) -> Self {
110+ Self::at(SystemTime::now() - d)
111+ }
112+ /// Get a Cursor from a raw u64
113+ ///
114+ /// For example, from a jetstream event's `time_us` field.
115+ pub fn from_raw_u64(time_us: u64) -> Self {
116+ Self(time_us)
117+ }
118+ /// Get the raw u64 value from this cursor.
119+ pub fn to_raw_u64(&self) -> u64 {
120+ self.0
121+ }
122+ /// Format the cursor value for use in a jetstream connection url querystring
123+ pub fn to_jetstream(&self) -> String {
124+ self.0.to_string()
125+ }
126+ /// Compute the time span since an earlier cursor or [SystemTime]
127+ ///
128+ /// Warning: this exploits the internal implementation detail of jetstream cursors
129+ /// being ~microsecond timestamps.
130+ pub fn duration_since(
131+ &self,
132+ earlier: impl Into<SystemTime>,
133+ ) -> Result<Duration, SystemTimeError> {
134+ let t: SystemTime = self.into();
135+ t.duration_since(earlier.into())
136+ }
137+ /// Compute the age of the cursor vs the local clock
138+ ///
139+ /// Warning: this exploits the internal implementation detail of jetstream cursors
140+ pub fn elapsed(&self) -> Result<Duration, SystemTimeError> {
141+ let t: SystemTime = self.into();
142+ t.elapsed()
143+ }
144+ /// Get the immediate next cursor value
145+ ///
146+ /// This is possible for the implementation of jetstream cursors
147+ pub fn next(&self) -> Cursor {
148+ Self(self.0 + 1)
149+ }
150+}
151+152+impl From<&Cursor> for SystemTime {
153+ /// Convert a cursor directly to a [SystemTime]
154+ ///
155+ /// Warning: this exploits the internal implementation detail of jetstream cursors
156+ /// being ~microsecond timestamps.
157+ fn from(c: &Cursor) -> Self {
158+ UNIX_EPOCH + Duration::from_micros(c.0)
159+ }
160+}
#[cfg(test)]
mod test {
    use super::*;

    // Verifies that a commit payload parses on its own, and that `record` is
    // preserved as raw JSON byte-for-byte (via RawValue) rather than being
    // re-serialized.
    #[test]
    fn test_parse_commit_event() -> anyhow::Result<()> {
        let json = r#"{
            "rev":"3llrdsginou2i",
            "operation":"create",
            "collection":"app.bsky.feed.post",
            "rkey":"3llrdsglqdc2s",
            "cid": "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy",
            "record": {"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}
        }"#;
        let commit: CommitEvent = serde_json::from_str(json)?;
        assert_eq!(
            commit.cid.unwrap(),
            "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()?
        );
        // RawValue::get() returns the original JSON slice, unmodified.
        assert_eq!(
            commit.record.unwrap().get(),
            r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"#
        );
        Ok(())
    }

    // Verifies that a full wire event (captured from a live jetstream feed)
    // parses, carries the expected kind, and nests the commit payload.
    #[test]
    fn test_parse_whole_event() -> anyhow::Result<()> {
        let json = r#"{"did":"did:plc:ai3dzf35cth7s3st7n7jsd7r","time_us":1743526687419798,"kind":"commit","commit":{"rev":"3llrdsginou2i","operation":"create","collection":"app.bsky.feed.post","rkey":"3llrdsglqdc2s","record":{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"},"cid":"bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"}}"#;
        let event: JetstreamEvent = serde_json::from_str(json)?;
        assert_eq!(event.kind, EventKind::Commit);
        assert!(event.commit.is_some());
        let commit = event.commit.unwrap();
        assert_eq!(
            commit.cid.unwrap(),
            "bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse()?
        );
        assert_eq!(
            commit.record.unwrap().get(),
            r#"{"$type":"app.bsky.feed.post","createdAt":"2025-04-01T16:58:06.154Z","langs":["en"],"text":"I wish apirl 1st would stop existing lol"}"#
        );
        Ok(())
    }
}
-40
jetstream/src/events/account.rs
···1-use chrono::Utc;
2-use serde::Deserialize;
3-4-use crate::{
5- events::EventInfo,
6- exports,
7-};
8-9-/// An event representing a change to an account.
10-#[derive(Deserialize, Debug)]
11-pub struct AccountEvent {
12- /// Basic metadata included with every event.
13- #[serde(flatten)]
14- pub info: EventInfo,
15- /// Account specific data bundled with this event.
16- pub account: AccountData,
17-}
18-19-/// Account specific data bundled with an account event.
20-#[derive(Deserialize, Debug)]
21-pub struct AccountData {
22- /// Whether the account is currently active.
23- pub active: bool,
24- /// The DID of the account.
25- pub did: exports::Did,
26- pub seq: u64,
27- pub time: chrono::DateTime<Utc>,
28- /// If `active` is `false` this will be present to explain why the account is inactive.
29- pub status: Option<AccountStatus>,
30-}
31-32-/// The possible reasons an account might be listed as inactive.
33-#[derive(Deserialize, Debug)]
34-#[serde(rename_all = "lowercase")]
35-pub enum AccountStatus {
36- Deactivated,
37- Deleted,
38- Suspended,
39- TakenDown,
40-}
···0000000000000000000000000000000000000000
-55
jetstream/src/events/commit.rs
···1-use serde::Deserialize;
2-3-use crate::{
4- events::EventInfo,
5- exports,
6-};
7-8-/// An event representing a repo commit, which can be a `create`, `update`, or `delete` operation.
9-#[derive(Deserialize, Debug)]
10-#[serde(untagged, rename_all = "snake_case")]
11-pub enum CommitEvent<R> {
12- CreateOrUpdate {
13- #[serde(flatten)]
14- info: EventInfo,
15- commit: CommitData<R>,
16- },
17- Delete {
18- #[serde(flatten)]
19- info: EventInfo,
20- commit: CommitInfo,
21- },
22-}
23-24-/// The type of commit operation that was performed.
25-#[derive(Deserialize, Debug, PartialEq)]
26-#[serde(rename_all = "snake_case")]
27-pub enum CommitType {
28- Create,
29- Update,
30- Delete,
31-}
32-33-/// Basic commit specific info bundled with every event, also the only data included with a `delete`
34-/// operation.
35-#[derive(Deserialize, Debug)]
36-pub struct CommitInfo {
37- /// The type of commit operation that was performed.
38- pub operation: CommitType,
39- pub rev: String,
40- pub rkey: exports::RecordKey,
41- /// The NSID of the record type that this commit is associated with.
42- pub collection: exports::Nsid,
43-}
44-45-/// Detailed data bundled with a commit event. This data is only included when the event is
46-/// `create` or `update`.
47-#[derive(Deserialize, Debug)]
48-pub struct CommitData<R> {
49- #[serde(flatten)]
50- pub info: CommitInfo,
51- /// The CID of the record that was operated on.
52- pub cid: exports::Cid,
53- /// The record that was operated on.
54- pub record: R,
55-}
···1-use chrono::Utc;
2-use serde::Deserialize;
3-4-use crate::{
5- events::EventInfo,
6- exports,
7-};
8-9-/// An event representing a change to an identity.
10-#[derive(Deserialize, Debug)]
11-pub struct IdentityEvent {
12- /// Basic metadata included with every event.
13- #[serde(flatten)]
14- pub info: EventInfo,
15- /// Identity specific data bundled with this event.
16- pub identity: IdentityData,
17-}
18-19-/// Identity specific data bundled with an identity event.
20-#[derive(Deserialize, Debug)]
21-pub struct IdentityData {
22- /// The DID of the identity.
23- pub did: exports::Did,
24- /// The handle associated with the identity.
25- pub handle: Option<exports::Handle>,
26- pub seq: u64,
27- pub time: chrono::DateTime<Utc>,
28-}
···0000000000000000000000000000
-138
jetstream/src/events/mod.rs
···1-pub mod account;
2-pub mod commit;
3-pub mod identity;
4-5-use std::time::{
6- Duration,
7- SystemTime,
8- SystemTimeError,
9- UNIX_EPOCH,
10-};
11-12-use serde::Deserialize;
13-14-use crate::exports;
15-16-/// Opaque wrapper for the time_us cursor used by jetstream
17-///
18-/// Generally, you should use a cursor
19-#[derive(Deserialize, Debug, Clone, PartialEq, PartialOrd)]
20-pub struct Cursor(u64);
21-22-/// Basic data that is included with every event.
23-#[derive(Deserialize, Debug)]
24-pub struct EventInfo {
25- pub did: exports::Did,
26- pub time_us: Cursor,
27- pub kind: EventKind,
28-}
29-30-#[derive(Deserialize, Debug)]
31-#[serde(untagged)]
32-pub enum JetstreamEvent<R> {
33- Commit(commit::CommitEvent<R>),
34- Identity(identity::IdentityEvent),
35- Account(account::AccountEvent),
36-}
37-38-#[derive(Deserialize, Debug)]
39-#[serde(rename_all = "snake_case")]
40-pub enum EventKind {
41- Commit,
42- Identity,
43- Account,
44-}
45-46-impl<R> JetstreamEvent<R> {
47- pub fn cursor(&self) -> Cursor {
48- match self {
49- JetstreamEvent::Commit(commit::CommitEvent::CreateOrUpdate { info, .. }) => {
50- info.time_us.clone()
51- }
52- JetstreamEvent::Commit(commit::CommitEvent::Delete { info, .. }) => {
53- info.time_us.clone()
54- }
55- JetstreamEvent::Identity(e) => e.info.time_us.clone(),
56- JetstreamEvent::Account(e) => e.info.time_us.clone(),
57- }
58- }
59-}
60-61-impl Cursor {
62- /// Get a cursor that will consume all available jetstream replay
63- ///
64- /// This sets the cursor to zero.
65- ///
66- /// Jetstream instances typically only have a few days of replay.
67- pub fn from_start() -> Self {
68- Self(0)
69- }
70- /// Get a cursor for a specific time
71- ///
72- /// Panics: if t is older than the unix epoch: Jan 1, 1970.
73- ///
74- /// If you want to receive all available jetstream replay (typically a few days), use
75- /// .from_start()
76- ///
77- /// Warning: this exploits the internal implementation detail of jetstream cursors
78- /// being ~microsecond timestamps.
79- pub fn at(t: SystemTime) -> Self {
80- let unix_dt = t
81- .duration_since(UNIX_EPOCH)
82- .expect("cannot set jetstream cursor earlier than unix epoch");
83- Self(unix_dt.as_micros() as u64)
84- }
85- /// Get a cursor rewound from now by this amount
86- ///
87- /// Panics: if d is greater than the time since the unix epoch: Jan 1, 1970.
88- ///
89- /// Jetstream instances typically only have a few days of replay.
90- ///
91- /// Warning: this exploits the internal implementation detail of jetstream cursors
92- /// being ~microsecond timestamps.
93- pub fn back_by(d: Duration) -> Self {
94- Self::at(SystemTime::now() - d)
95- }
96- /// Get a Cursor from a raw u64
97- ///
98- /// For example, from a jetstream event's `time_us` field.
99- pub fn from_raw_u64(time_us: u64) -> Self {
100- Self(time_us)
101- }
102- /// Get the raw u64 value from this cursor.
103- pub fn to_raw_u64(&self) -> u64 {
104- self.0
105- }
106- /// Format the cursor value for use in a jetstream connection url querystring
107- pub fn to_jetstream(&self) -> String {
108- self.0.to_string()
109- }
110- /// Compute the time span since an earlier cursor or [SystemTime]
111- ///
112- /// Warning: this exploits the internal implementation detail of jetstream cursors
113- /// being ~microsecond timestamps.
114- pub fn duration_since(
115- &self,
116- earlier: impl Into<SystemTime>,
117- ) -> Result<Duration, SystemTimeError> {
118- let t: SystemTime = self.into();
119- t.duration_since(earlier.into())
120- }
121- /// Compute the age of the cursor vs the local clock
122- ///
123- /// Warning: this exploits the internal implementation detail of jetstream cursors
124- pub fn elapsed(&self) -> Result<Duration, SystemTimeError> {
125- let t: SystemTime = self.into();
126- t.elapsed()
127- }
128-}
129-130-impl From<&Cursor> for SystemTime {
131- /// Convert a cursor directly to a [SystemTime]
132- ///
133- /// Warning: this exploits the internal implementation detail of jetstream cursors
134- /// being ~microsecond timestamps.
135- fn from(c: &Cursor) -> Self {
136- UNIX_EPOCH + Duration::from_micros(c.0)
137- }
138-}
···3pub mod exports;
45use std::{
6- io::{
7- Cursor as IoCursor,
8- Read,
9- },
10- marker::PhantomData,
11 time::{
12 Duration,
13 Instant,
14 },
15};
1617-use atrium_api::record::KnownRecord;
18use futures_util::{
19 stream::StreamExt,
20 SinkExt,
21};
22-use serde::de::DeserializeOwned;
23use tokio::{
24 net::TcpStream,
25 sync::mpsc::{
···124const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");
125126/// A receiver channel for consuming Jetstream events.
127-pub type JetstreamReceiver<R> = Receiver<JetstreamEvent<R>>;
128129/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
130-type JetstreamSender<R> = Sender<JetstreamEvent<R>>;
131132/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
133/// receive and consume events. See [JetstreamConnector::connect] for more info.
134-pub struct JetstreamConnector<R: DeserializeOwned> {
135 /// The configuration for the Jetstream connection.
136- config: JetstreamConfig<R>,
137}
138139pub enum JetstreamCompression {
···163 }
164}
165166-pub struct JetstreamConfig<R: DeserializeOwned = KnownRecord> {
167 /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
168 /// `wss://jetstream1.us-east.bsky.network/subscribe`.
169 pub endpoint: String,
···200 /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory
201 /// usage while events are buffered.
202 pub channel_size: usize,
203- /// Marker for record deserializable type.
204- ///
205- /// See examples/arbitrary_record.rs for an example using serde_json::Value
206- ///
207- /// You can omit this if you construct `JetstreamConfig { a: b, ..Default::default() }.
208- /// If you have to specify it, use `std::marker::PhantomData` with no type parameters.
209- pub record_type: PhantomData<R>,
210}
211212-impl<R: DeserializeOwned> Default for JetstreamConfig<R> {
213 fn default() -> Self {
214 JetstreamConfig {
215 endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
···220 omit_user_agent_jetstream_info: false,
221 replay_on_reconnect: false,
222 channel_size: 4096, // a few seconds of firehose buffer
223- record_type: PhantomData,
224 }
225 }
226}
227228-impl<R: DeserializeOwned> JetstreamConfig<R> {
229 /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
230 pub fn get_request_builder(
231 &self,
···313 }
314}
315316-impl<R: DeserializeOwned + Send + 'static> JetstreamConnector<R> {
317 /// Create a Jetstream connector with a valid [JetstreamConfig].
318 ///
319 /// After creation, you can call [connect] to connect to the provided Jetstream instance.
320- pub fn new(config: JetstreamConfig<R>) -> Result<Self, ConfigValidationError> {
321 // We validate the configuration here so any issues are caught early.
322 config.validate()?;
323 Ok(JetstreamConnector { config })
···327 ///
328 /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
329 /// of this receiver are dropped, the connection and task are automatically closed.
330- pub async fn connect(&self) -> Result<JetstreamReceiver<R>, ConnectionError> {
331 self.connect_cursor(None).await
332 }
333···343 pub async fn connect_cursor(
344 &self,
345 cursor: Option<Cursor>,
346- ) -> Result<JetstreamReceiver<R>, ConnectionError> {
347 // We validate the config again for good measure. Probably not necessary but it can't hurt.
348 self.config
349 .validate()
···365 loop {
366 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
367368- let req = match build_request(connect_cursor.clone()) {
369 Ok(req) => req,
370 Err(e) => {
371 log::error!("Could not build jetstream websocket request: {e:?}");
···373 }
374 };
375376- let mut last_cursor = connect_cursor.clone();
377 retry_attempt += 1;
378 if let Ok((ws_stream, _)) = connect_async(req).await {
379 let t_connected = Instant::now();
···424425/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
426/// receivers that are listening for them.
427-async fn websocket_task<R: DeserializeOwned>(
428 dictionary: DecoderDictionary<'_>,
429 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
430- send_channel: JetstreamSender<R>,
431 last_cursor: &mut Option<Cursor>,
432) -> Result<(), JetstreamEventError> {
433 // TODO: Use the write half to allow the user to change configuration settings on the fly.
···439 Some(Ok(message)) => {
440 match message {
441 Message::Text(json) => {
442- let event: JetstreamEvent<R> = serde_json::from_str(&json)
443 .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
444- let event_cursor = event.cursor();
445446 if let Some(last) = last_cursor {
447 if event_cursor <= *last {
···464 }
465 Message::Binary(zstd_json) => {
466 let mut cursor = IoCursor::new(zstd_json);
467- let mut decoder = zstd::stream::Decoder::with_prepared_dictionary(
468 &mut cursor,
469 &dictionary,
470 )
471 .map_err(JetstreamEventError::CompressionDictionaryError)?;
472473- let mut json = String::new();
474- decoder
475- .read_to_string(&mut json)
476- .map_err(JetstreamEventError::CompressionDecoderError)?;
477-478- let event: JetstreamEvent<R> = serde_json::from_str(&json)
479 .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
480- let event_cursor = event.cursor();
481482 if let Some(last) = last_cursor {
483 if event_cursor <= *last {
···3pub mod exports;
45use std::{
6+ io::Cursor as IoCursor,
00007 time::{
8 Duration,
9 Instant,
10 },
11};
12013use futures_util::{
14 stream::StreamExt,
15 SinkExt,
16};
017use tokio::{
18 net::TcpStream,
19 sync::mpsc::{
···118const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");
119120/// A receiver channel for consuming Jetstream events.
121+pub type JetstreamReceiver = Receiver<JetstreamEvent>;
122123/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
124+type JetstreamSender = Sender<JetstreamEvent>;
125126/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
127/// receive and consume events. See [JetstreamConnector::connect] for more info.
128+pub struct JetstreamConnector {
129 /// The configuration for the Jetstream connection.
130+ config: JetstreamConfig,
131}
132133pub enum JetstreamCompression {
···157 }
158}
159160+pub struct JetstreamConfig {
161 /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
162 /// `wss://jetstream1.us-east.bsky.network/subscribe`.
163 pub endpoint: String,
···194 /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory
195 /// usage while events are buffered.
196 pub channel_size: usize,
0000000197}
198199+impl Default for JetstreamConfig {
200 fn default() -> Self {
201 JetstreamConfig {
202 endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
···207 omit_user_agent_jetstream_info: false,
208 replay_on_reconnect: false,
209 channel_size: 4096, // a few seconds of firehose buffer
0210 }
211 }
212}
213214+impl JetstreamConfig {
215 /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
216 pub fn get_request_builder(
217 &self,
···299 }
300}
301302+impl JetstreamConnector {
303 /// Create a Jetstream connector with a valid [JetstreamConfig].
304 ///
305 /// After creation, you can call [connect] to connect to the provided Jetstream instance.
306+ pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
307 // We validate the configuration here so any issues are caught early.
308 config.validate()?;
309 Ok(JetstreamConnector { config })
···313 ///
314 /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
315 /// of this receiver are dropped, the connection and task are automatically closed.
316+ pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> {
317 self.connect_cursor(None).await
318 }
319···329 pub async fn connect_cursor(
330 &self,
331 cursor: Option<Cursor>,
332+ ) -> Result<JetstreamReceiver, ConnectionError> {
333 // We validate the config again for good measure. Probably not necessary but it can't hurt.
334 self.config
335 .validate()
···351 loop {
352 let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);
353354+ let req = match build_request(connect_cursor) {
355 Ok(req) => req,
356 Err(e) => {
357 log::error!("Could not build jetstream websocket request: {e:?}");
···359 }
360 };
361362+ let mut last_cursor = connect_cursor;
363 retry_attempt += 1;
364 if let Ok((ws_stream, _)) = connect_async(req).await {
365 let t_connected = Instant::now();
···410411/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
412/// receivers that are listening for them.
413+async fn websocket_task(
414 dictionary: DecoderDictionary<'_>,
415 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
416+ send_channel: JetstreamSender,
417 last_cursor: &mut Option<Cursor>,
418) -> Result<(), JetstreamEventError> {
419 // TODO: Use the write half to allow the user to change configuration settings on the fly.
···425 Some(Ok(message)) => {
426 match message {
427 Message::Text(json) => {
428+ let event: JetstreamEvent = serde_json::from_str(&json)
429 .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
430+ let event_cursor = event.cursor;
431432 if let Some(last) = last_cursor {
433 if event_cursor <= *last {
···450 }
451 Message::Binary(zstd_json) => {
452 let mut cursor = IoCursor::new(zstd_json);
453+ let decoder = zstd::stream::Decoder::with_prepared_dictionary(
454 &mut cursor,
455 &dictionary,
456 )
457 .map_err(JetstreamEventError::CompressionDictionaryError)?;
458459+ let event: JetstreamEvent = serde_json::from_reader(decoder)
00000460 .map_err(JetstreamEventError::ReceivedMalformedJSON)?;
461+ let event_cursor = event.cursor;
462463 if let Some(last) = last_cursor {
464 if event_cursor <= *last {
···122```bash
123sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
124```
125+126+127+---
128+129+## fuzzing
130+131+got bit by https://github.com/cloudflare/cardinality-estimator/pull/12, so now we have a fuzz target.
132+133+install cargo-fuzz and then
134+135+```bash
136+RUSTFLAGS="-Z sanitizer=address" cargo +nightly fuzz run cardinality_estimator
137+```
138+139+to fuzz the cardinality-count handling in the estimator
+92-140
ufos/src/consumer.rs
···1use jetstream::{
2- events::{
3- account::AccountEvent,
4- commit::{CommitData, CommitEvent, CommitInfo, CommitType},
5- Cursor, EventInfo, JetstreamEvent,
6- },
7- exports::Did,
8 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
9 JetstreamReceiver,
10};
···12use std::time::Duration;
13use tokio::sync::mpsc::{channel, Receiver, Sender};
1415-use crate::{CreateRecord, DeleteAccount, DeleteRecord, EventBatch, ModifyRecord, UpdateRecord};
01617-const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached.
18-const MAX_BATCHED_MODIFIES: usize = 512; // hard limit, total updates and deletes across all collections.
19-const MAX_ACCOUNT_REMOVES: usize = 512; // hard limit, total account deletions. actually the least frequent event, but tiny.
20-const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per collection
21-const MIN_BATCH_SPAN_SECS: f64 = 2.; // try to get a bit of rest a bit.
22-const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit of duration from oldest to latest event cursor within a batch, in seconds.
02324-const SEND_TIMEOUT_S: f64 = 60.;
25-const BATCH_QUEUE_SIZE: usize = 512; // 4096 got OOM'd. update: 1024 also got OOM'd during L0 compaction blocking
000002627#[derive(Debug)]
28-struct Batcher {
29- jetstream_receiver: JetstreamReceiver<serde_json::Value>,
30- batch_sender: Sender<EventBatch>,
31- current_batch: EventBatch,
32}
3334pub async fn consume(
35 jetstream_endpoint: &str,
36 cursor: Option<Cursor>,
37 no_compress: bool,
38-) -> anyhow::Result<Receiver<EventBatch>> {
39 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
40 if endpoint == jetstream_endpoint {
41- eprintln!("connecting to jetstream at {endpoint}");
42 } else {
43- eprintln!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
44 }
45- let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
46 endpoint,
47 compression: if no_compress {
48 JetstreamCompression::None
49 } else {
50 JetstreamCompression::Zstd
51 },
52- channel_size: 64, // small because we'd rather buffer events into batches
053 ..Default::default()
54 };
55 let jetstream_receiver = JetstreamConnector::new(config)?
56 .connect_cursor(cursor)
57 .await?;
58- let (batch_sender, batch_reciever) = channel::<EventBatch>(BATCH_QUEUE_SIZE);
59 let mut batcher = Batcher::new(jetstream_receiver, batch_sender);
60 tokio::task::spawn(async move { batcher.run().await });
61 Ok(batch_reciever)
62}
6364impl Batcher {
65- fn new(
66- jetstream_receiver: JetstreamReceiver<serde_json::Value>,
67- batch_sender: Sender<EventBatch>,
68- ) -> Self {
69 Self {
70 jetstream_receiver,
71 batch_sender,
···73 }
74 }
7576- async fn run(&mut self) -> anyhow::Result<()> {
77 loop {
78 if let Some(event) = self.jetstream_receiver.recv().await {
79 self.handle_event(event).await?
···83 }
84 }
8586- async fn handle_event(
87- &mut self,
88- event: JetstreamEvent<serde_json::Value>,
89- ) -> anyhow::Result<()> {
90- let event_cursor = event.cursor();
91-92- if let Some(earliest) = &self.current_batch.first_jetstream_cursor {
93- if event_cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS)
94 {
95- self.send_current_batch_now().await?;
96 }
97 } else {
98- self.current_batch.first_jetstream_cursor = Some(event_cursor.clone());
99 }
100101- match event {
102- JetstreamEvent::Commit(CommitEvent::CreateOrUpdate { commit, info }) => {
103- match commit.info.operation {
104- CommitType::Create => self.handle_create_record(commit, info).await?,
105- CommitType::Update => {
106- self.handle_modify_record(modify_update(commit, info))
107- .await?
108- }
109- CommitType::Delete => {
110- panic!("jetstream Commit::CreateOrUpdate had Delete operation type")
111- }
000112 }
113 }
114- JetstreamEvent::Commit(CommitEvent::Delete { commit, info }) => {
115- self.handle_modify_record(modify_delete(commit, info))
116- .await?
117- }
118- JetstreamEvent::Account(AccountEvent { info, account }) if !account.active => {
119- self.handle_remove_account(info.did, info.time_us).await?
120- }
121- JetstreamEvent::Account(_) => {} // ignore account *activations*
122- JetstreamEvent::Identity(_) => {} // identity events are noops for us
123- };
124- self.current_batch.last_jetstream_cursor = Some(event_cursor.clone());
125126 // if the queue is empty and we have enough, send immediately. otherewise, let the current batch fill up.
127- if let Some(earliest) = &self.current_batch.first_jetstream_cursor {
128- if event_cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
129 && self.batch_sender.capacity() == BATCH_QUEUE_SIZE
130 {
131- log::trace!("queue empty: immediately sending batch.");
132- if let Err(send_err) = self
133- .batch_sender
134- .send(mem::take(&mut self.current_batch))
135- .await
136- {
137- anyhow::bail!("Could not send batch, likely because the receiver closed or dropped: {send_err:?}");
138- }
139 }
140 }
141 Ok(())
142 }
143144- // holds up all consumer progress until it can send to the channel
145- // use this when the current batch is too full to add more to it
146- async fn send_current_batch_now(&mut self) -> anyhow::Result<()> {
147- log::warn!(
148- "attempting to send batch now (capacity: {})",
149- self.batch_sender.capacity()
150 );
151- self.batch_sender
152- .send_timeout(
153- mem::take(&mut self.current_batch),
154- Duration::from_secs_f64(SEND_TIMEOUT_S),
155- )
156- .await?;
157- Ok(())
158- }
159160- async fn handle_create_record(
161- &mut self,
162- commit: CommitData<serde_json::Value>,
163- info: EventInfo,
164- ) -> anyhow::Result<()> {
165- if !self
166- .current_batch
167- .record_creates
168- .contains_key(&commit.info.collection)
169- && self.current_batch.record_creates.len() >= MAX_BATCHED_COLLECTIONS
170- {
171- self.send_current_batch_now().await?;
172 }
173- let record = CreateRecord {
174- did: info.did,
175- rkey: commit.info.rkey,
176- record: commit.record,
177- cursor: info.time_us,
178- };
179- let collection = self
180- .current_batch
181- .record_creates
182- .entry(commit.info.collection)
183- .or_default();
184- collection.total_seen += 1;
185- collection.samples.push_front(record);
186- collection.samples.truncate(MAX_BATCHED_RECORDS);
187- Ok(())
188- }
189190- async fn handle_modify_record(&mut self, modify_record: ModifyRecord) -> anyhow::Result<()> {
191- if self.current_batch.record_modifies.len() >= MAX_BATCHED_MODIFIES {
192- self.send_current_batch_now().await?;
193- }
194- self.current_batch.record_modifies.push(modify_record);
195 Ok(())
196 }
197198- async fn handle_remove_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> {
199- if self.current_batch.account_removes.len() >= MAX_ACCOUNT_REMOVES {
200- self.send_current_batch_now().await?;
201 }
202 self.current_batch
0203 .account_removes
204 .push(DeleteAccount { did, cursor });
205 Ok(())
206 }
207-}
208209-fn modify_update(commit: CommitData<serde_json::Value>, info: EventInfo) -> ModifyRecord {
210- ModifyRecord::Update(UpdateRecord {
211- did: info.did,
212- collection: commit.info.collection,
213- rkey: commit.info.rkey,
214- record: commit.record,
215- cursor: info.time_us,
216- })
217-}
218-219-fn modify_delete(commit_info: CommitInfo, info: EventInfo) -> ModifyRecord {
220- ModifyRecord::Delete(DeleteRecord {
221- did: info.did,
222- collection: commit_info.collection,
223- rkey: commit_info.rkey,
224- cursor: info.time_us,
225- })
00226}
···1use jetstream::{
2+ events::{Cursor, EventKind, JetstreamEvent},
3+ exports::{Did, Nsid},
00004 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
5 JetstreamReceiver,
6};
···8use std::time::Duration;
9use tokio::sync::mpsc::{channel, Receiver, Sender};
1011+use crate::error::{BatchInsertError, FirehoseEventError};
12+use crate::{DeleteAccount, EventBatch, UFOsCommit};
// *Non-blocking* limit: once a collection has this many samples, the oldest
// batched record for that collection is dropped instead of blocking the consumer.
pub const MAX_BATCHED_RECORDS: usize = 128;
// Hard limit on queued account removals per batch; extremely unlikely to be
// reached, but present as a safety valve.
pub const MAX_ACCOUNT_REMOVES: usize = 1024;
// Hard limit on distinct collections per batch; MAX_BATCHED_RECORDS applies
// per-collection within this.
pub const MAX_BATCHED_COLLECTIONS: usize = 64;
// Minimum time span a batch should cover before an opportunistic send — breathe.
pub const MIN_BATCH_SPAN_SECS: f64 = 2.;
// Hard limit on batch time span: pause the consumer if we can't send by now.
pub const MAX_BATCH_SPAN_SECS: f64 = 60.;
// If the channel send blocks longer than this, something is probably wrong.
pub const SEND_TIMEOUT_S: f64 = 15.;
// Nearly rendez-vous: downstream sees at most one batch queued at a time.
pub const BATCH_QUEUE_SIZE: usize = 1;

// An EventBatch capped at MAX_BATCHED_RECORDS samples per collection.
pub type LimitedBatch = EventBatch<MAX_BATCHED_RECORDS>;
/// The batch currently being accumulated, paired with the cursor of the first
/// event that went into it (used to measure the batch's time span).
#[derive(Debug, Default)]
struct CurrentBatch {
    // Cursor of the first event in this batch; None until an event arrives
    // after a flush (mem::take resets this to the Default, i.e. None).
    initial_cursor: Option<Cursor>,
    batch: LimitedBatch,
}
/// Consumes jetstream events and groups them into size- and time-bounded
/// batches, forwarding each completed batch over an mpsc channel.
#[derive(Debug)]
pub struct Batcher {
    // Upstream: events from the jetstream connection.
    jetstream_receiver: JetstreamReceiver,
    // Downstream: completed batches; capacity BATCH_QUEUE_SIZE (near rendez-vous).
    batch_sender: Sender<LimitedBatch>,
    // The batch being filled right now, plus its starting cursor.
    current_batch: CurrentBatch,
}
3637pub async fn consume(
38 jetstream_endpoint: &str,
39 cursor: Option<Cursor>,
40 no_compress: bool,
41+) -> anyhow::Result<Receiver<LimitedBatch>> {
42 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
43 if endpoint == jetstream_endpoint {
44+ log::info!("connecting to jetstream at {endpoint}");
45 } else {
46+ log::info!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
47 }
48+ let config: JetstreamConfig = JetstreamConfig {
49 endpoint,
50 compression: if no_compress {
51 JetstreamCompression::None
52 } else {
53 JetstreamCompression::Zstd
54 },
55+ replay_on_reconnect: true,
56+ channel_size: 1024, // buffer up to ~1s of jetstream events
57 ..Default::default()
58 };
59 let jetstream_receiver = JetstreamConnector::new(config)?
60 .connect_cursor(cursor)
61 .await?;
62+ let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
63 let mut batcher = Batcher::new(jetstream_receiver, batch_sender);
64 tokio::task::spawn(async move { batcher.run().await });
65 Ok(batch_reciever)
66}
6768impl Batcher {
69+ pub fn new(jetstream_receiver: JetstreamReceiver, batch_sender: Sender<LimitedBatch>) -> Self {
00070 Self {
71 jetstream_receiver,
72 batch_sender,
···74 }
75 }
7677+ pub async fn run(&mut self) -> anyhow::Result<()> {
78 loop {
79 if let Some(event) = self.jetstream_receiver.recv().await {
80 self.handle_event(event).await?
···84 }
85 }
    /// Process one jetstream event: enforce the maximum batch time span, route
    /// the event by kind into the current batch, then opportunistically flush
    /// when the batch has spanned long enough and the downstream queue is empty.
    async fn handle_event(&mut self, event: JetstreamEvent) -> anyhow::Result<()> {
        // Hard time limit: never let a batch span more than MAX_BATCH_SPAN_SECS
        // of jetstream time (measured between cursors, not wall clock here).
        if let Some(earliest) = &self.current_batch.initial_cursor {
            if event.cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS)
            {
                // NOTE(review): send_current_batch_now resets current_batch via
                // mem::take, leaving initial_cursor as None — so the batch begun
                // by *this* event only records its initial cursor when the *next*
                // event arrives. Confirm this off-by-one is intended.
                self.send_current_batch_now(false).await?;
            }
        } else {
            self.current_batch.initial_cursor = Some(event.cursor);
        }

        match event.kind {
            EventKind::Commit => {
                let commit = event
                    .commit
                    .ok_or(FirehoseEventError::CommitEventMissingCommit)?;
                let (commit, nsid) = UFOsCommit::from_commit_info(commit, event.did, event.cursor)?;
                self.handle_commit(commit, nsid).await?;
            }
            EventKind::Account => {
                let account = event
                    .account
                    .ok_or(FirehoseEventError::AccountEventMissingAccount)?;
                // Only deactivations are batched; active-account events are ignored.
                if !account.active {
                    self.handle_delete_account(event.did, event.cursor).await?;
                }
            }
            // Identity events (and any other kinds) are intentionally ignored.
            _ => {}
        }

        // If the downstream queue is empty (full capacity available) and the
        // batch has spanned at least MIN_BATCH_SPAN_SECS, send immediately;
        // otherwise let the current batch keep filling up.
        if let Some(earliest) = &self.current_batch.initial_cursor {
            if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
                && self.batch_sender.capacity() == BATCH_QUEUE_SIZE
            {
                self.send_current_batch_now(true).await?;
            }
        }
        Ok(())
    }
126127+ async fn handle_commit(&mut self, commit: UFOsCommit, collection: Nsid) -> anyhow::Result<()> {
128+ let optimistic_res = self.current_batch.batch.insert_commit_by_nsid(
129+ &collection,
130+ commit,
131+ MAX_BATCHED_COLLECTIONS,
0132 );
00000000133134+ if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
135+ self.send_current_batch_now(false).await?;
136+ self.current_batch.batch.insert_commit_by_nsid(
137+ &collection,
138+ commit,
139+ MAX_BATCHED_COLLECTIONS,
140+ )?;
141+ } else {
142+ optimistic_res?;
000143 }
000000000000000014400000145 Ok(())
146 }
147148+ async fn handle_delete_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> {
149+ if self.current_batch.batch.account_removes.len() >= MAX_ACCOUNT_REMOVES {
150+ self.send_current_batch_now(false).await?;
151 }
152 self.current_batch
153+ .batch
154 .account_removes
155 .push(DeleteAccount { did, cursor });
156 Ok(())
157 }
0158159+ // holds up all consumer progress until it can send to the channel
160+ // use this when the current batch is too full to add more to it
161+ async fn send_current_batch_now(&mut self, small: bool) -> anyhow::Result<()> {
162+ let beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) {
163+ None => "unknown".to_string(),
164+ Some(Ok(t)) => format!("{:?}", t),
165+ Some(Err(e)) => format!("+{:?}", e.duration()),
166+ };
167+ log::info!(
168+ "sending batch now from {beginning}, {}, queue capacity: {}",
169+ if small { "small" } else { "full" },
170+ self.batch_sender.capacity(),
171+ );
172+ let current = mem::take(&mut self.current_batch);
173+ self.batch_sender
174+ .send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S))
175+ .await?;
176+ Ok(())
177+ }
178}