# OR-1 dataflow CPU sketch
# at a8ec18c41dee4e0e31246e4d5becf68b6e828e90 (500 lines, 16 kB)
1"""Convert IRGraph with StateSnapshot to JSON for the monitor frontend. 2 3Extends dfgraph/graph_json.py with execution state overlay: active, matched, 4executed flags on nodes, and token_flow on edges. Synthesizes SM nodes with 5live state from snapshots and event-driven overlay. 6""" 7 8from __future__ import annotations 9 10import logging 11import math 12from typing import Any 13 14from cm_inst import MemOp 15from asm.ir import ( 16 IRGraph, IRNode, IREdge, ResolvedDest, 17 collect_all_nodes_and_edges, collect_all_data_defs, 18) 19from asm.opcodes import OP_TO_MNEMONIC 20from dfgraph.categories import OpcodeCategory, categorise, CATEGORY_COLOURS 21from emu.events import ( 22 SimEvent, TokenReceived, Matched, Executed, Emitted, 23 CellWritten, DeferredRead as DeferredReadEvent, 24 DeferredSatisfied, ResultSent, 25) 26from monitor.snapshot import StateSnapshot 27 28logger = logging.getLogger(__name__) 29 30SM_NODE_PREFIX = "__sm_" 31 32 33def _safe_time(value: float) -> float | None: 34 """Convert non-finite floats to None for JSON serialization.""" 35 if math.isinf(value) or math.isnan(value): 36 return None 37 return value 38 39 40def _parse_component(component: str) -> tuple[str, int]: 41 """Parse component string format 'pe:0' or 'sm:1' into (kind, id).""" 42 kind, _, id_str = component.partition(":") 43 if not id_str: 44 raise ValueError(f"Invalid component format: {component}") 45 return kind, int(id_str) 46 47 48def _serialise_node(node: IRNode) -> dict[str, Any]: 49 """Serialize an IR node to JSON.""" 50 category = categorise(node.opcode) 51 mnemonic = OP_TO_MNEMONIC[node.opcode] 52 53 return { 54 "id": node.name, 55 "opcode": mnemonic, 56 "category": category.value, 57 "colour": CATEGORY_COLOURS[category], 58 "const": node.const, 59 "pe": node.pe, 60 "iram_offset": node.iram_offset, 61 "ctx": node.ctx, 62 "has_error": False, 63 "loc": { 64 "line": 0, 65 "column": 0, 66 "end_line": None, 67 "end_column": None, 68 }, 69 "active": False, 70 "matched": 
False, 71 "executed": False, 72 } 73 74 75def _serialise_edge(edge: IREdge) -> dict[str, Any]: 76 """Serialize an IR edge to JSON.""" 77 return { 78 "source": edge.source, 79 "target": edge.dest, 80 "port": edge.port.name, 81 "source_port": edge.source_port.name if edge.source_port else None, 82 "has_error": False, 83 "token_flow": False, 84 } 85 86 87def _serialise_pe_state(pe_id: int, snapshot: StateSnapshot) -> dict[str, Any]: 88 """Serialize PE state from snapshot.""" 89 pe_snap = snapshot.pes.get(pe_id) 90 if not pe_snap: 91 return {} 92 93 iram_json = {} 94 for offset, inst in pe_snap.iram.items(): 95 opcode_str = str(inst.op.name) if hasattr(inst.op, 'name') else str(inst.op) 96 iram_json[str(offset)] = { 97 "opcode": opcode_str, 98 "offset": offset, 99 } 100 101 matching_store_json = [] 102 try: 103 for row in pe_snap.matching_store: 104 row_json = [] 105 for entry in row: 106 if isinstance(entry, dict): 107 row_json.append({ 108 "occupied": entry.get("occupied", False), 109 "data": entry.get("data", 0), 110 "port": entry.get("port", None), 111 }) 112 else: 113 logger.warning(f"Non-dict entry in matching_store: {type(entry).__name__}") 114 matching_store_json.append(row_json) 115 except (TypeError, KeyError) as e: 116 logger.error(f"Error serializing matching_store: {e}") 117 118 return { 119 "iram": iram_json, 120 "matching_store": matching_store_json, 121 "gen_counters": list(pe_snap.gen_counters), 122 "input_queue_depth": len(pe_snap.input_queue), 123 "output_count": len(pe_snap.output_log), 124 } 125 126 127def _serialise_sm_state(sm_id: int, snapshot: StateSnapshot) -> dict[str, Any]: 128 """Serialize SM state from snapshot.""" 129 sm_snap = snapshot.sms.get(sm_id) 130 if not sm_snap: 131 return {} 132 133 cells = {} 134 for addr, cell_snap in sm_snap.cells.items(): 135 cells[str(addr)] = { 136 "presence": cell_snap.pres.name, 137 "data_l": cell_snap.data_l, 138 "data_r": cell_snap.data_r, 139 } 140 141 deferred_read = None 142 if 
sm_snap.deferred_read: 143 deferred_read = { 144 "cell_addr": sm_snap.deferred_read["cell_addr"], 145 "return_route": str(sm_snap.deferred_read["return_route"]), 146 } 147 148 return { 149 "cells": cells, 150 "deferred_read": deferred_read, 151 "t0_store_size": len(sm_snap.t0_store), 152 "input_queue_depth": len(sm_snap.input_queue), 153 } 154 155 156def _serialise_event(event: SimEvent) -> dict[str, Any]: 157 """Serialize a SimEvent to JSON.""" 158 base = { 159 "type": type(event).__name__, 160 "time": event.time, 161 "component": event.component, 162 } 163 164 if isinstance(event, TokenReceived): 165 base["details"] = {"token": str(event.token)} 166 elif isinstance(event, Matched): 167 base["details"] = { 168 "left": event.left, 169 "right": event.right, 170 "ctx": event.ctx, 171 "offset": event.offset, 172 } 173 elif isinstance(event, Executed): 174 base["details"] = { 175 "op": str(event.op), 176 "result": event.result, 177 "bool_out": event.bool_out, 178 } 179 elif isinstance(event, Emitted): 180 base["details"] = {"token": str(event.token)} 181 else: 182 base["details"] = {} 183 184 return base 185 186 187# --------------------------------------------------------------------------- 188# SM node/edge synthesis 189# --------------------------------------------------------------------------- 190 191 192def _collect_referenced_sm_ids( 193 all_nodes: dict[str, IRNode], 194 ir_graph: IRGraph, 195) -> set[int]: 196 """Collect SM IDs referenced by MemOp nodes or data definitions.""" 197 sm_ids: set[int] = set() 198 for node in all_nodes.values(): 199 if isinstance(node.opcode, MemOp) and node.sm_id is not None: 200 sm_ids.add(node.sm_id) 201 for data_def in collect_all_data_defs(ir_graph): 202 if data_def.sm_id is not None: 203 sm_ids.add(data_def.sm_id) 204 return sm_ids 205 206 207def _build_sm_node_label(sm_id: int, snapshot: StateSnapshot) -> str: 208 """Build a compact text label for an SM node showing cell contents and deferred reads.""" 209 sm_snap = 
snapshot.sms.get(sm_id) 210 if not sm_snap: 211 return f"SM {sm_id}" 212 213 lines = [f"SM {sm_id}"] 214 215 for addr in sorted(sm_snap.cells.keys()): 216 cell = sm_snap.cells[addr] 217 pres = cell.pres.name 218 if cell.data_l is not None: 219 lines.append(f"[{addr}] {pres} {cell.data_l}") 220 else: 221 lines.append(f"[{addr}] {pres}") 222 223 if sm_snap.deferred_read: 224 dr = sm_snap.deferred_read 225 lines.append(f"DR: [{dr['cell_addr']}]") 226 227 return "\n".join(lines) 228 229 230def _synthesize_sm_nodes( 231 sm_ids: set[int], 232 snapshot: StateSnapshot, 233) -> list[dict[str, Any]]: 234 """Create synthetic monitor graph nodes for each referenced SM instance.""" 235 category = OpcodeCategory.STRUCTURE_MEMORY 236 nodes = [] 237 for sm_id in sorted(sm_ids): 238 label = _build_sm_node_label(sm_id, snapshot) 239 sm_snap = snapshot.sms.get(sm_id) 240 241 sm_state = None 242 if sm_snap: 243 cells = {} 244 for addr, cell in sm_snap.cells.items(): 245 cells[str(addr)] = { 246 "presence": cell.pres.name, 247 "data_l": cell.data_l, 248 "data_r": cell.data_r, 249 } 250 deferred_read = None 251 if sm_snap.deferred_read: 252 deferred_read = { 253 "cell_addr": sm_snap.deferred_read["cell_addr"], 254 "return_route": str(sm_snap.deferred_read["return_route"]), 255 } 256 sm_state = { 257 "cells": cells, 258 "deferred_read": deferred_read, 259 "t0_store_size": len(sm_snap.t0_store), 260 } 261 262 nodes.append({ 263 "id": f"{SM_NODE_PREFIX}{sm_id}", 264 "opcode": "sm", 265 "label": label, 266 "category": category.value, 267 "colour": CATEGORY_COLOURS[category], 268 "const": None, 269 "pe": None, 270 "iram_offset": None, 271 "ctx": None, 272 "has_error": False, 273 "loc": {"line": 0, "column": 0, "end_line": None, "end_column": None}, 274 "sm_id": sm_id, 275 "synthetic": True, 276 "sm_state": sm_state, 277 # Execution overlay 278 "active": False, 279 "matched": False, 280 "executed": False, 281 "cell_written": False, 282 }) 283 return nodes 284 285 286def _synthesize_sm_edges( 
287 all_nodes: dict[str, IRNode], 288) -> list[dict[str, Any]]: 289 """Create synthetic edges between MemOp nodes and their target SM nodes.""" 290 edges: list[dict[str, Any]] = [] 291 for node in all_nodes.values(): 292 if not isinstance(node.opcode, MemOp) or node.sm_id is None: 293 continue 294 295 sm_node_id = f"{SM_NODE_PREFIX}{node.sm_id}" 296 297 # Request edge: instruction → SM 298 edges.append({ 299 "source": node.name, 300 "target": sm_node_id, 301 "port": "REQ", 302 "source_port": None, 303 "has_error": False, 304 "token_flow": False, 305 "synthetic": True, 306 }) 307 308 # Return edge: SM → requesting node (data flows back to the reader) 309 if isinstance(node.dest_l, ResolvedDest): 310 edges.append({ 311 "source": sm_node_id, 312 "target": node.name, 313 "port": "RET", 314 "source_port": None, 315 "has_error": False, 316 "token_flow": False, 317 "synthetic": True, 318 }) 319 320 return edges 321 322 323def _apply_sm_event_overlay( 324 events: list[SimEvent], 325 node_by_id: dict[str, dict[str, Any]], 326 edge_by_key: dict[tuple[str, str], dict[str, Any]], 327) -> None: 328 """Apply execution overlay from SM events onto synthesized SM nodes and edges.""" 329 for event in events: 330 try: 331 kind, comp_id = _parse_component(event.component) 332 except ValueError: 333 continue 334 if kind != "sm": 335 continue 336 337 sm_node_id = f"{SM_NODE_PREFIX}{comp_id}" 338 sm_node = node_by_id.get(sm_node_id) 339 if not sm_node: 340 continue 341 342 if isinstance(event, TokenReceived): 343 sm_node["active"] = True 344 345 elif isinstance(event, CellWritten): 346 sm_node["cell_written"] = True 347 348 elif isinstance(event, (DeferredReadEvent, DeferredSatisfied)): 349 sm_node["active"] = True 350 351 elif isinstance(event, ResultSent): 352 # Mark return edges from this SM as having token flow 353 for key, edge in edge_by_key.items(): 354 if key[0] == sm_node_id: 355 edge["token_flow"] = True 356 357 358# 
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def _build_graph_json(
    ir_graph: IRGraph,
    snapshot: StateSnapshot,
) -> tuple[dict[str, IRNode], list[dict[str, Any]], list[dict[str, Any]]]:
    """Serialise IR nodes/edges and append the synthesized SM nodes/edges.

    Returns ``(all_nodes, nodes_json, edges_json)``; the JSON lists hold the
    IR entries first, followed by the synthetic SM entries.
    """
    all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph)

    nodes_json = [_serialise_node(node) for node in all_nodes.values()]
    edges_json = [_serialise_edge(edge) for edge in all_edges]

    sm_ids = _collect_referenced_sm_ids(all_nodes, ir_graph)
    sm_nodes = _synthesize_sm_nodes(sm_ids, snapshot)
    sm_edges = _synthesize_sm_edges(all_nodes)
    nodes_json.extend(sm_nodes)
    edges_json.extend(sm_edges)

    return all_nodes, nodes_json, edges_json


