prefect server in zig
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "rich"]
5# ///
6"""
7Functional test suite for prefect-server API.
8
9Tests API correctness by exercising all endpoints with expected request/response patterns.
10Includes scheduler integration tests (which have intentional delays to verify background services).
11
12For performance benchmarking, use ./scripts/benchmark instead.
13
14Usage:
15 ./scripts/test-api-sequence # human-readable output
16 ./scripts/test-api-sequence --json # machine-readable for CI
17 ./scripts/test-api-sequence --quiet # minimal output
18"""
19
20import json as json_lib
21import os
22import sys
23import time
24import uuid
25from dataclasses import dataclass, field
26from typing import Callable
27
28import httpx
29from rich.console import Console
30from rich.panel import Panel
31from rich.table import Table
32
# Shared rich console used for all human-readable output.
console = Console()
# API root; taken from PREFECT_API_URL when that variable is set.
BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")
# Per-step console output is suppressed when --json or --quiet is on the command line.
QUIET = any(flag in sys.argv for flag in ("--json", "--quiet"))
36
37
38@dataclass
39class TestResult:
40 name: str
41 passed: bool
42 duration_ms: float
43 requests: int = 0
44 error: str | None = None
45
46
class CountingClient(httpx.Client):
    """``httpx.Client`` subclass that tallies calls made through ``request``."""

    def __init__(self, *args, **kwargs):
        """Initialise the underlying client, then start the tally at zero."""
        super().__init__(*args, **kwargs)
        self.request_count = 0

    def request(self, *args, **kwargs):
        """Increment the tally and delegate to the parent ``request``."""
        self.request_count = self.request_count + 1
        return super().request(*args, **kwargs)
57
58
59def validate_response(data: dict, required_fields: list[str], field_types: dict | None = None) -> bool:
60 """Validate response contains required fields with expected types."""
61 for field in required_fields:
62 if field not in data:
63 if not QUIET:
64 console.print(f"[red]VALIDATION[/red]: missing field '{field}'")
65 return False
66 if field_types:
67 for field, expected_type in field_types.items():
68 if field in data and data[field] is not None:
69 if not isinstance(data[field], expected_type):
70 if not QUIET:
71 console.print(f"[red]VALIDATION[/red]: field '{field}' expected {expected_type.__name__}, got {type(data[field]).__name__}")
72 return False
73 return True
74
75
def run_test(name: str, test_fn: Callable[[CountingClient], bool]) -> TestResult:
    """Execute ``test_fn`` against a fresh client and package the outcome.

    The elapsed time and the client's request tally are recorded whether the
    test returns normally or raises; an exception is turned into a failed
    result carrying the exception text. The client is always closed.
    """
    if not QUIET:
        console.print(Panel(f"testing {name}", style="blue"))

    client = CountingClient(base_url=BASE_URL, timeout=10)
    began = time.perf_counter()
    failure = None

    try:
        try:
            outcome = test_fn(client)
        except Exception as exc:
            outcome, failure = False, str(exc)
        elapsed_ms = (time.perf_counter() - began) * 1000
        return TestResult(
            name=name,
            passed=outcome,
            duration_ms=elapsed_ms,
            requests=client.request_count,
            error=failure,
        )
    finally:
        client.close()
104
105
106# ---------- test functions ----------
107
108
def test_admin(client: CountingClient) -> bool:
    """Probe the health, version and CSRF-token endpoints.

    /health and /admin/version must answer 200. /csrf-token may answer 200
    (token received) or 422 (reported as protection disabled); any other
    status fails the test.
    """
    verbose = not QUIET

    def announce(route: str) -> None:
        if verbose:
            console.print(f"[bold]GET {route}[/bold]")

    def reject(status: int) -> bool:
        if verbose:
            console.print(f"[red]FAIL[/red]: {status}")
        return False

    # health
    announce("/health")
    health = client.get("/health")
    if health.status_code != 200:
        return reject(health.status_code)
    if verbose:
        console.print(f" {health.text}")

    # version
    announce("/admin/version")
    version = client.get("/admin/version")
    if version.status_code != 200:
        return reject(version.status_code)
    if verbose:
        console.print(f" {version.json()}")

    # csrf-token: two statuses are acceptable, each with its own note
    announce("/csrf-token")
    token = client.get("/csrf-token", params={"client": "test-client"})
    notes = {
        200: " token received",
        422: " csrf protection disabled (ok)",
    }
    note = notes.get(token.status_code)
    if note is None:
        return reject(token.status_code)
    if verbose:
        console.print(note)

    return True
149
150
def test_flow_run(client: CountingClient, should_fail: bool = False) -> bool:
    """Drive one flow run through PENDING -> RUNNING -> COMPLETED or FAILED.

    Args:
        client: Client used for every request.
        should_fail: When True the terminal state sent is FAILED instead of
            COMPLETED, and the flow is named with the "fail" suffix.

    Returns:
        True when every request succeeds and the flow run read back at the
        end reports the terminal state that was set; False otherwise.
    """
    verbose = not QUIET

    def say(text: str) -> None:
        if verbose:
            console.print(text)

    def fail(detail) -> bool:
        say(f"[red]FAIL[/red]: {detail}")
        return False

    suffix = "fail" if should_fail else "success"
    say(f"server: {BASE_URL}\n")

    # 1. create flow
    say("[bold]1. POST /flows/[/bold]")
    resp = client.post("/flows/", json={"name": f"bench-flow-{suffix}"})
    if resp.status_code not in (200, 201):
        return fail(f"{resp.status_code} {resp.text}")
    flow = resp.json()
    if not validate_response(flow, ["id", "name", "created", "updated"], {"id": str, "name": str}):
        return False
    say(f" flow_id: {flow.get('id')}")

    # 2. create flow run
    say("\n[bold]2. POST /flow_runs/[/bold]")
    resp = client.post("/flow_runs/", json={
        "flow_id": flow["id"],
        "name": f"run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"{resp.status_code} {resp.text}")
    flow_run = resp.json()
    if not validate_response(flow_run, ["id", "name", "flow_id", "state_type"], {"id": str, "name": str}):
        return False
    flow_run_id = flow_run.get("id")
    say(f" flow_run_id: {flow_run_id}")

    # 3. read flow run
    say("\n[bold]3. GET /flow_runs/{id}[/bold]")
    resp = client.get(f"/flow_runs/{flow_run_id}")
    if resp.status_code != 200:
        return fail(resp.status_code)
    if verbose:
        console.print(f" state: {resp.json().get('state_type')}")

    # 4 and 5. RUNNING, then the terminal state; both use the same request shape
    final_type, final_name = ("FAILED", "Failed") if should_fail else ("COMPLETED", "Completed")
    transitions = ((4, "RUNNING", "Running"), (5, final_type, final_name))
    for step, state_type, state_name in transitions:
        say(f"\n[bold]{step}. POST /flow_runs/{{id}}/set_state ({state_type})[/bold]")
        resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
            "state": {"type": state_type, "name": state_name},
            "force": False,
        })
        if resp.status_code not in (200, 201):
            return fail(resp.status_code)
        if verbose:
            console.print(f" status: {resp.json().get('status')}")

    # 6. verify the stored state matches what was set
    say("\n[bold]6. GET /flow_runs/{id} (verify)[/bold]")
    resp = client.get(f"/flow_runs/{flow_run_id}")
    if resp.status_code != 200:
        # Previously this path returned False with no diagnostic.
        return fail(f"verify read {resp.status_code}")
    actual_type = resp.json().get("state_type")
    if actual_type != final_type:
        return fail(f"expected {final_type}, got {actual_type}")
    say(f" [green]state: {actual_type} (correct)[/green]")

    return True
