OR-1 dataflow CPU sketch
1"""Simulation backend with command/result protocol and threading support.
2
3The SimulationBackend runs a dedicated thread that owns the SimPy environment.
4Commands arrive via queue.Queue, results are returned via another queue.Queue.
5
6Key features:
7- Thread-safe command/result protocol
8- Event collection via on_event callbacks
9- Tick-level and event-level stepping
10- Token injection and simulation control
11- State snapshots at any point
12"""
13
14from __future__ import annotations
15
16import logging
17import queue
18import threading
19from dataclasses import replace
20from typing import Optional
21
22import simpy
23
24from asm import run_pipeline
25from asm.codegen import generate_direct
26from asm.ir import IRGraph
27from emu.events import EventCallback, SimEvent
28from emu.network import System, build_topology
29from emu.types import PEConfig, SMConfig
30from monitor.commands import (
31 ErrorResult, GraphLoaded, InjectCmd, LoadCmd, ResetCmd, RunUntilCmd,
32 SendCmd, SimCommand, StepEventCmd, StepResult, StepTickCmd, StopCmd,
33)
34from monitor.snapshot import StateSnapshot, capture
35from tokens import Token
36
37logger = logging.getLogger(__name__)
38
39
40class SimulationBackend:
41 """Manages a SimPy simulation environment in a dedicated thread.
42
43 Commands are sent via send_command() and processed in a dedicated thread.
44 Results are returned synchronously via queue communication.
45
46 Attributes:
47 _cmd_queue: Queue for incoming commands from caller
48 _result_queue: Queue for outgoing results to caller
49 _thread: The dedicated simulation thread
50 _env: Current SimPy environment (None if not loaded)
51 _system: Current System instance (None if not loaded)
52 _ir_graph: Current IRGraph (None if not loaded)
53 _last_source: Last loaded dfasm source (for reload on ResetCmd)
54 _events: Thread-local event buffer for collecting SimEvent objects
55 """
56
57 def __init__(self):
58 self._cmd_queue: queue.Queue[SimCommand] = queue.Queue()
59 self._result_queue: queue.Queue = queue.Queue()
60 self._thread: Optional[threading.Thread] = None
61 self._env: Optional[simpy.Environment] = None
62 self._system: Optional[System] = None
63 self._ir_graph: Optional[IRGraph] = None
64 self._last_source: Optional[str] = None
65 self._events: list[SimEvent] = []
66
67 def start(self):
68 """Start the background simulation thread.
69
70 Must be called before send_command().
71 """
72 self._thread = threading.Thread(target=self._run_loop, daemon=True)
73 self._thread.start()
74
75 def stop(self):
76 """Stop the background simulation thread gracefully.
77
78 Sends StopCmd and waits for thread to exit (timeout 5 seconds).
79 """
80 self._cmd_queue.put(StopCmd())
81 if self._thread is not None:
82 self._thread.join(timeout=5.0)
83
84 def send_command(self, cmd: SimCommand, timeout: float | None = None):
85 """Send a command to the backend and wait for result.
86
87 Args:
88 cmd: SimCommand instance (LoadCmd, StepTickCmd, etc.)
89 timeout: Optional timeout in seconds for result retrieval
90
91 Returns:
92 Result dataclass (GraphLoaded, StepResult, or ErrorResult)
93
94 Raises:
95 queue.Empty: If timeout expires before result arrives
96 """
97 self._cmd_queue.put(cmd)
98 return self._result_queue.get(timeout=timeout)
99
100 def _on_event(self, event: SimEvent):
101 """Event callback registered with all PEs and SMs.
102
103 Appends events to the thread-local buffer for collection during steps.
104
105 Args:
106 event: SimEvent from a PE or SM
107 """
108 self._events.append(event)
109
110 def _run_loop(self):
111 """Main loop of the simulation thread.
112
113 Processes commands from _cmd_queue until StopCmd is received.
114 Exceptions are caught and wrapped in ErrorResult.
115 """
116 while True:
117 cmd = self._cmd_queue.get()
118 if isinstance(cmd, StopCmd):
119 break
120 try:
121 result = self._dispatch(cmd)
122 except Exception as e:
123 logger.exception("Backend error processing %s", type(cmd).__name__)
124 result = ErrorResult(message=str(e))
125 self._result_queue.put(result)
126
127 def _dispatch(self, cmd: SimCommand):
128 """Dispatch command to appropriate handler.
129
130 Args:
131 cmd: Command instance
132
133 Returns:
134 Result dataclass
135 """
136 match cmd:
137 case LoadCmd(source=source):
138 return self._handle_load(source)
139 case StepTickCmd():
140 return self._handle_step_tick()
141 case StepEventCmd():
142 return self._handle_step_event()
143 case RunUntilCmd(until=until):
144 return self._handle_run_until(until)
145 case InjectCmd(token=token):
146 return self._handle_inject(token)
147 case SendCmd(token=token):
148 return self._handle_send(token)
149 case ResetCmd(reload=reload):
150 return self._handle_reset(reload)
151 case _:
152 return ErrorResult(message=f"unknown command type: {type(cmd).__name__}")
153
154 def _handle_load(self, source: str) -> GraphLoaded | ErrorResult:
155 """Load a dfasm program and set up the simulation.
156
157 Runs the assembly pipeline, generates direct-mode configuration,
158 builds the topology, and injects seed tokens.
159
160 Args:
161 source: dfasm source code as a string
162
163 Returns:
164 GraphLoaded on success, ErrorResult on failure
165
166 Acceptance criteria:
167 - or1-monitor.AC1.1: Valid program → GraphLoaded with IR and snapshot
168 - or1-monitor.AC1.2: Callbacks wired into all PEs and SMs
169 - or1-monitor.AC1.3: Invalid program → ErrorResult, backend still functional
170 """
171 try:
172 ir_graph = run_pipeline(source)
173 except ValueError as e:
174 return ErrorResult(message=str(e))
175
176 try:
177 result = generate_direct(ir_graph)
178 env = simpy.Environment()
179
180 # Wire on_event callback into all PE and SM configs
181 pe_configs = [replace(cfg, on_event=self._on_event) for cfg in result.pe_configs]
182 sm_configs = [replace(cfg, on_event=self._on_event) for cfg in result.sm_configs]
183
184 system = build_topology(env, pe_configs, sm_configs)
185
186 # Inject seed tokens
187 for seed in result.seed_tokens:
188 system.inject(seed)
189 except Exception as e:
190 return ErrorResult(message=str(e))
191
192 # Commit state atomically — only after everything succeeds
193 self._events.clear()
194 self._env = env
195 self._system = system
196 self._last_source = source
197 self._ir_graph = ir_graph
198
199 snapshot = capture(self._system)
200 return GraphLoaded(ir_graph=ir_graph, snapshot=snapshot)
201
202 def _handle_step_tick(self) -> StepResult:
203 """Step the simulation by one tick (all events at current simulation time).
204
205 Loops env.step() while env.peek() == env.now to process all events
206 at the current simulation time before returning.
207
208 Returns:
209 StepResult with events, snapshot, sim_time, and finished flag
210
211 Acceptance criteria:
212 - or1-monitor.AC5.2: Processes all events at current time before returning
213 - or1-monitor.AC5.5: Result contains events and snapshot
214 - or1-monitor.AC5.6: Finished simulation handled without error
215 """
216 if self._env is None or self._system is None:
217 return StepResult(finished=True)
218
219 self._events.clear()
220
221 if self._env.peek() == float('inf'):
222 return StepResult(
223 snapshot=capture(self._system),
224 sim_time=self._env.now,
225 finished=True,
226 )
227
228 current_time = self._env.peek()
229 while self._env.peek() == current_time:
230 self._env.step()
231
232 return StepResult(
233 events=tuple(self._events),
234 snapshot=capture(self._system),
235 sim_time=self._env.now,
236 finished=self._env.peek() == float('inf'),
237 )
238
239 def _handle_step_event(self) -> StepResult:
240 """Step the simulation by exactly one event.
241
242 Calls env.step() once and returns the result.
243
244 Returns:
245 StepResult with events, snapshot, sim_time, and finished flag
246
247 Acceptance criteria:
248 - or1-monitor.AC5.3: Processes exactly one env.step()
249 - or1-monitor.AC5.5: Result contains events and snapshot
250 """
251 if self._env is None or self._system is None:
252 return StepResult(finished=True)
253
254 self._events.clear()
255
256 if self._env.peek() == float('inf'):
257 return StepResult(
258 snapshot=capture(self._system),
259 sim_time=self._env.now,
260 finished=True,
261 )
262
263 self._env.step()
264
265 return StepResult(
266 events=tuple(self._events),
267 snapshot=capture(self._system),
268 sim_time=self._env.now,
269 finished=self._env.peek() == float('inf'),
270 )
271
272 def _handle_run_until(self, until: float) -> StepResult:
273 """Run the simulation continuously until reaching a target simulation time.
274
275 Batches events per tick to avoid flooding the result. Loops while
276 env.peek() <= until, stepping all events at each time point.
277
278 Args:
279 until: Target simulation time
280
281 Returns:
282 StepResult with all accumulated events, final snapshot, and finished flag
283
284 Acceptance criteria:
285 - or1-monitor.AC5.4: Batches events per tick
286 - or1-monitor.AC5.5: Result contains events and snapshot
287 """
288 if self._env is None or self._system is None:
289 return StepResult(finished=True)
290
291 self._events.clear()
292 all_events: list[SimEvent] = []
293
294 while self._env.peek() <= until and self._env.peek() != float('inf'):
295 current_time = self._env.peek()
296 self._events.clear()
297 while self._env.peek() == current_time:
298 self._env.step()
299 all_events.extend(self._events)
300
301 return StepResult(
302 events=tuple(all_events),
303 snapshot=capture(self._system),
304 sim_time=self._env.now,
305 finished=self._env.peek() == float('inf'),
306 )
307
308 def _handle_inject(self, token: Token) -> StepResult:
309 """Inject a token directly into the simulation (no backpressure).
310
311 Args:
312 token: Token to inject
313
314 Returns:
315 StepResult with snapshot and current state
316 """
317 if self._system is None:
318 return StepResult(finished=True)
319
320 self._events.clear()
321 self._system.inject(token)
322
323 return StepResult(
324 events=tuple(self._events),
325 snapshot=capture(self._system),
326 sim_time=self._env.now if self._env else 0.0,
327 finished=self._env.peek() == float('inf') if self._env else True,
328 )
329
330 def _handle_send(self, token: Token) -> StepResult:
331 """Send a token via SimPy store.put() (respects backpressure).
332
333 Creates a one-shot SimPy process and steps once to allow processing.
334
335 Args:
336 token: Token to send
337
338 Returns:
339 StepResult with events, snapshot, and state
340 """
341 if self._env is None or self._system is None:
342 return StepResult(finished=True)
343
344 self._events.clear()
345
346 def _sender():
347 yield from self._system.send(token)
348
349 self._env.process(_sender())
350 # Step once to process the send event
351 if self._env.peek() != float('inf'):
352 self._env.step()
353
354 return StepResult(
355 events=tuple(self._events),
356 snapshot=capture(self._system),
357 sim_time=self._env.now,
358 finished=self._env.peek() == float('inf'),
359 )
360
361 def _handle_reset(self, reload: bool) -> StepResult | GraphLoaded | ErrorResult:
362 """Reset the simulation (tear down current topology).
363
364 Clears all state. If reload=True, reloads the last program.
365
366 Args:
367 reload: If True, reload the last program after reset
368
369 Returns:
370 StepResult if reload=False, GraphLoaded if reload=True, or ErrorResult
371
372 Acceptance criteria:
373 - or1-monitor.AC1.4: Reset tears down and leaves ready for new LoadCmd
374 - or1-monitor.AC1.5: Reset with reload=True reloads program
375 """
376 self._env = None
377 self._system = None
378 self._events.clear()
379 self._ir_graph = None
380
381 if reload and self._last_source is not None:
382 return self._handle_load(self._last_source)
383
384 return StepResult(sim_time=0.0, finished=True)