audio streaming app plyr.fm

fix: deduplicate teal.fm scrobbles (#1024)

* fix: deduplicate teal.fm scrobbles during track transitions

backend: add redis SET NX dedup (60s TTL) in schedule_teal_scrobble so
concurrent requests from multiple fly machines only schedule one task.

frontend: lock play counting during track transitions via _playCountLocked
flag — set on resetPlayCount(), cleared on loadeddata — preventing spurious
fires from stale currentTime/duration before new audio loads.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: move redis import to top level in sync.py

deferred imports are only for circular import avoidance, not here.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

authored by zzstoatzz.io

Claude Opus 4.6 and committed by
GitHub
b21fb402 61189057

+128 -1
+12 -1
backend/src/backend/_internal/tasks/sync.py
··· 7 8 from backend._internal.background import get_docket 9 from backend._internal.follow_graph import warm_follows_cache 10 11 logger = logging.getLogger(__name__) 12 ··· 106 duration: int | None, 107 album_name: str | None, 108 ) -> None: 109 - """schedule a teal scrobble via docket.""" 110 docket = get_docket() 111 await docket.add(scrobble_to_teal)( 112 session_id, track_id, track_title, artist_name, duration, album_name
··· 7 8 from backend._internal.background import get_docket 9 from backend._internal.follow_graph import warm_follows_cache 10 + from backend.utilities.redis import get_async_redis_client 11 12 logger = logging.getLogger(__name__) 13 ··· 107 duration: int | None, 108 album_name: str | None, 109 ) -> None: 110 + """schedule a teal scrobble via docket. 111 + 112 + uses redis SET NX with a 60s TTL to deduplicate concurrent requests 113 + from multiple fly machines or rapid-fire frontend calls. 114 + """ 115 + redis = get_async_redis_client() 116 + dedup_key = f"teal-scrobble:{session_id}:{track_id}" 117 + if not await redis.set(dedup_key, "1", nx=True, ex=60): 118 + logfire.info("teal scrobble deduped", track_id=track_id) 119 + return 120 + 121 docket = get_docket() 122 await docket.add(scrobble_to_teal)( 123 session_id, track_id, track_title, artist_name, duration, album_name
+103
backend/tests/test_teal_scrobbling.py
··· 1 """tests for teal.fm scrobbling integration.""" 2 3 from datetime import UTC, datetime 4 5 from backend._internal.atproto.teal import ( 6 build_teal_play_record, 7 build_teal_status_record, 8 ) 9 from backend._internal.atproto.tid import datetime_to_tid 10 from backend.config import TealSettings, settings 11 12 ··· 174 # alpha namespaces should NOT be in scope when overridden 175 assert "fm.teal.alpha.feed.play" not in scope 176 assert "fm.teal.alpha.actor.status" not in scope
··· 1 """tests for teal.fm scrobbling integration.""" 2 3 from datetime import UTC, datetime 4 + from unittest.mock import AsyncMock, MagicMock, patch 5 + 6 + import pytest 7 8 from backend._internal.atproto.teal import ( 9 build_teal_play_record, 10 build_teal_status_record, 11 ) 12 from backend._internal.atproto.tid import datetime_to_tid 13 + from backend._internal.tasks.sync import schedule_teal_scrobble 14 from backend.config import TealSettings, settings 15 16 ··· 178 # alpha namespaces should NOT be in scope when overridden 179 assert "fm.teal.alpha.feed.play" not in scope 180 assert "fm.teal.alpha.actor.status" not in scope 181 + 182 + 183 + class TestScheduleTealScrobbleDedup: 184 + """tests for redis-based deduplication in schedule_teal_scrobble.""" 185 + 186 + @pytest.fixture 187 + def mock_docket(self) -> MagicMock: 188 + """mock docket that tracks add() calls.""" 189 + mock = MagicMock() 190 + mock.add.return_value = AsyncMock() 191 + return mock 192 + 193 + @pytest.fixture 194 + def mock_redis(self) -> AsyncMock: 195 + """mock redis client that simulates SET NX behavior.""" 196 + seen_keys: set[str] = set() 197 + 198 + async def fake_set( 199 + key: str, value: str, *, nx: bool = False, ex: int | None = None 200 + ) -> bool | None: 201 + if nx and key in seen_keys: 202 + return False 203 + seen_keys.add(key) 204 + return True 205 + 206 + redis = AsyncMock() 207 + redis.set = AsyncMock(side_effect=fake_set) 208 + return redis 209 + 210 + async def test_duplicate_scrobble_only_schedules_once( 211 + self, mock_docket: MagicMock, mock_redis: AsyncMock 212 + ) -> None: 213 + """calling schedule_teal_scrobble twice with same (session_id, track_id) 214 + within the TTL window should only schedule one docket task.""" 215 + with ( 216 + patch( 217 + "backend._internal.tasks.sync.get_docket", 218 + return_value=mock_docket, 219 + ), 220 + patch( 221 + "backend._internal.tasks.sync.get_async_redis_client", 222 + return_value=mock_redis, 223 + ), 224 + ): 225 + # first call should schedule 226 + await schedule_teal_scrobble( 227 + session_id="test-session", 228 + track_id=42, 229 + track_title="Test Track", 230 + artist_name="Test Artist", 231 + duration=180, 232 + album_name="Test Album", 233 + ) 234 + 235 + # second call with same session+track should be deduped 236 + await schedule_teal_scrobble( 237 + session_id="test-session", 238 + track_id=42, 239 + track_title="Test Track", 240 + artist_name="Test Artist", 241 + duration=180, 242 + album_name="Test Album", 243 + ) 244 + 245 + assert mock_docket.add.call_count == 1 246 + 247 + async def test_different_tracks_both_schedule( 248 + self, mock_docket: MagicMock, mock_redis: AsyncMock 249 + ) -> None: 250 + """different track_ids should each get their own scrobble.""" 251 + with ( 252 + patch( 253 + "backend._internal.tasks.sync.get_docket", 254 + return_value=mock_docket, 255 + ), 256 + patch( 257 + "backend._internal.tasks.sync.get_async_redis_client", 258 + return_value=mock_redis, 259 + ), 260 + ): 261 + await schedule_teal_scrobble( 262 + session_id="test-session", 263 + track_id=1, 264 + track_title="Track One", 265 + artist_name="Artist", 266 + duration=180, 267 + album_name=None, 268 + ) 269 + 270 + await schedule_teal_scrobble( 271 + session_id="test-session", 272 + track_id=2, 273 + track_title="Track Two", 274 + artist_name="Artist", 275 + duration=200, 276 + album_name=None, 277 + ) 278 + 279 + assert mock_docket.add.call_count == 2
+3
frontend/src/lib/components/player/Player.svelte
··· 391 positionRestored = true; 392 } 393 isLoadingTrack = false; 394 }, 395 { once: true } 396 );
··· 391 positionRestored = true; 392 } 393 isLoadingTrack = false; 394 + // unlock play counting now that new audio is ready 395 + // (prevents spurious fires from stale currentTime during transitions) 396 + player.unlockPlayCount(); 397 }, 398 { once: true } 399 );
+10
frontend/src/lib/player.svelte.ts
··· 20 // (reactive state updates are batched, so we need this to block rapid-fire calls) 21 private _playCountPending: number | null = null; 22 23 setRef(code: string | null, trackId: number | null = null) { 24 this.ref = code; 25 this._refTrackId = trackId; ··· 45 } 46 47 incrementPlayCount() { 48 if (!this.currentTrack || this.playCountedForTrack === this.currentTrack.id || !this.duration) { 49 return; 50 } ··· 79 resetPlayCount() { 80 this.playCountedForTrack = null; 81 this._playCountPending = null; 82 } 83 84 reset() {
··· 20 // (reactive state updates are batched, so we need this to block rapid-fire calls) 21 private _playCountPending: number | null = null; 22 23 + // lock play counting during track transitions to prevent spurious fires 24 + // from stale currentTime/duration values before new audio loads 25 + private _playCountLocked = false; 26 + 27 setRef(code: string | null, trackId: number | null = null) { 28 this.ref = code; 29 this._refTrackId = trackId; ··· 49 } 50 51 incrementPlayCount() { 52 + if (this._playCountLocked) return; 53 if (!this.currentTrack || this.playCountedForTrack === this.currentTrack.id || !this.duration) { 54 return; 55 } ··· 84 resetPlayCount() { 85 this.playCountedForTrack = null; 86 this._playCountPending = null; 87 + this._playCountLocked = true; 88 + } 89 + 90 + unlockPlayCount() { 91 + this._playCountLocked = false; 92 } 93 94 reset() {