OR-1 dataflow CPU sketch
1"""Tests for monitor/backend.py SimulationBackend.
2
3Tests verify:
4- or1-monitor.AC1.1: LoadCmd with valid program assembles, builds topology, injects seeds
5- or1-monitor.AC1.2: LoadCmd wires event callbacks into all PEs and SMs
6- or1-monitor.AC1.3: LoadCmd with invalid program returns ErrorResult without crashing
7- or1-monitor.AC1.4: ResetCmd tears down and leaves ready for new LoadCmd
8- or1-monitor.AC1.5: ResetCmd with reload=True reloads program
9- or1-monitor.AC5.2: StepTickCmd processes all events at current time before returning
10- or1-monitor.AC5.3: StepEventCmd processes exactly one event
11- or1-monitor.AC5.4: RunUntilCmd batches events per tick
12- or1-monitor.AC5.5: StepResult contains both events and snapshot
13- or1-monitor.AC5.6: Stepping when simulation finished returns finished=True
14"""
15
16import pytest
17import simpy
18from threading import Thread
19import time
20
21from monitor.backend import SimulationBackend
22from monitor.commands import (
23 LoadCmd, StepTickCmd, StepEventCmd, RunUntilCmd, InjectCmd, SendCmd,
24 ResetCmd, StopCmd, GraphLoaded, StepResult, ErrorResult
25)
26from monitor.snapshot import StateSnapshot
27from tokens import MonadToken
28
29
class TestLoadCommand:
    """Acceptance tests for program loading through ``_handle_load`` (AC1.1-AC1.3)."""

    def test_ac11_valid_program_returns_graphloaded(self):
        """AC1.1: a valid dfasm program yields a GraphLoaded with graph and snapshot."""
        program = (
            "@system pe=1, sm=0\n"
            "&c|pe0 <| const, 42\n"
        )
        sim = SimulationBackend()

        loaded = sim._handle_load(program)

        assert isinstance(loaded, GraphLoaded)
        assert loaded.ir_graph is not None
        assert loaded.snapshot is not None
        assert isinstance(loaded.snapshot, StateSnapshot)

    def test_ac11_snapshot_has_seed_tokens(self):
        """AC1.1: the snapshot taken at load time lists PE 0 with a non-empty input queue."""
        program = (
            "@system pe=1, sm=0\n"
            "&c|pe0 <| const, 42\n"
        )
        sim = SimulationBackend()

        loaded = sim._handle_load(program)

        assert isinstance(loaded, GraphLoaded)
        snap = loaded.snapshot
        # The const node's seed token is expected to sit in PE 0's input queue.
        assert 0 in snap.pes
        pe0 = snap.pes[0]
        assert len(pe0.input_queue) >= 1  # the seed token, possibly more

    def test_ac12_callbacks_wired_to_pes_and_sms(self):
        """AC1.2: after loading, running the simulation produces captured events."""
        program = (
            "@system pe=1, sm=0\n"
            "&const_val|pe0 <| const, 1\n"
            "&add_op|pe0 <| add\n"
            "&const_val|pe0 |> &add_op|pe0:L\n"
        )
        sim = SimulationBackend()

        loaded = sim._handle_load(program)
        assert isinstance(loaded, GraphLoaded)

        # Events only start at time 1+ under cycle-accurate timing, so run
        # forward instead of inspecting the state at load time.
        ran = sim._handle_run_until(100)

        # A non-empty event list is the evidence that callbacks were wired.
        assert isinstance(ran, StepResult)
        assert len(ran.events) > 0, "Expected events to be collected if callbacks are wired"

        # At least one of the well-known event kinds must be present.
        seen = {type(evt).__name__ for evt in ran.events}
        wanted = {"TokenReceived", "Matched", "Executed"}
        assert seen & wanted, \
            f"Expected at least one of TokenReceived/Matched/Executed, got: {seen}"

    def test_ac13_invalid_program_returns_error(self):
        """AC1.3: an invalid dfasm program yields ErrorResult and leaves no system loaded."""
        # The edge below targets a label that is never defined.
        program = (
            "@system pe=1, sm=0\n"
            "&a|pe0 <| const, 5\n"
            "&a|pe0 |> &undefined|pe0:L\n"
        )
        sim = SimulationBackend()

        outcome = sim._handle_load(program)

        assert isinstance(outcome, ErrorResult)
        assert outcome.message  # a non-empty error message is required
        assert sim._system is None  # nothing must have been loaded

    def test_ac13_backend_still_functional_after_error(self):
        """AC1.3: a failed load does not prevent a subsequent valid load."""
        broken = (
            "@system pe=1, sm=0\n"
            "&a|pe0 <| const, 5\n"
            "&a|pe0 |> &undefined|pe0:L\n"
        )
        working = (
            "@system pe=1, sm=0\n"
            "&c|pe0 <| const, 99\n"
        )
        sim = SimulationBackend()

        # Step 1: the invalid program is rejected.
        failed = sim._handle_load(broken)
        assert isinstance(failed, ErrorResult)

        # Step 2: the same backend instance accepts a valid program.
        succeeded = sim._handle_load(working)
        assert isinstance(succeeded, GraphLoaded)
