"""Tests for monitor/server.py FastAPI server with WebSocket protocol. Verifies: - or1-monitor.AC3.5: Event log shows SimEvents with simulation timestamps - or1-monitor.AC3.9: Step tick advances simulation and updates panels - or1-monitor.AC3.10: Step event advances exactly one SimPy event - or1-monitor.AC5.1: Backend shared between REPL and server """ import json from starlette.testclient import TestClient from monitor.server import create_app from monitor.backend import SimulationBackend # Simple valid dfasm for testing SIMPLE_DFASM = """@system pe=2, sm=0 &c1|pe0 <| const, 3 &c2|pe0 <| const, 7 &result|pe0 <| add &c1|pe0 |> &result|pe0:L &c2|pe0 |> &result|pe0:R """ class TestServerInitialization: """Test server creation and initialization.""" def test_create_app_succeeds(self): """Test that create_app() returns a FastAPI instance.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) assert app is not None finally: backend.stop() def test_websocket_endpoint_exists(self): """Test that /ws endpoint is available.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: # Should be able to connect to /ws with client.websocket_connect("/ws") as ws: # Connection successful pass finally: backend.stop() class TestWebSocketLoadCommand: """Test load command via WebSocket.""" def test_websocket_load_command(self): """Send load command via WebSocket and receive graph_loaded response.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Send load command cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(cmd)) # Receive response response = ws.receive_json() assert response["type"] == "graph_loaded" assert "graph" in response assert "state" in response assert "nodes" in response["graph"] assert "edges" in response["graph"] finally: backend.stop() def test_websocket_load_invalid_program(self): """Send invalid dfasm and receive error.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: cmd = {"cmd": "load", "source": "invalid @#$ dfasm"} ws.send_text(json.dumps(cmd)) response = ws.receive_json() assert response["type"] == "error" assert "message" in response finally: backend.stop() class TestWebSocketStepCommands: """Test step_tick and step_event commands.""" def test_websocket_step_tick(self): """Send step_tick command and verify response structure.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load a program first load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Step tick step_cmd = {"cmd": "step_tick"} ws.send_text(json.dumps(step_cmd)) response = ws.receive_json() assert response["type"] in ["monitor_update", "error"] if response["type"] == "monitor_update": assert "sim_time" in response assert "events" in response assert "state" in response finally: backend.stop() def test_websocket_step_event(self): """Send step_event command and verify response.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load a program load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Step event step_cmd = {"cmd": "step_event"} ws.send_text(json.dumps(step_cmd)) response = ws.receive_json() assert response["type"] in ["monitor_update", "error"] finally: backend.stop() def test_websocket_run_until(self): """Send run_until command.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load a program load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Run until time 5.0 run_cmd = {"cmd": "run_until", "until": 5.0} ws.send_text(json.dumps(run_cmd)) response = ws.receive_json() assert response["type"] in ["monitor_update", "error"] finally: backend.stop() class TestWebSocketTokenInjection: """Test token injection via WebSocket.""" def test_websocket_inject_token(self): """Send inject command for a CM token.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load a program load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Inject a token inject_cmd = { "cmd": "inject", "target": 0, "offset": 0, "ctx": 0, "data": 42, } ws.send_text(json.dumps(inject_cmd)) response = ws.receive_json() assert response["type"] in ["monitor_update", "error"] finally: backend.stop() def test_websocket_send_token(self): """Send send command for a CM token (respects backpressure).""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load a program load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Send a token send_cmd = { "cmd": "send", "target": 0, "offset": 0, "ctx": 0, "data": 7, } ws.send_text(json.dumps(send_cmd)) response = ws.receive_json() assert response["type"] in ["monitor_update", "error"] finally: backend.stop() class TestWebSocketReset: """Test reset command.""" def test_websocket_reset(self): """Send reset command.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load a program load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Reset reset_cmd = {"cmd": "reset", "reload": False} ws.send_text(json.dumps(reset_cmd)) response = ws.receive_json() # Reset with reload=False returns a reset type response assert response["type"] == "reset" assert response["sim_time"] == 0.0 finally: backend.stop() def test_websocket_reset_with_reload(self): """Send reset command with reload=True.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load a program load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Reset with reload reset_cmd = {"cmd": "reset", "reload": True} ws.send_text(json.dumps(reset_cmd)) response = ws.receive_json() # Should receive graph_loaded again assert response["type"] in ["graph_loaded", "error"] finally: backend.stop() class TestRESTEndpoints: """Test REST API endpoints.""" def test_rest_load_endpoint(self): """Test POST /load endpoint.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: response = client.post("/load", json={"source": SIMPLE_DFASM}) assert response.status_code == 200 data = response.json() assert data["type"] == "graph_loaded" finally: backend.stop() def test_rest_reset_endpoint(self): """Test POST /reset endpoint.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: # Load first response = client.post("/load", json={"source": SIMPLE_DFASM}) assert response.status_code == 200 # Reset response = client.post("/reset", json={"reload": False}) assert response.status_code == 200 finally: backend.stop() def test_rest_state_endpoint(self): """Test GET /state endpoint.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: # Load first response = client.post("/load", json={"source": SIMPLE_DFASM}) assert response.status_code == 200 # Get state response = client.get("/state") assert response.status_code == 200 data = response.json() assert "type" in data finally: backend.stop() class TestBackendSharing: """Test that backend is shared between REPL and server.""" def test_shared_backend_instance(self): """Test that the same backend instance is used for all operations.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: # Load via WebSocket with client.websocket_connect("/ws") as ws: load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" assert "graph" in response assert "state" in response ws_graph = response["graph"] # Verify the same state is accessible via REST /state endpoint state_response = client.get("/state") assert state_response.status_code == 200 state_data = state_response.json() # Should have the loaded graph data assert state_data["type"] == "graph_loaded" assert "graph" in state_data assert "state" in state_data # Verify both interfaces have the same graph assert state_data["graph"]["nodes"] == ws_graph["nodes"] finally: backend.stop() class TestConnectionManager: """Test WebSocket connection management.""" def test_multiple_websocket_connections(self): """Test multiple simultaneous WebSocket connections.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: # Load via first connection with client.websocket_connect("/ws") as ws1: load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws1.send_text(json.dumps(load_cmd)) response1 = ws1.receive_json() assert response1["type"] == "graph_loaded" # Connect second client (should receive current state on connect) with client.websocket_connect("/ws") as ws2: # Second client should receive current graph_loaded state on connect response2 = ws2.receive_json() assert response2["type"] == "graph_loaded" assert "graph" in response2 assert "state" in response2 finally: backend.stop() def test_websocket_disconnect_handling(self): """Test that disconnecting WebSocket is handled gracefully.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" # Disconnect (context manager exits) # Should not crash when sending to disconnected client finally: backend.stop() class TestEventSerialization: """Test that events are properly serialized in responses.""" def test_monitor_update_contains_events(self): """Test that monitor_update responses contain events.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: # Load load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) ws.receive_json() # Step tick step_cmd = {"cmd": "step_tick"} ws.send_text(json.dumps(step_cmd)) response = ws.receive_json() if response["type"] == "monitor_update": # Check that events field exists assert "events" in response assert isinstance(response["events"], list) # Events might be empty for this simple program # but the field should be present finally: backend.stop() def test_graph_loaded_contains_state(self): """Test that graph_loaded responses contain state info.""" backend = SimulationBackend() backend.start() try: app = create_app(backend) with TestClient(app) as client: with client.websocket_connect("/ws") as ws: load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} ws.send_text(json.dumps(load_cmd)) response = ws.receive_json() assert response["type"] == "graph_loaded" assert "state" in response assert "pes" in response["state"] assert "sms" in response["state"] finally: backend.stop()