prefect server schedule and runner integration tests
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["prefect>=3.0", "httpx"]
5# ///
6"""
7Schedule and runner integration tests for prefect-server.
8
9Tests:
101. Cron scheduler creates flow runs on schedule (server-side)
112. .serve() Runner polls get_scheduled_flow_runs and executes locally
123. .serve() creates deployment with schedule attached
13
14NOTE: .serve() is NOT a worker. It's a Runner that:
15- Creates a deployment
16- Polls POST /deployments/get_scheduled_flow_runs every N seconds
17- Executes runs locally in the same process
18
19Workers are separate standalone daemons that poll work pools.
20
21Requires running server at PREFECT_API_URL.
22"""
23
24import os
25import signal
26import subprocess
27import sys
28import time
29import uuid
30
31import httpx
32
# Base URL of the Prefect REST API; override with PREFECT_API_URL.
# api() appends paths that start with "/", so no trailing slash here.
API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")
34
35
def api(method: str, path: str, **kwargs) -> httpx.Response:
    """Send one HTTP request to the Prefect API.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        path: API path appended to API_URL; expected to start with "/".
        **kwargs: Forwarded verbatim to httpx.request (json=, params=, ...).

    Returns:
        The raw httpx.Response; the status code is not checked here.
    """
    return httpx.request(method, f"{API_URL}{path}", timeout=30, **kwargs)
