""" Tests for network topology, routing, and backpressure. Verifies: - or1-emu.AC4.1: PE-to-PE routing — token with dest PE_id N arrives at PE N's input store - or1-emu.AC4.2: SM routing — token routes to correct SM by SM_id - or1-emu.AC4.3: Backpressure blocking — PE blocks on put() when destination store at capacity - or1-emu.AC4.4: Backpressure release — backpressure releases when consumer drains store """ import simpy from cm_inst import ALUInst, Addr, ArithOp, MemOp, Port, RoutingOp, SMInst from emu import build_topology, PEConfig, SMConfig from emu.pe import ProcessingElement from sm_mod import Presence from tokens import CMToken, DyadToken, MonadToken, SMToken class TestAC41PEtoPERouting: """Test AC4.1: PE-to-PE routing""" def test_monad_token_routes_to_target_pe(self): """PE0 with PASS instruction outputs token routing to PE1's input_store.""" env = simpy.Environment() # PE0 has PASS instruction at offset 0, routing to PE1 # Note: We use an output_store to collect results without involving PE1's process pe0_iram = { 0: ALUInst( op=RoutingOp.PASS, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } pe0 = ProcessingElement(env, 0, pe0_iram) # Set up output store to collect results (no matching/processing) output_store = simpy.Store(env, capacity=10) pe0.route_table[1] = output_store # Inject a MonadToken to PE0 def inject(): seed_token = MonadToken( target=0, offset=0, ctx=0, data=42, inline=False, ) yield pe0.input_store.put(seed_token) env.process(inject()) env.run(until=100) # Verify output_store received a token assert len(output_store.items) > 0 result_token = output_store.items[0] # PASS returns left operand (data=42) assert result_token.data == 42 assert isinstance(result_token, DyadToken) def test_dual_mode_routes_to_both_pes(self): """Dual-mode instruction routes to both dest_l and dest_r PEs.""" env = simpy.Environment() # PE0 with CONST instruction (dual mode), routes to PE1 and PE2 pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=1, port=Port.L, pe=1), dest_r=Addr(a=2, port=Port.R, pe=2), const=99, ) } pe0 = ProcessingElement(env, 0, pe0_iram) # Set up output stores for each destination output_store_1 = simpy.Store(env, capacity=10) output_store_2 = simpy.Store(env, capacity=10) pe0.route_table[1] = output_store_1 pe0.route_table[2] = output_store_2 # Inject token def inject(): seed_token = MonadToken( target=0, offset=0, ctx=0, data=0, inline=False, ) yield pe0.input_store.put(seed_token) env.process(inject()) env.run(until=100) # Both stores should have received tokens assert len(output_store_1.items) > 0 assert len(output_store_2.items) > 0 # Both should have CONST value (99) assert output_store_1.items[0].data == 99 assert output_store_2.items[0].data == 99 class TestAC42SMRouting: """Test AC4.2: SM routing""" def test_direct_sm_injection(self): """Direct injection into SM via inject_sm() works correctly.""" env = simpy.Environment() # Initialize SM0 with a FULL cell at address 5 sm_config = SMConfig( sm_id=0, cell_count=512, initial_cells={5: (Presence.FULL, 42)}, ) sys = build_topology( env, [PEConfig(0, {})], [sm_config], ) # Set up output store for SM results output_store = simpy.Store(env, capacity=10) sys.sms[0].route_table[0] = output_store # Create a READ token for cell 5, returning to PE0 return_route = CMToken(target=0, offset=10, ctx=0, data=0) sm_token = SMToken( target=0, addr=5, op=MemOp.READ, flags=None, data=None, ret=return_route, ) sys.inject(sm_token) env.run() # Verify result arrived in output_store assert len(output_store.items) > 0 result = output_store.items[0] assert result.data == 42 # Cell data was read assert result.target == 0 assert result.offset == 10 def test_pe_emits_sm_write(self): """PE emits SMInst that writes to SM.""" env = simpy.Environment() # PE0 with SMInst(WRITE) at offset 0 pe0_iram = { 0: SMInst( op=MemOp.WRITE, sm_id=0, const=5, # cell address ret=None, ) } sys = build_topology( env, [PEConfig(0, pe0_iram)], [SMConfig(0, cell_count=512)], ) # Inject MonadToken with data=42 to PE0 seed_token = MonadToken( target=0, offset=0, ctx=0, data=42, inline=False, ) sys.inject(seed_token) env.run() # Verify SM0's cell 5 is now FULL with data 42 cell = sys.sms[0].cells[5] assert cell.pres == Presence.FULL assert cell.data_l == 42 def test_pe_emits_sm_read_returns_to_pe(self): """PE emits SMInst(READ) which returns result to PE.""" env = simpy.Environment() # Initialize SM0 with FULL cell at address 3 sm_config = SMConfig( sm_id=0, cell_count=512, initial_cells={3: (Presence.FULL, 77)}, ) # PE0 with SMInst(READ) at offset 0 pe0_iram = { 0: SMInst( op=MemOp.READ, sm_id=0, const=3, # cell address ret=Addr(a=20, port=Port.L, pe=1), # return to PE1 ) } sys = build_topology( env, [PEConfig(0, pe0_iram), PEConfig(1, {})], [sm_config], ) # Set up output store for PE1 results output_store = simpy.Store(env, capacity=10) sys.sms[0].route_table[1] = output_store # Inject MonadToken to PE0 def inject(): seed_token = MonadToken( target=0, offset=0, ctx=0, data=0, inline=False, ) yield sys.pes[0].input_store.put(seed_token) env.process(inject()) env.run() # Verify result arrived in output_store assert len(output_store.items) > 0 result = output_store.items[0] assert result.data == 77 # Read cell data assert result.target == 1 assert result.offset == 20 class TestAC43Backpressure: """Test AC4.3: Backpressure blocking""" def test_backpressure_blocks_on_full_store(self): """Delivery process blocks when destination store is full. With process-per-token architecture, the PE spawns async delivery processes. The PE itself doesn't block on delivery (pipelined), but the delivery process blocks when destination store is full. With sufficient time, eventual delivery will complete and populate destination store up to capacity. """ env = simpy.Environment() # PE0 with CONST instruction (emits to destination store) pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=10, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) # Set up a small destination store to trigger backpressure in delivery dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store # Inject 4 tokens to PE0 def inject_tokens(): for i in range(4): token = MonadToken( target=0, offset=0, ctx=0, data=i, inline=False, ) yield pe0.input_store.put(token) env.process(inject_tokens()) # Run simulation with sufficient time for delivery processes env.run(until=100) # All 4 tokens should be processed and delivered (with delivery async) # Destination store should accumulate tokens up to its capacity (2) assert len(dest_store.items) == 2 # PE input_store should be empty (all tokens dequeued and processed) assert len(pe0.input_store.items) == 0 # PE should have emitted all 4 tokens (logged in output_log) # delivery may be blocked on store capacity, but all tokens were processed assert len(pe0.output_log) == 4 def test_pe_unblocks_with_some_tokens(self): """After partial time, some tokens reach destination and store fills.""" env = simpy.Environment() pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=100, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) # Small destination store dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store # Inject 6 tokens def inject_tokens(): for i in range(6): token = MonadToken( target=0, offset=0, ctx=0, data=i, inline=False, ) yield pe0.input_store.put(token) env.process(inject_tokens()) env.run(until=50) # Destination store should be at capacity assert len(dest_store.items) == 2 # fifo_capacity class TestAC44BackpressureRelease: """Test AC4.4: Backpressure release when consumer drains store""" def test_backpressure_releases_with_consumer(self): """When consumer drains destination store, producer unblocks and continues.""" env = simpy.Environment() pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=42, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) # Small destination store dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store # Track consumed tokens consumed = [] # Inject 4 tokens to PE0 def inject_tokens(): for i in range(4): token = MonadToken( target=0, offset=0, ctx=0, data=i, inline=False, ) yield pe0.input_store.put(token) # Consumer process that drains the destination store def consumer(): while True: token = yield dest_store.get() consumed.append(token) env.process(inject_tokens()) env.process(consumer()) env.run() # All 4 injected tokens should have been consumed by the consumer assert len(consumed) == 4 # PE0's input store should be fully drained after all tokens processed assert len(pe0.input_store.items) == 0 def test_multiple_producers_with_consumer(self): """Multiple producers routing to shared destination, consumer drains.""" env = simpy.Environment() # PE0 and PE2 both emit to shared destination pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=10, ) } pe2_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=1, port=Port.L, pe=1), dest_r=None, const=20, ) } pe0 = ProcessingElement(env, 0, pe0_iram, fifo_capacity=8) pe2 = ProcessingElement(env, 2, pe2_iram, fifo_capacity=8) # Shared destination store dest_store = simpy.Store(env, capacity=2) pe0.route_table[1] = dest_store pe2.route_table[1] = dest_store # Inject seeds to both PEs def inject_pe0(): token = MonadToken(target=0, offset=0, ctx=0, data=0, inline=False) yield pe0.input_store.put(token) def inject_pe2(): token = MonadToken(target=2, offset=0, ctx=0, data=2, inline=False) yield pe2.input_store.put(token) # Consumer that drains destination consumed = [] def consumer(): while True: token = yield dest_store.get() consumed.append(token) env.process(inject_pe0()) env.process(inject_pe2()) env.process(consumer()) env.run() assert len(consumed) == 2 assert len(pe0.input_store.items) == 0 assert len(pe2.input_store.items) == 0 class TestNetworkIntegration: """Integration tests for complete network scenarios.""" def test_chain_routing_pe0_to_output(self): """Tokens flow from PE0 through routing.""" env = simpy.Environment() # PE0: PASS to output pe0_iram = { 0: ALUInst( op=RoutingOp.PASS, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } pe0 = ProcessingElement(env, 0, pe0_iram) # Set up output store output_store = simpy.Store(env, capacity=10) pe0.route_table[1] = output_store # Inject seed def inject(): seed = MonadToken(target=0, offset=0, ctx=0, data=123, inline=False) yield pe0.input_store.put(seed) env.process(inject()) env.run() # Token should arrive at output assert len(output_store.items) > 0 result = output_store.items[0] assert result.data == 123 def test_sm_write_then_read_via_pe(self): """PE writes to SM, then reads back via another operation.""" env = simpy.Environment() # Initialize SM0 with empty cells sm_config = SMConfig(sm_id=0, cell_count=512) # PE0: First write value 88 to cell 10, then read it back # We'll use two separate simulations or a more complex IRAM # For simplicity, do just the write in this test pe0_iram = { 0: SMInst( op=MemOp.WRITE, sm_id=0, const=10, ret=None, ) } sys = build_topology( env, [PEConfig(0, pe0_iram)], [sm_config], ) seed = MonadToken(target=0, offset=0, ctx=0, data=88, inline=False) sys.inject(seed) env.run() # Verify cell 10 is FULL with value 88 cell = sys.sms[0].cells[10] assert cell.pres == Presence.FULL assert cell.data_l == 88 class TestRestrictedTopology: """Test restricted topology via PEConfig allowed routes (AC7.6–AC7.7). Verifies: - AC7.6: build_topology applies route restrictions from PEConfig - AC7.7: PEConfig with None routes preserves full-mesh (backward compatibility) """ def test_ac76_restricted_topology_pe_routes(self): """AC7.6: build_topology restricts PE routes based on allowed_pe_routes.""" env = simpy.Environment() # Create 3 PEs but restrict PE 0 to only route to PE 1 pe0_config = PEConfig(pe_id=0, iram={}, allowed_pe_routes={1}) pe1_config = PEConfig(pe_id=1, iram={}) pe2_config = PEConfig(pe_id=2, iram={}) sys = build_topology(env, [pe0_config, pe1_config, pe2_config], []) # PE 0 should only have PE 1 in its route_table pe0 = sys.pes[0] assert set(pe0.route_table.keys()) == {1} def test_ac76_restricted_topology_sm_routes(self): """AC7.6: build_topology restricts SM routes based on allowed_sm_routes.""" env = simpy.Environment() # Create PE 0 restricted to SM 0 only (not SM 1) pe0_config = PEConfig(pe_id=0, iram={}, allowed_sm_routes={0}) sm0_config = SMConfig(sm_id=0) sm1_config = SMConfig(sm_id=1) sys = build_topology(env, [pe0_config], [sm0_config, sm1_config]) # PE 0 should only have SM 0 in its sm_routes pe0 = sys.pes[0] assert set(pe0.sm_routes.keys()) == {0} def test_ac76_restricted_topology_both_pe_and_sm(self): """AC7.6: build_topology applies both PE and SM route restrictions.""" env = simpy.Environment() # Create PE 0 restricted to PE 1 and SM 0 only pe0_config = PEConfig( pe_id=0, iram={}, allowed_pe_routes={1}, allowed_sm_routes={0} ) pe1_config = PEConfig(pe_id=1, iram={}) pe2_config = PEConfig(pe_id=2, iram={}) sm0_config = SMConfig(sm_id=0) sm1_config = SMConfig(sm_id=1) sys = build_topology( env, [pe0_config, pe1_config, pe2_config], [sm0_config, sm1_config] ) # PE 0 should be restricted in both dimensions pe0 = sys.pes[0] assert set(pe0.route_table.keys()) == {1} assert set(pe0.sm_routes.keys()) == {0} # PE 1 and PE 2 should have full-mesh (no restrictions) pe1 = sys.pes[1] assert set(pe1.route_table.keys()) == {0, 1, 2} assert set(pe1.sm_routes.keys()) == {0, 1} pe2 = sys.pes[2] assert set(pe2.route_table.keys()) == {0, 1, 2} assert set(pe2.sm_routes.keys()) == {0, 1} def test_ac77_none_routes_preserves_full_mesh(self): """AC7.7: PEConfig with None routes preserves full-mesh topology (backward compat).""" env = simpy.Environment() # Create 3 PEs with no route restrictions (None) pe0_config = PEConfig(pe_id=0, iram={}) # allowed_pe_routes=None, allowed_sm_routes=None pe1_config = PEConfig(pe_id=1, iram={}) pe2_config = PEConfig(pe_id=2, iram={}) sm0_config = SMConfig(sm_id=0) sm1_config = SMConfig(sm_id=1) sys = build_topology( env, [pe0_config, pe1_config, pe2_config], [sm0_config, sm1_config] ) # All PEs should have full-mesh routes for pe_id in [0, 1, 2]: pe = sys.pes[pe_id] assert set(pe.route_table.keys()) == {0, 1, 2} assert set(pe.sm_routes.keys()) == {0, 1} def test_ac77_existing_tests_still_pass(self): """AC7.7: Existing test scenarios still work with full-mesh (regression test).""" env = simpy.Environment() # This is the basic test from test_integration.py: CONST feeds ADD pe0_iram = { 0: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=7, ), 1: ALUInst( op=RoutingOp.CONST, dest_l=Addr(a=0, port=Port.R, pe=1), dest_r=None, const=3, ), } pe1_iram = { 0: ALUInst( op=ArithOp.ADD, dest_l=Addr(a=0, port=Port.L, pe=2), dest_r=None, const=None, ), } sys = build_topology( env, [ PEConfig(pe_id=0, iram=pe0_iram), PEConfig(pe_id=1, iram=pe1_iram), PEConfig(pe_id=2, iram={}), ], [], ) # All PEs should have full-mesh routes for pe_id in [0, 1, 2]: pe = sys.pes[pe_id] assert set(pe.route_table.keys()) == {0, 1, 2} # Collector to verify routing works collector_store = simpy.Store(env, capacity=100) sys.pes[1].route_table[2] = collector_store # Inject tokens def injector(): yield sys.pes[0].input_store.put(MonadToken(target=0, offset=0, ctx=0, data=0, inline=False)) yield sys.pes[0].input_store.put(MonadToken(target=0, offset=1, ctx=0, data=0, inline=False)) env.process(injector()) env.run() # Verify result: 7 + 3 = 10 routed to collector assert len(collector_store.items) > 0 result = collector_store.items[0] assert result.data == 10 class TestSystemInjectTokenAPI: """Test System.inject() unified API.""" def test_inject_token_monad(self): """System.inject() can inject MonadToken and PE executes it.""" env = simpy.Environment() # PE0 with PASS instruction routing to PE1 pe0_iram = { 0: ALUInst( op=RoutingOp.PASS, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } sys = build_topology(env, [PEConfig(0, pe0_iram), PEConfig(1, {})], []) # Inject MonadToken via unified API token = MonadToken(target=0, offset=0, ctx=0, data=0xABCD, inline=False) sys.inject(token) env.run() # PE0 should have executed PASS and emitted the token assert len(sys.pes[0].output_log) >= 1 emitted = [t for t in sys.pes[0].output_log if hasattr(t, 'data') and t.data == 0xABCD] assert len(emitted) == 1 def test_inject_token_dyad(self): """System.inject() can inject DyadToken.""" env = simpy.Environment() # PE0 with ADD instruction pe0_iram = { 0: ALUInst( op=ArithOp.ADD, dest_l=Addr(a=0, port=Port.L, pe=1), dest_r=None, const=None, ) } sys = build_topology(env, [PEConfig(0, pe0_iram), PEConfig(1, {})], []) # Set up output stores output_store = simpy.Store(env, capacity=10) sys.pes[0].route_table[1] = output_store # Create and inject first DyadToken token1 = DyadToken(target=0, offset=0, ctx=0, data=10, port=Port.L, gen=0, wide=False) sys.inject(token1) # Create and inject second DyadToken to fire the instruction token2 = DyadToken(target=0, offset=0, ctx=0, data=20, port=Port.R, gen=0, wide=False) sys.inject(token2) env.run() # Verify ADD result (10 + 20 = 30) assert len(output_store.items) == 1 assert output_store.items[0].data == 30