declarative relay deployment on hetzner relay.waow.tech
atproto
at main 145 lines 4.3 kB view raw
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["atproto"]
# ///
"""
consume the firehose from an atproto relay and print events.

usage:
    ./scripts/firehose
    ./scripts/firehose --duration 30
    ./scripts/firehose --relay-url wss://bsky.network
    ./scripts/firehose --collection app.bsky.feed.like
"""

import argparse
import signal
import time
from collections import defaultdict

from atproto import (
    CAR,
    AtUri,
    FirehoseSubscribeReposClient,
    firehose_models,
    models,
    parse_subscribe_repos_message,
)


def get_ops_by_type(
    commit: models.ComAtprotoSyncSubscribeRepos.Commit,
    collections: set[str],
) -> defaultdict:
    """group the create/delete ops of one commit by collection.

    args:
        commit: a parsed commit frame; its ``blocks`` are decoded as a CAR
            to look up the record behind each created cid.
        collections: collection names to keep. an empty set keeps everything.

    returns:
        a defaultdict mapping collection -> {"created": [...], "deleted": [...]}.
        created entries carry ``record``, ``uri`` and ``author``; deleted
        entries carry only ``uri``.
    """
    ops = defaultdict(lambda: {"created": [], "deleted": []})

    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        uri = AtUri.from_str(f"at://{commit.repo}/{op.path}")

        # apply the filter up front so creates and deletes are treated the
        # same, and so unselected collections never get an (empty) bucket.
        if collections and uri.collection not in collections:
            continue

        if op.action == "create" and op.cid:
            raw = car.blocks.get(op.cid)
            if not raw:
                # the commit referenced a cid whose block is not in the CAR
                continue

            record = models.get_or_create(raw, strict=False)
            ops[uri.collection]["created"].append(
                {"record": record, "uri": str(uri), "author": commit.repo}
            )

        elif op.action == "delete":
            ops[uri.collection]["deleted"].append({"uri": str(uri)})

    return ops


def _describe_event(collection: str, author: str, record: object) -> str:
    """render one created record as a single line of output."""
    if collection == "app.bsky.feed.post":
        # records are parsed with strict=False, so ``text`` may be absent
        # or None; fall back to an empty string instead of crashing.
        text = getattr(record, "text", "") or ""
        inline = text.replace("\n", " ")[:120]
        return f"[{author}] {inline}"

    if collection == "app.bsky.feed.like":
        subject = getattr(record, "subject", None)
        uri = getattr(subject, "uri", "?") if subject else "?"
        return f"[{author}] liked {uri}"

    if collection == "app.bsky.graph.follow":
        subject = getattr(record, "subject", "?")
        return f"[{author}] followed {subject}"

    return f"[{author}] {collection}"


def main():
    """parse cli options, stream the firehose, then print per-collection counts."""
    parser = argparse.ArgumentParser(description="consume an atproto relay firehose")
    parser.add_argument(
        "--relay-url",
        default="wss://relay.waow.tech",
        help="relay websocket url (default: wss://relay.waow.tech)",
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=10,
        help="seconds to consume (default: 10, 0 = forever)",
    )
    parser.add_argument(
        "--collection",
        action="append",
        default=None,
        help="filter by collection (default: app.bsky.feed.post). repeatable.",
    )
    args = parser.parse_args()

    collections = set(args.collection) if args.collection else {"app.bsky.feed.post"}
    bounded = args.duration > 0

    counts: dict[str, int] = defaultdict(int)
    total = 0

    base_uri = args.relay_url.rstrip("/") + "/xrpc"
    client = FirehoseSubscribeReposClient(base_uri=base_uri)

    def stop(*_):
        client.stop()

    # ctrl-c stops the client so the summary below still gets printed
    signal.signal(signal.SIGINT, stop)

    # monotonic clock: measuring a span, so wall-clock jumps must not matter
    started = time.monotonic()
    deadline = started + args.duration if bounded else float("inf")

    def on_message(message: firehose_models.MessageFrame) -> None:
        nonlocal total

        # the deadline is only evaluated when a frame arrives
        if time.monotonic() >= deadline:
            client.stop()
            return

        commit = parse_subscribe_repos_message(message)
        if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
            return

        if not commit.blocks:
            return

        ops = get_ops_by_type(commit, collections)

        for collection, actions in ops.items():
            for item in actions["created"]:
                counts[collection] += 1
                total += 1
                print(_describe_event(collection, item["author"], item["record"]))

    span = f"for {args.duration}s" if bounded else "until interrupted"
    print(f"consuming from {args.relay_url} {span}...")
    print(f"filtering: {', '.join(sorted(collections))}")
    print()

    client.start(on_message)

    # report the time actually spent, which differs from --duration when the
    # run is unbounded or was interrupted early
    elapsed = time.monotonic() - started

    print()
    print(f"--- {total} events in {elapsed:.1f}s ---")
    for col, count in sorted(counts.items()):
        print(f"  {col}: {count}")


if __name__ == "__main__":
    main()