Prefect server in Zig — API performance benchmark script
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx", "rich", "psutil"]
5# ///
6"""
7Performance benchmark for prefect-server.
8
9Measures actual API throughput and latency with concurrent requests.
10Separate from functional tests (test-api-sequence).
11
12Usage:
13 ./scripts/benchmark --server zig
14 ./scripts/benchmark --server python
15 ./scripts/benchmark --compare
16 ./scripts/benchmark --matrix
17"""
18
19import argparse
20import os
21import shutil
22import signal
23import statistics
24import subprocess
25import time
26from dataclasses import dataclass
27from pathlib import Path
28from typing import Literal
29
30import httpx
31import psutil
32from rich.console import Console
33from rich.table import Table
34
# shared rich console for all output in this script
console = Console()

# both server implementations are started on this fixed port
SERVER_PORT = 4200
API_URL = f"http://localhost:{SERVER_PORT}/api"

# benchmark configuration
WARMUP_REQUESTS = 10
BENCHMARK_DURATION = 5.0  # seconds per endpoint; overridden by --duration in main()
CONCURRENT_CLIENTS = 4  # NOTE(review): not referenced anywhere in this file — dead constant or reserved for a future concurrent mode?
44
45
@dataclass
class LatencyStats:
    """Latency distribution summary for one endpoint, all values in milliseconds."""

    min_ms: float
    avg_ms: float  # arithmetic mean
    p50_ms: float  # median
    p95_ms: float
    p99_ms: float
    max_ms: float
54
55
@dataclass
class EndpointResult:
    """Benchmark outcome for a single API endpoint."""

    name: str  # display label, e.g. "GET flow" or "filter flows"
    requests: int  # successful (2xx) requests completed in the window
    rps: float  # successful requests per second
    latency: LatencyStats
62
63
@dataclass
class BenchmarkResult:
    """Aggregate outcome for one server / backend combination."""

    server: str  # "zig" or "python"
    success: bool
    memory_mb: float  # RSS of the server process tree, in MiB
    endpoints: list[EndpointResult]
    db_backend: str = "sqlite"
    broker_backend: str = "memory"
    error: str | None = None  # set when success is False
73
74
def percentile(data: list[float], p: float) -> float:
    """Return the p-th percentile of *data* via linear interpolation.

    *data* must already be sorted ascending; an empty list yields 0.0.
    """
    if not data:
        return 0.0
    rank = (len(data) - 1) * p / 100
    lower = int(rank)
    upper = min(lower + 1, len(data) - 1)
    fraction = rank - lower
    return data[lower] + fraction * (data[upper] - data[lower])
83
84
def calculate_latency_stats(latencies_ms: list[float]) -> LatencyStats:
    """Summarize a list of request latencies (ms) into a LatencyStats record.

    An empty input yields an all-zero record rather than raising.
    """
    if not latencies_ms:
        return LatencyStats(0, 0, 0, 0, 0, 0)
    ordered = sorted(latencies_ms)
    return LatencyStats(
        min_ms=ordered[0],
        max_ms=ordered[-1],
        avg_ms=statistics.mean(ordered),
        p50_ms=percentile(ordered, 50),
        p95_ms=percentile(ordered, 95),
        p99_ms=percentile(ordered, 99),
    )
