prefect server in zig
at f511403a2b901063559cd17995b45527418e76c6 51 lines 1.4 kB view raw
1#!/usr/bin/env -S uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = ["websockets"] 5# /// 6"""test websocket ping/pong and message handling""" 7import asyncio 8import json 9import uuid 10from datetime import datetime, timezone 11 12async def test_websocket(): 13 import websockets 14 15 uri = "ws://localhost:4200/api/events/in" 16 print(f"Connecting to {uri}...") 17 18 try: 19 async with websockets.connect(uri) as ws: 20 print("Connected!") 21 22 # Test ping/pong (protocol level) 23 print("Sending ping...") 24 pong = await ws.ping() 25 await pong 26 print("Pong received!") 27 28 # Send a test event 29 event = { 30 "id": str(uuid.uuid4()), 31 "occurred": datetime.now(timezone.utc).isoformat(), 32 "event": "prefect.test.event", 33 "resource": { 34 "prefect.resource.id": "test-resource-123" 35 }, 36 "payload": {"test": True} 37 } 38 39 print(f"Sending event: {event['event']}") 40 await ws.send(json.dumps(event)) 41 print("Event sent!") 42 43 # Keep connection open briefly 44 await asyncio.sleep(0.5) 45 print("Done!") 46 47 except Exception as e: 48 print(f"Error: {type(e).__name__}: {e}") 49 50if __name__ == "__main__": 51 asyncio.run(test_websocket())