120
121
class TestResetCommand:
    """Tests for ResetCmd acceptance criteria (AC1.4, AC1.5)."""

    def test_ac14_reset_tears_down_system(self):
        """AC1.4: ResetCmd without reload clears ``_system`` and ``_env``."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        # Load a program so there is something to tear down.
        backend._handle_load(source)
        assert backend._system is not None

        # Reset without reloading.
        result = backend._handle_reset(reload=False)

        assert backend._system is None
        assert backend._env is None
        assert isinstance(result, StepResult)

    def test_ac14_reset_ready_for_new_load(self):
        """AC1.4: after a reset the backend accepts a different program."""
        backend = SimulationBackend()
        source1 = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        # Load the first program, then reset.
        backend._handle_load(source1)
        backend._handle_reset(reload=False)

        # Loading a second, different program should succeed.
        source2 = """\
@system pe=2, sm=0
&a|pe0 <| const, 1
&b|pe1 <| pass
"""
        result = backend._handle_load(source2)
        assert isinstance(result, GraphLoaded)

    def test_ac15_reset_with_reload_reloads_program(self):
        """AC1.5: ResetCmd with reload=True returns GraphLoaded and keeps a system loaded."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        # Initial load must succeed before the reload can be meaningful.
        result1 = backend._handle_load(source)
        assert isinstance(result1, GraphLoaded)

        # Reset with reload.
        result2 = backend._handle_reset(reload=True)

        assert isinstance(result2, GraphLoaded)
        assert backend._system is not None
        assert result2.ir_graph is not None
