OR-1 dataflow CPU sketch
1"""Tests for monitor/server.py FastAPI server with WebSocket protocol.
2
3Verifies:
4- or1-monitor.AC3.5: Event log shows SimEvents with simulation timestamps
5- or1-monitor.AC3.9: Step tick advances simulation and updates panels
6- or1-monitor.AC3.10: Step event advances exactly one SimPy event
7- or1-monitor.AC5.1: Backend shared between REPL and server
8"""
9
10import json
11from starlette.testclient import TestClient
12
13from monitor.server import create_app
14from monitor.backend import SimulationBackend
15
16
# Small dfasm program used as the ``load`` payload throughout these tests.
# Written one source line per literal; every line ends with a newline.
SIMPLE_DFASM = (
    "@system pe=2, sm=0\n"
    "&c1|pe0 <| const, 3\n"
    "&c2|pe0 <| const, 7\n"
    "&result|pe0 <| add\n"
    "&c1|pe0 |> &result|pe0:L\n"
    "&c2|pe0 |> &result|pe0:R\n"
)
25
26
class TestServerInitialization:
    """Smoke tests: the app can be built and its WebSocket route accepts clients."""

    def test_create_app_succeeds(self):
        """create_app() hands back a non-None application object."""
        sim = SimulationBackend()
        sim.start()
        try:
            assert create_app(sim) is not None
        finally:
            # Always stop the backend, even when the assertion fails.
            sim.stop()

    def test_websocket_endpoint_exists(self):
        """A client can open and close a connection to /ws."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http:
                # Entering and leaving the connection is the whole check;
                # no message is exchanged over the socket.
                with http.websocket_connect("/ws"):
                    pass
        finally:
            sim.stop()
53
54
class TestWebSocketLoadCommand:
    """Exercise the ``load`` command of the WebSocket protocol."""

    def test_websocket_load_command(self):
        """A valid program is answered with graph_loaded plus graph and state."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            payload = json.dumps({"cmd": "load", "source": SIMPLE_DFASM})
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                sock.send_text(payload)

                reply = sock.receive_json()
                assert reply["type"] == "graph_loaded"
                # Top-level sections of the reply.
                for key in ("graph", "state"):
                    assert key in reply
                # The graph section must expose both nodes and edges.
                for key in ("nodes", "edges"):
                    assert key in reply["graph"]
        finally:
            sim.stop()

    def test_websocket_load_invalid_program(self):
        """Malformed source is answered with an error reply carrying a message."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            payload = json.dumps({"cmd": "load", "source": "invalid @#$ dfasm"})
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                sock.send_text(payload)

                reply = sock.receive_json()
                assert reply["type"] == "error"
                assert "message" in reply
        finally:
            sim.stop()
98
99
class TestWebSocketStepCommands:
    """Exercise the stepping commands: step_tick, step_event and run_until."""

    @staticmethod
    def _load_program(sock):
        """Send SIMPLE_DFASM over *sock* and require a graph_loaded reply."""
        sock.send_text(json.dumps({"cmd": "load", "source": SIMPLE_DFASM}))
        assert sock.receive_json()["type"] == "graph_loaded"

    def test_websocket_step_tick(self):
        """step_tick gets a reply; a monitor_update has time, events, state."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                self._load_program(sock)

                sock.send_text(json.dumps({"cmd": "step_tick"}))
                reply = sock.receive_json()

                # An error reply is tolerated; the payload shape is only
                # checked when the server reports a monitor_update.
                assert reply["type"] in ("monitor_update", "error")
                if reply["type"] == "monitor_update":
                    for field in ("sim_time", "events", "state"):
                        assert field in reply
        finally:
            sim.stop()

    def test_websocket_step_event(self):
        """step_tick's sibling step_event gets a monitor_update or error reply."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                self._load_program(sock)

                sock.send_text(json.dumps({"cmd": "step_event"}))
                reply = sock.receive_json()
                assert reply["type"] in ("monitor_update", "error")
        finally:
            sim.stop()

    def test_websocket_run_until(self):
        """run_until with until=5.0 gets a monitor_update or error reply."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                self._load_program(sock)

                sock.send_text(json.dumps({"cmd": "run_until", "until": 5.0}))
                reply = sock.receive_json()
                assert reply["type"] in ("monitor_update", "error")
        finally:
            sim.stop()
