#!/usr/bin/env -S uv run --script --quiet # /// script # requires-python = ">=3.12" # dependencies = ["websockets"] # /// """test websocket ping/pong and message handling""" import asyncio import json import uuid from datetime import datetime, timezone async def test_websocket(): import websockets uri = "ws://localhost:4200/api/events/in" print(f"Connecting to {uri}...") try: async with websockets.connect(uri) as ws: print("Connected!") # Test ping/pong (protocol level) print("Sending ping...") pong = await ws.ping() await pong print("Pong received!") # Send a test event event = { "id": str(uuid.uuid4()), "occurred": datetime.now(timezone.utc).isoformat(), "event": "prefect.test.event", "resource": { "prefect.resource.id": "test-resource-123" }, "payload": {"test": True} } print(f"Sending event: {event['event']}") await ws.send(json.dumps(event)) print("Event sent!") # Keep connection open briefly await asyncio.sleep(0.5) print("Done!") except Exception as e: print(f"Error: {type(e).__name__}: {e}") if __name__ == "__main__": asyncio.run(test_websocket())