prefect server in zig
at main 1763 lines 69 kB view raw
1#!/usr/bin/env -S uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = ["httpx", "rich"] 5# /// 6""" 7Functional test suite for prefect-server API. 8 9Tests API correctness by exercising all endpoints with expected request/response patterns. 10Includes scheduler integration tests (which have intentional delays to verify background services). 11 12For performance benchmarking, use ./scripts/benchmark instead. 13 14Usage: 15 ./scripts/test-api-sequence # human-readable output 16 ./scripts/test-api-sequence --json # machine-readable for CI 17 ./scripts/test-api-sequence --quiet # minimal output 18""" 19 20import json as json_lib 21import os 22import sys 23import time 24import uuid 25from dataclasses import dataclass, field 26from typing import Callable 27 28import httpx 29from rich.console import Console 30from rich.panel import Panel 31from rich.table import Table 32 33console = Console() 34BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") 35QUIET = "--json" in sys.argv or "--quiet" in sys.argv 36 37 38@dataclass 39class TestResult: 40 name: str 41 passed: bool 42 duration_ms: float 43 requests: int = 0 44 error: str | None = None 45 46 47class CountingClient(httpx.Client): 48 """HTTP client that counts requests.""" 49 50 def __init__(self, *args, **kwargs): 51 super().__init__(*args, **kwargs) 52 self.request_count = 0 53 54 def request(self, *args, **kwargs): 55 self.request_count += 1 56 return super().request(*args, **kwargs) 57 58 59def validate_response(data: dict, required_fields: list[str], field_types: dict | None = None) -> bool: 60 """Validate response contains required fields with expected types.""" 61 for field in required_fields: 62 if field not in data: 63 if not QUIET: 64 console.print(f"[red]VALIDATION[/red]: missing field '{field}'") 65 return False 66 if field_types: 67 for field, expected_type in field_types.items(): 68 if field in data and data[field] is not None: 69 if not isinstance(data[field], expected_type): 70 if not QUIET: 71 console.print(f"[red]VALIDATION[/red]: field '{field}' expected {expected_type.__name__}, got {type(data[field]).__name__}") 72 return False 73 return True 74 75 76def run_test(name: str, test_fn: Callable[[CountingClient], bool]) -> TestResult: 77 """Run a test function with timing and request counting.""" 78 if not QUIET: 79 console.print(Panel(f"testing {name}", style="blue")) 80 81 client = CountingClient(base_url=BASE_URL, timeout=10) 82 start = time.perf_counter() 83 84 try: 85 passed = test_fn(client) 86 duration_ms = (time.perf_counter() - start) * 1000 87 return TestResult( 88 name=name, 89 passed=passed, 90 duration_ms=duration_ms, 91 requests=client.request_count, 92 ) 93 except Exception as e: 94 duration_ms = (time.perf_counter() - start) * 1000 95 return TestResult( 96 name=name, 97 passed=False, 98 duration_ms=duration_ms, 99 requests=client.request_count, 100 error=str(e), 101 ) 102 finally: 103 client.close() 104 105 106# ---------- test functions ---------- 107 108 109def test_admin(client: CountingClient) -> bool: 110 """Test admin/health endpoints.""" 111 # health 112 if not QUIET: 113 console.print("[bold]GET /health[/bold]") 114 resp = client.get("/health") 115 if resp.status_code != 200: 116 if not QUIET: 117 console.print(f"[red]FAIL[/red]: {resp.status_code}") 118 return False 119 if not QUIET: 120 console.print(f" {resp.text}") 121 122 # version 123 if not QUIET: 124 console.print("[bold]GET /admin/version[/bold]") 125 resp = client.get("/admin/version") 126 if resp.status_code != 200: 127 if not QUIET: 128 console.print(f"[red]FAIL[/red]: {resp.status_code}") 129 return False 130 if not QUIET: 131 console.print(f" {resp.json()}") 132 133 # csrf-token 134 if not QUIET: 135 console.print("[bold]GET /csrf-token[/bold]") 136 resp = client.get("/csrf-token", params={"client": "test-client"}) 137 if resp.status_code == 200: 138 if not QUIET: 139 console.print(f" token received") 140 elif resp.status_code == 422: 141 if not QUIET: 142 console.print(f" csrf protection disabled (ok)") 143 else: 144 if not QUIET: 145 console.print(f"[red]FAIL[/red]: {resp.status_code}") 146 return False 147 148 return True 149 150 151def test_flow_run(client: CountingClient, should_fail: bool = False) -> bool: 152 """Test flow run lifecycle.""" 153 suffix = "fail" if should_fail else "success" 154 if not QUIET: 155 console.print(f"server: {BASE_URL}\n") 156 157 # create flow 158 if not QUIET: 159 console.print("[bold]1. POST /flows/[/bold]") 160 resp = client.post("/flows/", json={"name": f"bench-flow-{suffix}"}) 161 if resp.status_code not in (200, 201): 162 if not QUIET: 163 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 164 return False 165 flow = resp.json() 166 if not validate_response(flow, ["id", "name", "created", "updated"], {"id": str, "name": str}): 167 return False 168 if not QUIET: 169 console.print(f" flow_id: {flow.get('id')}") 170 171 # create flow run 172 if not QUIET: 173 console.print("\n[bold]2. POST /flow_runs/[/bold]") 174 resp = client.post("/flow_runs/", json={ 175 "flow_id": flow["id"], 176 "name": f"run-{uuid.uuid4().hex[:8]}", 177 "state": {"type": "PENDING", "name": "Pending"}, 178 }) 179 if resp.status_code not in (200, 201): 180 if not QUIET: 181 console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 182 return False 183 flow_run = resp.json() 184 if not validate_response(flow_run, ["id", "name", "flow_id", "state_type"], {"id": str, "name": str}): 185 return False 186 flow_run_id = flow_run.get("id") 187 if not QUIET: 188 console.print(f" flow_run_id: {flow_run_id}") 189 190 # read flow run 191 if not QUIET: 192 console.print("\n[bold]3. GET /flow_runs/{id}[/bold]") 193 resp = client.get(f"/flow_runs/{flow_run_id}") 194 if resp.status_code != 200: 195 if not QUIET: 196 console.print(f"[red]FAIL[/red]: {resp.status_code}") 197 return False 198 if not QUIET: 199 console.print(f" state: {resp.json().get('state_type')}") 200 201 # set RUNNING 202 if not QUIET: 203 console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]") 204 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 205 "state": {"type": "RUNNING", "name": "Running"}, 206 "force": False, 207 }) 208 if resp.status_code not in (200, 201): 209 if not QUIET: 210 console.print(f"[red]FAIL[/red]: {resp.status_code}") 211 return False 212 if not QUIET: 213 console.print(f" status: {resp.json().get('status')}") 214 215 # set final state 216 final_type = "FAILED" if should_fail else "COMPLETED" 217 final_name = "Failed" if should_fail else "Completed" 218 if not QUIET: 219 console.print(f"\n[bold]5. POST /flow_runs/{{id}}/set_state ({final_type})[/bold]") 220 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 221 "state": {"type": final_type, "name": final_name}, 222 "force": False, 223 }) 224 if resp.status_code not in (200, 201): 225 if not QUIET: 226 console.print(f"[red]FAIL[/red]: {resp.status_code}") 227 return False 228 if not QUIET: 229 console.print(f" status: {resp.json().get('status')}") 230 231 # verify 232 if not QUIET: 233 console.print("\n[bold]6. GET /flow_runs/{id} (verify)[/bold]") 234 resp = client.get(f"/flow_runs/{flow_run_id}") 235 if resp.status_code != 200: 236 return False 237 actual_type = resp.json().get("state_type") 238 if actual_type != final_type: 239 if not QUIET: 240 console.print(f"[red]FAIL[/red]: expected {final_type}, got {actual_type}") 241 return False 242 if not QUIET: 243 console.print(f" [green]state: {actual_type} (correct)[/green]") 244 245 return True 246 247 248def test_flow_with_task_runs(client: CountingClient) -> bool: 249 """Test flow execution with nested task runs - simulates real flow execution.""" 250 251 def fail(msg: str) -> bool: 252 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 253 return False 254 255 def log(msg: str) -> None: 256 if not QUIET: console.print(msg) 257 258 # setup: create flow 259 log("[bold]setup: create flow[/bold]") 260 resp = client.post("/flows/", json={"name": f"flow-with-tasks-{uuid.uuid4().hex[:8]}"}) 261 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 262 flow_id = resp.json().get("id") 263 log(f" flow_id: {flow_id}") 264 265 # create flow run 266 log("[bold]1. create flow run[/bold]") 267 resp = client.post("/flow_runs/", json={ 268 "flow_id": flow_id, 269 "name": f"run-with-tasks-{uuid.uuid4().hex[:8]}", 270 "state": {"type": "PENDING", "name": "Pending"}, 271 }) 272 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 273 flow_run_id = resp.json().get("id") 274 log(f" flow_run_id: {flow_run_id}") 275 276 # start flow run 277 log("[bold]2. start flow run (RUNNING)[/bold]") 278 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 279 "state": {"type": "RUNNING", "name": "Running"}, 280 }) 281 if resp.status_code not in (200, 201) or resp.json().get("status") != "ACCEPT": 282 return fail(f"start flow run failed") 283 log(f" flow run started") 284 285 # create and execute task 1 286 log("[bold]3. execute task 1: extract[/bold]") 287 resp = client.post("/task_runs/", json={ 288 "flow_run_id": flow_run_id, 289 "task_key": "extract_data", 290 "dynamic_key": "extract-0", 291 "name": "extract_data-0", 292 "state": {"type": "PENDING", "name": "Pending"}, 293 }) 294 if resp.status_code not in (200, 201): return fail(f"create task 1 {resp.status_code}") 295 task1_id = resp.json().get("id") 296 297 # run task 1 298 resp = client.post(f"/task_runs/{task1_id}/set_state", json={ 299 "state": {"type": "RUNNING", "name": "Running"}, 300 }) 301 if resp.status_code not in (200, 201): return fail(f"task 1 RUNNING {resp.status_code}") 302 303 resp = client.post(f"/task_runs/{task1_id}/set_state", json={ 304 "state": {"type": "COMPLETED", "name": "Completed"}, 305 }) 306 if resp.status_code not in (200, 201): return fail(f"task 1 COMPLETED {resp.status_code}") 307 log(f" task 1 completed") 308 309 # create and execute task 2 (depends on task 1) 310 log("[bold]4. execute task 2: transform[/bold]") 311 resp = client.post("/task_runs/", json={ 312 "flow_run_id": flow_run_id, 313 "task_key": "transform_data", 314 "dynamic_key": "transform-0", 315 "name": "transform_data-0", 316 "state": {"type": "PENDING", "name": "Pending"}, 317 }) 318 if resp.status_code not in (200, 201): return fail(f"create task 2 {resp.status_code}") 319 task2_id = resp.json().get("id") 320 321 resp = client.post(f"/task_runs/{task2_id}/set_state", json={ 322 "state": {"type": "RUNNING", "name": "Running"}, 323 }) 324 resp = client.post(f"/task_runs/{task2_id}/set_state", json={ 325 "state": {"type": "COMPLETED", "name": "Completed"}, 326 }) 327 if resp.status_code not in (200, 201): return fail(f"task 2 COMPLETED {resp.status_code}") 328 log(f" task 2 completed") 329 330 # create and execute task 3 331 log("[bold]5. execute task 3: load[/bold]") 332 resp = client.post("/task_runs/", json={ 333 "flow_run_id": flow_run_id, 334 "task_key": "load_data", 335 "dynamic_key": "load-0", 336 "name": "load_data-0", 337 "state": {"type": "PENDING", "name": "Pending"}, 338 }) 339 if resp.status_code not in (200, 201): return fail(f"create task 3 {resp.status_code}") 340 task3_id = resp.json().get("id") 341 342 resp = client.post(f"/task_runs/{task3_id}/set_state", json={ 343 "state": {"type": "RUNNING", "name": "Running"}, 344 }) 345 resp = client.post(f"/task_runs/{task3_id}/set_state", json={ 346 "state": {"type": "COMPLETED", "name": "Completed"}, 347 }) 348 if resp.status_code not in (200, 201): return fail(f"task 3 COMPLETED {resp.status_code}") 349 log(f" task 3 completed") 350 351 # complete flow run 352 log("[bold]6. complete flow run[/bold]") 353 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 354 "state": {"type": "COMPLETED", "name": "Completed"}, 355 }) 356 if resp.status_code not in (200, 201) or resp.json().get("status") != "ACCEPT": 357 return fail(f"complete flow run failed") 358 log(f" [green]flow run completed[/green]") 359 360 # verify task runs can be read individually and are completed 361 log("[bold]7. verify task runs completed[/bold]") 362 for task_id, task_name in [(task1_id, "extract"), (task2_id, "transform"), (task3_id, "load")]: 363 resp = client.get(f"/task_runs/{task_id}") 364 if resp.status_code != 200: return fail(f"read task {task_name} {resp.status_code}") 365 task_run = resp.json() 366 if task_run.get("flow_run_id") != flow_run_id: 367 return fail(f"task {task_name} not linked to flow run") 368 if task_run.get("state_type") != "COMPLETED": 369 return fail(f"task {task_name} not COMPLETED") 370 log(f" [green]all 3 tasks completed and linked to flow run[/green]") 371 372 return True 373 374 375def test_task_run(client: CountingClient) -> bool: 376 """Test task run lifecycle.""" 377 # create 378 if not QUIET: 379 console.print("[bold]POST /task_runs/[/bold]") 380 resp = client.post("/task_runs/", json={ 381 "task_key": "bench-task", 382 "dynamic_key": f"key-{uuid.uuid4().hex[:8]}", 383 "name": f"task-{uuid.uuid4().hex[:8]}", 384 "state": {"type": "PENDING", "name": "Pending"}, 385 }) 386 if resp.status_code not in (200, 201): 387 if not QUIET: 388 console.print(f"[red]FAIL[/red]: {resp.status_code}") 389 return False 390 task_run_id = resp.json().get("id") 391 if not QUIET: 392 console.print(f" task_run_id: {task_run_id}") 393 394 # read 395 resp = client.get(f"/task_runs/{task_run_id}") 396 if resp.status_code != 200: 397 return False 398 399 # RUNNING 400 resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ 401 "state": {"type": "RUNNING", "name": "Running"}, 402 "force": False, 403 }) 404 if resp.status_code not in (200, 201): 405 return False 406 if not QUIET: 407 console.print(f" -> RUNNING: {resp.json().get('status')}") 408 409 # COMPLETED 410 resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ 411 "state": {"type": "COMPLETED", "name": "Completed"}, 412 "force": False, 413 }) 414 if resp.status_code not in (200, 201): 415 return False 416 if not QUIET: 417 console.print(f" -> COMPLETED: {resp.json().get('status')}") 418 419 return True 420 421 422def test_filters(client: CountingClient) -> bool: 423 """Test filter endpoints.""" 424 for endpoint, label in [ 425 ("/flows/filter", "flows"), 426 ("/flow_runs/filter", "flow_runs"), 427 ("/task_runs/filter", "task_runs"), 428 ]: 429 resp = client.post(endpoint, json={}) 430 if resp.status_code != 200: 431 if not QUIET: 432 console.print(f"[red]FAIL[/red]: {endpoint} {resp.status_code}") 433 return False 434 if not QUIET: 435 console.print(f" {label}: {len(resp.json())} items") 436 437 return True 438 439 440def test_logs(client: CountingClient) -> bool: 441 """Test logs endpoint.""" 442 from datetime import datetime, timezone 443 444 logs = [ 445 {"level": 20, "message": "test log 1", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, 446 {"level": 30, "message": "test log 2", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, 447 ] 448 resp = client.post("/logs/", json=logs) 449 if resp.status_code not in (200, 201, 204): 450 if not QUIET: 451 console.print(f"[red]FAIL[/red]: {resp.status_code}") 452 return False 453 if not QUIET: 454 console.print(f" {len(logs)} logs sent") 455 return True 456 457 458def test_variables(client: CountingClient) -> bool: 459 """Test variables API (CRUD).""" 460 var_name = f"bench-var-{uuid.uuid4().hex[:8]}" 461 462 # create 463 if not QUIET: 464 console.print("[bold]POST /variables/[/bold]") 465 resp = client.post("/variables/", json={ 466 "name": var_name, 467 "value": {"nested": "object", "count": 42}, 468 "tags": ["benchmark", "test"], 469 }) 470 if resp.status_code != 201: 471 if not QUIET: 472 console.print(f"[red]FAIL[/red]: create {resp.status_code}") 473 return False 474 variable = resp.json() 475 if not validate_response(variable, ["id", "name", "value", "tags", "created", "updated"], {"id": str, "name": str, "tags": list}): 476 return False 477 var_id = variable.get("id") 478 if not QUIET: 479 console.print(f" created: {var_id}") 480 481 # get by name 482 resp = client.get(f"/variables/name/{var_name}") 483 if resp.status_code != 200: 484 return False 485 if not QUIET: 486 console.print(f" get by name: ok") 487 488 # get by id 489 resp = client.get(f"/variables/{var_id}") 490 if resp.status_code != 200: 491 return False 492 493 # update by name 494 resp = client.patch(f"/variables/name/{var_name}", json={"value": "updated"}) 495 if resp.status_code != 204: 496 return False 497 if not QUIET: 498 console.print(f" updated by name") 499 500 # filter 501 resp = client.post("/variables/filter", json={"limit": 10}) 502 if resp.status_code != 200: 503 return False 504 if not QUIET: 505 console.print(f" filter: {len(resp.json())} items") 506 507 # count 508 resp = client.post("/variables/count", json={}) 509 if resp.status_code != 200: 510 return False 511 if not QUIET: 512 console.print(f" count: {resp.text}") 513 514 # duplicate name should fail 515 resp = client.post("/variables/", json={"name": var_name, "value": "dupe"}) 516 if resp.status_code != 409: 517 if not QUIET: 518 console.print(f"[red]FAIL[/red]: duplicate should return 409, got {resp.status_code}") 519 return False 520 if not QUIET: 521 console.print(f" duplicate rejected: 409") 522 523 # delete 524 resp = client.delete(f"/variables/name/{var_name}") 525 if resp.status_code != 204: 526 return False 527 if not QUIET: 528 console.print(f" deleted") 529 530 return True 531 532 533def test_blocks(client: CountingClient) -> bool: 534 """Test blocks API (types, schemas, documents).""" 535 slug = f"bench-block-{uuid.uuid4().hex[:8]}" 536 537 # create block type 538 if not QUIET: 539 console.print("[bold]block_types[/bold]") 540 resp = client.post("/block_types/", json={ 541 "name": f"Bench Block {slug}", 542 "slug": slug, 543 "description": "benchmark block type", 544 }) 545 if resp.status_code not in (200, 201): 546 if not QUIET: 547 console.print(f"[red]FAIL[/red]: create block_type {resp.status_code}") 548 return False 549 block_type = resp.json() 550 if not validate_response(block_type, ["id", "name", "slug"], {"id": str, "name": str, "slug": str}): 551 return False 552 block_type_id = block_type.get("id") 553 if not QUIET: 554 console.print(f" created: {block_type_id}") 555 556 # get by slug 557 resp = client.get(f"/block_types/slug/{slug}") 558 if resp.status_code != 200: 559 return False 560 561 # create schema 562 if not QUIET: 563 console.print("[bold]block_schemas[/bold]") 564 resp = client.post("/block_schemas/", json={ 565 "block_type_id": block_type_id, 566 "fields": {"properties": {"value": {"type": "string"}}}, 567 "capabilities": ["test"], 568 "version": "1.0.0", 569 }) 570 if resp.status_code not in (200, 201): 571 if not QUIET: 572 console.print(f"[red]FAIL[/red]: create block_schema {resp.status_code}") 573 return False 574 block_schema = resp.json() 575 block_schema_id = block_schema.get("id") 576 checksum = block_schema.get("checksum") 577 if not QUIET: 578 console.print(f" created: {block_schema_id}") 579 580 # get by checksum 581 resp = client.get(f"/block_schemas/checksum/{checksum}") 582 if resp.status_code != 200: 583 return False 584 585 # create document 586 if not QUIET: 587 console.print("[bold]block_documents[/bold]") 588 doc_name = f"bench-doc-{uuid.uuid4().hex[:8]}" 589 resp = client.post("/block_documents/", json={ 590 "name": doc_name, 591 "block_type_id": block_type_id, 592 "block_schema_id": block_schema_id, 593 "data": {"value": "secret-value"}, 594 }) 595 if resp.status_code not in (200, 201): 596 if not QUIET: 597 console.print(f"[red]FAIL[/red]: create block_document {resp.status_code}") 598 return False 599 block_doc = resp.json() 600 block_doc_id = block_doc.get("id") 601 if not QUIET: 602 console.print(f" created: {block_doc_id}") 603 604 # get by id 605 resp = client.get(f"/block_documents/{block_doc_id}") 606 if resp.status_code != 200: 607 return False 608 609 # get by slug/name 610 resp = client.get(f"/block_types/slug/{slug}/block_documents/name/{doc_name}") 611 if resp.status_code != 200: 612 return False 613 614 # update 615 resp = client.patch(f"/block_documents/{block_doc_id}", json={"data": {"value": "updated"}}) 616 if resp.status_code != 204: 617 return False 618 if not QUIET: 619 console.print(f" updated") 620 621 # filters 622 for endpoint in ["/block_types/filter", "/block_schemas/filter", "/block_documents/filter"]: 623 resp = client.post(endpoint, json={}) 624 if resp.status_code != 200: 625 return False 626 627 # delete 628 resp = client.delete(f"/block_documents/{block_doc_id}") 629 if resp.status_code != 204: 630 return False 631 if not QUIET: 632 console.print(f" deleted") 633 634 return True 635 636 637def test_work_pools(client: CountingClient) -> bool: 638 """Test work pools API (pools, queues, workers).""" 639 pool_name = f"test-pool-{uuid.uuid4().hex[:8]}" 640 641 # create work pool 642 if not QUIET: 643 console.print("[bold]work_pools[/bold]") 644 resp = client.post("/work_pools/", json={ 645 "name": pool_name, 646 "type": "process", 647 "description": "test work pool", 648 }) 649 if resp.status_code not in (200, 201): 650 if not QUIET: 651 console.print(f"[red]FAIL[/red]: create work_pool {resp.status_code} {resp.text}") 652 return False 653 pool = resp.json() 654 if not validate_response(pool, ["id", "name", "type", "status", "default_queue_id"], {"id": str, "name": str}): 655 return False 656 if not QUIET: 657 console.print(f" created: {pool.get('id')}") 658 659 # check default queue was created 660 if not pool.get("default_queue_id"): 661 if not QUIET: 662 console.print("[red]FAIL[/red]: no default_queue_id") 663 return False 664 665 # get by name 666 resp = client.get(f"/work_pools/{pool_name}") 667 if resp.status_code != 200: 668 if not QUIET: 669 console.print(f"[red]FAIL[/red]: get work_pool {resp.status_code}") 670 return False 671 672 # update 673 resp = client.patch(f"/work_pools/{pool_name}", json={"description": "updated"}) 674 if resp.status_code != 204: 675 if not QUIET: 676 console.print(f"[red]FAIL[/red]: update work_pool {resp.status_code}") 677 return False 678 if not QUIET: 679 console.print(" updated") 680 681 # filter 682 resp = client.post("/work_pools/filter", json={}) 683 if resp.status_code != 200: 684 if not QUIET: 685 console.print(f"[red]FAIL[/red]: filter work_pools {resp.status_code}") 686 return False 687 pools = resp.json() 688 if not isinstance(pools, list): 689 return False 690 691 # create queue 692 if not QUIET: 693 console.print("[bold]work_queues[/bold]") 694 queue_name = f"test-queue-{uuid.uuid4().hex[:8]}" 695 resp = client.post(f"/work_pools/{pool_name}/queues/", json={ 696 "name": queue_name, 697 "description": "test queue", 698 "priority": 5, 699 }) 700 if resp.status_code not in (200, 201): 701 if not QUIET: 702 console.print(f"[red]FAIL[/red]: create work_queue {resp.status_code} {resp.text}") 703 return False 704 queue = resp.json() 705 if not validate_response(queue, ["id", "name", "priority", "work_pool_id"], {"id": str, "priority": int}): 706 return False 707 if not QUIET: 708 console.print(f" created: {queue.get('id')}") 709 710 # get queue 711 resp = client.get(f"/work_pools/{pool_name}/queues/{queue_name}") 712 if resp.status_code != 200: 713 if not QUIET: 714 console.print(f"[red]FAIL[/red]: get work_queue {resp.status_code}") 715 return False 716 717 # filter queues 718 resp = client.post(f"/work_pools/{pool_name}/queues/filter", json={}) 719 if resp.status_code != 200: 720 if not QUIET: 721 console.print(f"[red]FAIL[/red]: filter work_queues {resp.status_code}") 722 return False 723 queues = resp.json() 724 if not isinstance(queues, list) or len(queues) < 2: # default + our queue 725 if not QUIET: 726 console.print(f"[red]FAIL[/red]: expected at least 2 queues, got {len(queues) if isinstance(queues, list) else 'not a list'}") 727 return False 728 729 # worker heartbeat 730 if not QUIET: 731 console.print("[bold]workers[/bold]") 732 resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={ 733 "name": "test-worker-1", 734 "heartbeat_interval_seconds": 30, 735 }) 736 if resp.status_code != 204: 737 if not QUIET: 738 console.print(f"[red]FAIL[/red]: worker heartbeat {resp.status_code} {resp.text}") 739 return False 740 if not QUIET: 741 console.print(" heartbeat sent") 742 743 # check pool status is now READY 744 resp = client.get(f"/work_pools/{pool_name}") 745 if resp.status_code != 200: 746 return False 747 pool = resp.json() 748 if pool.get("status") != "READY": 749 if not QUIET: 750 console.print(f"[red]FAIL[/red]: expected pool status READY, got {pool.get('status')}") 751 return False 752 if not QUIET: 753 console.print(" pool status: READY") 754 755 # filter workers 756 resp = client.post(f"/work_pools/{pool_name}/workers/filter", json={}) 757 if resp.status_code != 200: 758 if not QUIET: 759 console.print(f"[red]FAIL[/red]: filter workers {resp.status_code}") 760 return False 761 workers = resp.json() 762 if not isinstance(workers, list) or len(workers) < 1: 763 return False 764 765 # delete queue (not default) 766 resp = client.delete(f"/work_pools/{pool_name}/queues/{queue_name}") 767 if resp.status_code != 204: 768 if not QUIET: 769 console.print(f"[red]FAIL[/red]: delete work_queue {resp.status_code}") 770 return False 771 if not QUIET: 772 console.print(" deleted queue") 773 774 # delete pool 775 resp = client.delete(f"/work_pools/{pool_name}") 776 if resp.status_code != 204: 777 if not QUIET: 778 console.print(f"[red]FAIL[/red]: delete work_pool {resp.status_code}") 779 return False 780 if not QUIET: 781 console.print(" deleted pool") 782 783 return True 784 785 786def test_deployments(client: CountingClient) -> bool: 787 """Test deployments API (deployments, schedules, create_flow_run).""" 788 # create a flow first 789 if not QUIET: 790 console.print("[bold]setup: create flow[/bold]") 791 resp = client.post("/flows/", json={"name": f"deploy-flow-{uuid.uuid4().hex[:8]}"}) 792 if resp.status_code not in (200, 201): 793 if not QUIET: 794 console.print(f"[red]FAIL[/red]: create flow {resp.status_code}") 795 return False 796 flow = resp.json() 797 flow_id = flow.get("id") 798 flow_name = flow.get("name") 799 800 # create deployment 801 deployment_name = f"test-deployment-{uuid.uuid4().hex[:8]}" 802 if not QUIET: 803 console.print("[bold]deployments[/bold]") 804 resp = client.post("/deployments/", json={ 805 "name": deployment_name, 806 "flow_id": flow_id, 807 "description": "test deployment", 808 "tags": ["test", "benchmark"], 809 "parameters": {"key": "value"}, 810 "schedules": [ 811 {"schedule": {"interval": 3600}, "active": True}, 812 ], 813 }) 814 if resp.status_code not in (200, 201): 815 if not QUIET: 816 console.print(f"[red]FAIL[/red]: create deployment {resp.status_code} {resp.text}") 817 return False 818 deployment = resp.json() 819 if not validate_response(deployment, ["id", "name", "flow_id", "status", "schedules"], {"id": str, "name": str}): 820 return False 821 deployment_id = deployment.get("id") 822 if not QUIET: 823 console.print(f" created: {deployment_id}") 824 825 # verify schedules were created 826 schedules = deployment.get("schedules", []) 827 if not isinstance(schedules, list) or len(schedules) != 1: 828 if not QUIET: 829 console.print(f"[red]FAIL[/red]: expected 1 schedule, got {len(schedules) if isinstance(schedules, list) else 'not a list'}") 830 return False 831 if not QUIET: 832 console.print(f" schedules: {len(schedules)}") 833 834 # get by id 835 resp = client.get(f"/deployments/{deployment_id}") 836 if resp.status_code != 200: 837 if not QUIET: 838 console.print(f"[red]FAIL[/red]: get deployment {resp.status_code}") 839 return False 840 841 # get by name 842 resp = client.get(f"/deployments/name/{flow_name}/{deployment_name}") 843 if resp.status_code != 200: 844 if not QUIET: 845 console.print(f"[red]FAIL[/red]: get deployment by name {resp.status_code}") 846 return False 847 if not QUIET: 848 console.print(" get by name: ok") 849 850 # update 851 resp = client.patch(f"/deployments/{deployment_id}", json={"description": "updated"}) 852 if resp.status_code != 204: 853 if not QUIET: 854 console.print(f"[red]FAIL[/red]: update deployment {resp.status_code}") 855 return False 856 if not QUIET: 857 console.print(" updated") 858 859 # filter 860 resp = client.post("/deployments/filter", json={"limit": 10}) 861 if resp.status_code != 200: 862 if not QUIET: 863 console.print(f"[red]FAIL[/red]: filter deployments {resp.status_code}") 864 return False 865 if not QUIET: 866 console.print(f" filter: {len(resp.json())} items") 867 868 # count 869 resp = client.post("/deployments/count", json={}) 870 if resp.status_code != 200: 871 if not QUIET: 872 console.print(f"[red]FAIL[/red]: count deployments {resp.status_code}") 873 return False 874 if not QUIET: 875 console.print(f" count: {resp.text}") 876 877 # pause 878 resp = client.post(f"/deployments/{deployment_id}/pause_deployment", json={}) 879 if resp.status_code != 204: 880 if not QUIET: 881 console.print(f"[red]FAIL[/red]: pause deployment {resp.status_code}") 882 return False 883 if not QUIET: 884 console.print(" paused") 885 886 # resume 887 resp = client.post(f"/deployments/{deployment_id}/resume_deployment", json={}) 888 if resp.status_code != 204: 889 if not QUIET: 890 console.print(f"[red]FAIL[/red]: resume deployment {resp.status_code}") 891 return False 892 if not QUIET: 893 console.print(" resumed") 894 895 # create flow run from deployment 896 if not QUIET: 897 console.print("[bold]create_flow_run[/bold]") 898 resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={}) 899 if resp.status_code not in (200, 201): 900 if not QUIET: 901 console.print(f"[red]FAIL[/red]: create flow run {resp.status_code} {resp.text}") 902 return False 903 flow_run = resp.json() 904 if not validate_response(flow_run, ["id", "flow_id", "deployment_id"], {"id": str}): 905 return False 906 if flow_run.get("deployment_id") != deployment_id: 907 if not QUIET: 908 console.print(f"[red]FAIL[/red]: deployment_id mismatch") 909 return False 910 if not QUIET: 911 console.print(f" created flow run: {flow_run.get('id')}") 912 913 # schedules - list 914 if not QUIET: 915 console.print("[bold]deployment_schedules[/bold]") 916 resp = client.get(f"/deployments/{deployment_id}/schedules") 917 if resp.status_code != 200: 918 if not QUIET: 919 console.print(f"[red]FAIL[/red]: list schedules {resp.status_code}") 920 return False 921 schedules = resp.json() 922 schedule_id = schedules[0].get("id") if schedules else None 923 if not QUIET: 924 console.print(f" list: {len(schedules)} schedules") 925 926 # schedules - create 927 resp = client.post(f"/deployments/{deployment_id}/schedules", json={ 928 "schedule": {"cron": "0 0 * * *"}, 929 "active": False, 930 }) 931 if resp.status_code not in (200, 201): 932 if not QUIET: 933 console.print(f"[red]FAIL[/red]: create schedule {resp.status_code}") 934 return False 935 created_schedules = resp.json() 936 if not isinstance(created_schedules, list) or len(created_schedules) != 1: 937 if not QUIET: 938 console.print(f"[red]FAIL[/red]: expected 1 created schedule") 939 return False 940 new_schedule_id = created_schedules[0].get("id") 941 if not QUIET: 942 console.print(f" created schedule: {new_schedule_id}") 943 944 # schedules - update 945 resp = client.patch(f"/deployments/{deployment_id}/schedules/{new_schedule_id}", json={"active": True}) 946 if resp.status_code != 204: 947 if not QUIET: 948 console.print(f"[red]FAIL[/red]: update schedule {resp.status_code}") 949 return False 950 if not QUIET: 951 console.print(" updated schedule") 952 953 # schedules - delete 954 resp = client.delete(f"/deployments/{deployment_id}/schedules/{new_schedule_id}") 955 if resp.status_code != 204: 956 if not QUIET: 957 console.print(f"[red]FAIL[/red]: delete schedule {resp.status_code}") 958 return False 959 if not QUIET: 960 console.print(" deleted schedule") 961 962 # delete deployment 963 resp = client.delete(f"/deployments/{deployment_id}") 964 if resp.status_code != 204: 965 if not QUIET: 966 console.print(f"[red]FAIL[/red]: delete deployment {resp.status_code}") 967 return False 968 if not QUIET: 969 console.print(" deleted deployment") 970 971 return True 972 973 974def test_scheduler_idempotency(client: CountingClient) -> bool: 975 """Test that scheduler is idempotent - running twice doesn't create duplicates.""" 976 import time as time_mod 977 978 def fail(msg: str) -> bool: 979 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 980 return False 981 982 def log(msg: str) -> None: 983 if not QUIET: console.print(msg) 984 985 def wait_for_runs(deployment_id: str, min_count: int, max_wait: float = 10.0) -> int: 986 """Poll until we have at least min_count runs, or timeout (client-side filter).""" 987 start = time_mod.time() 988 while time_mod.time() - start < max_wait: 989 resp = client.post("/flow_runs/filter", json={"limit": 100}) 990 if resp.status_code == 200: 991 all_runs = resp.json() 992 matching = [r for r in all_runs if r.get("deployment_id") == deployment_id] 993 if len(matching) >= min_count: 994 return len(matching) 995 time_mod.sleep(0.5) 996 return 0 997 998 # setup: create flow, work pool, deployment with interval schedule 999 log("[bold]setup[/bold]") 1000 resp = client.post("/flows/", json={"name": f"idem-flow-{uuid.uuid4().hex[:8]}"}) 1001 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1002 flow_id = resp.json().get("id") 1003 1004 pool_name = f"idem-pool-{uuid.uuid4().hex[:8]}" 1005 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1006 if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 1007 log(f" pool: {pool_name}") 1008 1009 # create deployment with interval schedule (every hour) 1010 resp = client.post("/deployments/", json={ 1011 "name": f"idem-deploy-{uuid.uuid4().hex[:8]}", 1012 "flow_id": flow_id, 1013 "work_pool_name": pool_name, 1014 "schedules": [{"schedule": {"interval": 3600}, "active": True}], # every hour 1015 }) 1016 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1017 deployment = resp.json() 1018 deployment_id = deployment.get("id") 1019 log(f" deployment: {deployment_id}") 1020 1021 # poll for scheduler to create runs and wait for tick to complete 1022 # we need to wait for count to stabilize, not just see the first run 1023 log("[bold]waiting for scheduler tick to complete...[/bold]") 1024 count_after_first = wait_for_runs(deployment_id, min_count=1) 1025 # wait for count to stabilize (scheduler tick may still be inserting) 1026 for _ in range(10): 1027 time_mod.sleep(0.2) 1028 resp = client.post("/flow_runs/filter", json={"limit": 100}) 1029 if resp.status_code == 200: 1030 matching = [r for r in resp.json() if r.get("deployment_id") == deployment_id] 1031 if len(matching) == count_after_first: 1032 break # count stable 1033 count_after_first = len(matching) 1034 log(f" runs after first tick: {count_after_first}") 1035 1036 if count_after_first == 0: 1037 return fail("scheduler did not create any runs (timeout)") 1038 1039 # verify idempotency: poll for a few seconds to confirm count stays stable 1040 # if scheduler creates duplicates, count would increase during this window 1041 log("[bold]verifying idempotency (polling to confirm count stable)...[/bold]") 1042 stable_checks = 0 1043 for _ in range(6): # 6 checks over ~3 seconds 1044 time_mod.sleep(0.5) 1045 resp = client.post("/flow_runs/filter", json={"limit": 100}) 1046 if resp.status_code != 200: continue 1047 all_runs = resp.json() 1048 matching = [r for r in all_runs if r.get("deployment_id") == deployment_id] 1049 current_count = len(matching) 1050 if current_count == count_after_first: 1051 stable_checks += 1 1052 elif current_count > count_after_first: 1053 return fail(f"idempotency failed: {count_after_first} -> {current_count} runs") 1054 1055 log(f" stable count checks: {stable_checks}/6") 1056 log(f" [green]idempotency verified: count unchanged at {count_after_first}[/green]") 1057 1058 # cleanup 1059 client.delete(f"/deployments/{deployment_id}") 1060 client.delete(f"/work_pools/{pool_name}") 1061 log(" cleanup: ok") 1062 1063 return True 1064 1065 1066def test_parameter_merging(client: CountingClient) -> bool: 1067 """Test that schedule parameters override deployment parameters.""" 1068 import time as time_mod 1069 1070 def fail(msg: str) -> bool: 1071 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1072 return False 1073 1074 def log(msg: str) -> None: 1075 if not QUIET: console.print(msg) 1076 1077 # setup 1078 log("[bold]setup[/bold]") 1079 resp = client.post("/flows/", json={"name": f"params-flow-{uuid.uuid4().hex[:8]}"}) 1080 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1081 flow_id = resp.json().get("id") 1082 1083 pool_name = f"params-pool-{uuid.uuid4().hex[:8]}" 1084 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1085 if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 1086 log(f" pool: {pool_name}") 1087 1088 # create deployment with base parameters 1089 # schedule has override parameter 1090 resp = client.post("/deployments/", json={ 1091 "name": f"params-deploy-{uuid.uuid4().hex[:8]}", 1092 "flow_id": flow_id, 1093 "work_pool_name": pool_name, 1094 "parameters": {"base_key": "base_value", "override_key": "deployment_value"}, 1095 "schedules": [{ 1096 "schedule": {"interval": 3600}, 1097 "active": True, 1098 "parameters": {"override_key": "schedule_value", "schedule_key": "schedule_only"}, 1099 }], 1100 }) 1101 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1102 deployment = resp.json() 1103 deployment_id = deployment.get("id") 1104 log(f" deployment: {deployment_id}") 1105 log(f" deployment params: {deployment.get('parameters')}") 1106 1107 # poll for scheduler to create runs (filter client-side since server filter doesn't support deployment_id yet) 1108 log("[bold]polling for scheduler to create runs...[/bold]") 1109 runs = [] 1110 start = time_mod.time() 1111 max_wait = 10.0 1112 while time_mod.time() - start < max_wait: 1113 resp = client.post("/flow_runs/filter", json={"limit": 100}) 1114 if resp.status_code == 200: 1115 all_runs = resp.json() 1116 runs = [r for r in all_runs if r.get("deployment_id") == deployment_id] 1117 if len(runs) > 0: 1118 break 1119 time_mod.sleep(0.5) 1120 log(f" found {len(runs)} runs for deployment in {time_mod.time() - start:.1f}s") 1121 1122 if len(runs) == 0: 1123 return fail("scheduler did not create any runs (timeout)") 1124 1125 # check merged parameters on first run 1126 run_params = runs[0].get("parameters", {}) 1127 if isinstance(run_params, str): 1128 import json as json_mod 1129 run_params = json_mod.loads(run_params) 1130 log(f" run params: {run_params}") 1131 1132 # verify merging: 1133 # - base_key should be from deployment 1134 # - override_key should be from schedule (override) 1135 # - schedule_key should be from schedule (new key) 1136 if run_params.get("base_key") != "base_value": 1137 return fail(f"base_key not preserved: {run_params.get('base_key')}") 1138 if run_params.get("override_key") != "schedule_value": 1139 return fail(f"override_key not overridden: {run_params.get('override_key')}") 1140 if run_params.get("schedule_key") != "schedule_only": 1141 return fail(f"schedule_key not added: {run_params.get('schedule_key')}") 1142 1143 log(" [green]parameter merging verified[/green]") 1144 1145 # cleanup 1146 client.delete(f"/deployments/{deployment_id}") 1147 client.delete(f"/work_pools/{pool_name}") 1148 log(" cleanup: ok") 1149 1150 return True 1151 1152 1153def test_get_scheduled_flow_runs(client: CountingClient) -> bool: 1154 """Test get_scheduled_flow_runs endpoint (worker polling).""" 1155 from datetime import datetime, timezone 1156 1157 def fail(msg: str) -> bool: 1158 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1159 return False 1160 1161 def log(msg: str) -> None: 1162 if not QUIET: console.print(msg) 1163 1164 # setup 1165 log("[bold]setup: create flow[/bold]") 1166 resp = client.post("/flows/", json={"name": f"schedule-flow-{uuid.uuid4().hex[:8]}"}) 1167 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1168 flow_id = resp.json().get("id") 1169 1170 pool_name = f"schedule-pool-{uuid.uuid4().hex[:8]}" 1171 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1172 if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 1173 pool = resp.json() 1174 pool_id, default_queue_id = pool.get("id"), pool.get("default_queue_id") 1175 log(f" pool: {pool_id}") 1176 if pool.get("status") != "NOT_READY": return fail(f"expected pool NOT_READY, got {pool.get('status')}") 1177 1178 resp = client.post("/deployments/", json={"name": f"schedule-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name}) 1179 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1180 deployment = resp.json() 1181 deployment_id = deployment.get("id") 1182 log(f" deployment: {deployment_id}") 1183 if deployment.get("status") != "NOT_READY": return fail(f"expected deployment NOT_READY, got {deployment.get('status')}") 1184 1185 # create scheduled flow run 1186 log("[bold]create scheduled flow run[/bold]") 1187 resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={"state": {"type": "SCHEDULED", "name": "Scheduled"}}) 1188 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1189 flow_run = resp.json() 1190 flow_run_id = flow_run.get("id") 1191 log(f" flow_run: {flow_run_id}") 1192 log(f" state: {flow_run.get('state_type')}") 1193 if flow_run.get("state_type") != "SCHEDULED": return fail(f"expected SCHEDULED, got {flow_run.get('state_type')}") 1194 1195 # test polling 1196 log("[bold]get_scheduled_flow_runs[/bold]") 1197 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) 1198 if resp.status_code != 200: return fail(f"get_scheduled_flow_runs {resp.status_code}") 1199 scheduled_runs = resp.json() 1200 if not isinstance(scheduled_runs, list): return fail(f"expected list, got {type(scheduled_runs)}") 1201 log(f" returned {len(scheduled_runs)} runs") 1202 1203 # verify our run is in results 1204 found = any(item.get("flow_run", {}).get("id") == flow_run_id and 1205 item.get("work_pool_id") == pool_id and item.get("work_queue_id") == default_queue_id 1206 for item in scheduled_runs) 1207 if not found: return fail("scheduled flow run not found in results") 1208 log(" flow run found in results") 1209 1210 # verify status changes 1211 resp = client.get(f"/work_pools/{pool_name}") 1212 if resp.status_code != 200 or resp.json().get("status") != "READY": 1213 return fail(f"expected pool READY after polling") 1214 log(" pool status: READY") 1215 1216 resp = client.get(f"/deployments/{deployment_id}") 1217 if resp.status_code != 200 or resp.json().get("status") != "READY": 1218 return fail(f"expected deployment READY after polling") 1219 log(" deployment status: READY") 1220 1221 # test filters 1222 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"work_queue_names": ["default"]}) 1223 if resp.status_code != 200: return fail(f"filter test {resp.status_code}") 1224 log(" filtered by queue: ok") 1225 1226 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"scheduled_before": datetime.now(timezone.utc).isoformat()}) 1227 if resp.status_code != 200: return fail(f"scheduled_before test {resp.status_code}") 1228 log(f" scheduled_before filter: {len(resp.json())} runs") 1229 1230 # cleanup 1231 client.delete(f"/deployments/{deployment_id}") 1232 client.delete(f"/work_pools/{pool_name}") 1233 log(" cleanup: ok") 1234 return True 1235 1236 1237def test_retry_failed_flows(client: CountingClient) -> bool: 1238 """Test RetryFailedFlows rule - retry cycle when configured.""" 1239 1240 def fail(msg: str) -> bool: 1241 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1242 return False 1243 1244 def log(msg: str) -> None: 1245 if not QUIET: console.print(msg) 1246 1247 # setup 1248 log("[bold]setup[/bold]") 1249 resp = client.post("/flows/", json={"name": f"retry-flow-{uuid.uuid4().hex[:8]}"}) 1250 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1251 flow_id = resp.json().get("id") 1252 1253 # create flow run with retry configuration via empirical_policy 1254 log("[bold]create flow run with retries=2, retry_delay=1[/bold]") 1255 resp = client.post("/flow_runs/", json={ 1256 "flow_id": flow_id, 1257 "name": f"retry-run-{uuid.uuid4().hex[:8]}", 1258 "state": {"type": "PENDING", "name": "Pending"}, 1259 "empirical_policy": {"retries": 2, "retry_delay": 1}, 1260 }) 1261 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1262 flow_run = resp.json() 1263 flow_run_id = flow_run.get("id") 1264 log(f" flow_run_id: {flow_run_id}") 1265 1266 # run_count starts at 0, increments when entering RUNNING 1267 # transition to RUNNING (run_count becomes 1) 1268 log("[bold]PENDING → RUNNING (first attempt)[/bold]") 1269 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1270 "state": {"type": "RUNNING", "name": "Running"}, 1271 }) 1272 if resp.status_code not in (200, 201): return fail(f"set RUNNING {resp.status_code}") 1273 if resp.json().get("status") != "ACCEPT": 1274 return fail(f"expected ACCEPT for RUNNING, got {resp.json().get('status')}") 1275 1276 # verify run_count 1277 resp = client.get(f"/flow_runs/{flow_run_id}") 1278 if resp.status_code != 200: return fail(f"get flow run {resp.status_code}") 1279 run_count = resp.json().get("run_count", 0) 1280 log(f" run_count after RUNNING: {run_count}") 1281 1282 # fail - with retries available, should REJECT and schedule retry 1283 log("[bold]RUNNING → FAILED (expect REJECT + AwaitingRetry)[/bold]") 1284 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1285 "state": {"type": "FAILED", "name": "Failed", "message": "Test failure"}, 1286 }) 1287 if resp.status_code not in (200, 201): return fail(f"set FAILED {resp.status_code}") 1288 result = resp.json() 1289 status = result.get("status") 1290 log(f" status: {status}") 1291 1292 # with retries, should get REJECT and the run should be in AwaitingRetry state 1293 if status == "REJECT": 1294 log(f" [green]REJECT (retries available)[/green]") 1295 # verify state is now AwaitingRetry (SCHEDULED) 1296 resp = client.get(f"/flow_runs/{flow_run_id}") 1297 if resp.status_code != 200: return fail(f"get flow run {resp.status_code}") 1298 actual_state = resp.json().get("state_type") 1299 actual_name = resp.json().get("state_name") 1300 log(f" state: {actual_state}/{actual_name}") 1301 if actual_state != "SCHEDULED": 1302 return fail(f"expected SCHEDULED (AwaitingRetry), got {actual_state}") 1303 log(f" [green]state: SCHEDULED/AwaitingRetry (correct)[/green]") 1304 elif status == "ACCEPT": 1305 # this could happen if run_count > retries - check if that's the case 1306 log(f" ACCEPT (retries may be exhausted)") 1307 return True 1308 else: 1309 return fail(f"unexpected status: {status}") 1310 1311 # complete the retry cycle: SCHEDULED → PENDING → RUNNING → COMPLETED 1312 log("[bold]retry cycle: SCHEDULED → PENDING → RUNNING → COMPLETED[/bold]") 1313 1314 # SCHEDULED → PENDING 1315 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1316 "state": {"type": "PENDING", "name": "Pending"}, 1317 }) 1318 if resp.status_code not in (200, 201): return fail(f"retry PENDING {resp.status_code}") 1319 log(f" → PENDING: {resp.json().get('status')}") 1320 1321 # PENDING → RUNNING (run_count increments) 1322 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1323 "state": {"type": "RUNNING", "name": "Running"}, 1324 }) 1325 if resp.status_code not in (200, 201): return fail(f"retry RUNNING {resp.status_code}") 1326 log(f" → RUNNING: {resp.json().get('status')}") 1327 1328 # verify run_count incremented 1329 resp = client.get(f"/flow_runs/{flow_run_id}") 1330 run_count = resp.json().get("run_count", 0) 1331 log(f" run_count: {run_count}") 1332 1333 # complete successfully 1334 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1335 "state": {"type": "COMPLETED", "name": "Completed"}, 1336 }) 1337 if resp.status_code not in (200, 201): return fail(f"COMPLETED {resp.status_code}") 1338 if resp.json().get("status") != "ACCEPT": 1339 return fail(f"expected ACCEPT for COMPLETED, got {resp.json().get('status')}") 1340 log(f" [green]→ COMPLETED: ACCEPT[/green]") 1341 1342 return True 1343 1344 1345def test_cancellation_flow(client: CountingClient) -> bool: 1346 """Test cancellation states (CANCELLING, CANCELLED).""" 1347 1348 def fail(msg: str) -> bool: 1349 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1350 return False 1351 1352 def log(msg: str) -> None: 1353 if not QUIET: console.print(msg) 1354 1355 # setup 1356 log("[bold]setup[/bold]") 1357 resp = client.post("/flows/", json={"name": f"cancel-flow-{uuid.uuid4().hex[:8]}"}) 1358 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1359 flow_id = resp.json().get("id") 1360 1361 resp = client.post("/flow_runs/", json={ 1362 "flow_id": flow_id, 1363 "name": f"cancel-run-{uuid.uuid4().hex[:8]}", 1364 "state": {"type": "PENDING", "name": "Pending"}, 1365 }) 1366 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1367 flow_run_id = resp.json().get("id") 1368 log(f" flow_run_id: {flow_run_id}") 1369 1370 # start the run 1371 log("[bold]PENDING → RUNNING[/bold]") 1372 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1373 "state": {"type": "RUNNING", "name": "Running"}, 1374 }) 1375 if resp.status_code not in (200, 201) or resp.json().get("status") != "ACCEPT": 1376 return fail(f"RUNNING transition failed") 1377 log(f" status: ACCEPT") 1378 1379 # cancel while running - first CANCELLING 1380 log("[bold]RUNNING → CANCELLING[/bold]") 1381 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1382 "state": {"type": "CANCELLING", "name": "Cancelling"}, 1383 }) 1384 if resp.status_code not in (200, 201): return fail(f"CANCELLING {resp.status_code}") 1385 if resp.json().get("status") != "ACCEPT": 1386 return fail(f"expected ACCEPT for CANCELLING, got {resp.json().get('status')}") 1387 log(f" [green]status: ACCEPT[/green]") 1388 1389 # verify state 1390 resp = client.get(f"/flow_runs/{flow_run_id}") 1391 if resp.json().get("state_type") != "CANCELLING": 1392 return fail(f"expected CANCELLING, got {resp.json().get('state_type')}") 1393 1394 # complete cancellation 1395 log("[bold]CANCELLING → CANCELLED[/bold]") 1396 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1397 "state": {"type": "CANCELLED", "name": "Cancelled"}, 1398 }) 1399 if resp.status_code not in (200, 201): return fail(f"CANCELLED {resp.status_code}") 1400 if resp.json().get("status") != "ACCEPT": 1401 return fail(f"expected ACCEPT for CANCELLED, got {resp.json().get('status')}") 1402 log(f" [green]status: ACCEPT[/green]") 1403 1404 # verify final state 1405 resp = client.get(f"/flow_runs/{flow_run_id}") 1406 if resp.json().get("state_type") != "CANCELLED": 1407 return fail(f"expected CANCELLED, got {resp.json().get('state_type')}") 1408 log(f" [green]final state: CANCELLED[/green]") 1409 1410 # CANCELLED is terminal - verify can't go back to PENDING (PreventPendingTransitions) 1411 log("[bold]CANCELLED → PENDING (expect REJECT)[/bold]") 1412 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1413 "state": {"type": "PENDING", "name": "Pending"}, 1414 }) 1415 if resp.status_code not in (200, 201): return fail(f"PENDING attempt {resp.status_code}") 1416 if resp.json().get("status") != "REJECT": 1417 return fail(f"expected REJECT for CANCELLED→PENDING, got {resp.json().get('status')}") 1418 log(f" [green]CANCELLED → PENDING: REJECT (correct)[/green]") 1419 1420 return True 1421 1422 1423def test_serve_pattern(client: CountingClient) -> bool: 1424 """Test Runner/.serve() pattern: poll /deployments/get_scheduled_flow_runs, execute locally. 1425 1426 This simulates what happens when you call flow.serve() - a Runner process polls 1427 the deployments endpoint for scheduled runs and executes them in subprocesses. 1428 """ 1429 1430 def fail(msg: str) -> bool: 1431 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1432 return False 1433 1434 def log(msg: str) -> None: 1435 if not QUIET: console.print(msg) 1436 1437 # setup: create flow and deployment (no work pool - serve doesn't use workers) 1438 log("[bold]setup: flow + deployment (no work pool)[/bold]") 1439 resp = client.post("/flows/", json={"name": f"serve-flow-{uuid.uuid4().hex[:8]}"}) 1440 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1441 flow_id = resp.json().get("id") 1442 1443 resp = client.post("/deployments/", json={ 1444 "name": f"serve-deploy-{uuid.uuid4().hex[:8]}", 1445 "flow_id": flow_id, 1446 # no work_pool_name - this is the serve pattern 1447 }) 1448 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1449 deployment = resp.json() 1450 deployment_id = deployment.get("id") 1451 log(f" deployment: {deployment_id}") 1452 1453 # create a scheduled flow run 1454 log("[bold]1. create scheduled flow run[/bold]") 1455 resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={ 1456 "state": {"type": "SCHEDULED", "name": "Scheduled"}, 1457 }) 1458 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1459 flow_run = resp.json() 1460 flow_run_id = flow_run.get("id") 1461 log(f" flow_run: {flow_run_id}") 1462 1463 # Runner polls: POST /deployments/get_scheduled_flow_runs 1464 log("[bold]2. Runner polls /deployments/get_scheduled_flow_runs[/bold]") 1465 from datetime import datetime, timezone 1466 resp = client.post("/deployments/get_scheduled_flow_runs", json={ 1467 "deployment_ids": [deployment_id], 1468 "scheduled_before": datetime.now(timezone.utc).isoformat(), 1469 "limit": 10, 1470 }) 1471 if resp.status_code != 200: return fail(f"poll {resp.status_code}") 1472 scheduled = resp.json() 1473 found = any(r.get("id") == flow_run_id for r in scheduled) 1474 if not found: return fail("run not found in scheduled runs") 1475 log(f" found {len(scheduled)} scheduled run(s)") 1476 1477 # Runner executes: SCHEDULED → PENDING → RUNNING → COMPLETED 1478 log("[bold]3. Runner executes flow locally[/bold]") 1479 1480 # claim the run 1481 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1482 "state": {"type": "PENDING", "name": "Pending"}, 1483 }) 1484 if resp.status_code not in (200, 201): return fail(f"PENDING {resp.status_code}") 1485 log(f" → PENDING: {resp.json().get('status')}") 1486 1487 # execute 1488 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1489 "state": {"type": "RUNNING", "name": "Running"}, 1490 }) 1491 if resp.status_code not in (200, 201): return fail(f"RUNNING {resp.status_code}") 1492 log(f" → RUNNING: {resp.json().get('status')}") 1493 1494 # complete 1495 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1496 "state": {"type": "COMPLETED", "name": "Completed"}, 1497 }) 1498 if resp.status_code not in (200, 201): return fail(f"COMPLETED {resp.status_code}") 1499 if resp.json().get("status") != "ACCEPT": 1500 return fail(f"expected ACCEPT, got {resp.json().get('status')}") 1501 log(f" [green]→ COMPLETED: ACCEPT[/green]") 1502 1503 # cleanup 1504 client.delete(f"/deployments/{deployment_id}") 1505 log(" cleanup: ok") 1506 1507 return True 1508 1509 1510def test_worker_pattern(client: CountingClient) -> bool: 1511 """Test Worker pattern: poll /work_pools/{name}/get_scheduled_flow_runs, dispatch to infra. 1512 1513 This simulates what a Prefect Worker does - it polls a work pool for scheduled runs 1514 and dispatches them to infrastructure (k8s, docker, etc). The worker doesn't execute 1515 the flow itself; infrastructure does. 1516 """ 1517 1518 def fail(msg: str) -> bool: 1519 if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1520 return False 1521 1522 def log(msg: str) -> None: 1523 if not QUIET: console.print(msg) 1524 1525 # setup: flow + work pool + deployment with work_pool_name 1526 log("[bold]setup: flow + work pool + deployment[/bold]") 1527 resp = client.post("/flows/", json={"name": f"worker-flow-{uuid.uuid4().hex[:8]}"}) 1528 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1529 flow_id = resp.json().get("id") 1530 1531 pool_name = f"worker-pool-{uuid.uuid4().hex[:8]}" 1532 resp = client.post("/work_pools/", json={"name": pool_name, "type": "kubernetes"}) 1533 if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 1534 log(f" pool: {pool_name}") 1535 1536 resp = client.post("/deployments/", json={ 1537 "name": f"worker-deploy-{uuid.uuid4().hex[:8]}", 1538 "flow_id": flow_id, 1539 "work_pool_name": pool_name, # KEY: uses work pool 1540 }) 1541 if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 1542 deployment_id = resp.json().get("id") 1543 log(f" deployment: {deployment_id}") 1544 1545 # worker registers via heartbeat 1546 log("[bold]1. Worker sends heartbeat[/bold]") 1547 resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={ 1548 "name": "k8s-worker-1", 1549 "heartbeat_interval_seconds": 30, 1550 }) 1551 if resp.status_code != 204: return fail(f"heartbeat {resp.status_code}") 1552 log(f" heartbeat sent") 1553 1554 # create a scheduled flow run 1555 log("[bold]2. create scheduled flow run[/bold]") 1556 resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={ 1557 "state": {"type": "SCHEDULED", "name": "Scheduled"}, 1558 }) 1559 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1560 flow_run_id = resp.json().get("id") 1561 log(f" flow_run: {flow_run_id}") 1562 1563 # Worker polls: POST /work_pools/{name}/get_scheduled_flow_runs 1564 log("[bold]3. Worker polls /work_pools/{name}/get_scheduled_flow_runs[/bold]") 1565 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) 1566 if resp.status_code != 200: return fail(f"poll {resp.status_code}") 1567 scheduled = resp.json() 1568 found = any(r.get("flow_run", {}).get("id") == flow_run_id for r in scheduled) 1569 if not found: return fail("run not found in work pool queue") 1570 log(f" found run in work pool queue") 1571 1572 # Worker dispatches to infrastructure (simulated - infra reports back state) 1573 log("[bold]4. Infrastructure executes and reports state[/bold]") 1574 1575 # infra claims 1576 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1577 "state": {"type": "PENDING", "name": "Pending"}, 1578 }) 1579 log(f" → PENDING: {resp.json().get('status')}") 1580 1581 # infra runs 1582 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1583 "state": {"type": "RUNNING", "name": "Running"}, 1584 }) 1585 log(f" → RUNNING: {resp.json().get('status')}") 1586 1587 # infra completes 1588 resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ 1589 "state": {"type": "COMPLETED", "name": "Completed"}, 1590 }) 1591 if resp.json().get("status") != "ACCEPT": 1592 return fail(f"expected ACCEPT, got {resp.json().get('status')}") 1593 log(f" [green]→ COMPLETED: ACCEPT[/green]") 1594 1595 # cleanup 1596 client.delete(f"/deployments/{deployment_id}") 1597 client.delete(f"/work_pools/{pool_name}") 1598 log(" cleanup: ok") 1599 1600 return True 1601 1602 1603def test_late_runs(client: CountingClient) -> bool: 1604 """Test that late_runs service marks overdue scheduled runs as Late.""" 1605 from datetime import datetime, timezone, timedelta 1606 import time as time_mod 1607 fail = lambda msg: (QUIET or console.print(f"[red]FAIL[/red]: {msg}"), False)[1] 1608 log = lambda msg: QUIET or console.print(msg) 1609 1610 resp = client.post("/flows/", json={"name": f"late-flow-{uuid.uuid4().hex[:8]}"}) 1611 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1612 flow_id = resp.json().get("id") 1613 1614 # create flow run with past scheduled time (60s ago, threshold is 15s) 1615 past_time = (datetime.now(timezone.utc) - timedelta(seconds=60)).isoformat() 1616 resp = client.post("/flow_runs/", json={ 1617 "flow_id": flow_id, "name": f"late-run-{uuid.uuid4().hex[:8]}", 1618 "state": {"type": "SCHEDULED", "name": "Scheduled"}, "next_scheduled_start_time": past_time}) 1619 if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1620 flow_run_id = resp.json().get("id") 1621 1622 # wait for late_runs service (runs every 5s) 1623 for i in range(15): 1624 time_mod.sleep(1) 1625 resp = client.get(f"/flow_runs/{flow_run_id}") 1626 if resp.status_code == 200 and resp.json().get("state_name") == "Late": 1627 log(f" [green]state changed to Late after {i+1}s[/green]") 1628 return True 1629 return fail("late_runs service did not mark run as Late within timeout") 1630 1631 1632def test_work_queue_priority(client: CountingClient) -> bool: 1633 """Test work queue priority ordering in get_scheduled_flow_runs.""" 1634 fail = lambda msg: (QUIET or console.print(f"[red]FAIL[/red]: {msg}"), False)[1] 1635 log = lambda msg: QUIET or console.print(msg) 1636 1637 resp = client.post("/flows/", json={"name": f"priority-flow-{uuid.uuid4().hex[:8]}"}) 1638 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1639 flow_id = resp.json().get("id") 1640 1641 pool_name = f"priority-pool-{uuid.uuid4().hex[:8]}" 1642 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1643 if resp.status_code not in (200, 201): return fail(f"create pool {resp.status_code}") 1644 1645 # create high and low priority queues 1646 for name, pri in [("high-priority", 1), ("low-priority", 100)]: 1647 resp = client.post(f"/work_pools/{pool_name}/queues/", json={"name": name, "priority": pri}) 1648 if resp.status_code not in (200, 201): return fail(f"create {name} queue {resp.status_code}") 1649 1650 # create deployments for each queue and flow runs in reverse priority order 1651 deployments, flow_runs = {}, {} 1652 for q in ["high-priority", "low-priority", "default"]: 1653 resp = client.post("/deployments/", json={"name": f"d-{q}-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, 1654 "work_pool_name": pool_name, "work_queue_name": q}) 1655 if resp.status_code not in (200, 201): return fail(f"create deployment {q} {resp.status_code}") 1656 deployments[q] = resp.json().get("id") 1657 for q in ["low-priority", "default", "high-priority"]: 1658 resp = client.post(f"/deployments/{deployments[q]}/create_flow_run", json={"state": {"type": "SCHEDULED", "name": "Scheduled"}}) 1659 if resp.status_code not in (200, 201): return fail(f"create run for {q} {resp.status_code}") 1660 flow_runs[q] = resp.json().get("id") 1661 1662 # poll and verify priority ordering 1663 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) 1664 if resp.status_code != 200: return fail(f"poll {resp.status_code}") 1665 scheduled = resp.json() 1666 if len(scheduled) < 3: return fail(f"expected 3 runs, got {len(scheduled)}") 1667 1668 run_ids = [r.get("flow_run", {}).get("id") for r in scheduled] 1669 pos = {q: run_ids.index(flow_runs[q]) if flow_runs[q] in run_ids else -1 for q in flow_runs} 1670 if -1 in pos.values(): return fail("not all runs found") 1671 if pos["high-priority"] > pos["low-priority"]: return fail("high should come before low") 1672 log(f" [green]priority ordering verified (high={pos['high-priority']}, low={pos['low-priority']})[/green]") 1673 1674 for d in deployments.values(): client.delete(f"/deployments/{d}") 1675 client.delete(f"/work_pools/{pool_name}") 1676 return True 1677 1678 1679def main(): 1680 json_output = "--json" in sys.argv 1681 1682 if not QUIET: 1683 console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n") 1684 1685 results: list[TestResult] = [] 1686 1687 # run all tests 1688 # core API tests (fast, no sleeps) 1689 results.append(run_test("admin", test_admin)) 1690 results.append(run_test("flow_run (success)", lambda c: test_flow_run(c, should_fail=False))) 1691 results.append(run_test("flow_run (failure)", lambda c: test_flow_run(c, should_fail=True))) 1692 results.append(run_test("flow_with_task_runs", test_flow_with_task_runs)) 1693 results.append(run_test("task_run", test_task_run)) 1694 results.append(run_test("filters", test_filters)) 1695 results.append(run_test("logs", test_logs)) 1696 results.append(run_test("variables", test_variables)) 1697 results.append(run_test("blocks", test_blocks)) 1698 results.append(run_test("work_pools", test_work_pools)) 1699 results.append(run_test("deployments", test_deployments)) 1700 1701 # execution patterns (two distinct models) 1702 results.append(run_test("serve_pattern", test_serve_pattern)) # Runner: polls deployments 1703 results.append(run_test("worker_pattern", test_worker_pattern)) # Worker: polls work pools 1704 results.append(run_test("work_queue_priority", test_work_queue_priority)) 1705 results.append(run_test("retry_failed_flows", test_retry_failed_flows)) 1706 results.append(run_test("cancellation_flow", test_cancellation_flow)) 1707 results.append(run_test("late_runs", test_late_runs)) # background service marks overdue runs 1708 1709 # scheduler integration tests (require sleeps for background service) 1710 results.append(run_test("scheduler_idempotency", test_scheduler_idempotency)) 1711 results.append(run_test("parameter_merging", test_parameter_merging)) 1712 1713 total_duration = sum(r.duration_ms for r in results) 1714 total_requests = sum(r.requests for r in results) 1715 all_passed = all(r.passed for r in results) 1716 1717 if json_output: 1718 # machine-readable output for benchmark script 1719 output = { 1720 "passed": all_passed, 1721 "total_duration_ms": total_duration, 1722 "total_requests": total_requests, 1723 "sections": [ 1724 { 1725 "name": r.name, 1726 "passed": r.passed, 1727 "duration_ms": r.duration_ms, 1728 "requests": r.requests, 1729 "error": r.error, 1730 } 1731 for r in results 1732 ], 1733 } 1734 print(json_lib.dumps(output)) 1735 else: 1736 # human-readable output 1737 console.print("\n" + "=" * 60) 1738 1739 table = Table(title="test results") 1740 table.add_column("section", style="cyan") 1741 table.add_column("time", justify="right") 1742 table.add_column("reqs", justify="right") 1743 table.add_column("status", justify="center") 1744 1745 for r in results: 1746 status = "[green]✓[/green]" if r.passed else "[red]✗[/red]" 1747 table.add_row(r.name, f"{r.duration_ms:.1f}ms", str(r.requests), status) 1748 1749 table.add_row("", "", "", "", style="dim") 1750 table.add_row("[bold]total[/bold]", f"[bold]{total_duration:.1f}ms[/bold]", f"[bold]{total_requests}[/bold]", "") 1751 1752 console.print(table) 1753 1754 if all_passed: 1755 console.print("\n[bold green]all tests passed[/bold green]") 1756 else: 1757 console.print("\n[bold red]some tests failed[/bold red]") 1758 1759 sys.exit(0 if all_passed else 1) 1760 1761 1762if __name__ == "__main__": 1763 main()