178
179
class TestWebSocketTokenInjection:
    """Exercise the token-delivery commands: inject and send."""

    @staticmethod
    def _load_program(sock):
        """Send SIMPLE_DFASM over *sock* and require a graph_loaded reply."""
        sock.send_text(json.dumps({"cmd": "load", "source": SIMPLE_DFASM}))
        assert sock.receive_json()["type"] == "graph_loaded"

    def test_websocket_inject_token(self):
        """An inject command gets a monitor_update or error reply."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                self._load_program(sock)

                # Keyword order fixes the key order of the serialized JSON.
                token = dict(cmd="inject", target=0, offset=0, ctx=0, data=42)
                sock.send_text(json.dumps(token))

                reply = sock.receive_json()
                assert reply["type"] in ("monitor_update", "error")
        finally:
            sim.stop()

    def test_websocket_send_token(self):
        """A send command gets a monitor_update or error reply."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                self._load_program(sock)

                # Keyword order fixes the key order of the serialized JSON.
                token = dict(cmd="send", target=0, offset=0, ctx=0, data=7)
                sock.send_text(json.dumps(token))

                reply = sock.receive_json()
                assert reply["type"] in ("monitor_update", "error")
        finally:
            sim.stop()
242
243
class TestWebSocketReset:
    """Exercise the ``reset`` command with and without program reload."""

    @staticmethod
    def _load_program(sock):
        """Send SIMPLE_DFASM over *sock* and require a graph_loaded reply."""
        sock.send_text(json.dumps({"cmd": "load", "source": SIMPLE_DFASM}))
        assert sock.receive_json()["type"] == "graph_loaded"

    def test_websocket_reset(self):
        """reset with reload=False yields a reset reply at sim_time 0.0."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                self._load_program(sock)

                sock.send_text(json.dumps({"cmd": "reset", "reload": False}))
                reply = sock.receive_json()

                assert reply["type"] == "reset"
                assert reply["sim_time"] == 0.0
        finally:
            sim.stop()

    def test_websocket_reset_with_reload(self):
        """reset with reload=True yields graph_loaded (an error is tolerated)."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                self._load_program(sock)

                sock.send_text(json.dumps({"cmd": "reset", "reload": True}))
                reply = sock.receive_json()

                assert reply["type"] in ("graph_loaded", "error")
        finally:
            sim.stop()
297
298
class TestRESTEndpoints:
    """Exercise the HTTP endpoints: POST /load, POST /reset and GET /state."""

    def test_rest_load_endpoint(self):
        """POST /load with a valid program returns 200 and a graph_loaded body."""
        sim = SimulationBackend()
        sim.start()
        try:
            with TestClient(create_app(sim)) as http:
                result = http.post("/load", json={"source": SIMPLE_DFASM})
                assert result.status_code == 200
                assert result.json()["type"] == "graph_loaded"
        finally:
            sim.stop()

    def test_rest_reset_endpoint(self):
        """POST /reset after a successful load returns 200."""
        sim = SimulationBackend()
        sim.start()
        try:
            with TestClient(create_app(sim)) as http:
                # A program is loaded first so that the reset has a target.
                loaded = http.post("/load", json={"source": SIMPLE_DFASM})
                assert loaded.status_code == 200

                cleared = http.post("/reset", json={"reload": False})
                assert cleared.status_code == 200
        finally:
            sim.stop()

    def test_rest_state_endpoint(self):
        """GET /state after a successful load returns 200 and a typed body."""
        sim = SimulationBackend()
        sim.start()
        try:
            with TestClient(create_app(sim)) as http:
                loaded = http.post("/load", json={"source": SIMPLE_DFASM})
                assert loaded.status_code == 200

                fetched = http.get("/state")
                assert fetched.status_code == 200
                assert "type" in fetched.json()
        finally:
            sim.stop()
