audio streaming app
plyr.fm
1"""tests for jam api endpoints."""
2
3from collections.abc import AsyncGenerator
4from typing import Any
5from unittest.mock import AsyncMock
6
7import pytest
8from fastapi import FastAPI
9from httpx import ASGITransport, AsyncClient
10from sqlalchemy.ext.asyncio import AsyncSession
11
12from backend._internal import Session
13from backend._internal.jams import JamService, jam_service
14from backend.main import app
15from backend.models import Artist
16
17
18async def _update_queue(
19 client: AsyncClient, code: str, track_ids: list[str], current_index: int
20) -> dict[str, Any]:
21 """send an update_queue command and return the response json."""
22 r = await client.post(
23 f"/jams/{code}/command",
24 json={
25 "type": "update_queue",
26 "track_ids": track_ids,
27 "current_index": current_index,
28 },
29 )
30 assert r.status_code == 200
31 return r.json()
32
33
34class MockSession(Session):
35 """mock session for auth bypass in tests."""
36
37 def __init__(self, did: str = "did:test:host"):
38 self.did = did
39 self.access_token = "test_token"
40 self.refresh_token = "test_refresh"
41 self.session_id = "test_session"
42 self.handle = "test.host"
43 self.oauth_session = {}
44
45
46@pytest.fixture
47async def test_app(db_session: AsyncSession) -> AsyncGenerator[FastAPI, None]:
48 """create test app with mocked auth and jams flag."""
49 from backend._internal import require_auth
50
51 mock_session = MockSession()
52
53 async def mock_require_auth() -> Session:
54 return mock_session
55
56 app.dependency_overrides[require_auth] = mock_require_auth
57
58 # create the test artist
59 artist = Artist(
60 did="did:test:host",
61 handle="test.host",
62 display_name="Test Host",
63 )
64 db_session.add(artist)
65 await db_session.flush()
66
67 await db_session.commit()
68
69 yield app
70
71 app.dependency_overrides.clear()
72
73
74@pytest.fixture
75async def second_user(db_session: AsyncSession) -> str:
76 """create a second test artist with jams flag."""
77 artist = Artist(
78 did="did:test:joiner",
79 handle="test.joiner",
80 display_name="Test Joiner",
81 )
82 db_session.add(artist)
83 await db_session.commit()
84 return "did:test:joiner"
85
86
87async def test_create_jam(test_app: FastAPI, db_session: AsyncSession) -> None:
88 """test POST /jams/ creates a jam."""
89 async with AsyncClient(
90 transport=ASGITransport(app=test_app), base_url="http://test"
91 ) as client:
92 response = await client.post(
93 "/jams/",
94 json={"name": "test jam", "track_ids": ["track1", "track2"]},
95 )
96
97 assert response.status_code == 200
98 data = response.json()
99 assert data["name"] == "test jam"
100 assert data["host_did"] == "did:test:host"
101 assert data["is_active"] is True
102 assert len(data["code"]) == 8
103 assert data["state"]["track_ids"] == ["track1", "track2"]
104 assert data["state"]["current_index"] == 0
105 assert data["state"]["current_track_id"] == "track1"
106 assert data["revision"] == 1
107
108
109async def test_create_jam_empty(test_app: FastAPI, db_session: AsyncSession) -> None:
110 """test creating a jam with no tracks."""
111 async with AsyncClient(
112 transport=ASGITransport(app=test_app), base_url="http://test"
113 ) as client:
114 response = await client.post("/jams/", json={})
115
116 assert response.status_code == 200
117 data = response.json()
118 assert data["state"]["track_ids"] == []
119 assert data["state"]["is_playing"] is False
120
121
122async def test_get_jam_by_code(test_app: FastAPI, db_session: AsyncSession) -> None:
123 """test GET /jams/{code} returns jam details."""
124 async with AsyncClient(
125 transport=ASGITransport(app=test_app), base_url="http://test"
126 ) as client:
127 create_response = await client.post("/jams/", json={"name": "get test"})
128 code = create_response.json()["code"]
129
130 response = await client.get(f"/jams/{code}")
131
132 assert response.status_code == 200
133 data = response.json()
134 assert data["code"] == code
135 assert data["name"] == "get test"
136
137
138async def test_get_jam_not_found(test_app: FastAPI, db_session: AsyncSession) -> None:
139 """test GET /jams/{code} returns 404 for unknown code."""
140 async with AsyncClient(
141 transport=ASGITransport(app=test_app), base_url="http://test"
142 ) as client:
143 response = await client.get("/jams/nonexist")
144
145 assert response.status_code == 404
146
147
148async def test_join_jam(
149 test_app: FastAPI, db_session: AsyncSession, second_user: str
150) -> None:
151 """test POST /jams/{code}/join adds a participant."""
152 from backend._internal import require_auth
153
154 # create jam as host
155 async with AsyncClient(
156 transport=ASGITransport(app=test_app), base_url="http://test"
157 ) as client:
158 create_response = await client.post("/jams/", json={"name": "join test"})
159 code = create_response.json()["code"]
160
161 # switch to second user
162 async def mock_joiner_auth() -> Session:
163 return MockSession(did=second_user)
164
165 app.dependency_overrides[require_auth] = mock_joiner_auth
166
167 async with AsyncClient(
168 transport=ASGITransport(app=test_app), base_url="http://test"
169 ) as client:
170 response = await client.post(f"/jams/{code}/join")
171
172 assert response.status_code == 200
173 data = response.json()
174 assert len(data["participants"]) == 2
175 participant_dids = {p["did"] for p in data["participants"]}
176 assert "did:test:host" in participant_dids
177 assert second_user in participant_dids
178
179
180async def test_leave_jam(test_app: FastAPI, db_session: AsyncSession) -> None:
181 """test POST /jams/{code}/leave removes participant."""
182 async with AsyncClient(
183 transport=ASGITransport(app=test_app), base_url="http://test"
184 ) as client:
185 create_response = await client.post("/jams/", json={"name": "leave test"})
186 code = create_response.json()["code"]
187
188 response = await client.post(f"/jams/{code}/leave")
189
190 assert response.status_code == 200
191 assert response.json()["ok"] is True
192
193
194async def test_leave_ends_jam_when_last(
195 test_app: FastAPI, db_session: AsyncSession
196) -> None:
197 """test that leaving as last participant ends the jam."""
198 async with AsyncClient(
199 transport=ASGITransport(app=test_app), base_url="http://test"
200 ) as client:
201 create_response = await client.post("/jams/", json={"name": "last leave test"})
202 code = create_response.json()["code"]
203
204 # leave (only participant)
205 await client.post(f"/jams/{code}/leave")
206
207 # jam should no longer be active
208 get_response = await client.get(f"/jams/{code}")
209
210 assert get_response.status_code == 200
211 assert get_response.json()["is_active"] is False
212
213
214async def test_end_jam_host_only(
215 test_app: FastAPI, db_session: AsyncSession, second_user: str
216) -> None:
217 """test that only the host can end a jam."""
218 from backend._internal import require_auth
219
220 # create jam as host
221 async with AsyncClient(
222 transport=ASGITransport(app=test_app), base_url="http://test"
223 ) as client:
224 create_response = await client.post("/jams/", json={"name": "end test"})
225 code = create_response.json()["code"]
226
227 # switch to second user and try to end
228 async def mock_joiner_auth() -> Session:
229 return MockSession(did=second_user)
230
231 app.dependency_overrides[require_auth] = mock_joiner_auth
232
233 async with AsyncClient(
234 transport=ASGITransport(app=test_app), base_url="http://test"
235 ) as client:
236 # join first
237 await client.post(f"/jams/{code}/join")
238 # try to end
239 response = await client.post(f"/jams/{code}/end")
240
241 assert response.status_code == 403
242
243
244async def test_end_jam_by_host(test_app: FastAPI, db_session: AsyncSession) -> None:
245 """test host can end their jam."""
246 async with AsyncClient(
247 transport=ASGITransport(app=test_app), base_url="http://test"
248 ) as client:
249 create_response = await client.post("/jams/", json={"name": "host end test"})
250 code = create_response.json()["code"]
251
252 response = await client.post(f"/jams/{code}/end")
253
254 assert response.status_code == 200
255 assert response.json()["ok"] is True
256
257
258async def test_command_play_pause(test_app: FastAPI, db_session: AsyncSession) -> None:
259 """test play and pause commands."""
260 async with AsyncClient(
261 transport=ASGITransport(app=test_app), base_url="http://test"
262 ) as client:
263 create_response = await client.post(
264 "/jams/",
265 json={"track_ids": ["t1"]},
266 )
267 code = create_response.json()["code"]
268
269 # play
270 play_response = await client.post(
271 f"/jams/{code}/command", json={"type": "play"}
272 )
273 assert play_response.status_code == 200
274 assert play_response.json()["state"]["is_playing"] is True
275
276 # pause
277 pause_response = await client.post(
278 f"/jams/{code}/command", json={"type": "pause"}
279 )
280 assert pause_response.status_code == 200
281 assert pause_response.json()["state"]["is_playing"] is False
282
283
284async def test_command_seek(test_app: FastAPI, db_session: AsyncSession) -> None:
285 """test seek command."""
286 async with AsyncClient(
287 transport=ASGITransport(app=test_app), base_url="http://test"
288 ) as client:
289 create_response = await client.post(
290 "/jams/",
291 json={"track_ids": ["t1"]},
292 )
293 code = create_response.json()["code"]
294
295 response = await client.post(
296 f"/jams/{code}/command",
297 json={"type": "seek", "position_ms": 30000},
298 )
299
300 assert response.status_code == 200
301 assert response.json()["state"]["progress_ms"] == 30000
302
303
304async def test_update_queue_change_index(
305 test_app: FastAPI, db_session: AsyncSession
306) -> None:
307 """test update_queue to advance and go back (replaces next/previous)."""
308 tracks = ["t1", "t2", "t3"]
309 async with AsyncClient(
310 transport=ASGITransport(app=test_app), base_url="http://test"
311 ) as client:
312 create_response = await client.post("/jams/", json={"track_ids": tracks})
313 code = create_response.json()["code"]
314
315 r1 = await _update_queue(client, code, tracks, 1)
316 assert r1["state"]["current_index"] == 1
317 assert r1["state"]["current_track_id"] == "t2"
318
319 r2 = await _update_queue(client, code, tracks, 2)
320 assert r2["state"]["current_index"] == 2
321
322 r3 = await _update_queue(client, code, tracks, 1)
323 assert r3["state"]["current_index"] == 1
324 assert r3["state"]["current_track_id"] == "t2"
325
326
327async def test_update_queue_add_tracks(
328 test_app: FastAPI, db_session: AsyncSession
329) -> None:
330 """test update_queue with extended track list (replaces add_tracks)."""
331 async with AsyncClient(
332 transport=ASGITransport(app=test_app), base_url="http://test"
333 ) as client:
334 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
335 code = create_response.json()["code"]
336 r = await _update_queue(client, code, ["t1", "t2", "t3"], 0)
337
338 assert r["state"]["track_ids"] == ["t1", "t2", "t3"]
339 assert r["tracks_changed"] is True
340
341
342async def test_sequential_update_queue(
343 test_app: FastAPI, db_session: AsyncSession
344) -> None:
345 """two update_queue calls: extend tracks, then advance — regression for shallow-copy bug."""
346 tracks = ["t1", "t2", "t3"]
347 async with AsyncClient(
348 transport=ASGITransport(app=test_app), base_url="http://test"
349 ) as client:
350 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
351 code = create_response.json()["code"]
352 await _update_queue(client, code, tracks, 0)
353 r = await _update_queue(client, code, tracks, 1)
354
355 assert r["state"]["track_ids"] == tracks
356 assert r["state"]["current_index"] == 1
357 assert r["state"]["current_track_id"] == "t2"
358
359
360async def test_update_queue_remove_track(
361 test_app: FastAPI, db_session: AsyncSession
362) -> None:
363 """test update_queue with a track removed."""
364 async with AsyncClient(
365 transport=ASGITransport(app=test_app), base_url="http://test"
366 ) as client:
367 create_response = await client.post(
368 "/jams/", json={"track_ids": ["t1", "t2", "t3"]}
369 )
370 code = create_response.json()["code"]
371 r = await _update_queue(client, code, ["t1", "t3"], 0)
372
373 assert r["state"]["track_ids"] == ["t1", "t3"]
374 assert r["tracks_changed"] is True
375
376
377async def test_update_queue_reorder(
378 test_app: FastAPI, db_session: AsyncSession
379) -> None:
380 """test update_queue with reordered track list (replaces move_track)."""
381 async with AsyncClient(
382 transport=ASGITransport(app=test_app), base_url="http://test"
383 ) as client:
384 create_response = await client.post(
385 "/jams/", json={"track_ids": ["t1", "t2", "t3", "t4"]}
386 )
387 code = create_response.json()["code"]
388
389 r1 = await _update_queue(client, code, ["t3", "t1", "t2", "t4"], 1)
390 assert r1["state"]["track_ids"] == ["t3", "t1", "t2", "t4"]
391 assert r1["state"]["current_index"] == 1
392 assert r1["state"]["current_track_id"] == "t1"
393
394 r2 = await _update_queue(client, code, ["t3", "t2", "t4", "t1"], 3)
395 assert r2["state"]["track_ids"] == ["t3", "t2", "t4", "t1"]
396 assert r2["state"]["current_index"] == 3
397 assert r2["state"]["current_track_id"] == "t1"
398 assert r2["tracks_changed"] is True
399
400
401async def test_update_queue_same_tracks_no_change(
402 test_app: FastAPI, db_session: AsyncSession
403) -> None:
404 """test update_queue with same tracks reports tracks_changed=False."""
405 tracks = ["t1", "t2"]
406 async with AsyncClient(
407 transport=ASGITransport(app=test_app), base_url="http://test"
408 ) as client:
409 create_response = await client.post("/jams/", json={"track_ids": tracks})
410 code = create_response.json()["code"]
411 r = await _update_queue(client, code, tracks, 0)
412
413 assert r["state"]["track_ids"] == tracks
414 assert r["tracks_changed"] is False
415
416
417async def test_update_queue_clear_upcoming(
418 test_app: FastAPI, db_session: AsyncSession
419) -> None:
420 """test update_queue with truncated list (replaces clear_upcoming)."""
421 full = ["t1", "t2", "t3", "t4"]
422 async with AsyncClient(
423 transport=ASGITransport(app=test_app), base_url="http://test"
424 ) as client:
425 create_response = await client.post("/jams/", json={"track_ids": full})
426 code = create_response.json()["code"]
427 await _update_queue(client, code, full, 1)
428 r = await _update_queue(client, code, ["t1", "t2"], 1)
429
430 assert r["state"]["track_ids"] == ["t1", "t2"]
431 assert r["state"]["current_index"] == 1
432 assert r["state"]["current_track_id"] == "t2"
433 assert r["tracks_changed"] is True
434
435
436async def test_revision_monotonicity(
437 test_app: FastAPI, db_session: AsyncSession
438) -> None:
439 """test that revision increases monotonically."""
440 async with AsyncClient(
441 transport=ASGITransport(app=test_app), base_url="http://test"
442 ) as client:
443 create_response = await client.post(
444 "/jams/",
445 json={"track_ids": ["t1"]},
446 )
447 code = create_response.json()["code"]
448 rev = create_response.json()["revision"]
449 assert rev == 1
450
451 for _ in range(5):
452 cmd_response = await client.post(
453 f"/jams/{code}/command", json={"type": "play"}
454 )
455 new_rev = cmd_response.json()["revision"]
456 assert new_rev > rev
457 rev = new_rev
458
459
460async def test_auto_leave_previous_jam(
461 test_app: FastAPI, db_session: AsyncSession
462) -> None:
463 """test that creating a new jam auto-leaves the previous one."""
464 async with AsyncClient(
465 transport=ASGITransport(app=test_app), base_url="http://test"
466 ) as client:
467 # create first jam
468 first_response = await client.post("/jams/", json={"name": "first jam"})
469 first_code = first_response.json()["code"]
470
471 # create second jam (should auto-leave first)
472 second_response = await client.post("/jams/", json={"name": "second jam"})
473 second_code = second_response.json()["code"]
474
475 assert first_code != second_code
476
477 # check active jam is the second one
478 active_response = await client.get("/jams/active")
479
480 assert active_response.status_code == 200
481 assert active_response.json()["code"] == second_code
482
483
484async def test_get_active_jam_none(test_app: FastAPI, db_session: AsyncSession) -> None:
485 """test GET /jams/active returns null when not in a jam."""
486 async with AsyncClient(
487 transport=ASGITransport(app=test_app), base_url="http://test"
488 ) as client:
489 response = await client.get("/jams/active")
490
491 assert response.status_code == 200
492 assert response.json() is None
493
494
495async def test_code_uniqueness(test_app: FastAPI, db_session: AsyncSession) -> None:
496 """test that each jam gets a unique code."""
497 codes = set()
498 async with AsyncClient(
499 transport=ASGITransport(app=test_app), base_url="http://test"
500 ) as client:
501 for i in range(5):
502 response = await client.post("/jams/", json={"name": f"jam {i}"})
503 assert response.status_code == 200
504 codes.add(response.json()["code"])
505
506 assert len(codes) == 5
507
508
509async def test_update_queue_set_index(
510 test_app: FastAPI, db_session: AsyncSession
511) -> None:
512 """test update_queue to jump to a specific track (replaces set_index)."""
513 tracks = ["t1", "t2", "t3", "t4"]
514 async with AsyncClient(
515 transport=ASGITransport(app=test_app), base_url="http://test"
516 ) as client:
517 create_response = await client.post("/jams/", json={"track_ids": tracks})
518 code = create_response.json()["code"]
519
520 r1 = await _update_queue(client, code, tracks, 2)
521 assert r1["state"]["current_index"] == 2
522 assert r1["state"]["current_track_id"] == "t3"
523 assert r1["state"]["progress_ms"] == 0
524
525 r2 = await _update_queue(client, code, tracks, 0)
526 assert r2["state"]["current_index"] == 0
527 assert r2["state"]["current_track_id"] == "t1"
528
529
530async def test_command_non_participant_rejected(
531 test_app: FastAPI, db_session: AsyncSession, second_user: str
532) -> None:
533 """test that non-participants cannot send commands."""
534 from backend._internal import require_auth
535
536 # create jam as host
537 async with AsyncClient(
538 transport=ASGITransport(app=test_app), base_url="http://test"
539 ) as client:
540 create_response = await client.post("/jams/", json={"track_ids": ["t1", "t2"]})
541 code = create_response.json()["code"]
542
543 # switch to second user (NOT joined)
544 async def mock_joiner_auth() -> Session:
545 return MockSession(did=second_user)
546
547 app.dependency_overrides[require_auth] = mock_joiner_auth
548
549 async with AsyncClient(
550 transport=ASGITransport(app=test_app), base_url="http://test"
551 ) as client:
552 response = await client.post(
553 f"/jams/{code}/command",
554 json={
555 "type": "update_queue",
556 "track_ids": ["t1", "t2"],
557 "current_index": 1,
558 },
559 )
560
561 assert response.status_code == 400
562
563
564async def test_sequential_commands_get_distinct_revisions(
565 test_app: FastAPI, db_session: AsyncSession
566) -> None:
567 """test that each command gets a distinct revision (verifies no clobbering)."""
568 async with AsyncClient(
569 transport=ASGITransport(app=test_app), base_url="http://test"
570 ) as client:
571 create_response = await client.post(
572 "/jams/", json={"track_ids": ["t1", "t2", "t3"]}
573 )
574 code = create_response.json()["code"]
575 assert create_response.json()["revision"] == 1
576
577 tracks = ["t1", "t2", "t3"]
578 r1 = await _update_queue(client, code, tracks, 1)
579 r2 = await _update_queue(client, code, tracks, 2)
580
581 assert r1["revision"] == 2
582 assert r2["revision"] == 3
583 assert r1["state"]["current_index"] == 1
584 assert r2["state"]["current_index"] == 2
585
586
587async def test_did_socket_replacement() -> None:
588 """test that connecting a second WS for the same DID closes the first."""
589 from starlette.websockets import WebSocket
590
591 service = JamService()
592 jam_id = "test-jam-123"
593
594 ws1 = AsyncMock(spec=WebSocket)
595 ws2 = AsyncMock(spec=WebSocket)
596
597 # connect first socket
598 await service.connect_ws(jam_id, ws1, "did:test:user")
599 assert jam_id in service._connections
600 assert ws1 in service._connections[jam_id]
601
602 # connect second socket for same DID — should close first
603 await service.connect_ws(jam_id, ws2, "did:test:user")
604
605 # first socket should have been closed with code 4010
606 ws1.close.assert_awaited_once_with(code=4010, reason="replaced by new connection")
607
608 # only second socket should be in connections
609 assert ws2 in service._connections[jam_id]
610 assert ws1 not in service._connections[jam_id]
611
612 # DID mapping should point to second socket
613 assert service._ws_by_did["did:test:user"] == (jam_id, ws2)
614
615
616# ── output device tests ────────────────────────────────────────────
617
618
619async def test_create_jam_has_null_output(
620 test_app: FastAPI, db_session: AsyncSession
621) -> None:
622 """test that newly created jams have output_client_id = null."""
623 async with AsyncClient(
624 transport=ASGITransport(app=test_app), base_url="http://test"
625 ) as client:
626 response = await client.post("/jams/", json={"track_ids": ["t1"]})
627
628 assert response.status_code == 200
629 assert response.json()["state"]["output_client_id"] is None
630
631
632async def test_auto_set_output_on_host_sync(
633 test_app: FastAPI, db_session: AsyncSession
634) -> None:
635 """test that output_client_id is auto-set to host on first sync with real DB state."""
636 from starlette.websockets import WebSocket
637
638 # create jam via API (writes to DB)
639 async with AsyncClient(
640 transport=ASGITransport(app=test_app), base_url="http://test"
641 ) as client:
642 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
643 code = create_response.json()["code"]
644 jam_id = create_response.json()["id"]
645 assert create_response.json()["state"]["output_client_id"] is None
646
647 # connect host WS and send sync with client_id
648 ws = AsyncMock(spec=WebSocket)
649 await jam_service.connect_ws(jam_id, ws, "did:test:host")
650 await jam_service._handle_sync(
651 jam_id, "did:test:host", {"client_id": "host-abc-123", "last_id": None}, ws
652 )
653
654 # verify DB state was updated
655 async with AsyncClient(
656 transport=ASGITransport(app=test_app), base_url="http://test"
657 ) as client:
658 get_response = await client.get(f"/jams/{code}")
659 state = get_response.json()["state"]
660 assert state["output_client_id"] == "host-abc-123"
661 assert state["output_did"] == "did:test:host"
662
663 # cleanup
664 await jam_service.disconnect_ws(jam_id, ws)
665
666
667async def test_set_output_command(test_app: FastAPI, db_session: AsyncSession) -> None:
668 """test set_output command changes output_client_id in state."""
669 from starlette.websockets import WebSocket
670
671 async with AsyncClient(
672 transport=ASGITransport(app=test_app), base_url="http://test"
673 ) as client:
674 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
675 code = create_response.json()["code"]
676 jam_id = create_response.json()["id"]
677
678 # register a WS with a client_id so set_output can validate
679 ws = AsyncMock(spec=WebSocket)
680 await jam_service.connect_ws(jam_id, ws, "did:test:host")
681 jam_service._ws_client_ids[ws] = "my-client-id"
682
683 async with AsyncClient(
684 transport=ASGITransport(app=test_app), base_url="http://test"
685 ) as client:
686 response = await client.post(
687 f"/jams/{code}/command",
688 json={"type": "set_output", "client_id": "my-client-id"},
689 )
690
691 assert response.status_code == 200
692 assert response.json()["state"]["output_client_id"] == "my-client-id"
693
694 # cleanup
695 await jam_service.disconnect_ws(jam_id, ws)
696
697
698async def test_set_output_validates_client_id(
699 test_app: FastAPI, db_session: AsyncSession
700) -> None:
701 """test that set_output rejects client_id that doesn't match sender's WS."""
702 from starlette.websockets import WebSocket
703
704 async with AsyncClient(
705 transport=ASGITransport(app=test_app), base_url="http://test"
706 ) as client:
707 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
708 code = create_response.json()["code"]
709 jam_id = create_response.json()["id"]
710
711 # register WS with one client_id
712 ws = AsyncMock(spec=WebSocket)
713 await jam_service.connect_ws(jam_id, ws, "did:test:host")
714 jam_service._ws_client_ids[ws] = "real-client-id"
715
716 # try to set output with a different client_id
717 async with AsyncClient(
718 transport=ASGITransport(app=test_app), base_url="http://test"
719 ) as client:
720 response = await client.post(
721 f"/jams/{code}/command",
722 json={"type": "set_output", "client_id": "spoofed-client-id"},
723 )
724
725 # should fail — client_id mismatch
726 assert response.status_code == 400
727
728 # cleanup
729 await jam_service.disconnect_ws(jam_id, ws)
730
731
732async def test_output_disconnect_no_remaining_pauses(
733 test_app: FastAPI, db_session: AsyncSession
734) -> None:
735 """test that output clears and playback pauses when the only client disconnects (no fallback)."""
736 from starlette.websockets import WebSocket
737
738 # create jam and set output via API
739 async with AsyncClient(
740 transport=ASGITransport(app=test_app), base_url="http://test"
741 ) as client:
742 create_response = await client.post(
743 "/jams/", json={"track_ids": ["t1"], "is_playing": True}
744 )
745 code = create_response.json()["code"]
746 jam_id = create_response.json()["id"]
747
748 # register host WS as output device
749 ws = AsyncMock(spec=WebSocket)
750 await jam_service.connect_ws(jam_id, ws, "did:test:host")
751 jam_service._ws_client_ids[ws] = "host-output-client"
752
753 async with AsyncClient(
754 transport=ASGITransport(app=test_app), base_url="http://test"
755 ) as client:
756 await client.post(
757 f"/jams/{code}/command",
758 json={"type": "set_output", "client_id": "host-output-client"},
759 )
760
761 # disconnect output device (no other clients connected)
762 await jam_service.disconnect_ws(jam_id, ws)
763
764 # verify DB state: output cleared, playback paused
765 async with AsyncClient(
766 transport=ASGITransport(app=test_app), base_url="http://test"
767 ) as client:
768 get_response = await client.get(f"/jams/{code}")
769 state = get_response.json()["state"]
770 assert state["output_client_id"] is None
771 assert state["output_did"] is None
772 assert state["is_playing"] is False
773
774
775async def test_output_clears_on_ws_replacement(
776 test_app: FastAPI, db_session: AsyncSession
777) -> None:
778 """regression: _close_ws_for_did must clear output before removing client_id mapping.
779
780 when a user reconnects (new WS replaces old), _close_ws_for_did fires instead of
781 disconnect_ws. if it pops the client_id first, the output stays pinned to a dead device.
782 """
783 from starlette.websockets import WebSocket
784
785 # create jam and set host as output
786 async with AsyncClient(
787 transport=ASGITransport(app=test_app), base_url="http://test"
788 ) as client:
789 create_response = await client.post(
790 "/jams/", json={"track_ids": ["t1"], "is_playing": True}
791 )
792 code = create_response.json()["code"]
793 jam_id = create_response.json()["id"]
794
795 ws1 = AsyncMock(spec=WebSocket)
796 await jam_service.connect_ws(jam_id, ws1, "did:test:host")
797 jam_service._ws_client_ids[ws1] = "old-client-id"
798
799 async with AsyncClient(
800 transport=ASGITransport(app=test_app), base_url="http://test"
801 ) as client:
802 await client.post(
803 f"/jams/{code}/command",
804 json={"type": "set_output", "client_id": "old-client-id"},
805 )
806
807 # reconnect — new WS replaces old (this calls _close_ws_for_did internally)
808 ws2 = AsyncMock(spec=WebSocket)
809 await jam_service.connect_ws(jam_id, ws2, "did:test:host")
810
811 # output should have been cleared by _close_ws_for_did
812 async with AsyncClient(
813 transport=ASGITransport(app=test_app), base_url="http://test"
814 ) as client:
815 get_response = await client.get(f"/jams/{code}")
816 state = get_response.json()["state"]
817 assert state["output_client_id"] is None, (
818 "output_client_id should be cleared when output device's WS is replaced"
819 )
820 assert state["is_playing"] is False
821
822 # cleanup
823 await jam_service.disconnect_ws(jam_id, ws2)
824
825
826async def test_non_output_disconnect_no_effect() -> None:
827 """test that a non-output participant disconnecting doesn't affect output."""
828 from starlette.websockets import WebSocket
829
830 service = JamService()
831 jam_id = "test-noeffect"
832
833 ws_host = AsyncMock(spec=WebSocket)
834 ws_joiner = AsyncMock(spec=WebSocket)
835
836 await service.connect_ws(jam_id, ws_host, "did:test:host")
837 service._ws_client_ids[ws_host] = "host-client"
838
839 await service.connect_ws(jam_id, ws_joiner, "did:test:joiner")
840 service._ws_client_ids[ws_joiner] = "joiner-client"
841
842 # disconnect the non-output joiner
843 await service.disconnect_ws(jam_id, ws_joiner)
844
845 # host's client_id should still be tracked
846 assert service._ws_client_ids[ws_host] == "host-client"
847 assert ws_host in service._connections[jam_id]
848
849 # cleanup
850 await service.disconnect_ws(jam_id, ws_host)
851
852
853# ── cross-client command tests ────────────────────────────────────
854
855
856async def test_cross_client_update_queue(
857 test_app: FastAPI, db_session: AsyncSession, second_user: str
858) -> None:
859 """test that a non-host participant can send update_queue and it updates state for all."""
860 from backend._internal import require_auth
861
862 tracks = ["t1", "t2", "t3", "t4"]
863
864 # create jam as host with 4 tracks
865 async with AsyncClient(
866 transport=ASGITransport(app=test_app), base_url="http://test"
867 ) as client:
868 create_response = await client.post(
869 "/jams/",
870 json={
871 "name": "cross client test",
872 "track_ids": tracks,
873 "is_playing": True,
874 },
875 )
876 code = create_response.json()["code"]
877 assert create_response.json()["state"]["current_index"] == 0
878 assert create_response.json()["state"]["current_track_id"] == "t1"
879
880 # second user joins
881 async def mock_joiner_auth() -> Session:
882 return MockSession(did=second_user)
883
884 app.dependency_overrides[require_auth] = mock_joiner_auth
885
886 async with AsyncClient(
887 transport=ASGITransport(app=test_app), base_url="http://test"
888 ) as client:
889 join_response = await client.post(f"/jams/{code}/join")
890 assert join_response.status_code == 200
891
892 r1 = await _update_queue(client, code, tracks, 1)
893 assert r1["state"]["current_index"] == 1
894 assert r1["state"]["current_track_id"] == "t2"
895 assert r1["state"]["progress_ms"] == 0
896 assert r1["state"]["is_playing"] is True
897
898 r2 = await _update_queue(client, code, tracks, 2)
899 assert r2["state"]["current_index"] == 2
900 assert r2["state"]["current_track_id"] == "t3"
901
902 # verify state from host's perspective
903 async def mock_host_auth() -> Session:
904 return MockSession(did="did:test:host")
905
906 app.dependency_overrides[require_auth] = mock_host_auth
907
908 async with AsyncClient(
909 transport=ASGITransport(app=test_app), base_url="http://test"
910 ) as client:
911 get_response = await client.get(f"/jams/{code}")
912 assert get_response.status_code == 200
913 host_view = get_response.json()
914 assert host_view["state"]["current_index"] == 2
915 assert host_view["state"]["current_track_id"] == "t3"
916
917
918async def test_cross_client_play_pause(
919 test_app: FastAPI, db_session: AsyncSession, second_user: str
920) -> None:
921 """test that a non-host participant can play/pause."""
922 from backend._internal import require_auth
923
924 # create jam as host (starts paused)
925 async with AsyncClient(
926 transport=ASGITransport(app=test_app), base_url="http://test"
927 ) as client:
928 create_response = await client.post(
929 "/jams/",
930 json={"track_ids": ["t1", "t2"]},
931 )
932 code = create_response.json()["code"]
933 assert create_response.json()["state"]["is_playing"] is False
934
935 # joiner joins and sends play
936 async def mock_joiner_auth() -> Session:
937 return MockSession(did=second_user)
938
939 app.dependency_overrides[require_auth] = mock_joiner_auth
940
941 async with AsyncClient(
942 transport=ASGITransport(app=test_app), base_url="http://test"
943 ) as client:
944 await client.post(f"/jams/{code}/join")
945
946 play_response = await client.post(
947 f"/jams/{code}/command", json={"type": "play"}
948 )
949 assert play_response.status_code == 200
950 assert play_response.json()["state"]["is_playing"] is True
951
952 pause_response = await client.post(
953 f"/jams/{code}/command", json={"type": "pause"}
954 )
955 assert pause_response.status_code == 200
956 assert pause_response.json()["state"]["is_playing"] is False
957
958
959async def test_cross_client_seek(
960 test_app: FastAPI, db_session: AsyncSession, second_user: str
961) -> None:
962 """test that a non-host participant can seek."""
963 from backend._internal import require_auth
964
965 async with AsyncClient(
966 transport=ASGITransport(app=test_app), base_url="http://test"
967 ) as client:
968 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
969 code = create_response.json()["code"]
970
971 async def mock_joiner_auth() -> Session:
972 return MockSession(did=second_user)
973
974 app.dependency_overrides[require_auth] = mock_joiner_auth
975
976 async with AsyncClient(
977 transport=ASGITransport(app=test_app), base_url="http://test"
978 ) as client:
979 await client.post(f"/jams/{code}/join")
980
981 seek_response = await client.post(
982 f"/jams/{code}/command",
983 json={"type": "seek", "position_ms": 45000},
984 )
985 assert seek_response.status_code == 200
986 assert seek_response.json()["state"]["progress_ms"] == 45000
987
988
989async def test_cross_client_update_queue_add_tracks(
990 test_app: FastAPI, db_session: AsyncSession, second_user: str
991) -> None:
992 """test that a non-host participant can extend the queue via update_queue."""
993 from backend._internal import require_auth
994
995 async with AsyncClient(
996 transport=ASGITransport(app=test_app), base_url="http://test"
997 ) as client:
998 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
999 code = create_response.json()["code"]
1000
1001 async def mock_joiner_auth() -> Session:
1002 return MockSession(did=second_user)
1003
1004 app.dependency_overrides[require_auth] = mock_joiner_auth
1005
1006 async with AsyncClient(
1007 transport=ASGITransport(app=test_app), base_url="http://test"
1008 ) as client:
1009 await client.post(f"/jams/{code}/join")
1010
1011 r = await _update_queue(client, code, ["t1", "t2", "t3"], 0)
1012 assert r["state"]["track_ids"] == ["t1", "t2", "t3"]
1013 assert r["tracks_changed"] is True
1014
1015
1016async def test_output_preserved_across_cross_client_commands(
1017 test_app: FastAPI, db_session: AsyncSession, second_user: str
1018) -> None:
1019 """test that output_client_id is preserved when non-output sends commands."""
1020 from starlette.websockets import WebSocket
1021
1022 from backend._internal import require_auth
1023
1024 # create jam as host
1025 async with AsyncClient(
1026 transport=ASGITransport(app=test_app), base_url="http://test"
1027 ) as client:
1028 create_response = await client.post(
1029 "/jams/",
1030 json={"track_ids": ["t1", "t2", "t3"], "is_playing": True},
1031 )
1032 code = create_response.json()["code"]
1033 jam_id = create_response.json()["id"]
1034
1035 # set up host as output device
1036 ws_host = AsyncMock(spec=WebSocket)
1037 await jam_service.connect_ws(jam_id, ws_host, "did:test:host")
1038 jam_service._ws_client_ids[ws_host] = "host-client-id"
1039
1040 async with AsyncClient(
1041 transport=ASGITransport(app=test_app), base_url="http://test"
1042 ) as client:
1043 set_output_response = await client.post(
1044 f"/jams/{code}/command",
1045 json={"type": "set_output", "client_id": "host-client-id"},
1046 )
1047 assert set_output_response.status_code == 200
1048 assert (
1049 set_output_response.json()["state"]["output_client_id"] == "host-client-id"
1050 )
1051
1052 # joiner joins and sends update_queue — output_client_id should be preserved
1053 async def mock_joiner_auth() -> Session:
1054 return MockSession(did=second_user)
1055
1056 app.dependency_overrides[require_auth] = mock_joiner_auth
1057
1058 async with AsyncClient(
1059 transport=ASGITransport(app=test_app), base_url="http://test"
1060 ) as client:
1061 await client.post(f"/jams/{code}/join")
1062
1063 r = await _update_queue(client, code, ["t1", "t2", "t3"], 1)
1064 state = r["state"]
1065 assert state["current_index"] == 1
1066 assert state["output_client_id"] == "host-client-id"
1067 assert state["output_did"] == "did:test:host"
1068 assert state["is_playing"] is True
1069
1070 # cleanup
1071 await jam_service.disconnect_ws(jam_id, ws_host)
1072
1073
1074# ── update_queue edge case tests ──────────────────────────────────
1075
1076
1077async def test_update_queue_resets_progress_on_track_change(
1078 test_app: FastAPI, db_session: AsyncSession
1079) -> None:
1080 """test that progress resets to 0 when current track changes via update_queue."""
1081 async with AsyncClient(
1082 transport=ASGITransport(app=test_app), base_url="http://test"
1083 ) as client:
1084 create_response = await client.post("/jams/", json={"track_ids": ["t1", "t2"]})
1085 code = create_response.json()["code"]
1086 await client.post(
1087 f"/jams/{code}/command", json={"type": "seek", "position_ms": 30000}
1088 )
1089 r = await _update_queue(client, code, ["t1", "t2"], 1)
1090
1091 assert r["state"]["current_track_id"] == "t2"
1092 assert r["state"]["progress_ms"] == 0
1093
1094
1095async def test_update_queue_preserves_progress_on_same_track(
1096 test_app: FastAPI, db_session: AsyncSession
1097) -> None:
1098 """test that progress is preserved when current track stays the same."""
1099 async with AsyncClient(
1100 transport=ASGITransport(app=test_app), base_url="http://test"
1101 ) as client:
1102 create_response = await client.post("/jams/", json={"track_ids": ["t1", "t2"]})
1103 code = create_response.json()["code"]
1104 await client.post(
1105 f"/jams/{code}/command", json={"type": "seek", "position_ms": 30000}
1106 )
1107 r = await _update_queue(client, code, ["t1", "t2", "t3"], 0)
1108
1109 assert r["state"]["current_track_id"] == "t1"
1110 assert r["state"]["progress_ms"] == 30000
1111
1112
1113async def test_update_queue_clamps_index(
1114 test_app: FastAPI, db_session: AsyncSession
1115) -> None:
1116 """test that out-of-bounds current_index is clamped."""
1117 tracks = ["t1", "t2", "t3"]
1118 async with AsyncClient(
1119 transport=ASGITransport(app=test_app), base_url="http://test"
1120 ) as client:
1121 create_response = await client.post("/jams/", json={"track_ids": tracks})
1122 code = create_response.json()["code"]
1123
1124 r = await _update_queue(client, code, tracks, 99)
1125 assert r["state"]["current_index"] == 2
1126 assert r["state"]["current_track_id"] == "t3"
1127
1128 r = await _update_queue(client, code, tracks, -5)
1129 assert r["state"]["current_index"] == 0
1130 assert r["state"]["current_track_id"] == "t1"
1131
1132 r = await _update_queue(client, code, [], 0)
1133 assert r["state"]["current_index"] == 0
1134 assert r["state"]["current_track_id"] is None
1135
1136
1137# ── output mode tests ──────────────────────────────────────────────
1138
1139
1140async def test_set_mode_host_only(
1141 test_app: FastAPI, db_session: AsyncSession, second_user: str
1142) -> None:
1143 """test that only the host can set output mode."""
1144 from backend._internal import require_auth
1145
1146 # create jam as host
1147 async with AsyncClient(
1148 transport=ASGITransport(app=test_app), base_url="http://test"
1149 ) as client:
1150 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
1151 code = create_response.json()["code"]
1152
1153 # switch to second user
1154 async def mock_joiner_auth() -> Session:
1155 return MockSession(did=second_user)
1156
1157 app.dependency_overrides[require_auth] = mock_joiner_auth
1158
1159 async with AsyncClient(
1160 transport=ASGITransport(app=test_app), base_url="http://test"
1161 ) as client:
1162 await client.post(f"/jams/{code}/join")
1163
1164 # non-host tries to set mode — should fail
1165 response = await client.post(
1166 f"/jams/{code}/command",
1167 json={"type": "set_mode", "mode": "everyone"},
1168 )
1169
1170 assert response.status_code == 400
1171
1172
1173async def test_set_mode_everyone(test_app: FastAPI, db_session: AsyncSession) -> None:
1174 """test that host can set mode to everyone and output fields are cleared."""
1175 from starlette.websockets import WebSocket
1176
1177 async with AsyncClient(
1178 transport=ASGITransport(app=test_app), base_url="http://test"
1179 ) as client:
1180 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
1181 code = create_response.json()["code"]
1182 jam_id = create_response.json()["id"]
1183
1184 # set up host as output device first
1185 ws = AsyncMock(spec=WebSocket)
1186 await jam_service.connect_ws(jam_id, ws, "did:test:host")
1187 jam_service._ws_client_ids[ws] = "host-client"
1188
1189 async with AsyncClient(
1190 transport=ASGITransport(app=test_app), base_url="http://test"
1191 ) as client:
1192 await client.post(
1193 f"/jams/{code}/command",
1194 json={"type": "set_output", "client_id": "host-client"},
1195 )
1196
1197 # switch to everyone mode
1198 response = await client.post(
1199 f"/jams/{code}/command",
1200 json={"type": "set_mode", "mode": "everyone"},
1201 )
1202
1203 assert response.status_code == 200
1204 state = response.json()["state"]
1205 assert state["output_mode"] == "everyone"
1206 assert state["output_client_id"] is None
1207 assert state["output_did"] is None
1208
1209 await jam_service.disconnect_ws(jam_id, ws)
1210
1211
1212async def test_set_mode_back_to_one_speaker(
1213 test_app: FastAPI, db_session: AsyncSession
1214) -> None:
1215 """test round-trip: one_speaker → everyone → one_speaker."""
1216 async with AsyncClient(
1217 transport=ASGITransport(app=test_app), base_url="http://test"
1218 ) as client:
1219 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
1220 code = create_response.json()["code"]
1221 assert create_response.json()["state"]["output_mode"] == "one_speaker"
1222
1223 # switch to everyone
1224 r1 = await client.post(
1225 f"/jams/{code}/command",
1226 json={"type": "set_mode", "mode": "everyone"},
1227 )
1228 assert r1.json()["state"]["output_mode"] == "everyone"
1229
1230 # switch back to one_speaker
1231 r2 = await client.post(
1232 f"/jams/{code}/command",
1233 json={"type": "set_mode", "mode": "one_speaker"},
1234 )
1235 assert r2.json()["state"]["output_mode"] == "one_speaker"
1236
1237
1238async def test_everyone_mode_skips_output_disconnect_pause(
1239 test_app: FastAPI, db_session: AsyncSession
1240) -> None:
1241 """test that in everyone mode, disconnecting a WS does NOT pause playback."""
1242 from starlette.websockets import WebSocket
1243
1244 async with AsyncClient(
1245 transport=ASGITransport(app=test_app), base_url="http://test"
1246 ) as client:
1247 create_response = await client.post(
1248 "/jams/", json={"track_ids": ["t1"], "is_playing": True}
1249 )
1250 code = create_response.json()["code"]
1251 jam_id = create_response.json()["id"]
1252
1253 # switch to everyone mode
1254 await client.post(
1255 f"/jams/{code}/command",
1256 json={"type": "set_mode", "mode": "everyone"},
1257 )
1258
1259 # connect a WS and then disconnect it
1260 ws = AsyncMock(spec=WebSocket)
1261 await jam_service.connect_ws(jam_id, ws, "did:test:host")
1262 jam_service._ws_client_ids[ws] = "host-client"
1263 await jam_service.disconnect_ws(jam_id, ws)
1264
1265 # jam should still be playing
1266 async with AsyncClient(
1267 transport=ASGITransport(app=test_app), base_url="http://test"
1268 ) as client:
1269 get_response = await client.get(f"/jams/{code}")
1270 state = get_response.json()["state"]
1271 assert state["is_playing"] is True
1272 assert state["output_mode"] == "everyone"
1273
1274
1275async def test_everyone_mode_skips_auto_output(
1276 test_app: FastAPI, db_session: AsyncSession
1277) -> None:
1278 """test that in everyone mode, host sync does NOT auto-assign output."""
1279 from starlette.websockets import WebSocket
1280
1281 async with AsyncClient(
1282 transport=ASGITransport(app=test_app), base_url="http://test"
1283 ) as client:
1284 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
1285 code = create_response.json()["code"]
1286 jam_id = create_response.json()["id"]
1287
1288 # switch to everyone mode
1289 await client.post(
1290 f"/jams/{code}/command",
1291 json={"type": "set_mode", "mode": "everyone"},
1292 )
1293
1294 # host connects and sends sync
1295 ws = AsyncMock(spec=WebSocket)
1296 await jam_service.connect_ws(jam_id, ws, "did:test:host")
1297 await jam_service._handle_sync(
1298 jam_id, "did:test:host", {"client_id": "host-abc", "last_id": None}, ws
1299 )
1300
1301 # output_client_id should remain None (not auto-set)
1302 async with AsyncClient(
1303 transport=ASGITransport(app=test_app), base_url="http://test"
1304 ) as client:
1305 get_response = await client.get(f"/jams/{code}")
1306 state = get_response.json()["state"]
1307 assert state["output_client_id"] is None
1308 assert state["output_mode"] == "everyone"
1309
1310 await jam_service.disconnect_ws(jam_id, ws)
1311
1312
1313async def test_set_mode_invalid_value(
1314 test_app: FastAPI, db_session: AsyncSession
1315) -> None:
1316 """test that an invalid mode value is rejected."""
1317 async with AsyncClient(
1318 transport=ASGITransport(app=test_app), base_url="http://test"
1319 ) as client:
1320 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
1321 code = create_response.json()["code"]
1322
1323 response = await client.post(
1324 f"/jams/{code}/command",
1325 json={"type": "set_mode", "mode": "invalid_mode"},
1326 )
1327
1328 assert response.status_code == 400
1329
1330
1331# ── output fallback tests ──────────────────────────────────────────
1332
1333
1334async def test_output_fallback_on_disconnect(
1335 test_app: FastAPI, db_session: AsyncSession, second_user: str
1336) -> None:
1337 """test that output reassigns to remaining client when output WS disconnects."""
1338 from starlette.websockets import WebSocket
1339
1340 from backend._internal import require_auth
1341
1342 async with AsyncClient(
1343 transport=ASGITransport(app=test_app), base_url="http://test"
1344 ) as client:
1345 create_response = await client.post(
1346 "/jams/", json={"track_ids": ["t1"], "is_playing": True}
1347 )
1348 code = create_response.json()["code"]
1349 jam_id = create_response.json()["id"]
1350
1351 # second user joins
1352 async def mock_joiner_auth() -> Session:
1353 return MockSession(did=second_user)
1354
1355 app.dependency_overrides[require_auth] = mock_joiner_auth
1356
1357 async with AsyncClient(
1358 transport=ASGITransport(app=test_app), base_url="http://test"
1359 ) as client:
1360 await client.post(f"/jams/{code}/join")
1361
1362 # restore host auth
1363 async def mock_host_auth() -> Session:
1364 return MockSession(did="did:test:host")
1365
1366 app.dependency_overrides[require_auth] = mock_host_auth
1367
1368 # connect both WSs
1369 ws_host = AsyncMock(spec=WebSocket)
1370 ws_joiner = AsyncMock(spec=WebSocket)
1371 await jam_service.connect_ws(jam_id, ws_host, "did:test:host")
1372 jam_service._ws_client_ids[ws_host] = "host-client"
1373 await jam_service.connect_ws(jam_id, ws_joiner, second_user)
1374 jam_service._ws_client_ids[ws_joiner] = "joiner-client"
1375
1376 # set joiner as output device
1377 app.dependency_overrides[require_auth] = mock_joiner_auth
1378 async with AsyncClient(
1379 transport=ASGITransport(app=test_app), base_url="http://test"
1380 ) as client:
1381 await client.post(
1382 f"/jams/{code}/command",
1383 json={"type": "set_output", "client_id": "joiner-client"},
1384 )
1385
1386 # disconnect the output device (joiner)
1387 await jam_service.disconnect_ws(jam_id, ws_joiner)
1388
1389 # output should have fallen back to host, playback should NOT be paused
1390 app.dependency_overrides[require_auth] = mock_host_auth
1391 async with AsyncClient(
1392 transport=ASGITransport(app=test_app), base_url="http://test"
1393 ) as client:
1394 get_response = await client.get(f"/jams/{code}")
1395 state = get_response.json()["state"]
1396 assert state["output_client_id"] == "host-client"
1397 assert state["output_did"] == "did:test:host"
1398 assert state["is_playing"] is True
1399
1400 await jam_service.disconnect_ws(jam_id, ws_host)
1401
1402
1403async def test_output_fallback_prefers_host(
1404 test_app: FastAPI, db_session: AsyncSession, second_user: str
1405) -> None:
1406 """test that fallback prefers host over other connected clients."""
1407 from starlette.websockets import WebSocket
1408
1409 from backend._internal import require_auth
1410
1411 # create a third user
1412 third_did = "did:test:third"
1413 third_artist = Artist(
1414 did=third_did,
1415 handle="test.third",
1416 display_name="Test Third",
1417 )
1418 db_session.add(third_artist)
1419 await db_session.commit()
1420
1421 async with AsyncClient(
1422 transport=ASGITransport(app=test_app), base_url="http://test"
1423 ) as client:
1424 create_response = await client.post(
1425 "/jams/", json={"track_ids": ["t1"], "is_playing": True}
1426 )
1427 code = create_response.json()["code"]
1428 jam_id = create_response.json()["id"]
1429
1430 # join second and third users
1431 for did in [second_user, third_did]:
1432
1433 async def _auth(did: str = did) -> Session:
1434 return MockSession(did=did)
1435
1436 app.dependency_overrides[require_auth] = _auth
1437 async with AsyncClient(
1438 transport=ASGITransport(app=test_app), base_url="http://test"
1439 ) as client:
1440 await client.post(f"/jams/{code}/join")
1441
1442 # connect all three WSs
1443 ws_host = AsyncMock(spec=WebSocket)
1444 ws_joiner = AsyncMock(spec=WebSocket)
1445 ws_third = AsyncMock(spec=WebSocket)
1446 await jam_service.connect_ws(jam_id, ws_host, "did:test:host")
1447 jam_service._ws_client_ids[ws_host] = "host-client"
1448 await jam_service.connect_ws(jam_id, ws_joiner, second_user)
1449 jam_service._ws_client_ids[ws_joiner] = "joiner-client"
1450 await jam_service.connect_ws(jam_id, ws_third, third_did)
1451 jam_service._ws_client_ids[ws_third] = "third-client"
1452
1453 # set third user as output
1454 async def mock_third_auth() -> Session:
1455 return MockSession(did=third_did)
1456
1457 app.dependency_overrides[require_auth] = mock_third_auth
1458 async with AsyncClient(
1459 transport=ASGITransport(app=test_app), base_url="http://test"
1460 ) as client:
1461 await client.post(
1462 f"/jams/{code}/command",
1463 json={"type": "set_output", "client_id": "third-client"},
1464 )
1465
1466 # disconnect the output (third user)
1467 await jam_service.disconnect_ws(jam_id, ws_third)
1468
1469 # fallback should prefer host over joiner
1470 async def mock_host_auth() -> Session:
1471 return MockSession(did="did:test:host")
1472
1473 app.dependency_overrides[require_auth] = mock_host_auth
1474 async with AsyncClient(
1475 transport=ASGITransport(app=test_app), base_url="http://test"
1476 ) as client:
1477 get_response = await client.get(f"/jams/{code}")
1478 state = get_response.json()["state"]
1479 assert state["output_client_id"] == "host-client"
1480 assert state["output_did"] == "did:test:host"
1481 assert state["is_playing"] is True
1482
1483 await jam_service.disconnect_ws(jam_id, ws_host)
1484 await jam_service.disconnect_ws(jam_id, ws_joiner)
1485
1486
1487async def test_non_host_auto_output_on_sync(
1488 test_app: FastAPI, db_session: AsyncSession, second_user: str
1489) -> None:
1490 """test that a non-host gets auto-assigned as output when no output is set."""
1491 from starlette.websockets import WebSocket
1492
1493 from backend._internal import require_auth
1494
1495 async with AsyncClient(
1496 transport=ASGITransport(app=test_app), base_url="http://test"
1497 ) as client:
1498 create_response = await client.post("/jams/", json={"track_ids": ["t1"]})
1499 code = create_response.json()["code"]
1500 jam_id = create_response.json()["id"]
1501 assert create_response.json()["state"]["output_client_id"] is None
1502
1503 # join as second user
1504 async def mock_joiner_auth() -> Session:
1505 return MockSession(did=second_user)
1506
1507 app.dependency_overrides[require_auth] = mock_joiner_auth
1508 async with AsyncClient(
1509 transport=ASGITransport(app=test_app), base_url="http://test"
1510 ) as client:
1511 await client.post(f"/jams/{code}/join")
1512
1513 # connect joiner WS and send sync with client_id (no host WS connected)
1514 ws = AsyncMock(spec=WebSocket)
1515 await jam_service.connect_ws(jam_id, ws, second_user)
1516 await jam_service._handle_sync(
1517 jam_id, second_user, {"client_id": "joiner-abc-123", "last_id": None}, ws
1518 )
1519
1520 # verify non-host was auto-assigned as output
1521 async def mock_host_auth() -> Session:
1522 return MockSession(did="did:test:host")
1523
1524 app.dependency_overrides[require_auth] = mock_host_auth
1525 async with AsyncClient(
1526 transport=ASGITransport(app=test_app), base_url="http://test"
1527 ) as client:
1528 get_response = await client.get(f"/jams/{code}")
1529 state = get_response.json()["state"]
1530 assert state["output_client_id"] == "joiner-abc-123"
1531 assert state["output_did"] == second_user
1532
1533 await jam_service.disconnect_ws(jam_id, ws)
1534
1535
1536async def test_jam_preview_returns_host_info(
1537 test_app: FastAPI, db_session: AsyncSession
1538) -> None:
1539 """test GET /jams/{code}/preview returns host info without auth."""
1540 # create jam as authenticated user
1541 async with AsyncClient(
1542 transport=ASGITransport(app=test_app), base_url="http://test"
1543 ) as client:
1544 create_response = await client.post("/jams/", json={"name": "preview test"})
1545 code = create_response.json()["code"]
1546
1547 # clear auth overrides — preview is public
1548 app.dependency_overrides.clear()
1549
1550 async with AsyncClient(
1551 transport=ASGITransport(app=test_app), base_url="http://test"
1552 ) as client:
1553 response = await client.get(f"/jams/{code}/preview")
1554
1555 assert response.status_code == 200
1556 data = response.json()
1557 assert data["code"] == code
1558 assert data["name"] == "preview test"
1559 assert data["is_active"] is True
1560 assert data["host_handle"] == "test.host"
1561 assert data["host_display_name"] == "Test Host"
1562 assert data["participant_count"] >= 1
1563
1564
1565async def test_jam_preview_not_found(
1566 test_app: FastAPI, db_session: AsyncSession
1567) -> None:
1568 """test GET /jams/{code}/preview returns 404 for nonexistent code."""
1569 app.dependency_overrides.clear()
1570
1571 async with AsyncClient(
1572 transport=ASGITransport(app=test_app), base_url="http://test"
1573 ) as client:
1574 response = await client.get("/jams/nonexist/preview")
1575
1576 assert response.status_code == 404