tangled
alpha
login
or
join now
nonbinary.computer
/
or1-design
0
fork
atom
OR-1 dataflow CPU sketch
0
fork
atom
overview
issues
pulls
pipelines
feat: add IR-to-JSON conversion for dfgraph
Orual
2 weeks ago
5df3bf85
bf838867
+169
1 changed file
expand all
collapse all
unified
split
dfgraph
graph_json.py
+169
dfgraph/graph_json.py
···
1
1
+
"""Convert IRGraph to JSON-serialisable structure for the frontend.
2
2
+
3
3
+
Produces a flat graph representation with all nodes, edges, regions,
4
4
+
errors, and metadata needed for both logical and physical views.
5
5
+
"""
6
6
+
7
7
+
from __future__ import annotations
8
8
+
9
9
+
from typing import Any, Union
10
10
+
11
11
+
from cm_inst import Addr, Port
12
12
+
from asm.ir import (
13
13
+
IRGraph, IRNode, IREdge, IRRegion, RegionKind,
14
14
+
SourceLoc, NameRef, ResolvedDest,
15
15
+
collect_all_nodes_and_edges,
16
16
+
)
17
17
+
from asm.errors import AssemblyError
18
18
+
from asm.opcodes import OP_TO_MNEMONIC
19
19
+
from dfgraph.pipeline import PipelineResult, PipelineStage
20
20
+
from dfgraph.categories import categorise, CATEGORY_COLOURS, OpcodeCategory
21
21
+
22
22
+
23
23
+
def _serialise_loc(loc: SourceLoc) -> dict[str, Any]:
24
24
+
return {
25
25
+
"line": loc.line,
26
26
+
"column": loc.column,
27
27
+
"end_line": loc.end_line,
28
28
+
"end_column": loc.end_column,
29
29
+
}
30
30
+
31
31
+
32
32
+
def _serialise_addr(addr: Addr) -> dict[str, Any]:
33
33
+
return {
34
34
+
"offset": addr.a,
35
35
+
"port": addr.port.name,
36
36
+
"pe": addr.pe,
37
37
+
}
38
38
+
39
39
+
40
40
+
def _serialise_node(node: IRNode, error_node_names: set[str]) -> dict[str, Any]:
    """Serialise one IRNode for the frontend.

    Looks up the node's category (and its display colour) and mnemonic, and
    flags the node when its name appears in *error_node_names*.
    """
    category = categorise(node.opcode)
    return {
        "id": node.name,
        "opcode": OP_TO_MNEMONIC[node.opcode],
        "category": category.value,
        "colour": CATEGORY_COLOURS[category],
        "const": node.const,
        "pe": node.pe,
        "iram_offset": node.iram_offset,
        "ctx": node.ctx,
        "has_error": node.name in error_node_names,
        "loc": _serialise_loc(node.loc),
    }
58
58
+
59
59
+
60
60
+
def _serialise_edge(edge: IREdge, all_nodes: dict[str, IRNode],
61
61
+
error_lines: set[int]) -> dict[str, Any]:
62
62
+
result: dict[str, Any] = {
63
63
+
"source": edge.source,
64
64
+
"target": edge.dest,
65
65
+
"port": edge.port.name,
66
66
+
"source_port": edge.source_port.name if edge.source_port else None,
67
67
+
"has_error": edge.loc.line in error_lines,
68
68
+
}
69
69
+
70
70
+
source_node = all_nodes.get(edge.source)
71
71
+
if source_node:
72
72
+
if (isinstance(source_node.dest_l, ResolvedDest)
73
73
+
and source_node.dest_l.name == edge.dest):
74
74
+
result["addr"] = _serialise_addr(source_node.dest_l.addr)
75
75
+
elif (isinstance(source_node.dest_r, ResolvedDest)
76
76
+
and source_node.dest_r.name == edge.dest):
77
77
+
result["addr"] = _serialise_addr(source_node.dest_r.addr)
78
78
+
79
79
+
return result
80
80
+
81
81
+
82
82
+
def _serialise_error(error: AssemblyError) -> dict[str, Any]:
83
83
+
return {
84
84
+
"line": error.loc.line,
85
85
+
"column": error.loc.column,
86
86
+
"category": error.category.value,
87
87
+
"message": error.message,
88
88
+
"suggestions": error.suggestions,
89
89
+
}
90
90
+
91
91
+
92
92
+
def _serialise_region(region: IRRegion) -> dict[str, Any]:
93
93
+
node_ids = list(region.body.nodes.keys())
94
94
+
for sub_region in region.body.regions:
95
95
+
node_ids.extend(sub_region.body.nodes.keys())
96
96
+
97
97
+
return {
98
98
+
"tag": region.tag,
99
99
+
"kind": region.kind.value,
100
100
+
"node_ids": node_ids,
101
101
+
}
102
102
+
103
103
+
104
104
+
def _collect_error_node_names(errors: list[AssemblyError],
105
105
+
all_nodes: dict[str, IRNode]) -> set[str]:
106
106
+
error_lines: set[int] = {e.loc.line for e in errors}
107
107
+
return {
108
108
+
name for name, node in all_nodes.items()
109
109
+
if node.loc.line in error_lines
110
110
+
}
111
111
+
112
112
+
113
113
+
def graph_to_json(result: PipelineResult) -> dict[str, Any]:
    """Convert a PipelineResult into the flat "graph_update" payload.

    Returns a dict with nodes, edges, FUNCTION-kind regions, errors, and
    pipeline metadata (stage, pe_count, sm_count). When ``result.graph`` is
    None (e.g. the pipeline stopped before producing a graph), every
    collection is empty and ``parse_error`` forwards ``result.parse_error``;
    otherwise ``parse_error`` is None.
    """
    if result.graph is None:
        # No graph to walk: emit an empty payload, forwarding the parse error.
        return {
            "type": "graph_update",
            "stage": result.stage.value,
            "nodes": [],
            "edges": [],
            "regions": [],
            "errors": [],
            "parse_error": result.parse_error,
            "metadata": {
                "stage": result.stage.value,
                "pe_count": 0,
                "sm_count": 0,
            },
        }

    graph = result.graph
    all_nodes, all_edges = collect_all_nodes_and_edges(graph)
    error_lines: set[int] = {e.loc.line for e in result.errors}
    error_node_names = _collect_error_node_names(result.errors, all_nodes)

    nodes_json = [_serialise_node(node, error_node_names)
                  for node in all_nodes.values()]
    edges_json = [_serialise_edge(edge, all_nodes, error_lines)
                  for edge in all_edges]

    # Only FUNCTION-kind regions are included in the payload.
    # (Replaces a vacuous `for subgraph_regions in [graph.regions]` wrapper
    # loop that iterated over a single-element list.)
    regions_json = [
        _serialise_region(region)
        for region in graph.regions
        if region.kind == RegionKind.FUNCTION
    ]

    errors_json = [_serialise_error(e) for e in result.errors]

    # System config may be absent at early stages — fall back to zero counts.
    system = graph.system
    return {
        "type": "graph_update",
        "stage": result.stage.value,
        "nodes": nodes_json,
        "edges": edges_json,
        "regions": regions_json,
        "errors": errors_json,
        "parse_error": None,
        "metadata": {
            "stage": result.stage.value,
            "pe_count": system.pe_count if system else 0,
            "sm_count": system.sm_count if system else 0,
        },
    }