40
41
def wait_for_condition(check_fn, timeout: int = 60, interval: float = 1.0, desc: str = "condition"):
    """Poll check_fn until it returns a truthy value or the timeout elapses.

    Args:
        check_fn: Zero-argument callable; a truthy return value ends the wait.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds slept between consecutive polls.
        desc: Human-readable label used in the timeout error message.

    Returns:
        The first truthy value produced by check_fn.

    Raises:
        TimeoutError: If check_fn never returned a truthy value in time.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        outcome = check_fn()
        if outcome:
            return outcome
        time.sleep(interval)
    raise TimeoutError(f"timeout waiting for {desc}")
51
52
def test_cron_scheduler():
    """Verify the server-side scheduler creates flow runs for a cron deployment.

    Creates a flow, a process-type work pool, and a deployment carrying an
    every-minute cron schedule, then polls /flow_runs/filter until the
    scheduler materializes a run (up to 70s, covering a full minute boundary).

    Fix vs. the original: the three DELETE cleanup calls now run in a
    ``finally`` block, so a timeout or failed assertion no longer leaks the
    flow / pool / deployment on the server.

    Returns:
        True when a scheduled run was observed.

    Raises:
        AssertionError: If any create call returns a non-2xx status.
        TimeoutError: If the scheduler creates no run within 70 seconds.
    """
    print("\n=== test_cron_scheduler ===")

    suffix = uuid.uuid4().hex[:8]
    # track created resources so cleanup only deletes what actually exists
    flow_id = None
    pool_name = None
    deployment_id = None

    try:
        # create flow
        resp = api("POST", "/flows/", json={"name": f"cron-test-flow-{suffix}"})
        assert resp.status_code in (200, 201), f"create flow failed: {resp.status_code}"
        flow_id = resp.json()["id"]
        print(f" flow: {flow_id}")

        # create work pool
        pool_name = f"cron-test-pool-{suffix}"
        resp = api("POST", "/work_pools/", json={"name": pool_name, "type": "process"})
        assert resp.status_code in (200, 201), f"create pool failed: {resp.status_code}"
        print(f" pool: {pool_name}")

        # create deployment with cron schedule (every minute)
        # use */1 * * * * to trigger within 60 seconds
        resp = api("POST", "/deployments/", json={
            "name": f"cron-test-deploy-{suffix}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "schedules": [{"schedule": {"cron": "*/1 * * * *"}, "active": True}],
        })
        assert resp.status_code in (200, 201), f"create deployment failed: {resp.status_code}"
        deployment_id = resp.json()["id"]
        print(f" deployment: {deployment_id}")
        print(" schedule: */1 * * * * (every minute)")

        # wait for scheduler to create a run (up to 70 seconds for cron)
        print(" waiting for scheduler to create run (up to 70s)...")

        def check_run_created():
            # returns the first run for our deployment, or None to keep polling
            resp = api("POST", "/flow_runs/filter", json={
                "flow_runs": {"deployment_id": {"any_": [deployment_id]}},
                "limit": 10,
            })
            if resp.status_code == 200:
                runs = resp.json()
                if runs:
                    return runs[0]
            return None

        run = wait_for_condition(check_run_created, timeout=70, interval=2, desc="scheduled run")
        print(f" ✓ scheduler created run: {run['id']}")
        print(f" state: {run.get('state', {}).get('type', 'unknown')}")

        print(" ✓ cron scheduler test passed")
        return True

    finally:
        # best-effort cleanup, even when the wait timed out or an assert failed
        if deployment_id:
            api("DELETE", f"/deployments/{deployment_id}")
        if pool_name:
            api("DELETE", f"/work_pools/{pool_name}")
        if flow_id:
            api("DELETE", f"/flows/{flow_id}")
109
110
def test_serve_runner() -> bool:
    """Test that .serve() Runner polls get_scheduled_flow_runs and executes locally.

    .serve() is NOT a worker - it's a Runner that:
    1. Creates a deployment with schedule
    2. Polls POST /deployments/get_scheduled_flow_runs every N seconds
    3. Executes matching runs locally in the same process

    This is distinct from Workers which are standalone daemons that poll work pools.
    """
    print("\n=== test_serve_runner ===")

    # .serve() creates a Runner that polls for scheduled runs. The server-side
    # scheduler creates runs, and .serve() picks them up via get_scheduled_flow_runs.

    # imports are local so the module can be loaded without prefect installed
    import threading
    from prefect import flow
    from prefect.client.orchestration import get_client

    suffix = uuid.uuid4().hex[:8]
    deployment_name = f"worker-exec-{suffix}"
    # marker file the flow writes; its existence is the proof of local execution
    execution_marker = f"/tmp/worker_exec_marker_{suffix}"
    serve_error = None
    serve_started = threading.Event()

    @flow
    def worker_exec_flow(marker_file: str = execution_marker):
        """Flow that creates a marker file to prove it ran."""
        import pathlib
        pathlib.Path(marker_file).write_text("executed")
        return "done"

    def run_serve():
        # runs in a daemon thread; .serve() blocks until the process exits
        nonlocal serve_error
        try:
            # NOTE(review): the event is set *before* .serve() is actually up;
            # the sleep(2) below compensates — confirm if flakiness appears
            serve_started.set()
            # use interval=1 for faster testing
            worker_exec_flow.serve(
                name=deployment_name,
                interval=1,  # every 1 second - .serve() handles this locally
            )
        except Exception as e:
            # surface failures to the main thread, which checks serve_error
            serve_error = e

    # start serve in background
    serve_thread = threading.Thread(target=run_serve, daemon=True)
    serve_thread.start()

    # wait for serve to start
    serve_started.wait(timeout=10)
    time.sleep(2)  # give it time to register deployment

    try:
        with get_client(sync_client=True) as client:
            # verify deployment exists (name format: "<flow-name>/<deployment-name>")
            try:
                deployment = client.read_deployment_by_name(f"worker-exec-flow/{deployment_name}")
                print(f" deployment: {deployment.id}")
            except Exception as e:
                # prefer the root cause from the serve thread when available
                if serve_error:
                    raise RuntimeError(f"serve failed: {serve_error}")
                raise RuntimeError(f"deployment not found: {e}")

            # .serve() with interval will execute the flow locally on schedule
            # wait for it to execute (interval=1s, so should be quick)
            print(" waiting for .serve() to execute flow (up to 15s)...")

            def check_marker_exists():
                # truthy once the flow has run and written the marker file
                import pathlib
                return pathlib.Path(execution_marker).exists()

            try:
                wait_for_condition(check_marker_exists, timeout=15, interval=0.5, desc="flow execution")
                print(" ✓ flow executed (marker file created)")

                # verify run was recorded in API
                runs = client.read_flow_runs()
                deployment_runs = [r for r in runs if r.deployment_id == deployment.id]
                if deployment_runs:
                    # most recently created run for this deployment
                    latest = max(deployment_runs, key=lambda r: r.created)
                    print(f" run state: {latest.state.type if latest.state else 'unknown'}")

                print(" ✓ serve runner test passed")
                return True

            except TimeoutError:
                # .serve() might not execute if there's an issue
                if serve_error:
                    print(f" serve error: {serve_error}")
                print(" ✗ flow was not executed by .serve()")
                return False

    finally:
        # cleanup marker
        # NOTE(review): the serve daemon thread and its deployment are left
        # behind intentionally (thread dies with the process); verify the
        # server-side deployment does not accumulate across runs
        import pathlib
        pathlib.Path(execution_marker).unlink(missing_ok=True)
207
208
def test_serve_with_schedule():
    """Test .serve() creates a deployment with a cron schedule attached.

    Starts ``flow.serve(cron=...)`` in a daemon thread, then polls the API
    for the resulting deployment and asserts it carries at least one schedule.

    Fix vs. the original: instead of a fixed ``time.sleep(5)`` (racy on slow
    servers, wasteful on fast ones), the deployment lookup is polled via the
    module's own ``wait_for_condition`` for up to 15 seconds.

    Returns:
        True when the deployment exists and has schedules, False otherwise.
    """
    print("\n=== test_serve_with_schedule ===")

    # this test uses the prefect client .serve() method
    # which is a blocking call, so we run it in a thread

    import threading
    from prefect import flow
    from prefect.client.orchestration import get_client

    suffix = uuid.uuid4().hex[:8]
    deployment_name = f"serve-schedule-{suffix}"
    serve_error = None

    @flow
    def serve_test_flow():
        return "ok"

    def run_serve():
        # daemon-thread body; capture any failure for the main thread
        nonlocal serve_error
        try:
            # serve with cron schedule
            serve_test_flow.serve(
                name=deployment_name,
                cron="*/5 * * * *",  # every 5 minutes
            )
        except Exception as e:
            serve_error = e

    # start serve in background
    serve_thread = threading.Thread(target=run_serve, daemon=True)
    serve_thread.start()

    # poll for the deployment rather than sleeping a fixed 5 seconds
    print(" waiting for deployment (up to 15s)...")

    with get_client(sync_client=True) as client:

        def check_deployment():
            # name format: "<flow-name>/<deployment-name>"; None keeps polling
            try:
                return client.read_deployment_by_name(f"serve-test-flow/{deployment_name}")
            except Exception:
                return None

        try:
            deployment = wait_for_condition(
                check_deployment, timeout=15, interval=0.5, desc="deployment"
            )
        except TimeoutError as e:
            # report the serve thread's error when it is the likely root cause
            if serve_error:
                print(f" ✗ serve failed: {serve_error}")
            else:
                print(f" ✗ deployment not found: {e}")
            return False

        print(f" ✓ deployment created: {deployment.id}")

        # check schedules
        schedules = deployment.schedules
        if schedules:
            print(f" ✓ schedule attached: {len(schedules)} schedule(s)")
            for s in schedules:
                print(f" - {s.schedule}")
        else:
            print(" ✗ no schedules found on deployment")
            return False

        print(" ✓ .serve() with schedule test passed")
        return True
272
273
def main():
    """Entry point: verify server health, run every test, summarize, exit.

    Exits 0 only when all tests pass; exits 1 on an unhealthy/unreachable
    server or any test failure.
    """
    print(f"api url: {API_URL}")

    # refuse to run the suite against a dead or unhealthy server
    try:
        resp = api("GET", "/health")
        if resp.status_code != 200:
            print(f"server not healthy: {resp.status_code}")
            sys.exit(1)
    except Exception as e:
        print(f"cannot connect to server: {e}")
        sys.exit(1)

    print("server healthy")

    # registry of (label, callable); each test returns truthy on success
    tests = [
        ("cron_scheduler", test_cron_scheduler),      # test 1: cron scheduler
        ("serve_runner", test_serve_runner),          # test 2: Runner, NOT a worker
        ("serve_with_schedule", test_serve_with_schedule),  # test 3: serve + schedule
    ]

    results = []
    for label, test_fn in tests:
        try:
            results.append((label, test_fn()))
        except Exception as e:
            # human-readable label: underscores become spaces, as before
            print(f" ✗ {label.replace('_', ' ')} test failed: {e}")
            results.append((label, False))

    # summary
    print("\n=== summary ===")
    passed = sum(1 for _, ok in results if ok)
    total = len(results)

    for name, ok in results:
        status = "✓" if ok else "✗"
        print(f" {status} {name}")

    print(f"\n{passed}/{total} tests passed")

    sys.exit(0 if passed == total else 1)
324
325
# script entry point (uv run executes this module directly)
if __name__ == "__main__":
    main()