OR-1 dataflow CPU sketch
1"""Convert IRGraph with StateSnapshot to JSON for the monitor frontend.
2
3Extends dfgraph/graph_json.py with execution state overlay: active, matched,
4executed flags on nodes, and token_flow on edges. Synthesizes SM nodes with
5live state from snapshots and event-driven overlay.
6"""
7
8from __future__ import annotations
9
10import logging
11import math
12from typing import Any
13
14from cm_inst import MemOp
15from asm.ir import (
16 IRGraph, IRNode, IREdge, ResolvedDest,
17 collect_all_nodes_and_edges, collect_all_data_defs,
18)
19from asm.opcodes import OP_TO_MNEMONIC
20from dfgraph.categories import OpcodeCategory, categorise, CATEGORY_COLOURS
21from emu.events import (
22 SimEvent, TokenReceived, Matched, Executed, Emitted,
23 CellWritten, DeferredRead as DeferredReadEvent,
24 DeferredSatisfied, ResultSent,
25)
26from monitor.snapshot import StateSnapshot
27
28logger = logging.getLogger(__name__)
29
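# Synthetic SM nodes get ids like "__sm_0"; the dunder prefix keeps them
# visually distinct from assembler-generated node names (an inference from
# how the prefix is used below, not a documented invariant).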
SM_NODE_PREFIX = "__sm_"


def _safe_time(value: float) -> float | None:
    """Convert non-finite floats to None for JSON serialization."""
    if math.isinf(value) or math.isnan(value):
        return None
    return value


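# Illustrative contract check for _safe_time (a sketch; this helper is not
# called anywhere in the module, it only documents expected behaviour):
def _example_safe_time() -> None:
    assert _safe_time(float("inf")) is None   # +inf is not JSON-safe
    assert _safe_time(float("nan")) is None   # neither is NaN
    assert _safe_time(1.5) == 1.5             # finite values pass through

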
def _parse_component(component: str) -> tuple[str, int]:
    """Parse a component string such as 'pe:0' or 'sm:1' into (kind, id)."""
    kind, _, id_str = component.partition(":")
    if not id_str:
        raise ValueError(f"Invalid component format: {component}")
    return kind, int(id_str)


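# Illustrative parse examples for _parse_component (a sketch; not called
# from module code). 'pe:<id>' and 'sm:<id>' are the formats the overlay
# code below relies on:
def _example_parse_component() -> None:
    assert _parse_component("pe:0") == ("pe", 0)
    assert _parse_component("sm:1") == ("sm", 1)
    try:
        _parse_component("pe")  # missing ':<id>' suffix is rejected
    except ValueError:
        pass

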
def _serialise_node(node: IRNode) -> dict[str, Any]:
    """Serialize an IR node to JSON."""
    category = categorise(node.opcode)
    mnemonic = OP_TO_MNEMONIC[node.opcode]

    return {
        "id": node.name,
        "opcode": mnemonic,
        "category": category.value,
        "colour": CATEGORY_COLOURS[category],
        "const": node.const,
        "pe": node.pe,
        "iram_offset": node.iram_offset,
        "ctx": node.ctx,
        "has_error": False,
        "loc": {
            "line": 0,
            "column": 0,
            "end_line": None,
            "end_column": None,
        },
        "active": False,
        "matched": False,
        "executed": False,
    }


def _serialise_edge(edge: IREdge) -> dict[str, Any]:
    """Serialize an IR edge to JSON."""
    return {
        "source": edge.source,
        "target": edge.dest,
        "port": edge.port.name,
        "source_port": edge.source_port.name if edge.source_port else None,
        "has_error": False,
        "token_flow": False,
    }
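
# Example edge payload (field values illustrative; the port names actually
# emitted come from the IR's port enums, not from this sketch):
#     {"source": "n0", "target": "n1", "port": "L", "source_port": None,
#      "has_error": False, "token_flow": False}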


def _serialise_pe_state(pe_id: int, snapshot: StateSnapshot) -> dict[str, Any]:
    """Serialize PE state from snapshot."""
    pe_snap = snapshot.pes.get(pe_id)
    if not pe_snap:
        return {}

    iram_json = {}
    for offset, inst in pe_snap.iram.items():
        opcode_str = str(inst.op.name) if hasattr(inst.op, "name") else str(inst.op)
        iram_json[str(offset)] = {
            "opcode": opcode_str,
            "offset": offset,
        }

    matching_store_json = []
    try:
        for row in pe_snap.matching_store:
            row_json = []
            for entry in row:
                if isinstance(entry, dict):
                    row_json.append({
                        "occupied": entry.get("occupied", False),
                        "data": entry.get("data", 0),
                        "port": entry.get("port", None),
                    })
                else:
                    logger.warning(
                        "Non-dict entry in matching_store: %s",
                        type(entry).__name__,
                    )
            matching_store_json.append(row_json)
    except (TypeError, KeyError) as e:
        logger.error("Error serializing matching_store: %s", e)

    return {
        "iram": iram_json,
        "matching_store": matching_store_json,
        "gen_counters": list(pe_snap.gen_counters),
        "input_queue_depth": len(pe_snap.input_queue),
        "output_count": len(pe_snap.output_log),
    }
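
# Example PE-state payload (values illustrative, shapes taken from the
# serializer above):
#     {"iram": {"4": {"opcode": "ADD", "offset": 4}},
#      "matching_store": [[{"occupied": True, "data": 7, "port": "L"}]],
#      "gen_counters": [0, 0], "input_queue_depth": 0, "output_count": 2}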


def _serialise_sm_state(sm_id: int, snapshot: StateSnapshot) -> dict[str, Any]:
    """Serialize SM state from snapshot."""
    sm_snap = snapshot.sms.get(sm_id)
    if not sm_snap:
        return {}

    cells = {}
    for addr, cell_snap in sm_snap.cells.items():
        cells[str(addr)] = {
            "presence": cell_snap.pres.name,
            "data_l": cell_snap.data_l,
            "data_r": cell_snap.data_r,
        }

    deferred_read = None
    if sm_snap.deferred_read:
        deferred_read = {
            "cell_addr": sm_snap.deferred_read["cell_addr"],
            "return_route": str(sm_snap.deferred_read["return_route"]),
        }

    return {
        "cells": cells,
        "deferred_read": deferred_read,
        "t0_store_size": len(sm_snap.t0_store),
        "input_queue_depth": len(sm_snap.input_queue),
    }
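
# Example SM-state payload (presence names come from the presence-bit enum
# and are illustrative here):
#     {"cells": {"0": {"presence": "PRESENT", "data_l": 42, "data_r": None}},
#      "deferred_read": None, "t0_store_size": 0, "input_queue_depth": 1}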


def _serialise_event(event: SimEvent) -> dict[str, Any]:
    """Serialize a SimEvent to JSON."""
    base = {
        "type": type(event).__name__,
        "time": event.time,
        "component": event.component,
    }

    if isinstance(event, TokenReceived):
        base["details"] = {"token": str(event.token)}
    elif isinstance(event, Matched):
        base["details"] = {
            "left": event.left,
            "right": event.right,
            "ctx": event.ctx,
            "offset": event.offset,
        }
    elif isinstance(event, Executed):
        base["details"] = {
            "op": str(event.op),
            "result": event.result,
            "bool_out": event.bool_out,
        }
    elif isinstance(event, Emitted):
        base["details"] = {"token": str(event.token)}
    else:
        base["details"] = {}

    return base
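
# Example serialized Matched event (field values illustrative, field names
# from the branch above):
#     {"type": "Matched", "time": 12, "component": "pe:0",
#      "details": {"left": 1, "right": 2, "ctx": 0, "offset": 4}}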


# ---------------------------------------------------------------------------
# SM node/edge synthesis
# ---------------------------------------------------------------------------


def _collect_referenced_sm_ids(
    all_nodes: dict[str, IRNode],
    ir_graph: IRGraph,
) -> set[int]:
    """Collect SM IDs referenced by MemOp nodes or data definitions."""
    sm_ids: set[int] = set()
    for node in all_nodes.values():
        if isinstance(node.opcode, MemOp) and node.sm_id is not None:
            sm_ids.add(node.sm_id)
    for data_def in collect_all_data_defs(ir_graph):
        if data_def.sm_id is not None:
            sm_ids.add(data_def.sm_id)
    return sm_ids


def _build_sm_node_label(sm_id: int, snapshot: StateSnapshot) -> str:
    """Build a compact text label for an SM node showing cell contents and deferred reads."""
    sm_snap = snapshot.sms.get(sm_id)
    if not sm_snap:
        return f"SM {sm_id}"

    lines = [f"SM {sm_id}"]

    for addr in sorted(sm_snap.cells.keys()):
        cell = sm_snap.cells[addr]
        pres = cell.pres.name
        if cell.data_l is not None:
            lines.append(f"[{addr}] {pres} {cell.data_l}")
        else:
            lines.append(f"[{addr}] {pres}")

    if sm_snap.deferred_read:
        dr = sm_snap.deferred_read
        lines.append(f"DR: [{dr['cell_addr']}]")

    return "\n".join(lines)
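
# Example label for an SM with one populated cell and a pending deferred
# read (presence name illustrative):
#     SM 1
#     [0] PRESENT 42
#     DR: [0]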


def _synthesize_sm_nodes(
    sm_ids: set[int],
    snapshot: StateSnapshot,
) -> list[dict[str, Any]]:
    """Create synthetic monitor graph nodes for each referenced SM instance."""
    category = OpcodeCategory.STRUCTURE_MEMORY
    nodes = []
    for sm_id in sorted(sm_ids):
        label = _build_sm_node_label(sm_id, snapshot)
        sm_snap = snapshot.sms.get(sm_id)

        sm_state = None
        if sm_snap:
            cells = {}
            for addr, cell in sm_snap.cells.items():
                cells[str(addr)] = {
                    "presence": cell.pres.name,
                    "data_l": cell.data_l,
                    "data_r": cell.data_r,
                }
            deferred_read = None
            if sm_snap.deferred_read:
                deferred_read = {
                    "cell_addr": sm_snap.deferred_read["cell_addr"],
                    "return_route": str(sm_snap.deferred_read["return_route"]),
                }
            sm_state = {
                "cells": cells,
                "deferred_read": deferred_read,
                "t0_store_size": len(sm_snap.t0_store),
            }

        nodes.append({
            "id": f"{SM_NODE_PREFIX}{sm_id}",
            "opcode": "sm",
            "label": label,
            "category": category.value,
            "colour": CATEGORY_COLOURS[category],
            "const": None,
            "pe": None,
            "iram_offset": None,
            "ctx": None,
            "has_error": False,
            "loc": {"line": 0, "column": 0, "end_line": None, "end_column": None},
            "sm_id": sm_id,
            "synthetic": True,
            "sm_state": sm_state,
            # Execution overlay
            "active": False,
            "matched": False,
            "executed": False,
            "cell_written": False,
        })
    return nodes


def _synthesize_sm_edges(
    all_nodes: dict[str, IRNode],
) -> list[dict[str, Any]]:
    """Create synthetic edges between MemOp nodes and their target SM nodes."""
    edges: list[dict[str, Any]] = []
    for node in all_nodes.values():
        if not isinstance(node.opcode, MemOp) or node.sm_id is None:
            continue

        sm_node_id = f"{SM_NODE_PREFIX}{node.sm_id}"

        # Request edge: instruction → SM
        edges.append({
            "source": node.name,
            "target": sm_node_id,
            "port": "REQ",
            "source_port": None,
            "has_error": False,
            "token_flow": False,
            "synthetic": True,
        })

        # Return edge: SM → requesting node (data flows back to the reader)
        if isinstance(node.dest_l, ResolvedDest):
            edges.append({
                "source": sm_node_id,
                "target": node.name,
                "port": "RET",
                "source_port": None,
                "has_error": False,
                "token_flow": False,
                "synthetic": True,
            })

    return edges
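
# For a MemOp node named "rd0" (name illustrative) targeting SM 2 with a
# resolved left destination, this yields the pair:
#     rd0 --REQ--> __sm_2   (request)
#     __sm_2 --RET--> rd0   (return)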


def _apply_sm_event_overlay(
    events: list[SimEvent],
    node_by_id: dict[str, dict[str, Any]],
    edge_by_key: dict[tuple[str, str], dict[str, Any]],
) -> None:
    """Apply execution overlay from SM events onto synthesized SM nodes and edges."""
    for event in events:
        try:
            kind, comp_id = _parse_component(event.component)
        except ValueError:
            continue
        if kind != "sm":
            continue

        sm_node_id = f"{SM_NODE_PREFIX}{comp_id}"
        sm_node = node_by_id.get(sm_node_id)
        if not sm_node:
            continue

        if isinstance(event, TokenReceived):
            sm_node["active"] = True

        elif isinstance(event, CellWritten):
            sm_node["cell_written"] = True

        elif isinstance(event, (DeferredReadEvent, DeferredSatisfied)):
            sm_node["active"] = True

        elif isinstance(event, ResultSent):
            # Mark return edges from this SM as having token flow
            for key, edge in edge_by_key.items():
                if key[0] == sm_node_id:
                    edge["token_flow"] = True


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def graph_to_monitor_json(
    ir_graph: IRGraph,
    snapshot: StateSnapshot,
    events: list[SimEvent],
) -> dict[str, Any]:
    """Convert IRGraph + StateSnapshot + events to monitor JSON."""
    all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph)

    # Serialize base nodes and edges
    nodes_json = [_serialise_node(node) for node in all_nodes.values()]
    edges_json = [_serialise_edge(edge) for edge in all_edges]

    # Synthesize SM nodes and edges
    sm_ids = _collect_referenced_sm_ids(all_nodes, ir_graph)
    sm_nodes = _synthesize_sm_nodes(sm_ids, snapshot)
    sm_edges = _synthesize_sm_edges(all_nodes)
    nodes_json.extend(sm_nodes)
    edges_json.extend(sm_edges)

    # Create lookup dicts for quick updates
    node_by_id = {n["id"]: n for n in nodes_json}
    edge_by_key = {(e["source"], e["target"]): e for e in edges_json}

    # Apply PE execution overlay from events
    for event in events:
        try:
            kind, pe_id = _parse_component(event.component)
        except ValueError:
            continue
        if kind != "pe":
            continue

        if isinstance(event, TokenReceived):
            for node in all_nodes.values():
                if node.pe == pe_id and node.name in node_by_id:
                    node_by_id[node.name]["active"] = True

        elif isinstance(event, Matched):
            for node in all_nodes.values():
                if (node.pe == pe_id and node.iram_offset == event.offset
                        and node.name in node_by_id):
                    node_by_id[node.name]["matched"] = True

        elif isinstance(event, Executed):
            for node in all_nodes.values():
                if node.pe == pe_id and node.name in node_by_id:
                    node_by_id[node.name]["executed"] = True

    # Mark token flow on edges based on emitted tokens
    for event in events:
        if isinstance(event, Emitted) and hasattr(event.token, "target"):
            for edge in edges_json:
                dest_node = node_by_id.get(edge["target"])
                if dest_node and dest_node.get("pe") == event.token.target:
                    edge["token_flow"] = True

    # Apply SM event overlay
    _apply_sm_event_overlay(events, node_by_id, edge_by_key)

    # Serialize state
    state_json = {
        "pes": {
            str(pe_id): _serialise_pe_state(pe_id, snapshot)
            for pe_id in snapshot.pes
        },
        "sms": {
            str(sm_id): _serialise_sm_state(sm_id, snapshot)
            for sm_id in snapshot.sms
        },
    }

    events_json = [_serialise_event(e) for e in events]

    return {
        "type": "monitor_update",
        "graph": {
            "nodes": nodes_json,
            "edges": edges_json,
            "regions": [],
        },
        "state": state_json,
        "events": events_json,
        "sim_time": snapshot.sim_time,
        "next_time": _safe_time(snapshot.next_time),
        "finished": snapshot.next_time == float("inf"),
    }


def graph_loaded_json(ir_graph: IRGraph, snapshot: StateSnapshot) -> dict[str, Any]:
    """Initial graph load (no events, so no execution overlay flags set)."""
    all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph)

    nodes_json = [_serialise_node(node) for node in all_nodes.values()]
    edges_json = [_serialise_edge(edge) for edge in all_edges]

    # Synthesize SM nodes and edges
    sm_ids = _collect_referenced_sm_ids(all_nodes, ir_graph)
    sm_nodes = _synthesize_sm_nodes(sm_ids, snapshot)
    sm_edges = _synthesize_sm_edges(all_nodes)
    nodes_json.extend(sm_nodes)
    edges_json.extend(sm_edges)

    state_json = {
        "pes": {
            str(pe_id): _serialise_pe_state(pe_id, snapshot)
            for pe_id in snapshot.pes
        },
        "sms": {
            str(sm_id): _serialise_sm_state(sm_id, snapshot)
            for sm_id in snapshot.sms
        },
    }

    return {
        "type": "graph_loaded",
        "graph": {
            "nodes": nodes_json,
            "edges": edges_json,
            "regions": [],
        },
        "state": state_json,
        "sim_time": snapshot.sim_time,
        "next_time": _safe_time(snapshot.next_time),
        "finished": snapshot.next_time == float("inf"),
    }
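
# Example wiring (a sketch; `sim`, `take_snapshot`, and `drain_events` are
# assumed names, not part of this module): a monitor server would send
# graph_loaded_json once on connect, then graph_to_monitor_json after each
# simulation step until `finished` is reported.
#
#     payload = graph_loaded_json(ir_graph, take_snapshot(sim))
#     await ws.send_json(payload)
#     while not payload["finished"]:
#         events = drain_events(sim)
#         payload = graph_to_monitor_json(ir_graph, take_snapshot(sim), events)
#         await ws.send_json(payload)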