#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx"]
# ///
"""
test script for RetryFailedFlows orchestration rule

tests:
1. flow run accepts empirical_policy with retry settings
2. RUNNING -> FAILED transition triggers retry (returns AwaitingRetry state)
3. retry respects max retries limit

usage:
    PREFECT_API_URL=http://localhost:4200/api ./scripts/test-retry

Exits with status 1 if any soft-failing test (2 or 3) does not pass; hard
failures (bad HTTP responses, wrong stored policy) raise AssertionError.
"""

import os
import sys
import uuid

import httpx

API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")


def _set_state(run_id: str, state_type: str, name: str) -> dict:
    """POST a state transition for flow run `run_id` and return the JSON body.

    Asserts the endpoint answered 200 so that later steps never run against a
    flow run left in an unknown state.
    """
    resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={
        "state": {"type": state_type, "name": name}
    })
    assert resp.status_code == 200, f"failed to set {state_type}: {resp.text}"
    return resp.json()


def test_empirical_policy_accepted() -> tuple[str, str]:
    """test that flow run accepts empirical_policy

    Creates a uniquely named flow plus a flow run with retry settings and
    checks the policy is echoed back unchanged.

    Returns:
        (flow_id, run_id) of the created flow and flow run.
    """
    print("\n=== test_empirical_policy_accepted ===")

    # unique name so repeated runs never collide with an existing flow
    flow_name = f"retry-test-flow-{uuid.uuid4().hex[:8]}"
    resp = httpx.post(f"{API_URL}/flows/", json={"name": flow_name})
    assert resp.status_code in (200, 201), f"failed to create flow: {resp.text}"
    flow_id = resp.json()["id"]
    print(f" created flow: {flow_id}")

    # create flow run with retry settings
    policy = {"retries": 3, "retry_delay": 5}
    resp = httpx.post(f"{API_URL}/flow_runs/", json={
        "flow_id": flow_id,
        "name": "retry-test-run",
        "empirical_policy": policy,
    })
    assert resp.status_code == 201, f"failed to create flow run: {resp.text}"
    run = resp.json()
    print(f" created flow run: {run['id']}")
    print(f" empirical_policy: {run.get('empirical_policy')}")

    assert run.get("empirical_policy") == policy, \
        f"empirical_policy not stored correctly: {run.get('empirical_policy')}"
    print("✓ empirical_policy accepted and returned")
    return flow_id, run["id"]


def test_retry_triggers_on_failure(flow_id: str, run_id: str) -> bool:
    """test that RUNNING -> FAILED triggers retry

    Args:
        flow_id: unused; kept for a uniform call signature.
        run_id: a freshly created flow run that still has retries left.

    Returns:
        True if the FAILED proposal was rejected in favour of a SCHEDULED
        "AwaitingRetry" state, False otherwise (soft failure).
    """
    print("\n=== test_retry_triggers_on_failure ===")

    result = _set_state(run_id, "RUNNING", "Running")
    print(f" set RUNNING: status={result['status']}")

    # read back the run to show run_count after entering RUNNING
    resp = httpx.get(f"{API_URL}/flow_runs/{run_id}")
    assert resp.status_code == 200, f"failed to read flow run: {resp.text}"
    print(f" run_count after RUNNING: {resp.json()['run_count']}")

    # transition to FAILED - should trigger retry
    result = _set_state(run_id, "FAILED", "Failed")
    status, state = result["status"], result["state"]
    print(f" set FAILED: status={status}, state_type={state['type']}")

    # retry is signalled as status=REJECT with a SCHEDULED/AwaitingRetry state
    if status != "REJECT" or state["type"] != "SCHEDULED":
        print(f"○ retry not triggered: status={status}, state={state['type']}")
        return False

    print(f" state_name: {state['name']}")
    assert state["name"] == "AwaitingRetry", \
        f"expected AwaitingRetry, got {state['name']}"
    print("✓ retry triggered on RUNNING -> FAILED")
    return True


def test_retry_exhaustion(flow_id: str) -> bool:
    """test that retries are exhausted after max attempts

    Creates a run allowing exactly one retry, fails it twice and expects the
    second FAILED proposal to be accepted as-is.

    Returns:
        True if the second failure is accepted as FAILED, False otherwise.
    """
    print("\n=== test_retry_exhaustion ===")

    resp = httpx.post(f"{API_URL}/flow_runs/", json={
        "flow_id": flow_id,
        "name": "retry-exhaust-run",
        "empirical_policy": {"retries": 1, "retry_delay": 0},
    })
    assert resp.status_code == 201, f"failed to create flow run: {resp.text}"
    run_id = resp.json()["id"]
    print(f" created flow run with 1 retry: {run_id}")

    # first attempt: RUNNING -> FAILED (should retry)
    _set_state(run_id, "RUNNING", "Running")
    result = _set_state(run_id, "FAILED", "Failed")
    print(f" attempt 1 FAILED: status={result['status']}, state={result['state']['type']}")

    if result["status"] != "REJECT":
        print(f"○ first failure did not trigger a retry: "
              f"status={result['status']}, state={result['state']['type']}")
        return False

    # retry was triggered; drive the run SCHEDULED -> PENDING -> RUNNING again
    for state_type, name in (("PENDING", "Pending"), ("RUNNING", "Running")):
        step = _set_state(run_id, state_type, name)
        print(f" set {state_type}: status={step['status']}")

    # second FAILED - should NOT retry (exhausted)
    result = _set_state(run_id, "FAILED", "Failed")
    status, state_type = result["status"], result["state"]["type"]
    print(f" attempt 2 FAILED: status={status}, state={state_type}")

    if status == "ACCEPT" and state_type == "FAILED":
        print("✓ retries exhausted correctly")
        return True
    print(f"○ unexpected result after retry exhaustion: status={status}, state={state_type}")
    return False


def main() -> None:
    """Run all checks in order; exit 1 if any soft-failing test did not pass."""
    print(f"api url: {API_URL}")

    # check server health
    resp = httpx.get(f"{API_URL}/health")
    assert resp.status_code == 200, f"server not healthy: {resp.text}"
    print("✓ server healthy")

    flow_id, run_id = test_empirical_policy_accepted()
    # each test still runs even if an earlier soft check failed
    results = {
        "test_retry_triggers_on_failure": test_retry_triggers_on_failure(flow_id, run_id),
        "test_retry_exhaustion": test_retry_exhaustion(flow_id),
    }

    failed = [name for name, passed in results.items() if not passed]
    if failed:
        print(f"\n=== {len(failed)} test(s) did not pass: {', '.join(failed)} ===")
        sys.exit(1)
    print("\n=== all tests passed ===")


if __name__ == "__main__":
    main()