prefect server in zig

add benchmarking framework, rename to prefect-server

- scripts/benchmark: compare zig vs python prefect servers
- justfile: bench-zig, bench-python, bench-compare commands
- src/main.zig: improved request error logging
- docs: rename prefect-zig to prefect-server throughout

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+423 -8
+17 -1
CLAUDE.md
··· 1 - # prefect-zig 1 + # prefect-server 2 2 3 3 zig 0.15.2 implementation of prefect server using zap (facil.io). 4 4 ··· 17 17 ``` 18 18 just dev # builds, clears db, runs server with DEBUG logging 19 19 just test # runs test_flow.py against localhost:4200 20 + ``` 21 + 22 + ## benchmarking 23 + 24 + ``` 25 + just bench-zig # benchmark zig server with test_flow.py 26 + just bench-python # benchmark python server with test_flow.py 27 + just bench-compare # compare both servers 28 + just bench-compare iterations=3 # multiple iterations for reliability 29 + 30 + # with custom workload: 31 + just bench-zig workload=scripts/test-with-client 32 + just bench-compare workload=scripts/test-flow iterations=5 33 + 34 + # or directly: 35 + ./scripts/benchmark --compare test_flow.py --iterations 3 20 36 ``` 21 37 22 38 ## env vars
+2 -2
README.md
··· 1 - # prefect-zig 1 + # prefect-server 2 2 3 3 minimal prefect server in zig - single binary, sqlite storage. 4 4 5 5 ## build & run 6 6 7 7 ```bash 8 - zig build && ./zig-out/bin/prefect-zig 8 + zig build && ./zig-out/bin/prefect-server 9 9 ``` 10 10 11 11 server runs on `localhost:4200`.
+1 -1
ROADMAP.md
··· 1 - # prefect-zig roadmap 1 + # prefect-server roadmap 2 2 3 3 comparison with prefect server (python). checkmarks indicate implemented. 4 4
+1 -1
docs/README.md
··· 1 - # prefect-zig internals 1 + # prefect-server internals 2 2 3 3 zig implementation of a prefect-compatible server. 4 4
+10
justfile
··· 10 10 11 11 test: 12 12 PREFECT_API_URL=http://localhost:4200/api uv run python test_flow.py 13 + 14 + # benchmarking 15 + bench-zig workload="test_flow.py": 16 + ./scripts/benchmark --server zig {{workload}} 17 + 18 + bench-python workload="test_flow.py": 19 + ./scripts/benchmark --server python {{workload}} 20 + 21 + bench-compare workload="test_flow.py" iterations="1": 22 + ./scripts/benchmark --compare {{workload}} --iterations {{iterations}}
+383
scripts/benchmark
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["httpx", "rich"] 5 + # /// 6 + """ 7 + benchmark runner for prefect-server 8 + 9 + compares performance between zig and python prefect servers by running 10 + the same workload against each and measuring execution time. 11 + 12 + usage: 13 + ./scripts/benchmark --server zig test_flow.py 14 + ./scripts/benchmark --server python test_flow.py 15 + ./scripts/benchmark --compare test_flow.py 16 + ./scripts/benchmark --compare scripts/test-with-client 17 + 18 + the workload script must respect PREFECT_API_URL environment variable. 19 + """ 20 + 21 + import argparse 22 + import os 23 + import shutil 24 + import signal 25 + import subprocess 26 + import sys 27 + import time 28 + from dataclasses import dataclass 29 + from pathlib import Path 30 + from typing import Literal 31 + 32 + import httpx 33 + from rich.console import Console 34 + from rich.table import Table 35 + 36 + console = Console() 37 + 38 + SERVER_PORT = 4200 39 + API_URL = f"http://localhost:{SERVER_PORT}/api" 40 + HEALTH_ENDPOINT = f"{API_URL}/health" 41 + 42 + 43 + @dataclass 44 + class BenchmarkResult: 45 + server: str 46 + workload: str 47 + duration_ms: float 48 + success: bool 49 + error: str | None = None 50 + 51 + 52 + def wait_for_health(timeout: float = 30.0) -> bool: 53 + """wait for server health endpoint to respond""" 54 + start = time.time() 55 + while time.time() - start < timeout: 56 + try: 57 + resp = httpx.get(HEALTH_ENDPOINT, timeout=2.0) 58 + if resp.status_code == 200: 59 + return True 60 + except httpx.RequestError: 61 + pass 62 + time.sleep(0.1) 63 + return False 64 + 65 + 66 + def kill_port(port: int) -> None: 67 + """kill any process listening on the given port""" 68 + try: 69 + result = subprocess.run( 70 + ["lsof", "-ti", f":{port}"], 71 + capture_output=True, 72 + text=True, 73 + ) 74 + if result.stdout.strip(): 75 + pids = result.stdout.strip().split("\n") 76 + for pid in pids: 77 + try: 78 + os.kill(int(pid), signal.SIGTERM) 79 + except (ProcessLookupError, ValueError): 80 + pass 81 + time.sleep(0.5) 82 + except Exception: 83 + pass 84 + 85 + 86 + class ServerManager: 87 + """manages starting and stopping prefect servers""" 88 + 89 + def __init__(self, server_type: Literal["zig", "python"]): 90 + self.server_type = server_type 91 + self.process: subprocess.Popen | None = None 92 + self.project_root = Path(__file__).parent.parent 93 + 94 + def start(self) -> bool: 95 + """start the server, returns True if successful""" 96 + kill_port(SERVER_PORT) 97 + 98 + if self.server_type == "zig": 99 + return self._start_zig() 100 + else: 101 + return self._start_python() 102 + 103 + def _start_zig(self) -> bool: 104 + """start the zig server""" 105 + binary = self.project_root / "zig-out" / "bin" / "prefect-server" 106 + if not binary.exists(): 107 + console.print(f"[red]error: zig binary not found at {binary}[/red]") 108 + console.print("[yellow]run 'zig build' first[/yellow]") 109 + return False 110 + 111 + # clean database for consistent benchmarks 112 + db_path = self.project_root / "prefect.db" 113 + if db_path.exists(): 114 + db_path.unlink() 115 + 116 + env = os.environ.copy() 117 + env["PREFECT_SERVER_LOGGING_LEVEL"] = "WARNING" 118 + 119 + self.process = subprocess.Popen( 120 + [str(binary)], 121 + cwd=self.project_root, 122 + env=env, 123 + stdout=subprocess.DEVNULL, 124 + stderr=subprocess.DEVNULL, 125 + ) 126 + 127 + if not wait_for_health(): 128 + self.stop() 129 + return False 130 + return True 131 + 132 + def _start_python(self) -> bool: 133 + """start the python prefect server""" 134 + # check if uvx is available 135 + if not shutil.which("uvx"): 136 + console.print("[red]error: uvx not found[/red]") 137 + return False 138 + 139 + # use a temp database for python server too 140 + temp_db = self.project_root / ".benchmark-python.db" 141 + if temp_db.exists(): 142 + temp_db.unlink() 143 + 144 + env = os.environ.copy() 145 + env["PREFECT_SERVER_DATABASE_CONNECTION_URL"] = f"sqlite+aiosqlite:///{temp_db}" 146 + env["PREFECT_SERVER_LOGGING_LEVEL"] = "WARNING" 147 + env["PREFECT_LOGGING_LEVEL"] = "WARNING" 148 + 149 + self.process = subprocess.Popen( 150 + ["uvx", "prefect", "server", "start", "--port", str(SERVER_PORT)], 151 + env=env, 152 + stdout=subprocess.DEVNULL, 153 + stderr=subprocess.DEVNULL, 154 + ) 155 + 156 + # python server takes longer to start 157 + if not wait_for_health(timeout=60.0): 158 + self.stop() 159 + return False 160 + return True 161 + 162 + def stop(self) -> None: 163 + """stop the server""" 164 + if self.process: 165 + self.process.terminate() 166 + try: 167 + self.process.wait(timeout=5) 168 + except subprocess.TimeoutExpired: 169 + self.process.kill() 170 + self.process = None 171 + 172 + # ensure port is free 173 + kill_port(SERVER_PORT) 174 + 175 + 176 + def run_workload(workload_path: str, project_root: Path) -> tuple[float, bool, str | None]: 177 + """ 178 + run a workload script and return (duration_ms, success, error_message) 179 + """ 180 + path = Path(workload_path) 181 + if not path.is_absolute(): 182 + path = project_root / path 183 + if not path.exists(): 184 + return 0, False, f"workload not found: {workload_path}" 185 + 186 + env = os.environ.copy() 187 + env["PREFECT_API_URL"] = API_URL 188 + env["PREFECT_LOGGING_LEVEL"] = "WARNING" 189 + 190 + # clear uv's virtual env vars so nested uv run uses the project's venv 191 + env.pop("VIRTUAL_ENV", None) 192 + env.pop("UV_RUN_RECURSION_DEPTH", None) 193 + 194 + # determine how to run the script 195 + if path.suffix == ".py": 196 + # run in project context so venv dependencies are available 197 + cmd = ["uv", "run", "--project", str(project_root), "python", str(path)] 198 + else: 199 + # assume it's an executable script (like scripts/test-with-client) 200 + cmd = [str(path)] 201 + 202 + start = time.perf_counter() 203 + try: 204 + result = subprocess.run( 205 + cmd, 206 + cwd=project_root, 207 + env=env, 208 + capture_output=True, 209 + text=True, 210 + timeout=300, # 5 minute timeout 211 + ) 212 + duration_ms = (time.perf_counter() - start) * 1000 213 + 214 + if result.returncode != 0: 215 + return duration_ms, False, result.stderr or result.stdout 216 + return duration_ms, True, None 217 + 218 + except subprocess.TimeoutExpired: 219 + duration_ms = (time.perf_counter() - start) * 1000 220 + return duration_ms, False, "timeout after 5 minutes" 221 + except Exception as e: 222 + duration_ms = (time.perf_counter() - start) * 1000 223 + return duration_ms, False, str(e) 224 + 225 + 226 + def benchmark_single( 227 + server_type: Literal["zig", "python"], 228 + workload: str, 229 + iterations: int = 1, 230 + ) -> list[BenchmarkResult]: 231 + """run benchmark against a single server""" 232 + results = [] 233 + 234 + console.print(f"\n[bold]benchmarking {server_type} server[/bold]") 235 + 236 + manager = ServerManager(server_type) 237 + 238 + for i in range(iterations): 239 + if iterations > 1: 240 + console.print(f" iteration {i + 1}/{iterations}...", end=" ") 241 + else: 242 + console.print(f" starting server...", end=" ") 243 + 244 + if not manager.start(): 245 + results.append(BenchmarkResult( 246 + server=server_type, 247 + workload=workload, 248 + duration_ms=0, 249 + success=False, 250 + error="failed to start server", 251 + )) 252 + console.print("[red]failed[/red]") 253 + continue 254 + 255 + console.print("running workload...", end=" ") 256 + 257 + duration_ms, success, error = run_workload(workload, manager.project_root) 258 + manager.stop() 259 + 260 + results.append(BenchmarkResult( 261 + server=server_type, 262 + workload=workload, 263 + duration_ms=duration_ms, 264 + success=success, 265 + error=error, 266 + )) 267 + 268 + if success: 269 + console.print(f"[green]{duration_ms:.0f}ms[/green]") 270 + else: 271 + console.print(f"[red]failed: {error}[/red]") 272 + 273 + return results 274 + 275 + 276 + def print_comparison(zig_results: list[BenchmarkResult], python_results: list[BenchmarkResult]): 277 + """print a comparison table""" 278 + table = Table(title="benchmark results") 279 + table.add_column("server", style="cyan") 280 + table.add_column("duration", justify="right") 281 + table.add_column("status", justify="center") 282 + 283 + def avg_duration(results: list[BenchmarkResult]) -> float: 284 + successful = [r for r in results if r.success] 285 + if not successful: 286 + return 0 287 + return sum(r.duration_ms for r in successful) / len(successful) 288 + 289 + def format_duration(ms: float) -> str: 290 + if ms < 1000: 291 + return f"{ms:.0f}ms" 292 + return f"{ms/1000:.2f}s" 293 + 294 + zig_avg = avg_duration(zig_results) 295 + python_avg = avg_duration(python_results) 296 + 297 + zig_success = sum(1 for r in zig_results if r.success) 298 + python_success = sum(1 for r in python_results if r.success) 299 + 300 + table.add_row( 301 + "zig", 302 + format_duration(zig_avg), 303 + f"{zig_success}/{len(zig_results)} ok", 304 + ) 305 + table.add_row( 306 + "python", 307 + format_duration(python_avg), 308 + f"{python_success}/{len(python_results)} ok", 309 + ) 310 + 311 + console.print() 312 + console.print(table) 313 + 314 + # show speedup if both succeeded 315 + if zig_avg > 0 and python_avg > 0: 316 + if zig_avg < python_avg: 317 + speedup = python_avg / zig_avg 318 + console.print(f"\n[bold green]zig is {speedup:.1f}x faster[/bold green]") 319 + elif python_avg < zig_avg: 320 + speedup = zig_avg / python_avg 321 + console.print(f"\n[bold yellow]python is {speedup:.1f}x faster[/bold yellow]") 322 + else: 323 + console.print("\n[bold]both servers performed equally[/bold]") 324 + 325 + 326 + def main(): 327 + parser = argparse.ArgumentParser( 328 + description="benchmark prefect servers", 329 + formatter_class=argparse.RawDescriptionHelpFormatter, 330 + epilog=""" 331 + examples: 332 + ./scripts/benchmark --server zig test_flow.py 333 + ./scripts/benchmark --server python test_flow.py 334 + ./scripts/benchmark --compare test_flow.py 335 + ./scripts/benchmark --compare test_flow.py --iterations 3 336 + """, 337 + ) 338 + parser.add_argument( 339 + "workload", 340 + help="path to workload script (must respect PREFECT_API_URL)", 341 + ) 342 + parser.add_argument( 343 + "--server", 344 + choices=["zig", "python"], 345 + help="server to benchmark (use --compare to benchmark both)", 346 + ) 347 + parser.add_argument( 348 + "--compare", 349 + action="store_true", 350 + help="benchmark both servers and compare results", 351 + ) 352 + parser.add_argument( 353 + "--iterations", 354 + type=int, 355 + default=1, 356 + help="number of iterations per server (default: 1)", 357 + ) 358 + 359 + args = parser.parse_args() 360 + 361 + if not args.server and not args.compare: 362 + parser.error("must specify --server or --compare") 363 + 364 + console.print(f"[bold]prefect-server benchmark[/bold]") 365 + console.print(f"workload: {args.workload}") 366 + if args.iterations > 1: 367 + console.print(f"iterations: {args.iterations}") 368 + 369 + if args.compare: 370 + zig_results = benchmark_single("zig", args.workload, args.iterations) 371 + python_results = benchmark_single("python", args.workload, args.iterations) 372 + print_comparison(zig_results, python_results) 373 + else: 374 + results = benchmark_single(args.server, args.workload, args.iterations) 375 + if results: 376 + successful = [r for r in results if r.success] 377 + if successful: 378 + avg = sum(r.duration_ms for r in successful) / len(successful) 379 + console.print(f"\n[bold]average: {avg:.0f}ms[/bold]") 380 + 381 + 382 + if __name__ == "__main__": 383 + main()
+7 -1
src/main.zig
··· 107 107 }; 108 108 109 109 fn onRequest(r: zap.Request) !void { 110 + const method = r.method orelse "?"; 111 + const path = r.path orelse "/"; 112 + 110 113 routes.handle(r) catch |err| { 111 - log.err("server", "request error: {}", .{err}); 114 + log.err("server", "{s} {s} - error: {}", .{ method, path, err }); 112 115 r.setStatus(.internal_server_error); 113 116 r.sendBody("{\"detail\":\"internal error\"}") catch {}; 117 + return; 114 118 }; 119 + 120 + log.debug("server", "{s} {s}", .{ method, path }); 115 121 } 116 122 117 123 fn onUpgrade(r: zap.Request, target_protocol: []const u8) !void {
+2 -2
test_flow.py
··· 1 1 """ 2 - test script for prefect-zig server 2 + test script for prefect-server 3 3 4 4 tests: 5 5 1. basic flow + task execution ··· 18 18 from prefect.transactions import transaction 19 19 20 20 # use a temp dir for result storage so tests are isolated 21 - RESULT_DIR = Path(tempfile.mkdtemp(prefix="prefect-zig-test-")) 21 + RESULT_DIR = Path(tempfile.mkdtemp(prefix="prefect-server-test-")) 22 22 23 23 24 24 # --- basic tasks ---