#!/usr/bin/env -S uv run --script --quiet # /// script # requires-python = ">=3.12" # dependencies = ["httpx", "rich"] # /// """ Functional test suite for prefect-server API. Tests API correctness by exercising all endpoints with expected request/response patterns. Includes scheduler integration tests (which have intentional delays to verify background services). For performance benchmarking, use ./scripts/benchmark instead. Usage: ./scripts/test-api-sequence # human-readable output ./scripts/test-api-sequence --json # machine-readable for CI ./scripts/test-api-sequence --quiet # minimal output """ import json as json_lib import os import sys import time import uuid from dataclasses import dataclass, field from typing import Callable import httpx from rich.console import Console from rich.panel import Panel from rich.table import Table console = Console() BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") QUIET = "--json" in sys.argv or "--quiet" in sys.argv @dataclass class TestResult: name: str passed: bool duration_ms: float requests: int = 0 error: str | None = None class CountingClient(httpx.Client): """HTTP client that counts requests.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.request_count = 0 def request(self, *args, **kwargs): self.request_count += 1 return super().request(*args, **kwargs) def validate_response(data: dict, required_fields: list[str], field_types: dict | None = None) -> bool: """Validate response contains required fields with expected types.""" for field in required_fields: if field not in data: if not QUIET: console.print(f"[red]VALIDATION[/red]: missing field '{field}'") return False if field_types: for field, expected_type in field_types.items(): if field in data and data[field] is not None: if not isinstance(data[field], expected_type): if not QUIET: console.print(f"[red]VALIDATION[/red]: field '{field}' expected {expected_type.__name__}, got {type(data[field]).__name__}") return False return True def run_test(name: str, test_fn: Callable[[CountingClient], bool]) -> TestResult: """Run a test function with timing and request counting.""" if not QUIET: console.print(Panel(f"testing {name}", style="blue")) client = CountingClient(base_url=BASE_URL, timeout=10) start = time.perf_counter() try: passed = test_fn(client) duration_ms = (time.perf_counter() - start) * 1000 return TestResult( name=name, passed=passed, duration_ms=duration_ms, requests=client.request_count, ) except Exception as e: duration_ms = (time.perf_counter() - start) * 1000 return TestResult( name=name, passed=False, duration_ms=duration_ms, requests=client.request_count, error=str(e), ) finally: client.close() # ---------- test functions ---------- def test_admin(client: CountingClient) -> bool: """Test admin/health endpoints.""" # health if not QUIET: console.print("[bold]GET /health[/bold]") resp = client.get("/health") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" {resp.text}") # version if not QUIET: console.print("[bold]GET /admin/version[/bold]") resp = client.get("/admin/version") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" {resp.json()}") # csrf-token if not QUIET: console.print("[bold]GET /csrf-token[/bold]") resp = client.get("/csrf-token", params={"client": "test-client"}) if resp.status_code == 200: if not QUIET: console.print(f" token received") elif resp.status_code == 422: if not QUIET: console.print(f" csrf protection disabled (ok)") else: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False return True def test_flow_run(client: CountingClient, should_fail: bool = False) -> bool: """Test flow run lifecycle.""" suffix = "fail" if should_fail else "success" if not QUIET: console.print(f"server: {BASE_URL}\n") # create flow if not QUIET: console.print("[bold]1. POST /flows/[/bold]") resp = client.post("/flows/", json={"name": f"bench-flow-{suffix}"}) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") return False flow = resp.json() if not validate_response(flow, ["id", "name", "created", "updated"], {"id": str, "name": str}): return False if not QUIET: console.print(f" flow_id: {flow.get('id')}") # create flow run if not QUIET: console.print("\n[bold]2. POST /flow_runs/[/bold]") resp = client.post("/flow_runs/", json={ "flow_id": flow["id"], "name": f"run-{uuid.uuid4().hex[:8]}", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") return False flow_run = resp.json() if not validate_response(flow_run, ["id", "name", "flow_id", "state_type"], {"id": str, "name": str}): return False flow_run_id = flow_run.get("id") if not QUIET: console.print(f" flow_run_id: {flow_run_id}") # read flow run if not QUIET: console.print("\n[bold]3. GET /flow_runs/{id}[/bold]") resp = client.get(f"/flow_runs/{flow_run_id}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" state: {resp.json().get('state_type')}") # set RUNNING if not QUIET: console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, "force": False, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" status: {resp.json().get('status')}") # set final state final_type = "FAILED" if should_fail else "COMPLETED" final_name = "Failed" if should_fail else "Completed" if not QUIET: console.print(f"\n[bold]5. POST /flow_runs/{{id}}/set_state ({final_type})[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": final_type, "name": final_name}, "force": False, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" status: {resp.json().get('status')}") # verify if not QUIET: console.print("\n[bold]6. GET /flow_runs/{id} (verify)[/bold]") resp = client.get(f"/flow_runs/{flow_run_id}") if resp.status_code != 200: return False actual_type = resp.json().get("state_type") if actual_type != final_type: if not QUIET: console.print(f"[red]FAIL[/red]: expected {final_type}, got {actual_type}") return False if not QUIET: console.print(f" [green]state: {actual_type} (correct)[/green]") return True def test_orchestration_rules(client: CountingClient) -> bool: """Test orchestration rules (PreventPendingTransitions).""" if not QUIET: console.print(f"server: {BASE_URL}\n") # 1. create flow if not QUIET: console.print("[bold]1. create flow[/bold]") resp = client.post("/flows/", json={"name": f"orchestration-test-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create flow {resp.status_code}") return False flow_id = resp.json()["id"] if not QUIET: console.print(f" flow_id: {flow_id}") # 2. create flow run in PENDING state if not QUIET: console.print("\n[bold]2. create flow run (PENDING)[/bold]") resp = client.post("/flow_runs/", json={ "flow_id": flow_id, "name": f"orch-run-{uuid.uuid4().hex[:8]}", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create flow run {resp.status_code}") return False flow_run_id = resp.json()["id"] if not QUIET: console.print(f" flow_run_id: {flow_run_id}") # 3. try PENDING → PENDING (should be REJECT) if not QUIET: console.print("\n[bold]3. PENDING → PENDING (expect REJECT)[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: unexpected status code {resp.status_code}") return False result = resp.json() status = result.get("status") if status != "REJECT": if not QUIET: console.print(f"[red]FAIL[/red]: expected REJECT, got {status}") return False if not QUIET: reason = result.get("details", {}).get("reason", "") console.print(f" [green]status: {status} (correct)[/green]") console.print(f" reason: {reason}") # 4. PENDING → RUNNING (should be ACCEPT) if not QUIET: console.print("\n[bold]4. PENDING → RUNNING (expect ACCEPT)[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False result = resp.json() status = result.get("status") if status != "ACCEPT": if not QUIET: console.print(f"[red]FAIL[/red]: expected ACCEPT, got {status}") return False if not QUIET: console.print(f" [green]status: {status} (correct)[/green]") # 5. try RUNNING → PENDING (should be REJECT) if not QUIET: console.print("\n[bold]5. RUNNING → PENDING (expect REJECT)[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: unexpected status code {resp.status_code}") return False result = resp.json() status = result.get("status") if status != "REJECT": if not QUIET: console.print(f"[red]FAIL[/red]: expected REJECT, got {status}") return False if not QUIET: console.print(f" [green]status: {status} (correct)[/green]") # 6. verify run is still RUNNING (reject didn't change state) if not QUIET: console.print("\n[bold]6. verify state unchanged after REJECT[/bold]") resp = client.get(f"/flow_runs/{flow_run_id}") if resp.status_code != 200: return False actual_state = resp.json().get("state_type") if actual_state != "RUNNING": if not QUIET: console.print(f"[red]FAIL[/red]: expected RUNNING, got {actual_state}") return False if not QUIET: console.print(f" [green]state: {actual_state} (correct - unchanged)[/green]") # 7. complete normally if not QUIET: console.print("\n[bold]7. RUNNING → COMPLETED (expect ACCEPT)[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, }) if resp.status_code not in (200, 201): return False status = resp.json().get("status") if status != "ACCEPT": if not QUIET: console.print(f"[red]FAIL[/red]: expected ACCEPT, got {status}") return False if not QUIET: console.print(f" [green]status: {status} (correct)[/green]") # ========================================================================= # CopyScheduledTime rule tests # ========================================================================= # 8. create a SCHEDULED flow run with next_scheduled_start_time scheduled_time = "2025-06-15T10:00:00Z" if not QUIET: console.print(f"\n[bold]8. create SCHEDULED run (next_scheduled_start_time={scheduled_time})[/bold]") resp = client.post("/flow_runs/", json={ "flow_id": flow_id, "name": f"scheduled-run-{uuid.uuid4().hex[:8]}", "state": {"type": "SCHEDULED", "name": "Scheduled"}, "next_scheduled_start_time": scheduled_time, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create scheduled run {resp.status_code}") return False scheduled_run_id = resp.json()["id"] if not QUIET: console.print(f" scheduled_run_id: {scheduled_run_id}") # 9. transition SCHEDULED → PENDING (CopyScheduledTime should copy scheduled_time) if not QUIET: console.print("\n[bold]9. SCHEDULED → PENDING (expect scheduled_time copied)[/bold]") resp = client.post(f"/flow_runs/{scheduled_run_id}/set_state", json={ "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: set_state {resp.status_code}") return False status = resp.json().get("status") if status != "ACCEPT": if not QUIET: console.print(f"[red]FAIL[/red]: expected ACCEPT, got {status}") return False if not QUIET: console.print(f" [green]status: {status} (correct)[/green]") # 10. verify expected_start_time was set from next_scheduled_start_time if not QUIET: console.print("\n[bold]10. verify expected_start_time copied[/bold]") resp = client.get(f"/flow_runs/{scheduled_run_id}") if resp.status_code != 200: return False run_data = resp.json() expected_start = run_data.get("expected_start_time") if expected_start != scheduled_time: if not QUIET: console.print(f"[red]FAIL[/red]: expected_start_time={expected_start}, expected {scheduled_time}") return False if not QUIET: console.print(f" [green]expected_start_time: {expected_start} (correct)[/green]") return True def test_task_run(client: CountingClient) -> bool: """Test task run lifecycle.""" # create if not QUIET: console.print("[bold]POST /task_runs/[/bold]") resp = client.post("/task_runs/", json={ "task_key": "bench-task", "dynamic_key": f"key-{uuid.uuid4().hex[:8]}", "name": f"task-{uuid.uuid4().hex[:8]}", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False task_run_id = resp.json().get("id") if not QUIET: console.print(f" task_run_id: {task_run_id}") # read resp = client.get(f"/task_runs/{task_run_id}") if resp.status_code != 200: return False # RUNNING resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, "force": False, }) if resp.status_code not in (200, 201): return False if not QUIET: console.print(f" -> RUNNING: {resp.json().get('status')}") # COMPLETED resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, "force": False, }) if resp.status_code not in (200, 201): return False if not QUIET: console.print(f" -> COMPLETED: {resp.json().get('status')}") return True def test_filters(client: CountingClient) -> bool: """Test filter endpoints.""" for endpoint, label in [ ("/flows/filter", "flows"), ("/flow_runs/filter", "flow_runs"), ("/task_runs/filter", "task_runs"), ]: resp = client.post(endpoint, json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {endpoint} {resp.status_code}") return False if not QUIET: console.print(f" {label}: {len(resp.json())} items") return True def test_logs(client: CountingClient) -> bool: """Test logs endpoint.""" from datetime import datetime, timezone logs = [ {"level": 20, "message": "test log 1", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, {"level": 30, "message": "test log 2", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, ] resp = client.post("/logs/", json=logs) if resp.status_code not in (200, 201, 204): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" {len(logs)} logs sent") return True def test_variables(client: CountingClient) -> bool: """Test variables API (CRUD).""" var_name = f"bench-var-{uuid.uuid4().hex[:8]}" # create if not QUIET: console.print("[bold]POST /variables/[/bold]") resp = client.post("/variables/", json={ "name": var_name, "value": {"nested": "object", "count": 42}, "tags": ["benchmark", "test"], }) if resp.status_code != 201: if not QUIET: console.print(f"[red]FAIL[/red]: create {resp.status_code}") return False variable = resp.json() if not validate_response(variable, ["id", "name", "value", "tags", "created", "updated"], {"id": str, "name": str, "tags": list}): return False var_id = variable.get("id") if not QUIET: console.print(f" created: {var_id}") # get by name resp = client.get(f"/variables/name/{var_name}") if resp.status_code != 200: return False if not QUIET: console.print(f" get by name: ok") # get by id resp = client.get(f"/variables/{var_id}") if resp.status_code != 200: return False # update by name resp = client.patch(f"/variables/name/{var_name}", json={"value": "updated"}) if resp.status_code != 204: return False if not QUIET: console.print(f" updated by name") # filter resp = client.post("/variables/filter", json={"limit": 10}) if resp.status_code != 200: return False if not QUIET: console.print(f" filter: {len(resp.json())} items") # count resp = client.post("/variables/count", json={}) if resp.status_code != 200: return False if not QUIET: console.print(f" count: {resp.text}") # duplicate name should fail resp = client.post("/variables/", json={"name": var_name, "value": "dupe"}) if resp.status_code != 409: if not QUIET: console.print(f"[red]FAIL[/red]: duplicate should return 409, got {resp.status_code}") return False if not QUIET: console.print(f" duplicate rejected: 409") # delete resp = client.delete(f"/variables/name/{var_name}") if resp.status_code != 204: return False if not QUIET: console.print(f" deleted") return True def test_blocks(client: CountingClient) -> bool: """Test blocks API (types, schemas, documents).""" slug = f"bench-block-{uuid.uuid4().hex[:8]}" # create block type if not QUIET: console.print("[bold]block_types[/bold]") resp = client.post("/block_types/", json={ "name": f"Bench Block {slug}", "slug": slug, "description": "benchmark block type", }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create block_type {resp.status_code}") return False block_type = resp.json() if not validate_response(block_type, ["id", "name", "slug"], {"id": str, "name": str, "slug": str}): return False block_type_id = block_type.get("id") if not QUIET: console.print(f" created: {block_type_id}") # get by slug resp = client.get(f"/block_types/slug/{slug}") if resp.status_code != 200: return False # create schema if not QUIET: console.print("[bold]block_schemas[/bold]") resp = client.post("/block_schemas/", json={ "block_type_id": block_type_id, "fields": {"properties": {"value": {"type": "string"}}}, "capabilities": ["test"], "version": "1.0.0", }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create block_schema {resp.status_code}") return False block_schema = resp.json() block_schema_id = block_schema.get("id") checksum = block_schema.get("checksum") if not QUIET: console.print(f" created: {block_schema_id}") # get by checksum resp = client.get(f"/block_schemas/checksum/{checksum}") if resp.status_code != 200: return False # create document if not QUIET: console.print("[bold]block_documents[/bold]") doc_name = f"bench-doc-{uuid.uuid4().hex[:8]}" resp = client.post("/block_documents/", json={ "name": doc_name, "block_type_id": block_type_id, "block_schema_id": block_schema_id, "data": {"value": "secret-value"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create block_document {resp.status_code}") return False block_doc = resp.json() block_doc_id = block_doc.get("id") if not QUIET: console.print(f" created: {block_doc_id}") # get by id resp = client.get(f"/block_documents/{block_doc_id}") if resp.status_code != 200: return False # get by slug/name resp = client.get(f"/block_types/slug/{slug}/block_documents/name/{doc_name}") if resp.status_code != 200: return False # update resp = client.patch(f"/block_documents/{block_doc_id}", json={"data": {"value": "updated"}}) if resp.status_code != 204: return False if not QUIET: console.print(f" updated") # filters for endpoint in ["/block_types/filter", "/block_schemas/filter", "/block_documents/filter"]: resp = client.post(endpoint, json={}) if resp.status_code != 200: return False # delete resp = client.delete(f"/block_documents/{block_doc_id}") if resp.status_code != 204: return False if not QUIET: console.print(f" deleted") return True def test_work_pools(client: CountingClient) -> bool: """Test work pools API (pools, queues, workers).""" pool_name = f"test-pool-{uuid.uuid4().hex[:8]}" # create work pool if not QUIET: console.print("[bold]work_pools[/bold]") resp = client.post("/work_pools/", json={ "name": pool_name, "type": "process", "description": "test work pool", }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create work_pool {resp.status_code} {resp.text}") return False pool = resp.json() if not validate_response(pool, ["id", "name", "type", "status", "default_queue_id"], {"id": str, "name": str}): return False if not QUIET: console.print(f" created: {pool.get('id')}") # check default queue was created if not pool.get("default_queue_id"): if not QUIET: console.print("[red]FAIL[/red]: no default_queue_id") return False # get by name resp = client.get(f"/work_pools/{pool_name}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get work_pool {resp.status_code}") return False # update resp = client.patch(f"/work_pools/{pool_name}", json={"description": "updated"}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: update work_pool {resp.status_code}") return False if not QUIET: console.print(" updated") # filter resp = client.post("/work_pools/filter", json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter work_pools {resp.status_code}") return False pools = resp.json() if not isinstance(pools, list): return False # create queue if not QUIET: console.print("[bold]work_queues[/bold]") queue_name = f"test-queue-{uuid.uuid4().hex[:8]}" resp = client.post(f"/work_pools/{pool_name}/queues/", json={ "name": queue_name, "description": "test queue", "priority": 5, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create work_queue {resp.status_code} {resp.text}") return False queue = resp.json() if not validate_response(queue, ["id", "name", "priority", "work_pool_id"], {"id": str, "priority": int}): return False if not QUIET: console.print(f" created: {queue.get('id')}") # get queue resp = client.get(f"/work_pools/{pool_name}/queues/{queue_name}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get work_queue {resp.status_code}") return False # filter queues resp = client.post(f"/work_pools/{pool_name}/queues/filter", json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter work_queues {resp.status_code}") return False queues = resp.json() if not isinstance(queues, list) or len(queues) < 2: # default + our queue if not QUIET: console.print(f"[red]FAIL[/red]: expected at least 2 queues, got {len(queues) if isinstance(queues, list) else 'not a list'}") return False # worker heartbeat if not QUIET: console.print("[bold]workers[/bold]") resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={ "name": "test-worker-1", "heartbeat_interval_seconds": 30, }) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: worker heartbeat {resp.status_code} {resp.text}") return False if not QUIET: console.print(" heartbeat sent") # check pool status is now READY resp = client.get(f"/work_pools/{pool_name}") if resp.status_code != 200: return False pool = resp.json() if pool.get("status") != "READY": if not QUIET: console.print(f"[red]FAIL[/red]: expected pool status READY, got {pool.get('status')}") return False if not QUIET: console.print(" pool status: READY") # filter workers resp = client.post(f"/work_pools/{pool_name}/workers/filter", json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter workers {resp.status_code}") return False workers = resp.json() if not isinstance(workers, list) or len(workers) < 1: return False # delete queue (not default) resp = client.delete(f"/work_pools/{pool_name}/queues/{queue_name}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete work_queue {resp.status_code}") return False if not QUIET: console.print(" deleted queue") # delete pool resp = client.delete(f"/work_pools/{pool_name}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete work_pool {resp.status_code}") return False if not QUIET: console.print(" deleted pool") return True def test_deployments(client: CountingClient) -> bool: """Test deployments API (deployments, schedules, create_flow_run).""" # create a flow first if not QUIET: console.print("[bold]setup: create flow[/bold]") resp = client.post("/flows/", json={"name": f"deploy-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create flow {resp.status_code}") return False flow = resp.json() flow_id = flow.get("id") flow_name = flow.get("name") # create deployment deployment_name = f"test-deployment-{uuid.uuid4().hex[:8]}" if not QUIET: console.print("[bold]deployments[/bold]") resp = client.post("/deployments/", json={ "name": deployment_name, "flow_id": flow_id, "description": "test deployment", "tags": ["test", "benchmark"], "parameters": {"key": "value"}, "schedules": [ {"schedule": {"interval": 3600}, "active": True}, ], }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create deployment {resp.status_code} {resp.text}") return False deployment = resp.json() if not validate_response(deployment, ["id", "name", "flow_id", "status", "schedules"], {"id": str, "name": str}): return False deployment_id = deployment.get("id") if not QUIET: console.print(f" created: {deployment_id}") # verify schedules were created schedules = deployment.get("schedules", []) if not isinstance(schedules, list) or len(schedules) != 1: if not QUIET: console.print(f"[red]FAIL[/red]: expected 1 schedule, got {len(schedules) if isinstance(schedules, list) else 'not a list'}") return False if not QUIET: console.print(f" schedules: {len(schedules)}") # get by id resp = client.get(f"/deployments/{deployment_id}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get deployment {resp.status_code}") return False # get by name resp = client.get(f"/deployments/name/{flow_name}/{deployment_name}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get deployment by name {resp.status_code}") return False if not QUIET: console.print(" get by name: ok") # update resp = client.patch(f"/deployments/{deployment_id}", json={"description": "updated"}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: update deployment {resp.status_code}") return False if not QUIET: console.print(" updated") # filter resp = client.post("/deployments/filter", json={"limit": 10}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter deployments {resp.status_code}") return False if not QUIET: console.print(f" filter: {len(resp.json())} items") # count resp = client.post("/deployments/count", json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: count deployments {resp.status_code}") return False if not QUIET: console.print(f" count: {resp.text}") # pause resp = client.post(f"/deployments/{deployment_id}/pause_deployment", json={}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: pause deployment {resp.status_code}") return False if not QUIET: console.print(" paused") # resume resp = client.post(f"/deployments/{deployment_id}/resume_deployment", json={}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: resume deployment {resp.status_code}") return False if not QUIET: console.print(" resumed") # create flow run from deployment if not QUIET: console.print("[bold]create_flow_run[/bold]") resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={}) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create flow run {resp.status_code} {resp.text}") return False flow_run = resp.json() if not validate_response(flow_run, ["id", "flow_id", "deployment_id"], {"id": str}): return False if flow_run.get("deployment_id") != deployment_id: if not QUIET: console.print(f"[red]FAIL[/red]: deployment_id mismatch") return False if not QUIET: console.print(f" created flow run: {flow_run.get('id')}") # schedules - list if not QUIET: console.print("[bold]deployment_schedules[/bold]") resp = client.get(f"/deployments/{deployment_id}/schedules") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: list schedules {resp.status_code}") return False schedules = resp.json() schedule_id = schedules[0].get("id") if schedules else None if not QUIET: console.print(f" list: {len(schedules)} schedules") # schedules - create resp = client.post(f"/deployments/{deployment_id}/schedules", json={ "schedule": {"cron": "0 0 * * *"}, "active": False, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create schedule {resp.status_code}") return False created_schedules = resp.json() if not isinstance(created_schedules, list) or len(created_schedules) != 1: if not QUIET: console.print(f"[red]FAIL[/red]: expected 1 created schedule") return False new_schedule_id = created_schedules[0].get("id") if not QUIET: console.print(f" created schedule: {new_schedule_id}") # schedules - update resp = client.patch(f"/deployments/{deployment_id}/schedules/{new_schedule_id}", json={"active": True}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: update schedule {resp.status_code}") return False if not QUIET: console.print(" updated schedule") # schedules - delete resp = client.delete(f"/deployments/{deployment_id}/schedules/{new_schedule_id}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete schedule {resp.status_code}") return False if not QUIET: console.print(" deleted schedule") # delete deployment resp = client.delete(f"/deployments/{deployment_id}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete deployment {resp.status_code}") return False if not QUIET: console.print(" deleted deployment") return True def test_scheduler_idempotency(client: CountingClient) -> bool: """Test that scheduler is idempotent - running twice doesn't create duplicates.""" import time as time_mod def fail(msg: str) -> bool: if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") return False def log(msg: str) -> None: if not QUIET: console.print(msg) # setup: create flow, work pool, deployment with interval schedule log("[bold]setup[/bold]") resp = client.post("/flows/", json={"name": f"idem-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") pool_name = f"idem-pool-{uuid.uuid4().hex[:8]}" resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") log(f" pool: {pool_name}") # create deployment with interval schedule (every hour) resp = client.post("/deployments/", json={ "name": f"idem-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name, "schedules": [{"schedule": {"interval": 3600}, "active": True}], # every hour }) if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") deployment = resp.json() deployment_id = deployment.get("id") log(f" deployment: {deployment_id}") # wait for scheduler to run once (default 5s interval) log("[bold]waiting for scheduler (7s)...[/bold]") time_mod.sleep(7) # count runs after first scheduler tick resp = client.post("/flow_runs/filter", json={ "flow_runs": {"deployment_id": {"any_": [deployment_id]}}, "limit": 100, }) if resp.status_code != 200: return fail(f"filter flow_runs {resp.status_code}") runs_after_first = resp.json() count_after_first = len(runs_after_first) log(f" runs after first tick: {count_after_first}") if count_after_first == 0: return fail("scheduler did not create any runs") # wait for scheduler to run again log("[bold]waiting for second scheduler tick (7s)...[/bold]") time_mod.sleep(7) # count runs after second scheduler tick resp = client.post("/flow_runs/filter", json={ "flow_runs": {"deployment_id": {"any_": [deployment_id]}}, "limit": 100, }) if resp.status_code != 200: return fail(f"filter flow_runs {resp.status_code}") runs_after_second = resp.json() count_after_second = len(runs_after_second) log(f" runs after second tick: {count_after_second}") # key test: same number of runs means idempotency works # (scheduler shouldn't create duplicates for same scheduled times) if count_after_second != count_after_first: return fail(f"idempotency failed: {count_after_first} -> {count_after_second} runs") log(f" [green]idempotency verified: count unchanged[/green]") # cleanup client.delete(f"/deployments/{deployment_id}") client.delete(f"/work_pools/{pool_name}") log(" cleanup: ok") return True def test_parameter_merging(client: CountingClient) -> bool: """Test that schedule parameters override deployment parameters.""" import time as time_mod def fail(msg: str) -> bool: if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") return False def log(msg: str) -> None: if not QUIET: console.print(msg) # setup log("[bold]setup[/bold]") resp = client.post("/flows/", json={"name": f"params-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") pool_name = f"params-pool-{uuid.uuid4().hex[:8]}" resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") log(f" pool: {pool_name}") # create deployment with base parameters # schedule has override parameter resp = client.post("/deployments/", json={ "name": f"params-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name, "parameters": {"base_key": "base_value", "override_key": "deployment_value"}, "schedules": [{ "schedule": {"interval": 3600}, "active": True, "parameters": {"override_key": "schedule_value", "schedule_key": "schedule_only"}, }], }) if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") deployment = resp.json() deployment_id = deployment.get("id") log(f" deployment: {deployment_id}") log(f" deployment params: {deployment.get('parameters')}") # wait for scheduler to create runs log("[bold]waiting for scheduler (7s)...[/bold]") time_mod.sleep(7) # get the scheduled runs and check their parameters resp = client.post("/flow_runs/filter", json={ "flow_runs": {"deployment_id": {"any_": [deployment_id]}}, "limit": 10, }) if resp.status_code != 200: return fail(f"filter flow_runs {resp.status_code}") runs = resp.json() log(f" found {len(runs)} runs") if len(runs) == 0: return fail("scheduler did not create any runs") # check merged parameters on first run run_params = runs[0].get("parameters", {}) if isinstance(run_params, str): import json as json_mod run_params = json_mod.loads(run_params) log(f" run params: {run_params}") # verify merging: # - base_key should be from deployment # - override_key should be from schedule (override) # - schedule_key should be from schedule (new key) if run_params.get("base_key") != "base_value": return fail(f"base_key not preserved: {run_params.get('base_key')}") if run_params.get("override_key") != "schedule_value": return fail(f"override_key not overridden: {run_params.get('override_key')}") if run_params.get("schedule_key") != "schedule_only": return fail(f"schedule_key not added: {run_params.get('schedule_key')}") log(" [green]parameter merging verified[/green]") # cleanup client.delete(f"/deployments/{deployment_id}") client.delete(f"/work_pools/{pool_name}") log(" cleanup: ok") return True def test_get_scheduled_flow_runs(client: CountingClient) -> bool: """Test get_scheduled_flow_runs endpoint (worker polling).""" from datetime import datetime, timezone def fail(msg: str) -> bool: if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") return False def log(msg: str) -> None: if not QUIET: console.print(msg) # setup log("[bold]setup: create flow[/bold]") resp = client.post("/flows/", json={"name": f"schedule-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") pool_name = f"schedule-pool-{uuid.uuid4().hex[:8]}" resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") pool = resp.json() pool_id, default_queue_id = pool.get("id"), pool.get("default_queue_id") log(f" pool: {pool_id}") if pool.get("status") != "NOT_READY": return fail(f"expected pool NOT_READY, got {pool.get('status')}") resp = client.post("/deployments/", json={"name": f"schedule-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name}) if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") deployment = resp.json() deployment_id = deployment.get("id") log(f" deployment: {deployment_id}") if deployment.get("status") != "NOT_READY": return fail(f"expected deployment NOT_READY, got {deployment.get('status')}") # create scheduled flow run log("[bold]create scheduled flow run[/bold]") resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={"state": {"type": "SCHEDULED", "name": "Scheduled"}}) if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") flow_run = resp.json() flow_run_id = flow_run.get("id") log(f" flow_run: {flow_run_id}") log(f" state: {flow_run.get('state_type')}") if flow_run.get("state_type") != "SCHEDULED": return fail(f"expected SCHEDULED, got {flow_run.get('state_type')}") # test polling log("[bold]get_scheduled_flow_runs[/bold]") resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) if resp.status_code != 200: return fail(f"get_scheduled_flow_runs {resp.status_code}") scheduled_runs = resp.json() if not isinstance(scheduled_runs, list): return fail(f"expected list, got {type(scheduled_runs)}") log(f" returned {len(scheduled_runs)} runs") # verify our run is in results found = any(item.get("flow_run", {}).get("id") == flow_run_id and item.get("work_pool_id") == pool_id and item.get("work_queue_id") == default_queue_id for item in scheduled_runs) if not found: return fail("scheduled flow run not found in results") log(" flow run found in results") # verify status changes resp = client.get(f"/work_pools/{pool_name}") if resp.status_code != 200 or resp.json().get("status") != "READY": return fail(f"expected pool READY after polling") log(" pool status: READY") resp = client.get(f"/deployments/{deployment_id}") if resp.status_code != 200 or resp.json().get("status") != "READY": return fail(f"expected deployment READY after polling") log(" deployment status: READY") # test filters resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"work_queue_names": ["default"]}) if resp.status_code != 200: return fail(f"filter test {resp.status_code}") log(" filtered by queue: ok") resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"scheduled_before": datetime.now(timezone.utc).isoformat()}) if resp.status_code != 200: return fail(f"scheduled_before test {resp.status_code}") log(f" scheduled_before filter: {len(resp.json())} runs") # cleanup client.delete(f"/deployments/{deployment_id}") client.delete(f"/work_pools/{pool_name}") log(" cleanup: ok") return True def main(): json_output = "--json" in sys.argv if not QUIET: console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n") results: list[TestResult] = [] # run all tests results.append(run_test("admin", test_admin)) results.append(run_test("flow_run (success)", lambda c: test_flow_run(c, should_fail=False))) results.append(run_test("flow_run (failure)", lambda c: test_flow_run(c, should_fail=True))) results.append(run_test("orchestration_rules", test_orchestration_rules)) results.append(run_test("task_run", test_task_run)) results.append(run_test("filters", test_filters)) results.append(run_test("logs", test_logs)) results.append(run_test("variables", test_variables)) results.append(run_test("blocks", test_blocks)) results.append(run_test("work_pools", test_work_pools)) results.append(run_test("deployments", test_deployments)) results.append(run_test("get_scheduled_flow_runs", test_get_scheduled_flow_runs)) results.append(run_test("scheduler_idempotency", test_scheduler_idempotency)) results.append(run_test("parameter_merging", test_parameter_merging)) total_duration = sum(r.duration_ms for r in results) total_requests = sum(r.requests for r in results) all_passed = all(r.passed for r in results) if json_output: # machine-readable output for benchmark script output = { "passed": all_passed, "total_duration_ms": total_duration, "total_requests": total_requests, "sections": [ { "name": r.name, "passed": r.passed, "duration_ms": r.duration_ms, "requests": r.requests, "error": r.error, } for r in results ], } print(json_lib.dumps(output)) else: # human-readable output console.print("\n" + "=" * 60) table = Table(title="test results") table.add_column("section", style="cyan") table.add_column("time", justify="right") table.add_column("reqs", justify="right") table.add_column("status", justify="center") for r in results: status = "[green]✓[/green]" if r.passed else "[red]✗[/red]" table.add_row(r.name, f"{r.duration_ms:.1f}ms", str(r.requests), status) table.add_row("", "", "", "", style="dim") table.add_row("[bold]total[/bold]", f"[bold]{total_duration:.1f}ms[/bold]", f"[bold]{total_requests}[/bold]", "") console.print(table) if all_passed: console.print("\n[bold green]all tests passed[/bold green]") else: console.print("\n[bold red]some tests failed[/bold red]") sys.exit(0 if all_passed else 1) if __name__ == "__main__": main()