prefect server in zig
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx"]
5# ///
6"""
7test script for RetryFailedFlows orchestration rule
8
9tests:
101. flow run accepts empirical_policy with retry settings
112. RUNNING -> FAILED transition triggers retry (returns AwaitingRetry state)
123. retry respects max retries limit
13
14usage:
15 PREFECT_API_URL=http://localhost:4200/api ./scripts/test-retry
16"""
17import os
18import httpx
19
# Base URL of the API under test. Trailing slashes are stripped so that the
# f"{API_URL}/..." paths built below never contain a double slash when
# PREFECT_API_URL is given as e.g. "http://host:4200/api/".
API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api").rstrip("/")
21
22
def test_empirical_policy_accepted():
    """Create a flow and a flow run with retry settings; verify the policy is echoed back.

    Returns:
        A ``(flow_id, run_id)`` tuple for the flow and flow run that were created.

    Raises:
        AssertionError: if either create request returns an unexpected status
            code, or the returned ``empirical_policy`` differs from the one sent.
    """
    print("\n=== test_empirical_policy_accepted ===")

    import uuid

    # a random suffix keeps the flow name unique across invocations
    suffix = uuid.uuid4().hex[:8]
    flow_payload = {"name": f"retry-test-flow-{suffix}"}
    flow_resp = httpx.post(f"{API_URL}/flows/", json=flow_payload)
    assert flow_resp.status_code in (200, 201), f"failed to create flow: {flow_resp.text}"
    flow_id = flow_resp.json()["id"]
    print(f" created flow: {flow_id}")

    # flow run carrying the retry settings under test
    run_payload = {
        "flow_id": flow_id,
        "name": "retry-test-run",
        "empirical_policy": {"retries": 3, "retry_delay": 5},
    }
    run_resp = httpx.post(f"{API_URL}/flow_runs/", json=run_payload)
    assert run_resp.status_code == 201, f"failed to create flow run: {run_resp.text}"
    created = run_resp.json()
    print(f" created flow run: {created['id']}")
    print(f" empirical_policy: {created.get('empirical_policy')}")

    # the server must return exactly the policy that was submitted
    expected_policy = {"retries": 3, "retry_delay": 5}
    assert created.get("empirical_policy") == expected_policy, \
        f"empirical_policy not stored correctly: {created.get('empirical_policy')}"
    print("✓ empirical_policy accepted and returned")

    return flow_id, created["id"]
51
52
def test_retry_triggers_on_failure(flow_id: str, run_id: str):
    """Check that proposing FAILED for a RUNNING flow run triggers a retry.

    Moves the run to RUNNING, proposes FAILED, and inspects the result
    returned by the ``set_state`` endpoint.

    Args:
        flow_id: id of the owning flow; not used by this test.
        run_id: id of the flow run to drive through the transitions.

    Returns:
        True when the FAILED proposal came back with status ``REJECT`` and a
        ``SCHEDULED`` state named ``AwaitingRetry``; False otherwise.

    Raises:
        AssertionError: if any request returns a non-200 status code, or the
            rejected SCHEDULED state is not named ``AwaitingRetry``.
    """
    print("\n=== test_retry_triggers_on_failure ===")
    set_state_url = f"{API_URL}/flow_runs/{run_id}/set_state"

    # transition to RUNNING
    resp = httpx.post(set_state_url, json={
        "state": {"type": "RUNNING", "name": "Running"}
    })
    assert resp.status_code == 200, f"failed to set RUNNING: {resp.text}"
    result = resp.json()
    print(f" set RUNNING: status={result['status']}")

    # get run to check run_count; verify the response first so an error body
    # is reported as such instead of as a KeyError on 'run_count'
    resp = httpx.get(f"{API_URL}/flow_runs/{run_id}")
    assert resp.status_code == 200, f"failed to fetch flow run: {resp.text}"
    run = resp.json()
    print(f" run_count after RUNNING: {run['run_count']}")

    # transition to FAILED - should trigger retry
    resp = httpx.post(set_state_url, json={
        "state": {"type": "FAILED", "name": "Failed"}
    })
    assert resp.status_code == 200, f"failed to set FAILED: {resp.text}"
    result = resp.json()
    print(f" set FAILED: status={result['status']}, state_type={result['state']['type']}")

    # a retry shows up as status=REJECT with a SCHEDULED/AwaitingRetry state
    retried = result["status"] == "REJECT" and result["state"]["type"] == "SCHEDULED"
    if not retried:
        print(f"○ retry not triggered: status={result['status']}, state={result['state']['type']}")
        return False

    print(f" state_name: {result['state']['name']}")
    assert result["state"]["name"] == "AwaitingRetry", \
        f"expected AwaitingRetry, got {result['state']['name']}"
    print("✓ retry triggered on RUNNING -> FAILED")
    return True
