#!/usr/bin/env -S uv run --script --quiet # /// script # requires-python = ">=3.12" # dependencies = ["websockets"] # /// """ consume events from an atproto jetstream endpoint and print them. jetstream re-encodes the relay's CBOR firehose as plain JSON over websockets, so no atproto SDK is needed. usage: ./scripts/jetstream ./scripts/jetstream --duration 30 ./scripts/jetstream --url wss://jetstream1.us-east.bsky.network ./scripts/jetstream --collection app.bsky.feed.like ./scripts/jetstream --collection app.bsky.feed.post --collection app.bsky.feed.like """ import argparse import json import signal import time from collections import defaultdict from urllib.parse import urlencode import websockets.sync.client as ws def format_event(event: dict) -> str | None: """format a jetstream event for display. returns None to skip.""" kind = event.get("kind") did = event.get("did", "?") if kind == "commit": commit = event.get("commit", {}) collection = commit.get("collection", "") operation = commit.get("operation", "") record = commit.get("record", {}) if operation == "delete": return None if collection == "app.bsky.feed.post": text = record.get("text", "") inline = text.replace("\n", " ")[:120] return f"[{did}] {inline}" elif collection == "app.bsky.feed.like": uri = record.get("subject", {}).get("uri", "?") return f"[{did}] liked {uri}" elif collection == "app.bsky.graph.follow": subject = record.get("subject", "?") return f"[{did}] followed {subject}" else: return f"[{did}] {collection}#{operation}" elif kind == "identity": handle = event.get("identity", {}).get("handle", "?") return f"[{did}] identity -> {handle}" return None def main(): parser = argparse.ArgumentParser(description="consume an atproto jetstream") parser.add_argument( "--url", default="wss://jetstream.waow.tech", help="jetstream base url (default: wss://jetstream.waow.tech)", ) parser.add_argument( "--duration", type=int, default=10, help="seconds to consume (default: 10, 0 = forever)", ) parser.add_argument( "--collection", action="append", default=None, help="filter by collection (repeatable). omit for all events.", ) args = parser.parse_args() # build subscribe url with wantedCollections query params params = {} if args.collection: params["wantedCollections"] = args.collection query = urlencode(params, doseq=True) url = f"{args.url.rstrip('/')}/subscribe" if query: url = f"{url}?{query}" deadline = time.time() + args.duration if args.duration > 0 else float("inf") counts: dict[str, int] = defaultdict(int) total = 0 stopping = False def stop(*_): nonlocal stopping stopping = True signal.signal(signal.SIGINT, stop) collections_desc = ", ".join(args.collection) if args.collection else "all" duration_desc = f"{args.duration}s" if args.duration > 0 else "forever" print(f"consuming from {args.url} for {duration_desc}...") print(f"filtering: {collections_desc}") print() try: with ws.connect(url) as conn: while not stopping: if time.time() >= deadline: break try: raw = conn.recv(timeout=1.0) except TimeoutError: continue event = json.loads(raw) kind = event.get("kind", "unknown") collection = "" if kind == "commit": collection = event.get("commit", {}).get("collection", "unknown") counts[collection] += 1 else: counts[kind] += 1 total += 1 line = format_event(event) if line: print(line) except Exception as e: print(f"\nerror: {e}") print() print(f"--- {total} events ---") for col, count in sorted(counts.items()): print(f" {col}: {count}") if __name__ == "__main__": main()