OR-1 dataflow CPU sketch
1"""FastAPI server for the dataflow graph renderer.
2
3Serves the frontend static files and pushes graph updates over WebSocket
4when the source dfasm file changes.
5"""
6
7from __future__ import annotations
8
9import asyncio
10import json
11import os
12import threading
13import time
14from pathlib import Path
15from typing import Optional
16
17from contextlib import asynccontextmanager
18
19from fastapi import FastAPI, WebSocket, WebSocketDisconnect
20from fastapi.staticfiles import StaticFiles
21from watchdog.observers import Observer
22from watchdog.events import FileSystemEventHandler
23
24from dfgraph.pipeline import run_progressive, PipelineResult
25from dfgraph.graph_json import graph_to_json
26
27
class ConnectionManager:
    """Keep track of accepted WebSocket clients and fan messages out to them.

    ``active_connections`` holds every socket that has been accepted and not
    yet removed, in connection order.
    """

    def __init__(self) -> None:
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept *websocket* and register it for future broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        """Forget *websocket*.

        Safe to call for a socket that is not (or no longer) registered:
        ``broadcast`` may already have pruned it after a failed send by the
        time the endpoint notices the disconnect.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed elsewhere; nothing left to do.
            pass

    async def broadcast(self, message: dict) -> None:
        """Send *message* as JSON to every registered client.

        Clients whose send raises are dropped from the registry; the failure
        is not propagated, so one dead socket cannot block the others.
        """
        # Iterate over a snapshot: each ``await`` yields to the event loop,
        # where connect()/disconnect() may mutate the live list. Iterating
        # the live list would then skip clients.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                # Best-effort delivery: any send failure means the peer is
                # unusable, so prune it.
                self.disconnect(connection)
48
49
class DebouncedFileHandler(FileSystemEventHandler):
    """Invoke a callback once a single watched file has stopped changing.

    Events for any path other than ``target_path`` (compared after
    ``os.path.realpath``) and all directory events are ignored. Each matching
    event restarts a timer; ``callback`` runs on the timer thread after
    ``debounce_s`` seconds without a further matching event.

    Args:
        target_path: File whose changes should trigger the callback.
        callback: Zero-argument callable run after the debounce interval.
        debounce_s: Quiet period, in seconds, required before firing.
    """

    def __init__(self, target_path: str, callback, debounce_s: float = 0.3) -> None:
        super().__init__()
        self.target_path = os.path.realpath(target_path)
        self.callback = callback
        self.debounce_s = debounce_s
        # Pending debounce timer, or None if no event has been seen yet.
        self._timer: Optional[threading.Timer] = None

    def _is_target(self, path) -> bool:
        """Return True if *path* resolves to the watched file."""
        return bool(path) and os.path.realpath(path) == self.target_path

    def _schedule(self) -> None:
        """(Re)start the debounce timer, cancelling any pending one."""
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(self.debounce_s, self.callback)
        self._timer.daemon = True
        self._timer.start()

    def on_modified(self, event) -> None:
        """Handle an in-place write to the watched file."""
        if event.is_directory:
            return
        if self._is_target(event.src_path):
            self._schedule()

    def on_created(self, event) -> None:
        """Handle the watched file being (re)created.

        Some editors save by deleting and recreating the file, which does not
        produce a modification event.
        """
        if event.is_directory:
            return
        if self._is_target(event.src_path):
            self._schedule()

    def on_moved(self, event) -> None:
        """Handle another file being renamed onto the watched path.

        Atomic saves write a temporary file and rename it over the target, so
        the interesting path is the destination, not the source.
        """
        if event.is_directory:
            return
        if self._is_target(getattr(event, "dest_path", None)):
            self._schedule()
67
68
def create_app(source_path: Path) -> FastAPI:
    """Build the FastAPI app that serves the frontend and live graph updates.

    The app reads ``source_path``, runs it through ``run_progressive`` and
    ``graph_to_json``, and sends the resulting JSON to each client that
    connects to ``/ws``. A watchdog observer on the file's parent directory
    re-runs that pipeline when the file changes and broadcasts the new JSON
    to all connected clients.

    Static files are served from the ``frontend`` directory next to this
    module (``/``) and its ``dist`` subdirectory (``/dist``).

    Args:
        source_path: Path of the source file to render and watch.

    Returns:
        The configured FastAPI application.
    """
    manager = ConnectionManager()
    current_json: dict = {}
    loop: Optional[asyncio.AbstractEventLoop] = None
    observer: Optional[Observer] = None
    _initialized: bool = False

    def _reassemble() -> dict:
        """Re-read the source file and return its graph as a JSON-able dict."""
        source = source_path.read_text()
        result = run_progressive(source)
        return graph_to_json(result)

    def _on_file_change() -> None:
        """Rebuild the graph and push it to clients (runs on a timer thread)."""
        nonlocal current_json
        current_json = _reassemble()
        target_loop = loop
        if target_loop is None or target_loop.is_closed():
            return
        coro = manager.broadcast(current_json)
        try:
            asyncio.run_coroutine_threadsafe(coro, target_loop)
        except RuntimeError:
            # The loop was closed between the check and the call. Close the
            # coroutine so it is not reported as "never awaited".
            coro.close()

    def _start_watching(running_loop: asyncio.AbstractEventLoop) -> None:
        """Load the initial graph and start the file watcher, at most once."""
        nonlocal current_json, loop, observer, _initialized
        # Always track the loop that is serving requests right now, so
        # broadcasts are never scheduled on a stale loop.
        loop = running_loop
        if _initialized:
            return

        current_json = _reassemble()

        handler = DebouncedFileHandler(
            str(source_path), _on_file_change, debounce_s=0.3
        )
        watcher = Observer()
        watcher.schedule(handler, str(source_path.parent), recursive=False)
        watcher.daemon = True
        watcher.start()
        observer = watcher
        # Only mark as initialized once everything above succeeded, so a
        # failed first attempt can be retried.
        _initialized = True

    def _stop_watching() -> None:
        """Stop the file watcher, if one is running."""
        nonlocal observer, _initialized
        watcher, observer = observer, None
        _initialized = False
        if watcher is not None:
            watcher.stop()
            watcher.join(timeout=2)

    def _ensure_initialized() -> None:
        """Lazily initialize when the lifespan handler has not run."""
        _start_watching(asyncio.get_running_loop())

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Shares the idempotent start path with _ensure_initialized() so a
        # later WebSocket connection does not start a second observer.
        _start_watching(asyncio.get_running_loop())
        try:
            yield
        finally:
            _stop_watching()

    app = FastAPI(lifespan=lifespan)

    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket) -> None:
        _ensure_initialized()
        await manager.connect(websocket)
        try:
            # Send the current graph immediately, then just keep the socket
            # open; incoming messages are read and discarded.
            await websocket.send_json(current_json)
            while True:
                await websocket.receive_text()
        except WebSocketDisconnect:
            pass
        finally:
            # Unregister on every exit path. The membership check avoids an
            # error when a failed broadcast already pruned this socket.
            if websocket in manager.active_connections:
                manager.disconnect(websocket)

    frontend_dir = Path(__file__).parent / "frontend"
    app.mount(
        "/dist",
        StaticFiles(directory=str(frontend_dir / "dist")),
        name="dist",
    )
    app.mount(
        "/",
        StaticFiles(directory=str(frontend_dir), html=True),
        name="frontend",
    )

    return app