OR-1 dataflow CPU sketch
at 33f8fac6c10f84c4b158b7caa01fb18c6414edae 168 lines 5.2 kB view raw
1"""FastAPI server for the dataflow graph renderer. 2 3Serves the frontend static files and pushes graph updates over WebSocket 4when the source dfasm file changes. 5""" 6 7from __future__ import annotations 8 9import asyncio 10import json 11import os 12import threading 13import time 14from pathlib import Path 15from typing import Optional 16 17from contextlib import asynccontextmanager 18 19from fastapi import FastAPI, WebSocket, WebSocketDisconnect 20from fastapi.staticfiles import StaticFiles 21from watchdog.observers import Observer 22from watchdog.events import FileSystemEventHandler 23 24from dfgraph.pipeline import run_progressive, PipelineResult 25from dfgraph.graph_json import graph_to_json 26 27 28class ConnectionManager: 29 def __init__(self) -> None: 30 self.active_connections: list[WebSocket] = [] 31 32 async def connect(self, websocket: WebSocket) -> None: 33 await websocket.accept() 34 self.active_connections.append(websocket) 35 36 def disconnect(self, websocket: WebSocket) -> None: 37 self.active_connections.remove(websocket) 38 39 async def broadcast(self, message: dict) -> None: 40 disconnected: list[WebSocket] = [] 41 for connection in self.active_connections: 42 try: 43 await connection.send_json(message) 44 except Exception: 45 disconnected.append(connection) 46 for conn in disconnected: 47 self.active_connections.remove(conn) 48 49 50class DebouncedFileHandler(FileSystemEventHandler): 51 def __init__(self, target_path: str, callback, debounce_s: float = 0.3) -> None: 52 self.target_path = os.path.realpath(target_path) 53 self.callback = callback 54 self.debounce_s = debounce_s 55 self._timer: Optional[threading.Timer] = None 56 57 def on_modified(self, event) -> None: 58 if event.is_directory: 59 return 60 if os.path.realpath(event.src_path) != self.target_path: 61 return 62 if self._timer is not None: 63 self._timer.cancel() 64 self._timer = threading.Timer(self.debounce_s, self.callback) 65 self._timer.daemon = True 66 self._timer.start() 67 68 69def create_app(source_path: Path) -> FastAPI: 70 manager = ConnectionManager() 71 current_json: dict = {} 72 loop: Optional[asyncio.AbstractEventLoop] = None 73 observer: Optional[Observer] = None 74 _initialized: bool = False 75 76 def _reassemble() -> dict: 77 source = source_path.read_text() 78 result = run_progressive(source) 79 return graph_to_json(result) 80 81 def _on_file_change() -> None: 82 nonlocal current_json, loop 83 current_json = _reassemble() 84 if loop is not None and not loop.is_closed(): 85 try: 86 asyncio.run_coroutine_threadsafe( 87 manager.broadcast(current_json), loop 88 ) 89 except RuntimeError: 90 # Loop might be stopped or closed 91 pass 92 93 def _ensure_initialized() -> None: 94 nonlocal current_json, loop, observer, _initialized 95 if _initialized: 96 return 97 _initialized = True 98 99 # Initialize current_json on first use 100 current_json = _reassemble() 101 102 # Set up file watcher 103 try: 104 loop = asyncio.get_running_loop() 105 except RuntimeError: 106 try: 107 loop = asyncio.get_event_loop() 108 if loop.is_closed(): 109 loop = asyncio.new_event_loop() 110 asyncio.set_event_loop(loop) 111 except RuntimeError: 112 loop = asyncio.new_event_loop() 113 asyncio.set_event_loop(loop) 114 115 handler = DebouncedFileHandler( 116 str(source_path), _on_file_change, debounce_s=0.3 117 ) 118 observer = Observer() 119 observer.schedule(handler, str(source_path.parent), recursive=False) 120 observer.daemon = True 121 observer.start() 122 123 @asynccontextmanager 124 async def lifespan(app: FastAPI): 125 nonlocal current_json, loop, observer 126 loop = asyncio.get_event_loop() 127 current_json = _reassemble() 128 129 handler = DebouncedFileHandler( 130 str(source_path), _on_file_change, debounce_s=0.3 131 ) 132 observer = Observer() 133 observer.schedule(handler, str(source_path.parent), recursive=False) 134 observer.daemon = True 135 observer.start() 136 137 yield 138 139 observer.stop() 140 observer.join(timeout=2) 141 142 app = FastAPI(lifespan=lifespan) 143 144 @app.websocket("/ws") 145 async def websocket_endpoint(websocket: WebSocket) -> None: 146 nonlocal current_json 147 _ensure_initialized() 148 await manager.connect(websocket) 149 try: 150 await websocket.send_json(current_json) 151 while True: 152 await websocket.receive_text() 153 except WebSocketDisconnect: 154 manager.disconnect(websocket) 155 156 frontend_dir = Path(__file__).parent / "frontend" 157 app.mount( 158 "/dist", 159 StaticFiles(directory=str(frontend_dir / "dist")), 160 name="dist", 161 ) 162 app.mount( 163 "/", 164 StaticFiles(directory=str(frontend_dir), html=True), 165 name="frontend", 166 ) 167 168 return app