OR-1 dataflow CPU sketch
1"""Tests for dfgraph server WebSocket and file watcher.
2
3Tests verify:
4- dataflow-renderer.AC4.2: Initial load renders the graph without manual refresh
5- dataflow-renderer.AC4.1: Saving the dfasm file triggers re-render within 1 second
6"""
7
8import time
9
10from starlette.testclient import TestClient
11
12from dfgraph.server import create_app
13
14
15class TestInitialLoad:
16 """AC4.2: Initial load renders the graph without manual refresh."""
17
18 def test_websocket_sends_initial_graph_on_connect(self, tmp_path):
19 """Connect to /ws and receive initial graph JSON."""
20 # Create a minimal valid dfasm file
21 dfasm_file = tmp_path / "test.dfasm"
22 dfasm_file.write_text("""@system pe=2, sm=0
23&c1|pe0 <| const, 3
24&c2|pe0 <| const, 7
25&result|pe0 <| add
26&c1|pe0 |> &result|pe0:L
27&c2|pe0 |> &result|pe0:R
28""")
29
30 app = create_app(dfasm_file)
31 with TestClient(app) as client:
32 with client.websocket_connect("/ws") as ws:
33 data = ws.receive_json()
34
35 assert data["type"] == "graph_update"
36 assert "nodes" in data
37 assert "edges" in data
38 assert "metadata" in data
39 # For a valid graph, nodes should be non-empty
40 assert isinstance(data["nodes"], list)
41
42 def test_websocket_graph_has_expected_fields(self, tmp_path):
43 """Graph JSON has all expected fields."""
44 dfasm_file = tmp_path / "test.dfasm"
45 dfasm_file.write_text("""@system pe=2, sm=0
46&c1|pe0 <| const, 3
47&c2|pe0 <| const, 7
48&result|pe0 <| add
49&c1|pe0 |> &result|pe0:L
50&c2|pe0 |> &result|pe0:R
51""")
52
53 app = create_app(dfasm_file)
54 with TestClient(app) as client:
55 with client.websocket_connect("/ws") as ws:
56 data = ws.receive_json()
57
58 assert "type" in data
59 assert "stage" in data
60 assert "nodes" in data
61 assert "edges" in data
62 assert "regions" in data
63 assert "errors" in data
64 assert "parse_error" in data
65 assert "metadata" in data
66
67
68class TestLiveReload:
69 """AC4.1: Saving the dfasm file triggers re-render within 1 second."""
70
71 def test_file_change_broadcasts_update(self, tmp_path):
72 """Modify file on disk and receive updated graph."""
73 dfasm_file = tmp_path / "test.dfasm"
74 dfasm_file.write_text("""@system pe=2, sm=0
75&c1|pe0 <| const, 3
76&c2|pe0 <| const, 7
77&result|pe0 <| add
78&c1|pe0 |> &result|pe0:L
79&c2|pe0 |> &result|pe0:R
80""")
81
82 app = create_app(dfasm_file)
83 with TestClient(app) as client:
84 with client.websocket_connect("/ws") as ws:
85 # Receive initial graph
86 data1 = ws.receive_json()
87 assert data1["type"] == "graph_update"
88 initial_consts = sorted([n["const"] for n in data1["nodes"] if n["opcode"] == "const"])
89
90 # Modify the file
91 dfasm_file.write_text("""@system pe=2, sm=0
92&c1|pe0 <| const, 5
93&c2|pe0 <| const, 9
94&result|pe0 <| add
95&c1|pe0 |> &result|pe0:L
96&c2|pe0 |> &result|pe0:R
97""")
98
99 # Wait for update with generous timeout (up to 2 seconds)
100 start = time.time()
101 data2 = None
102 while time.time() - start < 2.0:
103 try:
104 data2 = ws.receive_json()
105 if data2:
106 break
107 except Exception:
108 time.sleep(0.1)
109
110 assert data2 is not None, "No update received within 2 seconds"
111 assert data2["type"] == "graph_update"
112 updated_consts = sorted([n["const"] for n in data2["nodes"] if n["opcode"] == "const"])
113 # Verify the consts actually changed
114 assert updated_consts != initial_consts, f"Graph not actually updated: {initial_consts} vs {updated_consts}"
115
116 def test_rapid_file_changes_debounced(self, tmp_path):
117 """Rapid file modifications result in single update (debounce)."""
118 dfasm_file = tmp_path / "test.dfasm"
119 dfasm_file.write_text("""@system pe=2, sm=0
120&c1|pe0 <| const, 3
121&c2|pe0 <| const, 7
122&result|pe0 <| add
123&c1|pe0 |> &result|pe0:L
124&c2|pe0 |> &result|pe0:R
125""")
126
127 app = create_app(dfasm_file)
128 with TestClient(app) as client:
129 with client.websocket_connect("/ws") as ws:
130 # Receive initial graph
131 data1 = ws.receive_json()
132 assert data1["type"] == "graph_update"
133
134 # Modify file rapidly 3 times - these should be debounced together
135 for i in range(3):
136 dfasm_file.write_text(f"""@system pe=2, sm=0
137&c1|pe0 <| const, {3 + i}
138&c2|pe0 <| const, {7 + i}
139&result|pe0 <| add
140&c1|pe0 |> &result|pe0:L
141&c2|pe0 |> &result|pe0:R
142""")
143 time.sleep(0.1)
144
145 # Give debounce time to trigger (300ms debounce + buffer)
146 time.sleep(0.5)
147
148 # Collect updates within a 2-second window
149 start = time.time()
150 received_updates = []
151 while time.time() - start < 2.0:
152 try:
153 data = ws.receive_json()
154 if data.get("type") == "graph_update":
155 received_updates.append(data)
156 break
157 except Exception:
158 break
159
160 assert len(received_updates) >= 1, (
161 f"Expected at least 1 debounced update, got {len(received_updates)}"
162 )
163
164
165class TestHttpServing:
166 """HTTP serving of static files."""
167
168 def test_http_get_index_html(self, tmp_path):
169 """GET / returns index.html with dfgraph title."""
170 dfasm_file = tmp_path / "test.dfasm"
171 dfasm_file.write_text("""@system pe=2, sm=0
172&c1|pe0 <| const, 3
173&c2|pe0 <| const, 7
174&result|pe0 <| add
175&c1|pe0 |> &result|pe0:L
176&c2|pe0 |> &result|pe0:R
177""")
178
179 app = create_app(dfasm_file)
180 with TestClient(app) as client:
181 response = client.get("/")
182 assert response.status_code == 200
183 assert "dfgraph" in response.text
184
185
186class TestParseError:
187 """Handle invalid dfasm source gracefully."""
188
189 def test_parse_error_in_initial_graph(self, tmp_path):
190 """Invalid dfasm produces parse error in graph."""
191 dfasm_file = tmp_path / "test.dfasm"
192 # Write invalid dfasm (missing system block)
193 dfasm_file.write_text("this is not valid dfasm syntax @#$")
194
195 app = create_app(dfasm_file)
196 with TestClient(app) as client:
197 with client.websocket_connect("/ws") as ws:
198 data = ws.receive_json()
199
200 assert data["type"] == "graph_update"
201 assert data["parse_error"] is not None or data["stage"] == "parse_error"