prefect server in zig
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "rich"]
5# ///
6"""
7Functional test suite for prefect-server API.
8
9Tests API correctness by exercising all endpoints with expected request/response patterns.
10Includes scheduler integration tests (which have intentional delays to verify background services).
11
12For performance benchmarking, use ./scripts/benchmark instead.
13
14Usage:
15 ./scripts/test-api-sequence # human-readable output
16 ./scripts/test-api-sequence --json # machine-readable for CI
17 ./scripts/test-api-sequence --quiet # minimal output
18"""
19
20import json as json_lib
21import os
22import sys
23import time
24import uuid
25from dataclasses import dataclass, field
26from typing import Callable
27
28import httpx
29from rich.console import Console
30from rich.panel import Panel
31from rich.table import Table
32
# Shared rich console for all human-readable output.
console = Console()

# API root; the PREFECT_API_URL environment variable overrides the local default.
BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api")

# Per-step console output is suppressed when either flag is on the command line.
QUIET = any(flag in sys.argv for flag in ("--json", "--quiet"))
36
37
38@dataclass
39class TestResult:
40 name: str
41 passed: bool
42 duration_ms: float
43 requests: int = 0
44 error: str | None = None
45
46
class CountingClient(httpx.Client):
    """``httpx.Client`` subclass that tallies calls made through ``request``."""

    def __init__(self, *args, **kwargs):
        """Build the underlying client, then start the tally at zero."""
        super().__init__(*args, **kwargs)
        self.request_count = 0

    def request(self, *args, **kwargs):
        """Bump the tally and delegate, unchanged, to the parent ``request``."""
        self.request_count = self.request_count + 1
        return super().request(*args, **kwargs)
57
58
59def validate_response(data: dict, required_fields: list[str], field_types: dict | None = None) -> bool:
60 """Validate response contains required fields with expected types."""
61 for field in required_fields:
62 if field not in data:
63 if not QUIET:
64 console.print(f"[red]VALIDATION[/red]: missing field '{field}'")
65 return False
66 if field_types:
67 for field, expected_type in field_types.items():
68 if field in data and data[field] is not None:
69 if not isinstance(data[field], expected_type):
70 if not QUIET:
71 console.print(f"[red]VALIDATION[/red]: field '{field}' expected {expected_type.__name__}, got {type(data[field]).__name__}")
72 return False
73 return True
74
75
def run_test(name: str, test_fn: Callable[[CountingClient], bool]) -> TestResult:
    """Execute one test function and package its outcome as a TestResult.

    A fresh CountingClient (pointed at BASE_URL, timeout 10) is created for
    the call and always closed afterwards. An exception raised by ``test_fn``
    is turned into a failed result whose ``error`` is ``str(exception)``.
    """
    if not QUIET:
        console.print(Panel(f"testing {name}", style="blue"))

    client = CountingClient(base_url=BASE_URL, timeout=10)
    started = time.perf_counter()
    failure: str | None = None

    try:
        try:
            verdict = test_fn(client)
        except Exception as exc:
            verdict, failure = False, str(exc)
        # Measured right after the test body, before the client is closed.
        elapsed_ms = (time.perf_counter() - started) * 1000
        return TestResult(
            name=name,
            passed=verdict,
            duration_ms=elapsed_ms,
            requests=client.request_count,
            error=failure,
        )
    finally:
        client.close()
104
105
106# ---------- test functions ----------
107
108
def test_admin(client: CountingClient) -> bool:
    """Probe the health, version and csrf-token endpoints.

    ``/health`` and ``/admin/version`` must answer 200. ``/csrf-token`` may
    answer 200 or 422; any other status fails the test.
    """
    verbose = not QUIET

    # (path, how to render the body for display) for endpoints that must be 200.
    must_succeed = (
        ("/health", lambda r: r.text),
        ("/admin/version", lambda r: r.json()),
    )
    for path, render in must_succeed:
        if verbose:
            console.print(f"[bold]GET {path}[/bold]")
        resp = client.get(path)
        if resp.status_code != 200:
            if verbose:
                console.print(f"[red]FAIL[/red]: {resp.status_code}")
            return False
        if verbose:
            console.print(f" {render(resp)}")

    # csrf-token: both listed statuses count as a pass.
    if verbose:
        console.print("[bold]GET /csrf-token[/bold]")
    resp = client.get("/csrf-token", params={"client": "test-client"})
    accepted = {
        200: " token received",
        422: " csrf protection disabled (ok)",
    }
    note = accepted.get(resp.status_code)
    if note is None:
        if verbose:
            console.print(f"[red]FAIL[/red]: {resp.status_code}")
        return False
    if verbose:
        console.print(note)

    return True