def _serialise_snapshot_state(snapshot: StateSnapshot) -> dict[str, Any]:
    """Serialise every PE and SM in *snapshot*, keyed by stringified id."""
    return {
        "pes": {
            str(pe_id): _serialise_pe_state(pe_id, snapshot)
            for pe_id in snapshot.pes
        },
        "sms": {
            str(sm_id): _serialise_sm_state(sm_id, snapshot)
            for sm_id in snapshot.sms
        },
    }


def _snapshot_timing(snapshot: StateSnapshot) -> dict[str, Any]:
    """Return the ``sim_time`` / ``next_time`` / ``finished`` message fields.

    ``finished`` is True exactly when ``snapshot.next_time`` is +infinity.
    """
    return {
        "sim_time": snapshot.sim_time,
        "next_time": _safe_time(snapshot.next_time),
        "finished": snapshot.next_time == float('inf'),
    }


def _apply_pe_event_overlay(
    events: list[SimEvent],
    all_nodes: dict[str, IRNode],
    node_by_id: dict[str, dict[str, Any]],
) -> None:
    """Set active / matched / executed flags on node dicts from PE events.

    TokenReceived and Executed flag every node placed on the event's PE;
    Matched flags only nodes on that PE whose ``iram_offset`` equals the
    event's ``offset``. Events with an unparsable or non-"pe" component are
    ignored. Node dicts are mutated in place.
    """
    # Index node names once so each event is a dict lookup, not a node scan.
    names_by_pe: dict[Any, list[str]] = {}
    names_by_slot: dict[tuple[Any, Any], list[str]] = {}
    for node in all_nodes.values():
        names_by_pe.setdefault(node.pe, []).append(node.name)
        names_by_slot.setdefault((node.pe, node.iram_offset), []).append(node.name)

    for event in events:
        if isinstance(event, TokenReceived):
            flag = "active"
        elif isinstance(event, Matched):
            flag = "matched"
        elif isinstance(event, Executed):
            flag = "executed"
        else:
            continue

        try:
            kind, pe_id = _parse_component(event.component)
        except ValueError:
            continue
        if kind != "pe":
            continue

        if flag == "matched":
            names = names_by_slot.get((pe_id, event.offset), ())
        else:
            names = names_by_pe.get(pe_id, ())

        for name in names:
            entry = node_by_id.get(name)
            if entry is not None:
                entry[flag] = True


