#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "rich", "psutil"]
# ///
"""
Performance benchmark for prefect-server.

Measures actual API throughput and latency.
Separate from functional tests (test-api-sequence).

Usage:
    ./scripts/benchmark --server zig
    ./scripts/benchmark --server python
    ./scripts/benchmark --compare
    ./scripts/benchmark --matrix
"""

import argparse
import os
import shutil
import signal
import statistics
import subprocess
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

import httpx
import psutil
from rich.console import Console
from rich.table import Table

console = Console()

SERVER_PORT = 4200
API_URL = f"http://localhost:{SERVER_PORT}/api"

# benchmark configuration
WARMUP_REQUESTS = 10
BENCHMARK_DURATION = 5.0  # seconds; overridden by --duration at runtime
# NOTE(review): currently unused — requests are issued sequentially from a
# single client. Kept for a future concurrent mode.
CONCURRENT_CLIENTS = 4


@dataclass
class LatencyStats:
    """Latency distribution for one benchmarked endpoint (all values in ms)."""

    min_ms: float
    avg_ms: float
    p50_ms: float
    p95_ms: float
    p99_ms: float
    max_ms: float


@dataclass
class EndpointResult:
    """Throughput and latency measured for a single endpoint."""

    name: str
    requests: int
    rps: float
    latency: LatencyStats


@dataclass
class BenchmarkResult:
    """Outcome of a full benchmark run against one server configuration."""

    server: str
    success: bool
    memory_mb: float
    endpoints: list[EndpointResult]
    db_backend: str = "sqlite"
    broker_backend: str = "memory"
    error: str | None = None


def percentile(data: list[float], p: float) -> float:
    """Return the p-th percentile of *data* (assumed sorted ascending).

    Uses linear interpolation between the two closest ranks.
    Returns 0.0 for empty input.
    """
    if not data:
        return 0.0
    k = (len(data) - 1) * p / 100
    f = int(k)
    # clamp the upper rank so p=100 doesn't index past the end
    c = f + 1 if f + 1 < len(data) else f
    return data[f] + (k - f) * (data[c] - data[f])


def calculate_latency_stats(latencies_ms: list[float]) -> LatencyStats:
    """Calculate latency statistics from a list of latencies (in ms)."""
    if not latencies_ms:
        return LatencyStats(0, 0, 0, 0, 0, 0)
    sorted_lat = sorted(latencies_ms)
    return LatencyStats(
        min_ms=sorted_lat[0],
        avg_ms=statistics.mean(sorted_lat),
        p50_ms=percentile(sorted_lat, 50),
        p95_ms=percentile(sorted_lat, 95),
        p99_ms=percentile(sorted_lat, 99),
        max_ms=sorted_lat[-1],
    )


def wait_for_health(timeout: float = 30.0) -> bool:
    """Poll the server health endpoint until it returns 200 or *timeout* elapses."""
    start = time.time()
    while time.time() - start < timeout:
        try:
            resp = httpx.get(f"{API_URL}/health", timeout=2.0)
            if resp.status_code == 200:
                return True
        except httpx.RequestError:
            pass  # server not accepting connections yet; keep polling
        time.sleep(0.1)
    return False


def kill_port(port: int) -> None:
    """Kill any process listening on the given port (best-effort, needs lsof)."""
    try:
        result = subprocess.run(["lsof", "-ti", f":{port}"], capture_output=True, text=True)
        if result.stdout.strip():
            for pid in result.stdout.strip().split("\n"):
                try:
                    os.kill(int(pid), signal.SIGTERM)
                except (ProcessLookupError, ValueError):
                    pass  # process already gone, or non-numeric lsof output
            time.sleep(0.5)  # give the processes a moment to exit
    except Exception:
        pass  # best-effort: lsof may not exist on this machine


def get_process_memory_mb(pid: int) -> float:
    """Get RSS memory usage of a process and all its children in MB."""
    try:
        proc = psutil.Process(pid)
        total = proc.memory_info().rss
        for child in proc.children(recursive=True):
            try:
                total += child.memory_info().rss
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass  # child exited while we were walking the tree
        return total / (1024 * 1024)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return 0.0


