OR-1 dataflow CPU sketch
1"""Unit tests for monitor REPL functionality.
2
3Tests cover all acceptance criteria for AC4.1-AC4.9, verifying REPL
4commands work correctly with a synchronized backend and proper
5state management.
6"""
7
8from __future__ import annotations
9
10import io
11import sys
12from pathlib import Path
13from tempfile import NamedTemporaryFile
14
15import pytest
16
17from cm_inst import MemOp
18from monitor.backend import SimulationBackend
19from monitor.repl import MonitorREPL
20from tokens import MonadToken, SMToken
21
22
23@pytest.fixture
24def temp_dfasm_file():
25 """Create a temporary dfasm file for testing.
26
27 A minimal valid dfasm program that loads without errors.
28 """
29 content = """@system pe=1, sm=0
30&c1 <| const, 3
31&c2 <| const, 7
32&result <| add
33&c1 |> &result:L
34&c2 |> &result:R
35"""
36 with NamedTemporaryFile(mode="w", suffix=".dfasm", delete=False) as f:
37 f.write(content)
38 f.flush()
39 path = Path(f.name)
40
41 yield path
42
43 # Cleanup
44 path.unlink()
45
46
47@pytest.fixture
48def backend():
49 """Create and start a simulation backend for testing."""
50 b = SimulationBackend()
51 b.start()
52 yield b
53 b.stop()
54
55
56@pytest.fixture
57def repl(backend):
58 """Create a REPL instance with a test backend."""
59 return MonitorREPL(backend)
60
61
62class TestREPLLoad:
63 """Tests for AC4.1: load command assembles and loads dfasm."""
64
65 def test_load_valid_file(self, repl, temp_dfasm_file):
66 """load <path> should load a valid dfasm file."""
67 # Capture output
68 output = io.StringIO()
69 repl.stdout = output
70
71 # Call do_load
72 repl.do_load(str(temp_dfasm_file))
73
74 # Check state
75 assert repl._loaded is True
76 assert repl._last_snapshot is not None
77 assert "Loaded" in output.getvalue()
78
79 def test_load_nonexistent_file(self, repl):
80 """load with nonexistent path should fail gracefully."""
81 output = io.StringIO()
82 repl.stdout = output
83
84 repl.do_load("/nonexistent/path.dfasm")
85
86 assert repl._loaded is False
87 assert "not found" in output.getvalue()
88
89 def test_load_invalid_syntax(self, repl):
90 """load with invalid dfasm syntax should report error."""
91 # Create invalid file
92 with NamedTemporaryFile(mode="w", suffix=".dfasm", delete=False) as f:
93 f.write("@@@ invalid syntax @@@")
94 f.flush()
95 path = Path(f.name)
96
97 try:
98 output = io.StringIO()
99 repl.stdout = output
100
101 repl.do_load(str(path))
102
103 assert repl._loaded is False
104 out = output.getvalue()
105 assert "Error" in out
106 finally:
107 path.unlink()
108
109 def test_load_without_args(self, repl):
110 """load without args should print usage."""
111 output = io.StringIO()
112 repl.stdout = output
113
114 repl.do_load("")
115
116 assert "Usage" in output.getvalue()
117
118
119class TestREPLStep:
120 """Tests for AC4.2: step and stepe commands advance simulation."""
121
122 def test_step_without_load(self, repl):
123 """step without loaded simulation should error (AC4.8)."""
124 output = io.StringIO()
125 repl.stdout = output
126
127 repl.do_step("")
128
129 assert "No simulation loaded" in output.getvalue()
130
131 def test_step_after_load(self, repl, temp_dfasm_file):
132 """step after loading should advance simulation."""
133 # Load first
134 output = io.StringIO()
135 repl.stdout = output
136
137 repl.do_load(str(temp_dfasm_file))
138 assert repl._loaded
139
140 # Clear and step
141 output = io.StringIO()
142 repl.stdout = output
143
144 repl.do_step("")
145
146 # Should produce output (snapshot and/or events)
147 assert len(output.getvalue()) > 0
148
149 def test_stepe_after_load(self, repl, temp_dfasm_file):
150 """stepe after loading should advance by one event."""
151 # Load first
152 output = io.StringIO()
153 repl.stdout = output
154
155 repl.do_load(str(temp_dfasm_file))
156 assert repl._loaded
157
158 # Clear and step
159 output = io.StringIO()
160 repl.stdout = output
161
162 repl.do_stepe("")
163
164 # Should produce output
165 assert len(output.getvalue()) > 0
166
167 def test_stepe_without_load(self, repl):
168 """stepe without loaded simulation should error (AC4.8)."""
169 output = io.StringIO()
170 repl.stdout = output
171
172 repl.do_stepe("")
173
174 assert "No simulation loaded" in output.getvalue()
175
176
177class TestREPLRun:
178 """Tests for AC4.3: run command runs until target time."""
179
180 def test_run_without_load(self, repl):
181 """run without loaded simulation should error (AC4.8)."""
182 output = io.StringIO()
183 repl.stdout = output
184
185 repl.do_run("10")
186
187 assert "No simulation loaded" in output.getvalue()
188
189 def test_run_after_load(self, repl, temp_dfasm_file):
190 """run <until> should run until target time."""
191 # Load first
192 output = io.StringIO()
193 repl.stdout = output
194
195 repl.do_load(str(temp_dfasm_file))
196 assert repl._loaded
197
198 # Clear and run
199 output = io.StringIO()
200 repl.stdout = output
201
202 repl.do_run("10")
203
204 # Should produce output
205 assert len(output.getvalue()) > 0
206
207 def test_run_invalid_time(self, repl, temp_dfasm_file):
208 """run with invalid time should error."""
209 # Load first
210 output = io.StringIO()
211 repl.stdout = output
212
213 repl.do_load(str(temp_dfasm_file))
214
215 # Try invalid time
216 output = io.StringIO()
217 repl.stdout = output
218
219 repl.do_run("not_a_number")
220
221 assert "Invalid time" in output.getvalue()
222
223 def test_run_without_args(self, repl, temp_dfasm_file):
224 """run without args should print usage."""
225 # Load first
226 output = io.StringIO()
227 repl.stdout = output
228
229 repl.do_load(str(temp_dfasm_file))
230
231 # Try without args
232 output = io.StringIO()
233 repl.stdout = output
234
235 repl.do_run("")
236
237 assert "Usage" in output.getvalue()
238
239
240class TestREPLSend:
241 """Tests for AC4.4: send command injects token via FIFO."""
242
243 def test_send_monad_without_load(self, repl):
244 """send without loaded simulation should error (AC4.8)."""
245 output = io.StringIO()
246 repl.stdout = output
247
248 repl.do_send("0 0 0 42")
249
250 assert "No simulation loaded" in output.getvalue()
251
252 def test_send_monad_token(self, repl, temp_dfasm_file):
253 """send <target> <offset> <ctx> <data> should send MonadToken."""
254 # Load first
255 output = io.StringIO()
256 repl.stdout = output
257
258 repl.do_load(str(temp_dfasm_file))
259 assert repl._loaded
260
261 # Clear and send
262 output = io.StringIO()
263 repl.stdout = output
264
265 repl.do_send("0 0 0 42")
266
267 # Should produce output (no exception)
268 assert "Sent" in output.getvalue()
269
270 def test_send_sm_token(self, repl, temp_dfasm_file):
271 """send <target> --sm <addr> <op> should send SMToken."""
272 # Load first
273 output = io.StringIO()
274 repl.stdout = output
275
276 repl.do_load(str(temp_dfasm_file))
277 assert repl._loaded
278
279 # Clear and send SM token
280 output = io.StringIO()
281 repl.stdout = output
282
283 repl.do_send("0 --sm 5 READ")
284
285 # Should produce output (no exception)
286 assert "Sent" in output.getvalue()
287
288 def test_send_sm_token_with_data(self, repl, temp_dfasm_file):
289 """send <target> --sm <addr> <op> <data> should work."""
290 # Load first
291 output = io.StringIO()
292 repl.stdout = output
293
294 repl.do_load(str(temp_dfasm_file))
295 assert repl._loaded
296
297 # Send SM token with data
298 output = io.StringIO()
299 repl.stdout = output
300
301 repl.do_send("0 --sm 5 WRITE 123")
302
303 assert "Sent" in output.getvalue()
304
305 def test_send_invalid_args(self, repl, temp_dfasm_file):
306 """send with invalid args should error gracefully."""
307 # Load first
308 output = io.StringIO()
309 repl.stdout = output
310
311 repl.do_load(str(temp_dfasm_file))
312
313 # Try invalid args
314 output = io.StringIO()
315 repl.stdout = output
316
317 repl.do_send("not_an_int")
318
319 assert "Error" in output.getvalue()
320
321 def test_send_unknown_memop(self, repl, temp_dfasm_file):
322 """send with unknown MemOp should error."""
323 # Load first
324 output = io.StringIO()
325 repl.stdout = output
326
327 repl.do_load(str(temp_dfasm_file))
328
329 # Try unknown MemOp
330 output = io.StringIO()
331 repl.stdout = output
332
333 repl.do_send("0 --sm 5 UNKNOWN_OP")
334
335 assert "Unknown MemOp" in output.getvalue()
336
337 def test_send_without_args(self, repl, temp_dfasm_file):
338 """send without args should print usage."""
339 # Load first
340 output = io.StringIO()
341 repl.stdout = output
342
343 repl.do_load(str(temp_dfasm_file))
344
345 # Try without args
346 output = io.StringIO()
347 repl.stdout = output
348
349 repl.do_send("")
350
351 assert "Usage" in output.getvalue()
352
353
354class TestREPLInject:
355 """Tests for AC4.5: inject command does direct injection."""
356
357 def test_inject_monad_without_load(self, repl):
358 """inject without loaded simulation should error (AC4.8)."""
359 output = io.StringIO()
360 repl.stdout = output
361
362 repl.do_inject("0 0 0 42")
363
364 assert "No simulation loaded" in output.getvalue()
365
366 def test_inject_monad_token(self, repl, temp_dfasm_file):
367 """inject <target> <offset> <ctx> <data> should inject MonadToken."""
368 # Load first
369 output = io.StringIO()
370 repl.stdout = output
371
372 repl.do_load(str(temp_dfasm_file))
373 assert repl._loaded
374
375 # Clear and inject
376 output = io.StringIO()
377 repl.stdout = output
378
379 repl.do_inject("0 0 0 42")
380
381 # Should produce output (no exception)
382 assert "Injected" in output.getvalue()
383
384 def test_inject_sm_token(self, repl):
385 """inject <target> --sm <addr> <op> should inject SMToken without throwing."""
386 # For this test, we just verify token parsing works
387 # Actual injection will fail without SM, but parsing should succeed
388 output = io.StringIO()
389 repl.stdout = output
390
391 # Set loaded manually just to bypass the check
392 repl._loaded = True
393
394 # Just verify the token parsing happens without exception
395 token = repl._parse_token_args("0 --sm 5 WRITE")
396 assert token.addr == 5
397
398 def test_inject_invalid_args(self, repl, temp_dfasm_file):
399 """inject with invalid args should error gracefully."""
400 # Load first
401 output = io.StringIO()
402 repl.stdout = output
403
404 repl.do_load(str(temp_dfasm_file))
405
406 # Try invalid args
407 output = io.StringIO()
408 repl.stdout = output
409
410 repl.do_inject("invalid")
411
412 assert "Error" in output.getvalue()
413
414
415class TestREPLState:
416 """Tests for AC4.6: state, pe, sm commands display readable state."""
417
418 def test_state_without_load(self, repl):
419 """state without loaded simulation should error (AC4.8)."""
420 output = io.StringIO()
421 repl.stdout = output
422
423 repl.do_state("")
424
425 assert "No simulation loaded" in output.getvalue()
426
427 def test_state_after_load(self, repl, temp_dfasm_file):
428 """state should display system state summary."""
429 # Load first
430 output = io.StringIO()
431 repl.stdout = output
432
433 repl.do_load(str(temp_dfasm_file))
434 assert repl._loaded
435
436 # Clear and call state
437 output = io.StringIO()
438 repl.stdout = output
439
440 repl.do_state("")
441
442 out = output.getvalue()
443 # Should show PE/SM counts and time info
444 assert len(out) > 0
445 assert ("PE" in out or "SM" in out or "Time" in out)
446
447 def test_pe_without_load(self, repl):
448 """pe without loaded simulation should error (AC4.8)."""
449 output = io.StringIO()
450 repl.stdout = output
451
452 repl.do_pe("0")
453
454 assert "No simulation loaded" in output.getvalue()
455
456 def test_pe_after_load(self, repl, temp_dfasm_file):
457 """pe <id> should display PE state."""
458 # Load first
459 output = io.StringIO()
460 repl.stdout = output
461
462 repl.do_load(str(temp_dfasm_file))
463 assert repl._loaded
464
465 # Clear and call pe
466 output = io.StringIO()
467 repl.stdout = output
468
469 repl.do_pe("0")
470
471 out = output.getvalue()
472 # Should show PE state or not found message
473 assert len(out) > 0
474
475 def test_pe_invalid_id(self, repl, temp_dfasm_file):
476 """pe with non-integer ID should error."""
477 # Load first
478 output = io.StringIO()
479 repl.stdout = output
480
481 repl.do_load(str(temp_dfasm_file))
482
483 # Try invalid ID
484 output = io.StringIO()
485 repl.stdout = output
486
487 repl.do_pe("invalid_id")
488
489 assert "Invalid PE ID" in output.getvalue()
490
491 def test_sm_without_load(self, repl):
492 """sm without loaded simulation should error (AC4.8)."""
493 output = io.StringIO()
494 repl.stdout = output
495
496 repl.do_sm("0")
497
498 assert "No simulation loaded" in output.getvalue()
499
500 def test_sm_after_load(self, repl, temp_dfasm_file):
501 """sm <id> should display SM state."""
502 # Load first
503 output = io.StringIO()
504 repl.stdout = output
505
506 repl.do_load(str(temp_dfasm_file))
507 assert repl._loaded
508
509 # Clear and call sm
510 output = io.StringIO()
511 repl.stdout = output
512
513 repl.do_sm("0")
514
515 out = output.getvalue()
516 # Should show SM state or not found message
517 assert len(out) > 0
518
519 def test_sm_invalid_id(self, repl, temp_dfasm_file):
520 """sm with non-integer ID should error."""
521 # Load first
522 output = io.StringIO()
523 repl.stdout = output
524
525 repl.do_load(str(temp_dfasm_file))
526
527 # Try invalid ID
528 output = io.StringIO()
529 repl.stdout = output
530
531 repl.do_sm("invalid_id")
532
533 assert "Invalid SM ID" in output.getvalue()
534
535
536class TestREPLLog:
537 """Tests for AC4.7: log command shows recent events."""
538
539 def test_log_without_events(self, repl):
540 """log with no events should indicate no events."""
541 output = io.StringIO()
542 repl.stdout = output
543
544 repl.do_log("")
545
546 assert "No events" in output.getvalue()
547
548 def test_log_after_step(self, repl, temp_dfasm_file):
549 """log after stepping should show events."""
550 # Load and step
551 output = io.StringIO()
552 repl.stdout = output
553
554 repl.do_load(str(temp_dfasm_file))
555 repl.do_step("")
556
557 # Now log
558 output = io.StringIO()
559 repl.stdout = output
560
561 repl.do_log("")
562
563 # Output depends on whether there were events
564 out = output.getvalue()
565 assert len(out) > 0
566
567 def test_log_filter_pe(self, repl, temp_dfasm_file):
568 """log pe:<id> should filter by PE component."""
569 # Load and step a program with multiple components
570 output = io.StringIO()
571 repl.stdout = output
572
573 # Create a dfasm with multiple PEs to test filtering
574 import tempfile
575 from pathlib import Path
576 with tempfile.NamedTemporaryFile(mode='w', suffix='.dfasm', delete=False) as f:
577 f.write("""\
578@system pe=2, sm=1
579&c0|pe0 <| const, 1
580&c1|pe1 <| const, 2
581&pass0|pe0 <| pass
582&pass1|pe1 <| pass
583&c0|pe0 |> &pass0|pe0:L
584&c1|pe1 |> &pass1|pe1:L
585""")
586 temp_file = f.name
587
588 try:
589 repl.do_load(temp_file)
590 # Run simulation to capture events (cycle-accurate timing starts events at time 1+)
591 repl.do_run("100")
592
593 # Filter by PE 0
594 output = io.StringIO()
595 repl.stdout = output
596 repl.do_log("pe:0")
597
598 # Should complete without exception and contain pe:0 events
599 output_str = output.getvalue()
600 assert len(output_str) > 0, "Expected output from filtering"
601 # The filtered output should contain references to pe:0
602 assert "pe:0" in output_str or "PE 0" in output_str, \
603 "Expected filtered output to contain pe:0"
604 # Verify that filtering actually excludes pe:1
605 assert "pe:1" not in output_str and "PE 1" not in output_str, \
606 "Expected filtered output to exclude pe:1"
607 finally:
608 Path(temp_file).unlink(missing_ok=True)
609
610 def test_log_filter_sm(self, repl, temp_dfasm_file):
611 """log sm:<id> should filter by SM component."""
612 # Load and step a program with SM operations
613 output = io.StringIO()
614 repl.stdout = output
615
616 # Create a dfasm with SM operations to generate SM events
617 import tempfile
618 from pathlib import Path
619 with tempfile.NamedTemporaryFile(mode='w', suffix='.dfasm', delete=False) as f:
620 f.write("""\
621@system pe=2, sm=2
622&c0|pe0 <| const, 100
623&write_op|pe0 <| write
624&c1|pe1 <| const, 200
625&read_op|pe1 <| read
626&c0|pe0 |> &write_op|pe0:L
627&c1|pe1 |> &read_op|pe1:L
628""")
629 temp_file = f.name
630
631 try:
632 repl.do_load(temp_file)
633 # Step multiple times to ensure events are generated
634 for _ in range(5):
635 repl.do_step("")
636
637 # Filter by SM 0
638 output = io.StringIO()
639 repl.stdout = output
640 repl.do_log("sm:0")
641
642 # Should complete without exception
643 output_str = output.getvalue()
644 assert len(output_str) > 0, "Expected output from SM filtering"
645 # Verify that sm:1 is NOT in the filtered output (exclusion check)
646 assert "sm:1" not in output_str, \
647 "Expected filtered output (sm:0) to exclude sm:1 events"
648 finally:
649 Path(temp_file).unlink(missing_ok=True)
650
651 def test_log_filter_by_type(self, repl, temp_dfasm_file):
652 """log <type> should filter by event type name."""
653 # Load and step
654 output = io.StringIO()
655 repl.stdout = output
656
657 repl.do_load(str(temp_dfasm_file))
658 repl.do_step("")
659
660 # Filter by type
661 output = io.StringIO()
662 repl.stdout = output
663
664 repl.do_log("TokenReceived")
665
666 # Should complete without exception
667 assert len(output.getvalue()) > 0
668
669
670class TestREPLTime:
671 """Tests for time command."""
672
673 def test_time_shows_current_time(self, repl, temp_dfasm_file):
674 """time should show current simulation time."""
675 # Load first
676 output = io.StringIO()
677 repl.stdout = output
678
679 repl.do_load(str(temp_dfasm_file))
680
681 # Check time
682 output = io.StringIO()
683 repl.stdout = output
684
685 repl.do_time("")
686
687 assert "Simulation time" in output.getvalue()
688
689
690class TestREPLReset:
691 """Tests for AC4.9: reset command clears simulation state."""
692
693 def test_reset_clears_state(self, repl, temp_dfasm_file):
694 """reset should clear loaded state and event history."""
695 # Load first
696 output = io.StringIO()
697 repl.stdout = output
698
699 repl.do_load(str(temp_dfasm_file))
700 assert repl._loaded
701
702 # Reset
703 output = io.StringIO()
704 repl.stdout = output
705
706 repl.do_reset("")
707
708 assert repl._loaded is False
709 assert len(repl._event_history) == 0
710 assert "reset" in output.getvalue()
711
712 def test_step_after_reset_errors(self, repl, temp_dfasm_file):
713 """step after reset should fail (AC4.8)."""
714 # Load and reset
715 output = io.StringIO()
716 repl.stdout = output
717
718 repl.do_load(str(temp_dfasm_file))
719 repl.do_reset("")
720
721 # Try step
722 output = io.StringIO()
723 repl.stdout = output
724
725 repl.do_step("")
726
727 assert "No simulation loaded" in output.getvalue()
728
729
730class TestREPLQuit:
731 """Tests for AC4.9: quit command exits cleanly."""
732
733 def test_quit_returns_true(self, repl):
734 """do_quit should return True to exit cmdloop."""
735 result = repl.do_quit("")
736 assert result is True
737
738 def test_eof_returns_true(self, repl):
739 """do_EOF should return True for Ctrl-D."""
740 result = repl.do_EOF("")
741 assert result is True
742
743
744class TestTokenParsing:
745 """Tests for token parsing in send/inject."""
746
747 def test_parse_monad_token(self, repl):
748 """Parse MonadToken from args."""
749 token = repl._parse_token_args("0 10 5 42")
750 assert isinstance(token, MonadToken)
751 assert token.target == 0
752 assert token.offset == 10
753 assert token.ctx == 5
754 assert token.data == 42
755 assert token.inline is False
756
757 def test_parse_sm_token_read(self, repl):
758 """Parse SMToken with READ op."""
759 token = repl._parse_token_args("1 --sm 20 READ")
760 assert isinstance(token, SMToken)
761 assert token.target == 1
762 assert token.addr == 20
763 assert token.op == MemOp.READ
764 assert token.data is None
765
766 def test_parse_sm_token_with_data(self, repl):
767 """Parse SMToken with data."""
768 token = repl._parse_token_args("1 --sm 20 WRITE 999")
769 assert isinstance(token, SMToken)
770 assert token.target == 1
771 assert token.addr == 20
772 assert token.op == MemOp.WRITE
773 assert token.data == 999
774
775 def test_parse_invalid_target(self, repl):
776 """Parse should fail on invalid target."""
777 with pytest.raises(ValueError):
778 repl._parse_token_args("not_a_number 0 0 0")
779
780 def test_parse_monad_missing_args(self, repl):
781 """Parse should fail if MonadToken args are incomplete."""
782 with pytest.raises(ValueError):
783 repl._parse_token_args("0 10 5") # Missing data
784
785 def test_parse_sm_missing_args(self, repl):
786 """Parse should fail if SMToken args are incomplete."""
787 with pytest.raises(ValueError):
788 repl._parse_token_args("0 --sm 20") # Missing op
789
790 def test_parse_sm_unknown_op(self, repl):
791 """Parse should fail on unknown MemOp."""
792 with pytest.raises(ValueError, match="Unknown MemOp"):
793 repl._parse_token_args("0 --sm 20 INVALID_OP")