audio streaming app plyr.fm

fix: add Redis tombstone to prevent ghost tracks from Jetstream replay

when Jetstream reconnects, it rewinds its cursor by 5s and replays the events
in that window. if a track was both created and deleted within the window, the
replayed delete no-ops (row already gone) and the replayed create re-creates
the track as a ghost — no PDS record, no audio, unfixable via UI.

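writes a 5-minute Redis tombstone on every track delete: the ingest path
writes it even when the row is already gone or the delete deadlocks, and the
API path writes it whenever the deleted track has an ATProto record URI.
ingest_track_create checks for the tombstone before creating a track from
scratch and skips the create if one is found. both helpers fail open on Redis
errors (a failed write is skipped, a failed check counts as "no tombstone"),
so degraded Redis doesn't change current behavior.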

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+218 -5
+48 -3
backend/src/backend/_internal/tasks/ingest.py
··· 20 20 from backend.models import Artist, Playlist, Track, TrackComment, TrackLike 21 21 from backend.utilities.database import db_session 22 22 from backend.utilities.lexicon import validate_record 23 + from backend.utilities.redis import get_async_redis_client 23 24 24 25 logger = logging.getLogger(__name__) 25 26 ··· 57 58 return dt > datetime.now(UTC) + _MAX_CLOCK_SKEW 58 59 59 60 61 + _TOMBSTONE_PREFIX = "plyr:tombstone:" 62 + _TOMBSTONE_TTL_SECONDS = 300 # 5 min — well beyond 5s cursor rewind 63 + 64 + # NOTE: this suppresses ANY create for the same URI within the TTL window, 65 + # not just stale replays. this is acceptable because plyr.fm always generates 66 + # fresh TID-based rkeys for new tracks (upload path + restore-record), so a 67 + # legitimate same-URI re-create never happens in practice. if ATProto same-URI 68 + # putRecord support is ever needed, this will need a sequence check instead. 69 + 70 + 71 + async def _write_tombstone(uri: str) -> None: 72 + """mark a URI as recently deleted so replayed creates are skipped.""" 73 + try: 74 + redis = get_async_redis_client() 75 + await redis.set(f"{_TOMBSTONE_PREFIX}{uri}", "1", ex=_TOMBSTONE_TTL_SECONDS) 76 + except Exception: 77 + logger.debug("tombstone write failed for %s", uri) 78 + 79 + 80 + async def _check_tombstone(uri: str) -> bool: 81 + """return True if the URI was recently deleted (fail-open on errors).""" 82 + try: 83 + redis = get_async_redis_client() 84 + return await redis.exists(f"{_TOMBSTONE_PREFIX}{uri}") > 0 85 + except Exception: 86 + logger.debug("tombstone check failed for %s", uri) 87 + return False 88 + 89 + 60 90 # --- track tasks --- 61 91 62 92 ··· 135 165 ) 136 166 else: 137 167 logger.debug("ingest_track_create: duplicate URI %s, skipping", uri) 168 + return 169 + 170 + # recently deleted? 
skip to prevent ghost tracks from cursor rewind 171 + if await _check_tombstone(uri): 172 + logfire.info( 173 + "ingest: skipping create for tombstoned URI", 174 + uri=uri, 175 + artist_did=did, 176 + ) 138 177 return 139 178 140 179 # no existing row — create from scratch (external ATProto client) ··· 289 328 await db.commit() 290 329 logfire.info("ingest: track deleted", uri=uri, artist_did=did) 291 330 else: 292 - logger.debug("ingest_track_delete: track %s not found", uri) 331 + logfire.warn( 332 + "ingest: track not found for delete", 333 + uri=uri, 334 + artist_did=did, 335 + rkey=rkey, 336 + ) 293 337 except OperationalError: 294 - # deadlock with the API delete — the other transaction will handle it 295 - logger.debug("ingest_track_delete: deadlock on %s, skipping", uri) 338 + logfire.warn("ingest: deadlock on track delete", uri=uri, artist_did=did) 339 + 340 + await _write_tombstone(uri) 296 341 297 342 298 343 # --- like tasks ---
+8
backend/src/backend/api/tracks/mutations.py
··· 33 33 schedule_move_track_audio, 34 34 ) 35 35 from backend._internal.tasks.hooks import invalidate_tracks_discovery_cache 36 + from backend._internal.tasks.ingest import _write_tombstone 36 37 from backend.config import settings 37 38 from backend.models import Artist, Track, TrackTag, get_db 38 39 from backend.schemas import MessageResponse, TrackResponse ··· 123 124 # capture album_id before deletion for list sync 124 125 album_id_to_sync = track.album_id 125 126 127 + # capture URI before deletion for tombstone 128 + record_uri = track.atproto_record_uri 129 + 126 130 # delete track record 127 131 await db.delete(track) 128 132 await db.commit() 133 + 134 + # write tombstone so Jetstream replay doesn't re-create the track 135 + if record_uri: 136 + await _write_tombstone(record_uri) 129 137 130 138 # invalidate anonymous discovery feed cache 131 139 await invalidate_tracks_discovery_cache()
+43
backend/tests/api/test_track_deletion.py
··· 390 390 # both audio file and image should be deleted (no album shares the image) 391 391 assert "test_file_unshared_img" in delete_calls 392 392 assert track_image_id in delete_calls 393 + 394 + 395 + async def test_api_delete_writes_tombstone( 396 + test_app: FastAPI, db_session: AsyncSession, test_artist: Artist 397 + ): 398 + """API track deletion writes a Redis tombstone to prevent ghost re-creation.""" 399 + record_uri = "at://did:plc:artist123/fm.plyr.track/tombstone1" 400 + track = Track( 401 + title="tombstone test", 402 + artist_did=test_artist.did, 403 + file_id="test_file_tombstone", 404 + file_type="mp3", 405 + extra={}, 406 + atproto_record_uri=record_uri, 407 + atproto_record_cid="bafytombstone", 408 + ) 409 + db_session.add(track) 410 + await db_session.commit() 411 + await db_session.refresh(track) 412 + 413 + mock_redis = AsyncMock() 414 + mock_redis.set = AsyncMock() 415 + 416 + with ( 417 + patch("backend.api.tracks.mutations.storage.delete", new_callable=AsyncMock), 418 + patch( 419 + "backend.api.tracks.mutations.delete_record_by_uri", 420 + new_callable=AsyncMock, 421 + ), 422 + patch( 423 + "backend._internal.tasks.ingest.get_async_redis_client", 424 + return_value=mock_redis, 425 + ), 426 + ): 427 + async with AsyncClient( 428 + transport=ASGITransport(app=test_app), base_url="http://test" 429 + ) as client: 430 + response = await client.delete(f"/tracks/{track.id}") 431 + 432 + assert response.status_code == 200 433 + mock_redis.set.assert_called_once() 434 + call_args = mock_redis.set.call_args 435 + assert record_uri in call_args.args[0]
+117
backend/tests/test_jetstream.py
··· 12 12 from backend._internal.jetstream import JetstreamConsumer, consume_jetstream 13 13 from backend._internal.tasks.ingest import ( 14 14 SubjectNotFoundError, 15 + _write_tombstone, 15 16 ingest_comment_create, 16 17 ingest_comment_delete, 17 18 ingest_like_create, ··· 1356 1357 uri="at://did:plc:jetstream_test/fm.plyr.list/bad6", 1357 1358 ) 1358 1359 # nothing to assert on DB — just confirm no exception raised 1360 + 1361 + 1362 + # --- ghost track prevention tests --- 1363 + 1364 + 1365 + class TestGhostTrackPrevention: 1366 + """tombstone mechanism prevents ghost tracks from Jetstream cursor rewind.""" 1367 + 1368 + def _mock_redis(self) -> tuple[AsyncMock, dict[str, str]]: 1369 + """return a mock redis client backed by an in-memory dict.""" 1370 + store: dict[str, str] = {} 1371 + mock = AsyncMock() 1372 + 1373 + async def _set(key: str, value: str, ex: int | None = None) -> None: 1374 + store[key] = value 1375 + 1376 + async def _exists(key: str) -> int: 1377 + return 1 if key in store else 0 1378 + 1379 + mock.set = AsyncMock(side_effect=_set) 1380 + mock.exists = AsyncMock(side_effect=_exists) 1381 + return mock, store 1382 + 1383 + async def test_delete_then_create_skips_ghost( 1384 + self, db_session: AsyncSession, artist: Artist 1385 + ) -> None: 1386 + """replayed create after delete is skipped via tombstone.""" 1387 + uri = f"at://{artist.did}/fm.plyr.track/ghost1" 1388 + mock_redis, _store = self._mock_redis() 1389 + 1390 + with patch( 1391 + "backend._internal.tasks.ingest.get_async_redis_client", 1392 + return_value=mock_redis, 1393 + ): 1394 + # delete fires tombstone even when row doesn't exist (exact replay scenario) 1395 + await ingest_track_delete(did=artist.did, rkey="ghost1", uri=uri) 1396 + 1397 + # replayed create should be skipped 1398 + record = { 1399 + "title": "Ghost Track", 1400 + "artist": "Test", 1401 + "audioUrl": "https://r2.example.com/ghost.mp3", 1402 + "fileType": "mp3", 1403 + "createdAt": _recent_ts(), 1404 + } 1405 + 
await ingest_track_create( 1406 + did=artist.did, rkey="ghost1", record=record, uri=uri, cid="bafyghost" 1407 + ) 1408 + 1409 + result = await db_session.execute( 1410 + select(Track).where(Track.atproto_record_uri == uri) 1411 + ) 1412 + assert result.scalar_one_or_none() is None 1413 + 1414 + async def test_tombstone_does_not_block_different_uri( 1415 + self, db_session: AsyncSession, artist: Artist 1416 + ) -> None: 1417 + """tombstone for URI A does not block creation of URI B.""" 1418 + uri_a = f"at://{artist.did}/fm.plyr.track/deleted1" 1419 + uri_b = f"at://{artist.did}/fm.plyr.track/new1" 1420 + mock_redis, _store = self._mock_redis() 1421 + 1422 + with patch( 1423 + "backend._internal.tasks.ingest.get_async_redis_client", 1424 + return_value=mock_redis, 1425 + ): 1426 + await _write_tombstone(uri_a) 1427 + 1428 + record = { 1429 + "title": "New Track", 1430 + "artist": "Test", 1431 + "audioUrl": "https://r2.example.com/new.mp3", 1432 + "fileType": "mp3", 1433 + "createdAt": _recent_ts(), 1434 + } 1435 + await ingest_track_create( 1436 + did=artist.did, rkey="new1", record=record, uri=uri_b, cid="bafynew" 1437 + ) 1438 + 1439 + result = await db_session.execute( 1440 + select(Track).where(Track.atproto_record_uri == uri_b) 1441 + ) 1442 + assert result.scalar_one() is not None 1443 + 1444 + async def test_redis_down_allows_create( 1445 + self, db_session: AsyncSession, artist: Artist 1446 + ) -> None: 1447 + """when Redis is unavailable, create proceeds (fail-open).""" 1448 + uri = f"at://{artist.did}/fm.plyr.track/failopen1" 1449 + mock_redis = AsyncMock() 1450 + mock_redis.exists = AsyncMock(side_effect=ConnectionError("redis down")) 1451 + mock_redis.set = AsyncMock(side_effect=ConnectionError("redis down")) 1452 + 1453 + with patch( 1454 + "backend._internal.tasks.ingest.get_async_redis_client", 1455 + return_value=mock_redis, 1456 + ): 1457 + record = { 1458 + "title": "Fail Open Track", 1459 + "artist": "Test", 1460 + "audioUrl": 
"https://r2.example.com/failopen.mp3", 1461 + "fileType": "mp3", 1462 + "createdAt": _recent_ts(), 1463 + } 1464 + await ingest_track_create( 1465 + did=artist.did, 1466 + rkey="failopen1", 1467 + record=record, 1468 + uri=uri, 1469 + cid="bafyfailopen", 1470 + ) 1471 + 1472 + result = await db_session.execute( 1473 + select(Track).where(Track.atproto_record_uri == uri) 1474 + ) 1475 + assert result.scalar_one() is not None
+2 -2
loq.toml
··· 219 219 220 220 [[rules]] 221 221 path = "backend/src/backend/_internal/tasks/ingest.py" 222 - max_lines = 603 222 + max_lines = 648 223 223 224 224 [[rules]] 225 225 path = "backend/tests/test_jetstream.py" 226 - max_lines = 1358 226 + max_lines = 1475 227 227 228 228 [[rules]] 229 229 path = "frontend/src/lib/components/embed/CollectionEmbed.svelte"