"""Code generation for OR1 assembly.

Converts fully allocated IRGraphs to emulator-ready configuration objects and
token streams. Two output modes:

1. Direct mode: Produces PEConfig/SMConfig lists + seed tokens (for direct setup)
2. Token stream mode: Produces bootstrap sequence (SM init → IRAM writes → seeds)

Reference: Phase 6 design doc, Tasks 1-2.
"""

from dataclasses import dataclass
from collections import defaultdict

from asm.ir import (
    IRGraph, IRNode, IREdge, ResolvedDest, collect_all_nodes_and_edges, collect_all_data_defs,
    DEFAULT_IRAM_CAPACITY, DEFAULT_CTX_SLOTS
)
from asm.opcodes import is_dyadic
from cm_inst import ALUInst, MemOp, RoutingOp, SMInst
from emu.types import PEConfig, SMConfig
from tokens import IRAMWriteToken, MonadToken, SMToken
from sm_mod import Presence


@dataclass(frozen=True)
class AssemblyResult:
    """Result of code generation in direct mode.

    Attributes:
        pe_configs: List of PEConfig objects, one per PE
        sm_configs: List of SMConfig objects, one per SM with data_defs
        seed_tokens: List of MonadTokens for const nodes with no incoming edges
    """
    pe_configs: list[PEConfig]
    sm_configs: list[SMConfig]
    seed_tokens: list[MonadToken]


def _build_iram_for_pe(
    nodes_on_pe: list[IRNode],
    all_nodes: dict[str, IRNode],
) -> dict[int, ALUInst | SMInst]:
    """Build IRAM instruction dict for a single PE.

    Nodes whose ``iram_offset`` is None are skipped. Nodes whose opcode is a
    ``MemOp`` become ``SMInst``; every other node becomes an ``ALUInst``.

    Args:
        nodes_on_pe: List of IRNodes on this PE
        all_nodes: All nodes in graph (used to look up the destination node
            of a memory operation)

    Returns:
        Dict mapping IRAM offset to ALUInst or SMInst
    """
    iram: dict[int, ALUInst | SMInst] = {}

    for node in nodes_on_pe:
        if node.iram_offset is None:
            # Node not allocated, nothing to place in IRAM
            continue

        # Only destinations that have been resolved carry an address.
        left = node.dest_l if isinstance(node.dest_l, ResolvedDest) else None

        if isinstance(node.opcode, MemOp):
            # Memory operation -> SMInst. The return address is the resolved
            # left destination; ret_dyadic reflects whether the node at that
            # destination is dyadic (False when it cannot be looked up).
            ret_addr = None
            ret_dyadic = False
            if left is not None:
                ret_addr = left.addr
                dest_node = all_nodes.get(left.name)
                if dest_node is not None:
                    ret_dyadic = is_dyadic(dest_node.opcode, dest_node.const)

            inst = SMInst(
                op=node.opcode,
                sm_id=node.sm_id,
                const=node.const,
                ret=ret_addr,
                ret_dyadic=ret_dyadic,
            )
        else:
            # ALU operation -> ALUInst; unresolved destinations stay None.
            right = node.dest_r if isinstance(node.dest_r, ResolvedDest) else None
            inst = ALUInst(
                op=node.opcode,
                dest_l=left.addr if left is not None else None,
                dest_r=right.addr if right is not None else None,
                const=node.const,
            )

        iram[node.iram_offset] = inst

    return iram


def _compute_route_restrictions(
    nodes_by_pe: dict[int, list[IRNode]],
    all_edges: list[IREdge],
    all_nodes: dict[str, IRNode],
    pe_id: int,
) -> tuple[set[int], set[int]]:
    """Compute allowed PE and SM routes for a given PE.

    PE routes are the PEs of the destination nodes of every edge whose source
    node lives on this PE, plus the PE itself. SM routes are the ``sm_id`` of
    every ``MemOp`` node placed on this PE.

    Args:
        nodes_by_pe: Dict mapping PE ID to list of nodes on that PE
        all_edges: List of all edges in graph
        all_nodes: Dict of all nodes
        pe_id: The PE we're computing routes for

    Returns:
        Tuple of (allowed_pe_routes set, allowed_sm_routes set)
    """
    local_nodes = nodes_by_pe.get(pe_id, [])
    local_names = {node.name for node in local_nodes}

    pe_routes = {pe_id}  # Always include self-route
    for edge in all_edges:
        if edge.source not in local_names:
            continue
        # Edge originates from this PE; allow routing to wherever it lands.
        dest_node = all_nodes.get(edge.dest)
        if dest_node is not None and dest_node.pe is not None:
            pe_routes.add(dest_node.pe)

    sm_routes = {
        node.sm_id
        for node in local_nodes
        if isinstance(node.opcode, MemOp) and node.sm_id is not None
    }

    return pe_routes, sm_routes


