prefect server in zig
at main 327 lines 11 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["prefect>=3.0", "httpx"]
# ///
"""
Schedule and runner integration tests for prefect-server.

Tests:
1. Cron scheduler creates flow runs on schedule (server-side)
2. .serve() Runner polls get_scheduled_flow_runs and executes locally
3. .serve() creates deployment with schedule attached

NOTE: .serve() is NOT a worker. It's a Runner that:
- Creates a deployment
- Polls POST /deployments/get_scheduled_flow_runs every N seconds
- Executes runs locally in the same process

Workers are separate standalone daemons that poll work pools.

Requires running server at PREFECT_API_URL.
"""

import os
import pathlib
import signal
import subprocess
import sys
import threading
import time
import uuid

import httpx

API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")


def api(method: str, path: str, **kwargs) -> httpx.Response:
    """Send a single request to the server under test.

    Args:
        method: HTTP verb, e.g. "GET", "POST" or "DELETE".
        path: Path appended verbatim to API_URL (include the leading slash).
        **kwargs: Passed straight through to ``httpx.request`` (e.g. ``json=``).

    Returns:
        The raw ``httpx.Response``. The status code is NOT checked here;
        callers decide what counts as success.
    """
    url = f"{API_URL}{path}"
    return httpx.request(method, url, timeout=30, **kwargs)


def wait_for_condition(
    check_fn,
    timeout: int = 60,
    interval: float = 1.0,
    desc: str = "condition",
):
    """Poll ``check_fn`` until it returns something truthy.

    Args:
        check_fn: Zero-argument callable evaluated once per iteration.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between unsuccessful checks.
        desc: Human-readable name of the condition, used in the error message.

    Returns:
        The first truthy value produced by ``check_fn``.

    Raises:
        TimeoutError: If no check succeeded before ``timeout`` elapsed.
    """
    # monotonic() cannot jump when the wall clock is adjusted (NTP, DST),
    # so the timeout is measured reliably.
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        result = check_fn()
        if result:
            return result
        time.sleep(interval)
    raise TimeoutError(f"timeout waiting for {desc}")


def test_cron_scheduler():
    """Test that cron schedules trigger flow runs.

    Creates a flow, a work pool and a deployment with an every-minute cron
    schedule through the raw HTTP API, then polls ``/flow_runs/filter`` until
    a run for that deployment shows up.

    Returns:
        True when a scheduled run was observed.

    Raises:
        AssertionError: If creating the flow, pool or deployment fails.
        TimeoutError: If no run appears within 70 seconds.
    """
    print("\n=== test_cron_scheduler ===")

    suffix = uuid.uuid4().hex[:8]
    pool_name = f"cron-test-pool-{suffix}"

    # DELETE paths of everything created so far. Walked in reverse in the
    # ``finally`` below so the server is cleaned up even when an assertion
    # fails or the wait times out.
    cleanup_paths = []

    try:
        # create flow
        resp = api("POST", "/flows/", json={"name": f"cron-test-flow-{suffix}"})
        assert resp.status_code in (200, 201), f"create flow failed: {resp.status_code}"
        flow_id = resp.json()["id"]
        cleanup_paths.append(f"/flows/{flow_id}")
        print(f" flow: {flow_id}")

        # create work pool
        resp = api("POST", "/work_pools/", json={"name": pool_name, "type": "process"})
        assert resp.status_code in (200, 201), f"create pool failed: {resp.status_code}"
        cleanup_paths.append(f"/work_pools/{pool_name}")
        print(f" pool: {pool_name}")

        # create deployment with cron schedule (every minute)
        # use */1 * * * * to trigger within 60 seconds
        resp = api("POST", "/deployments/", json={
            "name": f"cron-test-deploy-{suffix}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "schedules": [{"schedule": {"cron": "*/1 * * * *"}, "active": True}],
        })
        assert resp.status_code in (200, 201), f"create deployment failed: {resp.status_code}"
        deployment_id = resp.json()["id"]
        cleanup_paths.append(f"/deployments/{deployment_id}")
        print(f" deployment: {deployment_id}")
        print(" schedule: */1 * * * * (every minute)")

        # wait for scheduler to create a run (up to 70 seconds for cron)
        print(" waiting for scheduler to create run (up to 70s)...")

        def check_run_created():
            """Return the first run listed for the deployment, or None."""
            resp = api("POST", "/flow_runs/filter", json={
                "flow_runs": {"deployment_id": {"any_": [deployment_id]}},
                "limit": 10,
            })
            if resp.status_code == 200:
                runs = resp.json()
                if runs:
                    return runs[0]
            return None

        run = wait_for_condition(check_run_created, timeout=70, interval=2, desc="scheduled run")
        print(f" ✓ scheduler created run: {run['id']}")
        print(f" state: {run.get('state', {}).get('type', 'unknown')}")
    finally:
        # cleanup: deployment first, then pool, then flow
        for path in reversed(cleanup_paths):
            api("DELETE", path)

    print(" ✓ cron scheduler test passed")
    return True


