#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["prefect>=3.0", "httpx"]
# ///
"""
Schedule and runner integration tests for prefect-server.

Tests:
1. Cron scheduler creates flow runs on schedule (server-side)
2. .serve() Runner polls get_scheduled_flow_runs and executes locally
3. .serve() creates deployment with schedule attached

NOTE: .serve() is NOT a worker. It's a Runner that:
- Creates a deployment
- Polls POST /deployments/get_scheduled_flow_runs every N seconds
- Executes runs locally in the same process

Workers are separate standalone daemons that poll work pools.

Requires running server at PREFECT_API_URL.
"""

import os
import pathlib
import signal
import subprocess
import sys
import tempfile
import threading
import time
import uuid

import httpx

API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")


def api(method: str, path: str, **kwargs) -> httpx.Response:
    """Send a request to the server under test.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: Path appended verbatim to ``API_URL`` (should start with ``/``).
        **kwargs: Forwarded to ``httpx.request``. ``timeout`` defaults to 30
            seconds but may be overridden by the caller.

    Returns:
        The ``httpx.Response``; the status code is NOT checked here.
    """
    # setdefault (rather than a hard-coded keyword) lets callers pass their own
    # timeout without triggering a duplicate-keyword TypeError.
    kwargs.setdefault("timeout", 30)
    return httpx.request(method, f"{API_URL}{path}", **kwargs)


def wait_for_condition(check_fn, timeout: float = 60, interval: float = 1.0, desc: str = "condition"):
    """Poll ``check_fn`` until it returns a truthy value.

    Args:
        check_fn: Zero-argument callable; its first truthy result is returned.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polls.
        desc: Human-readable name of the condition, used in the error message.

    Returns:
        The first truthy value returned by ``check_fn``.

    Raises:
        TimeoutError: If no truthy value was observed before the deadline.
    """
    # monotonic clock: immune to wall-clock adjustments during the wait
    deadline = time.monotonic() + timeout
    while True:
        # check first, so the condition is evaluated at least once even when
        # timeout <= 0, and once more after the final sleep
        result = check_fn()
        if result:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"timeout waiting for {desc}")
        # never sleep past the deadline
        time.sleep(min(interval, remaining))


def _best_effort_delete(path: str) -> None:
    """DELETE ``path`` on the server, reporting (not raising) on failure."""
    try:
        resp = api("DELETE", path)
    except httpx.HTTPError as exc:
        print(f" ! cleanup of {path} failed: {exc}")
        return
    if resp.status_code >= 400:
        print(f" ! cleanup of {path} returned {resp.status_code}")


def _serve_in_background(serve_fn) -> list[Exception]:
    """Run the blocking ``serve_fn`` in a daemon thread.

    Returns:
        A list that receives the exception if ``serve_fn`` raises. The thread
        is a daemon and is never stopped explicitly; it lives until the
        process exits.
    """
    errors: list[Exception] = []

    def run():
        try:
            serve_fn()
        except Exception as exc:  # recorded so the test can report it
            errors.append(exc)

    threading.Thread(target=run, daemon=True).start()
    return errors


def _wait_for_deployment(client, full_name: str, serve_errors: list, timeout: float = 15):
    """Poll the API until the deployment ``full_name`` ("flow/deployment") exists.

    Args:
        client: Synchronous Prefect client.
        full_name: Deployment name in ``"<flow-name>/<deployment-name>"`` form.
        serve_errors: Error list returned by ``_serve_in_background``.
        timeout: Maximum seconds to wait for the deployment to appear.

    Returns:
        The deployment object returned by ``client.read_deployment_by_name``.

    Raises:
        RuntimeError: If the background ``.serve()`` call has failed.
        TimeoutError: If the deployment did not appear within ``timeout``.
    """
    from prefect.exceptions import ObjectNotFound

    found = []

    def check_deployment():
        # give up early if .serve() itself has already crashed
        if serve_errors:
            raise RuntimeError(f"serve failed: {serve_errors[0]}") from serve_errors[0]
        try:
            found.append(client.read_deployment_by_name(full_name))
        except ObjectNotFound:
            return False
        return True

    wait_for_condition(check_deployment, timeout=timeout, interval=0.5, desc=f"deployment {full_name}")
    return found[0]


