OR-1 dataflow CPU sketch
at ba08ffded3d3b2badb2a7e22816feafaacea5ded 532 lines 17 kB view raw
"""Tests for monitor/backend.py SimulationBackend.

Tests verify:
- or1-monitor.AC1.1: LoadCmd with valid program assembles, builds topology, injects seeds
- or1-monitor.AC1.2: LoadCmd wires event callbacks into all PEs and SMs
- or1-monitor.AC1.3: LoadCmd with invalid program returns ErrorResult without crashing
- or1-monitor.AC1.4: ResetCmd tears down and leaves ready for new LoadCmd
- or1-monitor.AC1.5: ResetCmd with reload=True reloads program
- or1-monitor.AC5.2: StepTickCmd processes all events at current time before returning
- or1-monitor.AC5.3: StepEventCmd processes exactly one event
- or1-monitor.AC5.4: RunUntilCmd batches events per tick
- or1-monitor.AC5.5: StepResult contains both events and snapshot
- or1-monitor.AC5.6: Stepping when simulation finished returns finished=True
"""

import time
from threading import Thread

import pytest
import simpy

from monitor.backend import SimulationBackend
from monitor.commands import (
    LoadCmd, StepTickCmd, StepEventCmd, RunUntilCmd, InjectCmd, SendCmd,
    ResetCmd, StopCmd, GraphLoaded, StepResult, ErrorResult
)
from monitor.snapshot import StateSnapshot
from tokens import MonadToken


# Smallest valid program used throughout: one PE, one const node.
SINGLE_CONST_SOURCE = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""

# Invalid program: the edge targets a label that is never defined.
UNDEFINED_LABEL_SOURCE = """\
@system pe=1, sm=0
&a|pe0 <| const, 5
&a|pe0 |> &undefined|pe0:L
"""

# Two const nodes feeding the left/right ports of an add node on one PE.
ADD_TWO_CONSTS_SOURCE = """\
@system pe=1, sm=0
&c1|pe0 <| const, 1
&c2|pe0 <| const, 2
&result|pe0 <| add
&c1|pe0 |> &result|pe0:L
&c2|pe0 |> &result|pe0:R
"""

# Bounds for loops that step a simulation to completion. The time bound is
# the one the tests have always used; the step bound additionally protects
# against a simulation that never finishes *and* never advances time.
_MAX_SIM_TIME = 1000
_MAX_TICK_STEPS = 10_000


def _step_until_finished(backend, failure_message):
    """Step *backend* tick by tick until a result reports ``finished``.

    Args:
        backend: A SimulationBackend with a program already loaded.
        failure_message: Text used for ``pytest.fail`` if the simulation
            does not finish within the time/step bounds.

    Returns:
        The first StepResult whose ``finished`` attribute is truthy.
    """
    for _ in range(_MAX_TICK_STEPS):
        result = backend._handle_step_tick()
        if result.finished:
            return result
        if backend._env.now > _MAX_SIM_TIME:
            break
    pytest.fail(
        f"{failure_message} "
        f"(sim time {backend._env.now}, step budget {_MAX_TICK_STEPS})"
    )


@pytest.fixture
def threaded_backend():
    """Yield a started SimulationBackend and always stop it afterwards.

    Stopping in fixture teardown (rather than at the end of each test)
    ensures the backend is stopped even when an assertion fails.
    """
    backend = SimulationBackend()
    backend.start()
    yield backend
    backend.stop()


