OR-1 dataflow CPU sketch
at a8ec18c41dee4e0e31246e4d5becf68b6e828e90 384 lines 13 kB view raw
1"""Simulation backend with command/result protocol and threading support. 2 3The SimulationBackend runs a dedicated thread that owns the SimPy environment. 4Commands arrive via queue.Queue, results are returned via another queue.Queue. 5 6Key features: 7- Thread-safe command/result protocol 8- Event collection via on_event callbacks 9- Tick-level and event-level stepping 10- Token injection and simulation control 11- State snapshots at any point 12""" 13 14from __future__ import annotations 15 16import logging 17import queue 18import threading 19from dataclasses import replace 20from typing import Optional 21 22import simpy 23 24from asm import run_pipeline 25from asm.codegen import generate_direct 26from asm.ir import IRGraph 27from emu.events import EventCallback, SimEvent 28from emu.network import System, build_topology 29from emu.types import PEConfig, SMConfig 30from monitor.commands import ( 31 ErrorResult, GraphLoaded, InjectCmd, LoadCmd, ResetCmd, RunUntilCmd, 32 SendCmd, SimCommand, StepEventCmd, StepResult, StepTickCmd, StopCmd, 33) 34from monitor.snapshot import StateSnapshot, capture 35from tokens import Token 36 37logger = logging.getLogger(__name__) 38 39 40class SimulationBackend: 41 """Manages a SimPy simulation environment in a dedicated thread. 42 43 Commands are sent via send_command() and processed in a dedicated thread. 44 Results are returned synchronously via queue communication. 45 46 Attributes: 47 _cmd_queue: Queue for incoming commands from caller 48 _result_queue: Queue for outgoing results to caller 49 _thread: The dedicated simulation thread 50 _env: Current SimPy environment (None if not loaded) 51 _system: Current System instance (None if not loaded) 52 _ir_graph: Current IRGraph (None if not loaded) 53 _last_source: Last loaded dfasm source (for reload on ResetCmd) 54 _events: Thread-local event buffer for collecting SimEvent objects 55 """ 56 57 def __init__(self): 58 self._cmd_queue: queue.Queue[SimCommand] = queue.Queue() 59 self._result_queue: queue.Queue = queue.Queue() 60 self._thread: Optional[threading.Thread] = None 61 self._env: Optional[simpy.Environment] = None 62 self._system: Optional[System] = None 63 self._ir_graph: Optional[IRGraph] = None 64 self._last_source: Optional[str] = None 65 self._events: list[SimEvent] = [] 66 67 def start(self): 68 """Start the background simulation thread. 69 70 Must be called before send_command(). 71 """ 72 self._thread = threading.Thread(target=self._run_loop, daemon=True) 73 self._thread.start() 74 75 def stop(self): 76 """Stop the background simulation thread gracefully. 77 78 Sends StopCmd and waits for thread to exit (timeout 5 seconds). 79 """ 80 self._cmd_queue.put(StopCmd()) 81 if self._thread is not None: 82 self._thread.join(timeout=5.0) 83 84 def send_command(self, cmd: SimCommand, timeout: float | None = None): 85 """Send a command to the backend and wait for result. 86 87 Args: 88 cmd: SimCommand instance (LoadCmd, StepTickCmd, etc.) 89 timeout: Optional timeout in seconds for result retrieval 90 91 Returns: 92 Result dataclass (GraphLoaded, StepResult, or ErrorResult) 93 94 Raises: 95 queue.Empty: If timeout expires before result arrives 96 """ 97 self._cmd_queue.put(cmd) 98 return self._result_queue.get(timeout=timeout) 99 100 def _on_event(self, event: SimEvent): 101 """Event callback registered with all PEs and SMs. 102 103 Appends events to the thread-local buffer for collection during steps. 104 105 Args: 106 event: SimEvent from a PE or SM 107 """ 108 self._events.append(event) 109 110 def _run_loop(self): 111 """Main loop of the simulation thread. 112 113 Processes commands from _cmd_queue until StopCmd is received. 114 Exceptions are caught and wrapped in ErrorResult. 115 """ 116 while True: 117 cmd = self._cmd_queue.get() 118 if isinstance(cmd, StopCmd): 119 break 120 try: 121 result = self._dispatch(cmd) 122 except Exception as e: 123 logger.exception("Backend error processing %s", type(cmd).__name__) 124 result = ErrorResult(message=str(e)) 125 self._result_queue.put(result) 126 127 def _dispatch(self, cmd: SimCommand): 128 """Dispatch command to appropriate handler. 129 130 Args: 131 cmd: Command instance 132 133 Returns: 134 Result dataclass 135 """ 136 match cmd: 137 case LoadCmd(source=source): 138 return self._handle_load(source) 139 case StepTickCmd(): 140 return self._handle_step_tick() 141 case StepEventCmd(): 142 return self._handle_step_event() 143 case RunUntilCmd(until=until): 144 return self._handle_run_until(until) 145 case InjectCmd(token=token): 146 return self._handle_inject(token) 147 case SendCmd(token=token): 148 return self._handle_send(token) 149 case ResetCmd(reload=reload): 150 return self._handle_reset(reload) 151 case _: 152 return ErrorResult(message=f"unknown command type: {type(cmd).__name__}") 153 154 def _handle_load(self, source: str) -> GraphLoaded | ErrorResult: 155 """Load a dfasm program and set up the simulation. 156 157 Runs the assembly pipeline, generates direct-mode configuration, 158 builds the topology, and injects seed tokens. 159 160 Args: 161 source: dfasm source code as a string 162 163 Returns: 164 GraphLoaded on success, ErrorResult on failure 165 166 Acceptance criteria: 167 - or1-monitor.AC1.1: Valid program → GraphLoaded with IR and snapshot 168 - or1-monitor.AC1.2: Callbacks wired into all PEs and SMs 169 - or1-monitor.AC1.3: Invalid program → ErrorResult, backend still functional 170 """ 171 try: 172 ir_graph = run_pipeline(source) 173 except ValueError as e: 174 return ErrorResult(message=str(e)) 175 176 try: 177 result = generate_direct(ir_graph) 178 env = simpy.Environment() 179 180 # Wire on_event callback into all PE and SM configs 181 pe_configs = [replace(cfg, on_event=self._on_event) for cfg in result.pe_configs] 182 sm_configs = [replace(cfg, on_event=self._on_event) for cfg in result.sm_configs] 183 184 system = build_topology(env, pe_configs, sm_configs) 185 186 # Inject seed tokens 187 for seed in result.seed_tokens: 188 system.inject(seed) 189 except Exception as e: 190 return ErrorResult(message=str(e)) 191 192 # Commit state atomically — only after everything succeeds 193 self._events.clear() 194 self._env = env 195 self._system = system 196 self._last_source = source 197 self._ir_graph = ir_graph 198 199 snapshot = capture(self._system) 200 return GraphLoaded(ir_graph=ir_graph, snapshot=snapshot) 201 202 def _handle_step_tick(self) -> StepResult: 203 """Step the simulation by one tick (all events at current simulation time). 204 205 Loops env.step() while env.peek() == env.now to process all events 206 at the current simulation time before returning. 207 208 Returns: 209 StepResult with events, snapshot, sim_time, and finished flag 210 211 Acceptance criteria: 212 - or1-monitor.AC5.2: Processes all events at current time before returning 213 - or1-monitor.AC5.5: Result contains events and snapshot 214 - or1-monitor.AC5.6: Finished simulation handled without error 215 """ 216 if self._env is None or self._system is None: 217 return StepResult(finished=True) 218 219 self._events.clear() 220 221 if self._env.peek() == float('inf'): 222 return StepResult( 223 snapshot=capture(self._system), 224 sim_time=self._env.now, 225 finished=True, 226 ) 227 228 current_time = self._env.peek() 229 while self._env.peek() == current_time: 230 self._env.step() 231 232 return StepResult( 233 events=tuple(self._events), 234 snapshot=capture(self._system), 235 sim_time=self._env.now, 236 finished=self._env.peek() == float('inf'), 237 ) 238 239 def _handle_step_event(self) -> StepResult: 240 """Step the simulation by exactly one event. 241 242 Calls env.step() once and returns the result. 243 244 Returns: 245 StepResult with events, snapshot, sim_time, and finished flag 246 247 Acceptance criteria: 248 - or1-monitor.AC5.3: Processes exactly one env.step() 249 - or1-monitor.AC5.5: Result contains events and snapshot 250 """ 251 if self._env is None or self._system is None: 252 return StepResult(finished=True) 253 254 self._events.clear() 255 256 if self._env.peek() == float('inf'): 257 return StepResult( 258 snapshot=capture(self._system), 259 sim_time=self._env.now, 260 finished=True, 261 ) 262 263 self._env.step() 264 265 return StepResult( 266 events=tuple(self._events), 267 snapshot=capture(self._system), 268 sim_time=self._env.now, 269 finished=self._env.peek() == float('inf'), 270 ) 271 272 def _handle_run_until(self, until: float) -> StepResult: 273 """Run the simulation continuously until reaching a target simulation time. 274 275 Batches events per tick to avoid flooding the result. Loops while 276 env.peek() <= until, stepping all events at each time point. 277 278 Args: 279 until: Target simulation time 280 281 Returns: 282 StepResult with all accumulated events, final snapshot, and finished flag 283 284 Acceptance criteria: 285 - or1-monitor.AC5.4: Batches events per tick 286 - or1-monitor.AC5.5: Result contains events and snapshot 287 """ 288 if self._env is None or self._system is None: 289 return StepResult(finished=True) 290 291 self._events.clear() 292 all_events: list[SimEvent] = [] 293 294 while self._env.peek() <= until and self._env.peek() != float('inf'): 295 current_time = self._env.peek() 296 self._events.clear() 297 while self._env.peek() == current_time: 298 self._env.step() 299 all_events.extend(self._events) 300 301 return StepResult( 302 events=tuple(all_events), 303 snapshot=capture(self._system), 304 sim_time=self._env.now, 305 finished=self._env.peek() == float('inf'), 306 ) 307 308 def _handle_inject(self, token: Token) -> StepResult: 309 """Inject a token directly into the simulation (no backpressure). 310 311 Args: 312 token: Token to inject 313 314 Returns: 315 StepResult with snapshot and current state 316 """ 317 if self._system is None: 318 return StepResult(finished=True) 319 320 self._events.clear() 321 self._system.inject(token) 322 323 return StepResult( 324 events=tuple(self._events), 325 snapshot=capture(self._system), 326 sim_time=self._env.now if self._env else 0.0, 327 finished=self._env.peek() == float('inf') if self._env else True, 328 ) 329 330 def _handle_send(self, token: Token) -> StepResult: 331 """Send a token via SimPy store.put() (respects backpressure). 332 333 Creates a one-shot SimPy process and steps once to allow processing. 334 335 Args: 336 token: Token to send 337 338 Returns: 339 StepResult with events, snapshot, and state 340 """ 341 if self._env is None or self._system is None: 342 return StepResult(finished=True) 343 344 self._events.clear() 345 346 def _sender(): 347 yield from self._system.send(token) 348 349 self._env.process(_sender()) 350 # Step once to process the send event 351 if self._env.peek() != float('inf'): 352 self._env.step() 353 354 return StepResult( 355 events=tuple(self._events), 356 snapshot=capture(self._system), 357 sim_time=self._env.now, 358 finished=self._env.peek() == float('inf'), 359 ) 360 361 def _handle_reset(self, reload: bool) -> StepResult | GraphLoaded | ErrorResult: 362 """Reset the simulation (tear down current topology). 363 364 Clears all state. If reload=True, reloads the last program. 365 366 Args: 367 reload: If True, reload the last program after reset 368 369 Returns: 370 StepResult if reload=False, GraphLoaded if reload=True, or ErrorResult 371 372 Acceptance criteria: 373 - or1-monitor.AC1.4: Reset tears down and leaves ready for new LoadCmd 374 - or1-monitor.AC1.5: Reset with reload=True reloads program 375 """ 376 self._env = None 377 self._system = None 378 self._events.clear() 379 self._ir_graph = None 380 381 if reload and self._last_source is not None: 382 return self._handle_load(self._last_source) 383 384 return StepResult(sim_time=0.0, finished=True)