···20 "url",
21] }
22futures-util = "0.3.31"
023url = "2.5.4"
24serde = { version = "1.0.215", features = ["derive"] }
25serde_json = { version = "1.0.140", features = ["raw_value"] }
···31[dev-dependencies]
32anyhow = "1.0.93"
33clap = { version = "4.5.20", features = ["derive"] }
0000
···20 "url",
21] }
22futures-util = "0.3.31"
23+metrics = { version = "0.24.2", optional = true }
24url = "2.5.4"
25serde = { version = "1.0.215", features = ["derive"] }
26serde_json = { version = "1.0.140", features = ["raw_value"] }
···32[dev-dependencies]
33anyhow = "1.0.93"
34clap = { version = "4.5.20", features = ["derive"] }
35+36+[features]
37+default = []
38+metrics = ["dep:metrics"]
+1-6
jetstream/src/error.rs
···36/// See [websocket_task](crate::websocket_task).
37#[derive(Error, Debug)]
38pub enum JetstreamEventError {
39- #[error("received websocket message that could not be deserialized as JSON: {0}")]
40- ReceivedMalformedJSON(#[from] serde_json::Error),
41 #[error("failed to load built-in zstd dictionary for decoding: {0}")]
42 CompressionDictionaryError(io::Error),
43- #[error("failed to decode zstd-compressed message: {0}")]
44- CompressionDecoderError(io::Error),
45- #[error("all receivers were dropped but the websocket connection failed to close cleanly")]
46- WebSocketCloseFailure,
47 #[error("failed to send ping or pong: {0}")]
48 PingPongError(#[from] tokio_tungstenite::tungstenite::Error),
49 #[error("jetstream event receiver closed")]
···36/// See [websocket_task](crate::websocket_task).
37#[derive(Error, Debug)]
38pub enum JetstreamEventError {
0039 #[error("failed to load built-in zstd dictionary for decoding: {0}")]
40 CompressionDictionaryError(io::Error),
41+00042 #[error("failed to send ping or pong: {0}")]
43 PingPongError(#[from] tokio_tungstenite::tungstenite::Error),
44 #[error("jetstream event receiver closed")]
+99-4
jetstream/src/lib.rs
···14 stream::StreamExt,
15 SinkExt,
16};
00000017use tokio::{
18 net::TcpStream,
19 sync::mpsc::{
···299 }
300}
3010000000000000000000000000000000000302impl JetstreamConnector {
303 /// Create a Jetstream connector with a valid [JetstreamConfig].
304 ///
305 /// After creation, you can call [connect] to connect to the provided Jetstream instance.
306 pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
000307 // We validate the configuration here so any issues are caught early.
308 config.validate()?;
309 Ok(JetstreamConnector { config })
···359 }
360 };
3610000000362 let mut last_cursor = connect_cursor;
363 retry_attempt += 1;
364 if let Ok((ws_stream, _)) = connect_async(req).await {
···368 websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor)
369 .await
370 {
371- if let JetstreamEventError::ReceiverClosedError = e {
372- log::error!("Jetstream receiver channel closed. Exiting consumer.");
373- return;
000000000000374 }
375- log::error!("Jetstream closed after encountering error: {e:?}");
376 } else {
000377 log::warn!("Jetstream connection closed cleanly");
378 }
379 if t_connected.elapsed() > Duration::from_secs(success_threshold_s) {
···425 match socket_read.next().await {
426 Some(Ok(message)) => match message {
427 Message::Text(json) => {
0000000428 let event: JetstreamEvent = match serde_json::from_str(&json) {
429 Ok(ev) => ev,
430 Err(e) => {
000431 log::warn!(
432 "failed to parse json: {e:?} (from {})",
433 json.get(..24).unwrap_or(&json)
···439440 if let Some(last) = last_cursor {
441 if event_cursor <= *last {
000442 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
443 continue;
444 }
···453 } else if let Some(last) = last_cursor.as_mut() {
454 *last = event_cursor;
455 }
00456 }
457 Message::Binary(zstd_json) => {
0000000458 let mut cursor = IoCursor::new(zstd_json);
459 let decoder =
460 zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
···463 let event: JetstreamEvent = match serde_json::from_reader(decoder) {
464 Ok(ev) => ev,
465 Err(e) => {
000466 log::warn!("failed to parse json: {e:?}");
467 continue;
468 }
···471472 if let Some(last) = last_cursor {
473 if event_cursor <= *last {
000474 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
475 continue;
476 }
···485 } else if let Some(last) = last_cursor.as_mut() {
486 *last = event_cursor;
487 }
00488 }
489 Message::Ping(vec) => {
490 log::trace!("Ping recieved, responding");
···14 stream::StreamExt,
15 SinkExt,
16};
17+#[cfg(feature = "metrics")]
18+use metrics::{
19+ counter,
20+ describe_counter,
21+ Unit,
22+};
23use tokio::{
24 net::TcpStream,
25 sync::mpsc::{
···305 }
306}
307308+#[cfg(feature = "metrics")]
309+fn describe_metrics() {
310+ describe_counter!(
311+ "jetstream_connects",
312+ Unit::Count,
313+ "how many times we've tried to connect"
314+ );
315+ describe_counter!(
316+ "jetstream_disconnects",
317+ Unit::Count,
318+ "how many times we've been disconnected"
319+ );
320+ describe_counter!(
321+ "jetstream_total_events_received",
322+ Unit::Count,
323+ "total number of events received"
324+ );
325+ describe_counter!(
326+ "jetstream_total_bytes_received",
327+ Unit::Count,
328+ "total uncompressed bytes received, not including websocket overhead"
329+ );
330+ describe_counter!(
331+ "jetstream_total_event_errors",
332+ Unit::Count,
333+ "total errors when handling events"
334+ );
335+ describe_counter!(
336+ "jetstream_total_events_sent",
337+ Unit::Count,
338+ "total events sent to the consumer"
339+ );
340+}
341+342impl JetstreamConnector {
343 /// Create a Jetstream connector with a valid [JetstreamConfig].
344 ///
345 /// After creation, you can call [connect] to connect to the provided Jetstream instance.
346 pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
347+ #[cfg(feature = "metrics")]
348+ describe_metrics();
349+350 // We validate the configuration here so any issues are caught early.
351 config.validate()?;
352 Ok(JetstreamConnector { config })
···402 }
403 };
404405+ #[cfg(feature = "metrics")]
406+ if let Some(host) = req.uri().host() {
407+ let retry = if retry_attempt > 0 { "yes" } else { "no" };
408+ counter!("jetstream_connects", "host" => host.to_string(), "retry" => retry)
409+ .increment(1);
410+ }
411+412 let mut last_cursor = connect_cursor;
413 retry_attempt += 1;
414 if let Ok((ws_stream, _)) = connect_async(req).await {
···418 websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor)
419 .await
420 {
421+ match e {
422+ JetstreamEventError::ReceiverClosedError => {
423+ #[cfg(feature="metrics")]
424+ counter!("jetstream_disconnects", "reason" => "channel", "fatal" => "yes").increment(1);
425+ log::error!("Jetstream receiver channel closed. Exiting consumer.");
426+ return;
427+ }
428+ JetstreamEventError::CompressionDictionaryError(_) => {
429+ #[cfg(feature="metrics")]
430+ counter!("jetstream_disconnects", "reason" => "zstd", "fatal" => "no").increment(1);
431+ }
432+ JetstreamEventError::PingPongError(_) => {
433+ #[cfg(feature="metrics")]
434+ counter!("jetstream_disconnects", "reason" => "pingpong", "fatal" => "no").increment(1);
435+ }
436 }
437+ log::warn!("Jetstream closed after encountering error: {e:?}");
438 } else {
439+ #[cfg(feature = "metrics")]
440+ counter!("jetstream_disconnects", "reason" => "close", "fatal" => "no")
441+ .increment(1);
442 log::warn!("Jetstream connection closed cleanly");
443 }
444 if t_connected.elapsed() > Duration::from_secs(success_threshold_s) {
···490 match socket_read.next().await {
491 Some(Ok(message)) => match message {
492 Message::Text(json) => {
493+ #[cfg(feature = "metrics")]
494+ {
495+ counter!("jetstream_total_events_received", "compressed" => "false")
496+ .increment(1);
497+ counter!("jetstream_total_bytes_received", "compressed" => "false")
498+ .increment(json.len() as u64);
499+ }
500 let event: JetstreamEvent = match serde_json::from_str(&json) {
501 Ok(ev) => ev,
502 Err(e) => {
503+ #[cfg(feature = "metrics")]
504+ counter!("jetstream_total_event_errors", "reason" => "deserialize")
505+ .increment(1);
506 log::warn!(
507 "failed to parse json: {e:?} (from {})",
508 json.get(..24).unwrap_or(&json)
···514515 if let Some(last) = last_cursor {
516 if event_cursor <= *last {
517+ #[cfg(feature = "metrics")]
518+ counter!("jetstream_total_event_errors", "reason" => "old")
519+ .increment(1);
520 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
521 continue;
522 }
···531 } else if let Some(last) = last_cursor.as_mut() {
532 *last = event_cursor;
533 }
534+ #[cfg(feature = "metrics")]
535+ counter!("jetstream_total_events_sent").increment(1);
536 }
537 Message::Binary(zstd_json) => {
538+ #[cfg(feature = "metrics")]
539+ {
540+ counter!("jetstream_total_events_received", "compressed" => "true")
541+ .increment(1);
542+ counter!("jetstream_total_bytes_received", "compressed" => "true")
543+ .increment(zstd_json.len() as u64);
544+ }
545 let mut cursor = IoCursor::new(zstd_json);
546 let decoder =
547 zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
···550 let event: JetstreamEvent = match serde_json::from_reader(decoder) {
551 Ok(ev) => ev,
552 Err(e) => {
553+ #[cfg(feature = "metrics")]
554+ counter!("jetstream_total_event_errors", "reason" => "deserialize")
555+ .increment(1);
556 log::warn!("failed to parse json: {e:?}");
557 continue;
558 }
···561562 if let Some(last) = last_cursor {
563 if event_cursor <= *last {
564+ #[cfg(feature = "metrics")]
565+ counter!("jetstream_total_event_errors", "reason" => "old")
566+ .increment(1);
567 log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
568 continue;
569 }
···578 } else if let Some(last) = last_cursor.as_mut() {
579 *last = event_cursor;
580 }
581+ #[cfg(feature = "metrics")]
582+ counter!("jetstream_total_events_sent").increment(1);
583 }
584 Message::Ping(vec) => {
585 log::trace!("Ping recieved, responding");