# the micro-PDS trick

a technique for injecting specific (DID, collection) pairs into the collectiondir without crawling entire PDS hosts.

## the problem

the collectiondir indexes (DID, collection) pairs via two paths:

1. **firehose** — sees new `create`/`update` commits in real time. only indexes the specific collection in each commit.
2. **requestCrawl** — crawls a PDS by paginating `com.atproto.sync.listRepos`, then calling `com.atproto.repo.describeRepo` for each DID. indexes all collections for every repo on that host.

the backfill uses path 2. but bsky PDS shards have ~500K repos each and enforce a shared IP-based rate limit of 3,000 requests per 300 seconds (~10 QPS). the crawl code has no retry on 429 — a single rate-limit response kills the entire crawl. at the default 100 QPS, the crawl dies after processing ~2-3K repos (0.6% of the shard). even at 8 QPS, running multiple shard crawls in parallel exhausts the shared budget.

this leaves a gap: repos the relay knows about (the firehose is current, revs match) but the collectiondir never indexed, because the crawl died before reaching them.

## the trick

the collectiondir's `requestCrawl` doesn't know or care what it's crawling. it talks XRPC to whatever hostname you give it. so: stand up a tiny HTTP server that implements just the two required endpoints — `listRepos` and `describeRepo` — with only the specific DIDs you need, and point `requestCrawl` at it.

22 DIDs with 784 collection pairs, indexed in 3 seconds. no rate limits, because it's our server talking to our server.
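to see how little the fake PDS actually has to serve, here is a minimal standalone sketch of the two response shapes. the DIDs and collections are made up, `head`/`rev` are dummy values, and there's no HTTP server — just the JSON bodies the crawler paginates through:

```python
import json

# hypothetical sketch: made-up DIDs and collections, not real data
DID_DATA = {
    "did:plc:aaa111": {"collections": ["app.bsky.feed.post"]},
    "did:plc:bbb222": {"collections": ["app.bsky.feed.like", "app.bsky.graph.follow"]},
}

def list_repos(cursor="", limit=1000):
    """minimal com.atproto.sync.listRepos: page through our DIDs with an integer cursor."""
    dids = list(DID_DATA)
    start = int(cursor) if cursor else 0
    batch = dids[start:start + limit]
    # head/rev are dummies — the collectiondir only needs the DID list here
    resp = {"repos": [{"did": d, "head": "baf", "rev": "0", "active": True} for d in batch]}
    if start + limit < len(dids):
        resp["cursor"] = str(start + limit)
    return resp

def describe_repo(repo):
    """minimal com.atproto.repo.describeRepo: return the collections we gathered."""
    return {"did": repo, "collections": DID_DATA[repo]["collections"]}

print(json.dumps(list_repos(limit=1)))
```

the crawler pages `listRepos` until the cursor runs out, then calls `describeRepo` per DID; everything else a real PDS serves is irrelevant to this crawl.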
## how to do it

### 1. gather the data

for each missing DID, resolve its PDS via `plc.directory` and call `describeRepo` to get its collections:

```python
import json, urllib.request

did = "did:plc:example"
doc = json.loads(urllib.request.urlopen(f"https://plc.directory/{did}").read())
pds = [s["serviceEndpoint"] for s in doc["service"] if s["id"] == "#atproto_pds"][0]
desc = json.loads(urllib.request.urlopen(f"{pds}/xrpc/com.atproto.repo.describeRepo?repo={did}").read())
# desc["collections"] is what we need
```

save all results to a JSON file keyed by DID:

```json
{
  "did:plc:example": {
    "handle": "user.bsky.social",
    "did": "did:plc:example",
    "collections": ["app.bsky.feed.post", "io.atcr.sailor.profile", "..."]
  }
}
```

### 2. deploy as a k8s Job

create a ConfigMap with the data and a Job that:

- starts an HTTP server implementing `listRepos` and `describeRepo`
- calls `requestCrawl` on the collectiondir pointing at itself (`http://POD_IP:8080`)
- polls `crawlStatus` until the crawl drains
- exits

```bash
# create the configmap
kubectl create configmap micro-pds-data \
  --namespace relay \
  --from-file=dids.json=missing-dids.json

# create the job (see below for manifest)
kubectl apply -f micro-pds-job.yaml

# watch it run
kubectl logs -n relay job/micro-pds-crawl -f

# clean up
kubectl delete job micro-pds-crawl configmap micro-pds-data -n relay
```
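the gathered file is the single source of truth for the crawl, so it's worth a sanity check before baking it into the ConfigMap. a hypothetical `check_dids` helper (the field checks are assumptions about what step 1 produced, not part of the deployment):

```python
def check_dids(data):
    """return a list of problems found in a gathered dids.json mapping (hypothetical helper)."""
    problems = []
    for did, info in data.items():
        if not did.startswith("did:"):
            problems.append(f"{did}: key doesn't look like a DID")
        if info.get("did", did) != did:
            problems.append(f"{did}: inner 'did' field doesn't match the key")
        if not info.get("collections"):
            problems.append(f"{did}: empty or missing 'collections'")
    return problems

# made-up sample: one good entry, one broken one
sample = {
    "did:plc:good": {"did": "did:plc:good", "collections": ["app.bsky.feed.post"]},
    "did:plc:bad": {"did": "did:plc:bad", "collections": []},
}
print(check_dids(sample))
```

a DID that fails these checks would make the micro-PDS serve a `describeRepo` the collectiondir can't use, so it's cheaper to catch it here than in the Job logs.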
### 3. the Job manifest

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: micro-pds-crawl
  namespace: relay
spec:
  backoffLimit: 0
  activeDeadlineSeconds: 120
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: micro-pds
        image: python:3.12-alpine
        env:
        - name: PYTHONUNBUFFERED
          value: "1"
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: ADMIN_TOKEN
          valueFrom:
            secretKeyRef:
              name: collectiondir-secret
              key: COLLECTIONS_ADMIN_TOKEN
        volumeMounts:
        - name: data
          mountPath: /data
        command:
        - python3
        - -c
        - |
          import json, os, sys, time, urllib.request
          from http.server import HTTPServer, BaseHTTPRequestHandler
          from urllib.parse import urlparse, parse_qs
          from threading import Thread

          PORT = 8080
          POD_IP = os.environ["POD_IP"]
          ADMIN_TOKEN = os.environ["ADMIN_TOKEN"]
          COLLECTIONDIR = "http://collectiondir.relay.svc.cluster.local:2510"

          with open("/data/dids.json") as f:
              DID_DATA = json.load(f)

          DIDS = list(DID_DATA.keys())
          print(f"micro-pds: serving {len(DIDS)} DIDs")

          class Handler(BaseHTTPRequestHandler):
              def log_message(self, *args):
                  pass

              def do_GET(self):
                  parsed = urlparse(self.path)
                  params = parse_qs(parsed.query)

                  if parsed.path == "/xrpc/com.atproto.sync.listRepos":
                      cursor = params.get("cursor", [""])[0]
                      limit = int(params.get("limit", ["1000"])[0])
                      start = int(cursor) if cursor else 0
                      batch = DIDS[start:start + limit]
                      repos = [{"did": d, "head": "baf", "rev": "0", "active": True} for d in batch]
                      resp = {"repos": repos}
                      if start + limit < len(DIDS):
                          resp["cursor"] = str(start + limit)
                      self.send_response(200)
                      self.send_header("Content-Type", "application/json")
                      self.end_headers()
                      self.wfile.write(json.dumps(resp).encode())

                  elif parsed.path == "/xrpc/com.atproto.repo.describeRepo":
                      repo = params.get("repo", [""])[0]
                      if repo in DID_DATA:
                          info = DID_DATA[repo]
                          resp = {
                              "handle": info.get("handle", "unknown"),
                              "did": repo,
                              "didDoc": {},
                              "collections": info["collections"],
                              "handleIsCorrect": True,
                          }
                          self.send_response(200)
                          self.send_header("Content-Type", "application/json")
                          self.end_headers()
                          self.wfile.write(json.dumps(resp).encode())
                      else:
                          self.send_response(404)
                          self.end_headers()
                  else:
                      self.send_response(404)
                      self.end_headers()

          server = HTTPServer(("0.0.0.0", PORT), Handler)
          Thread(target=server.serve_forever, daemon=True).start()
          print(f"micro-pds: listening on {POD_IP}:{PORT}")
          time.sleep(1)

          # trigger crawl
          payload = json.dumps({"hostname": f"http://{POD_IP}:{PORT}"}).encode()
          req = urllib.request.Request(
              f"{COLLECTIONDIR}/admin/pds/requestCrawl",
              data=payload,
              headers={"Content-Type": "application/json", "Authorization": f"Bearer {ADMIN_TOKEN}"},
              method="POST",
          )
          with urllib.request.urlopen(req, timeout=10) as resp:
              print(f"micro-pds: requestCrawl -> {resp.status}")

          # wait for drain
          while True:
              time.sleep(2)
              req = urllib.request.Request(
                  f"{COLLECTIONDIR}/admin/crawlStatus",
                  headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
              )
              with urllib.request.urlopen(req, timeout=10) as resp:
                  status = json.loads(resp.read())
              if not status.get("host_starts"):
                  break

          print("micro-pds: done")
          server.shutdown()
      volumes:
      - name: data
        configMap:
          name: micro-pds-data
```

## when to use this

- you have a specific set of DIDs that need indexing and can't wait for a full shard crawl
- the full crawl is blocked by rate limits
- you've verified the DIDs exist on their PDS (call `describeRepo` yourself first)

## limitations

- only indexes what you give it. if you miss a DID, it won't be indexed.
- the collectiondir treats this like any other crawl — it writes to the main pebble DB via the normal `ingestCrawl` pathway. no special code paths, no risk to existing data.
- the data you gather is a point-in-time snapshot. if a user adds a new collection after you gathered the data, the micro-PDS won't know about it (but the firehose will catch it going forward).
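the snapshot-staleness point is at least checkable after the fact: re-fetch `describeRepo` for each DID (the step-1 snippet) and diff against what you gathered. a sketch with a hypothetical `drift` helper and made-up data:

```python
def drift(snapshot, live):
    """per-DID collections present in a fresh describeRepo fetch but missing
    from the gathered snapshot. `live` maps DID -> collections, re-fetched
    with the step-1 code; `snapshot` is the dids.json mapping."""
    out = {}
    for did, info in snapshot.items():
        new = set(live.get(did, [])) - set(info["collections"])
        if new:
            out[did] = sorted(new)
    return out

# made-up example: one DID grew a new collection since the snapshot
snap = {"did:plc:example": {"collections": ["app.bsky.feed.post"]}}
live = {"did:plc:example": ["app.bsky.feed.post", "app.bsky.feed.like"]}
print(drift(snap, live))  # → {'did:plc:example': ['app.bsky.feed.like']}
```

anything `drift` reports that the firehose hasn't already picked up could go into a fresh dids.json for another micro-PDS run.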