audio streaming app plyr.fm
at main 1576 lines 56 kB view raw
"""tests for jam api endpoints."""

from collections.abc import AsyncGenerator
from typing import Any
from unittest.mock import AsyncMock

import pytest
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from backend._internal import Session
from backend._internal.jams import JamService, jam_service
from backend.main import app
from backend.models import Artist


async def _update_queue(
    client: AsyncClient, code: str, track_ids: list[str], current_index: int
) -> dict[str, Any]:
    """send an update_queue command and return the response json.

    posts to /jams/{code}/command with the given track list and index, and
    fails the calling test if the response status is not 200.
    """
    r = await client.post(
        f"/jams/{code}/command",
        json={
            "type": "update_queue",
            "track_ids": track_ids,
            "current_index": current_index,
        },
    )
    # include the response body so a rejected command is diagnosable
    assert r.status_code == 200, r.text
    return r.json()


class MockSession(Session):
    """mock session for auth bypass in tests.

    only `did` is configurable; every other attribute is a fixed placeholder.
    Session.__init__ is intentionally not called.
    """

    def __init__(self, did: str = "did:test:host"):
        self.did = did
        self.access_token = "test_token"
        self.refresh_token = "test_refresh"
        self.session_id = "test_session"
        self.handle = "test.host"
        self.oauth_session = {}


@pytest.fixture
async def test_app(db_session: AsyncSession) -> AsyncGenerator[FastAPI, None]:
    """yield the app with require_auth overridden and the host artist created.

    the override returns a MockSession for did:test:host. overrides are
    cleared on exit even when artist setup fails, so a broken setup cannot
    leak the mocked auth into other tests.
    """
    from backend._internal import require_auth

    mock_session = MockSession()

    async def mock_require_auth() -> Session:
        return mock_session

    app.dependency_overrides[require_auth] = mock_require_auth
    try:
        # create the test artist (commit flushes pending state itself)
        artist = Artist(
            did="did:test:host",
            handle="test.host",
            display_name="Test Host",
        )
        db_session.add(artist)
        await db_session.commit()

        yield app
    finally:
        app.dependency_overrides.clear()


@pytest.fixture
async def second_user(db_session: AsyncSession) -> str:
    """create a second test artist and return its did."""
    artist = Artist(
        did="did:test:joiner",
        handle="test.joiner",
        display_name="Test Joiner",
    )
    db_session.add(artist)
    await db_session.commit()
    return "did:test:joiner"


async def test_create_jam(test_app: FastAPI, db_session: AsyncSession) -> None:
    """test POST /jams/ creates a jam."""
    async with AsyncClient(
        transport=ASGITransport(app=test_app), base_url="http://test"
    ) as client:
        response = await client.post(
            "/jams/",
            json={"name": "test jam", "track_ids": ["track1", "track2"]},
        )

    assert response.status_code == 200
    data = response.json()
    assert data["name"] == "test jam"
    assert data["host_did"] == "did:test:host"
    assert data["is_active"] is True
    assert len(data["code"]) == 8
    assert data["state"]["track_ids"] == ["track1", "track2"]
    assert data["state"]["current_index"] == 0
    assert data["state"]["current_track_id"] == "track1"
    assert data["revision"] == 1


async def test_create_jam_empty(test_app: FastAPI, db_session: AsyncSession) -> None:
    """test creating a jam with no tracks."""
    async with AsyncClient(
        transport=ASGITransport(app=test_app), base_url="http://test"
    ) as client:
        response = await client.post("/jams/", json={})

    assert response.status_code == 200
    data = response.json()
    assert data["state"]["track_ids"] == []
    assert data["state"]["is_playing"] is False


async def test_get_jam_by_code(test_app: FastAPI, db_session: AsyncSession) -> None:
    """test GET /jams/{code} returns jam details."""
    async with AsyncClient(
        transport=ASGITransport(app=test_app), base_url="http://test"
    ) as client:
        create_response = await client.post("/jams/", json={"name": "get test"})
        # fail here with a clear status rather than a KeyError on "code"
        assert create_response.status_code == 200
        code = create_response.json()["code"]

        response = await client.get(f"/jams/{code}")

    assert response.status_code == 200
    data = response.json()
    assert data["code"] == code
    assert data["name"] == "get test"