def test_cron_scheduler():
    """Test that cron schedules trigger flow runs.

    Creates a flow, a process work pool and a deployment with an every-minute
    cron schedule via the REST API, then waits for at least one flow run to
    appear for that deployment. Everything created is deleted again, whether
    the test passes or not.

    Returns:
        True on success.

    Raises:
        AssertionError: If a create call returns an unexpected status code.
        TimeoutError: If no run shows up within 70 seconds.
    """
    print("\n=== test_cron_scheduler ===")

    suffix = uuid.uuid4().hex[:8]
    # paths to DELETE afterwards, recorded in creation order
    cleanup_paths: list[str] = []

    try:
        # create flow
        resp = api("POST", "/flows/", json={"name": f"cron-test-flow-{suffix}"})
        assert resp.status_code in (200, 201), f"create flow failed: {resp.status_code}"
        flow_id = resp.json()["id"]
        cleanup_paths.append(f"/flows/{flow_id}")
        print(f" flow: {flow_id}")

        # create work pool
        pool_name = f"cron-test-pool-{suffix}"
        resp = api("POST", "/work_pools/", json={"name": pool_name, "type": "process"})
        assert resp.status_code in (200, 201), f"create pool failed: {resp.status_code}"
        cleanup_paths.append(f"/work_pools/{pool_name}")
        print(f" pool: {pool_name}")

        # create deployment with cron schedule (every minute)
        # use */1 * * * * to trigger within 60 seconds
        resp = api("POST", "/deployments/", json={
            "name": f"cron-test-deploy-{suffix}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "schedules": [{"schedule": {"cron": "*/1 * * * *"}, "active": True}],
        })
        assert resp.status_code in (200, 201), f"create deployment failed: {resp.status_code}"
        deployment_id = resp.json()["id"]
        cleanup_paths.append(f"/deployments/{deployment_id}")
        print(f" deployment: {deployment_id}")
        print(" schedule: */1 * * * * (every minute)")

        # wait for scheduler to create a run (up to 70 seconds for cron)
        print(" waiting for scheduler to create run (up to 70s)...")

        def check_run_created():
            resp = api("POST", "/flow_runs/filter", json={
                "flow_runs": {"deployment_id": {"any_": [deployment_id]}},
                "limit": 10,
            })
            if resp.status_code != 200:
                return None
            runs = resp.json()
            return runs[0] if runs else None

        run = wait_for_condition(check_run_created, timeout=70, interval=2, desc="scheduled run")
        print(f" ✓ scheduler created run: {run['id']}")
        # "state" may be present but null, so don't rely on the .get() default
        print(f" state: {(run.get('state') or {}).get('type', 'unknown')}")
    finally:
        # cleanup in reverse creation order: deployment, work pool, flow
        for path in reversed(cleanup_paths):
            _best_effort_delete(path)

    print(" ✓ cron scheduler test passed")
    return True


def test_serve_runner():
    """Test that .serve() Runner polls get_scheduled_flow_runs and executes locally.

    .serve() is NOT a worker - it's a Runner that:
    1. Creates a deployment with schedule
    2. Polls POST /deployments/get_scheduled_flow_runs every N seconds
    3. Executes matching runs locally in the same process

    This is distinct from Workers which are standalone daemons that poll work pools.

    The served flow writes a marker file; the test passes once that file
    appears. The marker is removed afterwards. The background ``.serve()``
    thread is a daemon and is not stopped by this test.

    Returns:
        True if the flow executed, False if the marker never appeared.

    Raises:
        RuntimeError: If ``.serve()`` failed or the deployment never appeared.
    """
    print("\n=== test_serve_runner ===")

    # .serve() creates a Runner that polls for scheduled runs. The server-side
    # scheduler creates runs, and .serve() picks them up via get_scheduled_flow_runs.
    from prefect import flow
    from prefect.client.orchestration import get_client
    from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId

    suffix = uuid.uuid4().hex[:8]
    deployment_name = f"worker-exec-{suffix}"
    # platform temp dir instead of a hard-coded /tmp
    marker_path = pathlib.Path(tempfile.gettempdir()) / f"worker_exec_marker_{suffix}"
    execution_marker = str(marker_path)

    @flow
    def worker_exec_flow(marker_file: str = execution_marker):
        """Flow that creates a marker file to prove it ran."""
        # imported inside the flow body (as in the original) so the flow does
        # not depend on this module's top-level imports when it is executed
        import pathlib

        pathlib.Path(marker_file).write_text("executed")
        return "done"

    # start serve in background
    # use interval=1 for faster testing: every 1 second - .serve() handles this locally
    serve_errors = _serve_in_background(
        lambda: worker_exec_flow.serve(name=deployment_name, interval=1)
    )

    try:
        with get_client(sync_client=True) as client:
            # verify deployment exists (polled: registration time varies)
            try:
                deployment = _wait_for_deployment(
                    client, f"worker-exec-flow/{deployment_name}", serve_errors
                )
            except TimeoutError as exc:
                raise RuntimeError(f"deployment not found: {exc}") from exc
            print(f" deployment: {deployment.id}")

            # .serve() with interval will execute the flow locally on schedule
            # wait for it to execute (interval=1s, so should be quick)
            print(" waiting for .serve() to execute flow (up to 15s)...")
            try:
                wait_for_condition(marker_path.exists, timeout=15, interval=0.5, desc="flow execution")
            except TimeoutError:
                # .serve() might not execute if there's an issue
                if serve_errors:
                    print(f" serve error: {serve_errors[0]}")
                print(" ✗ flow was not executed by .serve()")
                return False
            print(" ✓ flow executed (marker file created)")

            # verify run was recorded in API; filter server-side so runs of
            # other deployments cannot crowd this one out of the result page
            deployment_runs = client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id])
                )
            )
            if deployment_runs:
                latest = max(deployment_runs, key=lambda r: r.created)
                print(f" run state: {latest.state.type if latest.state else 'unknown'}")

            print(" ✓ serve runner test passed")
            return True
    finally:
        # cleanup marker
        marker_path.unlink(missing_ok=True)