149
150
def test_flow_run(client: CountingClient, should_fail: bool = False) -> bool:
    """Drive one flow run from PENDING through RUNNING to a terminal state.

    Args:
        client: Client used for every request.
        should_fail: When true the run is finished as FAILED instead of
            COMPLETED.

    Returns:
        ``True`` if every request succeeded and the ``state_type`` read back
        at the end equals the requested terminal state; ``False`` otherwise.
    """
    verbose = not QUIET

    def fail(message: str) -> bool:
        # Single failure exit so no step can fail without stating why.
        if verbose:
            console.print(f"[red]FAIL[/red]: {message}")
        return False

    suffix = "fail" if should_fail else "success"
    if verbose:
        console.print(f"server: {BASE_URL}\n")

    # 1. create flow
    if verbose:
        console.print("[bold]1. POST /flows/[/bold]")
    resp = client.post("/flows/", json={"name": f"bench-flow-{suffix}"})
    if resp.status_code not in (200, 201):
        return fail(f"{resp.status_code} {resp.text}")
    flow = resp.json()
    if not validate_response(flow, ["id", "name", "created", "updated"], {"id": str, "name": str}):
        return False
    if verbose:
        console.print(f" flow_id: {flow.get('id')}")

    # 2. create flow run
    if verbose:
        console.print("\n[bold]2. POST /flow_runs/[/bold]")
    resp = client.post("/flow_runs/", json={
        "flow_id": flow["id"],
        "name": f"run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"{resp.status_code} {resp.text}")
    flow_run = resp.json()
    if not validate_response(flow_run, ["id", "name", "flow_id", "state_type"], {"id": str, "name": str}):
        return False
    flow_run_id = flow_run.get("id")
    if verbose:
        console.print(f" flow_run_id: {flow_run_id}")

    # 3. read flow run
    if verbose:
        console.print("\n[bold]3. GET /flow_runs/{id}[/bold]")
    resp = client.get(f"/flow_runs/{flow_run_id}")
    if resp.status_code != 200:
        return fail(f"{resp.status_code}")
    if verbose:
        console.print(f" state: {resp.json().get('state_type')}")

    # 4-5. RUNNING first, then the terminal state selected by should_fail.
    final_type, final_name = ("FAILED", "Failed") if should_fail else ("COMPLETED", "Completed")
    transitions = (("RUNNING", "Running"), (final_type, final_name))
    for step, (state_type, state_name) in enumerate(transitions, start=4):
        if verbose:
            console.print(f"\n[bold]{step}. POST /flow_runs/{{id}}/set_state ({state_type})[/bold]")
        resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
            "state": {"type": state_type, "name": state_name},
            "force": False,
        })
        if resp.status_code not in (200, 201):
            return fail(f"{resp.status_code}")
        if verbose:
            console.print(f" status: {resp.json().get('status')}")

    # 6. verify the terminal state was actually stored
    if verbose:
        console.print("\n[bold]6. GET /flow_runs/{id} (verify)[/bold]")
    resp = client.get(f"/flow_runs/{flow_run_id}")
    if resp.status_code != 200:
        # Previously this path returned False without any explanation.
        return fail(f"verify read {resp.status_code}")
    actual_type = resp.json().get("state_type")
    if actual_type != final_type:
        return fail(f"expected {final_type}, got {actual_type}")
    if verbose:
        console.print(f" [green]state: {actual_type} (correct)[/green]")

    return True
