#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "rich"]
# ///
"""
Test the exact API call sequence for a flow run against prefect-zig.

This mimics what the Prefect Python client does, step by step:
1. POST /flows/ - create/get flow
2. POST /flow_runs/ - create flow run with PENDING state
3. GET /flow_runs/{id} - read flow run
4. POST /flow_runs/{id}/set_state - transition to RUNNING
5. (execute user code)
6. POST /flow_runs/{id}/set_state - transition to COMPLETED or FAILED
"""

import os
import sys
import uuid
from collections.abc import Callable, Iterable
from datetime import datetime, timezone

import httpx
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel

console = Console()
BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")


def iso_now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _status_ok(resp: httpx.Response, *expected: int, show_body: bool = True) -> bool:
    """Return True if the response status is one of `expected`; otherwise print a FAIL line.

    The response body is appended to the FAIL line when `show_body` is true.
    """
    if resp.status_code in expected:
        return True
    detail = str(resp.status_code)
    if show_body:
        # escape the body so bracketed text in it is not parsed as rich markup
        detail = f"{detail} {escape(resp.text)}"
    console.print(f"[red]FAIL[/red]: {detail}")
    return False


def _has_fields(payload: dict, required: Iterable[str], label: str) -> bool:
    """Return True if every required key is in `payload`; otherwise report the first missing one."""
    for field in required:
        if field not in payload:
            console.print(f"[red]FAIL[/red]: missing field '{field}' in {label} response")
            return False
    return True


def _state(state_type: str, message: str | None = None) -> dict:
    """Build the state payload sent when creating a flow run or setting its state."""
    return {
        "type": state_type,
        "name": state_type.capitalize(),
        "timestamp": iso_now(),
        "message": message,
    }


def _state_type(result: dict) -> str | None:
    """Return the state type from a set_state result, tolerating a missing or null state."""
    # `result.get("state", {})` would still yield None for an explicit JSON null
    return (result.get("state") or {}).get("type")


def test_flow_sequence(flow_name: str = "test-flow", should_fail: bool = False) -> bool:
    """Run through the exact sequence of API calls the Prefect client makes.

    Args:
        flow_name: name of the flow to create or fetch.
        should_fail: when true, finish the run in FAILED instead of COMPLETED.

    Returns:
        True if every call returned an accepted status code and the flow run
        ended in the expected state, False otherwise.
    """
    console.print(Panel(f"testing flow sequence: {flow_name} (fail={should_fail})", style="blue"))
    console.print(f"server: {BASE_URL}\n")

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        # step 1: create/get flow
        console.print("[bold]1. POST /flows/[/bold]")
        resp = client.post("/flows/", json={"name": flow_name})
        if not _status_ok(resp, 200, 201):
            return False
        flow = resp.json()
        console.print(f" flow_id: {flow.get('id')}")

        # validate flow response has required fields
        if not _has_fields(flow, ("id", "name", "created"), "flow"):
            return False

        # step 2: create flow run
        console.print("\n[bold]2. POST /flow_runs/[/bold]")
        flow_run_create = {
            "flow_id": flow["id"],
            "name": f"run-{uuid.uuid4().hex[:8]}",
            "parameters": {"x": 1, "y": 2},
            "state": _state("PENDING"),
        }
        resp = client.post("/flow_runs/", json=flow_run_create)
        if not _status_ok(resp, 200, 201):
            return False
        flow_run = resp.json()
        flow_run_id = flow_run.get("id")
        console.print(f" flow_run_id: {flow_run_id}")
        console.print(f" state: {flow_run.get('state_type')}")

        # validate flow run response
        if not _has_fields(flow_run, ("id", "flow_id", "name", "state_type", "state"), "flow_run"):
            return False

        # step 3: read flow run (refresh before execution)
        console.print("\n[bold]3. GET /flow_runs/{id}[/bold]")
        resp = client.get(f"/flow_runs/{flow_run_id}")
        if not _status_ok(resp, 200):
            return False
        flow_run = resp.json()
        console.print(f" state: {flow_run.get('state_type')}")

        # step 4: set state to RUNNING
        console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]")
        resp = client.post(
            f"/flow_runs/{flow_run_id}/set_state",
            json={"state": _state("RUNNING"), "force": False},
        )
        if not _status_ok(resp, 200):
            return False
        result = resp.json()
        console.print(f" status: {result.get('status')}")
        console.print(f" state.type: {_state_type(result)}")

        # validate orchestration result (a non-ACCEPT status only warns)
        if result.get("status") != "ACCEPT":
            console.print(f"[yellow]WARN[/yellow]: expected ACCEPT, got {result.get('status')}")

        # step 5: simulate user code execution
        console.print("\n[bold]5. (execute user code)[/bold]")
        if should_fail:
            console.print(" simulating failure...")
            expected_type = "FAILED"
            error_message = "Flow run encountered an exception: ValueError('test error')"
        else:
            console.print(" simulating success...")
            expected_type = "COMPLETED"
            error_message = None

        # step 6: set final state
        console.print(f"\n[bold]6. POST /flow_runs/{{id}}/set_state ({expected_type})[/bold]")
        resp = client.post(
            f"/flow_runs/{flow_run_id}/set_state",
            json={"state": _state(expected_type, error_message), "force": False},
        )
        if not _status_ok(resp, 200):
            return False
        result = resp.json()
        console.print(f" status: {result.get('status')}")
        console.print(f" state.type: {_state_type(result)}")

        # verify final state
        console.print("\n[bold]7. GET /flow_runs/{id} (verify final state)[/bold]")
        resp = client.get(f"/flow_runs/{flow_run_id}")
        if not _status_ok(resp, 200):
            return False
        final_type = resp.json().get("state_type")
        if final_type != expected_type:
            console.print(f" [red]state: {final_type} (expected {expected_type})[/red]")
            return False
        console.print(f" [green]state: {final_type} (correct)[/green]")

    return True


