""" Tests for network topology, routing, and backpressure. Verifies: - or1-emu.AC4.1: PE-to-PE routing — token with dest PE_id N arrives at PE N's input store - or1-emu.AC4.2: SM routing — token routes to correct SM by SM_id - or1-emu.AC4.3: Backpressure blocking — PE blocks on put() when destination store at capacity - or1-emu.AC4.4: Backpressure release — backpressure releases when consumer drains store """ import simpy from cm_inst import ALUInst, Addr, ArithOp, MemOp, Port, RoutingOp, SMInst from emu import build_topology, PEConfig, SMConfig from emu.pe import ProcessingElement from sm_mod import Presence from tokens import CMToken, DyadToken, MonadToken, SMToken class TestAC41PEtoPERouting: """Test AC4.1: PE-to-PE routing""" def test_monad_token_routes_to_target_pe(self): """PE0 with PASS instruction outputs token routing to PE1's input_store.""" env = simpy.Environment() # PE0 has PASS instruction at offset 0, routing to PE1 # Note: We use an output_store to collect results without involving PE1's process pe0_iram = { 0: ALUInst( op=RoutingOp.PASS, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } pe0 = ProcessingElement(env, 0, pe0_iram) # Set up output store to collect results (no matching/processing) output_store = simpy.Store(env, capacity=10) pe0.route_table[1] = output_store # Inject a MonadToken to PE0 def inject(): seed_token = MonadToken( target=0, offset=0, ctx=0, data=42, inline=False, ) yield pe0.input_store.put(seed_token) env.process(inject()) env.run(until=100) # Verify output_store received a token assert len(output_store.items) > 0 result_token = output_store.items[0] # PASS returns left operand (data=42) assert result_token.data == 42 assert isinstance(result_token, DyadToken) def test_dual_mode_routes_to_both_pes(self): """Dual-mode instruction routes to both dest_l and dest_r PEs.""" env = simpy.Environment() # PE0 with CONST instruction (dual mode), routes to PE1 and PE2 pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=1, port=Port.L, pe=1), dest_r=Addr(a=2, port=Port.R, pe=2), const=99, ) } pe0 = ProcessingElement(env, 0, pe0_iram) # Set up output stores for each destination output_store_1 = simpy.Store(env, capacity=10) output_store_2 = simpy.Store(env, capacity=10) pe0.route_table[1] = output_store_1 pe0.route_table[2] = output_store_2 # Inject token def inject(): seed_token = MonadToken( target=0, offset=0, ctx=0, data=0, inline=False, ) yield pe0.input_store.put(seed_token) env.process(inject()) env.run(until=100) # Both stores should have received tokens assert len(output_store_1.items) > 0 assert len(output_store_2.items) > 0 # Both should have CONST value (99) assert output_store_1.items[0].data == 99 assert output_store_2.items[0].data == 99 class TestAC42SMRouting: """Test AC4.2: SM routing""" def test_direct_sm_injection(self): """Direct injection into SM via inject_sm() works correctly.""" env = simpy.Environment() # Initialize SM0 with a FULL cell at address 5 sm_config = SMConfig( sm_id=0, cell_count=512, initial_cells={5: (Presence.FULL, 42)}, ) sys = build_topology( env, [PEConfig(0, {})], [sm_config], ) # Set up output store for SM results output_store = simpy.Store(env, capacity=10) sys.sms[0].route_table[0] = output_store # Create a READ token for cell 5, returning to PE0 return_route = CMToken(target=0, offset=10, ctx=0, data=0) sm_token = SMToken( target=0, addr=5, op=MemOp.READ, flags=None, data=None, ret=return_route, ) sys.inject(sm_token) env.run() # Verify result arrived in output_store assert len(output_store.items) > 0 result = output_store.items[0] assert result.data == 42 # Cell data was read assert result.target == 0 assert result.offset == 10 def test_pe_emits_sm_write(self): """PE emits SMInst that writes to SM.""" env = simpy.Environment() # PE0 with SMInst(WRITE) at offset 0 pe0_iram = { 0: SMInst( op=MemOp.WRITE, sm_id=0, const=5, # cell address ret=None, ) } sys = build_topology( env, [PEConfig(0, pe0_iram)], [SMConfig(0, cell_count=512)], ) # Inject MonadToken with data=42 to PE0 seed_token = MonadToken( target=0, offset=0, ctx=0, data=42, inline=False, ) sys.inject(seed_token) env.run() # Verify SM0's cell 5 is now FULL with data 42 cell = sys.sms[0].cells[5] assert cell.pres == Presence.FULL assert cell.data_l == 42 def test_pe_emits_sm_read_returns_to_pe(self): """PE emits SMInst(READ) which returns result to PE.""" env = simpy.Environment() # Initialize SM0 with FULL cell at address 3 sm_config = SMConfig( sm_id=0, cell_count=512, initial_cells={3: (Presence.FULL, 77)}, ) # PE0 with SMInst(READ) at offset 0 pe0_iram = { 0: SMInst( op=MemOp.READ, sm_id=0, const=3, # cell address ret=Addr(a=20, port=Port.L, pe=1), # return to PE1 ) } sys = build_topology( env, [PEConfig(0, pe0_iram), PEConfig(1, {})], [sm_config], ) # Set up output store for PE1 results output_store = simpy.Store(env, capacity=10) sys.sms[0].route_table[1] = output_store # Inject MonadToken to PE0 def inject(): seed_token = MonadToken( target=0, offset=0, ctx=0, data=0, inline=False, ) yield sys.pes[0].input_store.put(seed_token) env.process(inject()) env.run() # Verify result arrived in output_store assert len(output_store.items) > 0 result = output_store.items[0] assert result.data == 77 # Read cell data assert result.target == 1 assert result.offset == 20 class TestAC43Backpressure: """Test AC4.3: Backpressure blocking""" def test_backpressure_blocks_on_full_store(self): """PE blocks when destination store reaches capacity.""" env = simpy.Environment() # PE0 with CONST instruction (emits to destination store) pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=10, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) # Set up a small destination store to trigger backpressure dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store # Inject 4 tokens to PE0 def inject_tokens(): for i in range(4): token = MonadToken( target=0, offset=0, ctx=0, data=i, inline=False, ) yield pe0.input_store.put(token) env.process(inject_tokens()) # Run simulation until backpressure takes effect env.run(until=100) # Destination store should have exactly 2 items (at capacity) assert len(dest_store.items) == 2 # PE0 should have processed the first 2 tokens successfully # and blocked on the 3rd assert len(pe0.input_store.items) > 0 def test_pe_unblocks_with_some_tokens(self): """After partial time, some tokens reach destination and store fills.""" env = simpy.Environment() pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=100, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) # Small destination store dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store # Inject 6 tokens def inject_tokens(): for i in range(6): token = MonadToken( target=0, offset=0, ctx=0, data=i, inline=False, ) yield pe0.input_store.put(token) env.process(inject_tokens()) env.run(until=50) # Destination store should be at capacity assert len(dest_store.items) == 2 # fifo_capacity class TestAC44BackpressureRelease: """Test AC4.4: Backpressure release when consumer drains store""" def test_backpressure_releases_with_consumer(self): """When consumer drains destination store, producer unblocks and continues.""" env = simpy.Environment() pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=42, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) # Small destination store dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store # Track consumed tokens consumed = [] # Inject 4 tokens to PE0 def inject_tokens(): for i in range(4): token = MonadToken( target=0, offset=0, ctx=0, data=i, inline=False, ) yield pe0.input_store.put(token) # Consumer process that drains the destination store def consumer(): while True: token = yield dest_store.get() consumed.append(token) env.process(inject_tokens()) env.process(consumer()) env.run() # All 4 injected tokens should have been consumed by the consumer assert len(consumed) == 4 # PE0's input store should be fully drained after all tokens processed assert len(pe0.input_store.items) == 0 def test_multiple_producers_with_consumer(self): """Multiple producers routing to shared destination, consumer drains.""" env = simpy.Environment() # PE0 and PE2 both emit to shared destination pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=10, ) } pe2_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=1, port=Port.L, pe=1), dest_r=None, const=20, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) pe2 = ProcessingElement(env, 2, pe2_iram, fifo_capacity=8) # Shared destination store dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store pe2.route_table[1] = dest_store # Inject seeds to both PEs def inject_pe0(): token = MonadToken(target=0, offset=0, ctx=0, data=0, inline=False) yield pe0.input_store.put(token) def inject_pe2(): token = MonadToken(target=2, offset=0, ctx=0, data=2, inline=False) yield pe2.input_store.put(token) # Consumer that drains destination consumed = [] def consumer(): while True: token = yield dest_store.get() consumed.append(token) env.process(inject_pe0()) env.process(inject_pe2()) env.process(consumer()) env.run() assert len(consumed) == 2 assert len(pe0.input_store.items) == 0 assert len(pe2.input_store.items) == 0 class TestNetworkIntegration: """Integration tests for complete network scenarios.""" def test_chain_routing_pe0_to_output(self): """Tokens flow from PE0 through routing.""" env = simpy.Environment() # PE0: PASS to output pe0_iram = { 0: ALUInst( op=RoutingOp.PASS, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } pe0 = ProcessingElement(env, 0, pe0_iram) # Set up output store output_store = simpy.Store(env, capacity=10) pe0.route_table[1] = output_store # Inject seed def inject(): seed = MonadToken(target=0, offset=0, ctx=0, data=123, inline=False) yield pe0.input_store.put(seed) env.process(inject()) env.run() # Token should arrive at output assert len(output_store.items) > 0 result = output_store.items[0] assert result.data == 123 def test_sm_write_then_read_via_pe(self): """PE writes to SM, then reads back via another operation.""" env = simpy.Environment() # Initialize SM0 with empty cells sm_config = SMConfig(sm_id=0, cell_count=512) # PE0: First write value 88 to cell 10, then read it back # We'll use two separate simulations or a more complex IRAM # For simplicity, do just the write in this test pe0_iram = { 0: SMInst( op=MemOp.WRITE, sm_id=0, const=10, ret=None, ) } sys = build_topology( env, [PEConfig(0, pe0_iram)], [sm_config], ) seed = MonadToken(target=0, offset=0, ctx=0, data=88, inline=False) sys.inject(seed) env.run() # Verify cell 10 is FULL with value 88 cell = sys.sms[0].cells[10] assert cell.pres == Presence.FULL assert cell.data_l == 88 class TestRestrictedTopology: """Test restricted topology via PEConfig allowed routes (AC7.6–AC7.7). Verifies: - AC7.6: build_topology applies route restrictions from PEConfig - AC7.7: PEConfig with None routes preserves full-mesh (backward compatibility) """ def test_ac76_restricted_topology_pe_routes(self): """AC7.6: build_topology restricts PE routes based on allowed_pe_routes.""" env = simpy.Environment() # Create 3 PEs but restrict PE 0 to only route to PE 1 pe0_config = PEConfig(pe_id=0, iram={}, allowed_pe_routes={1}) pe1_config = PEConfig(pe_id=1, iram={}) pe2_config = PEConfig(pe_id=2, iram={}) sys = build_topology(env, [pe0_config, pe1_config, pe2_config], []) # PE 0 should only have PE 1 in its route_table pe0 = sys.pes[0] assert set(pe0.route_table.keys()) == {1} def test_ac76_restricted_topology_sm_routes(self): """AC7.6: build_topology restricts SM routes based on allowed_sm_routes.""" env = simpy.Environment() # Create PE 0 restricted to SM 0 only (not SM 1) pe0_config = PEConfig(pe_id=0, iram={}, allowed_sm_routes={0}) sm0_config = SMConfig(sm_id=0) sm1_config = SMConfig(sm_id=1) sys = build_topology(env, [pe0_config], [sm0_config, sm1_config]) # PE 0 should only have SM 0 in its sm_routes pe0 = sys.pes[0] assert set(pe0.sm_routes.keys()) == {0} def test_ac76_restricted_topology_both_pe_and_sm(self): """AC7.6: build_topology applies both PE and SM route restrictions.""" env = simpy.Environment() # Create PE 0 restricted to PE 1 and SM 0 only pe0_config = PEConfig( pe_id=0, iram={}, allowed_pe_routes={1}, allowed_sm_routes={0} ) pe1_config = PEConfig(pe_id=1, iram={}) pe2_config = PEConfig(pe_id=2, iram={}) sm0_config = SMConfig(sm_id=0) sm1_config = SMConfig(sm_id=1) sys = build_topology( env, [pe0_config, pe1_config, pe2_config], [sm0_config, sm1_config] ) # PE 0 should be restricted in both dimensions pe0 = sys.pes[0] assert set(pe0.route_table.keys()) == {1} assert set(pe0.sm_routes.keys()) == {0} # PE 1 and PE 2 should have full-mesh (no restrictions) pe1 = sys.pes[1] assert set(pe1.route_table.keys()) == {0, 1, 2} assert set(pe1.sm_routes.keys()) == {0, 1} pe2 = sys.pes[2] assert set(pe2.route_table.keys()) == {0, 1, 2} assert set(pe2.sm_routes.keys()) == {0, 1} def test_ac77_none_routes_preserves_full_mesh(self): """AC7.7: PEConfig with None routes preserves full-mesh topology (backward compat).""" env = simpy.Environment() # Create 3 PEs with no route restrictions (None) pe0_config = PEConfig(pe_id=0, iram={}) # allowed_pe_routes=None, allowed_sm_routes=None pe1_config = PEConfig(pe_id=1, iram={}) pe2_config = PEConfig(pe_id=2, iram={}) sm0_config = SMConfig(sm_id=0) sm1_config = SMConfig(sm_id=1) sys = build_topology( env, [pe0_config, pe1_config, pe2_config], [sm0_config, sm1_config] ) # All PEs should have full-mesh routes for pe_id in [0, 1, 2]: pe = sys.pes[pe_id] assert set(pe.route_table.keys()) == {0, 1, 2} assert set(pe.sm_routes.keys()) == {0, 1} def test_ac77_existing_tests_still_pass(self): """AC7.7: Existing test scenarios still work with full-mesh (regression test).""" env = simpy.Environment() # This is the basic test from test_integration.py: CONST feeds ADD pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=7, ), 1: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.R, pe=1), dest_r=None, const=3, ), } pe1_iram = { 0: ALUInst( op=ArithOp.ADD, dest_l=Addr(a=0, port=Port.L, pe=2), dest_r=None, const=None, ), } sys = build_topology( env, [ PEConfig(pe_id=0, iram=pe0_iram), PEConfig(pe_id=1, iram=pe1_iram), PEConfig(pe_id=2, iram={}), ], [], ) # All PEs should have full-mesh routes for pe_id in [0, 1, 2]: pe = sys.pes[pe_id] assert set(pe.route_table.keys()) == {0, 1, 2} # Collector to verify routing works collector_store = simpy.Store(env, capacity=100) sys.pes[1].route_table[2] = collector_store # Inject tokens def injector(): yield sys.pes[0].input_store.put(MonadToken(target=0, offset=0, ctx=0, data=0, inline=False)) yield sys.pes[0].input_store.put(MonadToken(target=0, offset=1, ctx=0, data=0, inline=False)) env.process(injector()) env.run() # Verify result: 7 + 3 = 10 routed to collector assert len(collector_store.items) > 0 result = collector_store.items[0] assert result.data == 10 class TestSystemInjectTokenAPI: """Test System.inject() unified API.""" def test_inject_token_monad(self): """System.inject() can inject MonadToken and PE executes it.""" env = simpy.Environment() # PE0 with PASS instruction routing to PE1 pe0_iram = { 0: ALUInst( op=RoutingOp.PASS, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } sys = build_topology(env, [PEConfig(0, pe0_iram), PEConfig(1, {})], []) # Inject MonadToken via unified API token = MonadToken(target=0, offset=0, ctx=0, data=0xABCD, inline=False) sys.inject(token) env.run() # PE0 should have executed PASS and emitted the token assert len(sys.pes[0].output_log) >= 1 emitted = [t for t in sys.pes[0].output_log if hasattr(t, 'data') and t.data == 0xABCD] assert len(emitted) == 1 def test_inject_token_dyad(self): """System.inject() can inject DyadToken.""" env = simpy.Environment() # PE0 with ADD instruction pe0_iram = { 0: ALUInst( op=ArithOp.ADD, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } sys = build_topology(env, [PEConfig(0, pe0_iram), PEConfig(1, {})], []) # Set up output stores output_store = simpy.Store(env, capacity=10) sys.pes[0].route_table[1] = output_store # Create and inject first DyadToken token1 = DyadToken(target=0, offset=0, ctx=0, data=10, port=Port.L, gen=0, wide=False) sys.inject(token1) # Create and inject second DyadToken to fire the instruction token2 = DyadToken(target=0, offset=0, ctx=0, data=20, port=Port.R, gen=0, wide=False) sys.inject(token2) env.run() # Verify ADD result (10 + 20 = 30) assert len(output_store.items) == 1 assert output_store.items[0].data == 30