prefect server in zig
at 6b9a11e6e4eecbefbd87ec67d2487067cfbcacdb 360 lines 13 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "rich"]
# ///
"""
Test the exact API call sequence for a flow run against prefect-zig.

This mimics what the Prefect Python client does, step by step:
1. POST /flows/ - create/get flow
2. POST /flow_runs/ - create flow run with PENDING state
3. GET /flow_runs/{id} - read flow run
4. POST /flow_runs/{id}/set_state - transition to RUNNING
5. (execute user code)
6. POST /flow_runs/{id}/set_state - transition to COMPLETED or FAILED
"""

import os
import sys
import uuid
from datetime import datetime, timezone

import httpx
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel

console = Console()
BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")


def iso_now() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _report_failure(resp: httpx.Response, *, include_body: bool = True) -> None:
    """Print a FAIL line with the response status and, optionally, its body.

    The body is escaped so that bracketed text coming from the server is
    printed literally instead of being parsed as rich console markup.
    """
    detail = f" {escape(resp.text)}" if include_body else ""
    console.print(f"[red]FAIL[/red]: {resp.status_code}{detail}")


def _has_fields(payload: dict, fields: tuple[str, ...], label: str) -> bool:
    """Return True if every name in *fields* is a key of *payload*.

    Prints a FAIL line naming the first missing field and returns False
    otherwise. *label* is the entity name used in that message.
    """
    for field in fields:
        if field not in payload:
            console.print(f"[red]FAIL[/red]: missing field '{field}' in {label} response")
            return False
    return True


def _state(state_type: str, name: str, message: str | None = None) -> dict:
    """Build a state payload stamped with the current UTC time."""
    return {
        "type": state_type,
        "name": name,
        "timestamp": iso_now(),
        "message": message,
    }


def _result_state_type(result: dict) -> str | None:
    """Return ``result["state"]["type"]``, or None when there is no state.

    ``dict.get("state", {})`` only falls back when the key is absent; a JSON
    ``null`` state would come back as None and break the chained lookup.
    """
    return (result.get("state") or {}).get("type")