def generate_direct(graph: IRGraph) -> AssemblyResult:
    """Generate PEConfig, SMConfig, and seed tokens from an allocated IRGraph.

    Args:
        graph: A fully allocated IRGraph (after allocate pass)

    Returns:
        AssemblyResult with pe_configs, sm_configs, and seed_tokens
    """
    all_nodes, all_edges = collect_all_nodes_and_edges(graph)
    all_data_defs = collect_all_data_defs(graph)

    # Group nodes by PE; nodes without a PE assignment are left out.
    nodes_by_pe: dict[int, list[IRNode]] = defaultdict(list)
    for node in all_nodes.values():
        if node.pe is not None:
            nodes_by_pe[node.pe].append(node)

    # System-wide sizes are the same for every PE, so resolve them once.
    ctx_slots = graph.system.ctx_slots if graph.system else DEFAULT_CTX_SLOTS
    iram_capacity = graph.system.iram_capacity if graph.system else DEFAULT_IRAM_CAPACITY

    # Build PEConfigs in ascending PE id order
    pe_configs = []
    for pe_id in sorted(nodes_by_pe):
        iram = _build_iram_for_pe(nodes_by_pe[pe_id], all_nodes)
        allowed_pe_routes, allowed_sm_routes = _compute_route_restrictions(
            nodes_by_pe, all_edges, all_nodes, pe_id
        )
        pe_configs.append(
            PEConfig(
                pe_id=pe_id,
                iram=iram,
                ctx_slots=ctx_slots,
                offsets=iram_capacity,
                allowed_pe_routes=allowed_pe_routes,
                allowed_sm_routes=allowed_sm_routes,
            )
        )

    # Build SMConfigs from data_defs; a later def for the same cell overwrites
    # an earlier one.
    cells_by_sm: dict[int, dict[int, tuple[Presence, int]]] = defaultdict(dict)
    for data_def in all_data_defs:
        if data_def.sm_id is not None and data_def.cell_addr is not None:
            cells_by_sm[data_def.sm_id][data_def.cell_addr] = (
                Presence.FULL, data_def.value
            )

    sm_configs = []
    for sm_id in sorted(cells_by_sm):
        initial_cells = cells_by_sm[sm_id]
        sm_configs.append(
            SMConfig(
                sm_id=sm_id,
                initial_cells=initial_cells if initial_cells else None,
            )
        )

    # Seed tokens: CONST nodes that are not the destination of any edge.
    # Only membership matters here, so a set of destination names suffices.
    names_with_inputs = {edge.dest for edge in all_edges}

    seed_tokens = []
    for node in all_nodes.values():
        if node.opcode != RoutingOp.CONST or node.name in names_with_inputs:
            continue
        # Fields left unset on the node fall back to 0.
        seed_tokens.append(
            MonadToken(
                target=node.pe if node.pe is not None else 0,
                offset=node.iram_offset if node.iram_offset is not None else 0,
                ctx=node.ctx if node.ctx is not None else 0,
                data=node.const if node.const is not None else 0,
                inline=False,
            )
        )

    return AssemblyResult(
        pe_configs=pe_configs,
        sm_configs=sm_configs,
        seed_tokens=seed_tokens,
    )


def _contiguous_runs(offsets: list[int]) -> list[list[int]]:
    """Split ascending integer offsets into runs of consecutive values."""
    runs: list[list[int]] = []
    for offset in offsets:
        if runs and offset == runs[-1][-1] + 1:
            runs[-1].append(offset)
        else:
            runs.append([offset])
    return runs


def generate_tokens(graph: IRGraph) -> list:
    """Generate bootstrap token sequence from an allocated IRGraph.

    Produces tokens in order: SM init → IRAM writes → seeds

    IRAM writes are emitted as one ``IRAMWriteToken`` per run of consecutive
    IRAM offsets on a PE, with the token's ``offset`` set to the first offset
    of the run. When a PE's offsets are contiguous from 0 this is a single
    token at offset 0. Previously the offset was always 0 and the instructions
    were packed densely, which dropped the real offsets whenever they did not
    start at 0 or had gaps.

    NOTE(review): this assumes IRAMWriteToken stores ``instructions`` at
    consecutive IRAM slots starting at ``offset`` — confirm against the PE's
    handling of the token.

    Args:
        graph: A fully allocated IRGraph (after allocate pass)

    Returns:
        List of tokens (SMToken, IRAMWriteToken, MonadToken) in bootstrap order
    """
    # Use direct mode to get configs and seeds
    result = generate_direct(graph)

    tokens = []

    # 1. SM init tokens: one WRITE per data def that has both an SM and a cell
    for data_def in collect_all_data_defs(graph):
        if data_def.sm_id is not None and data_def.cell_addr is not None:
            tokens.append(
                SMToken(
                    target=data_def.sm_id,
                    addr=data_def.cell_addr,
                    op=MemOp.WRITE,
                    flags=None,
                    data=data_def.value,
                    ret=None,
                )
            )

    # 2. IRAM write tokens
    for pe_config in result.pe_configs:
        # A PE with an empty IRAM still gets one empty write at offset 0,
        # matching what was emitted before runs were introduced.
        runs = _contiguous_runs(sorted(pe_config.iram)) or [[]]
        for run in runs:
            tokens.append(
                IRAMWriteToken(
                    target=pe_config.pe_id,
                    offset=run[0] if run else 0,
                    ctx=0,
                    data=0,
                    instructions=tuple(pe_config.iram[offset] for offset in run),
                )
            )

    # 3. Seed tokens
    tokens.extend(result.seed_tokens)

    return tokens