#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["atproto"]
# ///
"""
consume the firehose from an atproto relay and print events.

usage:
    ./scripts/firehose
    ./scripts/firehose --duration 30
    ./scripts/firehose --relay-url wss://bsky.network
    ./scripts/firehose --collection app.bsky.feed.like
"""

import argparse
import signal
import time
from collections import defaultdict

from atproto import (
    CAR,
    AtUri,
    FirehoseSubscribeReposClient,
    firehose_models,
    models,
    parse_subscribe_repos_message,
)


def get_ops_by_type(
    commit: models.ComAtprotoSyncSubscribeRepos.Commit,
    collections: set[str],
) -> defaultdict:
    """group a commit's create/delete ops by collection.

    args:
        commit: a parsed subscribeRepos commit message. `commit.blocks` is
            decoded as a CAR to look up the records of created ops.
        collections: collection names to keep (e.g. "app.bsky.feed.post").
            an empty set disables filtering.

    returns:
        a defaultdict mapping collection -> {"created": [...], "deleted": [...]}.
        created entries are {"record", "uri", "author"} dicts and deleted
        entries are {"uri"} dicts. ops with any other action are ignored, as
        are creates whose record block is missing from the CAR.
    """
    ops = defaultdict(lambda: {"created": [], "deleted": []})
    # decoded on first use: most commits contain no op that survives the
    # filter, so there is no point parsing their blocks.
    car = None

    for op in commit.ops:
        uri = AtUri.from_str(f"at://{commit.repo}/{op.path}")
        collection = uri.collection

        # filter before branching on the action so deletes honour the
        # requested collections exactly like creates do.
        if collections and collection not in collections:
            continue

        if op.action == "create" and op.cid:
            if car is None:
                car = CAR.from_bytes(commit.blocks)
            raw = car.blocks.get(op.cid)
            if not raw:
                continue
            # strict=False: accept records that do not validate cleanly
            # instead of raising on them.
            record = models.get_or_create(raw, strict=False)
            ops[collection]["created"].append(
                {"record": record, "uri": str(uri), "author": commit.repo}
            )
        elif op.action == "delete":
            ops[collection]["deleted"].append({"uri": str(uri)})

    return ops


def _describe_event(collection: str, author: str, record: object) -> str:
    """return the one-line summary printed for a created record."""
    if collection == "app.bsky.feed.post":
        # the getattr default only covers a missing attribute; `or ""` also
        # covers a present-but-None value, which would crash `.replace`.
        text = getattr(record, "text", "") or ""
        inline = str(text).replace("\n", " ")[:120]
        return f"[{author}] {inline}"

    if collection == "app.bsky.feed.like":
        subject = getattr(record, "subject", None)
        uri = getattr(subject, "uri", "?") if subject else "?"
        return f"[{author}] liked {uri}"

    if collection == "app.bsky.graph.follow":
        subject = getattr(record, "subject", "?") or "?"
        return f"[{author}] followed {subject}"

    return f"[{author}] {collection}"


def main():
    """parse cli args, stream commits from the relay, then print a summary.

    prints one line per created record in the selected collections, and stops
    when the deadline passes (checked as each message arrives) or on SIGINT.
    afterwards prints the total and per-collection event counts.
    """
    parser = argparse.ArgumentParser(description="consume an atproto relay firehose")
    parser.add_argument(
        "--relay-url",
        default="wss://relay.waow.tech",
        help="relay websocket url (default: wss://relay.waow.tech)",
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=10,
        help="seconds to consume (default: 10, 0 = forever)",
    )
    parser.add_argument(
        "--collection",
        action="append",
        default=None,
        help="filter by collection (default: app.bsky.feed.post). repeatable.",
    )
    args = parser.parse_args()

    collections = set(args.collection) if args.collection else {"app.bsky.feed.post"}
    bounded = args.duration > 0
    counts: dict[str, int] = defaultdict(int)
    total = 0

    base_uri = args.relay_url.rstrip("/") + "/xrpc"
    client = FirehoseSubscribeReposClient(base_uri=base_uri)

    def stop(*_):
        client.stop()

    signal.signal(signal.SIGINT, stop)

    # monotonic clock: the run length must not be affected by wall-clock
    # adjustments while the script is consuming.
    started = time.monotonic()
    deadline = started + args.duration if bounded else float("inf")

    def on_message(message: firehose_models.MessageFrame) -> None:
        nonlocal total

        if time.monotonic() >= deadline:
            client.stop()
            return

        commit = parse_subscribe_repos_message(message)
        if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
            return
        if not commit.blocks:
            return

        ops = get_ops_by_type(commit, collections)
        for collection, actions in ops.items():
            for item in actions["created"]:
                counts[collection] += 1
                total += 1
                print(_describe_event(collection, item["author"], item["record"]))

    window = f"for {args.duration}s" if bounded else "until interrupted"
    print(f"consuming from {args.relay_url} {window}...")
    # sorted so the banner is stable between runs (set order is not).
    print(f"filtering: {', '.join(sorted(collections))}")
    print()

    client.start(on_message)

    # report the time actually spent, which differs from --duration when the
    # run is interrupted or unbounded.
    elapsed = time.monotonic() - started

    print()
    print(f"--- {total} events in {elapsed:.1f}s ---")
    for col, count in sorted(counts.items()):
        print(f" {col}: {count}")


if __name__ == "__main__":
    main()