AT-based link agregator. Mirror of https://github.com/likeandscribe/frontpage

drainpipe: Handle ping event (#278)

* Handle ping event

* Add error message back

* Cleaner send error handling

authored by tom.sherman.is and committed by

GitHub 626d80f3 90f2f609

+24 -15
+24 -15
packages-rs/drainpipe/src/jetstream.rs
··· 235 235 ) -> Result<(), JetstreamEventError> { 236 236 // TODO: Use the write half to allow the user to change configuration settings on the fly. 237 237 let (_, mut read) = ws.split(); 238 - loop { 239 - match read.next().await { 240 - None => { 241 - log::error!("The WebSocket connection was closed unexpectedly."); 242 - return Err(JetstreamEventError::WebSocketCloseFailure); 238 + while let Some(message) = read.next().await { 239 + let event = match decode_message(message, &dictionary) { 240 + Ok(None) => { 241 + // Ping message, nothing to do. 242 + log::debug!("Received ping message, ignoring."); 243 + continue; 243 244 } 244 245 245 - Some(message) => { 246 - send_channel.send(decode_message(message, &dictionary)).map_err(|e| { 247 - log::error!("All receivers for the Jetstream connection have been dropped, closing connection. {:?}", e); 246 + Ok(Some(event)) => Ok(event), 247 + Err(e) => Err(e), 248 + }; 248 249 249 - JetstreamEventError::WebSocketCloseFailure 250 - })?; 251 - } 252 - } 250 + send_channel.send(event).map_err(|e| { 251 + log::error!("All receivers for the Jetstream connection have been dropped, closing connection. {:?}", e); 252 + JetstreamEventError::WebSocketCloseFailure 253 + })?; 253 254 } 255 + 256 + log::error!("The WebSocket connection was closed unexpectedly."); 257 + Err(JetstreamEventError::WebSocketCloseFailure) 254 258 } 255 259 256 260 fn decode_message( 257 261 message: Result<Message, tokio_tungstenite::tungstenite::Error>, 258 262 dictionary: &DecoderDictionary<'_>, 259 - ) -> Result<JetstreamEvent, JetstreamEventError> { 263 + ) -> Result<Option<JetstreamEvent>, JetstreamEventError> { 260 264 let json = match message { 265 + // Ignore ping messages 266 + Ok(Message::Ping(_)) => return Ok(None), 267 + 261 268 Ok(Message::Text(json)) => json, 262 269 263 270 Ok(Message::Binary(zstd_json)) => { ··· 279 286 Err(e) => Err(JetstreamEventError::WebsocketReceiveFailure(e))?, 280 287 }; 281 288 282 - serde_json::from_str::<JetstreamEvent>(&json) 283 - .map_err(|e| JetstreamEventError::ReceivedMalformedJSON { error: e, json }) 289 + let decoded = serde_json::from_str::<JetstreamEvent>(&json) 290 + .map_err(|e| JetstreamEventError::ReceivedMalformedJSON { error: e, json })?; 291 + 292 + Ok(Some(decoded)) 284 293 }