def _apply_emitted_token_flow(
    events: list[SimEvent],
    edges_json: list[dict[str, Any]],
    node_by_id: dict[str, dict[str, Any]],
) -> None:
    """Set ``token_flow`` on edges whose target node sits on an emitted token's target.

    An edge is flagged when its target node's ``pe`` equals the ``target`` of
    any Emitted event's token (tokens without a ``target`` attribute are
    ignored). Edge dicts are mutated in place.
    """
    # NOTE(review): ``token.target`` is matched against node PE ids only; if
    # SM-bound tokens also carry ``target``, a same-numbered PE would be
    # flagged too -- confirm against the token types.
    targets = {
        event.token.target
        for event in events
        if isinstance(event, Emitted) and hasattr(event.token, 'target')
    }
    if not targets:
        return

    for edge in edges_json:
        dest_node = node_by_id.get(edge["target"])
        if dest_node and dest_node.get("pe") in targets:
            edge["token_flow"] = True


def graph_to_monitor_json(
    ir_graph: IRGraph,
    snapshot: StateSnapshot,
    events: list[SimEvent],
) -> dict[str, Any]:
    """Convert IRGraph + StateSnapshot + events to monitor JSON.

    Returns a "monitor_update" message containing the graph (IR plus
    synthesized SM nodes/edges) with the execution overlay applied from
    *events*, the serialised PE/SM state, the serialised events and the
    snapshot timing fields.
    """
    all_nodes, nodes_json, edges_json = _build_graph_json(ir_graph, snapshot)

    # Lookup dicts for in-place overlay updates. Edges sharing a
    # (source, target) pair collapse to the last one in ``edge_by_key``.
    node_by_id = {n["id"]: n for n in nodes_json}
    edge_by_key = {(e["source"], e["target"]): e for e in edges_json}

    _apply_pe_event_overlay(events, all_nodes, node_by_id)
    _apply_emitted_token_flow(events, edges_json, node_by_id)
    _apply_sm_event_overlay(events, node_by_id, edge_by_key)

    state_json = _serialise_snapshot_state(snapshot)
    events_json = [_serialise_event(e) for e in events]

    return {
        "type": "monitor_update",
        "graph": {
            "nodes": nodes_json,
            "edges": edges_json,
            "regions": [],
        },
        "state": state_json,
        "events": events_json,
        **_snapshot_timing(snapshot),
    }


def graph_loaded_json(ir_graph: IRGraph, snapshot: StateSnapshot) -> dict[str, Any]:
    """Initial graph load (no events, no execution overlay flags set).

    Returns a "graph_loaded" message with the same graph, state and timing
    fields as ``graph_to_monitor_json`` but without an ``events`` entry.
    """
    _, nodes_json, edges_json = _build_graph_json(ir_graph, snapshot)

    return {
        "type": "graph_loaded",
        "graph": {
            "nodes": nodes_json,
            "edges": edges_json,
            "regions": [],
        },
        "state": _serialise_snapshot_state(snapshot),
        **_snapshot_timing(snapshot),
    }