def test_serve_runner():
    """Test that .serve() Runner polls get_scheduled_flow_runs and executes locally.

    .serve() is NOT a worker - it's a Runner that:
    1. Creates a deployment with schedule
    2. Polls POST /deployments/get_scheduled_flow_runs every N seconds
    3. Executes matching runs locally in the same process

    This is distinct from Workers which are standalone daemons that poll work pools.

    Returns:
        True if the flow ran (its marker file appeared), False if it did not
        run within 15 seconds.

    Raises:
        RuntimeError: If the deployment cannot be read back from the server.
    """
    print("\n=== test_serve_runner ===")

    # .serve() creates a Runner that polls for scheduled runs. The server-side
    # scheduler creates runs, and .serve() picks them up via get_scheduled_flow_runs.

    from prefect import flow
    from prefect.client.orchestration import get_client

    suffix = uuid.uuid4().hex[:8]
    deployment_name = f"worker-exec-{suffix}"
    execution_marker = f"/tmp/worker_exec_marker_{suffix}"
    marker_path = pathlib.Path(execution_marker)
    serve_error = None
    serve_started = threading.Event()

    @flow
    def worker_exec_flow(marker_file: str = execution_marker):
        """Flow that creates a marker file to prove it ran."""
        # kept local so the flow body does not depend on module-level names
        import pathlib
        pathlib.Path(marker_file).write_text("executed")
        return "done"

    def run_serve():
        """Thread target: run the blocking .serve() call, recording any failure."""
        nonlocal serve_error
        try:
            # NOTE: set before .serve() is entered, so this only signals that
            # the thread is running, not that the deployment is registered.
            serve_started.set()
            # use interval=1 for faster testing
            worker_exec_flow.serve(
                name=deployment_name,
                interval=1,  # every 1 second - .serve() handles this locally
            )
        except Exception as e:
            serve_error = e

    # start serve in background
    serve_thread = threading.Thread(target=run_serve, daemon=True)
    serve_thread.start()

    # wait for serve to start
    serve_started.wait(timeout=10)
    time.sleep(2)  # give it time to register deployment

    try:
        with get_client(sync_client=True) as client:
            # verify deployment exists
            try:
                deployment = client.read_deployment_by_name(f"worker-exec-flow/{deployment_name}")
            except Exception as e:
                # prefer the serve thread's own failure when there is one;
                # chain the cause so the original traceback is preserved
                if serve_error:
                    raise RuntimeError(f"serve failed: {serve_error}") from serve_error
                raise RuntimeError(f"deployment not found: {e}") from e
            print(f" deployment: {deployment.id}")

            # .serve() with interval will execute the flow locally on schedule
            # wait for it to execute (interval=1s, so should be quick)
            print(" waiting for .serve() to execute flow (up to 15s)...")

            try:
                wait_for_condition(marker_path.exists, timeout=15, interval=0.5, desc="flow execution")
            except TimeoutError:
                # .serve() might not execute if there's an issue
                if serve_error:
                    print(f" serve error: {serve_error}")
                print(" ✗ flow was not executed by .serve()")
                return False

            print(" ✓ flow executed (marker file created)")

            # verify run was recorded in API
            runs = client.read_flow_runs()
            deployment_runs = [r for r in runs if r.deployment_id == deployment.id]
            if deployment_runs:
                latest = max(deployment_runs, key=lambda r: r.created)
                print(f" run state: {latest.state.type if latest.state else 'unknown'}")

            print(" ✓ serve runner test passed")
            return True
    finally:
        # cleanup marker
        marker_path.unlink(missing_ok=True)


