declarative relay deployment on hetzner
relay.waow.tech
atproto
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["atproto"]
# ///
"""
consume the firehose from an atproto relay and print events.

subscribes to the relay's com.atproto.sync.subscribeRepos stream, decodes each
commit, prints one line per created record in the selected collections, and
finishes with per-collection counts.

usage:
    ./scripts/firehose
    ./scripts/firehose --duration 30
    ./scripts/firehose --duration 0        # run until ctrl-c
    ./scripts/firehose --relay-url wss://bsky.network
    ./scripts/firehose --collection app.bsky.feed.like
"""
15
import argparse
import signal
import threading
import time
from collections import defaultdict

from atproto import (
    CAR,
    AtUri,
    FirehoseSubscribeReposClient,
    firehose_models,
    models,
    parse_subscribe_repos_message,
)
29
30
def get_ops_by_type(
    commit: models.ComAtprotoSyncSubscribeRepos.Commit,
    collections: set[str],
) -> defaultdict:
    """Group a commit's create/delete operations by record collection.

    Args:
        commit: a decoded ``#commit`` message from the repo subscription.
        collections: collection NSIDs to keep (e.g. ``{"app.bsky.feed.post"}``).
            An empty set keeps every collection.

    Returns:
        A ``defaultdict`` mapping collection NSID to
        ``{"created": [...], "deleted": [...]}``. Each created entry is
        ``{"record": ..., "uri": str, "author": repo}``; each deleted entry is
        ``{"uri": str}``. Operations whose action is neither ``create`` nor
        ``delete`` (e.g. updates) are not reported, and creates whose record
        block is missing from the commit are skipped.
    """
    ops = defaultdict(lambda: {"created": [], "deleted": []})

    # Decoded on first use: only creates need record blocks, so commits made
    # of deletes or of filtered-out collections never pay for CAR parsing.
    car = None
    for op in commit.ops:
        uri = AtUri.from_str(f"at://{commit.repo}/{op.path}")

        # Filter every action (not just creates), and do it before touching
        # the block store.
        if collections and uri.collection not in collections:
            continue

        if op.action == "create" and op.cid:
            if not commit.blocks:
                continue
            if car is None:
                car = CAR.from_bytes(commit.blocks)
            raw = car.blocks.get(op.cid)
            if not raw:
                continue

            record = models.get_or_create(raw, strict=False)
            ops[uri.collection]["created"].append(
                {"record": record, "uri": str(uri), "author": commit.repo}
            )

        elif op.action == "delete":
            ops[uri.collection]["deleted"].append({"uri": str(uri)})

    return ops
58
59
def _parse_args() -> argparse.Namespace:
    """Parse the command-line options for the firehose consumer."""
    parser = argparse.ArgumentParser(description="consume an atproto relay firehose")
    parser.add_argument(
        "--relay-url",
        default="wss://relay.waow.tech",
        help="relay websocket url (default: wss://relay.waow.tech)",
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=10,
        help="seconds to consume (default: 10, 0 = forever)",
    )
    parser.add_argument(
        "--collection",
        action="append",
        default=None,
        help="filter by collection (default: app.bsky.feed.post). repeatable.",
    )
    return parser.parse_args()


def _describe_created(collection: str, record: object, author: str) -> str:
    """Render one created record as a single human-readable output line."""
    if collection == "app.bsky.feed.post":
        # `or ""` also covers a text attribute that is present but None.
        text = getattr(record, "text", "") or ""
        inline = text.replace("\n", " ")[:120]
        return f"[{author}] {inline}"
    if collection == "app.bsky.feed.like":
        subject = getattr(record, "subject", None)
        uri = getattr(subject, "uri", "?") if subject else "?"
        return f"[{author}] liked {uri}"
    if collection == "app.bsky.graph.follow":
        subject = getattr(record, "subject", "?")
        return f"[{author}] followed {subject}"
    return f"[{author}] {collection}"


def main() -> None:
    """Stream commits from a relay and print matching created records.

    Runs for ``--duration`` seconds (``0`` or less means until interrupted),
    then prints the total event count, the real elapsed time, and
    per-collection counts. The first ctrl-c stops the stream gracefully; a
    second ctrl-c abandons it immediately (the summary is still printed).
    """
    args = _parse_args()

    collections = set(args.collection) if args.collection else {"app.bsky.feed.post"}
    run_forever = args.duration <= 0
    # monotonic clock: deadlines/elapsed time are immune to wall-clock jumps.
    deadline = float("inf") if run_forever else time.monotonic() + args.duration

    counts: dict[str, int] = defaultdict(int)
    total = 0

    base_uri = args.relay_url.rstrip("/") + "/xrpc"
    client = FirehoseSubscribeReposClient(base_uri=base_uri)

    # client.stop() can be requested from the deadline check, the timer
    # thread and the SIGINT handler; make sure it runs at most once.
    # NOTE(review): some atproto versions implement stop() by acquiring a
    # plain Lock, so a second call would block -- verify against the
    # installed SDK before relaxing this guard.
    stop_once = threading.Lock()

    def request_stop() -> None:
        if stop_once.acquire(blocking=False):
            client.stop()

    def on_sigint(*_) -> None:
        # First ctrl-c stops gracefully; restoring the default handler makes a
        # second ctrl-c raise KeyboardInterrupt in case the client is stuck.
        signal.signal(signal.SIGINT, signal.default_int_handler)
        request_stop()

    signal.signal(signal.SIGINT, on_sigint)

    def on_message(message: firehose_models.MessageFrame) -> None:
        nonlocal total

        if time.monotonic() >= deadline:
            request_stop()
            return

        commit = parse_subscribe_repos_message(message)
        if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
            return
        if not commit.blocks:
            return

        for collection, actions in get_ops_by_type(commit, collections).items():
            for item in actions["created"]:
                counts[collection] += 1
                total += 1
                print(_describe_created(collection, item["record"], item["author"]))

    how_long = "until interrupted" if run_forever else f"for {args.duration}s"
    print(f"consuming from {args.relay_url} {how_long}...")
    print(f"filtering: {', '.join(sorted(collections))}")
    print()

    started = time.monotonic()

    # The in-callback deadline check only runs when a frame arrives; the timer
    # also ends runs that receive nothing (e.g. a relay that never connects).
    timer = None
    if not run_forever:
        timer = threading.Timer(args.duration, request_stop)
        timer.daemon = True
        timer.start()

    try:
        client.start(on_message)
    except KeyboardInterrupt:
        pass  # second ctrl-c: abandon the stream but still report what we saw
    finally:
        if timer is not None:
            timer.cancel()
        signal.signal(signal.SIGINT, signal.default_int_handler)

    elapsed = time.monotonic() - started

    print()
    print(f"--- {total} events in {elapsed:.1f}s ---")
    for col, count in sorted(counts.items()):
        print(f" {col}: {count}")
143
if __name__ == "__main__":
    # Script entry point; main() returns None, so the exit status is 0 unless
    # main raises.
    raise SystemExit(main())