183
184
class TestStepTickCommand:
    """Tests for StepTickCmd acceptance criteria (AC5.2, AC5.5, AC5.6)."""

    # Upper bound on step iterations so that a backend which neither finishes
    # nor advances simulation time cannot hang the test run.
    _MAX_TICKS = 10_000

    def test_ac52_processes_all_events_at_current_time(self):
        """AC5.2: running the simulation collects events and makes progress.

        NOTE(review): this test drives the backend through
        ``_handle_run_until(100)`` rather than ``_handle_step_tick``.
        """
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c1|pe0 <| const, 1
&c2|pe0 <| const, 2
"""
        backend._handle_load(source)

        # Run simulation to capture events (cycle-accurate timing starts events at time 1+)
        result = backend._handle_run_until(100)

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        # Verify events were collected
        assert len(result.events) > 0, "Expected events to be processed"
        # Either the simulation finished, or the next scheduled time is positive.
        assert result.finished or result.snapshot.next_time > 0, \
            "Expected simulation to progress"

    def test_ac55_result_contains_events_and_snapshot(self):
        """AC5.5: StepResult contains both events and snapshot."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        result = backend._handle_step_tick()

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        assert isinstance(result.snapshot, StateSnapshot)
        assert result.events is not None

    def test_ac56_finished_simulation_returns_finished_true(self):
        """AC5.6: Stepping when finished returns finished=True without error."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        # Step until finished. The loop is bounded both by simulation time and
        # by iteration count, so a stalled backend fails the test instead of
        # spinning forever.
        result = None
        for _ in range(self._MAX_TICKS):
            result = backend._handle_step_tick()
            if result.finished:
                break
            if backend._env.now > 1000:
                pytest.fail("Simulation did not finish within 1000 time units")
        else:
            pytest.fail(f"Simulation did not finish within {self._MAX_TICKS} ticks")

        # Verify finished state
        assert result.finished is True
        assert result.snapshot is not None
        assert backend._env.peek() == float('inf')
247
248
class TestStepEventCommand:
    """Acceptance tests for single-event stepping via ``_handle_step_event`` (AC5.3)."""

    def test_ac53_processes_exactly_one_event(self):
        """AC5.3: up to ten event steps each return a StepResult and yield events overall."""
        program = (
            "@system pe=1, sm=0\n"
            "&c1|pe0 <| const, 1\n"
            "&c2|pe0 <| const, 2\n"
            "&result|pe0 <| add\n"
            "&c1|pe0 |> &result|pe0:L\n"
            "&c2|pe0 |> &result|pe0:R\n"
        )
        sim = SimulationBackend()
        sim._handle_load(program)

        # Accumulate whatever each individual step reports; stop early once
        # the backend says the simulation is finished.
        collected = []
        remaining = 10
        while remaining:
            remaining -= 1
            step = sim._handle_step_event()
            assert isinstance(step, StepResult)
            collected.extend(step.events)
            if step.finished:
                break

        # Across all steps, at least one event must have been observed.
        assert len(collected) >= 1, f"Expected at least one event across 10 steps, got {len(collected)}"

    def test_ac53_repeated_events_make_progress(self):
        """AC5.3: simulation time never goes backwards across repeated event steps."""
        program = (
            "@system pe=1, sm=0\n"
            "&c1|pe0 <| const, 1\n"
            "&c2|pe0 <| const, 2\n"
            "&result|pe0 <| add\n"
            "&c1|pe0 |> &result|pe0:L\n"
            "&c2|pe0 |> &result|pe0:R\n"
        )
        sim = SimulationBackend()
        sim._handle_load(program)

        # Record the clock after every step together with a running event total.
        clock_readings = []
        total_events = 0
        remaining = 10
        while remaining:
            remaining -= 1
            step = sim._handle_step_event()
            clock_readings.append(sim._env.now)
            total_events = total_events + len(step.events)
            if step.finished:
                break

        # The recorded clock values must already be in sorted order.
        in_order = sorted(clock_readings)
        assert clock_readings == in_order, f"Times not monotonic: {clock_readings}"
        # Some events must have been processed along the way.
        assert total_events >= 1, f"Expected at least one event across steps, got {total_events}"
305
306
class TestRunUntilCommand:
    """Acceptance tests for ``_handle_run_until`` (AC5.4)."""

    def test_ac54_batches_events_per_tick(self):
        """AC5.4: running to time 10 returns events and does not overshoot the target."""
        program = (
            "@system pe=1, sm=0\n"
            "&c1|pe0 <| const, 1\n"
            "&c2|pe0 <| const, 2\n"
            "&result|pe0 <| add\n"
            "&c1|pe0 |> &result|pe0:L\n"
            "&c2|pe0 |> &result|pe0:R\n"
        )
        sim = SimulationBackend()
        sim._handle_load(program)

        # Advance the simulation up to time 10.
        outcome = sim._handle_run_until(10.0)

        assert isinstance(outcome, StepResult)
        assert outcome.snapshot is not None
        # Events must have been gathered, and the reported time must not
        # exceed the target unless the simulation ran to completion.
        assert len(outcome.events) > 0, "Expected events to be batched"
        assert outcome.sim_time <= 10.0 or outcome.finished, \
            f"Expected sim_time <= 10.0 or finished, got {outcome.sim_time} finished={outcome.finished}"

    def test_ac54_stops_at_target_time(self):
        """AC5.4: the environment clock is at or before the target, or the run finished."""
        program = (
            "@system pe=1, sm=0\n"
            "&c|pe0 <| const, 42\n"
        )
        sim = SimulationBackend()
        sim._handle_load(program)

        deadline = 50.0
        outcome = sim._handle_run_until(deadline)

        # Either the clock stayed within the deadline or the simulation finished.
        assert sim._env.now <= deadline or outcome.finished
347
348
class TestInjectCommand:
    """Tests for direct token injection via ``_handle_inject``."""

    def test_inject_token_appears_in_snapshot(self):
        """An injected token targeting PE 0 shows up in PE 0's input queue."""
        program = (
            "@system pe=1, sm=0\n"
            "&c|pe0 <| const, 42\n"
        )
        sim = SimulationBackend()
        sim._handle_load(program)

        # Build the token and hand it to the backend.
        injected = MonadToken(target=0, offset=0, ctx=0, data=99, inline=True)
        outcome = sim._handle_inject(injected)

        assert isinstance(outcome, StepResult)
        # The returned snapshot must list PE 0 and contain the token in its queue.
        snap = outcome.snapshot
        assert 0 in snap.pes
        pe0 = snap.pes[0]
        assert injected in pe0.input_queue
