···226226 println!("jetstream closed the websocket cleanly.");
227227 break;
228228 }
229229- r => eprintln!("jetstream: close result after error: {r:?}"),
229229+ Err(_) => {
230230+ counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
231231+ println!("jetstream failed to close the websocket cleanly.");
232232+ break;
233233+ }
234234+ Ok(r) => {
235235+ eprintln!("jetstream: close result after error: {r:?}");
236236+ counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
237237+ .increment(1);
238238+ // if we didn't immediately get ConnectionClosed, we should keep polling read
239239+ // until we get it.
240240+ continue;
241241+ }
230242 }
231231- counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
232232- .increment(1);
233233- // if we didn't immediately get ConnectionClosed, we should keep polling read
234234- // until we get it.
235235- continue;
236243 }
237244 };
238245