class TestLoadCommand:
    """Tests for LoadCmd acceptance criteria."""

    def test_ac11_valid_program_returns_graphloaded(self):
        """AC1.1: LoadCmd with valid dfasm assembles and returns GraphLoaded."""
        backend = SimulationBackend()
        result = backend._handle_load(SINGLE_CONST_SOURCE)

        assert isinstance(result, GraphLoaded)
        assert result.ir_graph is not None
        assert result.snapshot is not None
        assert isinstance(result.snapshot, StateSnapshot)

    def test_ac11_snapshot_has_seed_tokens(self):
        """AC1.1: Initial snapshot shows injected seed tokens in PE input queue."""
        backend = SimulationBackend()
        result = backend._handle_load(SINGLE_CONST_SOURCE)

        assert isinstance(result, GraphLoaded)
        snapshot = result.snapshot
        # Seed token from const node should be in PE 0's input queue
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]
        assert len(pe_snap.input_queue) >= 1  # At least the seed token

    def test_ac12_callbacks_wired_to_pes_and_sms(self):
        """AC1.2: LoadCmd wires on_event callbacks into all PEs and SMs."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&const_val|pe0 <| const, 1
&add_op|pe0 <| add
&const_val|pe0 |> &add_op|pe0:L
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        # Run the simulation to capture events (cycle-accurate timing starts events at time 1+)
        step_result = backend._handle_run_until(100)
        # If callbacks are wired, events should be captured
        assert isinstance(step_result, StepResult)
        assert len(step_result.events) > 0, "Expected events to be collected if callbacks are wired"
        # Verify specific event types appear (TokenReceived, Matched, Executed)
        event_types = {type(e).__name__ for e in step_result.events}
        assert (
            "TokenReceived" in event_types
            or "Matched" in event_types
            or "Executed" in event_types
        ), f"Expected at least one of TokenReceived/Matched/Executed, got: {event_types}"

    def test_ac13_invalid_program_returns_error(self):
        """AC1.3: LoadCmd with invalid dfasm returns ErrorResult."""
        backend = SimulationBackend()
        # Invalid: references undefined label
        result = backend._handle_load(UNDEFINED_LABEL_SOURCE)

        assert isinstance(result, ErrorResult)
        assert result.message  # Should have error message
        assert backend._system is None  # Should not have loaded system

    def test_ac13_backend_still_functional_after_error(self):
        """AC1.3: Backend remains functional after error (can accept new LoadCmd)."""
        backend = SimulationBackend()

        # First: attempt invalid load
        result1 = backend._handle_load(UNDEFINED_LABEL_SOURCE)
        assert isinstance(result1, ErrorResult)

        # Second: load valid program — should succeed
        valid_source = """\
@system pe=1, sm=0
&c|pe0 <| const, 99
"""
        result2 = backend._handle_load(valid_source)
        assert isinstance(result2, GraphLoaded)


class TestResetCommand:
    """Tests for ResetCmd acceptance criteria."""

    def test_ac14_reset_tears_down_system(self):
        """AC1.4: ResetCmd tears down current simulation."""
        backend = SimulationBackend()
        # Load a program
        backend._handle_load(SINGLE_CONST_SOURCE)
        assert backend._system is not None

        # Reset
        result = backend._handle_reset(reload=False)

        assert backend._system is None
        assert backend._env is None
        assert isinstance(result, StepResult)

    def test_ac14_reset_ready_for_new_load(self):
        """AC1.4: After reset, backend is ready for new LoadCmd."""
        backend = SimulationBackend()
        # Load first program
        backend._handle_load(SINGLE_CONST_SOURCE)

        # Reset
        backend._handle_reset(reload=False)

        # Load second program — should succeed
        source2 = """\
@system pe=2, sm=0
&a|pe0 <| const, 1
&b|pe1 <| pass
"""
        result = backend._handle_load(source2)
        assert isinstance(result, GraphLoaded)

    def test_ac15_reset_with_reload_reloads_program(self):
        """AC1.5: ResetCmd with reload=True reloads the last program."""
        backend = SimulationBackend()
        # Load program
        result1 = backend._handle_load(SINGLE_CONST_SOURCE)
        assert isinstance(result1, GraphLoaded)

        # Reset with reload
        result2 = backend._handle_reset(reload=True)

        assert isinstance(result2, GraphLoaded)
        assert backend._system is not None
        assert result2.ir_graph is not None


class TestStepTickCommand:
    """Tests for StepTickCmd acceptance criteria."""

    def test_ac52_processes_all_events_at_current_time(self):
        """AC5.2: StepTickCmd processes all events at current simulation time."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c1|pe0 <| const, 1