371
372
class TestSendCommand:
    """Tests for token delivery via ``_handle_send``."""

    def test_send_token_respects_backpressure(self):
        """Sending a token returns a StepResult that carries a snapshot."""
        program = (
            "@system pe=1, sm=0\n"
            "&c|pe0 <| const, 42\n"
        )
        sim = SimulationBackend()
        sim._handle_load(program)

        # Unlike a direct inject, a send is expected to go through the
        # SimPy store's put()/backpressure path.
        outgoing = MonadToken(target=0, offset=0, ctx=0, data=77, inline=True)
        outcome = sim._handle_send(outgoing)

        assert isinstance(outcome, StepResult)
        assert outcome.snapshot is not None
391
392
class TestThreadedInterface:
    """Tests for the threaded interface (start, send_command, stop).

    Every test stops the backend in a ``finally`` clause so that a failing
    assertion or an exception from ``send_command`` does not leave the
    started backend running for the rest of the test session.
    """

    def test_start_stop_threading(self):
        """Backend threading interface starts and stops cleanly."""
        backend = SimulationBackend()
        backend.start()
        try:
            # Send a command through the threaded interface.
            source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
            result = backend.send_command(LoadCmd(source=source), timeout=5.0)

            assert isinstance(result, GraphLoaded)
        finally:
            backend.stop()

    def test_send_command_timeout(self):
        """send_command accepts a timeout and returns within it on success.

        NOTE(review): only the success path is exercised here; no command is
        made to exceed the timeout.
        """
        backend = SimulationBackend()
        backend.start()
        try:
            source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
            # This should succeed within timeout
            result = backend.send_command(LoadCmd(source=source), timeout=5.0)
            assert isinstance(result, GraphLoaded)
        finally:
            backend.stop()

    def test_threaded_step_commands(self):
        """Multiple step commands work in threaded mode."""
        backend = SimulationBackend()
        backend.start()
        try:
            source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
            backend.send_command(LoadCmd(source=source), timeout=5.0)

            # Step a few times, stopping early if the simulation finishes.
            for _ in range(3):
                result = backend.send_command(StepTickCmd(), timeout=5.0)
                assert isinstance(result, StepResult)
                if result.finished:
                    break
        finally:
            backend.stop()

    def test_error_handling_in_thread(self):
        """Backend catches and returns errors from thread."""
        backend = SimulationBackend()
        backend.start()
        try:
            # Send an invalid program (edge to an undefined label).
            result = backend.send_command(
                LoadCmd(source="@system pe=1, sm=0\n&a|pe0 |> &undefined|pe0:L"),
                timeout=5.0
            )

            assert isinstance(result, ErrorResult)
            assert result.message
        finally:
            backend.stop()
463
464
class TestSequentialWorkflow:
    """Integration tests for typical workflows."""

    # Upper bound on step iterations so that a backend which neither finishes
    # nor advances simulation time cannot hang the test run.
    _MAX_TICKS = 10_000

    def test_load_step_reset_reload_workflow(self):
        """Workflow: Load → Step → Reset with reload → Step again."""
        backend = SimulationBackend()

        # Load
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        result1 = backend._handle_load(source)
        assert isinstance(result1, GraphLoaded)

        # Step
        result2 = backend._handle_step_tick()
        assert isinstance(result2, StepResult)

        # Reset with reload
        result3 = backend._handle_reset(reload=True)
        assert isinstance(result3, GraphLoaded)

        # Step again
        result4 = backend._handle_step_tick()
        assert isinstance(result4, StepResult)

    def test_load_with_multiple_pes_and_sms(self):
        """Load a program with multiple PEs and SMs."""
        backend = SimulationBackend()
        # Create a program that actually uses the SM (with a write operation)
        source = """\
@system pe=2, sm=1
&const_val|pe0 <| const, 42
&write_op|pe0 <| write
&relay|pe1 <| pass
&const_val|pe0 |> &write_op|pe0:L
&write_op|pe0 |> &relay|pe1:L
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        snapshot = result.snapshot

        # Verify that at least one PE is present in the snapshot
        assert len(snapshot.pes) > 0
        # SMs may or may not have state depending on program execution
        # The system should have been set up correctly
        assert backend._system is not None

    def test_run_until_completion(self):
        """Run simulation until completion."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        # Step until the backend reports completion. The loop is bounded both
        # by simulation time and by iteration count, so a stalled backend
        # fails the test instead of spinning forever.
        result = None
        for _ in range(self._MAX_TICKS):
            result = backend._handle_step_tick()
            if result.finished:
                break
            if backend._env.now > 1000:
                pytest.fail("Simulation did not complete")
        else:
            pytest.fail(f"Simulation did not complete within {self._MAX_TICKS} ticks")

        assert result.finished is True