354
355
class TestBackendSharing:
    """A single backend instance serves both the WebSocket and REST interfaces."""

    def test_shared_backend_instance(self):
        """A program loaded over /ws is visible, unchanged, through GET /state."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            payload = json.dumps({"cmd": "load", "source": SIMPLE_DFASM})
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                # Load through the WebSocket side.
                sock.send_text(payload)
                loaded = sock.receive_json()
                assert loaded["type"] == "graph_loaded"
                assert "graph" in loaded
                assert "state" in loaded

                # Read back through REST while the socket is still open.
                fetched = http.get("/state")
                assert fetched.status_code == 200
                snapshot = fetched.json()
                assert snapshot["type"] == "graph_loaded"
                assert "graph" in snapshot
                assert "state" in snapshot

                # Both interfaces must describe the same graph nodes.
                assert snapshot["graph"]["nodes"] == loaded["graph"]["nodes"]
        finally:
            sim.stop()
390
391
class TestConnectionManager:
    """Exercise handling of several, and of closing, WebSocket connections."""

    def test_multiple_websocket_connections(self):
        """A second client is sent the current graph_loaded state on connect."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            payload = json.dumps({"cmd": "load", "source": SIMPLE_DFASM})
            with TestClient(app) as http, http.websocket_connect("/ws") as first:
                first.send_text(payload)
                assert first.receive_json()["type"] == "graph_loaded"

                # The second socket sends nothing; its first message is
                # expected to be the state pushed by the server on connect.
                with http.websocket_connect("/ws") as second:
                    greeting = second.receive_json()
                    assert greeting["type"] == "graph_loaded"
                    assert "graph" in greeting
                    assert "state" in greeting
        finally:
            sim.stop()

    def test_websocket_disconnect_handling(self):
        """Closing the socket after a successful load raises nothing."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            payload = json.dumps({"cmd": "load", "source": SIMPLE_DFASM})
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                sock.send_text(payload)
                assert sock.receive_json()["type"] == "graph_loaded"
            # Leaving the with-block closes the socket and then the client;
            # the test passes as long as that teardown does not raise.
        finally:
            sim.stop()
438
439
class TestEventSerialization:
    """Check the shape of the payloads the server serializes into replies."""

    def test_monitor_update_contains_events(self):
        """A monitor_update reply to step_tick carries an ``events`` list."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                # Load the program; the reply is consumed but not inspected.
                sock.send_text(json.dumps({"cmd": "load", "source": SIMPLE_DFASM}))
                sock.receive_json()

                sock.send_text(json.dumps({"cmd": "step_tick"}))
                reply = sock.receive_json()

                # Any other reply type is accepted without further checks.
                if reply["type"] == "monitor_update":
                    assert "events" in reply
                    # The list may be empty; only its presence and type matter.
                    assert isinstance(reply["events"], list)
        finally:
            sim.stop()

    def test_graph_loaded_contains_state(self):
        """A graph_loaded reply carries a state section with ``pes`` and ``sms``."""
        sim = SimulationBackend()
        sim.start()
        try:
            app = create_app(sim)
            payload = json.dumps({"cmd": "load", "source": SIMPLE_DFASM})
            with TestClient(app) as http, http.websocket_connect("/ws") as sock:
                sock.send_text(payload)

                reply = sock.receive_json()
                assert reply["type"] == "graph_loaded"
                assert "state" in reply
                for section in ("pes", "sms"):
                    assert section in reply["state"]
        finally:
            sim.stop()