class ServerManager:
    """Manages server lifecycle: start, stop, and memory sampling."""

    def __init__(
        self,
        server_type: Literal["zig", "python"],
        db_backend: str = "sqlite",
        broker_backend: str = "memory",
    ):
        self.server_type = server_type
        self.db_backend = db_backend
        self.broker_backend = broker_backend
        self.process: subprocess.Popen | None = None
        self.project_root = Path(__file__).parent.parent
        # last startup failure message, surfaced in benchmark results
        self.startup_error: str | None = None

    def start(self) -> bool:
        """Start the server, clearing anything already bound to the port."""
        kill_port(SERVER_PORT)
        if self.server_type == "zig":
            return self._start_zig()
        return self._start_python()

    def _reset_postgres(self) -> None:
        """Reset postgres schema for clean test state."""
        try:
            subprocess.run(
                [
                    "docker", "compose", "exec", "-T", "postgres",
                    "psql", "-U", "prefect", "-c",
                    "DROP SCHEMA public CASCADE; CREATE SCHEMA public; GRANT ALL ON SCHEMA public TO prefect;",
                ],
                cwd=self.project_root,
                capture_output=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            pass  # postgres might not be running yet, that's ok

    def _start_zig(self) -> bool:
        """Start the zig server binary; returns False (with startup_error set) on failure."""
        binary = self.project_root / "zig-out" / "bin" / "prefect-server"
        if not binary.exists():
            console.print(f"[red]error: zig binary not found at {binary}[/red]")
            return False

        if self.db_backend == "sqlite":
            # delete the sqlite file so every run starts from an empty db
            db_path = self.project_root / "prefect.db"
            if db_path.exists():
                db_path.unlink()
        elif self.db_backend == "postgres":
            # reset postgres schema for clean state
            self._reset_postgres()

        env = os.environ.copy()
        env["PREFECT_SERVER_LOGGING_LEVEL"] = "WARNING"
        env["PREFECT_DATABASE_BACKEND"] = self.db_backend
        env["PREFECT_BROKER_BACKEND"] = self.broker_backend
        if self.db_backend == "postgres":
            env.setdefault("PREFECT_DATABASE_URL", "postgresql://prefect:prefect@localhost:5432/prefect")

        # first attempt: DEVNULL to avoid buffer issues during benchmark
        self.process = subprocess.Popen(
            [str(binary)],
            cwd=self.project_root,
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        if not wait_for_health():
            self.stop()
            # restart with output capture to get error message
            self.process = subprocess.Popen(
                [str(binary)],
                cwd=self.project_root,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            time.sleep(2)  # give server time to fail
            if self.process.poll() is not None:
                output, _ = self.process.communicate()
                # keep only the tail of the output so the report stays readable
                self.startup_error = output.strip()[-500:] if output else "server exited"
            else:
                self.startup_error = "health check timeout"
            self.stop()
            return False
        return True

    def _start_python(self) -> bool:
        """Start the reference python server via uvx; returns False on failure."""
        if not shutil.which("uvx"):
            console.print("[red]error: uvx not found[/red]")
            return False

        # fresh throwaway sqlite db per benchmark run
        temp_db = self.project_root / ".benchmark-python.db"
        if temp_db.exists():
            temp_db.unlink()

        env = os.environ.copy()
        env["PREFECT_SERVER_DATABASE_CONNECTION_URL"] = f"sqlite+aiosqlite:///{temp_db}"
        env["PREFECT_SERVER_LOGGING_LEVEL"] = "WARNING"
        env["PREFECT_LOGGING_LEVEL"] = "WARNING"

        self.process = subprocess.Popen(
            ["uvx", "prefect", "server", "start", "--port", str(SERVER_PORT)],
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # the python server is much slower to boot, hence the longer timeout
        if not wait_for_health(timeout=60.0):
            self.stop()
            return False
        return True

    def get_memory_mb(self) -> float:
        """Return current RSS of the managed process tree in MB (0.0 if not running)."""
        if self.process:
            return get_process_memory_mb(self.process.pid)
        return 0.0

    def stop(self) -> None:
        """Terminate the server, escalating to SIGKILL after 5s, and free the port."""
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()
            self.process = None
        kill_port(SERVER_PORT)


def benchmark_endpoint(
    client: httpx.Client,
    name: str,
    method: str,
    path: str,
    json_body: dict | None = None,
    setup_fn=None,
) -> EndpointResult:
    """Benchmark a single endpoint.

    Issues warmup requests, then hammers the endpoint sequentially for
    BENCHMARK_DURATION seconds. Only 2xx responses count toward throughput
    and latency; failed requests still consume wall-clock time.
    *setup_fn*, if given, is called with the client and must return a dict
    used to str.format the path (e.g. to inject a created resource id).
    """
    # setup if needed
    setup_data = {}
    if setup_fn:
        setup_data = setup_fn(client)

    # format path with setup data
    formatted_path = path.format(**setup_data) if setup_data else path

    # warmup
    for _ in range(WARMUP_REQUESTS):
        try:
            if method == "GET":
                client.get(formatted_path)
            else:
                client.post(formatted_path, json=json_body or {})
        except httpx.RequestError:
            pass  # warmup failures are irrelevant

    # benchmark
    latencies: list[float] = []
    start_time = time.perf_counter()
    request_count = 0

    while time.perf_counter() - start_time < BENCHMARK_DURATION:
        req_start = time.perf_counter()
        try:
            if method == "GET":
                resp = client.get(formatted_path)
            else:
                resp = client.post(formatted_path, json=json_body or {})
            if resp.status_code in (200, 201, 204):
                latencies.append((time.perf_counter() - req_start) * 1000)
                request_count += 1
        except httpx.RequestError:
            pass  # count only successful requests

    elapsed = time.perf_counter() - start_time
    rps = request_count / elapsed if elapsed > 0 else 0

    return EndpointResult(
        name=name,
        requests=request_count,
        rps=rps,
        latency=calculate_latency_stats(latencies),
    )


def run_benchmark(manager: ServerManager) -> BenchmarkResult:
    """Run the full benchmark suite against an already-started server."""
    client = httpx.Client(base_url=API_URL, timeout=10.0)
    endpoints: list[EndpointResult] = []

    try:
        # health endpoint (baseline)
        endpoints.append(benchmark_endpoint(client, "health", "GET", "/health"))

        # create a flow for subsequent tests
        resp = client.post("/flows/", json={"name": "bench-flow"})
        if resp.status_code not in (200, 201):
            return BenchmarkResult(
                server=manager.server_type,
                success=False,
                memory_mb=manager.get_memory_mb(),
                endpoints=[],
                db_backend=manager.db_backend,
                broker_backend=manager.broker_backend,
                error="failed to create test flow",
            )
        flow_id = resp.json()["id"]

        # create a flow run for read tests
        resp = client.post("/flow_runs/", json={
            "flow_id": flow_id,
            "name": "bench-run",
            "state": {"type": "PENDING", "name": "Pending"},
        })
        if resp.status_code not in (200, 201):
            return BenchmarkResult(
                server=manager.server_type,
                success=False,
                memory_mb=manager.get_memory_mb(),
                endpoints=[],
                db_backend=manager.db_backend,
                broker_backend=manager.broker_backend,
                error="failed to create test flow run",
            )
        flow_run_id = resp.json()["id"]

        # GET /flows/{id}
        endpoints.append(benchmark_endpoint(
            client, "GET flow", "GET", f"/flows/{flow_id}"
        ))

        # POST /flows/filter
        endpoints.append(benchmark_endpoint(
            client, "filter flows", "POST", "/flows/filter", {"limit": 10}
        ))

        # GET /flow_runs/{id}
        endpoints.append(benchmark_endpoint(
            client, "GET flow_run", "GET", f"/flow_runs/{flow_run_id}"
        ))

        # POST /flow_runs/filter
        endpoints.append(benchmark_endpoint(
            client, "filter flow_runs", "POST", "/flow_runs/filter", {"limit": 10}
        ))

        # POST /flow_runs/ (create) — the body (and its uuid-suffixed name) is
        # built once and reused for every request during the measurement window
        endpoints.append(benchmark_endpoint(
            client, "create flow_run", "POST", "/flow_runs/",
            {"flow_id": flow_id, "name": f"bench-{uuid.uuid4().hex[:8]}",
             "state": {"type": "PENDING", "name": "Pending"}},
        ))

        memory_mb = manager.get_memory_mb()

        return BenchmarkResult(
            server=manager.server_type,
            success=True,
            memory_mb=memory_mb,
            endpoints=endpoints,
            db_backend=manager.db_backend,
            broker_backend=manager.broker_backend,
        )
    except Exception as e:
        return BenchmarkResult(
            server=manager.server_type,
            success=False,
            memory_mb=manager.get_memory_mb(),
            endpoints=endpoints,
            db_backend=manager.db_backend,
            broker_backend=manager.broker_backend,
            error=str(e),
        )
    finally:
        client.close()


def print_result(result: BenchmarkResult) -> None:
    """Print benchmark results for a single server."""
    if not result.success:
        console.print(f"[red]benchmark failed: {result.error}[/red]")
        return

    table = Table(title=f"{result.server} ({result.db_backend}/{result.broker_backend})")
    table.add_column("endpoint", style="cyan")
    table.add_column("reqs", justify="right")
    table.add_column("rps", justify="right")
    table.add_column("avg", justify="right")
    table.add_column("p50", justify="right")
    table.add_column("p95", justify="right")
    table.add_column("p99", justify="right")

    for ep in result.endpoints:
        table.add_row(
            ep.name,
            str(ep.requests),
            f"{ep.rps:.0f}",
            f"{ep.latency.avg_ms:.2f}ms",
            f"{ep.latency.p50_ms:.2f}ms",
            f"{ep.latency.p95_ms:.2f}ms",
            f"{ep.latency.p99_ms:.2f}ms",
        )

    console.print(table)
    console.print(f"memory: {result.memory_mb:.1f}MB\n")


def print_comparison(zig: BenchmarkResult, python: BenchmarkResult) -> None:
    """Print a per-endpoint and summary comparison between zig and python."""
    console.print()
    table = Table(title="zig vs python comparison")
    table.add_column("endpoint", style="cyan")
    table.add_column("zig rps", justify="right")
    table.add_column("python rps", justify="right")
    table.add_column("speedup", justify="right")
    table.add_column("zig p50", justify="right")
    table.add_column("python p50", justify="right")

    zig_eps = {ep.name: ep for ep in zig.endpoints}
    python_eps = {ep.name: ep for ep in python.endpoints}

    total_zig_rps = 0
    total_python_rps = 0

    for name in zig_eps:
        if name not in python_eps:
            continue  # compare only endpoints both servers completed
        z, p = zig_eps[name], python_eps[name]
        speedup = z.rps / p.rps if p.rps > 0 else 0
        total_zig_rps += z.rps
        total_python_rps += p.rps
        speedup_str = f"[green]{speedup:.1f}x[/green]" if speedup >= 1.5 else (
            f"[dim]{speedup:.1f}x[/dim]" if speedup >= 1 else f"[yellow]{speedup:.1f}x[/yellow]"
        )
        table.add_row(
            name,
            f"{z.rps:.0f}",
            f"{p.rps:.0f}",
            speedup_str,
            f"{z.latency.p50_ms:.2f}ms",
            f"{p.latency.p50_ms:.2f}ms",
        )

    console.print(table)

    # summary — ratios guarded against division by zero (fixes a
    # ZeroDivisionError when one side measured 0 rps or 0 MB)
    overall_speedup = total_zig_rps / total_python_rps if total_python_rps > 0 else 0
    mem_ratio = python.memory_mb / zig.memory_mb if zig.memory_mb > 0 else 0

    console.print()
    summary = Table(title="summary")
    summary.add_column("metric", style="cyan")
    summary.add_column("zig", justify="right")
    summary.add_column("python", justify="right")
    summary.add_column("advantage", justify="right")

    if overall_speedup >= 1:
        rps_advantage = f"[green]{overall_speedup:.1f}x faster[/green]"
    elif overall_speedup > 0:
        rps_advantage = f"[yellow]{1/overall_speedup:.1f}x slower[/yellow]"
    else:
        rps_advantage = "[dim]n/a[/dim]"
    summary.add_row(
        "total rps",
        f"{total_zig_rps:.0f}",
        f"{total_python_rps:.0f}",
        rps_advantage,
    )

    if mem_ratio >= 1:
        mem_advantage = f"[green]{mem_ratio:.1f}x smaller[/green]"
    elif mem_ratio > 0:
        mem_advantage = f"[yellow]{1/mem_ratio:.1f}x larger[/yellow]"
    else:
        mem_advantage = "[dim]n/a[/dim]"
    summary.add_row(
        "memory",
        f"{zig.memory_mb:.1f}MB",
        f"{python.memory_mb:.1f}MB",
        mem_advantage,
    )

    console.print(summary)


def ensure_docker_services(db_backend: str, broker_backend: str, project_root: Path) -> bool:
    """Start required docker services for the chosen backends.

    Returns True when no services are needed or they started successfully.
    """
    services = []
    if db_backend == "postgres":
        services.append("postgres")
    if broker_backend == "redis":
        services.append("redis")

    if not services:
        return True

    try:
        subprocess.run(
            ["docker", "compose", "up", "-d"] + services,
            cwd=project_root,
            capture_output=True,
            check=True,
        )
        # postgres needs a little longer to accept connections
        time.sleep(2 if "postgres" in services else 1)
        return True
    except subprocess.CalledProcessError as e:
        console.print(f"[red]failed to start docker services: {e}[/red]")
        return False


def run_single(
    server_type: Literal["zig", "python"],
    db_backend: str = "sqlite",
    broker_backend: str = "memory",
) -> BenchmarkResult | None:
    """Run benchmark for a single server configuration."""
    backend_label = f"({db_backend}/{broker_backend})" if server_type == "zig" else ""
    console.print(f"\n[bold]benchmarking {server_type} server {backend_label}[/bold]")
    console.print(" starting server...", end=" ")

    manager = ServerManager(server_type, db_backend, broker_backend)
    if not manager.start():
        console.print("[red]failed[/red]")
        if manager.startup_error:
            console.print(f"[dim]{manager.startup_error}[/dim]")
        return BenchmarkResult(
            server=server_type,
            success=False,
            memory_mb=0,
            endpoints=[],
            db_backend=db_backend,
            broker_backend=broker_backend,
            error=manager.startup_error or "failed to start server",
        )

    console.print("running benchmark...", end=" ")
    result = run_benchmark(manager)
    manager.stop()

    if result.success:
        total_rps = sum(ep.rps for ep in result.endpoints)
        console.print(f"[green]{total_rps:.0f} total rps[/green] ({result.memory_mb:.1f}MB)")
    else:
        console.print(f"[red]failed: {result.error}[/red]")

    return result


def print_matrix_results(results: list[BenchmarkResult]) -> None:
    """Print matrix benchmark results."""
    console.print()
    table = Table(title="benchmark matrix (zig server)")
    table.add_column("db", style="cyan")
    table.add_column("broker", style="cyan")
    table.add_column("total rps", justify="right")
    table.add_column("memory", justify="right")
    table.add_column("status", justify="center")

    for r in results:
        if r.success:
            total_rps = sum(ep.rps for ep in r.endpoints)
            status = "[green]✓[/green]"
            table.add_row(r.db_backend, r.broker_backend, f"{total_rps:.0f}", f"{r.memory_mb:.1f}MB", status)
        else:
            table.add_row(r.db_backend, r.broker_backend, "-", "-", f"[red]✗ {r.error or 'failed'}[/red]")

    console.print(table)


def main():
    """CLI entry point: parse args and dispatch to single / compare / matrix mode."""
    parser = argparse.ArgumentParser(description="benchmark prefect servers")
    parser.add_argument("--server", choices=["zig", "python"], help="server to benchmark")
    parser.add_argument("--compare", action="store_true", help="compare zig vs python")
    parser.add_argument("--matrix", action="store_true", help="run all db × broker combinations (zig only)")
    parser.add_argument("--db-backend", choices=["sqlite", "postgres"], default="sqlite")
    parser.add_argument("--broker-backend", choices=["memory", "redis"], default="memory")
    parser.add_argument("--duration", type=float, default=5.0, help="benchmark duration per endpoint (seconds)")
    args = parser.parse_args()

    if not args.server and not args.compare and not args.matrix:
        parser.error("must specify --server, --compare, or --matrix")

    # benchmark_endpoint reads the module-level constant at call time, so
    # rebinding it here affects all subsequent runs
    global BENCHMARK_DURATION
    BENCHMARK_DURATION = args.duration

    console.print("[bold]prefect-server benchmark[/bold]")
    console.print(f"duration: {BENCHMARK_DURATION}s per endpoint")

    project_root = Path(__file__).parent.parent

    if args.matrix:
        combinations = [
            ("sqlite", "memory"),
            ("sqlite", "redis"),
            ("postgres", "memory"),
            ("postgres", "redis"),
        ]
        results = []
        for db_backend, broker_backend in combinations:
            if not ensure_docker_services(db_backend, broker_backend, project_root):
                results.append(BenchmarkResult(
                    server="zig",
                    success=False,
                    memory_mb=0,
                    endpoints=[],
                    db_backend=db_backend,
                    broker_backend=broker_backend,
                    error="docker service failed",
                ))
                continue
            result = run_single("zig", db_backend, broker_backend)
            if result:
                results.append(result)
        print_matrix_results(results)
    elif args.compare:
        if not ensure_docker_services(args.db_backend, args.broker_backend, project_root):
            return
        zig_result = run_single("zig", args.db_backend, args.broker_backend)
        # the python server only supports its default sqlite/memory setup
        python_result = run_single("python")
        if zig_result and python_result and zig_result.success and python_result.success:
            print_comparison(zig_result, python_result)
        else:
            if zig_result:
                print_result(zig_result)
            if python_result:
                print_result(python_result)
    else:
        if not ensure_docker_services(args.db_backend, args.broker_backend, project_root):
            return
        result = run_single(args.server, args.db_backend, args.broker_backend)
        if result:
            print_result(result)


if __name__ == "__main__":
    main()