OR-1 dataflow CPU sketch
at f0d21b33bcce61de0a2a4fa3ebea15c49be7a8a7 169 lines 5.0 kB view raw
"""Convert IRGraph to JSON-serialisable structure for the frontend.

Produces a flat graph representation with all nodes, edges, regions,
errors, and metadata needed for both logical and physical views.
"""

from __future__ import annotations

from typing import Any, Union

from cm_inst import Addr, Port
from asm.ir import (
    IRGraph, IRNode, IREdge, IRRegion, RegionKind,
    SourceLoc, NameRef, ResolvedDest,
    collect_all_nodes_and_edges,
)
from asm.errors import AssemblyError
from asm.opcodes import OP_TO_MNEMONIC
from dfgraph.pipeline import PipelineResult, PipelineStage
from dfgraph.categories import categorise, CATEGORY_COLOURS, OpcodeCategory


def _serialise_loc(loc: SourceLoc) -> dict[str, Any]:
    """Return the start/end line and column of *loc* as a plain dict."""
    return {
        "line": loc.line,
        "column": loc.column,
        "end_line": loc.end_line,
        "end_column": loc.end_column,
    }


def _serialise_addr(addr: Addr) -> dict[str, Any]:
    """Return *addr* as a plain dict.

    The ``offset`` key carries ``addr.a``; ``port`` carries the name of
    the port member rather than the member itself.
    """
    return {
        "offset": addr.a,
        "port": addr.port.name,
        "pe": addr.pe,
    }


def _serialise_node(node: IRNode, error_node_names: set[str]) -> dict[str, Any]:
    """Return the JSON description of a single IR node.

    Args:
        node: The node to describe.
        error_node_names: Names of nodes that should be flagged with
            ``has_error``.

    Returns:
        A dict holding the node name (as ``id``), its mnemonic, its
        category and the colour mapped to that category, placement
        fields copied from the node, the error flag, and the
        serialised source location.

    Both ``OP_TO_MNEMONIC`` and ``CATEGORY_COLOURS`` are read by
    subscription, so an opcode or category missing from those tables
    raises instead of producing a partial entry.
    """
    category = categorise(node.opcode)
    mnemonic = OP_TO_MNEMONIC[node.opcode]

    return {
        "id": node.name,
        "opcode": mnemonic,
        "category": category.value,
        "colour": CATEGORY_COLOURS[category],
        "const": node.const,
        "pe": node.pe,
        "iram_offset": node.iram_offset,
        "ctx": node.ctx,
        "has_error": node.name in error_node_names,
        "loc": _serialise_loc(node.loc),
    }


def _serialise_edge(edge: IREdge, all_nodes: dict[str, IRNode],
                    error_lines: set[int]) -> dict[str, Any]:
    """Return the JSON description of a single IR edge.

    Args:
        edge: The edge to describe.
        all_nodes: Every node of the graph keyed by name; used to look
            up the edge's source node.
        error_lines: Source lines that have at least one error; the
            edge is flagged when its own line is in this set.

    Returns:
        A dict with ``source``, ``target``, ``port``, ``source_port``
        (``None`` when the edge has no source port) and ``has_error``.
        An ``addr`` entry is added only when the source node has a
        ``ResolvedDest`` whose name equals the edge's destination.
    """
    result: dict[str, Any] = {
        "source": edge.source,
        "target": edge.dest,
        "port": edge.port.name,
        "source_port": edge.source_port.name if edge.source_port else None,
        "has_error": edge.loc.line in error_lines,
    }

    source_node = all_nodes.get(edge.source)
    # Explicit None check: the question is "was the node found", not
    # "is the node truthy".
    if source_node is not None:
        # The left destination is checked first and wins if both match.
        for dest in (source_node.dest_l, source_node.dest_r):
            if isinstance(dest, ResolvedDest) and dest.name == edge.dest:
                result["addr"] = _serialise_addr(dest.addr)
                break

    return result


def _serialise_error(error: AssemblyError) -> dict[str, Any]:
    """Return the position, category, message and suggestions of *error*."""
    return {
        "line": error.loc.line,
        "column": error.loc.column,
        "category": error.category.value,
        "message": error.message,
        "suggestions": error.suggestions,
    }


def _collect_region_node_ids(region: IRRegion) -> list[str]:
    """Return the names of all nodes in *region*, at any nesting depth.

    The region's own nodes come first, followed by the nodes of each
    sub-region in order (depth-first).
    """
    node_ids = list(region.body.nodes)
    for sub_region in region.body.regions:
        node_ids.extend(_collect_region_node_ids(sub_region))
    return node_ids


def _serialise_region(region: IRRegion) -> dict[str, Any]:
    """Return the JSON description of a region.

    ``node_ids`` lists the region's own nodes and the nodes of every
    nested sub-region, however deeply nested, so that no node inside
    the region is left out.
    """
    return {
        "tag": region.tag,
        "kind": region.kind.value,
        "node_ids": _collect_region_node_ids(region),
    }


def _collect_error_node_names(errors: list[AssemblyError],
                              all_nodes: dict[str, IRNode]) -> set[str]:
    """Return the names of nodes whose source line carries an error.

    A node is included when its ``loc.line`` equals the ``loc.line`` of
    at least one entry in *errors*.
    """
    error_lines: set[int] = {e.loc.line for e in errors}
    return {
        name for name, node in all_nodes.items()
        if node.loc.line in error_lines
    }


def graph_to_json(result: PipelineResult) -> dict[str, Any]:
    """Build the ``graph_update`` message for a pipeline result.

    Args:
        result: Pipeline output. ``result.graph`` may be ``None``.

    Returns:
        A dict with the keys ``type``, ``stage``, ``nodes``, ``edges``,
        ``regions``, ``errors``, ``parse_error`` and ``metadata``.

        When ``result.graph`` is ``None`` all collections are empty,
        both counts in ``metadata`` are zero and ``parse_error`` is
        taken from *result*. Otherwise ``parse_error`` is ``None`` and
        the collections describe the graph; only top-level regions of
        kind ``RegionKind.FUNCTION`` are listed under ``regions``.
    """
    if result.graph is None:
        return {
            "type": "graph_update",
            "stage": result.stage.value,
            "nodes": [],
            "edges": [],
            "regions": [],
            "errors": [],
            "parse_error": result.parse_error,
            "metadata": {
                "stage": result.stage.value,
                "pe_count": 0,
                "sm_count": 0,
            },
        }

    graph = result.graph
    all_nodes, all_edges = collect_all_nodes_and_edges(graph)
    error_lines: set[int] = {e.loc.line for e in result.errors}
    error_node_names = _collect_error_node_names(result.errors, all_nodes)

    nodes_json = [
        _serialise_node(node, error_node_names)
        for node in all_nodes.values()
    ]

    edges_json = [
        _serialise_edge(edge, all_nodes, error_lines)
        for edge in all_edges
    ]

    regions_json = [
        _serialise_region(region)
        for region in graph.regions
        if region.kind == RegionKind.FUNCTION
    ]

    errors_json = [_serialise_error(e) for e in result.errors]

    # Without a system description both counts are reported as zero.
    pe_count = graph.system.pe_count if graph.system else 0
    sm_count = graph.system.sm_count if graph.system else 0

    return {
        "type": "graph_update",
        "stage": result.stage.value,
        "nodes": nodes_json,
        "edges": edges_json,
        "regions": regions_json,
        "errors": errors_json,
        "parse_error": None,
        "metadata": {
            "stage": result.stage.value,
            "pe_count": pe_count,
            "sm_count": sm_count,
        },
    }