def test_serve_with_schedule():
    """Test .serve() creates deployment with schedule.

    Returns:
        True if the deployment exists and has at least one schedule attached,
        False otherwise.
    """
    print("\n=== test_serve_with_schedule ===")

    # this test uses the prefect client .serve() method
    # which is a blocking call, so we run it in a thread

    from prefect import flow
    from prefect.client.orchestration import get_client

    suffix = uuid.uuid4().hex[:8]
    deployment_name = f"serve-schedule-{suffix}"
    serve_error = None

    @flow
    def serve_test_flow():
        return "ok"

    def run_serve():
        """Thread target: run the blocking .serve() call, recording any failure."""
        nonlocal serve_error
        try:
            # serve with cron schedule
            serve_test_flow.serve(
                name=deployment_name,
                cron="*/5 * * * *",  # every 5 minutes
            )
        except Exception as e:
            serve_error = e

    # start serve in background
    serve_thread = threading.Thread(target=run_serve, daemon=True)
    serve_thread.start()

    # wait for deployment to be created
    print(" waiting for deployment (5s)...")
    time.sleep(5)

    # check deployment exists with schedule
    with get_client(sync_client=True) as client:
        # Only the lookup is guarded: a failure while inspecting schedules
        # must not be misreported as "deployment not found".
        try:
            deployment = client.read_deployment_by_name(f"serve-test-flow/{deployment_name}")
        except Exception as e:
            if serve_error:
                print(f" ✗ serve failed: {serve_error}")
            else:
                print(f" ✗ deployment not found: {e}")
            return False

    print(f" ✓ deployment created: {deployment.id}")

    # check schedules
    schedules = deployment.schedules
    if not schedules:
        print(" ✗ no schedules found on deployment")
        return False

    print(f" ✓ schedule attached: {len(schedules)} schedule(s)")
    for s in schedules:
        print(f" - {s.schedule}")

    print(" ✓ .serve() with schedule test passed")
    return True


def main():
    """Run every test against the server at API_URL and exit with a status.

    Exits with 0 when all tests pass, and with 1 when the server is
    unreachable, unhealthy, or any test fails.
    """
    print(f"api url: {API_URL}")

    # verify server is running
    try:
        resp = api("GET", "/health")
    except Exception as e:
        print(f"cannot connect to server: {e}")
        sys.exit(1)
    if resp.status_code != 200:
        print(f"server not healthy: {resp.status_code}")
        sys.exit(1)

    print("server healthy")

    # test 1: cron scheduler
    # test 2: serve runner (NOT a worker - Runner polls for scheduled runs)
    # test 3: serve with schedule
    tests = [
        ("cron_scheduler", test_cron_scheduler),
        ("serve_runner", test_serve_runner),
        ("serve_with_schedule", test_serve_with_schedule),
    ]

    results = []
    for name, test_fn in tests:
        try:
            ok = test_fn()
        except Exception as e:
            # "cron_scheduler" -> "cron scheduler" for the human-readable line
            label = name.replace("_", " ")
            print(f" ✗ {label} test failed: {e}")
            ok = False
        results.append((name, ok))

    # summary
    print("\n=== summary ===")
    passed = sum(1 for _, ok in results if ok)
    total = len(results)

    for name, ok in results:
        status = "✓" if ok else "✗"
        print(f" {status} {name}")

    print(f"\n{passed}/{total} tests passed")

    sys.exit(0 if passed == total else 1)


if __name__ == "__main__":
    main()