OR-1 dataflow CPU sketch
1"""Convert IRGraph to JSON-serialisable structure for the frontend.
2
3Produces a flat graph representation with all nodes, edges, regions,
4errors, and metadata needed for both logical and physical views.
5"""
6
7from __future__ import annotations
8
9from typing import Any
10
11from cm_inst import Addr
12from asm.ir import (
13 IRNode, IREdge, IRRegion, RegionKind,
14 SourceLoc, ResolvedDest,
15 collect_all_nodes_and_edges,
16)
17from asm.errors import AssemblyError
18from asm.opcodes import OP_TO_MNEMONIC
19from dfgraph.pipeline import PipelineResult
20from dfgraph.categories import categorise, CATEGORY_COLOURS
21
22
23def _serialise_loc(loc: SourceLoc) -> dict[str, Any]:
24 return {
25 "line": loc.line,
26 "column": loc.column,
27 "end_line": loc.end_line,
28 "end_column": loc.end_column,
29 }
30
31
32def _serialise_addr(addr: Addr) -> dict[str, Any]:
33 return {
34 "offset": addr.a,
35 "port": addr.port.name,
36 "pe": addr.pe,
37 }
38
39
40def _serialise_node(node: IRNode, error_node_names: set[str]) -> dict[str, Any]:
41 category = categorise(node.opcode)
42 mnemonic = OP_TO_MNEMONIC[node.opcode]
43
44 result: dict[str, Any] = {
45 "id": node.name,
46 "opcode": mnemonic,
47 "category": category.value,
48 "colour": CATEGORY_COLOURS[category],
49 "const": node.const,
50 "pe": node.pe,
51 "iram_offset": node.iram_offset,
52 "ctx": node.ctx,
53 "has_error": node.name in error_node_names,
54 "loc": _serialise_loc(node.loc),
55 }
56
57 return result
58
59
60def _serialise_edge(edge: IREdge, all_nodes: dict[str, IRNode],
61 error_lines: set[int]) -> dict[str, Any]:
62 result: dict[str, Any] = {
63 "source": edge.source,
64 "target": edge.dest,
65 "port": edge.port.name,
66 "source_port": edge.source_port.name if edge.source_port else None,
67 "has_error": edge.loc.line in error_lines,
68 }
69
70 source_node = all_nodes.get(edge.source)
71 if source_node:
72 if (isinstance(source_node.dest_l, ResolvedDest)
73 and source_node.dest_l.name == edge.dest):
74 result["addr"] = _serialise_addr(source_node.dest_l.addr)
75 elif (isinstance(source_node.dest_r, ResolvedDest)
76 and source_node.dest_r.name == edge.dest):
77 result["addr"] = _serialise_addr(source_node.dest_r.addr)
78
79 return result
80
81
82def _serialise_error(error: AssemblyError) -> dict[str, Any]:
83 return {
84 "line": error.loc.line,
85 "column": error.loc.column,
86 "category": error.category.value,
87 "message": error.message,
88 "suggestions": error.suggestions,
89 }
90
91
92def _serialise_region(region: IRRegion) -> dict[str, Any]:
93 node_ids = list(region.body.nodes.keys())
94 for sub_region in region.body.regions:
95 node_ids.extend(sub_region.body.nodes.keys())
96
97 return {
98 "tag": region.tag,
99 "kind": region.kind.value,
100 "node_ids": node_ids,
101 }
102
103
104def _collect_error_node_names(errors: list[AssemblyError],
105 all_nodes: dict[str, IRNode]) -> set[str]:
106 error_lines: set[int] = {e.loc.line for e in errors}
107 return {
108 name for name, node in all_nodes.items()
109 if node.loc.line in error_lines
110 }
111
112
113def graph_to_json(result: PipelineResult) -> dict[str, Any]:
114 if result.graph is None:
115 return {
116 "type": "graph_update",
117 "stage": result.stage.value,
118 "nodes": [],
119 "edges": [],
120 "regions": [],
121 "errors": [],
122 "parse_error": result.parse_error,
123 "metadata": {
124 "stage": result.stage.value,
125 "pe_count": 0,
126 "sm_count": 0,
127 },
128 }
129
130 graph = result.graph
131 all_nodes, all_edges = collect_all_nodes_and_edges(graph)
132 error_lines: set[int] = {e.loc.line for e in result.errors}
133 error_node_names = _collect_error_node_names(result.errors, all_nodes)
134
135 nodes_json = [
136 _serialise_node(node, error_node_names)
137 for node in all_nodes.values()
138 ]
139
140 edges_json = [
141 _serialise_edge(edge, all_nodes, error_lines)
142 for edge in all_edges
143 ]
144
145 regions_json = []
146 for region in graph.regions:
147 if region.kind == RegionKind.FUNCTION:
148 regions_json.append(_serialise_region(region))
149
150 errors_json = [_serialise_error(e) for e in result.errors]
151
152 pe_count = graph.system.pe_count if graph.system else 0
153 sm_count = graph.system.sm_count if graph.system else 0
154
155 return {
156 "type": "graph_update",
157 "stage": result.stage.value,
158 "nodes": nodes_json,
159 "edges": edges_json,
160 "regions": regions_json,
161 "errors": errors_json,
162 "parse_error": None,
163 "metadata": {
164 "stage": result.stage.value,
165 "pe_count": pe_count,
166 "sm_count": sm_count,
167 },
168 }