def test_flow_sequence(flow_name: str = "test-flow", should_fail: bool = False) -> bool:
    """Run through the exact sequence of API calls the Prefect client makes.

    Args:
        flow_name: Name sent when creating/getting the flow.
        should_fail: When True the run is finished as FAILED instead of
            COMPLETED, simulating an exception in user code.

    Returns:
        True if every request returned an accepted status code and the flow
        run ended in the expected state type, False otherwise.
    """

    console.print(Panel(f"testing flow sequence: {flow_name} (fail={should_fail})", style="blue"))
    console.print(f"server: {BASE_URL}\n")

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        # step 1: create/get flow
        console.print("[bold]1. POST /flows/[/bold]")
        resp = client.post("/flows/", json={"name": flow_name})
        if resp.status_code not in (200, 201):
            _report_failure(resp)
            return False
        flow = resp.json()
        console.print(f" flow_id: {flow.get('id')}")

        # validate flow response has required fields
        if not _has_fields(flow, ("id", "name", "created"), "flow"):
            return False

        # step 2: create flow run
        console.print("\n[bold]2. POST /flow_runs/[/bold]")
        flow_run_create = {
            "flow_id": flow["id"],
            "name": f"run-{uuid.uuid4().hex[:8]}",
            "parameters": {"x": 1, "y": 2},
            "state": _state("PENDING", "Pending"),
        }
        resp = client.post("/flow_runs/", json=flow_run_create)
        if resp.status_code not in (200, 201):
            _report_failure(resp)
            return False
        flow_run = resp.json()
        flow_run_id = flow_run.get("id")
        console.print(f" flow_run_id: {flow_run_id}")
        console.print(f" state: {flow_run.get('state_type')}")

        # validate flow run response
        required = ("id", "flow_id", "name", "state_type", "state")
        if not _has_fields(flow_run, required, "flow_run"):
            return False

        # step 3: read flow run (refresh before execution)
        console.print("\n[bold]3. GET /flow_runs/{id}[/bold]")
        resp = client.get(f"/flow_runs/{flow_run_id}")
        if resp.status_code != 200:
            _report_failure(resp)
            return False
        flow_run = resp.json()
        console.print(f" state: {flow_run.get('state_type')}")

        # step 4: set state to RUNNING
        console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]")
        set_state_running = {"state": _state("RUNNING", "Running"), "force": False}
        resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json=set_state_running)
        if resp.status_code != 200:
            _report_failure(resp)
            return False
        result = resp.json()
        console.print(f" status: {result.get('status')}")
        console.print(f" state.type: {_result_state_type(result)}")

        # validate orchestration result; a non-ACCEPT status is only a warning
        # here, the final state check below decides pass/fail
        if result.get("status") != "ACCEPT":
            console.print(f"[yellow]WARN[/yellow]: expected ACCEPT, got {result.get('status')}")

        # step 5: simulate user code execution
        console.print("\n[bold]5. (execute user code)[/bold]")
        if should_fail:
            console.print(" simulating failure...")
            error_message = "Flow run encountered an exception: ValueError('test error')"
            final_type, final_name = "FAILED", "Failed"
        else:
            console.print(" simulating success...")
            error_message = None
            final_type, final_name = "COMPLETED", "Completed"

        # step 6: set final state
        console.print(f"\n[bold]6. POST /flow_runs/{{id}}/set_state ({final_type})[/bold]")
        final_state = {
            "state": _state(final_type, final_name, error_message),
            "force": False,
        }
        resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json=final_state)
        if resp.status_code != 200:
            _report_failure(resp)
            return False
        result = resp.json()
        console.print(f" status: {result.get('status')}")
        console.print(f" state.type: {_result_state_type(result)}")

        # verify final state
        console.print("\n[bold]7. GET /flow_runs/{id} (verify final state)[/bold]")
        resp = client.get(f"/flow_runs/{flow_run_id}")
        if resp.status_code != 200:
            _report_failure(resp)
            return False
        flow_run = resp.json()
        observed_type = flow_run.get("state_type")

        if observed_type != final_type:
            console.print(f" [red]state: {observed_type} (expected {final_type})[/red]")
            return False
        console.print(f" [green]state: {observed_type} (correct)[/green]")

    return True


def test_admin_endpoints() -> bool:
    """Test admin/health endpoints.

    Returns:
        True if /health, /admin/version and /csrf-token all answer 200.
    """
    console.print(Panel("testing admin endpoints", style="blue"))

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        # health and version: print the decoded JSON body
        for path in ("/health", "/admin/version"):
            console.print(f"[bold]GET {path}[/bold]")
            resp = client.get(path)
            if resp.status_code != 200:
                _report_failure(resp, include_body=False)
                return False
            console.print(f" {resp.json()}")

        # csrf-token: only the status is checked, the token is not printed
        console.print("[bold]GET /csrf-token[/bold]")
        resp = client.get("/csrf-token")
        if resp.status_code != 200:
            _report_failure(resp, include_body=False)
            return False
        console.print(" token received")

    return True


def test_filter_endpoints() -> bool:
    """Test filter endpoints.

    Posts an empty filter to each collection and prints the result count.

    Returns:
        True if all three filter endpoints answer 200.
    """
    console.print(Panel("testing filter endpoints", style="blue"))

    endpoints = (
        ("/flows/filter", "flows"),
        ("/flow_runs/filter", "flow runs"),
        ("/task_runs/filter", "task runs"),
    )

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        for path, label in endpoints:
            console.print(f"[bold]POST {path}[/bold]")
            resp = client.post(path, json={})
            if resp.status_code != 200:
                _report_failure(resp, include_body=False)
                return False
            console.print(f" {len(resp.json())} {label}")

    return True


