OR-1 dataflow CPU sketch
at 00d336d2d4b197bbb9dbbf3641f5f112bf0cf3ec 201 lines 6.9 kB view raw
1"""Tests for dfgraph server WebSocket and file watcher. 2 3Tests verify: 4- dataflow-renderer.AC4.2: Initial load renders the graph without manual refresh 5- dataflow-renderer.AC4.1: Saving the dfasm file triggers re-render within 1 second 6""" 7 8import time 9 10from starlette.testclient import TestClient 11 12from dfgraph.server import create_app 13 14 15class TestInitialLoad: 16 """AC4.2: Initial load renders the graph without manual refresh.""" 17 18 def test_websocket_sends_initial_graph_on_connect(self, tmp_path): 19 """Connect to /ws and receive initial graph JSON.""" 20 # Create a minimal valid dfasm file 21 dfasm_file = tmp_path / "test.dfasm" 22 dfasm_file.write_text("""@system pe=2, sm=0 23&c1|pe0 <| const, 3 24&c2|pe0 <| const, 7 25&result|pe0 <| add 26&c1|pe0 |> &result|pe0:L 27&c2|pe0 |> &result|pe0:R 28""") 29 30 app = create_app(dfasm_file) 31 with TestClient(app) as client: 32 with client.websocket_connect("/ws") as ws: 33 data = ws.receive_json() 34 35 assert data["type"] == "graph_update" 36 assert "nodes" in data 37 assert "edges" in data 38 assert "metadata" in data 39 # For a valid graph, nodes should be non-empty 40 assert isinstance(data["nodes"], list) 41 42 def test_websocket_graph_has_expected_fields(self, tmp_path): 43 """Graph JSON has all expected fields.""" 44 dfasm_file = tmp_path / "test.dfasm" 45 dfasm_file.write_text("""@system pe=2, sm=0 46&c1|pe0 <| const, 3 47&c2|pe0 <| const, 7 48&result|pe0 <| add 49&c1|pe0 |> &result|pe0:L 50&c2|pe0 |> &result|pe0:R 51""") 52 53 app = create_app(dfasm_file) 54 with TestClient(app) as client: 55 with client.websocket_connect("/ws") as ws: 56 data = ws.receive_json() 57 58 assert "type" in data 59 assert "stage" in data 60 assert "nodes" in data 61 assert "edges" in data 62 assert "regions" in data 63 assert "errors" in data 64 assert "parse_error" in data 65 assert "metadata" in data 66 67 68class TestLiveReload: 69 """AC4.1: Saving the dfasm file triggers re-render within 1 second.""" 70 71 def test_file_change_broadcasts_update(self, tmp_path): 72 """Modify file on disk and receive updated graph.""" 73 dfasm_file = tmp_path / "test.dfasm" 74 dfasm_file.write_text("""@system pe=2, sm=0 75&c1|pe0 <| const, 3 76&c2|pe0 <| const, 7 77&result|pe0 <| add 78&c1|pe0 |> &result|pe0:L 79&c2|pe0 |> &result|pe0:R 80""") 81 82 app = create_app(dfasm_file) 83 with TestClient(app) as client: 84 with client.websocket_connect("/ws") as ws: 85 # Receive initial graph 86 data1 = ws.receive_json() 87 assert data1["type"] == "graph_update" 88 initial_consts = sorted([n["const"] for n in data1["nodes"] if n["opcode"] == "const"]) 89 90 # Modify the file 91 dfasm_file.write_text("""@system pe=2, sm=0 92&c1|pe0 <| const, 5 93&c2|pe0 <| const, 9 94&result|pe0 <| add 95&c1|pe0 |> &result|pe0:L 96&c2|pe0 |> &result|pe0:R 97""") 98 99 # Wait for update with generous timeout (up to 2 seconds) 100 start = time.time() 101 data2 = None 102 while time.time() - start < 2.0: 103 try: 104 data2 = ws.receive_json() 105 if data2: 106 break 107 except Exception: 108 time.sleep(0.1) 109 110 assert data2 is not None, "No update received within 2 seconds" 111 assert data2["type"] == "graph_update" 112 updated_consts = sorted([n["const"] for n in data2["nodes"] if n["opcode"] == "const"]) 113 # Verify the consts actually changed 114 assert updated_consts != initial_consts, f"Graph not actually updated: {initial_consts} vs {updated_consts}" 115 116 def test_rapid_file_changes_debounced(self, tmp_path): 117 """Rapid file modifications result in single update (debounce).""" 118 dfasm_file = tmp_path / "test.dfasm" 119 dfasm_file.write_text("""@system pe=2, sm=0 120&c1|pe0 <| const, 3 121&c2|pe0 <| const, 7 122&result|pe0 <| add 123&c1|pe0 |> &result|pe0:L 124&c2|pe0 |> &result|pe0:R 125""") 126 127 app = create_app(dfasm_file) 128 with TestClient(app) as client: 129 with client.websocket_connect("/ws") as ws: 130 # Receive initial graph 131 data1 = ws.receive_json() 132 assert data1["type"] == "graph_update" 133 134 # Modify file rapidly 3 times - these should be debounced together 135 for i in range(3): 136 dfasm_file.write_text(f"""@system pe=2, sm=0 137&c1|pe0 <| const, {3 + i} 138&c2|pe0 <| const, {7 + i} 139&result|pe0 <| add 140&c1|pe0 |> &result|pe0:L 141&c2|pe0 |> &result|pe0:R 142""") 143 time.sleep(0.1) 144 145 # Give debounce time to trigger (300ms debounce + buffer) 146 time.sleep(0.5) 147 148 # Collect updates within a 2-second window 149 start = time.time() 150 received_updates = [] 151 while time.time() - start < 2.0: 152 try: 153 data = ws.receive_json() 154 if data.get("type") == "graph_update": 155 received_updates.append(data) 156 break 157 except Exception: 158 break 159 160 assert len(received_updates) >= 1, ( 161 f"Expected at least 1 debounced update, got {len(received_updates)}" 162 ) 163 164 165class TestHttpServing: 166 """HTTP serving of static files.""" 167 168 def test_http_get_index_html(self, tmp_path): 169 """GET / returns index.html with dfgraph title.""" 170 dfasm_file = tmp_path / "test.dfasm" 171 dfasm_file.write_text("""@system pe=2, sm=0 172&c1|pe0 <| const, 3 173&c2|pe0 <| const, 7 174&result|pe0 <| add 175&c1|pe0 |> &result|pe0:L 176&c2|pe0 |> &result|pe0:R 177""") 178 179 app = create_app(dfasm_file) 180 with TestClient(app) as client: 181 response = client.get("/") 182 assert response.status_code == 200 183 assert "dfgraph" in response.text 184 185 186class TestParseError: 187 """Handle invalid dfasm source gracefully.""" 188 189 def test_parse_error_in_initial_graph(self, tmp_path): 190 """Invalid dfasm produces parse error in graph.""" 191 dfasm_file = tmp_path / "test.dfasm" 192 # Write invalid dfasm (missing system block) 193 dfasm_file.write_text("this is not valid dfasm syntax @#$") 194 195 app = create_app(dfasm_file) 196 with TestClient(app) as client: 197 with client.websocket_connect("/ws") as ws: 198 data = ws.receive_json() 199 200 assert data["type"] == "graph_update" 201 assert data["parse_error"] is not None or data["stage"] == "parse_error"