98
99
def wait_for_health(timeout: float = 30.0) -> bool:
    """Poll the server's health endpoint until it answers 200 or *timeout* elapses.

    Connection errors are expected while the server is still booting and
    are silently retried every 100ms.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if httpx.get(f"{API_URL}/health", timeout=2.0).status_code == 200:
                return True
        except httpx.RequestError:
            pass  # server not accepting connections yet
        time.sleep(0.1)
    return False
112
113
def kill_port(port: int) -> None:
    """Best-effort cleanup: SIGTERM every process listening on *port*.

    Uses `lsof -ti` to discover PIDs; any failure (lsof missing, permission
    denied, already-dead process) is deliberately swallowed since this is
    only pre/post-benchmark hygiene.
    """
    try:
        listed = subprocess.run(["lsof", "-ti", f":{port}"], capture_output=True, text=True)
        pid_lines = listed.stdout.strip()
        if pid_lines:
            for raw_pid in pid_lines.split("\n"):
                try:
                    os.kill(int(raw_pid), signal.SIGTERM)
                except (ProcessLookupError, ValueError):
                    pass  # raced with exit, or junk output line
            time.sleep(0.5)  # give the processes a moment to die
    except Exception:
        pass  # best-effort only — never let cleanup break the benchmark
127
128
def get_process_memory_mb(pid: int) -> float:
    """Return the RSS of *pid* plus all recursive children, in MiB.

    Returns 0.0 when the process is gone or unreadable; children that
    vanish mid-scan are skipped individually.
    """
    try:
        root = psutil.Process(pid)
        rss_bytes = root.memory_info().rss
        for child in root.children(recursive=True):
            try:
                rss_bytes += child.memory_info().rss
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass  # child exited between enumeration and query
        return rss_bytes / (1024 * 1024)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return 0.0
142
143
class ServerManager:
    """Manages server lifecycle.

    Starts either the zig binary (zig-out/bin/prefect-server) or the python
    reference server (via uvx) on SERVER_PORT, waits for its health endpoint,
    and tears the process tree down again.
    """

    def __init__(
        self,
        server_type: Literal["zig", "python"],
        db_backend: str = "sqlite",
        broker_backend: str = "memory",
    ):
        self.server_type = server_type
        self.db_backend = db_backend
        self.broker_backend = broker_backend
        # handle to the launched server process; None when not running
        self.process: subprocess.Popen | None = None
        # repo root — this script lives in <root>/scripts/
        self.project_root = Path(__file__).parent.parent
        # diagnostic text from a failed startup, for the caller to display
        self.startup_error: str | None = None

    def start(self) -> bool:
        """Start the server; return True once the health check passes."""
        # clear any stale process still bound to the benchmark port
        kill_port(SERVER_PORT)

        if self.server_type == "zig":
            return self._start_zig()
        return self._start_python()

    def _reset_postgres(self) -> None:
        """Reset postgres schema for clean test state."""
        try:
            # drop and recreate the whole public schema inside the compose container
            subprocess.run(
                [
                    "docker", "compose", "exec", "-T", "postgres",
                    "psql", "-U", "prefect", "-c",
                    "DROP SCHEMA public CASCADE; CREATE SCHEMA public; GRANT ALL ON SCHEMA public TO prefect;",
                ],
                cwd=self.project_root,
                capture_output=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            pass  # postgres might not be running yet, that's ok

    def _start_zig(self) -> bool:
        """Launch the zig binary with backend env vars; return True when healthy."""
        binary = self.project_root / "zig-out" / "bin" / "prefect-server"
        if not binary.exists():
            console.print(f"[red]error: zig binary not found at {binary}[/red]")
            return False

        # start from a clean database for reproducible numbers
        if self.db_backend == "sqlite":
            db_path = self.project_root / "prefect.db"
            if db_path.exists():
                db_path.unlink()
        elif self.db_backend == "postgres":
            # reset postgres schema for clean state
            self._reset_postgres()

        env = os.environ.copy()
        env["PREFECT_SERVER_LOGGING_LEVEL"] = "WARNING"
        env["PREFECT_DATABASE_BACKEND"] = self.db_backend
        env["PREFECT_BROKER_BACKEND"] = self.broker_backend
        if self.db_backend == "postgres":
            # respect an explicitly-set URL; otherwise use the compose default
            env.setdefault("PREFECT_DATABASE_URL", "postgresql://prefect:prefect@localhost:5432/prefect")

        # first attempt: DEVNULL to avoid buffer issues during benchmark
        self.process = subprocess.Popen(
            [str(binary)],
            cwd=self.project_root,
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        if not wait_for_health():
            self.stop()
            # restart with output capture to get error message
            self.process = subprocess.Popen(
                [str(binary)],
                cwd=self.project_root,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            time.sleep(2)  # give server time to fail
            if self.process.poll() is not None:
                # process exited — keep the tail of its output as the error
                output, _ = self.process.communicate()
                self.startup_error = output.strip()[-500:] if output else "server exited"
            else:
                # process is alive but never became healthy
                self.startup_error = "health check timeout"
            self.stop()
            return False
        return True

    def _start_python(self) -> bool:
        """Launch the python reference server via uvx; return True when healthy."""
        if not shutil.which("uvx"):
            console.print("[red]error: uvx not found[/red]")
            return False

        # fresh throwaway sqlite db for each run
        temp_db = self.project_root / ".benchmark-python.db"
        if temp_db.exists():
            temp_db.unlink()

        env = os.environ.copy()
        env["PREFECT_SERVER_DATABASE_CONNECTION_URL"] = f"sqlite+aiosqlite:///{temp_db}"
        env["PREFECT_SERVER_LOGGING_LEVEL"] = "WARNING"
        env["PREFECT_LOGGING_LEVEL"] = "WARNING"

        self.process = subprocess.Popen(
            ["uvx", "prefect", "server", "start", "--port", str(SERVER_PORT)],
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # python server is much slower to boot than the zig binary
        if not wait_for_health(timeout=60.0):
            self.stop()
            return False
        return True

    def get_memory_mb(self) -> float:
        """Current RSS of the server process tree in MiB; 0.0 if not running."""
        if self.process:
            return get_process_memory_mb(self.process.pid)
        return 0.0

    def stop(self) -> None:
        """Terminate the server (SIGKILL after 5s) and free the port."""
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()
            self.process = None
        # catch any children/strays not reaped above
        kill_port(SERVER_PORT)
275
276
def benchmark_endpoint(
    client: httpx.Client,
    name: str,
    method: str,
    path: str,
    json_body: dict | None = None,
    setup_fn=None,
) -> EndpointResult:
    """Drive one endpoint for BENCHMARK_DURATION seconds and collect stats.

    Optionally runs *setup_fn(client)* first and substitutes its returned
    mapping into *path* via str.format. Only 200/201/204 responses count
    toward the request total and latency samples; request errors are ignored.
    """
    setup_data = setup_fn(client) if setup_fn else {}
    target = path.format(**setup_data) if setup_data else path

    def _fire():
        # issue one request using the configured method/body
        if method == "GET":
            return client.get(target)
        return client.post(target, json=json_body or {})

    # warmup: prime connections and server caches, ignore failures
    for _ in range(WARMUP_REQUESTS):
        try:
            _fire()
        except httpx.RequestError:
            pass

    # timed measurement window
    latencies: list[float] = []
    ok_count = 0
    window_start = time.perf_counter()

    while time.perf_counter() - window_start < BENCHMARK_DURATION:
        began = time.perf_counter()
        try:
            response = _fire()
        except httpx.RequestError:
            continue
        if response.status_code in (200, 201, 204):
            latencies.append((time.perf_counter() - began) * 1000)
            ok_count += 1

    elapsed = time.perf_counter() - window_start
    return EndpointResult(
        name=name,
        requests=ok_count,
        rps=ok_count / elapsed if elapsed > 0 else 0,
        latency=calculate_latency_stats(latencies),
    )
331
332
def run_benchmark(manager: ServerManager) -> BenchmarkResult:
    """Run the full benchmark suite against an already-started server.

    Creates a test flow and flow run, then benchmarks health, point reads,
    filter queries, and flow-run creation. Never raises: every failure path
    is converted into a BenchmarkResult with success=False.
    """
    import uuid  # hoisted from mid-function; used for unique flow-run names

    client = httpx.Client(base_url=API_URL, timeout=10.0)
    endpoints: list[EndpointResult] = []

    def _fail(error: str, collected: list[EndpointResult]) -> BenchmarkResult:
        # single constructor for failed results so every exit path carries
        # the same backend metadata (previously duplicated three times)
        return BenchmarkResult(
            server=manager.server_type,
            success=False,
            memory_mb=manager.get_memory_mb(),
            endpoints=collected,
            db_backend=manager.db_backend,
            broker_backend=manager.broker_backend,
            error=error,
        )

    try:
        # health endpoint (baseline, no database work)
        endpoints.append(benchmark_endpoint(client, "health", "GET", "/health"))

        # create a flow for subsequent tests
        resp = client.post("/flows/", json={"name": "bench-flow"})
        if resp.status_code not in (200, 201):
            return _fail("failed to create test flow", [])
        flow_id = resp.json()["id"]

        # create a flow run for read tests
        resp = client.post("/flow_runs/", json={
            "flow_id": flow_id,
            "name": "bench-run",
            "state": {"type": "PENDING", "name": "Pending"},
        })
        if resp.status_code not in (200, 201):
            return _fail("failed to create test flow run", [])
        flow_run_id = resp.json()["id"]

        # point reads and filtered list queries
        endpoints.append(benchmark_endpoint(
            client, "GET flow", "GET", f"/flows/{flow_id}"
        ))
        endpoints.append(benchmark_endpoint(
            client, "filter flows", "POST", "/flows/filter", {"limit": 10}
        ))
        endpoints.append(benchmark_endpoint(
            client, "GET flow_run", "GET", f"/flow_runs/{flow_run_id}"
        ))
        endpoints.append(benchmark_endpoint(
            client, "filter flow_runs", "POST", "/flow_runs/filter", {"limit": 10}
        ))

        # write path: POST /flow_runs/ with a unique name per benchmark run
        # (was wrapped in a one-shot inner function; called directly now)
        endpoints.append(benchmark_endpoint(
            client, "create flow_run", "POST", "/flow_runs/",
            {"flow_id": flow_id, "name": f"bench-{uuid.uuid4().hex[:8]}",
             "state": {"type": "PENDING", "name": "Pending"}},
        ))

        return BenchmarkResult(
            server=manager.server_type,
            success=True,
            memory_mb=manager.get_memory_mb(),
            endpoints=endpoints,
            db_backend=manager.db_backend,
            broker_backend=manager.broker_backend,
        )
    except Exception as e:
        # any unexpected error (network, JSON, KeyError) becomes a failed result
        return _fail(str(e), endpoints)
    finally:
        client.close()
427
428
def print_result(result: BenchmarkResult) -> None:
    """Render one server's benchmark results as a rich table plus memory line."""
    if not result.success:
        console.print(f"[red]benchmark failed: {result.error}[/red]")
        return

    table = Table(title=f"{result.server} ({result.db_backend}/{result.broker_backend})")
    table.add_column("endpoint", style="cyan")
    # all numeric columns are right-aligned
    for heading in ("reqs", "rps", "avg", "p50", "p95", "p99"):
        table.add_column(heading, justify="right")

    for ep in result.endpoints:
        lat = ep.latency
        table.add_row(
            ep.name,
            str(ep.requests),
            f"{ep.rps:.0f}",
            f"{lat.avg_ms:.2f}ms",
            f"{lat.p50_ms:.2f}ms",
            f"{lat.p95_ms:.2f}ms",
            f"{lat.p99_ms:.2f}ms",
        )

    console.print(table)
    console.print(f"memory: {result.memory_mb:.1f}MB\n")