def test_task_run_sequence() -> bool:
    """Test task run API sequence.

    Creates a task run in PENDING, reads it back, then moves it to RUNNING
    and COMPLETED.

    Returns:
        True if every request returned an accepted status code.
    """
    console.print(Panel("testing task run sequence", style="blue"))

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        # create task run
        console.print("[bold]POST /task_runs/[/bold]")
        task_run_create = {
            "task_key": "test-task",
            "dynamic_key": f"key-{uuid.uuid4().hex[:8]}",
            "name": f"task-{uuid.uuid4().hex[:8]}",
            "state": {"type": "PENDING", "name": "Pending"},
        }
        resp = client.post("/task_runs/", json=task_run_create)
        if resp.status_code not in (200, 201):
            _report_failure(resp)
            return False
        task_run_id = resp.json().get("id")
        console.print(f" task_run_id: {task_run_id}")

        # read task run
        console.print(f"[bold]GET /task_runs/{task_run_id}[/bold]")
        resp = client.get(f"/task_runs/{task_run_id}")
        if resp.status_code != 200:
            _report_failure(resp, include_body=False)
            return False
        console.print(f" state: {resp.json().get('state_type')}")

        # set state to RUNNING, then COMPLETED
        for state_type, state_name in (("RUNNING", "Running"), ("COMPLETED", "Completed")):
            console.print(f"[bold]POST /task_runs/{task_run_id}/set_state ({state_type})[/bold]")
            resp = client.post(
                f"/task_runs/{task_run_id}/set_state",
                json={"state": {"type": state_type, "name": state_name}},
            )
            if resp.status_code != 200:
                _report_failure(resp, include_body=False)
                return False
            console.print(f" status: {resp.json().get('status')}")

    return True


def test_logs_endpoint() -> bool:
    """Test logs endpoint.

    Returns:
        True if POST /logs/ answers 200, 201 or 204 for a two-record batch.
    """
    console.print(Panel("testing logs endpoint", style="blue"))

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        console.print("[bold]POST /logs/[/bold]")
        logs = [
            {"level": 20, "message": "test log 1", "name": "test", "timestamp": iso_now()},
            {"level": 30, "message": "test log 2", "name": "test", "timestamp": iso_now()},
        ]
        resp = client.post("/logs/", json=logs)
        if resp.status_code not in (200, 201, 204):
            _report_failure(resp)
            return False
        console.print(f" {len(logs)} logs sent")

    return True


def _run_check(check, **kwargs) -> bool:
    """Run one check function and turn request/decoding errors into a failure.

    Without this guard an unreachable server or a non-JSON body would raise
    out of the first check and abort the script before the summary is
    printed. ``ValueError`` covers the JSON decode error raised by
    ``Response.json()``.
    """
    try:
        return check(**kwargs)
    except (httpx.HTTPError, ValueError) as exc:
        console.print(f"[red]FAIL[/red]: {type(exc).__name__}: {escape(str(exc))}")
        return False


def main():
    """Run every check in order, print a summary, and exit 0 or 1."""
    console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n")

    # (summary label, check function, keyword arguments), in execution order
    checks = [
        ("admin", test_admin_endpoints, {}),
        ("flow (success)", test_flow_sequence, {"flow_name": "happy-flow", "should_fail": False}),
        ("flow (failure)", test_flow_sequence, {"flow_name": "sad-flow", "should_fail": True}),
        ("task_run", test_task_run_sequence, {}),
        ("filter", test_filter_endpoints, {}),
        ("logs", test_logs_endpoint, {}),
    ]

    results = []
    for label, check, kwargs in checks:
        results.append((label, _run_check(check, **kwargs)))
        console.print()

    # summary
    console.print("=" * 50)
    for name, passed in results:
        status = "[green]✓[/green]" if passed else "[red]✗[/red]"
        console.print(f" {status} {name}")

    if all(passed for _, passed in results):
        console.print("\n[bold green]all tests passed[/bold green]")
        sys.exit(0)
    else:
        console.print("\n[bold red]some tests failed[/bold red]")
        sys.exit(1)


if __name__ == "__main__":
    main()