prefect server in zig
at main 156 lines 5.7 kB view raw
1#!/usr/bin/env -S uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = ["httpx"] 5# /// 6""" 7test script for RetryFailedFlows orchestration rule 8 9tests: 101. flow run accepts empirical_policy with retry settings 112. RUNNING -> FAILED transition triggers retry (returns AwaitingRetry state) 123. retry respects max retries limit 13 14usage: 15 PREFECT_API_URL=http://localhost:4200/api ./scripts/test-retry 16""" 17import os 18import httpx 19 20API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") 21 22 23def test_empirical_policy_accepted(): 24 """test that flow run accepts empirical_policy""" 25 print("\n=== test_empirical_policy_accepted ===") 26 27 # create flow (or get existing) 28 import uuid 29 flow_name = f"retry-test-flow-{uuid.uuid4().hex[:8]}" 30 resp = httpx.post(f"{API_URL}/flows/", json={"name": flow_name}) 31 assert resp.status_code in (200, 201), f"failed to create flow: {resp.text}" 32 flow_id = resp.json()["id"] 33 print(f" created flow: {flow_id}") 34 35 # create flow run with retry settings 36 resp = httpx.post(f"{API_URL}/flow_runs/", json={ 37 "flow_id": flow_id, 38 "name": "retry-test-run", 39 "empirical_policy": {"retries": 3, "retry_delay": 5} 40 }) 41 assert resp.status_code == 201, f"failed to create flow run: {resp.text}" 42 run = resp.json() 43 print(f" created flow run: {run['id']}") 44 print(f" empirical_policy: {run.get('empirical_policy')}") 45 46 assert run.get("empirical_policy") == {"retries": 3, "retry_delay": 5}, \ 47 f"empirical_policy not stored correctly: {run.get('empirical_policy')}" 48 print("✓ empirical_policy accepted and returned") 49 50 return flow_id, run["id"] 51 52 53def test_retry_triggers_on_failure(flow_id: str, run_id: str): 54 """test that RUNNING -> FAILED triggers retry""" 55 print("\n=== test_retry_triggers_on_failure ===") 56 57 # transition to RUNNING 58 resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 59 "state": {"type": "RUNNING", "name": "Running"} 60 }) 61 assert resp.status_code == 200, f"failed to set RUNNING: {resp.text}" 62 result = resp.json() 63 print(f" set RUNNING: status={result['status']}") 64 65 # get run to check run_count 66 resp = httpx.get(f"{API_URL}/flow_runs/{run_id}") 67 run = resp.json() 68 print(f" run_count after RUNNING: {run['run_count']}") 69 70 # transition to FAILED - should trigger retry 71 resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 72 "state": {"type": "FAILED", "name": "Failed"} 73 }) 74 assert resp.status_code == 200, f"failed to set FAILED: {resp.text}" 75 result = resp.json() 76 print(f" set FAILED: status={result['status']}, state_type={result['state']['type']}") 77 78 # check if retry was triggered (status=REJECT, state=SCHEDULED/AwaitingRetry) 79 if result["status"] == "REJECT" and result["state"]["type"] == "SCHEDULED": 80 print(f" state_name: {result['state']['name']}") 81 assert result["state"]["name"] == "AwaitingRetry", \ 82 f"expected AwaitingRetry, got {result['state']['name']}" 83 print("✓ retry triggered on RUNNING -> FAILED") 84 return True 85 else: 86 print(f"○ retry not triggered: status={result['status']}, state={result['state']['type']}") 87 return False 88 89 90def test_retry_exhaustion(flow_id: str): 91 """test that retries are exhausted after max attempts""" 92 print("\n=== test_retry_exhaustion ===") 93 94 # create flow run with 1 retry 95 resp = httpx.post(f"{API_URL}/flow_runs/", json={ 96 "flow_id": flow_id, 97 "name": "retry-exhaust-run", 98 "empirical_policy": {"retries": 1, "retry_delay": 0} 99 }) 100 run_id = resp.json()["id"] 101 print(f" created flow run with 1 retry: {run_id}") 102 103 # first attempt: RUNNING -> FAILED (should retry) 104 httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 105 "state": {"type": "RUNNING", "name": "Running"} 106 }) 107 resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 108 "state": {"type": "FAILED", "name": "Failed"} 109 }) 110 result = resp.json() 111 print(f" attempt 1 FAILED: status={result['status']}, state={result['state']['type']}") 112 113 if result["status"] == "REJECT": 114 # retry was triggered, now simulate second attempt 115 # need to set to RUNNING again (from SCHEDULED) 116 httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 117 "state": {"type": "PENDING", "name": "Pending"} 118 }) 119 httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 120 "state": {"type": "RUNNING", "name": "Running"} 121 }) 122 123 # second FAILED - should NOT retry (exhausted) 124 resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 125 "state": {"type": "FAILED", "name": "Failed"} 126 }) 127 result = resp.json() 128 print(f" attempt 2 FAILED: status={result['status']}, state={result['state']['type']}") 129 130 if result["status"] == "ACCEPT" and result["state"]["type"] == "FAILED": 131 print("✓ retries exhausted correctly") 132 return True 133 else: 134 print(f"○ unexpected result after retry exhaustion") 135 return False 136 137 return False 138 139 140def main(): 141 print(f"api url: {API_URL}") 142 143 # check server health 144 resp = httpx.get(f"{API_URL}/health") 145 assert resp.status_code == 200, f"server not healthy: {resp.text}" 146 print("✓ server healthy") 147 148 flow_id, run_id = test_empirical_policy_accepted() 149 test_retry_triggers_on_failure(flow_id, run_id) 150 test_retry_exhaustion(flow_id) 151 152 print("\n=== all tests passed ===") 153 154 155if __name__ == "__main__": 156 main()