OR-1 dataflow CPU sketch
at ba08ffded3d3b2badb2a7e22816feafaacea5ded 489 lines 18 kB view raw
1"""Tests for monitor/server.py FastAPI server with WebSocket protocol. 2 3Verifies: 4- or1-monitor.AC3.5: Event log shows SimEvents with simulation timestamps 5- or1-monitor.AC3.9: Step tick advances simulation and updates panels 6- or1-monitor.AC3.10: Step event advances exactly one SimPy event 7- or1-monitor.AC5.1: Backend shared between REPL and server 8""" 9 10import json 11from starlette.testclient import TestClient 12 13from monitor.server import create_app 14from monitor.backend import SimulationBackend 15 16 17# Simple valid dfasm for testing 18SIMPLE_DFASM = """@system pe=2, sm=0 19&c1|pe0 <| const, 3 20&c2|pe0 <| const, 7 21&result|pe0 <| add 22&c1|pe0 |> &result|pe0:L 23&c2|pe0 |> &result|pe0:R 24""" 25 26 27class TestServerInitialization: 28 """Test server creation and initialization.""" 29 30 def test_create_app_succeeds(self): 31 """Test that create_app() returns a FastAPI instance.""" 32 backend = SimulationBackend() 33 backend.start() 34 try: 35 app = create_app(backend) 36 assert app is not None 37 finally: 38 backend.stop() 39 40 def test_websocket_endpoint_exists(self): 41 """Test that /ws endpoint is available.""" 42 backend = SimulationBackend() 43 backend.start() 44 try: 45 app = create_app(backend) 46 with TestClient(app) as client: 47 # Should be able to connect to /ws 48 with client.websocket_connect("/ws") as ws: 49 # Connection successful 50 pass 51 finally: 52 backend.stop() 53 54 55class TestWebSocketLoadCommand: 56 """Test load command via WebSocket.""" 57 58 def test_websocket_load_command(self): 59 """Send load command via WebSocket and receive graph_loaded response.""" 60 backend = SimulationBackend() 61 backend.start() 62 try: 63 app = create_app(backend) 64 with TestClient(app) as client: 65 with client.websocket_connect("/ws") as ws: 66 # Send load command 67 cmd = {"cmd": "load", "source": SIMPLE_DFASM} 68 ws.send_text(json.dumps(cmd)) 69 70 # Receive response 71 response = ws.receive_json() 72 assert response["type"] == "graph_loaded" 73 assert "graph" in response 74 assert "state" in response 75 assert "nodes" in response["graph"] 76 assert "edges" in response["graph"] 77 78 finally: 79 backend.stop() 80 81 def test_websocket_load_invalid_program(self): 82 """Send invalid dfasm and receive error.""" 83 backend = SimulationBackend() 84 backend.start() 85 try: 86 app = create_app(backend) 87 with TestClient(app) as client: 88 with client.websocket_connect("/ws") as ws: 89 cmd = {"cmd": "load", "source": "invalid @#$ dfasm"} 90 ws.send_text(json.dumps(cmd)) 91 92 response = ws.receive_json() 93 assert response["type"] == "error" 94 assert "message" in response 95 96 finally: 97 backend.stop() 98 99 100class TestWebSocketStepCommands: 101 """Test step_tick and step_event commands.""" 102 103 def test_websocket_step_tick(self): 104 """Send step_tick command and verify response structure.""" 105 backend = SimulationBackend() 106 backend.start() 107 try: 108 app = create_app(backend) 109 with TestClient(app) as client: 110 with client.websocket_connect("/ws") as ws: 111 # Load a program first 112 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 113 ws.send_text(json.dumps(load_cmd)) 114 response = ws.receive_json() 115 assert response["type"] == "graph_loaded" 116 117 # Step tick 118 step_cmd = {"cmd": "step_tick"} 119 ws.send_text(json.dumps(step_cmd)) 120 121 response = ws.receive_json() 122 assert response["type"] in ["monitor_update", "error"] 123 if response["type"] == "monitor_update": 124 assert "sim_time" in response 125 assert "events" in response 126 assert "state" in response 127 128 finally: 129 backend.stop() 130 131 def test_websocket_step_event(self): 132 """Send step_event command and verify response.""" 133 backend = SimulationBackend() 134 backend.start() 135 try: 136 app = create_app(backend) 137 with TestClient(app) as client: 138 with client.websocket_connect("/ws") as ws: 139 # Load a program 140 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 141 ws.send_text(json.dumps(load_cmd)) 142 response = ws.receive_json() 143 assert response["type"] == "graph_loaded" 144 145 # Step event 146 step_cmd = {"cmd": "step_event"} 147 ws.send_text(json.dumps(step_cmd)) 148 149 response = ws.receive_json() 150 assert response["type"] in ["monitor_update", "error"] 151 152 finally: 153 backend.stop() 154 155 def test_websocket_run_until(self): 156 """Send run_until command.""" 157 backend = SimulationBackend() 158 backend.start() 159 try: 160 app = create_app(backend) 161 with TestClient(app) as client: 162 with client.websocket_connect("/ws") as ws: 163 # Load a program 164 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 165 ws.send_text(json.dumps(load_cmd)) 166 response = ws.receive_json() 167 assert response["type"] == "graph_loaded" 168 169 # Run until time 5.0 170 run_cmd = {"cmd": "run_until", "until": 5.0} 171 ws.send_text(json.dumps(run_cmd)) 172 173 response = ws.receive_json() 174 assert response["type"] in ["monitor_update", "error"] 175 176 finally: 177 backend.stop() 178 179 180class TestWebSocketTokenInjection: 181 """Test token injection via WebSocket.""" 182 183 def test_websocket_inject_token(self): 184 """Send inject command for a CM token.""" 185 backend = SimulationBackend() 186 backend.start() 187 try: 188 app = create_app(backend) 189 with TestClient(app) as client: 190 with client.websocket_connect("/ws") as ws: 191 # Load a program 192 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 193 ws.send_text(json.dumps(load_cmd)) 194 response = ws.receive_json() 195 assert response["type"] == "graph_loaded" 196 197 # Inject a token 198 inject_cmd = { 199 "cmd": "inject", 200 "target": 0, 201 "offset": 0, 202 "ctx": 0, 203 "data": 42, 204 } 205 ws.send_text(json.dumps(inject_cmd)) 206 207 response = ws.receive_json() 208 assert response["type"] in ["monitor_update", "error"] 209 210 finally: 211 backend.stop() 212 213 def test_websocket_send_token(self): 214 """Send send command for a CM token (respects backpressure).""" 215 backend = SimulationBackend() 216 backend.start() 217 try: 218 app = create_app(backend) 219 with TestClient(app) as client: 220 with client.websocket_connect("/ws") as ws: 221 # Load a program 222 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 223 ws.send_text(json.dumps(load_cmd)) 224 response = ws.receive_json() 225 assert response["type"] == "graph_loaded" 226 227 # Send a token 228 send_cmd = { 229 "cmd": "send", 230 "target": 0, 231 "offset": 0, 232 "ctx": 0, 233 "data": 7, 234 } 235 ws.send_text(json.dumps(send_cmd)) 236 237 response = ws.receive_json() 238 assert response["type"] in ["monitor_update", "error"] 239 240 finally: 241 backend.stop() 242 243 244class TestWebSocketReset: 245 """Test reset command.""" 246 247 def test_websocket_reset(self): 248 """Send reset command.""" 249 backend = SimulationBackend() 250 backend.start() 251 try: 252 app = create_app(backend) 253 with TestClient(app) as client: 254 with client.websocket_connect("/ws") as ws: 255 # Load a program 256 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 257 ws.send_text(json.dumps(load_cmd)) 258 response = ws.receive_json() 259 assert response["type"] == "graph_loaded" 260 261 # Reset 262 reset_cmd = {"cmd": "reset", "reload": False} 263 ws.send_text(json.dumps(reset_cmd)) 264 265 response = ws.receive_json() 266 # Reset with reload=False returns a reset type response 267 assert response["type"] == "reset" 268 assert response["sim_time"] == 0.0 269 270 finally: 271 backend.stop() 272 273 def test_websocket_reset_with_reload(self): 274 """Send reset command with reload=True.""" 275 backend = SimulationBackend() 276 backend.start() 277 try: 278 app = create_app(backend) 279 with TestClient(app) as client: 280 with client.websocket_connect("/ws") as ws: 281 # Load a program 282 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 283 ws.send_text(json.dumps(load_cmd)) 284 response = ws.receive_json() 285 assert response["type"] == "graph_loaded" 286 287 # Reset with reload 288 reset_cmd = {"cmd": "reset", "reload": True} 289 ws.send_text(json.dumps(reset_cmd)) 290 291 response = ws.receive_json() 292 # Should receive graph_loaded again 293 assert response["type"] in ["graph_loaded", "error"] 294 295 finally: 296 backend.stop() 297 298 299class TestRESTEndpoints: 300 """Test REST API endpoints.""" 301 302 def test_rest_load_endpoint(self): 303 """Test POST /load endpoint.""" 304 backend = SimulationBackend() 305 backend.start() 306 try: 307 app = create_app(backend) 308 with TestClient(app) as client: 309 response = client.post("/load", json={"source": SIMPLE_DFASM}) 310 assert response.status_code == 200 311 data = response.json() 312 assert data["type"] == "graph_loaded" 313 314 finally: 315 backend.stop() 316 317 def test_rest_reset_endpoint(self): 318 """Test POST /reset endpoint.""" 319 backend = SimulationBackend() 320 backend.start() 321 try: 322 app = create_app(backend) 323 with TestClient(app) as client: 324 # Load first 325 response = client.post("/load", json={"source": SIMPLE_DFASM}) 326 assert response.status_code == 200 327 328 # Reset 329 response = client.post("/reset", json={"reload": False}) 330 assert response.status_code == 200 331 332 finally: 333 backend.stop() 334 335 def test_rest_state_endpoint(self): 336 """Test GET /state endpoint.""" 337 backend = SimulationBackend() 338 backend.start() 339 try: 340 app = create_app(backend) 341 with TestClient(app) as client: 342 # Load first 343 response = client.post("/load", json={"source": SIMPLE_DFASM}) 344 assert response.status_code == 200 345 346 # Get state 347 response = client.get("/state") 348 assert response.status_code == 200 349 data = response.json() 350 assert "type" in data 351 352 finally: 353 backend.stop() 354 355 356class TestBackendSharing: 357 """Test that backend is shared between REPL and server.""" 358 359 def test_shared_backend_instance(self): 360 """Test that the same backend instance is used for all operations.""" 361 backend = SimulationBackend() 362 backend.start() 363 try: 364 app = create_app(backend) 365 366 with TestClient(app) as client: 367 # Load via WebSocket 368 with client.websocket_connect("/ws") as ws: 369 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 370 ws.send_text(json.dumps(load_cmd)) 371 response = ws.receive_json() 372 assert response["type"] == "graph_loaded" 373 assert "graph" in response 374 assert "state" in response 375 ws_graph = response["graph"] 376 377 # Verify the same state is accessible via REST /state endpoint 378 state_response = client.get("/state") 379 assert state_response.status_code == 200 380 state_data = state_response.json() 381 # Should have the loaded graph data 382 assert state_data["type"] == "graph_loaded" 383 assert "graph" in state_data 384 assert "state" in state_data 385 # Verify both interfaces have the same graph 386 assert state_data["graph"]["nodes"] == ws_graph["nodes"] 387 388 finally: 389 backend.stop() 390 391 392class TestConnectionManager: 393 """Test WebSocket connection management.""" 394 395 def test_multiple_websocket_connections(self): 396 """Test multiple simultaneous WebSocket connections.""" 397 backend = SimulationBackend() 398 backend.start() 399 try: 400 app = create_app(backend) 401 with TestClient(app) as client: 402 # Load via first connection 403 with client.websocket_connect("/ws") as ws1: 404 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 405 ws1.send_text(json.dumps(load_cmd)) 406 response1 = ws1.receive_json() 407 assert response1["type"] == "graph_loaded" 408 409 # Connect second client (should receive current state on connect) 410 with client.websocket_connect("/ws") as ws2: 411 # Second client should receive current graph_loaded state on connect 412 response2 = ws2.receive_json() 413 assert response2["type"] == "graph_loaded" 414 assert "graph" in response2 415 assert "state" in response2 416 417 finally: 418 backend.stop() 419 420 def test_websocket_disconnect_handling(self): 421 """Test that disconnecting WebSocket is handled gracefully.""" 422 backend = SimulationBackend() 423 backend.start() 424 try: 425 app = create_app(backend) 426 with TestClient(app) as client: 427 with client.websocket_connect("/ws") as ws: 428 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 429 ws.send_text(json.dumps(load_cmd)) 430 response = ws.receive_json() 431 assert response["type"] == "graph_loaded" 432 433 # Disconnect (context manager exits) 434 # Should not crash when sending to disconnected client 435 436 finally: 437 backend.stop() 438 439 440class TestEventSerialization: 441 """Test that events are properly serialized in responses.""" 442 443 def test_monitor_update_contains_events(self): 444 """Test that monitor_update responses contain events.""" 445 backend = SimulationBackend() 446 backend.start() 447 try: 448 app = create_app(backend) 449 with TestClient(app) as client: 450 with client.websocket_connect("/ws") as ws: 451 # Load 452 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 453 ws.send_text(json.dumps(load_cmd)) 454 ws.receive_json() 455 456 # Step tick 457 step_cmd = {"cmd": "step_tick"} 458 ws.send_text(json.dumps(step_cmd)) 459 460 response = ws.receive_json() 461 if response["type"] == "monitor_update": 462 # Check that events field exists 463 assert "events" in response 464 assert isinstance(response["events"], list) 465 # Events might be empty for this simple program 466 # but the field should be present 467 468 finally: 469 backend.stop() 470 471 def test_graph_loaded_contains_state(self): 472 """Test that graph_loaded responses contain state info.""" 473 backend = SimulationBackend() 474 backend.start() 475 try: 476 app = create_app(backend) 477 with TestClient(app) as client: 478 with client.websocket_connect("/ws") as ws: 479 load_cmd = {"cmd": "load", "source": SIMPLE_DFASM} 480 ws.send_text(json.dumps(load_cmd)) 481 482 response = ws.receive_json() 483 assert response["type"] == "graph_loaded" 484 assert "state" in response 485 assert "pes" in response["state"] 486 assert "sms" in response["state"] 487 488 finally: 489 backend.stop()