def test_serve_with_schedule():
    """Test .serve() creates deployment with schedule.

    Serves a trivial flow with a five-minute cron schedule in a background
    thread and checks that the resulting deployment has at least one schedule.

    Returns:
        True if the deployment exists with a schedule attached, otherwise False.
    """
    print("\n=== test_serve_with_schedule ===")

    # this test uses the prefect client .serve() method
    # which is a blocking call, so we run it in a thread
    from prefect import flow
    from prefect.client.orchestration import get_client

    suffix = uuid.uuid4().hex[:8]
    deployment_name = f"serve-schedule-{suffix}"

    @flow
    def serve_test_flow():
        return "ok"

    # start serve in background, with cron schedule (every 5 minutes)
    serve_errors = _serve_in_background(
        lambda: serve_test_flow.serve(name=deployment_name, cron="*/5 * * * *")
    )

    # wait for deployment to be created
    print(" waiting for deployment (up to 15s)...")

    # check deployment exists with schedule
    with get_client(sync_client=True) as client:
        try:
            deployment = _wait_for_deployment(
                client, f"serve-test-flow/{deployment_name}", serve_errors
            )
        except RuntimeError as exc:
            # message already reads "serve failed: ..."
            print(f" ✗ {exc}")
            return False
        except TimeoutError as exc:
            print(f" ✗ deployment not found: {exc}")
            return False
        print(f" ✓ deployment created: {deployment.id}")

        # check schedules
        schedules = deployment.schedules
        if not schedules:
            print(" ✗ no schedules found on deployment")
            return False

        print(f" ✓ schedule attached: {len(schedules)} schedule(s)")
        for s in schedules:
            print(f" - {s.schedule}")

        print(" ✓ .serve() with schedule test passed")
        return True


def main():
    """Check server health, run every test, print a summary and exit.

    Exits with status 0 when all tests pass, otherwise 1 (also 1 when the
    server is unreachable or unhealthy).
    """
    print(f"api url: {API_URL}")

    # verify server is running
    try:
        resp = api("GET", "/health")
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        print(f"cannot connect to server: {e}")
        sys.exit(1)
    if resp.status_code != 200:
        print(f"server not healthy: {resp.status_code}")
        sys.exit(1)
    print("server healthy")

    tests = (
        # test 1: cron scheduler
        ("cron_scheduler", test_cron_scheduler),
        # test 2: serve runner (NOT a worker - Runner polls for scheduled runs)
        ("serve_runner", test_serve_runner),
        # test 3: serve with schedule
        ("serve_with_schedule", test_serve_with_schedule),
    )

    results = []
    for name, test_fn in tests:
        try:
            ok = bool(test_fn())
        except Exception as e:  # top-level boundary: one failure must not stop the rest
            print(f" ✗ {name.replace('_', ' ')} test failed: {e}")
            ok = False
        results.append((name, ok))

    # summary
    print("\n=== summary ===")
    passed = sum(1 for _, ok in results if ok)
    total = len(results)
    for name, ok in results:
        status = "✓" if ok else "✗"
        print(f" {status} {name}")

    print(f"\n{passed}/{total} tests passed")
    sys.exit(0 if passed == total else 1)


if __name__ == "__main__":
    main()