246
247
def test_orchestration_rules(client: CountingClient) -> bool:
    """Exercise flow-run state transitions and the responses they should get.

    Two groups of checks are made:

    * PENDING -> PENDING and RUNNING -> PENDING must come back with status
      REJECT and leave the run RUNNING; PENDING -> RUNNING and
      RUNNING -> COMPLETED must come back with status ACCEPT.
    * A SCHEDULED run created with ``next_scheduled_start_time`` must report
      that same value as ``expected_start_time`` after moving to PENDING.

    Args:
        client: Client used for every request.

    Returns:
        ``True`` when every step behaves as expected, ``False`` at the first
        step that does not.
    """
    verbose = not QUIET

    def fail(message: str) -> bool:
        # Single failure exit so no step can fail without stating why.
        if verbose:
            console.print(f"[red]FAIL[/red]: {message}")
        return False

    def transition(run_id, state_type, state_name, expected_status, http_label):
        """POST a set_state request; return the decoded body, or None on failure."""
        resp = client.post(f"/flow_runs/{run_id}/set_state", json={
            "state": {"type": state_type, "name": state_name},
        })
        if resp.status_code not in (200, 201):
            fail(f"{http_label}{resp.status_code}")
            return None
        body = resp.json()
        status = body.get("status")
        if status != expected_status:
            fail(f"expected {expected_status}, got {status}")
            return None
        if verbose:
            console.print(f" [green]status: {status} (correct)[/green]")
        return body

    if verbose:
        console.print(f"server: {BASE_URL}\n")

    # 1. create flow
    if verbose:
        console.print("[bold]1. create flow[/bold]")
    resp = client.post("/flows/", json={"name": f"orchestration-test-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json()["id"]
    if verbose:
        console.print(f" flow_id: {flow_id}")

    # 2. create flow run in PENDING state
    if verbose:
        console.print("\n[bold]2. create flow run (PENDING)[/bold]")
    resp = client.post("/flow_runs/", json={
        "flow_id": flow_id,
        "name": f"orch-run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code}")
    flow_run_id = resp.json()["id"]
    if verbose:
        console.print(f" flow_run_id: {flow_run_id}")

    # 3. try PENDING → PENDING (should be REJECT)
    if verbose:
        console.print("\n[bold]3. PENDING → PENDING (expect REJECT)[/bold]")
    result = transition(flow_run_id, "PENDING", "Pending", "REJECT", "unexpected status code ")
    if result is None:
        return False
    if verbose:
        # "details" may be absent or null; neither may crash the verbose path,
        # otherwise the verdict would depend on the output mode.
        reason = (result.get("details") or {}).get("reason", "")
        console.print(f" reason: {reason}")

    # 4. PENDING → RUNNING (should be ACCEPT)
    if verbose:
        console.print("\n[bold]4. PENDING → RUNNING (expect ACCEPT)[/bold]")
    if transition(flow_run_id, "RUNNING", "Running", "ACCEPT", "") is None:
        return False

    # 5. try RUNNING → PENDING (should be REJECT)
    if verbose:
        console.print("\n[bold]5. RUNNING → PENDING (expect REJECT)[/bold]")
    if transition(flow_run_id, "PENDING", "Pending", "REJECT", "unexpected status code ") is None:
        return False

    # 6. verify run is still RUNNING (reject didn't change state)
    if verbose:
        console.print("\n[bold]6. verify state unchanged after REJECT[/bold]")
    resp = client.get(f"/flow_runs/{flow_run_id}")
    if resp.status_code != 200:
        return fail(f"read flow run {resp.status_code}")
    actual_state = resp.json().get("state_type")
    if actual_state != "RUNNING":
        return fail(f"expected RUNNING, got {actual_state}")
    if verbose:
        console.print(f" [green]state: {actual_state} (correct - unchanged)[/green]")

    # 7. complete normally
    if verbose:
        console.print("\n[bold]7. RUNNING → COMPLETED (expect ACCEPT)[/bold]")
    if transition(flow_run_id, "COMPLETED", "Completed", "ACCEPT", "set_state ") is None:
        return False

    # =========================================================================
    # CopyScheduledTime rule tests
    # =========================================================================

    # 8. create a SCHEDULED flow run with next_scheduled_start_time
    scheduled_time = "2025-06-15T10:00:00Z"
    if verbose:
        console.print(f"\n[bold]8. create SCHEDULED run (next_scheduled_start_time={scheduled_time})[/bold]")
    resp = client.post("/flow_runs/", json={
        "flow_id": flow_id,
        "name": f"scheduled-run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "SCHEDULED", "name": "Scheduled"},
        "next_scheduled_start_time": scheduled_time,
    })
    if resp.status_code not in (200, 201):
        return fail(f"create scheduled run {resp.status_code}")
    scheduled_run_id = resp.json()["id"]
    if verbose:
        console.print(f" scheduled_run_id: {scheduled_run_id}")

    # 9. transition SCHEDULED → PENDING (CopyScheduledTime should copy scheduled_time)
    if verbose:
        console.print("\n[bold]9. SCHEDULED → PENDING (expect scheduled_time copied)[/bold]")
    if transition(scheduled_run_id, "PENDING", "Pending", "ACCEPT", "set_state ") is None:
        return False

    # 10. verify expected_start_time was set from next_scheduled_start_time
    if verbose:
        console.print("\n[bold]10. verify expected_start_time copied[/bold]")
    resp = client.get(f"/flow_runs/{scheduled_run_id}")
    if resp.status_code != 200:
        return fail(f"read scheduled run {resp.status_code}")
    expected_start = resp.json().get("expected_start_time")
    # Exact string comparison: the value must come back in the same textual
    # form that was sent.
    if expected_start != scheduled_time:
        return fail(f"expected_start_time={expected_start}, expected {scheduled_time}")
    if verbose:
        console.print(f" [green]expected_start_time: {expected_start} (correct)[/green]")

    return True
426
427
def test_task_run(client: CountingClient) -> bool:
    """Create a task run, read it back, then set it RUNNING and COMPLETED.

    Args:
        client: Client used for every request.

    Returns:
        ``True`` if every request returned an accepted status code,
        ``False`` at the first one that did not.
    """
    verbose = not QUIET

    def fail(message: str) -> bool:
        # Single failure exit so no step can fail without stating why.
        if verbose:
            console.print(f"[red]FAIL[/red]: {message}")
        return False

    # create
    if verbose:
        console.print("[bold]POST /task_runs/[/bold]")
    resp = client.post("/task_runs/", json={
        "task_key": "bench-task",
        "dynamic_key": f"key-{uuid.uuid4().hex[:8]}",
        "name": f"task-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"{resp.status_code}")
    task_run_id = resp.json().get("id")
    if verbose:
        console.print(f" task_run_id: {task_run_id}")

    # read
    resp = client.get(f"/task_runs/{task_run_id}")
    if resp.status_code != 200:
        return fail(f"read task run {resp.status_code}")

    # RUNNING, then COMPLETED
    for state_type, state_name in (("RUNNING", "Running"), ("COMPLETED", "Completed")):
        resp = client.post(f"/task_runs/{task_run_id}/set_state", json={
            "state": {"type": state_type, "name": state_name},
            "force": False,
        })
        if resp.status_code not in (200, 201):
            return fail(f"set_state {state_type} {resp.status_code}")
        if verbose:
            console.print(f" -> {state_type}: {resp.json().get('status')}")

    return True
473
474
def test_filters(client: CountingClient) -> bool:
    """POST an empty filter body to each filter endpoint and expect 200."""
    # label shown in the output -> endpoint queried (insertion order is kept).
    targets = {
        "flows": "/flows/filter",
        "flow_runs": "/flow_runs/filter",
        "task_runs": "/task_runs/filter",
    }
    for label, endpoint in targets.items():
        resp = client.post(endpoint, json={})
        if resp.status_code == 200:
            if not QUIET:
                console.print(f" {label}: {len(resp.json())} items")
            continue
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {endpoint} {resp.status_code}")
        return False

    return True
491
492
def test_logs(client: CountingClient) -> bool:
    """Send two log records to ``/logs/``; 200, 201 or 204 counts as a pass."""
    from datetime import datetime, timezone

    # (level, message) pairs; each record gets its own UTC timestamp.
    samples = ((20, "test log 1"), (30, "test log 2"))
    logs = [
        {
            "level": level,
            "message": message,
            "name": "test",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        for level, message in samples
    ]

    resp = client.post("/logs/", json=logs)
    accepted = resp.status_code in (200, 201, 204)
    if not QUIET:
        if accepted:
            console.print(f" {len(logs)} logs sent")
        else:
            console.print(f"[red]FAIL[/red]: {resp.status_code}")
    return accepted
509
510
def test_variables(client: CountingClient) -> bool:
    """Run the variables API through create, read, update, list and delete.

    The variable is created under a random name, fetched by name and by id,
    patched by name, listed via filter and count, re-created to confirm the
    duplicate is refused with 409, and finally deleted by name.

    Args:
        client: Client used for every request.

    Returns:
        ``True`` if every step returned the expected status code,
        ``False`` at the first one that did not.
    """
    verbose = not QUIET

    def fail(message: str) -> bool:
        # Single failure exit so no step can fail without stating why.
        if verbose:
            console.print(f"[red]FAIL[/red]: {message}")
        return False

    var_name = f"bench-var-{uuid.uuid4().hex[:8]}"
    by_name = f"/variables/name/{var_name}"

    # create
    if verbose:
        console.print("[bold]POST /variables/[/bold]")
    resp = client.post("/variables/", json={
        "name": var_name,
        "value": {"nested": "object", "count": 42},
        "tags": ["benchmark", "test"],
    })
    if resp.status_code != 201:
        return fail(f"create {resp.status_code}")
    variable = resp.json()
    if not validate_response(variable, ["id", "name", "value", "tags", "created", "updated"], {"id": str, "name": str, "tags": list}):
        return False
    var_id = variable.get("id")
    if verbose:
        console.print(f" created: {var_id}")

    # get by name
    resp = client.get(by_name)
    if resp.status_code != 200:
        return fail(f"get by name {resp.status_code}")
    if verbose:
        console.print(" get by name: ok")

    # get by id
    resp = client.get(f"/variables/{var_id}")
    if resp.status_code != 200:
        return fail(f"get by id {resp.status_code}")

    # update by name
    resp = client.patch(by_name, json={"value": "updated"})
    if resp.status_code != 204:
        return fail(f"update by name {resp.status_code}")
    if verbose:
        console.print(" updated by name")

    # filter
    resp = client.post("/variables/filter", json={"limit": 10})
    if resp.status_code != 200:
        return fail(f"filter {resp.status_code}")
    if verbose:
        console.print(f" filter: {len(resp.json())} items")

    # count
    resp = client.post("/variables/count", json={})
    if resp.status_code != 200:
        return fail(f"count {resp.status_code}")
    if verbose:
        console.print(f" count: {resp.text}")

    # duplicate name should fail
    resp = client.post("/variables/", json={"name": var_name, "value": "dupe"})
    if resp.status_code != 409:
        return fail(f"duplicate should return 409, got {resp.status_code}")
    if verbose:
        console.print(" duplicate rejected: 409")

    # delete
    resp = client.delete(by_name)
    if resp.status_code != 204:
        return fail(f"delete {resp.status_code}")
    if verbose:
        console.print(" deleted")

    return True
584
585
def test_blocks(client: CountingClient) -> bool:
    """Run the blocks API through block types, schemas and documents.

    Creates a block type, a schema for it and a document using both, reads
    each back through its lookup endpoints, patches the document, queries the
    three filter endpoints and deletes the document.

    Args:
        client: Client used for every request.

    Returns:
        ``True`` if every step returned the expected status code,
        ``False`` at the first one that did not.
    """
    verbose = not QUIET

    def fail(message: str) -> bool:
        # Single failure exit so no step can fail without stating why.
        if verbose:
            console.print(f"[red]FAIL[/red]: {message}")
        return False

    slug = f"bench-block-{uuid.uuid4().hex[:8]}"

    # create block type
    if verbose:
        console.print("[bold]block_types[/bold]")
    resp = client.post("/block_types/", json={
        "name": f"Bench Block {slug}",
        "slug": slug,
        "description": "benchmark block type",
    })
    if resp.status_code not in (200, 201):
        return fail(f"create block_type {resp.status_code}")
    block_type = resp.json()
    if not validate_response(block_type, ["id", "name", "slug"], {"id": str, "name": str, "slug": str}):
        return False
    block_type_id = block_type.get("id")
    if verbose:
        console.print(f" created: {block_type_id}")

    # get by slug
    resp = client.get(f"/block_types/slug/{slug}")
    if resp.status_code != 200:
        return fail(f"get block_type by slug {resp.status_code}")

    # create schema
    if verbose:
        console.print("[bold]block_schemas[/bold]")
    resp = client.post("/block_schemas/", json={
        "block_type_id": block_type_id,
        "fields": {"properties": {"value": {"type": "string"}}},
        "capabilities": ["test"],
        "version": "1.0.0",
    })
    if resp.status_code not in (200, 201):
        return fail(f"create block_schema {resp.status_code}")
    block_schema = resp.json()
    block_schema_id = block_schema.get("id")
    checksum = block_schema.get("checksum")
    if verbose:
        console.print(f" created: {block_schema_id}")

    # get by checksum
    resp = client.get(f"/block_schemas/checksum/{checksum}")
    if resp.status_code != 200:
        return fail(f"get block_schema by checksum {resp.status_code}")

    # create document
    if verbose:
        console.print("[bold]block_documents[/bold]")
    doc_name = f"bench-doc-{uuid.uuid4().hex[:8]}"
    resp = client.post("/block_documents/", json={
        "name": doc_name,
        "block_type_id": block_type_id,
        "block_schema_id": block_schema_id,
        "data": {"value": "secret-value"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create block_document {resp.status_code}")
    block_doc_id = resp.json().get("id")
    if verbose:
        console.print(f" created: {block_doc_id}")

    # get by id
    resp = client.get(f"/block_documents/{block_doc_id}")
    if resp.status_code != 200:
        return fail(f"get block_document {resp.status_code}")

    # get by slug/name
    resp = client.get(f"/block_types/slug/{slug}/block_documents/name/{doc_name}")
    if resp.status_code != 200:
        return fail(f"get block_document by name {resp.status_code}")

    # update
    resp = client.patch(f"/block_documents/{block_doc_id}", json={"data": {"value": "updated"}})
    if resp.status_code != 204:
        return fail(f"update block_document {resp.status_code}")
    if verbose:
        console.print(" updated")

    # filters
    for endpoint in ("/block_types/filter", "/block_schemas/filter", "/block_documents/filter"):
        resp = client.post(endpoint, json={})
        if resp.status_code != 200:
            return fail(f"{endpoint} {resp.status_code}")

    # delete
    resp = client.delete(f"/block_documents/{block_doc_id}")
    if resp.status_code != 204:
        return fail(f"delete block_document {resp.status_code}")
    if verbose:
        console.print(" deleted")

    return True
688
689
def test_work_pools(client: CountingClient) -> bool:
    """Run the work pools API through pools, queues and workers.

    Creates a pool (which must report a ``default_queue_id``), reads, patches
    and lists it, adds a second queue, sends a worker heartbeat and expects
    the pool status to become READY, lists workers, then deletes the queue
    and the pool.

    Args:
        client: Client used for every request.

    Returns:
        ``True`` if every step behaved as expected, ``False`` at the first
        one that did not.
    """
    verbose = not QUIET

    def fail(message: str) -> bool:
        # Single failure exit so no step can fail without stating why.
        if verbose:
            console.print(f"[red]FAIL[/red]: {message}")
        return False

    pool_name = f"test-pool-{uuid.uuid4().hex[:8]}"
    pool_url = f"/work_pools/{pool_name}"

    # create work pool
    if verbose:
        console.print("[bold]work_pools[/bold]")
    resp = client.post("/work_pools/", json={
        "name": pool_name,
        "type": "process",
        "description": "test work pool",
    })
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code} {resp.text}")
    pool = resp.json()
    if not validate_response(pool, ["id", "name", "type", "status", "default_queue_id"], {"id": str, "name": str}):
        return False
    if verbose:
        console.print(f" created: {pool.get('id')}")

    # check default queue was created (key present but empty/null also fails)
    if not pool.get("default_queue_id"):
        return fail("no default_queue_id")

    # get by name
    resp = client.get(pool_url)
    if resp.status_code != 200:
        return fail(f"get work_pool {resp.status_code}")

    # update
    resp = client.patch(pool_url, json={"description": "updated"})
    if resp.status_code != 204:
        return fail(f"update work_pool {resp.status_code}")
    if verbose:
        console.print(" updated")

    # filter
    resp = client.post("/work_pools/filter", json={})
    if resp.status_code != 200:
        return fail(f"filter work_pools {resp.status_code}")
    pools = resp.json()
    if not isinstance(pools, list):
        return fail(f"filter work_pools returned {type(pools).__name__}, expected list")

    # create queue
    if verbose:
        console.print("[bold]work_queues[/bold]")
    queue_name = f"test-queue-{uuid.uuid4().hex[:8]}"
    queue_url = f"{pool_url}/queues/{queue_name}"
    resp = client.post(f"{pool_url}/queues/", json={
        "name": queue_name,
        "description": "test queue",
        "priority": 5,
    })
    if resp.status_code not in (200, 201):
        return fail(f"create work_queue {resp.status_code} {resp.text}")
    queue = resp.json()
    if not validate_response(queue, ["id", "name", "priority", "work_pool_id"], {"id": str, "priority": int}):
        return False
    if verbose:
        console.print(f" created: {queue.get('id')}")

    # get queue
    resp = client.get(queue_url)
    if resp.status_code != 200:
        return fail(f"get work_queue {resp.status_code}")

    # filter queues
    resp = client.post(f"{pool_url}/queues/filter", json={})
    if resp.status_code != 200:
        return fail(f"filter work_queues {resp.status_code}")
    queues = resp.json()
    if not isinstance(queues, list) or len(queues) < 2:  # default + our queue
        found = len(queues) if isinstance(queues, list) else "not a list"
        return fail(f"expected at least 2 queues, got {found}")

    # worker heartbeat
    if verbose:
        console.print("[bold]workers[/bold]")
    resp = client.post(f"{pool_url}/workers/heartbeat", json={
        "name": "test-worker-1",
        "heartbeat_interval_seconds": 30,
    })
    if resp.status_code != 204:
        return fail(f"worker heartbeat {resp.status_code} {resp.text}")
    if verbose:
        console.print(" heartbeat sent")

    # check pool status is now READY
    resp = client.get(pool_url)
    if resp.status_code != 200:
        return fail(f"get work_pool after heartbeat {resp.status_code}")
    pool = resp.json()
    if pool.get("status") != "READY":
        return fail(f"expected pool status READY, got {pool.get('status')}")
    if verbose:
        console.print(" pool status: READY")

    # filter workers
    resp = client.post(f"{pool_url}/workers/filter", json={})
    if resp.status_code != 200:
        return fail(f"filter workers {resp.status_code}")
    workers = resp.json()
    if not isinstance(workers, list) or len(workers) < 1:
        found = len(workers) if isinstance(workers, list) else "not a list"
        return fail(f"expected at least 1 worker, got {found}")

    # delete queue (not default)
    resp = client.delete(queue_url)
    if resp.status_code != 204:
        return fail(f"delete work_queue {resp.status_code}")
    if verbose:
        console.print(" deleted queue")

    # delete pool
    resp = client.delete(pool_url)
    if resp.status_code != 204:
        return fail(f"delete work_pool {resp.status_code}")
    if verbose:
        console.print(" deleted pool")

    return True
837
838
839def test_deployments(client: CountingClient) -> bool:
840 """Test deployments API (deployments, schedules, create_flow_run)."""
841 # create a flow first
842 if not QUIET:
843 console.print("[bold]setup: create flow[/bold]")
844 resp = client.post("/flows/", json={"name": f"deploy-flow-{uuid.uuid4().hex[:8]}"})
845 if resp.status_code not in (200, 201):
846 if not QUIET:
847 console.print(f"[red]FAIL[/red]: create flow {resp.status_code}")
848 return False
849 flow = resp.json()
850 flow_id = flow.get("id")
851 flow_name = flow.get("name")
852
853 # create deployment
854 deployment_name = f"test-deployment-{uuid.uuid4().hex[:8]}"
855 if not QUIET:
856 console.print("[bold]deployments[/bold]")
857 resp = client.post("/deployments/", json={
858 "name": deployment_name,
859 "flow_id": flow_id,
860 "description": "test deployment",
861 "tags": ["test", "benchmark"],
862 "parameters": {"key": "value"},
863 "schedules": [
864 {"schedule": {"interval": 3600}, "active": True},
865 ],
866 })
867 if resp.status_code not in (200, 201):
868 if not QUIET:
869 console.print(f"[red]FAIL[/red]: create deployment {resp.status_code} {resp.text}")
870 return False
871 deployment = resp.json()
872 if not validate_response(deployment, ["id", "name", "flow_id", "status", "schedules"], {"id": str, "name": str}):
873 return False
874 deployment_id = deployment.get("id")
875 if not QUIET:
876 console.print(f" created: {deployment_id}")
877
878 # verify schedules were created
879 schedules = deployment.get("schedules", [])
880 if not isinstance(schedules, list) or len(schedules) != 1:
881 if not QUIET:
882 console.print(f"[red]FAIL[/red]: expected 1 schedule, got {len(schedules) if isinstance(schedules, list) else 'not a list'}")
883 return False
884 if not QUIET:
885 console.print(f" schedules: {len(schedules)}")
886
887 # get by id
888 resp = client.get(f"/deployments/{deployment_id}")
889 if resp.status_code != 200:
890 if not QUIET:
891 console.print(f"[red]FAIL[/red]: get deployment {resp.status_code}")
892 return False
893
894 # get by name
895 resp = client.get(f"/deployments/name/{flow_name}/{deployment_name}")
896 if resp.status_code != 200:
897 if not QUIET:
898 console.print(f"[red]FAIL[/red]: get deployment by name {resp.status_code}")
899 return False
900 if not QUIET:
901 console.print(" get by name: ok")
902
903 # update
904 resp = client.patch(f"/deployments/{deployment_id}", json={"description": "updated"})
905 if resp.status_code != 204:
906 if not QUIET:
907 console.print(f"[red]FAIL[/red]: update deployment {resp.status_code}")
908 return False
909 if not QUIET:
910 console.print(" updated")
911
912 # filter
913 resp = client.post("/deployments/filter", json={"limit": 10})
914 if resp.status_code != 200:
915 if not QUIET:
916 console.print(f"[red]FAIL[/red]: filter deployments {resp.status_code}")
917 return False
918 if not QUIET:
919 console.print(f" filter: {len(resp.json())} items")
920
921 # count
922 resp = client.post("/deployments/count", json={})
923 if resp.status_code != 200:
924 if not QUIET:
925 console.print(f"[red]FAIL[/red]: count deployments {resp.status_code}")
926 return False
927 if not QUIET:
928 console.print(f" count: {resp.text}")
929
930 # pause
931 resp = client.post(f"/deployments/{deployment_id}/pause_deployment", json={})
932 if resp.status_code != 204:
933 if not QUIET:
934 console.print(f"[red]FAIL[/red]: pause deployment {resp.status_code}")
935 return False
936 if not QUIET:
937 console.print(" paused")
938
939 # resume
940 resp = client.post(f"/deployments/{deployment_id}/resume_deployment", json={})
941 if resp.status_code != 204:
942 if not QUIET:
943 console.print(f"[red]FAIL[/red]: resume deployment {resp.status_code}")
944 return False
945 if not QUIET:
946 console.print(" resumed")
947
948 # create flow run from deployment
949 if not QUIET:
950 console.print("[bold]create_flow_run[/bold]")
951 resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={})
952 if resp.status_code not in (200, 201):
953 if not QUIET:
954 console.print(f"[red]FAIL[/red]: create flow run {resp.status_code} {resp.text}")
955 return False
956 flow_run = resp.json()
957 if not validate_response(flow_run, ["id", "flow_id", "deployment_id"], {"id": str}):
958 return False
959 if flow_run.get("deployment_id") != deployment_id:
960 if not QUIET:
961 console.print(f"[red]FAIL[/red]: deployment_id mismatch")
962 return False
963 if not QUIET:
964 console.print(f" created flow run: {flow_run.get('id')}")
965
966 # schedules - list
967 if not QUIET:
968 console.print("[bold]deployment_schedules[/bold]")
969 resp = client.get(f"/deployments/{deployment_id}/schedules")
970 if resp.status_code != 200:
971 if not QUIET:
972 console.print(f"[red]FAIL[/red]: list schedules {resp.status_code}")
973 return False
974 schedules = resp.json()
975 schedule_id = schedules[0].get("id") if schedules else None
976 if not QUIET:
977 console.print(f" list: {len(schedules)} schedules")
978
979 # schedules - create
980 resp = client.post(f"/deployments/{deployment_id}/schedules", json={
981 "schedule": {"cron": "0 0 * * *"},
982 "active": False,
983 })
984 if resp.status_code not in (200, 201):
985 if not QUIET:
986 console.print(f"[red]FAIL[/red]: create schedule {resp.status_code}")
987 return False
988 created_schedules = resp.json()
989 if not isinstance(created_schedules, list) or len(created_schedules) != 1:
990 if not QUIET:
991 console.print(f"[red]FAIL[/red]: expected 1 created schedule")
992 return False
993 new_schedule_id = created_schedules[0].get("id")
994 if not QUIET:
995 console.print(f" created schedule: {new_schedule_id}")
996
997 # schedules - update
998 resp = client.patch(f"/deployments/{deployment_id}/schedules/{new_schedule_id}", json={"active": True})
999 if resp.status_code != 204:
1000 if not QUIET:
1001 console.print(f"[red]FAIL[/red]: update schedule {resp.status_code}")
1002 return False
1003 if not QUIET:
1004 console.print(" updated schedule")
1005
1006 # schedules - delete
1007 resp = client.delete(f"/deployments/{deployment_id}/schedules/{new_schedule_id}")
1008 if resp.status_code != 204:
1009 if not QUIET:
1010 console.print(f"[red]FAIL[/red]: delete schedule {resp.status_code}")
1011 return False
1012 if not QUIET:
1013 console.print(" deleted schedule")
1014
1015 # delete deployment
1016 resp = client.delete(f"/deployments/{deployment_id}")
1017 if resp.status_code != 204:
1018 if not QUIET:
1019 console.print(f"[red]FAIL[/red]: delete deployment {resp.status_code}")
1020 return False
1021 if not QUIET:
1022 console.print(" deleted deployment")
1023
1024 return True
1025
1026
def test_scheduler_idempotency(client: CountingClient) -> bool:
    """Check that repeated scheduler ticks do not create duplicate flow runs.

    Creates a flow, a ``process`` work pool and a deployment carrying an
    active hourly interval schedule, then counts the deployment's flow runs
    after two consecutive 7 second waits. The test passes when the first
    count is non-zero and the second count equals the first.

    The work pool and deployment are removed in a ``finally`` block so that a
    failing check does not leave an actively scheduled deployment behind.

    Args:
        client: HTTP client used for every request against the API.

    Returns:
        True when the run count is unchanged between the two waits; False on
        any unexpected status code, when no runs were created, or when the
        count changed.
    """
    import time as time_mod

    def fail(msg: str) -> bool:
        """Report a failure (unless output is suppressed) and return False."""
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        """Print a progress line unless output is suppressed."""
        if not QUIET:
            console.print(msg)

    def count_runs(for_deployment: str) -> int | None:
        """Return how many flow runs the filter endpoint lists, or None on error."""
        # NOTE(review): the filter is capped at 100 results; if the scheduler
        # ever creates 100 or more runs the two counts would both saturate and
        # duplicates would go unnoticed - confirm against the scheduler limits.
        listing = client.post("/flow_runs/filter", json={
            "flow_runs": {"deployment_id": {"any_": [for_deployment]}},
            "limit": 100,
        })
        if listing.status_code != 200:
            fail(f"filter flow_runs {listing.status_code}")
            return None
        return len(listing.json())

    # setup: create flow, work pool, deployment with interval schedule
    log("[bold]setup[/bold]")
    resp = client.post("/flows/", json={"name": f"idem-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")

    pool_name = f"idem-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    log(f" pool: {pool_name}")

    # from here on the pool exists, so everything runs under try/finally
    deployment_id = None
    try:
        # create deployment with interval schedule (every hour)
        resp = client.post("/deployments/", json={
            "name": f"idem-deploy-{uuid.uuid4().hex[:8]}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "schedules": [{"schedule": {"interval": 3600}, "active": True}],  # every hour
        })
        if resp.status_code not in (200, 201):
            return fail(f"create deployment {resp.status_code}")
        deployment_id = resp.json().get("id")
        log(f" deployment: {deployment_id}")

        # wait for scheduler to run once (expected to tick every 5s by default)
        log("[bold]waiting for scheduler (7s)...[/bold]")
        time_mod.sleep(7)

        # count runs after first scheduler tick
        count_after_first = count_runs(deployment_id)
        if count_after_first is None:
            return False
        log(f" runs after first tick: {count_after_first}")

        if count_after_first == 0:
            return fail("scheduler did not create any runs")

        # wait for scheduler to run again
        log("[bold]waiting for second scheduler tick (7s)...[/bold]")
        time_mod.sleep(7)

        # count runs after second scheduler tick
        count_after_second = count_runs(deployment_id)
        if count_after_second is None:
            return False
        log(f" runs after second tick: {count_after_second}")

        # key test: same number of runs means idempotency works
        # (scheduler shouldn't create duplicates for same scheduled times)
        if count_after_second != count_after_first:
            return fail(f"idempotency failed: {count_after_first} -> {count_after_second} runs")
        log(" [green]idempotency verified: count unchanged[/green]")
        return True
    finally:
        # cleanup runs on success and on every failure path
        if deployment_id is not None:
            client.delete(f"/deployments/{deployment_id}")
        client.delete(f"/work_pools/{pool_name}")
        log(" cleanup: ok")
1104
1105
def test_parameter_merging(client: CountingClient) -> bool:
    """Check that schedule parameters are merged over deployment parameters.

    Creates a deployment with base parameters and an active hourly schedule
    that carries its own parameters, waits 7 seconds for the scheduler, and
    inspects the parameters of the first flow run returned for the
    deployment. Expected result on that run:

    - ``base_key`` keeps the deployment value,
    - ``override_key`` takes the schedule value,
    - ``schedule_key`` (schedule only) is present.

    The work pool and deployment are removed in a ``finally`` block so that a
    failing check does not leave an actively scheduled deployment behind.

    Args:
        client: HTTP client used for every request against the API.

    Returns:
        True when all three keys hold the expected values; False on any
        unexpected status code, when no runs exist, or on a mismatch.
    """
    import json as json_mod
    import time as time_mod

    def fail(msg: str) -> bool:
        """Report a failure (unless output is suppressed) and return False."""
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        """Print a progress line unless output is suppressed."""
        if not QUIET:
            console.print(msg)

    # setup
    log("[bold]setup[/bold]")
    resp = client.post("/flows/", json={"name": f"params-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")

    pool_name = f"params-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    log(f" pool: {pool_name}")

    # from here on the pool exists, so everything runs under try/finally
    deployment_id = None
    try:
        # create deployment with base parameters
        # schedule has override parameter
        resp = client.post("/deployments/", json={
            "name": f"params-deploy-{uuid.uuid4().hex[:8]}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
            "parameters": {"base_key": "base_value", "override_key": "deployment_value"},
            "schedules": [{
                "schedule": {"interval": 3600},
                "active": True,
                "parameters": {"override_key": "schedule_value", "schedule_key": "schedule_only"},
            }],
        })
        if resp.status_code not in (200, 201):
            return fail(f"create deployment {resp.status_code}")
        deployment = resp.json()
        deployment_id = deployment.get("id")
        log(f" deployment: {deployment_id}")
        log(f" deployment params: {deployment.get('parameters')}")

        # wait for scheduler to create runs
        log("[bold]waiting for scheduler (7s)...[/bold]")
        time_mod.sleep(7)

        # get the scheduled runs and check their parameters
        resp = client.post("/flow_runs/filter", json={
            "flow_runs": {"deployment_id": {"any_": [deployment_id]}},
            "limit": 10,
        })
        if resp.status_code != 200:
            return fail(f"filter flow_runs {resp.status_code}")
        runs = resp.json()
        log(f" found {len(runs)} runs")

        if not runs:
            return fail("scheduler did not create any runs")

        # check merged parameters on first run; a null or missing value is
        # treated as "no parameters" so the key checks below report it
        run_params = runs[0].get("parameters") or {}
        if isinstance(run_params, str):
            # parameters may arrive as a JSON-encoded string
            run_params = json_mod.loads(run_params)
        if not isinstance(run_params, dict):
            return fail(f"unexpected parameters payload: {run_params!r}")
        log(f" run params: {run_params}")

        # verify merging:
        # - base_key should be from deployment
        # - override_key should be from schedule (override)
        # - schedule_key should be from schedule (new key)
        expectations = (
            ("base_key", "base_value", "not preserved"),
            ("override_key", "schedule_value", "not overridden"),
            ("schedule_key", "schedule_only", "not added"),
        )
        for key, wanted, problem in expectations:
            actual = run_params.get(key)
            if actual != wanted:
                return fail(f"{key} {problem}: {actual}")

        log(" [green]parameter merging verified[/green]")
        return True
    finally:
        # cleanup runs on success and on every failure path
        if deployment_id is not None:
            client.delete(f"/deployments/{deployment_id}")
        client.delete(f"/work_pools/{pool_name}")
        log(" cleanup: ok")
1189
1190
def test_get_scheduled_flow_runs(client: CountingClient) -> bool:
    """Exercise the work pool ``get_scheduled_flow_runs`` endpoint (worker polling).

    Steps performed:

    1. Create a flow, a ``process`` work pool and a deployment, checking that
       pool and deployment both report status ``NOT_READY``.
    2. Create a flow run from the deployment in a ``SCHEDULED`` state.
    3. Poll ``get_scheduled_flow_runs`` and require that the new run is listed
       with the pool id and the pool's default queue id.
    4. Require that pool and deployment both report ``READY`` afterwards.
    5. Call the endpoint again with a ``work_queue_names`` filter and with a
       ``scheduled_before`` filter, checking only the status code.

    The work pool and deployment are removed in a ``finally`` block so that
    failing checks do not leave them behind.

    Args:
        client: HTTP client used for every request against the API.

    Returns:
        True when every step succeeds, False on the first failing check.
    """
    from datetime import datetime, timezone

    def fail(msg: str) -> bool:
        """Report a failure (unless output is suppressed) and return False."""
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        """Print a progress line unless output is suppressed."""
        if not QUIET:
            console.print(msg)

    # setup
    log("[bold]setup: create flow[/bold]")
    resp = client.post("/flows/", json={"name": f"schedule-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")

    pool_name = f"schedule-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")

    # from here on the pool exists, so everything runs under try/finally
    deployment_id = None
    try:
        pool = resp.json()
        pool_id, default_queue_id = pool.get("id"), pool.get("default_queue_id")
        log(f" pool: {pool_id}")
        if pool.get("status") != "NOT_READY":
            return fail(f"expected pool NOT_READY, got {pool.get('status')}")

        resp = client.post("/deployments/", json={
            "name": f"schedule-deploy-{uuid.uuid4().hex[:8]}",
            "flow_id": flow_id,
            "work_pool_name": pool_name,
        })
        if resp.status_code not in (200, 201):
            return fail(f"create deployment {resp.status_code}")
        deployment = resp.json()
        deployment_id = deployment.get("id")
        log(f" deployment: {deployment_id}")
        if deployment.get("status") != "NOT_READY":
            return fail(f"expected deployment NOT_READY, got {deployment.get('status')}")

        # create scheduled flow run
        log("[bold]create scheduled flow run[/bold]")
        resp = client.post(
            f"/deployments/{deployment_id}/create_flow_run",
            json={"state": {"type": "SCHEDULED", "name": "Scheduled"}},
        )
        if resp.status_code not in (200, 201):
            return fail(f"create flow run {resp.status_code}")
        flow_run = resp.json()
        flow_run_id = flow_run.get("id")
        log(f" flow_run: {flow_run_id}")
        log(f" state: {flow_run.get('state_type')}")
        if flow_run.get("state_type") != "SCHEDULED":
            return fail(f"expected SCHEDULED, got {flow_run.get('state_type')}")

        # test polling
        log("[bold]get_scheduled_flow_runs[/bold]")
        resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={})
        if resp.status_code != 200:
            return fail(f"get_scheduled_flow_runs {resp.status_code}")
        scheduled_runs = resp.json()
        if not isinstance(scheduled_runs, list):
            return fail(f"expected list, got {type(scheduled_runs)}")
        log(f" returned {len(scheduled_runs)} runs")

        # verify our run is in results; non-dict items and a null "flow_run"
        # are treated as non-matching instead of raising
        found = any(
            isinstance(item, dict)
            and (item.get("flow_run") or {}).get("id") == flow_run_id
            and item.get("work_pool_id") == pool_id
            and item.get("work_queue_id") == default_queue_id
            for item in scheduled_runs
        )
        if not found:
            return fail("scheduled flow run not found in results")
        log(" flow run found in results")

        # verify status changes
        resp = client.get(f"/work_pools/{pool_name}")
        if resp.status_code != 200 or resp.json().get("status") != "READY":
            return fail("expected pool READY after polling")
        log(" pool status: READY")

        resp = client.get(f"/deployments/{deployment_id}")
        if resp.status_code != 200 or resp.json().get("status") != "READY":
            return fail("expected deployment READY after polling")
        log(" deployment status: READY")

        # test filters
        resp = client.post(
            f"/work_pools/{pool_name}/get_scheduled_flow_runs",
            json={"work_queue_names": ["default"]},
        )
        if resp.status_code != 200:
            return fail(f"filter test {resp.status_code}")
        log(" filtered by queue: ok")

        resp = client.post(
            f"/work_pools/{pool_name}/get_scheduled_flow_runs",
            json={"scheduled_before": datetime.now(timezone.utc).isoformat()},
        )
        if resp.status_code != 200:
            return fail(f"scheduled_before test {resp.status_code}")
        log(f" scheduled_before filter: {len(resp.json())} runs")
        return True
    finally:
        # cleanup runs on success and on every failure path
        if deployment_id is not None:
            client.delete(f"/deployments/{deployment_id}")
        client.delete(f"/work_pools/{pool_name}")
        log(" cleanup: ok")
1273
1274
def main():
    """Run every API test section in order, report the results, and exit.

    When ``--json`` is present on the command line a single JSON document is
    printed to stdout; otherwise a results table and a pass/fail line are
    rendered on the console. The process exits with status 0 when every
    section passed and 1 otherwise.
    """
    emit_json = "--json" in sys.argv

    if not QUIET:
        console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n")

    # sections in execution order: (display name, test callable)
    sections = [
        ("admin", test_admin),
        ("flow_run (success)", lambda c: test_flow_run(c, should_fail=False)),
        ("flow_run (failure)", lambda c: test_flow_run(c, should_fail=True)),
        ("orchestration_rules", test_orchestration_rules),
        ("task_run", test_task_run),
        ("filters", test_filters),
        ("logs", test_logs),
        ("variables", test_variables),
        ("blocks", test_blocks),
        ("work_pools", test_work_pools),
        ("deployments", test_deployments),
        ("get_scheduled_flow_runs", test_get_scheduled_flow_runs),
        ("scheduler_idempotency", test_scheduler_idempotency),
        ("parameter_merging", test_parameter_merging),
    ]

    # run all tests
    results: list[TestResult] = []
    for label, test_fn in sections:
        results.append(run_test(label, test_fn))

    total_duration = sum(outcome.duration_ms for outcome in results)
    total_requests = sum(outcome.requests for outcome in results)
    all_passed = all(outcome.passed for outcome in results)

    if json_output := emit_json:
        # machine-readable output for benchmark script
        per_section = []
        for outcome in results:
            per_section.append({
                "name": outcome.name,
                "passed": outcome.passed,
                "duration_ms": outcome.duration_ms,
                "requests": outcome.requests,
                "error": outcome.error,
            })
        report = {
            "passed": all_passed,
            "total_duration_ms": total_duration,
            "total_requests": total_requests,
            "sections": per_section,
        }
        print(json_lib.dumps(report))
    else:
        # human-readable output
        console.print("\n" + "=" * 60)

        table = Table(title="test results")
        table.add_column("section", style="cyan")
        for heading in ("time", "reqs"):
            table.add_column(heading, justify="right")
        table.add_column("status", justify="center")

        for outcome in results:
            if outcome.passed:
                mark = "[green]✓[/green]"
            else:
                mark = "[red]✗[/red]"
            table.add_row(
                outcome.name,
                f"{outcome.duration_ms:.1f}ms",
                str(outcome.requests),
                mark,
            )

        # blank separator row, then the totals
        table.add_row("", "", "", "", style="dim")
        table.add_row(
            "[bold]total[/bold]",
            f"[bold]{total_duration:.1f}ms[/bold]",
            f"[bold]{total_requests}[/bold]",
            "",
        )

        console.print(table)

        verdict = (
            "\n[bold green]all tests passed[/bold green]"
            if all_passed
            else "\n[bold red]some tests failed[/bold red]"
        )
        console.print(verdict)

    exit_code = 0 if all_passed else 1
    sys.exit(exit_code)


# script entry point
if __name__ == "__main__":
    main()