# the micro-PDS trick

a technique for injecting specific (DID, collection) pairs into the collectiondir without crawling entire PDS hosts.

## the problem

the collectiondir indexes (DID, collection) pairs via two paths:

1. **firehose** — sees new `create`/`update` commits in real time. only indexes the specific collection in each commit.
2. **requestCrawl** — crawls a PDS by paginating `com.atproto.sync.listRepos` then calling `com.atproto.repo.describeRepo` for each DID. indexes all collections for every repo on that host.

the backfill uses path 2. but bsky PDS shards have ~500K repos each and enforce a shared IP-based rate limit of 3,000 requests per 300 seconds (~10 QPS). the crawl code has no retry on 429 — a single rate-limit response kills the entire crawl. at the default 100 QPS, the crawl dies after processing ~2-3K repos (0.6% of the shard). even at 8 QPS, running multiple shard crawls in parallel exhausts the shared budget.

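a back-of-the-envelope model makes the failure mode concrete. this sketch simplifies to one request per repo (the per-DID `describeRepo` calls dominate) and assumes the crawl dies on the first 429:

```python
# rough model of the crawl vs. the shared rate limit; assumes ~1 request
# per repo and that the first 429 kills the crawl (no retry in the code)
BUDGET, WINDOW = 3_000, 300      # requests allowed per window, window in seconds
SHARD_REPOS = 500_000

def repos_before_429(crawl_qps: float) -> int:
    sustainable_qps = BUDGET / WINDOW        # ~10 QPS
    if crawl_qps <= sustainable_qps:
        return SHARD_REPOS                   # never trips the limit
    # above that, the window's budget gets burned, the next request 429s,
    # and the crawl dies on the spot
    return BUDGET

print(repos_before_429(100))                     # 3000 -- roughly what we saw
print(f"{repos_before_429(100) / SHARD_REPOS:.1%}")  # 0.6% of the shard
```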
this leaves a gap: repos the relay knows about (firehose is current, revs match) but the collectiondir never indexed because the crawl died before reaching them.

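computing that gap is just a set difference, assuming you can export both DID lists (how you export them is up to you; the function below is a hypothetical helper, not part of either service):

```python
def missing_dids(relay_dids, indexed_dids):
    """DIDs the relay tracks that the collectiondir never indexed."""
    return sorted(set(relay_dids) - set(indexed_dids))

relay = {"did:plc:aaa", "did:plc:bbb", "did:plc:ccc"}
indexed = {"did:plc:aaa"}
print(missing_dids(relay, indexed))  # ['did:plc:bbb', 'did:plc:ccc']
```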
## the trick

the collectiondir's `requestCrawl` doesn't know or care what it's crawling. it talks XRPC to whatever hostname you give it. so: stand up a tiny HTTP server that implements just the two required endpoints — `listRepos` and `describeRepo` — with only the specific DIDs you need, and point `requestCrawl` at it.

22 DIDs with 784 collection pairs, indexed in 3 seconds. no rate limits, because it's our server talking to our server.

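the only stateful part of such a server is `listRepos` pagination. a minimal sketch of the cursor scheme the crawler expects (integer offsets encoded as opaque strings; the `head`/`rev` values are placeholders, as in the full manifest below):

```python
def list_repos_page(dids, cursor="", limit=1000):
    # com.atproto.sync.listRepos shape: a page of repos, plus a "cursor"
    # field that is present only while more pages remain
    start = int(cursor) if cursor else 0
    batch = dids[start:start + limit]
    page = {"repos": [{"did": d, "head": "baf", "rev": "0", "active": True}
                      for d in batch]}
    if start + limit < len(dids):
        page["cursor"] = str(start + limit)
    return page

dids = [f"did:plc:x{i}" for i in range(22)]
first = list_repos_page(dids, limit=10)
second = list_repos_page(dids, cursor=first["cursor"], limit=10)
last = list_repos_page(dids, cursor=second["cursor"], limit=10)
print(len(first["repos"]), len(second["repos"]), len(last["repos"]))  # 10 10 2
print("cursor" in last)  # False -- crawler stops paginating here
```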
## how to do it

### 1. gather the data

for each missing DID, resolve its PDS via `plc.directory` and call `describeRepo` to get its collections:

```python
import json, urllib.request

did = "did:plc:example"
doc = json.loads(urllib.request.urlopen(f"https://plc.directory/{did}").read())
pds = [s["serviceEndpoint"] for s in doc["service"] if s["id"] == "#atproto_pds"][0]
desc = json.loads(urllib.request.urlopen(f"{pds}/xrpc/com.atproto.repo.describeRepo?repo={did}").read())
# desc["collections"] is what we need
```

save all results to a JSON file keyed by DID:

```json
{
  "did:plc:example": {
    "handle": "user.bsky.social",
    "did": "did:plc:example",
    "collections": ["app.bsky.feed.post", "io.atcr.sailor.profile", "..."]
  }
}
```

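wrapping the per-DID lookup into a loop that produces that file might look like this; `MISSING` stands in for whatever DID list you computed, and the helpers are just the snippet above factored into functions:

```python
import json
import urllib.request

def fetch_json(url: str) -> dict:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.loads(resp.read())

def pds_endpoint(did_doc: dict) -> str:
    # pull the #atproto_pds service endpoint out of a resolved DID document
    return next(s["serviceEndpoint"] for s in did_doc["service"]
                if s["id"] == "#atproto_pds")

def entry(desc: dict) -> dict:
    # shape one describeRepo response into the record the micro-PDS serves
    return {"handle": desc.get("handle", "unknown"),
            "did": desc["did"],
            "collections": desc["collections"]}

def gather(dids):
    out = {}
    for did in dids:
        pds = pds_endpoint(fetch_json(f"https://plc.directory/{did}"))
        desc = fetch_json(f"{pds}/xrpc/com.atproto.repo.describeRepo?repo={did}")
        out[did] = entry(desc)
    return out

# usage (does real network calls):
#   MISSING = ["did:plc:example"]
#   with open("missing-dids.json", "w") as f:
#       json.dump(gather(MISSING), f, indent=2)
```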
### 2. deploy as a k8s Job

create a ConfigMap with the data and a Job that:

- starts an HTTP server implementing `listRepos` and `describeRepo`
- calls `requestCrawl` on the collectiondir pointing at itself (`http://POD_IP:8080`)
- polls `crawlStatus` until the crawl drains
- exits

```bash
# create the configmap
kubectl create configmap micro-pds-data \
  --namespace relay \
  --from-file=dids.json=missing-dids.json

# create the job (see below for manifest)
kubectl apply -f micro-pds-job.yaml

# watch it run
kubectl logs -n relay job/micro-pds-crawl -f

# clean up
kubectl delete job micro-pds-crawl configmap micro-pds-data -n relay
```

### 3. the Job manifest

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: micro-pds-crawl
  namespace: relay
spec:
  backoffLimit: 0
  activeDeadlineSeconds: 120
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: micro-pds
          image: python:3.12-alpine
          env:
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: ADMIN_TOKEN
              valueFrom:
                secretKeyRef:
                  name: collectiondir-secret
                  key: COLLECTIONS_ADMIN_TOKEN
          volumeMounts:
            - name: data
              mountPath: /data
          command:
            - python3
            - -c
            - |
              import json, os, sys, time, urllib.request
              from http.server import HTTPServer, BaseHTTPRequestHandler
              from urllib.parse import urlparse, parse_qs
              from threading import Thread

              PORT = 8080
              POD_IP = os.environ["POD_IP"]
              ADMIN_TOKEN = os.environ["ADMIN_TOKEN"]
              COLLECTIONDIR = "http://collectiondir.relay.svc.cluster.local:2510"

              with open("/data/dids.json") as f:
                  DID_DATA = json.load(f)

              DIDS = list(DID_DATA.keys())
              print(f"micro-pds: serving {len(DIDS)} DIDs")

              class Handler(BaseHTTPRequestHandler):
                  def log_message(self, *args):
                      pass

                  def do_GET(self):
                      parsed = urlparse(self.path)
                      params = parse_qs(parsed.query)

                      if parsed.path == "/xrpc/com.atproto.sync.listRepos":
                          cursor = params.get("cursor", [""])[0]
                          limit = int(params.get("limit", ["1000"])[0])
                          start = int(cursor) if cursor else 0
                          batch = DIDS[start:start + limit]
                          repos = [{"did": d, "head": "baf", "rev": "0", "active": True} for d in batch]
                          resp = {"repos": repos}
                          if start + limit < len(DIDS):
                              resp["cursor"] = str(start + limit)
                          self.send_response(200)
                          self.send_header("Content-Type", "application/json")
                          self.end_headers()
                          self.wfile.write(json.dumps(resp).encode())

                      elif parsed.path == "/xrpc/com.atproto.repo.describeRepo":
                          repo = params.get("repo", [""])[0]
                          if repo in DID_DATA:
                              info = DID_DATA[repo]
                              resp = {
                                  "handle": info.get("handle", "unknown"),
                                  "did": repo,
                                  "didDoc": {},
                                  "collections": info["collections"],
                                  "handleIsCorrect": True,
                              }
                              self.send_response(200)
                              self.send_header("Content-Type", "application/json")
                              self.end_headers()
                              self.wfile.write(json.dumps(resp).encode())
                          else:
                              self.send_response(404)
                              self.end_headers()
                      else:
                          self.send_response(404)
                          self.end_headers()

              server = HTTPServer(("0.0.0.0", PORT), Handler)
              Thread(target=server.serve_forever, daemon=True).start()
              print(f"micro-pds: listening on {POD_IP}:{PORT}")
              time.sleep(1)

              # trigger crawl
              payload = json.dumps({"hostname": f"http://{POD_IP}:{PORT}"}).encode()
              req = urllib.request.Request(
                  f"{COLLECTIONDIR}/admin/pds/requestCrawl",
                  data=payload,
                  headers={"Content-Type": "application/json", "Authorization": f"Bearer {ADMIN_TOKEN}"},
                  method="POST",
              )
              with urllib.request.urlopen(req, timeout=10) as resp:
                  print(f"micro-pds: requestCrawl -> {resp.status}")

              # wait for drain
              while True:
                  time.sleep(2)
                  req = urllib.request.Request(
                      f"{COLLECTIONDIR}/admin/crawlStatus",
                      headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
                  )
                  with urllib.request.urlopen(req, timeout=10) as resp:
                      status = json.loads(resp.read())
                  if not status.get("host_starts"):
                      break

              print("micro-pds: done")
              server.shutdown()
      volumes:
        - name: data
          configMap:
            name: micro-pds-data
```

## when to use this

- you have a specific set of DIDs that need indexing and can't wait for a full shard crawl
- the full crawl is blocked by rate limits
- you've verified the DIDs exist on their PDS (call `describeRepo` yourself first)

## limitations

- only indexes what you give it. if you miss a DID, it won't be indexed.
- the collectiondir treats this like any other crawl — it writes to the main pebble DB via the normal `ingestCrawl` pathway. no special code paths, no risk to existing data.
- the data you gather is a point-in-time snapshot. if a user adds a new collection after you gathered the data, the micro-PDS won't know about it (but the firehose will catch it going forward).