&c2|pe0 <| const, 2
"""
        backend._handle_load(source)

        # Run simulation to capture events (cycle-accurate timing starts events at time 1+)
        result = backend._handle_run_until(100)

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        # Verify events were collected
        assert len(result.events) > 0, "Expected events to be processed"
        # After stepping, peek should advance or reach infinity
        assert result.finished or result.snapshot.next_time > 0, \
            "Expected simulation to progress"

    def test_ac55_result_contains_events_and_snapshot(self):
        """AC5.5: StepResult contains both events and snapshot."""
        backend = SimulationBackend()
        backend._handle_load(SINGLE_CONST_SOURCE)

        result = backend._handle_step_tick()

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        assert isinstance(result.snapshot, StateSnapshot)
        assert result.events is not None

    def test_ac56_finished_simulation_returns_finished_true(self):
        """AC5.6: Stepping when finished returns finished=True without error."""
        backend = SimulationBackend()
        backend._handle_load(SINGLE_CONST_SOURCE)

        # Step until finished (bounded so a stalled simulation cannot hang the test)
        result = _step_until_finished(
            backend, "Simulation did not finish within 1000 time units"
        )

        # Verify finished state
        assert result.finished is True
        assert result.snapshot is not None
        assert backend._env.peek() == float('inf')


class TestStepEventCommand:
    """Tests for StepEventCmd acceptance criteria."""

    def test_ac53_processes_exactly_one_event(self):
        """AC5.3: StepEventCmd processes exactly one event."""
        backend = SimulationBackend()
        backend._handle_load(ADD_TWO_CONSTS_SOURCE)

        # Collect events across multiple steps - at least some steps should have events
        all_events = []
        for _ in range(10):
            result = backend._handle_step_event()
            assert isinstance(result, StepResult)
            all_events.extend(result.events)
            if result.finished:
                break

        # After stepping multiple times, we should have collected some events
        assert len(all_events) >= 1, f"Expected at least one event across 10 steps, got {len(all_events)}"

    def test_ac53_repeated_events_make_progress(self):
        """AC5.3: Multiple StepEventCmd calls process each event separately."""
        backend = SimulationBackend()
        backend._handle_load(ADD_TWO_CONSTS_SOURCE)

        # Collect time values after each step
        times = []
        event_count = 0
        for _ in range(10):
            result = backend._handle_step_event()
            times.append(backend._env.now)
            # Track total events processed
            event_count += len(result.events)
            if result.finished:
                break

        # Times should be non-decreasing (verifies events are stepped individually)
        assert times == sorted(times), f"Times not monotonic: {times}"
        # Verify that at least some events were processed (not a crash)
        assert event_count >= 1, f"Expected at least one event across steps, got {event_count}"


class TestRunUntilCommand:
    """Tests for RunUntilCmd acceptance criteria."""

    def test_ac54_batches_events_per_tick(self):
        """AC5.4: RunUntilCmd batches events per tick."""
        backend = SimulationBackend()
        backend._handle_load(ADD_TWO_CONSTS_SOURCE)

        # Run until time 10
        result = backend._handle_run_until(10.0)

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        # Verify events were collected and sim time <= target or finished
        assert len(result.events) > 0, "Expected events to be batched"
        assert result.sim_time <= 10.0 or result.finished, \
            f"Expected sim_time <= 10.0 or finished, got {result.sim_time} finished={result.finished}"

    def test_ac54_stops_at_target_time(self):
        """AC5.4: RunUntilCmd stops at or before target time."""
        backend = SimulationBackend()
        backend._handle_load(SINGLE_CONST_SOURCE)

        target = 50.0
        result = backend._handle_run_until(target)

        # Sim time should be <= target (or finished)
        assert backend._env.now <= target or result.finished


class TestInjectCommand:
    """Tests for InjectCmd."""

    def test_inject_token_appears_in_snapshot(self):
        """InjectCmd injects token into correct PE."""
        backend = SimulationBackend()
        backend._handle_load(SINGLE_CONST_SOURCE)

        # Inject a token
        token = MonadToken(target=0, offset=0, ctx=0, data=99, inline=True)
        result = backend._handle_inject(token)

        assert isinstance(result, StepResult)
        # Token should be in PE 0's input queue
        snapshot = result.snapshot
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]
        assert token in pe_snap.input_queue


class TestSendCommand:
    """Tests for SendCmd."""

    def test_send_token_respects_backpressure(self):
        """SendCmd sends token via SimPy store.put()."""
        backend = SimulationBackend()
        backend._handle_load(SINGLE_CONST_SOURCE)

        # Send a token (should go through SimPy backpressure mechanism)
        token = MonadToken(target=0, offset=0, ctx=0, data=77, inline=True)
        result = backend._handle_send(token)

        assert isinstance(result, StepResult)
        assert result.snapshot is not None


class TestThreadedInterface:
    """Tests for the threaded interface (start, send_command, stop).

    Each test receives an already-started backend from the
    ``threaded_backend`` fixture, which stops it during teardown.
    """

    def test_start_stop_threading(self, threaded_backend):
        """Backend threading interface starts and stops cleanly."""
        # Send a command; the fixture stops the backend afterwards
        result = threaded_backend.send_command(
            LoadCmd(source=SINGLE_CONST_SOURCE), timeout=5.0
        )

        assert isinstance(result, GraphLoaded)

    def test_send_command_timeout(self, threaded_backend):
        """send_command respects timeout parameter."""
        # This should succeed within timeout
        result = threaded_backend.send_command(
            LoadCmd(source=SINGLE_CONST_SOURCE), timeout=5.0
        )
        assert isinstance(result, GraphLoaded)

    def test_threaded_step_commands(self, threaded_backend):
        """Multiple step commands work in threaded mode."""
        threaded_backend.send_command(
            LoadCmd(source=SINGLE_CONST_SOURCE), timeout=5.0
        )

        # Step a few times
        for _ in range(3):
            result = threaded_backend.send_command(StepTickCmd(), timeout=5.0)
            assert isinstance(result, StepResult)
            if result.finished:
                break

    def test_error_handling_in_thread(self, threaded_backend):
        """Backend catches and returns errors from thread."""
        # Send invalid program
        result = threaded_backend.send_command(
            LoadCmd(source="@system pe=1, sm=0\n&a|pe0 |> &undefined|pe0:L"),
            timeout=5.0
        )

        assert isinstance(result, ErrorResult)
        assert result.message


class TestSequentialWorkflow:
    """Integration tests for typical workflows."""

    def test_load_step_reset_reload_workflow(self):
        """Workflow: Load → Step → Reset with reload → Step again."""
        backend = SimulationBackend()

        # Load
        result1 = backend._handle_load(SINGLE_CONST_SOURCE)
        assert isinstance(result1, GraphLoaded)

        # Step
        result2 = backend._handle_step_tick()
        assert isinstance(result2, StepResult)

        # Reset with reload
        result3 = backend._handle_reset(reload=True)
        assert isinstance(result3, GraphLoaded)

        # Step again
        result4 = backend._handle_step_tick()
        assert isinstance(result4, StepResult)

    def test_load_with_multiple_pes_and_sms(self):
        """Load a program with multiple PEs and SMs."""
        backend = SimulationBackend()
        # Create a program that actually uses the SM (with a write operation)
        source = """\
@system pe=2, sm=1
&const_val|pe0 <| const, 42
&write_op|pe0 <| write
&relay|pe1 <| pass
&const_val|pe0 |> &write_op|pe0:L
&write_op|pe0 |> &relay|pe1:L
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        snapshot = result.snapshot

        # Verify multiple PEs are present
        assert len(snapshot.pes) > 0
        # SMs may or may not have state depending on program execution
        # The system should have been set up correctly
        assert backend._system is not None

    def test_run_until_completion(self):
        """Run simulation until completion."""
        backend = SimulationBackend()
        backend._handle_load(SINGLE_CONST_SOURCE)

        # Run until completion (bounded so a stalled simulation cannot hang the test)
        result = _step_until_finished(backend, "Simulation did not complete")

        assert result.finished is True