prefect server in zig
at f511403a2b901063559cd17995b45527418e76c6 1349 lines 49 kB view raw
1#!/usr/bin/env -S uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = ["httpx", "rich"] 5# /// 6""" 7Functional test suite for prefect-server API. 8 9Tests API correctness by exercising all endpoints with expected request/response patterns. 10Includes scheduler integration tests (which have intentional delays to verify background services). 11 12For performance benchmarking, use ./scripts/benchmark instead. 13 14Usage: 15 ./scripts/test-api-sequence # human-readable output 16 ./scripts/test-api-sequence --json # machine-readable for CI 17 ./scripts/test-api-sequence --quiet # minimal output 18""" 19 20import json as json_lib 21import os 22import sys 23import time 24import uuid 25from dataclasses import dataclass, field 26from typing import Callable 27 28import httpx 29from rich.console import Console 30from rich.panel import Panel 31from rich.table import Table 32 33console = Console() 34BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") 35QUIET = "--json" in sys.argv or "--quiet" in sys.argv 36 37 38@dataclass 39class TestResult: 40 name: str 41 passed: bool 42 duration_ms: float 43 requests: int = 0 44 error: str | None = None 45 46 47class CountingClient(httpx.Client): 48 """HTTP client that counts requests.""" 49 50 def __init__(self, *args, **kwargs): 51 super().__init__(*args, **kwargs) 52 self.request_count = 0 53 54 def request(self, *args, **kwargs): 55 self.request_count += 1 56 return super().request(*args, **kwargs) 57 58 59def validate_response(data: dict, required_fields: list[str], field_types: dict | None = None) -> bool: 60 """Validate response contains required fields with expected types.""" 61 for field in required_fields: 62 if field not in data: 63 if not QUIET: 64 console.print(f"[red]VALIDATION[/red]: missing field '{field}'") 65 return False 66 if field_types: 67 for field, expected_type in field_types.items(): 68 if field in data and data[field] is not None: 69 if not isinstance(data[field], expected_type): 70 if not QUIET: 71 console.print(f"[red]VALIDATION[/red]: field '{field}' expected {expected_type.__name__}, got {type(data[field]).__name__}") 72 return False 73 return True 74 75 76def run_test(name: str, test_fn: Callable[[CountingClient], bool]) -> TestResult: 77 """Run a test function with timing and request counting.""" 78 if not QUIET: 79 console.print(Panel(f"testing {name}", style="blue")) 80 81 client = CountingClient(base_url=BASE_URL, timeout=10) 82 start = time.perf_counter() 83 84 try: 85 passed = test_fn(client) 86 duration_ms = (time.perf_counter() - start) * 1000 87 return TestResult( 88 name=name, 89 passed=passed, 90 duration_ms=duration_ms, 91 requests=client.request_count, 92 ) 93 except Exception as e: 94 duration_ms = (time.perf_counter() - start) * 1000 95 return TestResult( 96 name=name, 97 passed=False, 98 duration_ms=duration_ms, 99 requests=client.request_count, 100 error=str(e), 101 ) 102 finally: 103 client.close() 104 105 106# ---------- test functions ---------- 107 108 109def test_admin(client: CountingClient) -> bool: 110 """Test admin/health endpoints.""" 111 # health 112 if not QUIET: 113 console.print("[bold]GET /health[/bold]") 114 resp = client.get("/health") 115 if resp.status_code != 200: 116 if not QUIET: 117 console.print(f"[red]FAIL[/red]: {resp.status_code}") 118 return False 119 if not QUIET: 120 console.print(f" {resp.text}") 121 122 # version 123 if not QUIET: 124 console.print("[bold]GET /admin/version[/bold]") 125 resp = client.get("/admin/version") 126 if resp.status_code != 200: 127 if not QUIET: 128 console.print(f"[red]FAIL[/red]: {resp.status_code}") 129 return False 130 if not QUIET: 131 console.print(f" {resp.json()}") 132 133 # csrf-token 134 if not QUIET: 135 console.print("[bold]GET /csrf-token[/bold]") 136 resp = client.get("/csrf-token", params={"client": "test-client"}) 137 if resp.status_code == 200: 138 if not QUIET: 139 console.print(f" token received") 140 elif resp.status_code == 422: 141 if not QUIET: 142 console.print(f" csrf protection disabled (ok)") 143 else: 144 if not QUIET: 145 console.print(f"[red]FAIL[/red]: {resp.status_code}") 146 return False 147 148 return True 149 150 151def test_flow_run(client: CountingClient, should_fail: bool = False) -> bool: 152 """Test flow run lifecycle.""" 153 suffix = "fail" if should_fail else "success" 154 if not QUIET: 155 console.print(f"server: {BASE_URL}\n") 156 157 # create flow 158 if not QUIET: 159 console.print("[bold]1. POST /flows/[/bold]") 160 resp = client.post("/flows/", json={"name": f"bench-flow-{suffix}"}) 161 if resp.status_code not in (200, 201): 162 if not QUIET: 163 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 164 return False 165 flow = resp.json() 166 if not validate_response(flow, ["id", "name", "created", "updated"], {"id": str, "name": str}): 167 return False 168 if not QUIET: 169 console.print(f" flow_id: {flow.get('id')}") 170 171 # create flow run 172 if not QUIET: 173 console.print("\n[bold]2. POST /flow_runs/[/bold]") 174 resp = client.post("/flow_runs/", json={ 175 "flow_id": flow["id"], 176 "name": f"run-{uuid.uuid4().hex[:8]}", 177 "state": {"type": "PENDING", "name": "Pending"}, 178 }) 179 if resp.status_code not in (200, 201): 180 if not QUIET: 181 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 182 return False 183 flow_run = resp.json() 184 if not validate_response(flow_run, ["id", "name", "flow_id", "state_type"], {"id": str, "name": str}): 185 return False 186 flow_run_id = flow_run.get("id") 187 if not QUIET: 188 console.print(f" flow_run_id: {flow_run_id}") 189 190 # read flow run 191 if not QUIET: 192 console.print("\n[bold]3. GET /flow_runs/{id}[/bold]") 193 resp = client.get(f"/flow_runs/{flow_run_id}") 194 if resp.status_code != 200: 195 if not QUIET: 196 console.print(f"[red]FAIL[/red]: {resp.status_code}") 197 return False 198 if not QUIET: 199 console.print(f" state: {resp.json().get('state_type')}") 200 201 # set RUNNING 202 if not QUIET: 203 console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]") 204 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 205 "state": {"type": "RUNNING", "name": "Running"}, 206 "force": False, 207 }) 208 if resp.status_code not in (200, 201): 209 if not QUIET: 210 console.print(f"[red]FAIL[/red]: {resp.status_code}") 211 return False 212 if not QUIET: 213 console.print(f" status: {resp.json().get('status')}") 214 215 # set final state 216 final_type = "FAILED" if should_fail else "COMPLETED" 217 final_name = "Failed" if should_fail else "Completed" 218 if not QUIET: 219 console.print(f"\n[bold]5. POST /flow_runs/{{id}}/set_state ({final_type})[/bold]") 220 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 221 "state": {"type": final_type, "name": final_name}, 222 "force": False, 223 }) 224 if resp.status_code not in (200, 201): 225 if not QUIET: 226 console.print(f"[red]FAIL[/red]: {resp.status_code}") 227 return False 228 if not QUIET: 229 console.print(f" status: {resp.json().get('status')}") 230 231 # verify 232 if not QUIET: 233 console.print("\n[bold]6. GET /flow_runs/{id} (verify)[/bold]") 234 resp = client.get(f"/flow_runs/{flow_run_id}") 235 if resp.status_code != 200: 236 return False 237 actual_type = resp.json().get("state_type") 238 if actual_type != final_type: 239 if not QUIET: 240 console.print(f"[red]FAIL[/red]: expected {final_type}, got {actual_type}") 241 return False 242 if not QUIET: 243 console.print(f" [green]state: {actual_type} (correct)[/green]") 244 245 return True 246 247 248def test_orchestration_rules(client: CountingClient) -> bool: 249 """Test orchestration rules (PreventPendingTransitions).""" 250 if not QUIET: 251 console.print(f"server: {BASE_URL}\n") 252 253 # 1. create flow 254 if not QUIET: 255 console.print("[bold]1. create flow[/bold]") 256 resp = client.post("/flows/", json={"name": f"orchestration-test-{uuid.uuid4().hex[:8]}"}) 257 if resp.status_code not in (200, 201): 258 if not QUIET: 259 console.print(f"[red]FAIL[/red]: create flow {resp.status_code}") 260 return False 261 flow_id = resp.json()["id"] 262 if not QUIET: 263 console.print(f" flow_id: {flow_id}") 264 265 # 2. create flow run in PENDING state 266 if not QUIET: 267 console.print("\n[bold]2. create flow run (PENDING)[/bold]") 268 resp = client.post("/flow_runs/", json={ 269 "flow_id": flow_id, 270 "name": f"orch-run-{uuid.uuid4().hex[:8]}", 271 "state": {"type": "PENDING", "name": "Pending"}, 272 }) 273 if resp.status_code not in (200, 201): 274 if not QUIET: 275 console.print(f"[red]FAIL[/red]: create flow run {resp.status_code}") 276 return False 277 flow_run_id = resp.json()["id"] 278 if not QUIET: 279 console.print(f" flow_run_id: {flow_run_id}") 280 281 # 3. try PENDING → PENDING (should be REJECT) 282 if not QUIET: 283 console.print("\n[bold]3. PENDING → PENDING (expect REJECT)[/bold]") 284 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 285 "state": {"type": "PENDING", "name": "Pending"}, 286 }) 287 if resp.status_code not in (200, 201): 288 if not QUIET: 289 console.print(f"[red]FAIL[/red]: unexpected status code {resp.status_code}") 290 return False 291 result = resp.json() 292 status = result.get("status") 293 if status != "REJECT": 294 if not QUIET: 295 console.print(f"[red]FAIL[/red]: expected REJECT, got {status}") 296 return False 297 if not QUIET: 298 reason = result.get("details", {}).get("reason", "") 299 console.print(f" [green]status: {status} (correct)[/green]") 300 console.print(f" reason: {reason}") 301 302 # 4. PENDING → RUNNING (should be ACCEPT) 303 if not QUIET: 304 console.print("\n[bold]4. PENDING → RUNNING (expect ACCEPT)[/bold]") 305 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 306 "state": {"type": "RUNNING", "name": "Running"}, 307 }) 308 if resp.status_code not in (200, 201): 309 if not QUIET: 310 console.print(f"[red]FAIL[/red]: {resp.status_code}") 311 return False 312 result = resp.json() 313 status = result.get("status") 314 if status != "ACCEPT": 315 if not QUIET: 316 console.print(f"[red]FAIL[/red]: expected ACCEPT, got {status}") 317 return False 318 if not QUIET: 319 console.print(f" [green]status: {status} (correct)[/green]") 320 321 # 5. try RUNNING → PENDING (should be REJECT) 322 if not QUIET: 323 console.print("\n[bold]5. RUNNING → PENDING (expect REJECT)[/bold]") 324 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 325 "state": {"type": "PENDING", "name": "Pending"}, 326 }) 327 if resp.status_code not in (200, 201): 328 if not QUIET: 329 console.print(f"[red]FAIL[/red]: unexpected status code {resp.status_code}") 330 return False 331 result = resp.json() 332 status = result.get("status") 333 if status != "REJECT": 334 if not QUIET: 335 console.print(f"[red]FAIL[/red]: expected REJECT, got {status}") 336 return False 337 if not QUIET: 338 console.print(f" [green]status: {status} (correct)[/green]") 339 340 # 6. verify run is still RUNNING (reject didn't change state) 341 if not QUIET: 342 console.print("\n[bold]6. verify state unchanged after REJECT[/bold]") 343 resp = client.get(f"/flow_runs/{flow_run_id}") 344 if resp.status_code != 200: 345 return False 346 actual_state = resp.json().get("state_type") 347 if actual_state != "RUNNING": 348 if not QUIET: 349 console.print(f"[red]FAIL[/red]: expected RUNNING, got {actual_state}") 350 return False 351 if not QUIET: 352 console.print(f" [green]state: {actual_state} (correct - unchanged)[/green]") 353 354 # 7. complete normally 355 if not QUIET: 356 console.print("\n[bold]7. RUNNING → COMPLETED (expect ACCEPT)[/bold]") 357 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 358 "state": {"type": "COMPLETED", "name": "Completed"}, 359 }) 360 if resp.status_code not in (200, 201): 361 return False 362 status = resp.json().get("status") 363 if status != "ACCEPT": 364 if not QUIET: 365 console.print(f"[red]FAIL[/red]: expected ACCEPT, got {status}") 366 return False 367 if not QUIET: 368 console.print(f" [green]status: {status} (correct)[/green]") 369 370 # ========================================================================= 371 # CopyScheduledTime rule tests 372 # ========================================================================= 373 374 # 8. create a SCHEDULED flow run with next_scheduled_start_time 375 scheduled_time = "2025-06-15T10:00:00Z" 376 if not QUIET: 377 console.print(f"\n[bold]8. create SCHEDULED run (next_scheduled_start_time={scheduled_time})[/bold]") 378 resp = client.post("/flow_runs/", json={ 379 "flow_id": flow_id, 380 "name": f"scheduled-run-{uuid.uuid4().hex[:8]}", 381 "state": {"type": "SCHEDULED", "name": "Scheduled"}, 382 "next_scheduled_start_time": scheduled_time, 383 }) 384 if resp.status_code not in (200, 201): 385 if not QUIET: 386 console.print(f"[red]FAIL[/red]: create scheduled run {resp.status_code}") 387 return False 388 scheduled_run_id = resp.json()["id"] 389 if not QUIET: 390 console.print(f" scheduled_run_id: {scheduled_run_id}") 391 392 # 9. transition SCHEDULED → PENDING (CopyScheduledTime should copy scheduled_time) 393 if not QUIET: 394 console.print("\n[bold]9. SCHEDULED → PENDING (expect scheduled_time copied)[/bold]") 395 resp = client.post(f"/flow_runs/{scheduled_run_id}/set_state", json={ 396 "state": {"type": "PENDING", "name": "Pending"}, 397 }) 398 if resp.status_code not in (200, 201): 399 if not QUIET: 400 console.print(f"[red]FAIL[/red]: set_state {resp.status_code}") 401 return False 402 status = resp.json().get("status") 403 if status != "ACCEPT": 404 if not QUIET: 405 console.print(f"[red]FAIL[/red]: expected ACCEPT, got {status}") 406 return False 407 if not QUIET: 408 console.print(f" [green]status: {status} (correct)[/green]") 409 410 # 10. verify expected_start_time was set from next_scheduled_start_time 411 if not QUIET: 412 console.print("\n[bold]10. verify expected_start_time copied[/bold]") 413 resp = client.get(f"/flow_runs/{scheduled_run_id}") 414 if resp.status_code != 200: 415 return False 416 run_data = resp.json() 417 expected_start = run_data.get("expected_start_time") 418 if expected_start != scheduled_time: 419 if not QUIET: 420 console.print(f"[red]FAIL[/red]: expected_start_time={expected_start}, expected {scheduled_time}") 421 return False 422 if not QUIET: 423 console.print(f" [green]expected_start_time: {expected_start} (correct)[/green]") 424 425 return True 426 427 428def test_task_run(client: CountingClient) -> bool: 429 """Test task run lifecycle.""" 430 # create 431 if not QUIET: 432 console.print("[bold]POST /task_runs/[/bold]") 433 resp = client.post("/task_runs/", json={ 434 "task_key": "bench-task", 435 "dynamic_key": f"key-{uuid.uuid4().hex[:8]}", 436 "name": f"task-{uuid.uuid4().hex[:8]}", 437 "state": {"type": "PENDING", "name": "Pending"}, 438 }) 439 if resp.status_code not in (200, 201): 440 if not QUIET: 441 console.print(f"[red]FAIL[/red]: {resp.status_code}") 442 return False 443 task_run_id = resp.json().get("id") 444 if not QUIET: 445 console.print(f" task_run_id: {task_run_id}") 446 447 # read 448 resp = client.get(f"/task_runs/{task_run_id}") 449 if resp.status_code != 200: 450 return False 451 452 # RUNNING 453 resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ 454 "state": {"type": "RUNNING", "name": "Running"}, 455 "force": False, 456 }) 457 if resp.status_code not in (200, 201): 458 return False 459 if not QUIET: 460 console.print(f" -> RUNNING: {resp.json().get('status')}") 461 462 # COMPLETED 463 resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ 464 "state": {"type": "COMPLETED", "name": "Completed"}, 465 "force": False, 466 }) 467 if resp.status_code not in (200, 201): 468 return False 469 if not QUIET: 470 console.print(f" -> COMPLETED: {resp.json().get('status')}") 471 472 return True 473 474 475def test_filters(client: CountingClient) -> bool: 476 """Test filter endpoints.""" 477 for endpoint, label in [ 478 ("/flows/filter", "flows"), 479 ("/flow_runs/filter", "flow_runs"), 480 ("/task_runs/filter", "task_runs"), 481 ]: 482 resp = client.post(endpoint, json={}) 483 if resp.status_code != 200: 484 if not QUIET: 485 console.print(f"[red]FAIL[/red]: {endpoint} {resp.status_code}") 486 return False 487 if not QUIET: 488 console.print(f" {label}: {len(resp.json())} items") 489 490 return True 491 492 493def test_logs(client: CountingClient) -> bool: 494 """Test logs endpoint.""" 495 from datetime import datetime, timezone 496 497 logs = [ 498 {"level": 20, "message": "test log 1", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, 499 {"level": 30, "message": "test log 2", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, 500 ] 501 resp = client.post("/logs/", json=logs) 502 if resp.status_code not in (200, 201, 204): 503 if not QUIET: 504 console.print(f"[red]FAIL[/red]: {resp.status_code}") 505 return False 506 if not QUIET: 507 console.print(f" {len(logs)} logs sent") 508 return True 509 510 511def test_variables(client: CountingClient) -> bool: 512 """Test variables API (CRUD).""" 513 var_name = f"bench-var-{uuid.uuid4().hex[:8]}" 514 515 # create 516 if not QUIET: 517 console.print("[bold]POST /variables/[/bold]") 518 resp = client.post("/variables/", json={ 519 "name": var_name, 520 "value": {"nested": "object", "count": 42}, 521 "tags": ["benchmark", "test"], 522 }) 523 if resp.status_code != 201: 524 if not QUIET: 525 console.print(f"[red]FAIL[/red]: create {resp.status_code}") 526 return False 527 variable = resp.json() 528 if not validate_response(variable, ["id", "name", "value", "tags", "created", "updated"], {"id": str, "name": str, "tags": list}): 529 return False 530 var_id = variable.get("id") 531 if not QUIET: 532 console.print(f" created: {var_id}") 533 534 # get by name 535 resp = client.get(f"/variables/name/{var_name}") 536 if resp.status_code != 200: 537 return False 538 if not QUIET: 539 console.print(f" get by name: ok") 540 541 # get by id 542 resp = client.get(f"/variables/{var_id}") 543 if resp.status_code != 200: 544 return False 545 546 # update by name 547 resp = client.patch(f"/variables/name/{var_name}", json={"value": "updated"}) 548 if resp.status_code != 204: 549 return False 550 if not QUIET: 551 console.print(f" updated by name") 552 553 # filter 554 resp = client.post("/variables/filter", json={"limit": 10}) 555 if resp.status_code != 200: 556 return False 557 if not QUIET: 558 console.print(f" filter: {len(resp.json())} items") 559 560 # count 561 resp = client.post("/variables/count", json={}) 562 if resp.status_code != 200: 563 return False 564 if not QUIET: 565 console.print(f" count: {resp.text}") 566 567 # duplicate name should fail 568 resp = client.post("/variables/", json={"name": var_name, "value": "dupe"}) 569 if resp.status_code != 409: 570 if not QUIET: 571 console.print(f"[red]FAIL[/red]: duplicate should return 409, got {resp.status_code}") 572 return False 573 if not QUIET: 574 console.print(f" duplicate rejected: 409") 575 576 # delete 577 resp = client.delete(f"/variables/name/{var_name}") 578 if resp.status_code != 204: 579 return False 580 if not QUIET: 581 console.print(f" deleted") 582 583 return True 584 585 586def test_blocks(client: CountingClient) -> bool: 587 """Test blocks API (types, schemas, documents).""" 588 slug = f"bench-block-{uuid.uuid4().hex[:8]}" 589 590 # create block type 591 if not QUIET: 592 console.print("[bold]block_types[/bold]") 593 resp = client.post("/block_types/", json={ 594 "name": f"Bench Block {slug}", 595 "slug": slug, 596 "description": "benchmark block type", 597 }) 598 if resp.status_code not in (200, 201): 599 if not QUIET: 600 console.print(f"[red]FAIL[/red]: create block_type {resp.status_code}") 601 return False 602 block_type = resp.json() 603 if not validate_response(block_type, ["id", "name", "slug"], {"id": str, "name": str, "slug": str}): 604 return False 605 block_type_id = block_type.get("id") 606 if not QUIET: 607 console.print(f" created: {block_type_id}") 608 609 # get by slug 610 resp = client.get(f"/block_types/slug/{slug}") 611 if resp.status_code != 200: 612 return False 613 614 # create schema 615 if not QUIET: 616 console.print("[bold]block_schemas[/bold]") 617 resp = client.post("/block_schemas/", json={ 618 "block_type_id": block_type_id, 619 "fields": {"properties": {"value": {"type": "string"}}}, 620 "capabilities": ["test"], 621 "version": "1.0.0", 622 }) 623 if resp.status_code not in (200, 201): 624 if not QUIET: 625 console.print(f"[red]FAIL[/red]: create block_schema {resp.status_code}") 626 return False 627 block_schema = resp.json() 628 block_schema_id = block_schema.get("id") 629 checksum = block_schema.get("checksum") 630 if not QUIET: 631 console.print(f" created: {block_schema_id}") 632 633 # get by checksum 634 resp = client.get(f"/block_schemas/checksum/{checksum}") 635 if resp.status_code != 200: 636 return False 637 638 # create document 639 if not QUIET: 640 console.print("[bold]block_documents[/bold]") 641 doc_name = f"bench-doc-{uuid.uuid4().hex[:8]}" 642 resp = client.post("/block_documents/", json={ 643 "name": doc_name, 644 "block_type_id": block_type_id, 645 "block_schema_id": block_schema_id, 646 "data": {"value": "secret-value"}, 647 }) 648 if resp.status_code not in (200, 201): 649 if not QUIET: 650 console.print(f"[red]FAIL[/red]: create block_document {resp.status_code}") 651 return False 652 block_doc = resp.json() 653 block_doc_id = block_doc.get("id") 654 if not QUIET: 655 console.print(f" created: {block_doc_id}") 656 657 # get by id 658 resp = client.get(f"/block_documents/{block_doc_id}") 659 if resp.status_code != 200: 660 return False 661 662 # get by slug/name 663 resp = client.get(f"/block_types/slug/{slug}/block_documents/name/{doc_name}") 664 if resp.status_code != 200: 665 return False 666 667 # update 668 resp = client.patch(f"/block_documents/{block_doc_id}", json={"data": {"value": "updated"}}) 669 if resp.status_code != 204: 670 return False 671 if not QUIET: 672 console.print(f" updated") 673 674 # filters 675 for endpoint in ["/block_types/filter", "/block_schemas/filter", "/block_documents/filter"]: 676 resp = client.post(endpoint, json={}) 677 if resp.status_code != 200: 678 return False 679 680 # delete 681 resp = client.delete(f"/block_documents/{block_doc_id}") 682 if resp.status_code != 204: 683 return False 684 if not QUIET: 685 console.print(f" deleted") 686 687 return True 688 689 690def test_work_pools(client: CountingClient) -> bool: 691 """Test work pools API (pools, queues, workers).""" 692 pool_name = f"test-pool-{uuid.uuid4().hex[:8]}" 693 694 # create work pool 695 if not QUIET: 696 console.print("[bold]work_pools[/bold]") 697 resp = client.post("/work_pools/", json={ 698 "name": pool_name, 699 "type": "process", 700 "description": "test work pool", 701 }) 702 if resp.status_code not in (200, 201): 703 if not QUIET: 704 console.print(f"[red]FAIL[/red]: create work_pool {resp.status_code} {resp.text}") 705 return False 706 pool = resp.json() 707 if not validate_response(pool, ["id", "name", "type", "status", "default_queue_id"], {"id": str, "name": str}): 708 return False 709 if not QUIET: 710 console.print(f" created: {pool.get('id')}") 711 712 # check default queue was created 713 if not pool.get("default_queue_id"): 714 if not QUIET: 715 console.print("[red]FAIL[/red]: no default_queue_id") 716 return False 717 718 # get by name 719 resp = client.get(f"/work_pools/{pool_name}") 720 if resp.status_code != 200: 721 if not QUIET: 722 console.print(f"[red]FAIL[/red]: get work_pool {resp.status_code}") 723 return False 724 725 # update 726 resp = client.patch(f"/work_pools/{pool_name}", json={"description": "updated"}) 727 if resp.status_code != 204: 728 if not QUIET: 729 console.print(f"[red]FAIL[/red]: update work_pool {resp.status_code}") 730 return False 731 if not QUIET: 732 console.print(" updated") 733 734 # filter 735 resp = client.post("/work_pools/filter", json={}) 736 if resp.status_code != 200: 737 if not QUIET: 738 console.print(f"[red]FAIL[/red]: filter work_pools {resp.status_code}") 739 return False 740 pools = resp.json() 741 if not isinstance(pools, list): 742 return False 743 744 # create queue 745 if not QUIET: 746 console.print("[bold]work_queues[/bold]") 747 queue_name = f"test-queue-{uuid.uuid4().hex[:8]}" 748 resp = client.post(f"/work_pools/{pool_name}/queues/", json={ 749 "name": queue_name, 750 "description": "test queue", 751 "priority": 5, 752 }) 753 if resp.status_code not in (200, 201): 754 if not QUIET: 755 console.print(f"[red]FAIL[/red]: create work_queue {resp.status_code} {resp.text}") 756 return False 757 queue = resp.json() 758 if not validate_response(queue, ["id", "name", "priority", "work_pool_id"], {"id": str, "priority": int}): 759 return False 760 if not QUIET: 761 console.print(f" created: {queue.get('id')}") 762 763 # get queue 764 resp = client.get(f"/work_pools/{pool_name}/queues/{queue_name}") 765 if resp.status_code != 200: 766 if not QUIET: 767 console.print(f"[red]FAIL[/red]: get work_queue {resp.status_code}") 768 return False 769 770 # filter queues 771 resp = client.post(f"/work_pools/{pool_name}/queues/filter", json={}) 772 if resp.status_code != 200: 773 if not QUIET: 774 console.print(f"[red]FAIL[/red]: filter work_queues {resp.status_code}") 775 return False 776 queues = resp.json() 777 if not isinstance(queues, list) or len(queues) < 2: # default + our queue 778 if not QUIET: 779 console.print(f"[red]FAIL[/red]: expected at least 2 queues, got {len(queues) if isinstance(queues, list) else 'not a list'}") 780 return False 781 782 # worker heartbeat 783 if not QUIET: 784 console.print("[bold]workers[/bold]") 785 resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={ 786 "name": "test-worker-1", 787 "heartbeat_interval_seconds": 30, 788 }) 789 if resp.status_code != 204: 790 if not QUIET: 791 console.print(f"[red]FAIL[/red]: worker heartbeat {resp.status_code} {resp.text}") 792 return False 793 if not QUIET: 794 console.print(" heartbeat sent") 795 796 # check pool status is now READY 797 resp = client.get(f"/work_pools/{pool_name}") 798 if resp.status_code != 200: 799 return False 800 pool = resp.json() 801 if pool.get("status") != "READY": 802 if not QUIET: 803 console.print(f"[red]FAIL[/red]: expected pool status READY, got {pool.get('status')}") 804 return False 805 if not QUIET: 806 console.print(" pool status: READY") 807 808 # filter workers 809 resp = client.post(f"/work_pools/{pool_name}/workers/filter", json={}) 810 if resp.status_code != 200: 811 if not QUIET: 812 console.print(f"[red]FAIL[/red]: filter workers {resp.status_code}") 813 return False 814 workers = resp.json() 815 if not isinstance(workers, list) or len(workers) < 1: 816 return False 817 818 # delete queue (not default) 819 resp = client.delete(f"/work_pools/{pool_name}/queues/{queue_name}") 820 if resp.status_code != 204: 821 if not QUIET: 822 console.print(f"[red]FAIL[/red]: delete work_queue {resp.status_code}") 823 return False 824 if not QUIET: 825 console.print(" deleted queue") 826 827 # delete pool 828 resp = client.delete(f"/work_pools/{pool_name}") 829 if resp.status_code != 204: 830 if not QUIET: 831 console.print(f"[red]FAIL[/red]: delete work_pool {resp.status_code}") 832 return False 833 if not QUIET: 834 console.print(" deleted pool") 835 836 return True 837 838 839def test_deployments(client: CountingClient) -> bool: 840 """Test deployments API (deployments, schedules, create_flow_run).""" 841 # create a flow first 842 if not QUIET: 843 console.print("[bold]setup: create flow[/bold]") 844 resp = client.post("/flows/", json={"name": f"deploy-flow-{uuid.uuid4().hex[:8]}"}) 845 if resp.status_code not in (200, 201): 846 if not QUIET: 847 console.print(f"[red]FAIL[/red]: create flow {resp.status_code}") 848 return False 849 flow = resp.json() 850 flow_id = flow.get("id") 851 flow_name = flow.get("name") 852 853 # create deployment 854 deployment_name = f"test-deployment-{uuid.uuid4().hex[:8]}" 855 if not QUIET: 856 console.print("[bold]deployments[/bold]") 857 resp = client.post("/deployments/", json={ 858 "name": deployment_name, 859 "flow_id": flow_id, 860 "description": "test deployment", 861 "tags": ["test", "benchmark"], 862 "parameters": {"key": "value"}, 863 "schedules": [ 864 {"schedule": {"interval": 3600}, "active": True}, 865 ], 866 }) 867 if resp.status_code not in (200, 201): 868 if not QUIET: 869 console.print(f"[red]FAIL[/red]: create deployment {resp.status_code} {resp.text}") 870 return False 871 deployment = resp.json() 872 if not validate_response(deployment, ["id", "name", "flow_id", "status", "schedules"], {"id": str, "name": str}): 873 return False 874 deployment_id = deployment.get("id") 875 if not QUIET: 876 console.print(f" created: {deployment_id}") 877 878 # verify schedules were created 879 schedules = deployment.get("schedules", []) 880 if not isinstance(schedules, list) or len(schedules) != 1: 881 if not QUIET: 882 console.print(f"[red]FAIL[/red]: expected 1 schedule, got {len(schedules) if isinstance(schedules, list) else 'not a list'}") 883 return False 884 if not QUIET: 885 console.print(f" schedules: {len(schedules)}") 886 887 # get by id 888 resp = client.get(f"/deployments/{deployment_id}") 889 if resp.status_code != 200: 890 if not QUIET: 891 console.print(f"[red]FAIL[/red]: get deployment {resp.status_code}") 892 return False 893 894 # get by name 895 resp = client.get(f"/deployments/name/{flow_name}/{deployment_name}") 896 if resp.status_code != 200: 897 if not QUIET: 898 console.print(f"[red]FAIL[/red]: get deployment by name {resp.status_code}") 899 return False 900 if not QUIET: 901 console.print(" get by name: ok") 902 903 # update 904 resp = client.patch(f"/deployments/{deployment_id}", json={"description": "updated"}) 905 if resp.status_code != 204: 906 if not QUIET: 907 console.print(f"[red]FAIL[/red]: update deployment {resp.status_code}") 908 return False 909 if not QUIET: 910 console.print(" updated") 911 912 # filter 913 resp = client.post("/deployments/filter", json={"limit": 10}) 914 if resp.status_code != 200: 915 if not QUIET: 916 console.print(f"[red]FAIL[/red]: filter deployments {resp.status_code}") 917 return False 918 if not QUIET: 919 console.print(f" filter: {len(resp.json())} items") 920 921 # count 922 resp = client.post("/deployments/count", json={}) 923 if resp.status_code != 200: 924 if not QUIET: 925 console.print(f"[red]FAIL[/red]: count deployments {resp.status_code}") 926 return False 927 if not QUIET: 928 console.print(f" count: {resp.text}") 929 930 # pause 931 resp = client.post(f"/deployments/{deployment_id}/pause_deployment", json={}) 932 if resp.status_code != 204: 933 if not QUIET: 934 console.print(f"[red]FAIL[/red]: pause deployment {resp.status_code}") 935 return False 936 if not QUIET: 937 console.print(" paused") 938 939 # resume 940 resp = client.post(f"/deployments/{deployment_id}/resume_deployment", json={}) 941 if resp.status_code != 204: 942 if not QUIET: 943 console.print(f"[red]FAIL[/red]: resume deployment {resp.status_code}") 944 return False 945 if not QUIET: 946 console.print(" resumed") 947 948 # create flow run from deployment 949 if not QUIET: 950 console.print("[bold]create_flow_run[/bold]") 951 resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={}) 952 if resp.status_code not in (200, 201): 953 if not QUIET: 954 console.print(f"[red]FAIL[/red]: create flow run {resp.status_code} {resp.text}") 955 return False 956 flow_run = resp.json() 957 if not validate_response(flow_run, ["id", "flow_id", "deployment_id"], {"id": str}): 958 return False 959 if flow_run.get("deployment_id") != deployment_id: 960 if not QUIET: 961 console.print(f"[red]FAIL[/red]: deployment_id mismatch") 962 return False 963 if not QUIET: 964 console.print(f" created flow run: {flow_run.get('id')}") 965 966 # schedules - list 967 if not QUIET: 968 console.print("[bold]deployment_schedules[/bold]") 969 resp = client.get(f"/deployments/{deployment_id}/schedules") 970 if resp.status_code != 200: 971 if not QUIET: 972 console.print(f"[red]FAIL[/red]: list schedules {resp.status_code}") 973 return False 974 schedules = resp.json() 975 schedule_id = schedules[0].get("id") if schedules else None 976 if not QUIET: 977 console.print(f" list: {len(schedules)} schedules") 978 979 # schedules - create 980 resp = client.post(f"/deployments/{deployment_id}/schedules", json={ 981 "schedule": {"cron": "0 0 * * *"}, 982 "active": False, 983 }) 984 if resp.status_code not in (200, 201): 985 if not QUIET: 986 console.print(f"[red]FAIL[/red]: create schedule {resp.status_code}") 987 return False 988 created_schedules = resp.json() 989 if not isinstance(created_schedules, list) or len(created_schedules) != 1: 990 if not QUIET: 991 console.print(f"[red]FAIL[/red]: expected 1 created schedule") 992 return False 993 new_schedule_id = created_schedules[0].get("id") 994 if not QUIET: 995 console.print(f" created schedule: {new_schedule_id}") 996 997 # schedules - update 998 resp = client.patch(f"/deployments/{deployment_id}/schedules/{new_schedule_id}", json={"active": True}) 999 if resp.status_code != 204: 1000 if not QUIET: 1001 console.print(f"[red]FAIL[/red]: update schedule {resp.status_code}") 1002 return False 1003 if not QUIET: 1004 console.print(" updated schedule") 1005 1006 # schedules - delete 1007 resp = client.delete(f"/deployments/{deployment_id}/schedules/{new_schedule_id}") 1008 if resp.status_code != 204: 1009 if not QUIET: 1010 console.print(f"[red]FAIL[/red]: delete schedule {resp.status_code}") 1011 return False 1012 if not QUIET: 1013 console.print(" deleted schedule") 1014 1015 # delete deployment 1016 resp = client.delete(f"/deployments/{deployment_id}") 1017 if resp.status_code != 204: 1018 if not QUIET: 1019 console.print(f"[red]FAIL[/red]: delete deployment {resp.status_code}") 1020 return False 1021 if not QUIET: 1022 console.print(" deleted deployment") 1023 1024 return True 1025 1026 1027def test_scheduler_idempotency(client: CountingClient) -> bool: 1028 """Test that scheduler is idempotent - running twice doesn't create duplicates.""" 1029 import time as time_mod 1030 1031 def fail(msg: str) -> bool: 1032 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1033 return False 1034 1035 def log(msg: str) -> None: 1036 if not QUIET: console.print(msg) 1037 1038 # setup: create flow, work pool, deployment with interval schedule 1039 log("[bold]setup[/bold]") 1040 resp = client.post("/flows/", json={"name": f"idem-flow-{uuid.uuid4().hex[:8]}"}) 1041 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1042 flow_id = resp.json().get("id") 1043 1044 pool_name = f"idem-pool-{uuid.uuid4().hex[:8]}" 1045 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1046 if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 1047 log(f" pool: {pool_name}") 1048 1049 # create deployment with interval schedule (every hour) 1050 resp = client.post("/deployments/", json={ 1051 "name": f"idem-deploy-{uuid.uuid4().hex[:8]}", 1052 "flow_id": flow_id, 1053 "work_pool_name": pool_name, 1054 "schedules": [{"schedule": {"interval": 3600}, "active": True}], # every hour 1055 }) 1056 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1057 deployment = resp.json() 1058 deployment_id = deployment.get("id") 1059 log(f" deployment: {deployment_id}") 1060 1061 # wait for scheduler to run once (default 5s interval) 1062 log("[bold]waiting for scheduler (7s)...[/bold]") 1063 time_mod.sleep(7) 1064 1065 # count runs after first scheduler tick 1066 resp = client.post("/flow_runs/filter", json={ 1067 "flow_runs": {"deployment_id": {"any_": [deployment_id]}}, 1068 "limit": 100, 1069 }) 1070 if resp.status_code != 200: return fail(f"filter flow_runs {resp.status_code}") 1071 runs_after_first = resp.json() 1072 count_after_first = len(runs_after_first) 1073 log(f" runs after first tick: {count_after_first}") 1074 1075 if count_after_first == 0: 1076 return fail("scheduler did not create any runs") 1077 1078 # wait for scheduler to run again 1079 log("[bold]waiting for second scheduler tick (7s)...[/bold]") 1080 time_mod.sleep(7) 1081 1082 # count runs after second scheduler tick 1083 resp = client.post("/flow_runs/filter", json={ 1084 "flow_runs": {"deployment_id": {"any_": [deployment_id]}}, 1085 "limit": 100, 1086 }) 1087 if resp.status_code != 200: return fail(f"filter flow_runs {resp.status_code}") 1088 runs_after_second = resp.json() 1089 count_after_second = len(runs_after_second) 1090 log(f" runs after second tick: {count_after_second}") 1091 1092 # key test: same number of runs means idempotency works 1093 # (scheduler shouldn't create duplicates for same scheduled times) 1094 if count_after_second != count_after_first: 1095 return fail(f"idempotency failed: {count_after_first} -> {count_after_second} runs") 1096 log(f" [green]idempotency verified: count unchanged[/green]") 1097 1098 # cleanup 1099 client.delete(f"/deployments/{deployment_id}") 1100 client.delete(f"/work_pools/{pool_name}") 1101 log(" cleanup: ok") 1102 1103 return True 1104 1105 1106def test_parameter_merging(client: CountingClient) -> bool: 1107 """Test that schedule parameters override deployment parameters.""" 1108 import time as time_mod 1109 1110 def fail(msg: str) -> bool: 1111 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1112 return False 1113 1114 def log(msg: str) -> None: 1115 if not QUIET: console.print(msg) 1116 1117 # setup 1118 log("[bold]setup[/bold]") 1119 resp = client.post("/flows/", json={"name": f"params-flow-{uuid.uuid4().hex[:8]}"}) 1120 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1121 flow_id = resp.json().get("id") 1122 1123 pool_name = f"params-pool-{uuid.uuid4().hex[:8]}" 1124 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1125 if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 1126 log(f" pool: {pool_name}") 1127 1128 # create deployment with base parameters 1129 # schedule has override parameter 1130 resp = client.post("/deployments/", json={ 1131 "name": f"params-deploy-{uuid.uuid4().hex[:8]}", 1132 "flow_id": flow_id, 1133 "work_pool_name": pool_name, 1134 "parameters": {"base_key": "base_value", "override_key": "deployment_value"}, 1135 "schedules": [{ 1136 "schedule": {"interval": 3600}, 1137 "active": True, 1138 "parameters": {"override_key": "schedule_value", "schedule_key": "schedule_only"}, 1139 }], 1140 }) 1141 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1142 deployment = resp.json() 1143 deployment_id = deployment.get("id") 1144 log(f" deployment: {deployment_id}") 1145 log(f" deployment params: {deployment.get('parameters')}") 1146 1147 # wait for scheduler to create runs 1148 log("[bold]waiting for scheduler (7s)...[/bold]") 1149 time_mod.sleep(7) 1150 1151 # get the scheduled runs and check their parameters 1152 resp = client.post("/flow_runs/filter", json={ 1153 "flow_runs": {"deployment_id": {"any_": [deployment_id]}}, 1154 "limit": 10, 1155 }) 1156 if resp.status_code != 200: return fail(f"filter flow_runs {resp.status_code}") 1157 runs = resp.json() 1158 log(f" found {len(runs)} runs") 1159 1160 if len(runs) == 0: 1161 return fail("scheduler did not create any runs") 1162 1163 # check merged parameters on first run 1164 run_params = runs[0].get("parameters", {}) 1165 if isinstance(run_params, str): 1166 import json as json_mod 1167 run_params = json_mod.loads(run_params) 1168 log(f" run params: {run_params}") 1169 1170 # verify merging: 1171 # - base_key should be from deployment 1172 # - override_key should be from schedule (override) 1173 # - schedule_key should be from schedule (new key) 1174 if run_params.get("base_key") != "base_value": 1175 return fail(f"base_key not preserved: {run_params.get('base_key')}") 1176 if run_params.get("override_key") != "schedule_value": 1177 return fail(f"override_key not overridden: {run_params.get('override_key')}") 1178 if run_params.get("schedule_key") != "schedule_only": 1179 return fail(f"schedule_key not added: {run_params.get('schedule_key')}") 1180 1181 log(" [green]parameter merging verified[/green]") 1182 1183 # cleanup 1184 client.delete(f"/deployments/{deployment_id}") 1185 client.delete(f"/work_pools/{pool_name}") 1186 log(" cleanup: ok") 1187 1188 return True 1189 1190 1191def test_get_scheduled_flow_runs(client: CountingClient) -> bool: 1192 """Test get_scheduled_flow_runs endpoint (worker polling).""" 1193 from datetime import datetime, timezone 1194 1195 def fail(msg: str) -> bool: 1196 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1197 return False 1198 1199 def log(msg: str) -> None: 1200 if not QUIET: console.print(msg) 1201 1202 # setup 1203 log("[bold]setup: create flow[/bold]") 1204 resp = client.post("/flows/", json={"name": f"schedule-flow-{uuid.uuid4().hex[:8]}"}) 1205 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1206 flow_id = resp.json().get("id") 1207 1208 pool_name = f"schedule-pool-{uuid.uuid4().hex[:8]}" 1209 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1210 if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 1211 pool = resp.json() 1212 pool_id, default_queue_id = pool.get("id"), pool.get("default_queue_id") 1213 log(f" pool: {pool_id}") 1214 if pool.get("status") != "NOT_READY": return fail(f"expected pool NOT_READY, got {pool.get('status')}") 1215 1216 resp = client.post("/deployments/", json={"name": f"schedule-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name}) 1217 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1218 deployment = resp.json() 1219 deployment_id = deployment.get("id") 1220 log(f" deployment: {deployment_id}") 1221 if deployment.get("status") != "NOT_READY": return fail(f"expected deployment NOT_READY, got {deployment.get('status')}") 1222 1223 # create scheduled flow run 1224 log("[bold]create scheduled flow run[/bold]") 1225 resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={"state": {"type": "SCHEDULED", "name": "Scheduled"}}) 1226 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1227 flow_run = resp.json() 1228 flow_run_id = flow_run.get("id") 1229 log(f" flow_run: {flow_run_id}") 1230 log(f" state: {flow_run.get('state_type')}") 1231 if flow_run.get("state_type") != "SCHEDULED": return fail(f"expected SCHEDULED, got {flow_run.get('state_type')}") 1232 1233 # test polling 1234 log("[bold]get_scheduled_flow_runs[/bold]") 1235 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) 1236 if resp.status_code != 200: return fail(f"get_scheduled_flow_runs {resp.status_code}") 1237 scheduled_runs = resp.json() 1238 if not isinstance(scheduled_runs, list): return fail(f"expected list, got {type(scheduled_runs)}") 1239 log(f" returned {len(scheduled_runs)} runs") 1240 1241 # verify our run is in results 1242 found = any(item.get("flow_run", {}).get("id") == flow_run_id and 1243 item.get("work_pool_id") == pool_id and item.get("work_queue_id") == default_queue_id 1244 for item in scheduled_runs) 1245 if not found: return fail("scheduled flow run not found in results") 1246 log(" flow run found in results") 1247 1248 # verify status changes 1249 resp = client.get(f"/work_pools/{pool_name}") 1250 if resp.status_code != 200 or resp.json().get("status") != "READY": 1251 return fail(f"expected pool READY after polling") 1252 log(" pool status: READY") 1253 1254 resp = client.get(f"/deployments/{deployment_id}") 1255 if resp.status_code != 200 or resp.json().get("status") != "READY": 1256 return fail(f"expected deployment READY after polling") 1257 log(" deployment status: READY") 1258 1259 # test filters 1260 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"work_queue_names": ["default"]}) 1261 if resp.status_code != 200: return fail(f"filter test {resp.status_code}") 1262 log(" filtered by queue: ok") 1263 1264 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"scheduled_before": datetime.now(timezone.utc).isoformat()}) 1265 if resp.status_code != 200: return fail(f"scheduled_before test {resp.status_code}") 1266 log(f" scheduled_before filter: {len(resp.json())} runs") 1267 1268 # cleanup 1269 client.delete(f"/deployments/{deployment_id}") 1270 client.delete(f"/work_pools/{pool_name}") 1271 log(" cleanup: ok") 1272 return True 1273 1274 1275def main(): 1276 json_output = "--json" in sys.argv 1277 1278 if not QUIET: 1279 console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n") 1280 1281 results: list[TestResult] = [] 1282 1283 # run all tests 1284 results.append(run_test("admin", test_admin)) 1285 results.append(run_test("flow_run (success)", lambda c: test_flow_run(c, should_fail=False))) 1286 results.append(run_test("flow_run (failure)", lambda c: test_flow_run(c, should_fail=True))) 1287 results.append(run_test("orchestration_rules", test_orchestration_rules)) 1288 results.append(run_test("task_run", test_task_run)) 1289 results.append(run_test("filters", test_filters)) 1290 results.append(run_test("logs", test_logs)) 1291 results.append(run_test("variables", test_variables)) 1292 results.append(run_test("blocks", test_blocks)) 1293 results.append(run_test("work_pools", test_work_pools)) 1294 results.append(run_test("deployments", test_deployments)) 1295 results.append(run_test("get_scheduled_flow_runs", test_get_scheduled_flow_runs)) 1296 results.append(run_test("scheduler_idempotency", test_scheduler_idempotency)) 1297 results.append(run_test("parameter_merging", test_parameter_merging)) 1298 1299 total_duration = sum(r.duration_ms for r in results) 1300 total_requests = sum(r.requests for r in results) 1301 all_passed = all(r.passed for r in results) 1302 1303 if json_output: 1304 # machine-readable output for benchmark script 1305 output = { 1306 "passed": all_passed, 1307 "total_duration_ms": total_duration, 1308 "total_requests": total_requests, 1309 "sections": [ 1310 { 1311 "name": r.name, 1312 "passed": r.passed, 1313 "duration_ms": r.duration_ms, 1314 "requests": r.requests, 1315 "error": r.error, 1316 } 1317 for r in results 1318 ], 1319 } 1320 print(json_lib.dumps(output)) 1321 else: 1322 # human-readable output 1323 console.print("\n" + "=" * 60) 1324 1325 table = Table(title="test results") 1326 table.add_column("section", style="cyan") 1327 table.add_column("time", justify="right") 1328 table.add_column("reqs", justify="right") 1329 table.add_column("status", justify="center") 1330 1331 for r in results: 1332 status = "[green]✓[/green]" if r.passed else "[red]✗[/red]" 1333 table.add_row(r.name, f"{r.duration_ms:.1f}ms", str(r.requests), status) 1334 1335 table.add_row("", "", "", "", style="dim") 1336 table.add_row("[bold]total[/bold]", f"[bold]{total_duration:.1f}ms[/bold]", f"[bold]{total_requests}[/bold]", "") 1337 1338 console.print(table) 1339 1340 if all_passed: 1341 console.print("\n[bold green]all tests passed[/bold green]") 1342 else: 1343 console.print("\n[bold red]some tests failed[/bold red]") 1344 1345 sys.exit(0 if all_passed else 1) 1346 1347 1348if __name__ == "__main__": 1349 main()