def test_admin_endpoints() -> bool:
    """Test admin/health endpoints; return True if each one answers 200."""
    console.print(Panel("testing admin endpoints", style="blue"))

    # (path, whether to print the decoded JSON body)
    endpoints = (
        ("/health", True),
        ("/admin/version", True),
        ("/csrf-token", False),
    )
    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        for path, show_json in endpoints:
            console.print(f"[bold]GET {path}[/bold]")
            resp = client.get(path)
            if not _status_ok(resp, 200, show_body=False):
                return False
            if show_json:
                console.print(f" {resp.json()}")
            else:
                # the token itself is deliberately not printed
                console.print(" token received")

    return True


def test_filter_endpoints() -> bool:
    """Test filter endpoints with an empty filter; return True if each one answers 200."""
    console.print(Panel("testing filter endpoints", style="blue"))

    # (path, label used when printing the number of returned items)
    endpoints = (
        ("/flows/filter", "flows"),
        ("/flow_runs/filter", "flow runs"),
        ("/task_runs/filter", "task runs"),
    )
    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        for path, label in endpoints:
            console.print(f"[bold]POST {path}[/bold]")
            resp = client.post(path, json={})
            if not _status_ok(resp, 200, show_body=False):
                return False
            console.print(f" {len(resp.json())} {label}")

    return True


def test_task_run_sequence() -> bool:
    """Test task run API sequence: create, read, then set RUNNING and COMPLETED."""
    console.print(Panel("testing task run sequence", style="blue"))

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        # create task run
        console.print("[bold]POST /task_runs/[/bold]")
        task_run_create = {
            "task_key": "test-task",
            "dynamic_key": f"key-{uuid.uuid4().hex[:8]}",
            "name": f"task-{uuid.uuid4().hex[:8]}",
            "state": {"type": "PENDING", "name": "Pending"},
        }
        resp = client.post("/task_runs/", json=task_run_create)
        if not _status_ok(resp, 200, 201):
            return False
        task_run = resp.json()
        task_run_id = task_run.get("id")
        console.print(f" task_run_id: {task_run_id}")

        # without an id the remaining calls would target /task_runs/None
        if not _has_fields(task_run, ("id",), "task_run"):
            return False

        # read task run
        console.print(f"[bold]GET /task_runs/{task_run_id}[/bold]")
        resp = client.get(f"/task_runs/{task_run_id}")
        if not _status_ok(resp, 200, show_body=False):
            return False
        console.print(f" state: {resp.json().get('state_type')}")

        # set state to RUNNING, then to COMPLETED
        for state_type, state_name in (("RUNNING", "Running"), ("COMPLETED", "Completed")):
            console.print(f"[bold]POST /task_runs/{task_run_id}/set_state ({state_type})[/bold]")
            resp = client.post(
                f"/task_runs/{task_run_id}/set_state",
                json={"state": {"type": state_type, "name": state_name}},
            )
            if not _status_ok(resp, 200, show_body=False):
                return False
            console.print(f" status: {resp.json().get('status')}")

    return True


def test_logs_endpoint() -> bool:
    """Test logs endpoint by posting two log records; accept 200, 201 or 204."""
    console.print(Panel("testing logs endpoint", style="blue"))

    with httpx.Client(base_url=BASE_URL, timeout=10) as client:
        console.print("[bold]POST /logs/[/bold]")
        logs = [
            {"level": 20, "message": "test log 1", "name": "test", "timestamp": iso_now()},
            {"level": 30, "message": "test log 2", "name": "test", "timestamp": iso_now()},
        ]
        resp = client.post("/logs/", json=logs)
        if not _status_ok(resp, 200, 201, 204):
            return False
        console.print(f" {len(logs)} logs sent")

    return True


def _run(test: Callable[[], bool]) -> bool:
    """Run one test, turning transport errors and undecodable JSON bodies into a failure.

    Without this, an unreachable server would raise out of the first test and
    the summary would never be printed.
    """
    try:
        return test()
    except (httpx.HTTPError, ValueError) as exc:
        console.print(f"[red]FAIL[/red]: {type(exc).__name__}: {escape(str(exc))}")
        return False


def main() -> None:
    """Run every test suite, print a summary and exit 0 if all passed, 1 otherwise."""
    console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n")

    suites: list[tuple[str, Callable[[], bool]]] = [
        # admin endpoints
        ("admin", test_admin_endpoints),
        # flow sequence (happy path)
        ("flow (success)", lambda: test_flow_sequence("happy-flow", should_fail=False)),
        # flow sequence (failure path)
        ("flow (failure)", lambda: test_flow_sequence("sad-flow", should_fail=True)),
        # task run sequence
        ("task_run", test_task_run_sequence),
        # filter endpoints
        ("filter", test_filter_endpoints),
        # logs endpoint
        ("logs", test_logs_endpoint),
    ]

    results = []
    for name, test in suites:
        results.append((name, _run(test)))
        console.print()

    # summary
    console.print("=" * 50)
    for name, passed in results:
        status = "[green]✓[/green]" if passed else "[red]✗[/red]"
        console.print(f" {status} {name}")

    if all(passed for _, passed in results):
        console.print("\n[bold green]all tests passed[/bold green]")
        sys.exit(0)
    else:
        console.print("\n[bold red]some tests failed[/bold red]")
        sys.exit(1)


if __name__ == "__main__":
    main()