audio streaming app plyr.fm

fix: remove disconnecting WS from connections before publishing events (#984)

disconnect_ws called _clear_output_if_matches (which publishes to Redis)
while the closing WS was still in self._connections. The _stream_reader
background task could pick up the event and _fan_out would try to send
to the already-closed WS, causing "Cannot call send once a close message
has been sent."

Move the discard(ws) before the _clear_output_if_matches call, matching
the pattern already used in _close_ws_for_did. This also improves
_find_fallback_output since it won't consider the departing WS as a
fallback candidate.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

authored by zzstoatzz.io

Claude Opus 4.6 and committed by
GitHub
4d64dd39 40cf6d78

+14 -11
+14 -11
backend/src/backend/_internal/jams.py
··· 412 412 413 413 async def disconnect_ws(self, jam_id: str, ws: WebSocket) -> None: 414 414 """unregister a WebSocket connection.""" 415 + # remove from connections FIRST so _fan_out won't send to this ws 416 + # while _clear_output_if_matches publishes events 417 + if jam_id in self._connections: 418 + self._connections[jam_id].discard(ws) 419 + 415 420 # check if disconnecting WS was the output device 416 421 disconnecting_client_id = self._ws_client_ids.pop(ws, None) 417 422 if disconnecting_client_id: ··· 426 431 for did in dids_to_remove: 427 432 del self._ws_by_did[did] 428 433 429 - if jam_id in self._connections: 430 - self._connections[jam_id].discard(ws) 431 - if not self._connections[jam_id]: 432 - del self._connections[jam_id] 433 - # cancel reader if no more connections 434 - if jam_id in self._reader_tasks: 435 - self._reader_tasks[jam_id].cancel() 436 - with contextlib.suppress(asyncio.CancelledError): 437 - await self._reader_tasks[jam_id] 438 - del self._reader_tasks[jam_id] 439 - logger.info("stopped stream reader for jam %s", jam_id) 434 + if jam_id in self._connections and not self._connections[jam_id]: 435 + del self._connections[jam_id] 436 + # cancel reader if no more connections 437 + if jam_id in self._reader_tasks: 438 + self._reader_tasks[jam_id].cancel() 439 + with contextlib.suppress(asyncio.CancelledError): 440 + await self._reader_tasks[jam_id] 441 + del self._reader_tasks[jam_id] 442 + logger.info("stopped stream reader for jam %s", jam_id) 440 443 441 444 async def _close_ws_for_did(self, did: str) -> None: 442 445 """close any existing WebSocket for this DID."""