457
458
def print_comparison(zig: BenchmarkResult, python: BenchmarkResult) -> None:
    """Print per-endpoint and aggregate comparison tables for zig vs python.

    Endpoints are matched by name; endpoints missing from the python run are
    skipped. Summary ratios guard against zero denominators (a server that
    served 0 rps or reported 0 MB) — previously the "slower"/"larger" branch
    computed 1/0 and crashed with ZeroDivisionError.
    """
    console.print()
    table = Table(title="zig vs python comparison")
    table.add_column("endpoint", style="cyan")
    table.add_column("zig rps", justify="right")
    table.add_column("python rps", justify="right")
    table.add_column("speedup", justify="right")
    table.add_column("zig p50", justify="right")
    table.add_column("python p50", justify="right")

    zig_eps = {ep.name: ep for ep in zig.endpoints}
    python_eps = {ep.name: ep for ep in python.endpoints}

    total_zig_rps = 0
    total_python_rps = 0

    for name in zig_eps:
        if name not in python_eps:
            continue
        z, p = zig_eps[name], python_eps[name]
        speedup = z.rps / p.rps if p.rps > 0 else 0

        total_zig_rps += z.rps
        total_python_rps += p.rps

        # color-code: green = clear win, dim = parity, yellow = regression
        speedup_str = f"[green]{speedup:.1f}x[/green]" if speedup >= 1.5 else (
            f"[dim]{speedup:.1f}x[/dim]" if speedup >= 1 else f"[yellow]{speedup:.1f}x[/yellow]"
        )

        table.add_row(
            name,
            f"{z.rps:.0f}",
            f"{p.rps:.0f}",
            speedup_str,
            f"{z.latency.p50_ms:.2f}ms",
            f"{p.latency.p50_ms:.2f}ms",
        )

    console.print(table)

    # aggregate ratios; 0 means "undefined" (zero denominator)
    overall_speedup = total_zig_rps / total_python_rps if total_python_rps > 0 else 0
    mem_ratio = python.memory_mb / zig.memory_mb if zig.memory_mb > 0 else 0

    if overall_speedup >= 1:
        rps_advantage = f"[green]{overall_speedup:.1f}x faster[/green]"
    elif overall_speedup > 0:
        rps_advantage = f"[yellow]{1 / overall_speedup:.1f}x slower[/yellow]"
    else:
        rps_advantage = "[dim]n/a[/dim]"  # avoid 1/0 when python served 0 rps

    if mem_ratio >= 1:
        mem_advantage = f"[green]{mem_ratio:.1f}x smaller[/green]"
    elif mem_ratio > 0:
        mem_advantage = f"[yellow]{1 / mem_ratio:.1f}x larger[/yellow]"
    else:
        mem_advantage = "[dim]n/a[/dim]"  # avoid 1/0 when zig memory read 0

    console.print()
    summary = Table(title="summary")
    summary.add_column("metric", style="cyan")
    summary.add_column("zig", justify="right")
    summary.add_column("python", justify="right")
    summary.add_column("advantage", justify="right")

    summary.add_row(
        "total rps",
        f"{total_zig_rps:.0f}",
        f"{total_python_rps:.0f}",
        rps_advantage,
    )
    summary.add_row(
        "memory",
        f"{zig.memory_mb:.1f}MB",
        f"{python.memory_mb:.1f}MB",
        mem_advantage,
    )

    console.print(summary)