88
89
def _propose_state(run_id: str, state_type: str, state_name: str) -> dict:
    """POST a proposed state for a flow run and return the parsed response body."""
    resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={
        "state": {"type": state_type, "name": state_name}
    })
    # fail loudly on transport/server errors instead of carrying on with a
    # run that is not in the state the next step expects
    assert resp.status_code in (200, 201), f"failed to set {state_type}: {resp.text}"
    return resp.json()


def test_retry_exhaustion(flow_id: str):
    """Check that a flow run stops retrying once its retries are used up.

    Creates a run with ``retries=1``, fails it once (expected to be rejected
    in favour of a retry), walks it back through PENDING and RUNNING, then
    fails it again and expects the FAILED state to be accepted.

    Args:
        flow_id: id of the flow the new flow run is created under.

    Returns:
        True when the second FAILED proposal is accepted as FAILED; False when
        the first failure did not trigger a retry or the second result is
        anything else.

    Raises:
        AssertionError: if creating the flow run or any state transition
            request returns an unexpected status code.
    """
    print("\n=== test_retry_exhaustion ===")

    # create flow run with 1 retry
    resp = httpx.post(f"{API_URL}/flow_runs/", json={
        "flow_id": flow_id,
        "name": "retry-exhaust-run",
        "empirical_policy": {"retries": 1, "retry_delay": 0}
    })
    assert resp.status_code == 201, f"failed to create flow run: {resp.text}"
    run_id = resp.json()["id"]
    print(f" created flow run with 1 retry: {run_id}")

    # first attempt: RUNNING -> FAILED (should retry)
    _propose_state(run_id, "RUNNING", "Running")
    result = _propose_state(run_id, "FAILED", "Failed")
    print(f" attempt 1 FAILED: status={result['status']}, state={result['state']['type']}")

    if result["status"] != "REJECT":
        # without a retry there is nothing to exhaust; say so rather than
        # returning False silently
        print("○ retry not triggered on first failure, cannot check exhaustion")
        return False

    # retry was triggered, now simulate second attempt:
    # need to set to RUNNING again (from SCHEDULED)
    _propose_state(run_id, "PENDING", "Pending")
    _propose_state(run_id, "RUNNING", "Running")

    # second FAILED - should NOT retry (exhausted)
    result = _propose_state(run_id, "FAILED", "Failed")
    print(f" attempt 2 FAILED: status={result['status']}, state={result['state']['type']}")

    if result["status"] == "ACCEPT" and result["state"]["type"] == "FAILED":
        print("✓ retries exhausted correctly")
        return True

    print("○ unexpected result after retry exhaustion")
    return False
138
139
def main():
    """Run the retry tests against the server at ``API_URL``.

    Checks the health endpoint, runs the three tests in order, and reports
    the outcome. The two retry tests signal failure through their boolean
    return value, so those values are collected here.

    Raises:
        AssertionError: if the health check or an assertion inside a test fails.
        SystemExit: with exit code 1 if any retry test returned a falsy result.
    """
    print(f"api url: {API_URL}")

    # check server health
    resp = httpx.get(f"{API_URL}/health")
    assert resp.status_code == 200, f"server not healthy: {resp.text}"
    print("✓ server healthy")

    flow_id, run_id = test_empirical_policy_accepted()

    # both tests always run, even if the first one reports failure
    outcomes = {
        "test_retry_triggers_on_failure": test_retry_triggers_on_failure(flow_id, run_id),
        "test_retry_exhaustion": test_retry_exhaustion(flow_id),
    }

    failed = [name for name, passed in outcomes.items() if not passed]
    if failed:
        print(f"\n=== {len(failed)} test(s) failed: {', '.join(failed)} ===")
        # non-zero exit so shells and CI see the failure
        raise SystemExit(1)

    print("\n=== all tests passed ===")


if __name__ == "__main__":
    main()