246
247
def test_flow_with_task_runs(client: CountingClient) -> bool:
    """Run a flow run containing three task runs and verify the end state.

    Creates a flow and a flow run, moves the flow run to RUNNING, then for
    each of the extract/transform/load tasks creates a task run and moves it
    through RUNNING and COMPLETED. Finally completes the flow run and reads
    each task run back to confirm it is COMPLETED and linked to the flow run.

    Returns:
        True when every step succeeds, False on the first failure.
    """

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    def set_state(collection: str, run_id, state_type: str):
        # "RUNNING" -> "Running", "COMPLETED" -> "Completed"
        return client.post(f"/{collection}/{run_id}/set_state", json={
            "state": {"type": state_type, "name": state_type.capitalize()},
        })

    def accepted(resp) -> bool:
        return resp.status_code in (200, 201) and resp.json().get("status") == "ACCEPT"

    # setup: create flow
    log("[bold]setup: create flow[/bold]")
    resp = client.post("/flows/", json={"name": f"flow-with-tasks-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")
    log(f" flow_id: {flow_id}")

    # create flow run
    log("[bold]1. create flow run[/bold]")
    resp = client.post("/flow_runs/", json={
        "flow_id": flow_id,
        "name": f"run-with-tasks-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code}")
    flow_run_id = resp.json().get("id")
    log(f" flow_run_id: {flow_run_id}")

    # start flow run
    log("[bold]2. start flow run (RUNNING)[/bold]")
    if not accepted(set_state("flow_runs", flow_run_id, "RUNNING")):
        return fail("start flow run failed")
    log(" flow run started")

    # steps 3-5: create and execute the three task runs in order
    task_ids: dict[str, str] = {}
    for index, label in enumerate(("extract", "transform", "load"), start=1):
        log(f"[bold]{index + 2}. execute task {index}: {label}[/bold]")
        resp = client.post("/task_runs/", json={
            "flow_run_id": flow_run_id,
            "task_key": f"{label}_data",
            "dynamic_key": f"{label}-0",
            "name": f"{label}_data-0",
            "state": {"type": "PENDING", "name": "Pending"},
        })
        if resp.status_code not in (200, 201):
            return fail(f"create task {index} {resp.status_code}")
        task_id = resp.json().get("id")

        # Every transition is checked; previously the RUNNING response for
        # tasks 2 and 3 was overwritten without being inspected.
        for state_type in ("RUNNING", "COMPLETED"):
            resp = set_state("task_runs", task_id, state_type)
            if resp.status_code not in (200, 201):
                return fail(f"task {index} {state_type} {resp.status_code}")
        log(f" task {index} completed")
        task_ids[label] = task_id

    # complete flow run
    log("[bold]6. complete flow run[/bold]")
    if not accepted(set_state("flow_runs", flow_run_id, "COMPLETED")):
        return fail("complete flow run failed")
    log(" [green]flow run completed[/green]")

    # verify task runs can be read individually and are completed
    log("[bold]7. verify task runs completed[/bold]")
    for task_name, task_id in task_ids.items():
        resp = client.get(f"/task_runs/{task_id}")
        if resp.status_code != 200:
            return fail(f"read task {task_name} {resp.status_code}")
        task_run = resp.json()
        if task_run.get("flow_run_id") != flow_run_id:
            return fail(f"task {task_name} not linked to flow run")
        if task_run.get("state_type") != "COMPLETED":
            return fail(f"task {task_name} not COMPLETED")
    log(" [green]all 3 tasks completed and linked to flow run[/green]")

    return True
373
374
def test_task_run(client: CountingClient) -> bool:
    """Create a standalone task run and move it through RUNNING and COMPLETED.

    Returns:
        True when the create, read and both set_state requests succeed;
        False on the first failure. Unless QUIET is set, every failure is
        reported on the console.
    """
    verbose = not QUIET

    def fail(detail) -> bool:
        if verbose:
            console.print(f"[red]FAIL[/red]: {detail}")
        return False

    # create
    if verbose:
        console.print("[bold]POST /task_runs/[/bold]")
    resp = client.post("/task_runs/", json={
        "task_key": "bench-task",
        "dynamic_key": f"key-{uuid.uuid4().hex[:8]}",
        "name": f"task-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(resp.status_code)
    task_run_id = resp.json().get("id")
    if verbose:
        console.print(f" task_run_id: {task_run_id}")

    # read
    resp = client.get(f"/task_runs/{task_run_id}")
    if resp.status_code != 200:
        return fail(f"read task_run {resp.status_code}")

    # RUNNING, then COMPLETED
    for state_type, state_name in (("RUNNING", "Running"), ("COMPLETED", "Completed")):
        resp = client.post(f"/task_runs/{task_run_id}/set_state", json={
            "state": {"type": state_type, "name": state_name},
            "force": False,
        })
        if resp.status_code not in (200, 201):
            return fail(f"set_state {state_type} {resp.status_code}")
        if verbose:
            console.print(f" -> {state_type}: {resp.json().get('status')}")

    return True
420
421
def test_filters(client: CountingClient) -> bool:
    """POST an empty filter to the flows, flow_runs and task_runs endpoints.

    Each endpoint must answer 200; the first one that does not fails the test.
    """
    targets = {
        "flows": "/flows/filter",
        "flow_runs": "/flow_runs/filter",
        "task_runs": "/task_runs/filter",
    }
    for label, endpoint in targets.items():
        resp = client.post(endpoint, json={})
        if resp.status_code == 200:
            if not QUIET:
                console.print(f" {label}: {len(resp.json())} items")
            continue
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {endpoint} {resp.status_code}")
        return False

    return True
438
439
def test_logs(client: CountingClient) -> bool:
    """POST a batch of two log records to /logs/ and accept 200, 201 or 204."""
    from datetime import datetime, timezone

    # Two records, levels 20 and 30, each stamped with the current UTC time.
    entries = [
        {
            "level": level,
            "message": f"test log {number}",
            "name": "test",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        for number, level in enumerate((20, 30), start=1)
    ]
    resp = client.post("/logs/", json=entries)
    succeeded = resp.status_code in (200, 201, 204)
    if not QUIET:
        if succeeded:
            console.print(f" {len(entries)} logs sent")
        else:
            console.print(f"[red]FAIL[/red]: {resp.status_code}")
    return succeeded
456
457
def test_variables(client: CountingClient) -> bool:
    """Exercise the variables API: create, read, update, filter, count, delete.

    Also checks that creating a second variable with the same name is
    rejected with 409.

    Returns:
        True when every step returns the expected status; False on the first
        failure. Unless QUIET is set, every failure is reported on the console.
    """
    verbose = not QUIET

    def say(text: str) -> None:
        if verbose:
            console.print(text)

    def fail(detail) -> bool:
        say(f"[red]FAIL[/red]: {detail}")
        return False

    var_name = f"bench-var-{uuid.uuid4().hex[:8]}"

    # create
    say("[bold]POST /variables/[/bold]")
    resp = client.post("/variables/", json={
        "name": var_name,
        "value": {"nested": "object", "count": 42},
        "tags": ["benchmark", "test"],
    })
    if resp.status_code != 201:
        return fail(f"create {resp.status_code}")
    variable = resp.json()
    if not validate_response(variable, ["id", "name", "value", "tags", "created", "updated"], {"id": str, "name": str, "tags": list}):
        return False
    var_id = variable.get("id")
    say(f" created: {var_id}")

    # get by name
    resp = client.get(f"/variables/name/{var_name}")
    if resp.status_code != 200:
        return fail(f"get by name {resp.status_code}")
    say(" get by name: ok")

    # get by id
    resp = client.get(f"/variables/{var_id}")
    if resp.status_code != 200:
        return fail(f"get by id {resp.status_code}")

    # update by name
    resp = client.patch(f"/variables/name/{var_name}", json={"value": "updated"})
    if resp.status_code != 204:
        return fail(f"update by name {resp.status_code}")
    say(" updated by name")

    # filter
    resp = client.post("/variables/filter", json={"limit": 10})
    if resp.status_code != 200:
        return fail(f"filter {resp.status_code}")
    if verbose:
        console.print(f" filter: {len(resp.json())} items")

    # count
    resp = client.post("/variables/count", json={})
    if resp.status_code != 200:
        return fail(f"count {resp.status_code}")
    if verbose:
        console.print(f" count: {resp.text}")

    # duplicate name should fail
    resp = client.post("/variables/", json={"name": var_name, "value": "dupe"})
    if resp.status_code != 409:
        return fail(f"duplicate should return 409, got {resp.status_code}")
    say(" duplicate rejected: 409")

    # delete
    resp = client.delete(f"/variables/name/{var_name}")
    if resp.status_code != 204:
        return fail(f"delete {resp.status_code}")
    say(" deleted")

    return True
531
532
def test_blocks(client: CountingClient) -> bool:
    """Exercise the blocks API: block types, block schemas, block documents.

    Creates a block type, a schema for it and a document using both, reads
    each back through its lookup routes, updates and deletes the document,
    and posts an empty filter to the three filter endpoints.

    Returns:
        True when every step returns the expected status; False on the first
        failure. Unless QUIET is set, every failure is reported on the console.
    """
    verbose = not QUIET

    def say(text: str) -> None:
        if verbose:
            console.print(text)

    def fail(detail) -> bool:
        say(f"[red]FAIL[/red]: {detail}")
        return False

    slug = f"bench-block-{uuid.uuid4().hex[:8]}"

    # create block type
    say("[bold]block_types[/bold]")
    resp = client.post("/block_types/", json={
        "name": f"Bench Block {slug}",
        "slug": slug,
        "description": "benchmark block type",
    })
    if resp.status_code not in (200, 201):
        return fail(f"create block_type {resp.status_code}")
    block_type = resp.json()
    if not validate_response(block_type, ["id", "name", "slug"], {"id": str, "name": str, "slug": str}):
        return False
    block_type_id = block_type.get("id")
    say(f" created: {block_type_id}")

    # get by slug
    resp = client.get(f"/block_types/slug/{slug}")
    if resp.status_code != 200:
        return fail(f"get block_type by slug {resp.status_code}")

    # create schema
    say("[bold]block_schemas[/bold]")
    resp = client.post("/block_schemas/", json={
        "block_type_id": block_type_id,
        "fields": {"properties": {"value": {"type": "string"}}},
        "capabilities": ["test"],
        "version": "1.0.0",
    })
    if resp.status_code not in (200, 201):
        return fail(f"create block_schema {resp.status_code}")
    block_schema = resp.json()
    block_schema_id = block_schema.get("id")
    checksum = block_schema.get("checksum")
    say(f" created: {block_schema_id}")

    # get by checksum
    resp = client.get(f"/block_schemas/checksum/{checksum}")
    if resp.status_code != 200:
        return fail(f"get block_schema by checksum {resp.status_code}")

    # create document
    say("[bold]block_documents[/bold]")
    doc_name = f"bench-doc-{uuid.uuid4().hex[:8]}"
    resp = client.post("/block_documents/", json={
        "name": doc_name,
        "block_type_id": block_type_id,
        "block_schema_id": block_schema_id,
        "data": {"value": "secret-value"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create block_document {resp.status_code}")
    block_doc_id = resp.json().get("id")
    say(f" created: {block_doc_id}")

    # get by id
    resp = client.get(f"/block_documents/{block_doc_id}")
    if resp.status_code != 200:
        return fail(f"get block_document {resp.status_code}")

    # get by slug/name
    resp = client.get(f"/block_types/slug/{slug}/block_documents/name/{doc_name}")
    if resp.status_code != 200:
        return fail(f"get block_document by name {resp.status_code}")

    # update
    resp = client.patch(f"/block_documents/{block_doc_id}", json={"data": {"value": "updated"}})
    if resp.status_code != 204:
        return fail(f"update block_document {resp.status_code}")
    say(" updated")

    # filters
    for endpoint in ("/block_types/filter", "/block_schemas/filter", "/block_documents/filter"):
        resp = client.post(endpoint, json={})
        if resp.status_code != 200:
            return fail(f"{endpoint} {resp.status_code}")

    # delete
    resp = client.delete(f"/block_documents/{block_doc_id}")
    if resp.status_code != 204:
        return fail(f"delete block_document {resp.status_code}")
    say(" deleted")

    return True
635
636
def test_work_pools(client: CountingClient) -> bool:
    """Exercise the work pools API: pools, queues and worker heartbeats.

    Creates a uniquely named pool and checks it reports a default queue,
    reads/updates/filters pools, creates and reads a queue, sends a worker
    heartbeat and expects the pool status to become READY, filters workers,
    then deletes the queue and the pool.

    Returns:
        True when every step passes; False on the first failure. Unless QUIET
        is set, every failure is reported on the console.
    """
    verbose = not QUIET

    def say(text: str) -> None:
        if verbose:
            console.print(text)

    def fail(detail) -> bool:
        say(f"[red]FAIL[/red]: {detail}")
        return False

    pool_name = f"test-pool-{uuid.uuid4().hex[:8]}"
    pool_url = f"/work_pools/{pool_name}"

    # create work pool
    say("[bold]work_pools[/bold]")
    resp = client.post("/work_pools/", json={
        "name": pool_name,
        "type": "process",
        "description": "test work pool",
    })
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code} {resp.text}")
    pool = resp.json()
    if not validate_response(pool, ["id", "name", "type", "status", "default_queue_id"], {"id": str, "name": str}):
        return False
    say(f" created: {pool.get('id')}")

    # check default queue was created
    if not pool.get("default_queue_id"):
        return fail("no default_queue_id")

    # get by name
    resp = client.get(pool_url)
    if resp.status_code != 200:
        return fail(f"get work_pool {resp.status_code}")

    # update
    resp = client.patch(pool_url, json={"description": "updated"})
    if resp.status_code != 204:
        return fail(f"update work_pool {resp.status_code}")
    say(" updated")

    # filter
    resp = client.post("/work_pools/filter", json={})
    if resp.status_code != 200:
        return fail(f"filter work_pools {resp.status_code}")
    if not isinstance(resp.json(), list):
        return fail("filter work_pools did not return a list")

    # create queue
    say("[bold]work_queues[/bold]")
    queue_name = f"test-queue-{uuid.uuid4().hex[:8]}"
    queue_url = f"{pool_url}/queues/{queue_name}"
    resp = client.post(f"{pool_url}/queues/", json={
        "name": queue_name,
        "description": "test queue",
        "priority": 5,
    })
    if resp.status_code not in (200, 201):
        return fail(f"create work_queue {resp.status_code} {resp.text}")
    queue = resp.json()
    if not validate_response(queue, ["id", "name", "priority", "work_pool_id"], {"id": str, "priority": int}):
        return False
    say(f" created: {queue.get('id')}")

    # get queue
    resp = client.get(queue_url)
    if resp.status_code != 200:
        return fail(f"get work_queue {resp.status_code}")

    # filter queues
    resp = client.post(f"{pool_url}/queues/filter", json={})
    if resp.status_code != 200:
        return fail(f"filter work_queues {resp.status_code}")
    queues = resp.json()
    if not isinstance(queues, list) or len(queues) < 2:  # default + our queue
        return fail(f"expected at least 2 queues, got {len(queues) if isinstance(queues, list) else 'not a list'}")

    # worker heartbeat
    say("[bold]workers[/bold]")
    resp = client.post(f"{pool_url}/workers/heartbeat", json={
        "name": "test-worker-1",
        "heartbeat_interval_seconds": 30,
    })
    if resp.status_code != 204:
        return fail(f"worker heartbeat {resp.status_code} {resp.text}")
    say(" heartbeat sent")

    # check pool status is now READY
    resp = client.get(pool_url)
    if resp.status_code != 200:
        return fail(f"re-read work_pool {resp.status_code}")
    pool = resp.json()
    if pool.get("status") != "READY":
        return fail(f"expected pool status READY, got {pool.get('status')}")
    say(" pool status: READY")

    # filter workers
    resp = client.post(f"{pool_url}/workers/filter", json={})
    if resp.status_code != 200:
        return fail(f"filter workers {resp.status_code}")
    workers = resp.json()
    if not isinstance(workers, list) or len(workers) < 1:
        return fail(f"expected at least 1 worker, got {len(workers) if isinstance(workers, list) else 'not a list'}")

    # delete queue (not default)
    resp = client.delete(queue_url)
    if resp.status_code != 204:
        return fail(f"delete work_queue {resp.status_code}")
    say(" deleted queue")

    # delete pool
    resp = client.delete(pool_url)
    if resp.status_code != 204:
        return fail(f"delete work_pool {resp.status_code}")
    say(" deleted pool")

    return True
784
785
def test_deployments(client: CountingClient) -> bool:
    """Exercise the deployments API: deployments, schedules, create_flow_run.

    Creates a flow and a deployment carrying one interval schedule, then
    reads, updates, filters, counts, pauses and resumes the deployment,
    creates a flow run from it, lists/creates/updates/deletes a schedule,
    and finally deletes the deployment.

    Returns:
        True when every step passes; False on the first failure.
    """
    verbose = not QUIET

    def say(text: str) -> None:
        if verbose:
            console.print(text)

    def fail(detail) -> bool:
        say(f"[red]FAIL[/red]: {detail}")
        return False

    # create a flow first
    say("[bold]setup: create flow[/bold]")
    resp = client.post("/flows/", json={"name": f"deploy-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow = resp.json()
    flow_id = flow.get("id")
    flow_name = flow.get("name")

    # create deployment
    deployment_name = f"test-deployment-{uuid.uuid4().hex[:8]}"
    say("[bold]deployments[/bold]")
    resp = client.post("/deployments/", json={
        "name": deployment_name,
        "flow_id": flow_id,
        "description": "test deployment",
        "tags": ["test", "benchmark"],
        "parameters": {"key": "value"},
        "schedules": [
            {"schedule": {"interval": 3600}, "active": True},
        ],
    })
    if resp.status_code not in (200, 201):
        return fail(f"create deployment {resp.status_code} {resp.text}")
    deployment = resp.json()
    if not validate_response(deployment, ["id", "name", "flow_id", "status", "schedules"], {"id": str, "name": str}):
        return False
    deployment_id = deployment.get("id")
    deployment_url = f"/deployments/{deployment_id}"
    say(f" created: {deployment_id}")

    # verify schedules were created
    schedules = deployment.get("schedules", [])
    if not isinstance(schedules, list) or len(schedules) != 1:
        return fail(f"expected 1 schedule, got {len(schedules) if isinstance(schedules, list) else 'not a list'}")
    say(f" schedules: {len(schedules)}")

    # get by id
    resp = client.get(deployment_url)
    if resp.status_code != 200:
        return fail(f"get deployment {resp.status_code}")

    # get by name
    resp = client.get(f"/deployments/name/{flow_name}/{deployment_name}")
    if resp.status_code != 200:
        return fail(f"get deployment by name {resp.status_code}")
    say(" get by name: ok")

    # update
    resp = client.patch(deployment_url, json={"description": "updated"})
    if resp.status_code != 204:
        return fail(f"update deployment {resp.status_code}")
    say(" updated")

    # filter
    resp = client.post("/deployments/filter", json={"limit": 10})
    if resp.status_code != 200:
        return fail(f"filter deployments {resp.status_code}")
    if verbose:
        console.print(f" filter: {len(resp.json())} items")

    # count
    resp = client.post("/deployments/count", json={})
    if resp.status_code != 200:
        return fail(f"count deployments {resp.status_code}")
    if verbose:
        console.print(f" count: {resp.text}")

    # pause, then resume
    for action, done in (("pause", " paused"), ("resume", " resumed")):
        resp = client.post(f"{deployment_url}/{action}_deployment", json={})
        if resp.status_code != 204:
            return fail(f"{action} deployment {resp.status_code}")
        say(done)

    # create flow run from deployment
    say("[bold]create_flow_run[/bold]")
    resp = client.post(f"{deployment_url}/create_flow_run", json={})
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code} {resp.text}")
    flow_run = resp.json()
    if not validate_response(flow_run, ["id", "flow_id", "deployment_id"], {"id": str}):
        return False
    if flow_run.get("deployment_id") != deployment_id:
        return fail("deployment_id mismatch")
    say(f" created flow run: {flow_run.get('id')}")

    # schedules - list
    say("[bold]deployment_schedules[/bold]")
    resp = client.get(f"{deployment_url}/schedules")
    if resp.status_code != 200:
        return fail(f"list schedules {resp.status_code}")
    # The listing is only counted for the log line; the former unused
    # `schedule_id = schedules[0]...` lookup was dropped because indexing
    # could raise on a non-list body.
    if verbose:
        console.print(f" list: {len(resp.json())} schedules")

    # schedules - create
    resp = client.post(f"{deployment_url}/schedules", json={
        "schedule": {"cron": "0 0 * * *"},
        "active": False,
    })
    if resp.status_code not in (200, 201):
        return fail(f"create schedule {resp.status_code}")
    created_schedules = resp.json()
    if not isinstance(created_schedules, list) or len(created_schedules) != 1:
        return fail("expected 1 created schedule")
    new_schedule_id = created_schedules[0].get("id")
    schedule_url = f"{deployment_url}/schedules/{new_schedule_id}"
    say(f" created schedule: {new_schedule_id}")

    # schedules - update
    resp = client.patch(schedule_url, json={"active": True})
    if resp.status_code != 204:
        return fail(f"update schedule {resp.status_code}")
    say(" updated schedule")

    # schedules - delete
    resp = client.delete(schedule_url)
    if resp.status_code != 204:
        return fail(f"delete schedule {resp.status_code}")
    say(" deleted schedule")

    # delete deployment
    resp = client.delete(deployment_url)
    if resp.status_code != 204:
        return fail(f"delete deployment {resp.status_code}")
    say(" deleted deployment")

    return True
972
973
def test_scheduler_idempotency(client: CountingClient) -> bool:
    """Test that scheduler is idempotent - running twice doesn't create duplicates.

    Creates a flow, a work pool and a deployment with one active interval
    schedule (3600s), waits until the scheduler has created runs for that
    deployment, then polls for a few seconds to confirm the run count does
    not grow.

    Returns:
        True when at least one follow-up poll succeeded and the count never
        increased; False on any setup failure, timeout, or count growth.

    The deployment and work pool are deleted (best effort, responses ignored)
    on every exit path once they exist, so a failing run does not leave an
    active schedule behind on the server.
    """
    import time as time_mod

    stability_polls = 6  # 6 checks over ~3 seconds

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    def count_runs(deployment_id: str) -> int | None:
        """Count runs belonging to the deployment (client-side filter).

        Returns None when the filter request did not return 200.
        """
        resp = client.post("/flow_runs/filter", json={"limit": 100})
        if resp.status_code != 200:
            return None
        return sum(1 for r in resp.json() if r.get("deployment_id") == deployment_id)

    def wait_for_runs(deployment_id: str, min_count: int, max_wait: float = 10.0) -> int:
        """Poll until we have at least min_count runs; return 0 on timeout."""
        deadline = time_mod.monotonic() + max_wait
        while time_mod.monotonic() < deadline:
            found = count_runs(deployment_id)
            if found is not None and found >= min_count:
                return found
            time_mod.sleep(0.5)
        return 0

    # setup: create flow, work pool, deployment with interval schedule
    log("[bold]setup[/bold]")
    resp = client.post("/flows/", json={"name": f"idem-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")

    pool_name = f"idem-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    log(f" pool: {pool_name}")

    deployment_id = None
    try:
        # create deployment with interval schedule (every hour)
        resp = client.post("/deployments/", json={
            "name": f"idem-deploy-{uuid.uuid4().hex[:8]}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "schedules": [{"schedule": {"interval": 3600}, "active": True}],
        })
        if resp.status_code not in (200, 201):
            return fail(f"create deployment {resp.status_code}")
        deployment_id = resp.json().get("id")
        log(f" deployment: {deployment_id}")

        # poll for scheduler to create runs and wait for tick to complete
        # we need to wait for count to stabilize, not just see the first run
        log("[bold]waiting for scheduler tick to complete...[/bold]")
        count_after_first = wait_for_runs(deployment_id, min_count=1)
        # wait for count to stabilize (scheduler tick may still be inserting)
        for _ in range(10):
            time_mod.sleep(0.2)
            observed = count_runs(deployment_id)
            if observed is None:
                continue
            if observed == count_after_first:
                break  # count stable
            count_after_first = observed
        log(f" runs after first tick: {count_after_first}")

        if count_after_first == 0:
            return fail("scheduler did not create any runs (timeout)")

        # verify idempotency: poll for a few seconds to confirm count stays stable
        # if scheduler creates duplicates, count would increase during this window
        log("[bold]verifying idempotency (polling to confirm count stable)...[/bold]")
        stable_checks = 0
        for _ in range(stability_polls):
            time_mod.sleep(0.5)
            current_count = count_runs(deployment_id)
            if current_count is None:
                continue
            if current_count == count_after_first:
                stable_checks += 1
            elif current_count > count_after_first:
                return fail(f"idempotency failed: {count_after_first} -> {current_count} runs")

        log(f" stable count checks: {stable_checks}/{stability_polls}")
        if stable_checks == 0:
            # every poll errored or disagreed: nothing was actually verified
            return fail("could not confirm stable run count (no successful stability checks)")
        log(f" [green]idempotency verified: count unchanged at {count_after_first}[/green]")
        return True
    finally:
        # best-effort cleanup, also on failure, so the schedule stops producing runs
        if deployment_id is not None:
            client.delete(f"/deployments/{deployment_id}")
        client.delete(f"/work_pools/{pool_name}")
        log(" cleanup: ok")
1064
1065
def test_parameter_merging(client: CountingClient) -> bool:
    """Test that schedule parameters override deployment parameters.

    Creates a deployment whose own parameters are
    ``{"base_key", "override_key"}`` and whose single active schedule carries
    ``{"override_key", "schedule_key"}``, waits for the scheduler to create a
    run for it, and checks the first run's parameters contain the deployment
    value for ``base_key`` and the schedule values for the other two keys.

    Returns:
        True when the merged parameters match; False on setup failure,
        timeout, an unusable parameters payload, or a mismatch.

    The deployment and work pool are deleted (best effort) on every exit path
    once they exist.
    """
    import json as json_mod
    import time as time_mod

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # setup
    log("[bold]setup[/bold]")
    resp = client.post("/flows/", json={"name": f"params-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")

    pool_name = f"params-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    log(f" pool: {pool_name}")

    deployment_id = None
    try:
        # create deployment with base parameters
        # schedule has override parameter
        resp = client.post("/deployments/", json={
            "name": f"params-deploy-{uuid.uuid4().hex[:8]}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "parameters": {"base_key": "base_value", "override_key": "deployment_value"},
            "schedules": [{
                "schedule": {"interval": 3600},
                "active": True,
                "parameters": {"override_key": "schedule_value", "schedule_key": "schedule_only"},
            }],
        })
        if resp.status_code not in (200, 201):
            return fail(f"create deployment {resp.status_code}")
        deployment = resp.json()
        deployment_id = deployment.get("id")
        log(f" deployment: {deployment_id}")
        log(f" deployment params: {deployment.get('parameters')}")

        # poll for scheduler to create runs (filter client-side since server
        # filter doesn't support deployment_id yet)
        log("[bold]polling for scheduler to create runs...[/bold]")
        runs = []
        max_wait = 10.0
        start = time_mod.monotonic()
        while time_mod.monotonic() - start < max_wait:
            resp = client.post("/flow_runs/filter", json={"limit": 100})
            if resp.status_code == 200:
                runs = [r for r in resp.json() if r.get("deployment_id") == deployment_id]
                if runs:
                    break
            time_mod.sleep(0.5)
        log(f" found {len(runs)} runs for deployment in {time_mod.monotonic() - start:.1f}s")

        if not runs:
            return fail("scheduler did not create any runs (timeout)")

        # check merged parameters on first run; a null/absent value counts as empty
        run_params = runs[0].get("parameters") or {}
        if isinstance(run_params, str):
            # parameters may arrive as a JSON-encoded string
            run_params = json_mod.loads(run_params)
        if not isinstance(run_params, dict):
            return fail(f"unexpected parameters payload: {run_params!r}")
        log(f" run params: {run_params}")

        # verify merging:
        # - base_key should be from deployment
        # - override_key should be from schedule (override)
        # - schedule_key should be from schedule (new key)
        if run_params.get("base_key") != "base_value":
            return fail(f"base_key not preserved: {run_params.get('base_key')}")
        if run_params.get("override_key") != "schedule_value":
            return fail(f"override_key not overridden: {run_params.get('override_key')}")
        if run_params.get("schedule_key") != "schedule_only":
            return fail(f"schedule_key not added: {run_params.get('schedule_key')}")

        log(" [green]parameter merging verified[/green]")
        return True
    finally:
        # best-effort cleanup, also on failure, so the schedule stops producing runs
        if deployment_id is not None:
            client.delete(f"/deployments/{deployment_id}")
        client.delete(f"/work_pools/{pool_name}")
        log(" cleanup: ok")
1151
1152
def test_get_scheduled_flow_runs(client: CountingClient) -> bool:
    """Exercise the work-pool get_scheduled_flow_runs endpoint (worker polling).

    Sets up a flow, a "process" work pool and a deployment bound to that pool,
    creates one SCHEDULED run, polls the pool, and checks that the run is
    returned with the pool's id and default queue id. Also checks that pool
    and deployment report NOT_READY before the poll and READY after it, and
    that the ``work_queue_names`` and ``scheduled_before`` request bodies are
    accepted with a 200.

    Returns True when every step succeeds, False at the first failure.
    """
    from datetime import datetime, timezone

    ok_created = (200, 201)

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # --- setup -------------------------------------------------------------
    log("[bold]setup: create flow[/bold]")
    flow_resp = client.post("/flows/", json={"name": f"schedule-flow-{uuid.uuid4().hex[:8]}"})
    if flow_resp.status_code not in ok_created:
        return fail(f"create flow {flow_resp.status_code}")
    flow_id = flow_resp.json().get("id")

    pool_name = f"schedule-pool-{uuid.uuid4().hex[:8]}"
    pool_resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if pool_resp.status_code not in ok_created:
        return fail(f"create work_pool {pool_resp.status_code}")
    pool = pool_resp.json()
    pool_id = pool.get("id")
    default_queue_id = pool.get("default_queue_id")
    log(f" pool: {pool_id}")
    pool_status = pool.get("status")
    if pool_status != "NOT_READY":
        return fail(f"expected pool NOT_READY, got {pool_status}")

    deployment_body = {
        "name": f"schedule-deploy-{uuid.uuid4().hex[:8]}",
        "flow_id": flow_id,
        "work_pool_name": pool_name,
    }
    dep_resp = client.post("/deployments/", json=deployment_body)
    if dep_resp.status_code not in ok_created:
        return fail(f"create deployment {dep_resp.status_code}")
    deployment = dep_resp.json()
    deployment_id = deployment.get("id")
    log(f" deployment: {deployment_id}")
    deployment_status = deployment.get("status")
    if deployment_status != "NOT_READY":
        return fail(f"expected deployment NOT_READY, got {deployment_status}")

    # --- create scheduled flow run ----------------------------------------
    log("[bold]create scheduled flow run[/bold]")
    run_resp = client.post(
        f"/deployments/{deployment_id}/create_flow_run",
        json={"state": {"type": "SCHEDULED", "name": "Scheduled"}},
    )
    if run_resp.status_code not in ok_created:
        return fail(f"create flow run {run_resp.status_code}")
    flow_run = run_resp.json()
    flow_run_id = flow_run.get("id")
    log(f" flow_run: {flow_run_id}")
    log(f" state: {flow_run.get('state_type')}")
    if flow_run.get("state_type") != "SCHEDULED":
        return fail(f"expected SCHEDULED, got {flow_run.get('state_type')}")

    # --- poll the pool -----------------------------------------------------
    poll_url = f"/work_pools/{pool_name}/get_scheduled_flow_runs"
    log("[bold]get_scheduled_flow_runs[/bold]")
    poll_resp = client.post(poll_url, json={})
    if poll_resp.status_code != 200:
        return fail(f"get_scheduled_flow_runs {poll_resp.status_code}")
    scheduled_runs = poll_resp.json()
    if not isinstance(scheduled_runs, list):
        return fail(f"expected list, got {type(scheduled_runs)}")
    log(f" returned {len(scheduled_runs)} runs")

    # our run must come back tagged with this pool and its default queue
    found = False
    for entry in scheduled_runs:
        if (
            entry.get("flow_run", {}).get("id") == flow_run_id
            and entry.get("work_pool_id") == pool_id
            and entry.get("work_queue_id") == default_queue_id
        ):
            found = True
            break
    if not found:
        return fail("scheduled flow run not found in results")
    log(" flow run found in results")

    # --- status should flip to READY after the poll -------------------------
    pool_check = client.get(f"/work_pools/{pool_name}")
    if not (pool_check.status_code == 200 and pool_check.json().get("status") == "READY"):
        return fail("expected pool READY after polling")
    log(" pool status: READY")

    dep_check = client.get(f"/deployments/{deployment_id}")
    if not (dep_check.status_code == 200 and dep_check.json().get("status") == "READY"):
        return fail("expected deployment READY after polling")
    log(" deployment status: READY")

    # --- request-body filters ----------------------------------------------
    by_queue = client.post(poll_url, json={"work_queue_names": ["default"]})
    if by_queue.status_code != 200:
        return fail(f"filter test {by_queue.status_code}")
    log(" filtered by queue: ok")

    cutoff = datetime.now(timezone.utc).isoformat()
    by_time = client.post(poll_url, json={"scheduled_before": cutoff})
    if by_time.status_code != 200:
        return fail(f"scheduled_before test {by_time.status_code}")
    log(f" scheduled_before filter: {len(by_time.json())} runs")

    # --- cleanup (responses intentionally not checked) -----------------------
    client.delete(f"/deployments/{deployment_id}")
    client.delete(f"/work_pools/{pool_name}")
    log(" cleanup: ok")
    return True
1235
1236
def test_retry_failed_flows(client: CountingClient) -> bool:
    """Test RetryFailedFlows rule - retry cycle when configured.

    Creates a flow run with ``empirical_policy`` retries=2 / retry_delay=1,
    moves it PENDING -> RUNNING, then proposes FAILED. With retries left the
    server is expected to answer REJECT and leave the run in a SCHEDULED
    state; the test then drives SCHEDULED -> PENDING -> RUNNING -> COMPLETED
    and expects the final transition to be ACCEPTed.

    An ACCEPT on the FAILED proposal is only treated as a pass when the
    run_count read after the first RUNNING transition already exceeds the
    configured retries; otherwise the retry rule did not fire and the test
    fails.

    Returns True on success, False at the first failed expectation.
    """
    retries = 2

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # setup
    log("[bold]setup[/bold]")
    resp = client.post("/flows/", json={"name": f"retry-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")

    # create flow run with retry configuration via empirical_policy
    log("[bold]create flow run with retries=2, retry_delay=1[/bold]")
    resp = client.post("/flow_runs/", json={
        "flow_id": flow_id,
        "name": f"retry-run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
        "empirical_policy": {"retries": retries, "retry_delay": 1},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code}")
    flow_run_id = resp.json().get("id")
    log(f" flow_run_id: {flow_run_id}")

    set_state_url = f"/flow_runs/{flow_run_id}/set_state"
    flow_run_url = f"/flow_runs/{flow_run_id}"

    # run_count starts at 0, increments when entering RUNNING
    # transition to RUNNING (run_count becomes 1)
    log("[bold]PENDING → RUNNING (first attempt)[/bold]")
    resp = client.post(set_state_url, json={"state": {"type": "RUNNING", "name": "Running"}})
    if resp.status_code not in (200, 201):
        return fail(f"set RUNNING {resp.status_code}")
    if resp.json().get("status") != "ACCEPT":
        return fail(f"expected ACCEPT for RUNNING, got {resp.json().get('status')}")

    # verify run_count (a null value is treated as 0)
    resp = client.get(flow_run_url)
    if resp.status_code != 200:
        return fail(f"get flow run {resp.status_code}")
    run_count = resp.json().get("run_count") or 0
    log(f" run_count after RUNNING: {run_count}")

    # fail - with retries available, should REJECT and schedule retry
    log("[bold]RUNNING → FAILED (expect REJECT + AwaitingRetry)[/bold]")
    resp = client.post(set_state_url, json={
        "state": {"type": "FAILED", "name": "Failed", "message": "Test failure"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"set FAILED {resp.status_code}")
    status = resp.json().get("status")
    log(f" status: {status}")

    if status == "ACCEPT":
        # FAILED may only be accepted once retries are used up
        if run_count > retries:
            log(" ACCEPT (retries may be exhausted)")
            return True
        return fail(
            f"expected REJECT with retries remaining "
            f"(run_count={run_count}, retries={retries}), got ACCEPT"
        )
    if status != "REJECT":
        return fail(f"unexpected status: {status}")

    # with retries, should get REJECT and the run should be in AwaitingRetry state
    log(" [green]REJECT (retries available)[/green]")
    resp = client.get(flow_run_url)
    if resp.status_code != 200:
        return fail(f"get flow run {resp.status_code}")
    current = resp.json()
    actual_state = current.get("state_type")
    actual_name = current.get("state_name")
    log(f" state: {actual_state}/{actual_name}")
    if actual_state != "SCHEDULED":
        return fail(f"expected SCHEDULED (AwaitingRetry), got {actual_state}")
    log(" [green]state: SCHEDULED/AwaitingRetry (correct)[/green]")

    # complete the retry cycle: SCHEDULED → PENDING → RUNNING → COMPLETED
    log("[bold]retry cycle: SCHEDULED → PENDING → RUNNING → COMPLETED[/bold]")

    # SCHEDULED → PENDING
    resp = client.post(set_state_url, json={"state": {"type": "PENDING", "name": "Pending"}})
    if resp.status_code not in (200, 201):
        return fail(f"retry PENDING {resp.status_code}")
    log(f" → PENDING: {resp.json().get('status')}")

    # PENDING → RUNNING (run_count increments)
    resp = client.post(set_state_url, json={"state": {"type": "RUNNING", "name": "Running"}})
    if resp.status_code not in (200, 201):
        return fail(f"retry RUNNING {resp.status_code}")
    log(f" → RUNNING: {resp.json().get('status')}")

    # report run_count after the retry attempt (informational only)
    resp = client.get(flow_run_url)
    if resp.status_code != 200:
        return fail(f"get flow run {resp.status_code}")
    run_count = resp.json().get("run_count") or 0
    log(f" run_count: {run_count}")

    # complete successfully
    resp = client.post(set_state_url, json={"state": {"type": "COMPLETED", "name": "Completed"}})
    if resp.status_code not in (200, 201):
        return fail(f"COMPLETED {resp.status_code}")
    if resp.json().get("status") != "ACCEPT":
        return fail(f"expected ACCEPT for COMPLETED, got {resp.json().get('status')}")
    log(" [green]→ COMPLETED: ACCEPT[/green]")

    return True
1343
1344
def test_cancellation_flow(client: CountingClient) -> bool:
    """Walk a flow run through the cancellation states (CANCELLING, CANCELLED).

    Sequence: create run in PENDING, move to RUNNING, then CANCELLING, then
    CANCELLED, reading the run back after each cancellation step to confirm
    its ``state_type``. Finally proposes PENDING on the cancelled run and
    expects the server to answer REJECT.

    Returns True when all expectations hold, False at the first mismatch.
    """

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    created = (200, 201)

    # setup
    log("[bold]setup[/bold]")
    flow_resp = client.post("/flows/", json={"name": f"cancel-flow-{uuid.uuid4().hex[:8]}"})
    if flow_resp.status_code not in created:
        return fail(f"create flow {flow_resp.status_code}")
    flow_id = flow_resp.json().get("id")

    run_body = {
        "flow_id": flow_id,
        "name": f"cancel-run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
    }
    run_resp = client.post("/flow_runs/", json=run_body)
    if run_resp.status_code not in created:
        return fail(f"create flow run {run_resp.status_code}")
    flow_run_id = run_resp.json().get("id")
    log(f" flow_run_id: {flow_run_id}")

    def propose(state_type: str, state_name: str):
        """POST a state proposal for this run and return the raw response."""
        return client.post(
            f"/flow_runs/{flow_run_id}/set_state",
            json={"state": {"type": state_type, "name": state_name}},
        )

    def current_state_type():
        """Read the run back and return its state_type."""
        return client.get(f"/flow_runs/{flow_run_id}").json().get("state_type")

    # start the run
    log("[bold]PENDING → RUNNING[/bold]")
    resp = propose("RUNNING", "Running")
    if not (resp.status_code in created and resp.json().get("status") == "ACCEPT"):
        return fail("RUNNING transition failed")
    log(" status: ACCEPT")

    # cancel while running - first CANCELLING
    log("[bold]RUNNING → CANCELLING[/bold]")
    resp = propose("CANCELLING", "Cancelling")
    if resp.status_code not in created:
        return fail(f"CANCELLING {resp.status_code}")
    outcome = resp.json().get("status")
    if outcome != "ACCEPT":
        return fail(f"expected ACCEPT for CANCELLING, got {outcome}")
    log(" [green]status: ACCEPT[/green]")

    # verify state
    observed = current_state_type()
    if observed != "CANCELLING":
        return fail(f"expected CANCELLING, got {observed}")

    # complete cancellation
    log("[bold]CANCELLING → CANCELLED[/bold]")
    resp = propose("CANCELLED", "Cancelled")
    if resp.status_code not in created:
        return fail(f"CANCELLED {resp.status_code}")
    outcome = resp.json().get("status")
    if outcome != "ACCEPT":
        return fail(f"expected ACCEPT for CANCELLED, got {outcome}")
    log(" [green]status: ACCEPT[/green]")

    # verify final state
    observed = current_state_type()
    if observed != "CANCELLED":
        return fail(f"expected CANCELLED, got {observed}")
    log(" [green]final state: CANCELLED[/green]")

    # CANCELLED is terminal - verify can't go back to PENDING (PreventPendingTransitions)
    log("[bold]CANCELLED → PENDING (expect REJECT)[/bold]")
    resp = propose("PENDING", "Pending")
    if resp.status_code not in created:
        return fail(f"PENDING attempt {resp.status_code}")
    outcome = resp.json().get("status")
    if outcome != "REJECT":
        return fail(f"expected REJECT for CANCELLED→PENDING, got {outcome}")
    log(" [green]CANCELLED → PENDING: REJECT (correct)[/green]")

    return True
1421
1422
def test_serve_pattern(client: CountingClient) -> bool:
    """Test Runner/.serve() pattern: poll /deployments/get_scheduled_flow_runs, execute locally.

    Mirrors a Runner process started by flow.serve(): a deployment is created
    without a work pool, one SCHEDULED run is created for it, the run is
    discovered by polling the deployments endpoint, and it is then driven
    through PENDING, RUNNING and COMPLETED.

    Returns True when the run is found and COMPLETED is ACCEPTed, False at the
    first failed step.
    """
    from datetime import datetime, timezone

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    created = (200, 201)

    # setup: create flow and deployment (no work pool - serve doesn't use workers)
    log("[bold]setup: flow + deployment (no work pool)[/bold]")
    flow_resp = client.post("/flows/", json={"name": f"serve-flow-{uuid.uuid4().hex[:8]}"})
    if flow_resp.status_code not in created:
        return fail(f"create flow {flow_resp.status_code}")
    flow_id = flow_resp.json().get("id")

    # deliberately no work_pool_name - this is the serve pattern
    deployment_body = {
        "name": f"serve-deploy-{uuid.uuid4().hex[:8]}",
        "flow_id": flow_id,
    }
    dep_resp = client.post("/deployments/", json=deployment_body)
    if dep_resp.status_code not in created:
        return fail(f"create deployment {dep_resp.status_code}")
    deployment_id = dep_resp.json().get("id")
    log(f" deployment: {deployment_id}")

    # create a scheduled flow run
    log("[bold]1. create scheduled flow run[/bold]")
    run_resp = client.post(
        f"/deployments/{deployment_id}/create_flow_run",
        json={"state": {"type": "SCHEDULED", "name": "Scheduled"}},
    )
    if run_resp.status_code not in created:
        return fail(f"create flow run {run_resp.status_code}")
    flow_run_id = run_resp.json().get("id")
    log(f" flow_run: {flow_run_id}")

    # Runner polls: POST /deployments/get_scheduled_flow_runs
    log("[bold]2. Runner polls /deployments/get_scheduled_flow_runs[/bold]")
    poll_body = {
        "deployment_ids": [deployment_id],
        "scheduled_before": datetime.now(timezone.utc).isoformat(),
        "limit": 10,
    }
    poll_resp = client.post("/deployments/get_scheduled_flow_runs", json=poll_body)
    if poll_resp.status_code != 200:
        return fail(f"poll {poll_resp.status_code}")
    scheduled = poll_resp.json()
    if flow_run_id not in [run.get("id") for run in scheduled]:
        return fail("run not found in scheduled runs")
    log(f" found {len(scheduled)} scheduled run(s)")

    # Runner executes: SCHEDULED → PENDING → RUNNING → COMPLETED
    log("[bold]3. Runner executes flow locally[/bold]")

    def propose(state_type: str, state_name: str):
        """POST a state proposal for the run and return the raw response."""
        return client.post(
            f"/flow_runs/{flow_run_id}/set_state",
            json={"state": {"type": state_type, "name": state_name}},
        )

    # claim the run
    resp = propose("PENDING", "Pending")
    if resp.status_code not in created:
        return fail(f"PENDING {resp.status_code}")
    log(f" → PENDING: {resp.json().get('status')}")

    # execute
    resp = propose("RUNNING", "Running")
    if resp.status_code not in created:
        return fail(f"RUNNING {resp.status_code}")
    log(f" → RUNNING: {resp.json().get('status')}")

    # complete
    resp = propose("COMPLETED", "Completed")
    if resp.status_code not in created:
        return fail(f"COMPLETED {resp.status_code}")
    outcome = resp.json().get("status")
    if outcome != "ACCEPT":
        return fail(f"expected ACCEPT, got {outcome}")
    log(" [green]→ COMPLETED: ACCEPT[/green]")

    # cleanup (response intentionally not checked)
    client.delete(f"/deployments/{deployment_id}")
    log(" cleanup: ok")

    return True
1508
1509
def test_worker_pattern(client: CountingClient) -> bool:
    """Test Worker pattern: poll /work_pools/{name}/get_scheduled_flow_runs, dispatch to infra.

    This simulates what a Prefect Worker does - it polls a work pool for scheduled runs
    and dispatches them to infrastructure (k8s, docker, etc). The worker doesn't execute
    the flow itself; infrastructure does.

    Steps: create flow + "kubernetes" work pool + deployment bound to the pool,
    send a worker heartbeat (expects 204), create a SCHEDULED run, poll the
    pool and look for the run, then report PENDING, RUNNING and COMPLETED for
    it. Each state proposal must return 200/201, and COMPLETED must be
    ACCEPTed.

    Returns True on success, False at the first failed step.
    """

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # setup: flow + work pool + deployment with work_pool_name
    log("[bold]setup: flow + work pool + deployment[/bold]")
    resp = client.post("/flows/", json={"name": f"worker-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")

    pool_name = f"worker-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "kubernetes"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    log(f" pool: {pool_name}")

    resp = client.post("/deployments/", json={
        "name": f"worker-deploy-{uuid.uuid4().hex[:8]}",
        "flow_id": flow_id,
        "work_pool_name": pool_name,  # KEY: uses work pool
    })
    if resp.status_code not in (200, 201):
        return fail(f"create deployment {resp.status_code}")
    deployment_id = resp.json().get("id")
    log(f" deployment: {deployment_id}")

    # worker registers via heartbeat
    log("[bold]1. Worker sends heartbeat[/bold]")
    resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={
        "name": "k8s-worker-1",
        "heartbeat_interval_seconds": 30,
    })
    if resp.status_code != 204:
        return fail(f"heartbeat {resp.status_code}")
    log(" heartbeat sent")

    # create a scheduled flow run
    log("[bold]2. create scheduled flow run[/bold]")
    resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={
        "state": {"type": "SCHEDULED", "name": "Scheduled"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code}")
    flow_run_id = resp.json().get("id")
    log(f" flow_run: {flow_run_id}")

    # Worker polls: POST /work_pools/{name}/get_scheduled_flow_runs
    log("[bold]3. Worker polls /work_pools/{name}/get_scheduled_flow_runs[/bold]")
    resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={})
    if resp.status_code != 200:
        return fail(f"poll {resp.status_code}")
    scheduled = resp.json()
    found = any(r.get("flow_run", {}).get("id") == flow_run_id for r in scheduled)
    if not found:
        return fail("run not found in work pool queue")
    log(" found run in work pool queue")

    # Worker dispatches to infrastructure (simulated - infra reports back state)
    log("[bold]4. Infrastructure executes and reports state[/bold]")
    set_state_url = f"/flow_runs/{flow_run_id}/set_state"

    # infra claims; check the HTTP status before touching the body, the same
    # way the serve-pattern test does, so an error response is reported as a
    # test failure instead of blowing up in resp.json()
    resp = client.post(set_state_url, json={"state": {"type": "PENDING", "name": "Pending"}})
    if resp.status_code not in (200, 201):
        return fail(f"PENDING {resp.status_code}")
    log(f" → PENDING: {resp.json().get('status')}")

    # infra runs
    resp = client.post(set_state_url, json={"state": {"type": "RUNNING", "name": "Running"}})
    if resp.status_code not in (200, 201):
        return fail(f"RUNNING {resp.status_code}")
    log(f" → RUNNING: {resp.json().get('status')}")

    # infra completes
    resp = client.post(set_state_url, json={"state": {"type": "COMPLETED", "name": "Completed"}})
    if resp.status_code not in (200, 201):
        return fail(f"COMPLETED {resp.status_code}")
    if resp.json().get("status") != "ACCEPT":
        return fail(f"expected ACCEPT, got {resp.json().get('status')}")
    log(" [green]→ COMPLETED: ACCEPT[/green]")

    # cleanup (responses intentionally not checked)
    client.delete(f"/deployments/{deployment_id}")
    client.delete(f"/work_pools/{pool_name}")
    log(" cleanup: ok")

    return True
1601
1602
def test_late_runs(client: CountingClient) -> bool:
    """Test that late_runs service marks overdue scheduled runs as Late.

    Creates a SCHEDULED flow run whose ``next_scheduled_start_time`` lies 60
    seconds in the past, then re-reads the run once per second for up to 15
    seconds until its ``state_name`` is "Late".

    Returns True as soon as the run is Late, False on setup failure or when
    the polling window elapses.
    """
    from datetime import datetime, timedelta, timezone
    import time as time_mod

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    flow_resp = client.post("/flows/", json={"name": f"late-flow-{uuid.uuid4().hex[:8]}"})
    if flow_resp.status_code not in (200, 201):
        return fail(f"create flow {flow_resp.status_code}")
    flow_id = flow_resp.json().get("id")

    # create flow run with past scheduled time (60s ago, threshold is 15s)
    overdue_at = datetime.now(timezone.utc) - timedelta(seconds=60)
    run_body = {
        "flow_id": flow_id,
        "name": f"late-run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "SCHEDULED", "name": "Scheduled"},
        "next_scheduled_start_time": overdue_at.isoformat(),
    }
    run_resp = client.post("/flow_runs/", json=run_body)
    if run_resp.status_code not in (200, 201):
        return fail(f"create flow run {run_resp.status_code}")
    flow_run_id = run_resp.json().get("id")

    # wait for late_runs service (runs every 5s)
    for waited_seconds in range(1, 16):
        time_mod.sleep(1)
        check = client.get(f"/flow_runs/{flow_run_id}")
        if check.status_code != 200:
            continue
        if check.json().get("state_name") == "Late":
            log(f" [green]state changed to Late after {waited_seconds}s[/green]")
            return True

    return fail("late_runs service did not mark run as Late within timeout")
1630
1631
def test_work_queue_priority(client: CountingClient) -> bool:
    """Test work queue priority ordering in get_scheduled_flow_runs.

    Builds a pool with two extra queues ("high-priority" at priority 1 and
    "low-priority" at priority 100) next to "default", creates one deployment
    per queue, schedules one run per deployment (low, default, high order),
    then polls the pool. All three runs must be returned and the
    high-priority run must not appear after the low-priority one.

    Returns True when the ordering holds, False at the first failed step.
    """

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    created = (200, 201)

    flow_resp = client.post("/flows/", json={"name": f"priority-flow-{uuid.uuid4().hex[:8]}"})
    if flow_resp.status_code not in created:
        return fail(f"create flow {flow_resp.status_code}")
    flow_id = flow_resp.json().get("id")

    pool_name = f"priority-pool-{uuid.uuid4().hex[:8]}"
    pool_resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if pool_resp.status_code not in created:
        return fail(f"create pool {pool_resp.status_code}")

    # create high and low priority queues
    queue_priorities = (("high-priority", 1), ("low-priority", 100))
    for queue_name, priority in queue_priorities:
        queue_resp = client.post(
            f"/work_pools/{pool_name}/queues/",
            json={"name": queue_name, "priority": priority},
        )
        if queue_resp.status_code not in created:
            return fail(f"create {queue_name} queue {queue_resp.status_code}")

    # one deployment per queue
    deployments = {}
    for queue_name in ("high-priority", "low-priority", "default"):
        dep_body = {
            "name": f"d-{queue_name}-{uuid.uuid4().hex[:8]}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "work_queue_name": queue_name,
        }
        dep_resp = client.post("/deployments/", json=dep_body)
        if dep_resp.status_code not in created:
            return fail(f"create deployment {queue_name} {dep_resp.status_code}")
        deployments[queue_name] = dep_resp.json().get("id")

    # flow runs are created in reverse priority order
    flow_runs = {}
    for queue_name in ("low-priority", "default", "high-priority"):
        run_resp = client.post(
            f"/deployments/{deployments[queue_name]}/create_flow_run",
            json={"state": {"type": "SCHEDULED", "name": "Scheduled"}},
        )
        if run_resp.status_code not in created:
            return fail(f"create run for {queue_name} {run_resp.status_code}")
        flow_runs[queue_name] = run_resp.json().get("id")

    # poll and verify priority ordering
    poll_resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={})
    if poll_resp.status_code != 200:
        return fail(f"poll {poll_resp.status_code}")
    scheduled = poll_resp.json()
    if len(scheduled) < 3:
        return fail(f"expected 3 runs, got {len(scheduled)}")

    returned_ids = [entry.get("flow_run", {}).get("id") for entry in scheduled]
    positions = {}
    for queue_name, run_id in flow_runs.items():
        if run_id not in returned_ids:
            return fail("not all runs found")
        positions[queue_name] = returned_ids.index(run_id)

    high_pos = positions["high-priority"]
    low_pos = positions["low-priority"]
    if low_pos < high_pos:
        return fail("high should come before low")
    log(f" [green]priority ordering verified (high={high_pos}, low={low_pos})[/green]")

    # cleanup (responses intentionally not checked)
    for deployment_id in deployments.values():
        client.delete(f"/deployments/{deployment_id}")
    client.delete(f"/work_pools/{pool_name}")
    return True
1677
1678
def main():
    """Run every API test section in order, report the outcome, and exit.

    With ``--json`` on the command line a single JSON document is printed
    (overall pass flag, totals, and one entry per section). Otherwise a
    rich table with per-section timing and request counts is rendered,
    followed by a one-line verdict.

    The process always terminates through ``sys.exit``: status 0 when
    every section passed, 1 otherwise.
    """
    json_output = "--json" in sys.argv

    if not QUIET:
        console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n")

    # (section name, test callable) pairs, executed strictly in this order
    sections = [
        # core API tests (fast, no sleeps)
        ("admin", test_admin),
        ("flow_run (success)", lambda c: test_flow_run(c, should_fail=False)),
        ("flow_run (failure)", lambda c: test_flow_run(c, should_fail=True)),
        ("flow_with_task_runs", test_flow_with_task_runs),
        ("task_run", test_task_run),
        ("filters", test_filters),
        ("logs", test_logs),
        ("variables", test_variables),
        ("blocks", test_blocks),
        ("work_pools", test_work_pools),
        ("deployments", test_deployments),
        # execution patterns (two distinct models)
        ("serve_pattern", test_serve_pattern),  # Runner: polls deployments
        ("worker_pattern", test_worker_pattern),  # Worker: polls work pools
        ("work_queue_priority", test_work_queue_priority),
        ("retry_failed_flows", test_retry_failed_flows),
        ("cancellation_flow", test_cancellation_flow),
        ("late_runs", test_late_runs),  # background service marks overdue runs
        # scheduler integration tests (require sleeps for background service)
        ("scheduler_idempotency", test_scheduler_idempotency),
        ("parameter_merging", test_parameter_merging),
    ]
    results: list[TestResult] = [run_test(label, fn) for label, fn in sections]

    elapsed_ms = sum(outcome.duration_ms for outcome in results)
    request_total = sum(outcome.requests for outcome in results)
    ok = all(outcome.passed for outcome in results)
    exit_code = 0 if ok else 1

    if json_output:
        # machine-readable output for benchmark script
        payload = {
            "passed": ok,
            "total_duration_ms": elapsed_ms,
            "total_requests": request_total,
            "sections": [
                {
                    "name": outcome.name,
                    "passed": outcome.passed,
                    "duration_ms": outcome.duration_ms,
                    "requests": outcome.requests,
                    "error": outcome.error,
                }
                for outcome in results
            ],
        }
        print(json_lib.dumps(payload))
        sys.exit(exit_code)

    # human-readable output
    console.print("\n" + "=" * 60)

    table = Table(title="test results")
    column_specs = (
        ("section", {"style": "cyan"}),
        ("time", {"justify": "right"}),
        ("reqs", {"justify": "right"}),
        ("status", {"justify": "center"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)

    for outcome in results:
        if outcome.passed:
            status = "[green]✓[/green]"
        else:
            status = "[red]✗[/red]"
        table.add_row(
            outcome.name,
            f"{outcome.duration_ms:.1f}ms",
            str(outcome.requests),
            status,
        )

    # blank spacer row, then the totals row
    table.add_row("", "", "", "", style="dim")
    table.add_row(
        "[bold]total[/bold]",
        f"[bold]{elapsed_ms:.1f}ms[/bold]",
        f"[bold]{request_total}[/bold]",
        "",
    )

    console.print(table)

    if ok:
        console.print("\n[bold green]all tests passed[/bold green]")
    else:
        console.print("\n[bold red]some tests failed[/bold red]")

    sys.exit(exit_code)
1760
1761
# Script entry point: run the suite only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()