525
526
def ensure_docker_services(db_backend: str, broker_backend: str, project_root: Path) -> bool:
    """Bring up any docker services the chosen backends require.

    Returns True when nothing is needed or the services started; False when
    `docker compose up` fails (the error is printed).
    """
    needed = [
        svc
        for svc, wanted in (("postgres", db_backend == "postgres"),
                            ("redis", broker_backend == "redis"))
        if wanted
    ]
    if not needed:
        return True

    try:
        subprocess.run(
            ["docker", "compose", "up", "-d", *needed],
            cwd=project_root,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        console.print(f"[red]failed to start docker services: {e}[/red]")
        return False
    # postgres takes longer than redis to start accepting connections
    time.sleep(2 if "postgres" in needed else 1)
    return True
550
551
def run_single(
    server_type: Literal["zig", "python"],
    db_backend: str = "sqlite",
    broker_backend: str = "memory",
) -> BenchmarkResult | None:
    """Start one server configuration, run the benchmark, then tear it down."""
    # backend label only makes sense for the zig server (python is fixed sqlite)
    backend_label = f"({db_backend}/{broker_backend})" if server_type == "zig" else ""
    console.print(f"\n[bold]benchmarking {server_type} server {backend_label}[/bold]")
    console.print(f"  starting server...", end=" ")

    manager = ServerManager(server_type, db_backend, broker_backend)
    started = manager.start()
    if not started:
        console.print("[red]failed[/red]")
        if manager.startup_error:
            console.print(f"[dim]{manager.startup_error}[/dim]")
        return BenchmarkResult(
            server=server_type,
            success=False,
            memory_mb=0,
            endpoints=[],
            db_backend=db_backend,
            broker_backend=broker_backend,
            error=manager.startup_error or "failed to start server",
        )

    console.print("running benchmark...", end=" ")
    result = run_benchmark(manager)
    manager.stop()

    if not result.success:
        console.print(f"[red]failed: {result.error}[/red]")
    else:
        aggregate_rps = sum(ep.rps for ep in result.endpoints)
        console.print(f"[green]{aggregate_rps:.0f} total rps[/green] ({result.memory_mb:.1f}MB)")

    return result
588
589
def print_matrix_results(results: list[BenchmarkResult]) -> None:
    """Render the db × broker matrix outcomes as a single summary table."""
    console.print()
    table = Table(title="benchmark matrix (zig server)")
    table.add_column("db", style="cyan")
    table.add_column("broker", style="cyan")
    table.add_column("total rps", justify="right")
    table.add_column("memory", justify="right")
    table.add_column("status", justify="center")

    for r in results:
        if not r.success:
            # failed combinations show a red cross with the error text
            table.add_row(r.db_backend, r.broker_backend, "-", "-", f"[red]✗ {r.error or 'failed'}[/red]")
            continue
        aggregate_rps = sum(ep.rps for ep in r.endpoints)
        table.add_row(
            r.db_backend,
            r.broker_backend,
            f"{aggregate_rps:.0f}",
            f"{r.memory_mb:.1f}MB",
            "[green]✓[/green]",
        )

    console.print(table)
609
610
def main():
    """CLI entry point: parse arguments and dispatch to single/compare/matrix mode."""
    parser = argparse.ArgumentParser(description="benchmark prefect servers")
    parser.add_argument("--server", choices=["zig", "python"], help="server to benchmark")
    parser.add_argument("--compare", action="store_true", help="compare zig vs python")
    parser.add_argument("--matrix", action="store_true", help="run all db × broker combinations (zig only)")
    parser.add_argument("--db-backend", choices=["sqlite", "postgres"], default="sqlite")
    parser.add_argument("--broker-backend", choices=["memory", "redis"], default="memory")
    parser.add_argument("--duration", type=float, default=5.0, help="benchmark duration per endpoint (seconds)")
    args = parser.parse_args()

    if not (args.server or args.compare or args.matrix):
        parser.error("must specify --server, --compare, or --matrix")

    # benchmark_endpoint reads the module-level constant, so rebind it here
    global BENCHMARK_DURATION
    BENCHMARK_DURATION = args.duration

    console.print("[bold]prefect-server benchmark[/bold]")
    console.print(f"duration: {BENCHMARK_DURATION}s per endpoint")

    project_root = Path(__file__).parent.parent

    if args.matrix:
        results = []
        for db_backend, broker_backend in (
            ("sqlite", "memory"),
            ("sqlite", "redis"),
            ("postgres", "memory"),
            ("postgres", "redis"),
        ):
            if not ensure_docker_services(db_backend, broker_backend, project_root):
                results.append(BenchmarkResult(
                    server="zig", success=False, memory_mb=0, endpoints=[],
                    db_backend=db_backend, broker_backend=broker_backend,
                    error="docker service failed",
                ))
                continue
            outcome = run_single("zig", db_backend, broker_backend)
            if outcome:
                results.append(outcome)
        print_matrix_results(results)
        return

    if args.compare:
        if not ensure_docker_services(args.db_backend, args.broker_backend, project_root):
            return
        zig_result = run_single("zig", args.db_backend, args.broker_backend)
        python_result = run_single("python")
        if zig_result and python_result and zig_result.success and python_result.success:
            print_comparison(zig_result, python_result)
        else:
            # one side failed — fall back to printing whatever we have
            if zig_result:
                print_result(zig_result)
            if python_result:
                print_result(python_result)
        return

    # single-server mode
    if not ensure_docker_services(args.db_backend, args.broker_backend, project_root):
        return
    single = run_single(args.server, args.db_backend, args.broker_backend)
    if single:
        print_result(single)
677
678
# script entry point (file is run directly via the uv shebang)
if __name__ == "__main__":
    main()