prefect server in zig
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["websockets"]
# ///
"""Test WebSocket ping/pong and message handling."""
7import asyncio
8import json
9import uuid
10from datetime import datetime, timezone
11
async def test_websocket(
    uri: str = "ws://localhost:4200/api/events/in",
    *,
    pong_timeout: float = 5.0,
) -> None:
    """Smoke-test a WebSocket events endpoint.

    Connects to ``uri``, sends a protocol-level ping and waits for the
    matching pong, sends one JSON-encoded test event, then keeps the
    connection open for half a second before closing it. Progress is
    printed to stdout.

    Args:
        uri: WebSocket URL to connect to.
        pong_timeout: Maximum number of seconds to wait for the pong reply.

    Any ``Exception`` raised while connecting or exchanging messages
    (including a pong timeout) is printed as ``Error: <type>: <message>``
    and not re-raised.
    """
    # Third-party dependency; imported here rather than at module level,
    # so a missing package surfaces only when the test actually runs.
    import websockets

    print(f"Connecting to {uri}...")

    try:
        async with websockets.connect(uri) as ws:
            print("Connected!")

            # Test ping/pong (protocol level)
            print("Sending ping...")
            pong_waiter = await ws.ping()
            # Bound the wait so a server that never answers the ping is
            # reported as an error instead of stalling the script.
            await asyncio.wait_for(pong_waiter, timeout=pong_timeout)
            print("Pong received!")

            # Send a test event
            event = {
                "id": str(uuid.uuid4()),
                "occurred": datetime.now(timezone.utc).isoformat(),
                "event": "prefect.test.event",
                "resource": {
                    "prefect.resource.id": "test-resource-123",
                },
                "payload": {"test": True},
            }

            print(f"Sending event: {event['event']}")
            await ws.send(json.dumps(event))
            print("Event sent!")

            # Keep connection open briefly
            await asyncio.sleep(0.5)
            print("Done!")

    except Exception as e:
        # Best-effort reporting: show what went wrong without a traceback.
        print(f"Error: {type(e).__name__}: {e}")
49
if __name__ == "__main__":
    # Script entry point: build the coroutine, then run it to completion
    # on a fresh event loop.
    smoke_test = test_websocket()
    asyncio.run(smoke_test)