"""Convert IRGraph to JSON-serialisable structure for the frontend. Produces a flat graph representation with all nodes, edges, regions, errors, and metadata needed for both logical and physical views. """ from __future__ import annotations from typing import Any from cm_inst import Addr from asm.ir import ( IRNode, IREdge, IRRegion, RegionKind, SourceLoc, ResolvedDest, collect_all_nodes_and_edges, ) from asm.errors import AssemblyError from asm.opcodes import OP_TO_MNEMONIC from dfgraph.pipeline import PipelineResult from dfgraph.categories import categorise, CATEGORY_COLOURS def _serialise_loc(loc: SourceLoc) -> dict[str, Any]: return { "line": loc.line, "column": loc.column, "end_line": loc.end_line, "end_column": loc.end_column, } def _serialise_addr(addr: Addr) -> dict[str, Any]: return { "offset": addr.a, "port": addr.port.name, "pe": addr.pe, } def _serialise_node(node: IRNode, error_node_names: set[str]) -> dict[str, Any]: category = categorise(node.opcode) mnemonic = OP_TO_MNEMONIC[node.opcode] result: dict[str, Any] = { "id": node.name, "opcode": mnemonic, "category": category.value, "colour": CATEGORY_COLOURS[category], "const": node.const, "pe": node.pe, "iram_offset": node.iram_offset, "ctx": node.ctx, "has_error": node.name in error_node_names, "loc": _serialise_loc(node.loc), } return result def _serialise_edge(edge: IREdge, all_nodes: dict[str, IRNode], error_lines: set[int]) -> dict[str, Any]: result: dict[str, Any] = { "source": edge.source, "target": edge.dest, "port": edge.port.name, "source_port": edge.source_port.name if edge.source_port else None, "has_error": edge.loc.line in error_lines, } source_node = all_nodes.get(edge.source) if source_node: if (isinstance(source_node.dest_l, ResolvedDest) and source_node.dest_l.name == edge.dest): result["addr"] = _serialise_addr(source_node.dest_l.addr) elif (isinstance(source_node.dest_r, ResolvedDest) and source_node.dest_r.name == edge.dest): result["addr"] = _serialise_addr(source_node.dest_r.addr) return result def _serialise_error(error: AssemblyError) -> dict[str, Any]: return { "line": error.loc.line, "column": error.loc.column, "category": error.category.value, "message": error.message, "suggestions": error.suggestions, } def _serialise_region(region: IRRegion) -> dict[str, Any]: node_ids = list(region.body.nodes.keys()) for sub_region in region.body.regions: node_ids.extend(sub_region.body.nodes.keys()) return { "tag": region.tag, "kind": region.kind.value, "node_ids": node_ids, } def _collect_error_node_names(errors: list[AssemblyError], all_nodes: dict[str, IRNode]) -> set[str]: error_lines: set[int] = {e.loc.line for e in errors} return { name for name, node in all_nodes.items() if node.loc.line in error_lines } def graph_to_json(result: PipelineResult) -> dict[str, Any]: if result.graph is None: return { "type": "graph_update", "stage": result.stage.value, "nodes": [], "edges": [], "regions": [], "errors": [], "parse_error": result.parse_error, "metadata": { "stage": result.stage.value, "pe_count": 0, "sm_count": 0, }, } graph = result.graph all_nodes, all_edges = collect_all_nodes_and_edges(graph) error_lines: set[int] = {e.loc.line for e in result.errors} error_node_names = _collect_error_node_names(result.errors, all_nodes) nodes_json = [ _serialise_node(node, error_node_names) for node in all_nodes.values() ] edges_json = [ _serialise_edge(edge, all_nodes, error_lines) for edge in all_edges ] regions_json = [] for region in graph.regions: if region.kind == RegionKind.FUNCTION: regions_json.append(_serialise_region(region)) errors_json = [_serialise_error(e) for e in result.errors] pe_count = graph.system.pe_count if graph.system else 0 sm_count = graph.system.sm_count if graph.system else 0 return { "type": "graph_update", "stage": result.stage.value, "nodes": nodes_json, "edges": edges_json, "regions": regions_json, "errors": errors_json, "parse_error": None, "metadata": { "stage": result.stage.value, "pe_count": pe_count, "sm_count": sm_count, }, }