138async def test_get_jam_not_found(test_app: FastAPI, db_session: AsyncSession) -> None: 139 """test GET /jams/{code} returns 404 for unknown code.""" 140 async with AsyncClient( 141 transport=ASGITransport(app=test_app), base_url="http://test" 142 ) as client: 143 response = await client.get("/jams/nonexist") 144 145 assert response.status_code == 404 146 147 148async def test_join_jam( 149 test_app: FastAPI, db_session: AsyncSession, second_user: str 150) -> None: 151 """test POST /jams/{code}/join adds a participant.""" 152 from backend._internal import require_auth 153 154 # create jam as host 155 async with AsyncClient( 156 transport=ASGITransport(app=test_app), base_url="http://test" 157 ) as client: 158 create_response = await client.post("/jams/", json={"name": "join test"}) 159 code = create_response.json()["code"] 160 161 # switch to second user 162 async def mock_joiner_auth() -> Session: 163 return MockSession(did=second_user) 164 165 app.dependency_overrides[require_auth] = mock_joiner_auth 166 167 async with AsyncClient( 168 transport=ASGITransport(app=test_app), base_url="http://test" 169 ) as client: 170 response = await client.post(f"/jams/{code}/join") 171 172 assert response.status_code == 200 173 data = response.json() 174 assert len(data["participants"]) == 2 175 participant_dids = {p["did"] for p in data["participants"]} 176 assert "did:test:host" in participant_dids 177 assert second_user in participant_dids 178 179 180async def test_leave_jam(test_app: FastAPI, db_session: AsyncSession) -> None: 181 """test POST /jams/{code}/leave removes participant.""" 182 async with AsyncClient( 183 transport=ASGITransport(app=test_app), base_url="http://test" 184 ) as client: 185 create_response = await client.post("/jams/", json={"name": "leave test"}) 186 code = create_response.json()["code"] 187 188 response = await client.post(f"/jams/{code}/leave") 189 190 assert response.status_code == 200 191 assert response.json()["ok"] is True 192 193 194async 
def test_leave_ends_jam_when_last( 195 test_app: FastAPI, db_session: AsyncSession 196) -> None: 197 """test that leaving as last participant ends the jam.""" 198 async with AsyncClient( 199 transport=ASGITransport(app=test_app), base_url="http://test" 200 ) as client: 201 create_response = await client.post("/jams/", json={"name": "last leave test"}) 202 code = create_response.json()["code"] 203 204 # leave (only participant) 205 await client.post(f"/jams/{code}/leave") 206 207 # jam should no longer be active 208 get_response = await client.get(f"/jams/{code}") 209 210 assert get_response.status_code == 200 211 assert get_response.json()["is_active"] is False 212 213 214async def test_end_jam_host_only( 215 test_app: FastAPI, db_session: AsyncSession, second_user: str 216) -> None: 217 """test that only the host can end a jam.""" 218 from backend._internal import require_auth 219 220 # create jam as host 221 async with AsyncClient( 222 transport=ASGITransport(app=test_app), base_url="http://test" 223 ) as client: 224 create_response = await client.post("/jams/", json={"name": "end test"}) 225 code = create_response.json()["code"] 226 227 # switch to second user and try to end 228 async def mock_joiner_auth() -> Session: 229 return MockSession(did=second_user) 230 231 app.dependency_overrides[require_auth] = mock_joiner_auth 232 233 async with AsyncClient( 234 transport=ASGITransport(app=test_app), base_url="http://test" 235 ) as client: 236 # join first 237 await client.post(f"/jams/{code}/join") 238 # try to end 239 response = await client.post(f"/jams/{code}/end") 240 241 assert response.status_code == 403 242 243 244async def test_end_jam_by_host(test_app: FastAPI, db_session: AsyncSession) -> None: 245 """test host can end their jam.""" 246 async with AsyncClient( 247 transport=ASGITransport(app=test_app), base_url="http://test" 248 ) as client: 249 create_response = await client.post("/jams/", json={"name": "host end test"}) 250 code = 
create_response.json()["code"] 251 252 response = await client.post(f"/jams/{code}/end") 253 254 assert response.status_code == 200 255 assert response.json()["ok"] is True 256 257 258async def test_command_play_pause(test_app: FastAPI, db_session: AsyncSession) -> None: 259 """test play and pause commands.""" 260 async with AsyncClient( 261 transport=ASGITransport(app=test_app), base_url="http://test" 262 ) as client: 263 create_response = await client.post( 264 "/jams/", 265 json={"track_ids": ["t1"]}, 266 ) 267 code = create_response.json()["code"] 268 269 # play 270 play_response = await client.post( 271 f"/jams/{code}/command", json={"type": "play"} 272 ) 273 assert play_response.status_code == 200 274 assert play_response.json()["state"]["is_playing"] is True 275 276 # pause 277 pause_response = await client.post( 278 f"/jams/{code}/command", json={"type": "pause"} 279 ) 280 assert pause_response.status_code == 200 281 assert pause_response.json()["state"]["is_playing"] is False 282 283 284async def test_command_seek(test_app: FastAPI, db_session: AsyncSession) -> None: 285 """test seek command.""" 286 async with AsyncClient( 287 transport=ASGITransport(app=test_app), base_url="http://test" 288 ) as client: 289 create_response = await client.post( 290 "/jams/", 291 json={"track_ids": ["t1"]}, 292 ) 293 code = create_response.json()["code"] 294 295 response = await client.post( 296 f"/jams/{code}/command", 297 json={"type": "seek", "position_ms": 30000}, 298 ) 299 300 assert response.status_code == 200 301 assert response.json()["state"]["progress_ms"] == 30000 302 303 304async def test_update_queue_change_index( 305 test_app: FastAPI, db_session: AsyncSession 306) -> None: 307 """test update_queue to advance and go back (replaces next/previous).""" 308 tracks = ["t1", "t2", "t3"] 309 async with AsyncClient( 310 transport=ASGITransport(app=test_app), base_url="http://test" 311 ) as client: 312 create_response = await client.post("/jams/", json={"track_ids": 
tracks}) 313 code = create_response.json()["code"] 314 315 r1 = await _update_queue(client, code, tracks, 1) 316 assert r1["state"]["current_index"] == 1 317 assert r1["state"]["current_track_id"] == "t2" 318 319 r2 = await _update_queue(client, code, tracks, 2) 320 assert r2["state"]["current_index"] == 2 321 322 r3 = await _update_queue(client, code, tracks, 1) 323 assert r3["state"]["current_index"] == 1 324 assert r3["state"]["current_track_id"] == "t2" 325 326 327async def test_update_queue_add_tracks( 328 test_app: FastAPI, db_session: AsyncSession 329) -> None: 330 """test update_queue with extended track list (replaces add_tracks).""" 331 async with AsyncClient( 332 transport=ASGITransport(app=test_app), base_url="http://test" 333 ) as client: 334 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 335 code = create_response.json()["code"] 336 r = await _update_queue(client, code, ["t1", "t2", "t3"], 0) 337 338 assert r["state"]["track_ids"] == ["t1", "t2", "t3"] 339 assert r["tracks_changed"] is True 340 341 342async def test_sequential_update_queue( 343 test_app: FastAPI, db_session: AsyncSession 344) -> None: 345 """two update_queue calls: extend tracks, then advance — regression for shallow-copy bug.""" 346 tracks = ["t1", "t2", "t3"] 347 async with AsyncClient( 348 transport=ASGITransport(app=test_app), base_url="http://test" 349 ) as client: 350 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 351 code = create_response.json()["code"] 352 await _update_queue(client, code, tracks, 0) 353 r = await _update_queue(client, code, tracks, 1) 354 355 assert r["state"]["track_ids"] == tracks 356 assert r["state"]["current_index"] == 1 357 assert r["state"]["current_track_id"] == "t2" 358 359 360async def test_update_queue_remove_track( 361 test_app: FastAPI, db_session: AsyncSession 362) -> None: 363 """test update_queue with a track removed.""" 364 async with AsyncClient( 365 
transport=ASGITransport(app=test_app), base_url="http://test" 366 ) as client: 367 create_response = await client.post( 368 "/jams/", json={"track_ids": ["t1", "t2", "t3"]} 369 ) 370 code = create_response.json()["code"] 371 r = await _update_queue(client, code, ["t1", "t3"], 0) 372 373 assert r["state"]["track_ids"] == ["t1", "t3"] 374 assert r["tracks_changed"] is True 375 376 377async def test_update_queue_reorder( 378 test_app: FastAPI, db_session: AsyncSession 379) -> None: 380 """test update_queue with reordered track list (replaces move_track).""" 381 async with AsyncClient( 382 transport=ASGITransport(app=test_app), base_url="http://test" 383 ) as client: 384 create_response = await client.post( 385 "/jams/", json={"track_ids": ["t1", "t2", "t3", "t4"]} 386 ) 387 code = create_response.json()["code"] 388 389 r1 = await _update_queue(client, code, ["t3", "t1", "t2", "t4"], 1) 390 assert r1["state"]["track_ids"] == ["t3", "t1", "t2", "t4"] 391 assert r1["state"]["current_index"] == 1 392 assert r1["state"]["current_track_id"] == "t1" 393 394 r2 = await _update_queue(client, code, ["t3", "t2", "t4", "t1"], 3) 395 assert r2["state"]["track_ids"] == ["t3", "t2", "t4", "t1"] 396 assert r2["state"]["current_index"] == 3 397 assert r2["state"]["current_track_id"] == "t1" 398 assert r2["tracks_changed"] is True 399 400 401async def test_update_queue_same_tracks_no_change( 402 test_app: FastAPI, db_session: AsyncSession 403) -> None: 404 """test update_queue with same tracks reports tracks_changed=False.""" 405 tracks = ["t1", "t2"] 406 async with AsyncClient( 407 transport=ASGITransport(app=test_app), base_url="http://test" 408 ) as client: 409 create_response = await client.post("/jams/", json={"track_ids": tracks}) 410 code = create_response.json()["code"] 411 r = await _update_queue(client, code, tracks, 0) 412 413 assert r["state"]["track_ids"] == tracks 414 assert r["tracks_changed"] is False 415 416 417async def test_update_queue_clear_upcoming( 418 test_app: 
FastAPI, db_session: AsyncSession 419) -> None: 420 """test update_queue with truncated list (replaces clear_upcoming).""" 421 full = ["t1", "t2", "t3", "t4"] 422 async with AsyncClient( 423 transport=ASGITransport(app=test_app), base_url="http://test" 424 ) as client: 425 create_response = await client.post("/jams/", json={"track_ids": full}) 426 code = create_response.json()["code"] 427 await _update_queue(client, code, full, 1) 428 r = await _update_queue(client, code, ["t1", "t2"], 1) 429 430 assert r["state"]["track_ids"] == ["t1", "t2"] 431 assert r["state"]["current_index"] == 1 432 assert r["state"]["current_track_id"] == "t2" 433 assert r["tracks_changed"] is True 434 435 436async def test_revision_monotonicity( 437 test_app: FastAPI, db_session: AsyncSession 438) -> None: 439 """test that revision increases monotonically.""" 440 async with AsyncClient( 441 transport=ASGITransport(app=test_app), base_url="http://test" 442 ) as client: 443 create_response = await client.post( 444 "/jams/", 445 json={"track_ids": ["t1"]}, 446 ) 447 code = create_response.json()["code"] 448 rev = create_response.json()["revision"] 449 assert rev == 1 450 451 for _ in range(5): 452 cmd_response = await client.post( 453 f"/jams/{code}/command", json={"type": "play"} 454 ) 455 new_rev = cmd_response.json()["revision"] 456 assert new_rev > rev 457 rev = new_rev 458 459 460async def test_auto_leave_previous_jam( 461 test_app: FastAPI, db_session: AsyncSession 462) -> None: 463 """test that creating a new jam auto-leaves the previous one.""" 464 async with AsyncClient( 465 transport=ASGITransport(app=test_app), base_url="http://test" 466 ) as client: 467 # create first jam 468 first_response = await client.post("/jams/", json={"name": "first jam"}) 469 first_code = first_response.json()["code"] 470 471 # create second jam (should auto-leave first) 472 second_response = await client.post("/jams/", json={"name": "second jam"}) 473 second_code = second_response.json()["code"] 474 475 
assert first_code != second_code 476 477 # check active jam is the second one 478 active_response = await client.get("/jams/active") 479 480 assert active_response.status_code == 200 481 assert active_response.json()["code"] == second_code 482 483 484async def test_get_active_jam_none(test_app: FastAPI, db_session: AsyncSession) -> None: 485 """test GET /jams/active returns null when not in a jam.""" 486 async with AsyncClient( 487 transport=ASGITransport(app=test_app), base_url="http://test" 488 ) as client: 489 response = await client.get("/jams/active") 490 491 assert response.status_code == 200 492 assert response.json() is None 493 494 495async def test_code_uniqueness(test_app: FastAPI, db_session: AsyncSession) -> None: 496 """test that each jam gets a unique code.""" 497 codes = set() 498 async with AsyncClient( 499 transport=ASGITransport(app=test_app), base_url="http://test" 500 ) as client: 501 for i in range(5): 502 response = await client.post("/jams/", json={"name": f"jam {i}"}) 503 assert response.status_code == 200 504 codes.add(response.json()["code"]) 505 506 assert len(codes) == 5 507 508 509async def test_update_queue_set_index( 510 test_app: FastAPI, db_session: AsyncSession 511) -> None: 512 """test update_queue to jump to a specific track (replaces set_index).""" 513 tracks = ["t1", "t2", "t3", "t4"] 514 async with AsyncClient( 515 transport=ASGITransport(app=test_app), base_url="http://test" 516 ) as client: 517 create_response = await client.post("/jams/", json={"track_ids": tracks}) 518 code = create_response.json()["code"] 519 520 r1 = await _update_queue(client, code, tracks, 2) 521 assert r1["state"]["current_index"] == 2 522 assert r1["state"]["current_track_id"] == "t3" 523 assert r1["state"]["progress_ms"] == 0 524 525 r2 = await _update_queue(client, code, tracks, 0) 526 assert r2["state"]["current_index"] == 0 527 assert r2["state"]["current_track_id"] == "t1" 528 529 530async def test_command_non_participant_rejected( 531 
test_app: FastAPI, db_session: AsyncSession, second_user: str 532) -> None: 533 """test that non-participants cannot send commands.""" 534 from backend._internal import require_auth 535 536 # create jam as host 537 async with AsyncClient( 538 transport=ASGITransport(app=test_app), base_url="http://test" 539 ) as client: 540 create_response = await client.post("/jams/", json={"track_ids": ["t1", "t2"]}) 541 code = create_response.json()["code"] 542 543 # switch to second user (NOT joined) 544 async def mock_joiner_auth() -> Session: 545 return MockSession(did=second_user) 546 547 app.dependency_overrides[require_auth] = mock_joiner_auth 548 549 async with AsyncClient( 550 transport=ASGITransport(app=test_app), base_url="http://test" 551 ) as client: 552 response = await client.post( 553 f"/jams/{code}/command", 554 json={ 555 "type": "update_queue", 556 "track_ids": ["t1", "t2"], 557 "current_index": 1, 558 }, 559 ) 560 561 assert response.status_code == 400 562 563 564async def test_sequential_commands_get_distinct_revisions( 565 test_app: FastAPI, db_session: AsyncSession 566) -> None: 567 """test that each command gets a distinct revision (verifies no clobbering).""" 568 async with AsyncClient( 569 transport=ASGITransport(app=test_app), base_url="http://test" 570 ) as client: 571 create_response = await client.post( 572 "/jams/", json={"track_ids": ["t1", "t2", "t3"]} 573 ) 574 code = create_response.json()["code"] 575 assert create_response.json()["revision"] == 1 576 577 tracks = ["t1", "t2", "t3"] 578 r1 = await _update_queue(client, code, tracks, 1) 579 r2 = await _update_queue(client, code, tracks, 2) 580 581 assert r1["revision"] == 2 582 assert r2["revision"] == 3 583 assert r1["state"]["current_index"] == 1 584 assert r2["state"]["current_index"] == 2 585 586 587async def test_did_socket_replacement() -> None: 588 """test that connecting a second WS for the same DID closes the first.""" 589 from starlette.websockets import WebSocket 590 591 service = 
JamService() 592 jam_id = "test-jam-123" 593 594 ws1 = AsyncMock(spec=WebSocket) 595 ws2 = AsyncMock(spec=WebSocket) 596 597 # connect first socket 598 await service.connect_ws(jam_id, ws1, "did:test:user") 599 assert jam_id in service._connections 600 assert ws1 in service._connections[jam_id] 601 602 # connect second socket for same DID — should close first 603 await service.connect_ws(jam_id, ws2, "did:test:user") 604 605 # first socket should have been closed with code 4010 606 ws1.close.assert_awaited_once_with(code=4010, reason="replaced by new connection") 607 608 # only second socket should be in connections 609 assert ws2 in service._connections[jam_id] 610 assert ws1 not in service._connections[jam_id] 611 612 # DID mapping should point to second socket 613 assert service._ws_by_did["did:test:user"] == (jam_id, ws2) 614 615 616# ── output device tests ──────────────────────────────────────────── 617 618 619async def test_create_jam_has_null_output( 620 test_app: FastAPI, db_session: AsyncSession 621) -> None: 622 """test that newly created jams have output_client_id = null.""" 623 async with AsyncClient( 624 transport=ASGITransport(app=test_app), base_url="http://test" 625 ) as client: 626 response = await client.post("/jams/", json={"track_ids": ["t1"]}) 627 628 assert response.status_code == 200 629 assert response.json()["state"]["output_client_id"] is None 630 631 632async def test_auto_set_output_on_host_sync( 633 test_app: FastAPI, db_session: AsyncSession 634) -> None: 635 """test that output_client_id is auto-set to host on first sync with real DB state.""" 636 from starlette.websockets import WebSocket 637 638 # create jam via API (writes to DB) 639 async with AsyncClient( 640 transport=ASGITransport(app=test_app), base_url="http://test" 641 ) as client: 642 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 643 code = create_response.json()["code"] 644 jam_id = create_response.json()["id"] 645 assert 
create_response.json()["state"]["output_client_id"] is None 646 647 # connect host WS and send sync with client_id 648 ws = AsyncMock(spec=WebSocket) 649 await jam_service.connect_ws(jam_id, ws, "did:test:host") 650 await jam_service._handle_sync( 651 jam_id, "did:test:host", {"client_id": "host-abc-123", "last_id": None}, ws 652 ) 653 654 # verify DB state was updated 655 async with AsyncClient( 656 transport=ASGITransport(app=test_app), base_url="http://test" 657 ) as client: 658 get_response = await client.get(f"/jams/{code}") 659 state = get_response.json()["state"] 660 assert state["output_client_id"] == "host-abc-123" 661 assert state["output_did"] == "did:test:host" 662 663 # cleanup 664 await jam_service.disconnect_ws(jam_id, ws) 665 666 667async def test_set_output_command(test_app: FastAPI, db_session: AsyncSession) -> None: 668 """test set_output command changes output_client_id in state.""" 669 from starlette.websockets import WebSocket 670 671 async with AsyncClient( 672 transport=ASGITransport(app=test_app), base_url="http://test" 673 ) as client: 674 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 675 code = create_response.json()["code"] 676 jam_id = create_response.json()["id"] 677 678 # register a WS with a client_id so set_output can validate 679 ws = AsyncMock(spec=WebSocket) 680 await jam_service.connect_ws(jam_id, ws, "did:test:host") 681 jam_service._ws_client_ids[ws] = "my-client-id" 682 683 async with AsyncClient( 684 transport=ASGITransport(app=test_app), base_url="http://test" 685 ) as client: 686 response = await client.post( 687 f"/jams/{code}/command", 688 json={"type": "set_output", "client_id": "my-client-id"}, 689 ) 690 691 assert response.status_code == 200 692 assert response.json()["state"]["output_client_id"] == "my-client-id" 693 694 # cleanup 695 await jam_service.disconnect_ws(jam_id, ws) 696 697 698async def test_set_output_validates_client_id( 699 test_app: FastAPI, db_session: AsyncSession 700) -> 
None: 701 """test that set_output rejects client_id that doesn't match sender's WS.""" 702 from starlette.websockets import WebSocket 703 704 async with AsyncClient( 705 transport=ASGITransport(app=test_app), base_url="http://test" 706 ) as client: 707 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 708 code = create_response.json()["code"] 709 jam_id = create_response.json()["id"] 710 711 # register WS with one client_id 712 ws = AsyncMock(spec=WebSocket) 713 await jam_service.connect_ws(jam_id, ws, "did:test:host") 714 jam_service._ws_client_ids[ws] = "real-client-id" 715 716 # try to set output with a different client_id 717 async with AsyncClient( 718 transport=ASGITransport(app=test_app), base_url="http://test" 719 ) as client: 720 response = await client.post( 721 f"/jams/{code}/command", 722 json={"type": "set_output", "client_id": "spoofed-client-id"}, 723 ) 724 725 # should fail — client_id mismatch 726 assert response.status_code == 400 727 728 # cleanup 729 await jam_service.disconnect_ws(jam_id, ws) 730 731 732async def test_output_disconnect_no_remaining_pauses( 733 test_app: FastAPI, db_session: AsyncSession 734) -> None: 735 """test that output clears and playback pauses when the only client disconnects (no fallback).""" 736 from starlette.websockets import WebSocket 737 738 # create jam and set output via API 739 async with AsyncClient( 740 transport=ASGITransport(app=test_app), base_url="http://test" 741 ) as client: 742 create_response = await client.post( 743 "/jams/", json={"track_ids": ["t1"], "is_playing": True} 744 ) 745 code = create_response.json()["code"] 746 jam_id = create_response.json()["id"] 747 748 # register host WS as output device 749 ws = AsyncMock(spec=WebSocket) 750 await jam_service.connect_ws(jam_id, ws, "did:test:host") 751 jam_service._ws_client_ids[ws] = "host-output-client" 752 753 async with AsyncClient( 754 transport=ASGITransport(app=test_app), base_url="http://test" 755 ) as client: 756 await 
client.post( 757 f"/jams/{code}/command", 758 json={"type": "set_output", "client_id": "host-output-client"}, 759 ) 760 761 # disconnect output device (no other clients connected) 762 await jam_service.disconnect_ws(jam_id, ws) 763 764 # verify DB state: output cleared, playback paused 765 async with AsyncClient( 766 transport=ASGITransport(app=test_app), base_url="http://test" 767 ) as client: 768 get_response = await client.get(f"/jams/{code}") 769 state = get_response.json()["state"] 770 assert state["output_client_id"] is None 771 assert state["output_did"] is None 772 assert state["is_playing"] is False 773 774 775async def test_output_clears_on_ws_replacement( 776 test_app: FastAPI, db_session: AsyncSession 777) -> None: 778 """regression: _close_ws_for_did must clear output before removing client_id mapping. 779 780 when a user reconnects (new WS replaces old), _close_ws_for_did fires instead of 781 disconnect_ws. if it pops the client_id first, the output stays pinned to a dead device. 
782 """ 783 from starlette.websockets import WebSocket 784 785 # create jam and set host as output 786 async with AsyncClient( 787 transport=ASGITransport(app=test_app), base_url="http://test" 788 ) as client: 789 create_response = await client.post( 790 "/jams/", json={"track_ids": ["t1"], "is_playing": True} 791 ) 792 code = create_response.json()["code"] 793 jam_id = create_response.json()["id"] 794 795 ws1 = AsyncMock(spec=WebSocket) 796 await jam_service.connect_ws(jam_id, ws1, "did:test:host") 797 jam_service._ws_client_ids[ws1] = "old-client-id" 798 799 async with AsyncClient( 800 transport=ASGITransport(app=test_app), base_url="http://test" 801 ) as client: 802 await client.post( 803 f"/jams/{code}/command", 804 json={"type": "set_output", "client_id": "old-client-id"}, 805 ) 806 807 # reconnect — new WS replaces old (this calls _close_ws_for_did internally) 808 ws2 = AsyncMock(spec=WebSocket) 809 await jam_service.connect_ws(jam_id, ws2, "did:test:host") 810 811 # output should have been cleared by _close_ws_for_did 812 async with AsyncClient( 813 transport=ASGITransport(app=test_app), base_url="http://test" 814 ) as client: 815 get_response = await client.get(f"/jams/{code}") 816 state = get_response.json()["state"] 817 assert state["output_client_id"] is None, ( 818 "output_client_id should be cleared when output device's WS is replaced" 819 ) 820 assert state["is_playing"] is False 821 822 # cleanup 823 await jam_service.disconnect_ws(jam_id, ws2) 824 825 826async def test_non_output_disconnect_no_effect() -> None: 827 """test that a non-output participant disconnecting doesn't affect output.""" 828 from starlette.websockets import WebSocket 829 830 service = JamService() 831 jam_id = "test-noeffect" 832 833 ws_host = AsyncMock(spec=WebSocket) 834 ws_joiner = AsyncMock(spec=WebSocket) 835 836 await service.connect_ws(jam_id, ws_host, "did:test:host") 837 service._ws_client_ids[ws_host] = "host-client" 838 839 await service.connect_ws(jam_id, ws_joiner, 
"did:test:joiner") 840 service._ws_client_ids[ws_joiner] = "joiner-client" 841 842 # disconnect the non-output joiner 843 await service.disconnect_ws(jam_id, ws_joiner) 844 845 # host's client_id should still be tracked 846 assert service._ws_client_ids[ws_host] == "host-client" 847 assert ws_host in service._connections[jam_id] 848 849 # cleanup 850 await service.disconnect_ws(jam_id, ws_host) 851 852 853# ── cross-client command tests ──────────────────────────────────── 854 855 856async def test_cross_client_update_queue( 857 test_app: FastAPI, db_session: AsyncSession, second_user: str 858) -> None: 859 """test that a non-host participant can send update_queue and it updates state for all.""" 860 from backend._internal import require_auth 861 862 tracks = ["t1", "t2", "t3", "t4"] 863 864 # create jam as host with 4 tracks 865 async with AsyncClient( 866 transport=ASGITransport(app=test_app), base_url="http://test" 867 ) as client: 868 create_response = await client.post( 869 "/jams/", 870 json={ 871 "name": "cross client test", 872 "track_ids": tracks, 873 "is_playing": True, 874 }, 875 ) 876 code = create_response.json()["code"] 877 assert create_response.json()["state"]["current_index"] == 0 878 assert create_response.json()["state"]["current_track_id"] == "t1" 879 880 # second user joins 881 async def mock_joiner_auth() -> Session: 882 return MockSession(did=second_user) 883 884 app.dependency_overrides[require_auth] = mock_joiner_auth 885 886 async with AsyncClient( 887 transport=ASGITransport(app=test_app), base_url="http://test" 888 ) as client: 889 join_response = await client.post(f"/jams/{code}/join") 890 assert join_response.status_code == 200 891 892 r1 = await _update_queue(client, code, tracks, 1) 893 assert r1["state"]["current_index"] == 1 894 assert r1["state"]["current_track_id"] == "t2" 895 assert r1["state"]["progress_ms"] == 0 896 assert r1["state"]["is_playing"] is True 897 898 r2 = await _update_queue(client, code, tracks, 2) 899 assert 
r2["state"]["current_index"] == 2 900 assert r2["state"]["current_track_id"] == "t3" 901 902 # verify state from host's perspective 903 async def mock_host_auth() -> Session: 904 return MockSession(did="did:test:host") 905 906 app.dependency_overrides[require_auth] = mock_host_auth 907 908 async with AsyncClient( 909 transport=ASGITransport(app=test_app), base_url="http://test" 910 ) as client: 911 get_response = await client.get(f"/jams/{code}") 912 assert get_response.status_code == 200 913 host_view = get_response.json() 914 assert host_view["state"]["current_index"] == 2 915 assert host_view["state"]["current_track_id"] == "t3" 916 917 918async def test_cross_client_play_pause( 919 test_app: FastAPI, db_session: AsyncSession, second_user: str 920) -> None: 921 """test that a non-host participant can play/pause.""" 922 from backend._internal import require_auth 923 924 # create jam as host (starts paused) 925 async with AsyncClient( 926 transport=ASGITransport(app=test_app), base_url="http://test" 927 ) as client: 928 create_response = await client.post( 929 "/jams/", 930 json={"track_ids": ["t1", "t2"]}, 931 ) 932 code = create_response.json()["code"] 933 assert create_response.json()["state"]["is_playing"] is False 934 935 # joiner joins and sends play 936 async def mock_joiner_auth() -> Session: 937 return MockSession(did=second_user) 938 939 app.dependency_overrides[require_auth] = mock_joiner_auth 940 941 async with AsyncClient( 942 transport=ASGITransport(app=test_app), base_url="http://test" 943 ) as client: 944 await client.post(f"/jams/{code}/join") 945 946 play_response = await client.post( 947 f"/jams/{code}/command", json={"type": "play"} 948 ) 949 assert play_response.status_code == 200 950 assert play_response.json()["state"]["is_playing"] is True 951 952 pause_response = await client.post( 953 f"/jams/{code}/command", json={"type": "pause"} 954 ) 955 assert pause_response.status_code == 200 956 assert pause_response.json()["state"]["is_playing"] 
is False 957 958 959async def test_cross_client_seek( 960 test_app: FastAPI, db_session: AsyncSession, second_user: str 961) -> None: 962 """test that a non-host participant can seek.""" 963 from backend._internal import require_auth 964 965 async with AsyncClient( 966 transport=ASGITransport(app=test_app), base_url="http://test" 967 ) as client: 968 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 969 code = create_response.json()["code"] 970 971 async def mock_joiner_auth() -> Session: 972 return MockSession(did=second_user) 973 974 app.dependency_overrides[require_auth] = mock_joiner_auth 975 976 async with AsyncClient( 977 transport=ASGITransport(app=test_app), base_url="http://test" 978 ) as client: 979 await client.post(f"/jams/{code}/join") 980 981 seek_response = await client.post( 982 f"/jams/{code}/command", 983 json={"type": "seek", "position_ms": 45000}, 984 ) 985 assert seek_response.status_code == 200 986 assert seek_response.json()["state"]["progress_ms"] == 45000 987 988 989async def test_cross_client_update_queue_add_tracks( 990 test_app: FastAPI, db_session: AsyncSession, second_user: str 991) -> None: 992 """test that a non-host participant can extend the queue via update_queue.""" 993 from backend._internal import require_auth 994 995 async with AsyncClient( 996 transport=ASGITransport(app=test_app), base_url="http://test" 997 ) as client: 998 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 999 code = create_response.json()["code"] 1000 1001 async def mock_joiner_auth() -> Session: 1002 return MockSession(did=second_user) 1003 1004 app.dependency_overrides[require_auth] = mock_joiner_auth 1005 1006 async with AsyncClient( 1007 transport=ASGITransport(app=test_app), base_url="http://test" 1008 ) as client: 1009 await client.post(f"/jams/{code}/join") 1010 1011 r = await _update_queue(client, code, ["t1", "t2", "t3"], 0) 1012 assert r["state"]["track_ids"] == ["t1", "t2", "t3"] 1013 assert 
r["tracks_changed"] is True 1014 1015 1016async def test_output_preserved_across_cross_client_commands( 1017 test_app: FastAPI, db_session: AsyncSession, second_user: str 1018) -> None: 1019 """test that output_client_id is preserved when non-output sends commands.""" 1020 from starlette.websockets import WebSocket 1021 1022 from backend._internal import require_auth 1023 1024 # create jam as host 1025 async with AsyncClient( 1026 transport=ASGITransport(app=test_app), base_url="http://test" 1027 ) as client: 1028 create_response = await client.post( 1029 "/jams/", 1030 json={"track_ids": ["t1", "t2", "t3"], "is_playing": True}, 1031 ) 1032 code = create_response.json()["code"] 1033 jam_id = create_response.json()["id"] 1034 1035 # set up host as output device 1036 ws_host = AsyncMock(spec=WebSocket) 1037 await jam_service.connect_ws(jam_id, ws_host, "did:test:host") 1038 jam_service._ws_client_ids[ws_host] = "host-client-id" 1039 1040 async with AsyncClient( 1041 transport=ASGITransport(app=test_app), base_url="http://test" 1042 ) as client: 1043 set_output_response = await client.post( 1044 f"/jams/{code}/command", 1045 json={"type": "set_output", "client_id": "host-client-id"}, 1046 ) 1047 assert set_output_response.status_code == 200 1048 assert ( 1049 set_output_response.json()["state"]["output_client_id"] == "host-client-id" 1050 ) 1051 1052 # joiner joins and sends update_queue — output_client_id should be preserved 1053 async def mock_joiner_auth() -> Session: 1054 return MockSession(did=second_user) 1055 1056 app.dependency_overrides[require_auth] = mock_joiner_auth 1057 1058 async with AsyncClient( 1059 transport=ASGITransport(app=test_app), base_url="http://test" 1060 ) as client: 1061 await client.post(f"/jams/{code}/join") 1062 1063 r = await _update_queue(client, code, ["t1", "t2", "t3"], 1) 1064 state = r["state"] 1065 assert state["current_index"] == 1 1066 assert state["output_client_id"] == "host-client-id" 1067 assert state["output_did"] == 
"did:test:host" 1068 assert state["is_playing"] is True 1069 1070 # cleanup 1071 await jam_service.disconnect_ws(jam_id, ws_host) 1072 1073 1074# ── update_queue edge case tests ────────────────────────────────── 1075 1076 1077async def test_update_queue_resets_progress_on_track_change( 1078 test_app: FastAPI, db_session: AsyncSession 1079) -> None: 1080 """test that progress resets to 0 when current track changes via update_queue.""" 1081 async with AsyncClient( 1082 transport=ASGITransport(app=test_app), base_url="http://test" 1083 ) as client: 1084 create_response = await client.post("/jams/", json={"track_ids": ["t1", "t2"]}) 1085 code = create_response.json()["code"] 1086 await client.post( 1087 f"/jams/{code}/command", json={"type": "seek", "position_ms": 30000} 1088 ) 1089 r = await _update_queue(client, code, ["t1", "t2"], 1) 1090 1091 assert r["state"]["current_track_id"] == "t2" 1092 assert r["state"]["progress_ms"] == 0 1093 1094 1095async def test_update_queue_preserves_progress_on_same_track( 1096 test_app: FastAPI, db_session: AsyncSession 1097) -> None: 1098 """test that progress is preserved when current track stays the same.""" 1099 async with AsyncClient( 1100 transport=ASGITransport(app=test_app), base_url="http://test" 1101 ) as client: 1102 create_response = await client.post("/jams/", json={"track_ids": ["t1", "t2"]}) 1103 code = create_response.json()["code"] 1104 await client.post( 1105 f"/jams/{code}/command", json={"type": "seek", "position_ms": 30000} 1106 ) 1107 r = await _update_queue(client, code, ["t1", "t2", "t3"], 0) 1108 1109 assert r["state"]["current_track_id"] == "t1" 1110 assert r["state"]["progress_ms"] == 30000 1111 1112 1113async def test_update_queue_clamps_index( 1114 test_app: FastAPI, db_session: AsyncSession 1115) -> None: 1116 """test that out-of-bounds current_index is clamped.""" 1117 tracks = ["t1", "t2", "t3"] 1118 async with AsyncClient( 1119 transport=ASGITransport(app=test_app), base_url="http://test" 1120 ) as 
client: 1121 create_response = await client.post("/jams/", json={"track_ids": tracks}) 1122 code = create_response.json()["code"] 1123 1124 r = await _update_queue(client, code, tracks, 99) 1125 assert r["state"]["current_index"] == 2 1126 assert r["state"]["current_track_id"] == "t3" 1127 1128 r = await _update_queue(client, code, tracks, -5) 1129 assert r["state"]["current_index"] == 0 1130 assert r["state"]["current_track_id"] == "t1" 1131 1132 r = await _update_queue(client, code, [], 0) 1133 assert r["state"]["current_index"] == 0 1134 assert r["state"]["current_track_id"] is None 1135 1136 1137# ── output mode tests ────────────────────────────────────────────── 1138 1139 1140async def test_set_mode_host_only( 1141 test_app: FastAPI, db_session: AsyncSession, second_user: str 1142) -> None: 1143 """test that only the host can set output mode.""" 1144 from backend._internal import require_auth 1145 1146 # create jam as host 1147 async with AsyncClient( 1148 transport=ASGITransport(app=test_app), base_url="http://test" 1149 ) as client: 1150 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 1151 code = create_response.json()["code"] 1152 1153 # switch to second user 1154 async def mock_joiner_auth() -> Session: 1155 return MockSession(did=second_user) 1156 1157 app.dependency_overrides[require_auth] = mock_joiner_auth 1158 1159 async with AsyncClient( 1160 transport=ASGITransport(app=test_app), base_url="http://test" 1161 ) as client: 1162 await client.post(f"/jams/{code}/join") 1163 1164 # non-host tries to set mode — should fail 1165 response = await client.post( 1166 f"/jams/{code}/command", 1167 json={"type": "set_mode", "mode": "everyone"}, 1168 ) 1169 1170 assert response.status_code == 400 1171 1172 1173async def test_set_mode_everyone(test_app: FastAPI, db_session: AsyncSession) -> None: 1174 """test that host can set mode to everyone and output fields are cleared.""" 1175 from starlette.websockets import WebSocket 1176 1177 
async with AsyncClient( 1178 transport=ASGITransport(app=test_app), base_url="http://test" 1179 ) as client: 1180 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 1181 code = create_response.json()["code"] 1182 jam_id = create_response.json()["id"] 1183 1184 # set up host as output device first 1185 ws = AsyncMock(spec=WebSocket) 1186 await jam_service.connect_ws(jam_id, ws, "did:test:host") 1187 jam_service._ws_client_ids[ws] = "host-client" 1188 1189 async with AsyncClient( 1190 transport=ASGITransport(app=test_app), base_url="http://test" 1191 ) as client: 1192 await client.post( 1193 f"/jams/{code}/command", 1194 json={"type": "set_output", "client_id": "host-client"}, 1195 ) 1196 1197 # switch to everyone mode 1198 response = await client.post( 1199 f"/jams/{code}/command", 1200 json={"type": "set_mode", "mode": "everyone"}, 1201 ) 1202 1203 assert response.status_code == 200 1204 state = response.json()["state"] 1205 assert state["output_mode"] == "everyone" 1206 assert state["output_client_id"] is None 1207 assert state["output_did"] is None 1208 1209 await jam_service.disconnect_ws(jam_id, ws) 1210 1211 1212async def test_set_mode_back_to_one_speaker( 1213 test_app: FastAPI, db_session: AsyncSession 1214) -> None: 1215 """test round-trip: one_speaker → everyone → one_speaker.""" 1216 async with AsyncClient( 1217 transport=ASGITransport(app=test_app), base_url="http://test" 1218 ) as client: 1219 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 1220 code = create_response.json()["code"] 1221 assert create_response.json()["state"]["output_mode"] == "one_speaker" 1222 1223 # switch to everyone 1224 r1 = await client.post( 1225 f"/jams/{code}/command", 1226 json={"type": "set_mode", "mode": "everyone"}, 1227 ) 1228 assert r1.json()["state"]["output_mode"] == "everyone" 1229 1230 # switch back to one_speaker 1231 r2 = await client.post( 1232 f"/jams/{code}/command", 1233 json={"type": "set_mode", "mode": 
"one_speaker"}, 1234 ) 1235 assert r2.json()["state"]["output_mode"] == "one_speaker" 1236 1237 1238async def test_everyone_mode_skips_output_disconnect_pause( 1239 test_app: FastAPI, db_session: AsyncSession 1240) -> None: 1241 """test that in everyone mode, disconnecting a WS does NOT pause playback.""" 1242 from starlette.websockets import WebSocket 1243 1244 async with AsyncClient( 1245 transport=ASGITransport(app=test_app), base_url="http://test" 1246 ) as client: 1247 create_response = await client.post( 1248 "/jams/", json={"track_ids": ["t1"], "is_playing": True} 1249 ) 1250 code = create_response.json()["code"] 1251 jam_id = create_response.json()["id"] 1252 1253 # switch to everyone mode 1254 await client.post( 1255 f"/jams/{code}/command", 1256 json={"type": "set_mode", "mode": "everyone"}, 1257 ) 1258 1259 # connect a WS and then disconnect it 1260 ws = AsyncMock(spec=WebSocket) 1261 await jam_service.connect_ws(jam_id, ws, "did:test:host") 1262 jam_service._ws_client_ids[ws] = "host-client" 1263 await jam_service.disconnect_ws(jam_id, ws) 1264 1265 # jam should still be playing 1266 async with AsyncClient( 1267 transport=ASGITransport(app=test_app), base_url="http://test" 1268 ) as client: 1269 get_response = await client.get(f"/jams/{code}") 1270 state = get_response.json()["state"] 1271 assert state["is_playing"] is True 1272 assert state["output_mode"] == "everyone" 1273 1274 1275async def test_everyone_mode_skips_auto_output( 1276 test_app: FastAPI, db_session: AsyncSession 1277) -> None: 1278 """test that in everyone mode, host sync does NOT auto-assign output.""" 1279 from starlette.websockets import WebSocket 1280 1281 async with AsyncClient( 1282 transport=ASGITransport(app=test_app), base_url="http://test" 1283 ) as client: 1284 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 1285 code = create_response.json()["code"] 1286 jam_id = create_response.json()["id"] 1287 1288 # switch to everyone mode 1289 await 
client.post( 1290 f"/jams/{code}/command", 1291 json={"type": "set_mode", "mode": "everyone"}, 1292 ) 1293 1294 # host connects and sends sync 1295 ws = AsyncMock(spec=WebSocket) 1296 await jam_service.connect_ws(jam_id, ws, "did:test:host") 1297 await jam_service._handle_sync( 1298 jam_id, "did:test:host", {"client_id": "host-abc", "last_id": None}, ws 1299 ) 1300 1301 # output_client_id should remain None (not auto-set) 1302 async with AsyncClient( 1303 transport=ASGITransport(app=test_app), base_url="http://test" 1304 ) as client: 1305 get_response = await client.get(f"/jams/{code}") 1306 state = get_response.json()["state"] 1307 assert state["output_client_id"] is None 1308 assert state["output_mode"] == "everyone" 1309 1310 await jam_service.disconnect_ws(jam_id, ws) 1311 1312 1313async def test_set_mode_invalid_value( 1314 test_app: FastAPI, db_session: AsyncSession 1315) -> None: 1316 """test that an invalid mode value is rejected.""" 1317 async with AsyncClient( 1318 transport=ASGITransport(app=test_app), base_url="http://test" 1319 ) as client: 1320 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 1321 code = create_response.json()["code"] 1322 1323 response = await client.post( 1324 f"/jams/{code}/command", 1325 json={"type": "set_mode", "mode": "invalid_mode"}, 1326 ) 1327 1328 assert response.status_code == 400 1329 1330 1331# ── output fallback tests ────────────────────────────────────────── 1332 1333 1334async def test_output_fallback_on_disconnect( 1335 test_app: FastAPI, db_session: AsyncSession, second_user: str 1336) -> None: 1337 """test that output reassigns to remaining client when output WS disconnects.""" 1338 from starlette.websockets import WebSocket 1339 1340 from backend._internal import require_auth 1341 1342 async with AsyncClient( 1343 transport=ASGITransport(app=test_app), base_url="http://test" 1344 ) as client: 1345 create_response = await client.post( 1346 "/jams/", json={"track_ids": ["t1"], "is_playing": 
True} 1347 ) 1348 code = create_response.json()["code"] 1349 jam_id = create_response.json()["id"] 1350 1351 # second user joins 1352 async def mock_joiner_auth() -> Session: 1353 return MockSession(did=second_user) 1354 1355 app.dependency_overrides[require_auth] = mock_joiner_auth 1356 1357 async with AsyncClient( 1358 transport=ASGITransport(app=test_app), base_url="http://test" 1359 ) as client: 1360 await client.post(f"/jams/{code}/join") 1361 1362 # restore host auth 1363 async def mock_host_auth() -> Session: 1364 return MockSession(did="did:test:host") 1365 1366 app.dependency_overrides[require_auth] = mock_host_auth 1367 1368 # connect both WSs 1369 ws_host = AsyncMock(spec=WebSocket) 1370 ws_joiner = AsyncMock(spec=WebSocket) 1371 await jam_service.connect_ws(jam_id, ws_host, "did:test:host") 1372 jam_service._ws_client_ids[ws_host] = "host-client" 1373 await jam_service.connect_ws(jam_id, ws_joiner, second_user) 1374 jam_service._ws_client_ids[ws_joiner] = "joiner-client" 1375 1376 # set joiner as output device 1377 app.dependency_overrides[require_auth] = mock_joiner_auth 1378 async with AsyncClient( 1379 transport=ASGITransport(app=test_app), base_url="http://test" 1380 ) as client: 1381 await client.post( 1382 f"/jams/{code}/command", 1383 json={"type": "set_output", "client_id": "joiner-client"}, 1384 ) 1385 1386 # disconnect the output device (joiner) 1387 await jam_service.disconnect_ws(jam_id, ws_joiner) 1388 1389 # output should have fallen back to host, playback should NOT be paused 1390 app.dependency_overrides[require_auth] = mock_host_auth 1391 async with AsyncClient( 1392 transport=ASGITransport(app=test_app), base_url="http://test" 1393 ) as client: 1394 get_response = await client.get(f"/jams/{code}") 1395 state = get_response.json()["state"] 1396 assert state["output_client_id"] == "host-client" 1397 assert state["output_did"] == "did:test:host" 1398 assert state["is_playing"] is True 1399 1400 await jam_service.disconnect_ws(jam_id, 
ws_host) 1401 1402 1403async def test_output_fallback_prefers_host( 1404 test_app: FastAPI, db_session: AsyncSession, second_user: str 1405) -> None: 1406 """test that fallback prefers host over other connected clients.""" 1407 from starlette.websockets import WebSocket 1408 1409 from backend._internal import require_auth 1410 1411 # create a third user 1412 third_did = "did:test:third" 1413 third_artist = Artist( 1414 did=third_did, 1415 handle="test.third", 1416 display_name="Test Third", 1417 ) 1418 db_session.add(third_artist) 1419 await db_session.commit() 1420 1421 async with AsyncClient( 1422 transport=ASGITransport(app=test_app), base_url="http://test" 1423 ) as client: 1424 create_response = await client.post( 1425 "/jams/", json={"track_ids": ["t1"], "is_playing": True} 1426 ) 1427 code = create_response.json()["code"] 1428 jam_id = create_response.json()["id"] 1429 1430 # join second and third users 1431 for did in [second_user, third_did]: 1432 1433 async def _auth(did: str = did) -> Session: 1434 return MockSession(did=did) 1435 1436 app.dependency_overrides[require_auth] = _auth 1437 async with AsyncClient( 1438 transport=ASGITransport(app=test_app), base_url="http://test" 1439 ) as client: 1440 await client.post(f"/jams/{code}/join") 1441 1442 # connect all three WSs 1443 ws_host = AsyncMock(spec=WebSocket) 1444 ws_joiner = AsyncMock(spec=WebSocket) 1445 ws_third = AsyncMock(spec=WebSocket) 1446 await jam_service.connect_ws(jam_id, ws_host, "did:test:host") 1447 jam_service._ws_client_ids[ws_host] = "host-client" 1448 await jam_service.connect_ws(jam_id, ws_joiner, second_user) 1449 jam_service._ws_client_ids[ws_joiner] = "joiner-client" 1450 await jam_service.connect_ws(jam_id, ws_third, third_did) 1451 jam_service._ws_client_ids[ws_third] = "third-client" 1452 1453 # set third user as output 1454 async def mock_third_auth() -> Session: 1455 return MockSession(did=third_did) 1456 1457 app.dependency_overrides[require_auth] = mock_third_auth 1458 
async with AsyncClient( 1459 transport=ASGITransport(app=test_app), base_url="http://test" 1460 ) as client: 1461 await client.post( 1462 f"/jams/{code}/command", 1463 json={"type": "set_output", "client_id": "third-client"}, 1464 ) 1465 1466 # disconnect the output (third user) 1467 await jam_service.disconnect_ws(jam_id, ws_third) 1468 1469 # fallback should prefer host over joiner 1470 async def mock_host_auth() -> Session: 1471 return MockSession(did="did:test:host") 1472 1473 app.dependency_overrides[require_auth] = mock_host_auth 1474 async with AsyncClient( 1475 transport=ASGITransport(app=test_app), base_url="http://test" 1476 ) as client: 1477 get_response = await client.get(f"/jams/{code}") 1478 state = get_response.json()["state"] 1479 assert state["output_client_id"] == "host-client" 1480 assert state["output_did"] == "did:test:host" 1481 assert state["is_playing"] is True 1482 1483 await jam_service.disconnect_ws(jam_id, ws_host) 1484 await jam_service.disconnect_ws(jam_id, ws_joiner) 1485 1486 1487async def test_non_host_auto_output_on_sync( 1488 test_app: FastAPI, db_session: AsyncSession, second_user: str 1489) -> None: 1490 """test that a non-host gets auto-assigned as output when no output is set.""" 1491 from starlette.websockets import WebSocket 1492 1493 from backend._internal import require_auth 1494 1495 async with AsyncClient( 1496 transport=ASGITransport(app=test_app), base_url="http://test" 1497 ) as client: 1498 create_response = await client.post("/jams/", json={"track_ids": ["t1"]}) 1499 code = create_response.json()["code"] 1500 jam_id = create_response.json()["id"] 1501 assert create_response.json()["state"]["output_client_id"] is None 1502 1503 # join as second user 1504 async def mock_joiner_auth() -> Session: 1505 return MockSession(did=second_user) 1506 1507 app.dependency_overrides[require_auth] = mock_joiner_auth 1508 async with AsyncClient( 1509 transport=ASGITransport(app=test_app), base_url="http://test" 1510 ) as client: 
1511 await client.post(f"/jams/{code}/join") 1512 1513 # connect joiner WS and send sync with client_id (no host WS connected) 1514 ws = AsyncMock(spec=WebSocket) 1515 await jam_service.connect_ws(jam_id, ws, second_user) 1516 await jam_service._handle_sync( 1517 jam_id, second_user, {"client_id": "joiner-abc-123", "last_id": None}, ws 1518 ) 1519 1520 # verify non-host was auto-assigned as output 1521 async def mock_host_auth() -> Session: 1522 return MockSession(did="did:test:host") 1523 1524 app.dependency_overrides[require_auth] = mock_host_auth 1525 async with AsyncClient( 1526 transport=ASGITransport(app=test_app), base_url="http://test" 1527 ) as client: 1528 get_response = await client.get(f"/jams/{code}") 1529 state = get_response.json()["state"] 1530 assert state["output_client_id"] == "joiner-abc-123" 1531 assert state["output_did"] == second_user 1532 1533 await jam_service.disconnect_ws(jam_id, ws) 1534 1535 1536async def test_jam_preview_returns_host_info( 1537 test_app: FastAPI, db_session: AsyncSession 1538) -> None: 1539 """test GET /jams/{code}/preview returns host info without auth.""" 1540 # create jam as authenticated user 1541 async with AsyncClient( 1542 transport=ASGITransport(app=test_app), base_url="http://test" 1543 ) as client: 1544 create_response = await client.post("/jams/", json={"name": "preview test"}) 1545 code = create_response.json()["code"] 1546 1547 # clear auth overrides — preview is public 1548 app.dependency_overrides.clear() 1549 1550 async with AsyncClient( 1551 transport=ASGITransport(app=test_app), base_url="http://test" 1552 ) as client: 1553 response = await client.get(f"/jams/{code}/preview") 1554 1555 assert response.status_code == 200 1556 data = response.json() 1557 assert data["code"] == code 1558 assert data["name"] == "preview test" 1559 assert data["is_active"] is True 1560 assert data["host_handle"] == "test.host" 1561 assert data["host_display_name"] == "Test Host" 1562 assert data["participant_count"] >= 1 
async def test_jam_preview_not_found(
    test_app: FastAPI, db_session: AsyncSession
) -> None:
    """test GET /jams/{code}/preview answers 404 when no jam has that code."""
    # drop every dependency override before calling the preview endpoint
    app.dependency_overrides.clear()

    transport = ASGITransport(app=test_app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        missing = await http.get("/jams/nonexist/preview")

    status = missing.status_code
    assert status == 404