this repo has no description

Misc updates

- add some additional logging
- move cursor to query params
- add heartbeat displaying current time_us
- skip ping/pong control frames

+48 -6
+48 -6
src/consumer.rs
··· 1 1 use std::str::FromStr; 2 2 3 3 use anyhow::{anyhow, Context, Result}; 4 + use chrono::DateTime; 4 5 use futures_util::SinkExt; 5 6 use futures_util::StreamExt; 6 7 use http::HeaderValue; ··· 62 63 let last_time_us = 63 64 consumer_control_get(&self.pool, &self.config.jetstream_hostname).await?; 64 65 66 + tracing::info!(cursor = ?last_time_us, "loaded cursor from database"); 67 + 68 + let cursor_param = if let Some(cursor) = last_time_us { 69 + format!("&cursor={}", cursor) 70 + } else { 71 + String::new() 72 + }; 73 + 65 74 let uri = Uri::from_str(&format!( 66 - "wss://{}/subscribe?compress={}&requireHello=true", 67 - self.config.jetstream_hostname, self.config.compression 75 + "wss://{}/subscribe?compress={}&requireHello=true{}", 76 + self.config.jetstream_hostname, self.config.compression, cursor_param 68 77 )) 69 78 .context("invalid jetstream URL")?; 70 79 71 - tracing::debug!(uri = ?uri, "connecting to jetstream"); 80 + tracing::info!(uri = %uri, "connecting to jetstream"); 72 81 73 82 let (mut client, _) = ClientBuilder::from_uri(uri) 74 83 .add_header( ··· 83 92 wanted_collections: self.config.collections.clone(), 84 93 wanted_dids: vec![], 85 94 max_message_size_bytes: MAX_MESSAGE_SIZE as u64, 86 - cursor: last_time_us, 95 + cursor: None, 87 96 }; 88 97 let serialized_update = serde_json::to_string(&update) 89 98 .map_err(|err| anyhow::Error::msg(err).context("cannot serialize update"))?; 90 99 100 + tracing::info!(message = %serialized_update, "sending subscription update to jetstream"); 101 + 91 102 client 92 103 .send(Message::text(serialized_update)) 93 104 .await ··· 108 119 let sleeper = sleep(interval); 109 120 tokio::pin!(sleeper); 110 121 122 + let heartbeat_interval = std::time::Duration::from_secs(15); 123 + let heartbeat_sleeper = sleep(heartbeat_interval); 124 + tokio::pin!(heartbeat_sleeper); 125 + 111 126 let mut time_usec = 0i64; 112 127 113 128 loop { ··· 119 134 consumer_control_insert(&self.pool, &self.config.jetstream_hostname, time_usec).await?; 120 135 sleeper.as_mut().reset(Instant::now() + interval); 121 136 }, 137 + () = &mut heartbeat_sleeper => { 138 + if time_usec > 0 { 139 + let datetime = DateTime::from_timestamp_micros(time_usec) 140 + .map(|dt| dt.to_rfc3339()) 141 + .unwrap_or_else(|| format!("{} microseconds", time_usec)); 142 + tracing::info!(time_us = time_usec, timestamp = %datetime, "consumer heartbeat"); 143 + } 144 + heartbeat_sleeper.as_mut().reset(Instant::now() + heartbeat_interval); 145 + }, 122 146 item = client.next() => { 123 147 if item.is_none() { 124 148 tracing::warn!("jetstream connection closed"); ··· 134 158 135 159 let event = if self.config.compression { 136 160 if !item.is_binary() { 137 - tracing::debug!("compression enabled but message from jetstream is not binary"); 161 + // Skip WebSocket control frames (ping, pong, close) 162 + if item.is_ping() || item.is_pong() || item.is_close() { 163 + continue; 164 + } 165 + // Log unexpected non-binary message types 166 + tracing::warn!("received unexpected non-binary message from jetstream (not ping/pong/close)"); 138 167 continue; 139 168 } 140 169 let payload = item.into_payload(); ··· 149 178 .context(anyhow!("cannot deserialize message")) 150 179 } else { 151 180 if !item.is_text() { 152 - tracing::debug!("compression enabled but message from jetstream is not binary"); 181 + // Skip WebSocket control frames (ping, pong, close) 182 + if item.is_ping() || item.is_pong() || item.is_close() { 183 + continue; 184 + } 185 + // Log unexpected non-text message types 186 + tracing::warn!("received unexpected non-text message from jetstream (not ping/pong/close)"); 153 187 continue; 154 188 } 155 189 item.as_text() ··· 166 200 } 167 201 let event = event.unwrap(); 168 202 203 + let previous_time_usec = time_usec; 169 204 time_usec = std::cmp::max(time_usec, event.time_us); 205 + 206 + if previous_time_usec == 0 { 207 + let datetime = DateTime::from_timestamp_micros(event.time_us) 208 + .map(|dt| dt.to_rfc3339()) 209 + .unwrap_or_else(|| format!("{} microseconds", event.time_us)); 210 + tracing::info!(time_us = event.time_us, timestamp = %datetime, "received first event from jetstream"); 211 + } 170 212 171 213 if event.clone().kind != "commit" { 172 214 continue;