prefect server in zig
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "rich"]
5# ///
6"""
7Test the exact API call sequence for a flow run against prefect-zig.
8
9This mimics what the Prefect Python client does, step by step:
101. POST /flows/ - create/get flow
112. POST /flow_runs/ - create flow run with PENDING state
123. GET /flow_runs/{id} - read flow run
134. POST /flow_runs/{id}/set_state - transition to RUNNING
145. (execute user code)
156. POST /flow_runs/{id}/set_state - transition to COMPLETED or FAILED
16"""
17
18import os
19import sys
20import uuid
21from datetime import datetime, timezone
22
23import httpx
24from rich.console import Console
25from rich.panel import Panel
26
27console = Console()
28BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")
29
30
31def iso_now() -> str:
32 return datetime.now(timezone.utc).isoformat()
33
34
35def test_flow_sequence(flow_name: str = "test-flow", should_fail: bool = False):
36 """Run through the exact sequence of API calls the Prefect client makes."""
37
38 console.print(Panel(f"testing flow sequence: {flow_name} (fail={should_fail})", style="blue"))
39 console.print(f"server: {BASE_URL}\n")
40
41 with httpx.Client(base_url=BASE_URL, timeout=10) as client:
42 # step 1: create/get flow
43 console.print("[bold]1. POST /flows/[/bold]")
44 resp = client.post("/flows/", json={"name": flow_name})
45 if resp.status_code not in (200, 201):
46 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
47 return False
48 flow = resp.json()
49 console.print(f" flow_id: {flow.get('id')}")
50
51 # validate flow response has required fields
52 for field in ["id", "name", "created"]:
53 if field not in flow:
54 console.print(f"[red]FAIL[/red]: missing field '{field}' in flow response")
55 return False
56
57 # step 2: create flow run
58 console.print("\n[bold]2. POST /flow_runs/[/bold]")
59 flow_run_create = {
60 "flow_id": flow["id"],
61 "name": f"run-{uuid.uuid4().hex[:8]}",
62 "parameters": {"x": 1, "y": 2},
63 "state": {
64 "type": "PENDING",
65 "name": "Pending",
66 "timestamp": iso_now(),
67 "message": None,
68 },
69 }
70 resp = client.post("/flow_runs/", json=flow_run_create)
71 if resp.status_code not in (200, 201):
72 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
73 return False
74 flow_run = resp.json()
75 flow_run_id = flow_run.get("id")
76 console.print(f" flow_run_id: {flow_run_id}")
77 console.print(f" state: {flow_run.get('state_type')}")
78
79 # validate flow run response
80 for field in ["id", "flow_id", "name", "state_type", "state"]:
81 if field not in flow_run:
82 console.print(f"[red]FAIL[/red]: missing field '{field}' in flow_run response")
83 return False
84
85 # step 3: read flow run (refresh before execution)
86 console.print("\n[bold]3. GET /flow_runs/{id}[/bold]")
87 resp = client.get(f"/flow_runs/{flow_run_id}")
88 if resp.status_code != 200:
89 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
90 return False
91 flow_run = resp.json()
92 console.print(f" state: {flow_run.get('state_type')}")
93
94 # step 4: set state to RUNNING
95 console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]")
96 set_state_running = {
97 "state": {
98 "type": "RUNNING",
99 "name": "Running",
100 "timestamp": iso_now(),
101 "message": None,
102 },
103 "force": False,
104 }
105 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json=set_state_running)
106 if resp.status_code != 200:
107 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
108 return False
109 result = resp.json()
110 console.print(f" status: {result.get('status')}")
111 console.print(f" state.type: {result.get('state', {}).get('type')}")
112
113 # validate orchestration result
114 if result.get("status") != "ACCEPT":
115 console.print(f"[yellow]WARN[/yellow]: expected ACCEPT, got {result.get('status')}")
116
117 # step 5: simulate user code execution
118 console.print("\n[bold]5. (execute user code)[/bold]")
119 if should_fail:
120 console.print(" simulating failure...")
121 error_message = "Flow run encountered an exception: ValueError('test error')"
122 else:
123 console.print(" simulating success...")
124 error_message = None
125
126 # step 6: set final state
127 if should_fail:
128 console.print("\n[bold]6. POST /flow_runs/{id}/set_state (FAILED)[/bold]")
129 final_state = {
130 "state": {
131 "type": "FAILED",
132 "name": "Failed",
133 "timestamp": iso_now(),
134 "message": error_message,
135 },
136 "force": False,
137 }
138 else:
139 console.print("\n[bold]6. POST /flow_runs/{id}/set_state (COMPLETED)[/bold]")
140 final_state = {
141 "state": {
142 "type": "COMPLETED",
143 "name": "Completed",
144 "timestamp": iso_now(),
145 "message": None,
146 },
147 "force": False,
148 }
149
150 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json=final_state)
151 if resp.status_code != 200:
152 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
153 return False
154 result = resp.json()
155 console.print(f" status: {result.get('status')}")
156 console.print(f" state.type: {result.get('state', {}).get('type')}")
157
158 # verify final state
159 console.print("\n[bold]7. GET /flow_runs/{id} (verify final state)[/bold]")
160 resp = client.get(f"/flow_runs/{flow_run_id}")
161 if resp.status_code != 200:
162 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
163 return False
164 flow_run = resp.json()
165 final_type = flow_run.get("state_type")
166 expected_type = "FAILED" if should_fail else "COMPLETED"
167
168 if final_type == expected_type:
169 console.print(f" [green]state: {final_type} (correct)[/green]")
170 else:
171 console.print(f" [red]state: {final_type} (expected {expected_type})[/red]")
172 return False
173
174 return True
175
176
177def test_admin_endpoints():
178 """Test admin/health endpoints."""
179 console.print(Panel("testing admin endpoints", style="blue"))
180
181 with httpx.Client(base_url=BASE_URL, timeout=10) as client:
182 # health
183 console.print("[bold]GET /health[/bold]")
184 resp = client.get("/health")
185 if resp.status_code != 200:
186 console.print(f"[red]FAIL[/red]: {resp.status_code}")
187 return False
188 console.print(f" {resp.json()}")
189
190 # version
191 console.print("[bold]GET /admin/version[/bold]")
192 resp = client.get("/admin/version")
193 if resp.status_code != 200:
194 console.print(f"[red]FAIL[/red]: {resp.status_code}")
195 return False
196 console.print(f" {resp.json()}")
197
198 # csrf-token
199 console.print("[bold]GET /csrf-token[/bold]")
200 resp = client.get("/csrf-token")
201 if resp.status_code != 200:
202 console.print(f"[red]FAIL[/red]: {resp.status_code}")
203 return False
204 console.print(f" token received")
205
206 return True
207
208
209def test_filter_endpoints():
210 """Test filter endpoints."""
211 console.print(Panel("testing filter endpoints", style="blue"))
212
213 with httpx.Client(base_url=BASE_URL, timeout=10) as client:
214 # flows/filter
215 console.print("[bold]POST /flows/filter[/bold]")
216 resp = client.post("/flows/filter", json={})
217 if resp.status_code != 200:
218 console.print(f"[red]FAIL[/red]: {resp.status_code}")
219 return False
220 flows = resp.json()
221 console.print(f" {len(flows)} flows")
222
223 # flow_runs/filter
224 console.print("[bold]POST /flow_runs/filter[/bold]")
225 resp = client.post("/flow_runs/filter", json={})
226 if resp.status_code != 200:
227 console.print(f"[red]FAIL[/red]: {resp.status_code}")
228 return False
229 runs = resp.json()
230 console.print(f" {len(runs)} flow runs")
231
232 # task_runs/filter
233 console.print("[bold]POST /task_runs/filter[/bold]")
234 resp = client.post("/task_runs/filter", json={})
235 if resp.status_code != 200:
236 console.print(f"[red]FAIL[/red]: {resp.status_code}")
237 return False
238 tasks = resp.json()
239 console.print(f" {len(tasks)} task runs")
240
241 return True
242
243
244def test_task_run_sequence():
245 """Test task run API sequence."""
246 console.print(Panel("testing task run sequence", style="blue"))
247
248 with httpx.Client(base_url=BASE_URL, timeout=10) as client:
249 # create task run
250 console.print("[bold]POST /task_runs/[/bold]")
251 task_run_create = {
252 "task_key": "test-task",
253 "dynamic_key": f"key-{uuid.uuid4().hex[:8]}",
254 "name": f"task-{uuid.uuid4().hex[:8]}",
255 "state": {"type": "PENDING", "name": "Pending"},
256 }
257 resp = client.post("/task_runs/", json=task_run_create)
258 if resp.status_code not in (200, 201):
259 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
260 return False
261 task_run = resp.json()
262 task_run_id = task_run.get("id")
263 console.print(f" task_run_id: {task_run_id}")
264
265 # read task run
266 console.print(f"[bold]GET /task_runs/{task_run_id}[/bold]")
267 resp = client.get(f"/task_runs/{task_run_id}")
268 if resp.status_code != 200:
269 console.print(f"[red]FAIL[/red]: {resp.status_code}")
270 return False
271 console.print(f" state: {resp.json().get('state_type')}")
272
273 # set state to RUNNING
274 console.print(f"[bold]POST /task_runs/{task_run_id}/set_state (RUNNING)[/bold]")
275 resp = client.post(f"/task_runs/{task_run_id}/set_state", json={
276 "state": {"type": "RUNNING", "name": "Running"}
277 })
278 if resp.status_code != 200:
279 console.print(f"[red]FAIL[/red]: {resp.status_code}")
280 return False
281 console.print(f" status: {resp.json().get('status')}")
282
283 # set state to COMPLETED
284 console.print(f"[bold]POST /task_runs/{task_run_id}/set_state (COMPLETED)[/bold]")
285 resp = client.post(f"/task_runs/{task_run_id}/set_state", json={
286 "state": {"type": "COMPLETED", "name": "Completed"}
287 })
288 if resp.status_code != 200:
289 console.print(f"[red]FAIL[/red]: {resp.status_code}")
290 return False
291 console.print(f" status: {resp.json().get('status')}")
292
293 return True
294
295
296def test_logs_endpoint():
297 """Test logs endpoint."""
298 console.print(Panel("testing logs endpoint", style="blue"))
299
300 with httpx.Client(base_url=BASE_URL, timeout=10) as client:
301 console.print("[bold]POST /logs/[/bold]")
302 logs = [
303 {"level": 20, "message": "test log 1", "name": "test", "timestamp": iso_now()},
304 {"level": 30, "message": "test log 2", "name": "test", "timestamp": iso_now()},
305 ]
306 resp = client.post("/logs/", json=logs)
307 if resp.status_code not in (200, 201, 204):
308 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}")
309 return False
310 console.print(f" {len(logs)} logs sent")
311
312 return True
313
314
315def main():
316 console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n")
317
318 results = []
319
320 # admin endpoints
321 results.append(("admin", test_admin_endpoints()))
322 console.print()
323
324 # flow sequence (happy path)
325 results.append(("flow (success)", test_flow_sequence("happy-flow", should_fail=False)))
326 console.print()
327
328 # flow sequence (failure path)
329 results.append(("flow (failure)", test_flow_sequence("sad-flow", should_fail=True)))
330 console.print()
331
332 # task run sequence
333 results.append(("task_run", test_task_run_sequence()))
334 console.print()
335
336 # filter endpoints
337 results.append(("filter", test_filter_endpoints()))
338 console.print()
339
340 # logs endpoint
341 results.append(("logs", test_logs_endpoint()))
342 console.print()
343
344 # summary
345 console.print("=" * 50)
346 all_passed = all(r[1] for r in results)
347 for name, passed in results:
348 status = "[green]✓[/green]" if passed else "[red]✗[/red]"
349 console.print(f" {status} {name}")
350
351 if all_passed:
352 console.print("\n[bold green]all tests passed[/bold green]")
353 sys.exit(0)
354 else:
355 console.print("\n[bold red]some tests failed[/bold red]")
356 sys.exit(1)
357
358
359if __name__ == "__main__":
360 main()