
the micro-PDS trick

a technique for injecting specific (DID, collection) pairs into the collectiondir without crawling entire PDS hosts.

the problem

the collectiondir indexes (DID, collection) pairs via two paths:

  1. firehose — sees new create/update commits in real time. only indexes the specific collection in each commit.
  2. requestCrawl — crawls a PDS by paginating com.atproto.sync.listRepos then calling com.atproto.repo.describeRepo for each DID. indexes all collections for every repo on that host.
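
path 2 boils down to a cursor loop over listRepos plus one describeRepo call per DID. roughly this shape (a sketch; the function name and structure are ours, not the collectiondir's actual code):

```python
import json, urllib.request

def crawl_pds(host: str):
    """sketch of the path-2 crawl: paginate listRepos, then call
    describeRepo once per DID, yielding (did, collections) pairs."""
    cursor = ""
    while True:
        url = f"{host}/xrpc/com.atproto.sync.listRepos?limit=1000"
        if cursor:
            url += f"&cursor={cursor}"
        with urllib.request.urlopen(url) as resp:
            page = json.loads(resp.read())
        for repo in page["repos"]:
            did = repo["did"]
            with urllib.request.urlopen(
                f"{host}/xrpc/com.atproto.repo.describeRepo?repo={did}"
            ) as resp:
                desc = json.loads(resp.read())
            yield did, desc["collections"]
        cursor = page.get("cursor")
        if not cursor:
            break
```

one listRepos request covers up to 1000 repos, but describeRepo is one request per repo, so the request count is dominated by the per-repo calls. that is exactly where the rate limit bites.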

the backfill uses path 2. but bsky PDS shards have ~500K repos each and enforce a shared IP-based rate limit of 3,000 requests per 300 seconds (~10 QPS). the crawl code has no retry on 429 — a single rate limit response kills the entire crawl. at the default 100 QPS, the crawl dies after processing ~2-3K repos (0.6% of the shard). even at 8 QPS, running multiple shard crawls in parallel exhausts the shared budget.
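
to put the budget in perspective: even a perfectly throttled crawl that never hits a 429 needs on the order of half a million requests per shard, since describeRepo is one request per repo. back-of-envelope:

```python
# back-of-envelope for a full shard crawl, using the numbers above
repos_per_shard = 500_000      # ~repos on one bsky PDS shard
budget_qps = 3_000 / 300       # shared IP limit: 3000 requests / 300 s = 10 QPS
# listRepos pages cover up to 1000 repos each, but describeRepo is one
# request per repo, so total requests are roughly equal to the repo count
hours = repos_per_shard / budget_qps / 3600
print(f"{hours:.1f} hours per shard at the full budget")
# prints "13.9 hours per shard at the full budget"
```

and that is per shard, with the entire shared budget dedicated to one crawl.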

this leaves a gap: repos the relay knows about (firehose is current, revs match) but the collectiondir never indexed because the crawl died before reaching them.

the trick

the collectiondir's requestCrawl doesn't know or care what it's crawling. it talks XRPC to whatever hostname you give it. so: stand up a tiny HTTP server that implements just the two required endpoints — listRepos and describeRepo — with only the specific DIDs you need, and point requestCrawl at it.

22 DIDs with 784 collection pairs, indexed in 3 seconds. no rate limits, because it's our server talking to our server.

how to do it

1. gather the data

for each missing DID, resolve its PDS via plc.directory and call describeRepo to get its collections:

import json, urllib.request

did = "did:plc:example"
# resolve the DID document to find which PDS hosts this repo
doc = json.loads(urllib.request.urlopen(f"https://plc.directory/{did}").read())
pds = next(s["serviceEndpoint"] for s in doc["service"] if s["id"] == "#atproto_pds")
# describeRepo lists every collection present in the repo
desc = json.loads(urllib.request.urlopen(f"{pds}/xrpc/com.atproto.repo.describeRepo?repo={did}").read())
# desc["collections"] is what we need

save all results to a JSON file keyed by DID:

{
  "did:plc:example": {
    "handle": "user.bsky.social",
    "did": "did:plc:example",
    "collections": ["app.bsky.feed.post", "io.atcr.sailor.profile", "..."]
  }
}

2. deploy as a k8s Job

create a ConfigMap with the data and a Job that:

  • starts an HTTP server implementing listRepos and describeRepo
  • calls requestCrawl on the collectiondir pointing at itself (http://POD_IP:8080)
  • polls crawlStatus until the crawl drains
  • exits

# create the configmap
kubectl create configmap micro-pds-data \
    --namespace relay \
    --from-file=dids.json=missing-dids.json

# create the job (see below for manifest)
kubectl apply -f micro-pds-job.yaml

# watch it run
kubectl logs -n relay job/micro-pds-crawl -f

# clean up
kubectl delete job micro-pds-crawl configmap micro-pds-data -n relay

3. the Job manifest

apiVersion: batch/v1
kind: Job
metadata:
  name: micro-pds-crawl
  namespace: relay
spec:
  backoffLimit: 0
  activeDeadlineSeconds: 120
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: micro-pds
          image: python:3.12-alpine
          env:
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: ADMIN_TOKEN
              valueFrom:
                secretKeyRef:
                  name: collectiondir-secret
                  key: COLLECTIONS_ADMIN_TOKEN
          volumeMounts:
            - name: data
              mountPath: /data
          command:
            - python3
            - -c
            - |
              import json, os, sys, time, urllib.request
              from http.server import HTTPServer, BaseHTTPRequestHandler
              from urllib.parse import urlparse, parse_qs
              from threading import Thread

              PORT = 8080
              POD_IP = os.environ["POD_IP"]
              ADMIN_TOKEN = os.environ["ADMIN_TOKEN"]
              COLLECTIONDIR = "http://collectiondir.relay.svc.cluster.local:2510"

              with open("/data/dids.json") as f:
                  DID_DATA = json.load(f)

              DIDS = list(DID_DATA.keys())
              print(f"micro-pds: serving {len(DIDS)} DIDs")

              class Handler(BaseHTTPRequestHandler):
                  def log_message(self, *args):
                      pass

                  def do_GET(self):
                      parsed = urlparse(self.path)
                      params = parse_qs(parsed.query)

                      if parsed.path == "/xrpc/com.atproto.sync.listRepos":
                          cursor = params.get("cursor", [""])[0]
                          limit = int(params.get("limit", ["1000"])[0])
                          start = int(cursor) if cursor else 0
                          batch = DIDS[start:start + limit]
                          repos = [{"did": d, "head": "baf", "rev": "0", "active": True} for d in batch]
                          resp = {"repos": repos}
                          if start + limit < len(DIDS):
                              resp["cursor"] = str(start + limit)
                          self.send_response(200)
                          self.send_header("Content-Type", "application/json")
                          self.end_headers()
                          self.wfile.write(json.dumps(resp).encode())

                      elif parsed.path == "/xrpc/com.atproto.repo.describeRepo":
                          repo = params.get("repo", [""])[0]
                          if repo in DID_DATA:
                              info = DID_DATA[repo]
                              resp = {
                                  "handle": info.get("handle", "unknown"),
                                  "did": repo,
                                  "didDoc": {},
                                  "collections": info["collections"],
                                  "handleIsCorrect": True,
                              }
                              self.send_response(200)
                              self.send_header("Content-Type", "application/json")
                              self.end_headers()
                              self.wfile.write(json.dumps(resp).encode())
                          else:
                              self.send_response(404)
                              self.end_headers()
                      else:
                          self.send_response(404)
                          self.end_headers()

              server = HTTPServer(("0.0.0.0", PORT), Handler)
              Thread(target=server.serve_forever, daemon=True).start()
              print(f"micro-pds: listening on {POD_IP}:{PORT}")
              time.sleep(1)

              # trigger crawl
              payload = json.dumps({"hostname": f"http://{POD_IP}:{PORT}"}).encode()
              req = urllib.request.Request(
                  f"{COLLECTIONDIR}/admin/pds/requestCrawl",
                  data=payload,
                  headers={"Content-Type": "application/json", "Authorization": f"Bearer {ADMIN_TOKEN}"},
                  method="POST",
              )
              with urllib.request.urlopen(req, timeout=10) as resp:
                  print(f"micro-pds: requestCrawl -> {resp.status}")

              # wait for drain
              while True:
                  time.sleep(2)
                  req = urllib.request.Request(
                      f"{COLLECTIONDIR}/admin/crawlStatus",
                      headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
                  )
                  with urllib.request.urlopen(req, timeout=10) as resp:
                      status = json.loads(resp.read())
                  if not status.get("host_starts"):
                      break

              print("micro-pds: done")
              server.shutdown()
      volumes:
        - name: data
          configMap:
            name: micro-pds-data
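
the listRepos handler's cursor scheme is just an integer offset into the DID list. pulled out as a plain function (the function name is ours), it can be sanity-checked before deploying:

```python
def list_repos_page(dids, cursor="", limit=1000):
    """the pagination scheme the micro-PDS handler uses: the cursor is
    an integer offset into the DID list, returned as a string."""
    start = int(cursor) if cursor else 0
    batch = dids[start:start + limit]
    resp = {"repos": [{"did": d, "head": "baf", "rev": "0", "active": True}
                      for d in batch]}
    if start + limit < len(dids):
        resp["cursor"] = str(start + limit)
    return resp

# walk all pages the way a crawler would
dids = [f"did:plc:{i}" for i in range(25)]
seen, cursor = [], ""
while True:
    page = list_repos_page(dids, cursor, limit=10)
    seen += [r["did"] for r in page["repos"]]
    cursor = page.get("cursor")
    if cursor is None:
        break
assert seen == dids  # every DID served exactly once, in order
```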

when to use this

  • you have a specific set of DIDs that need indexing and can't wait for a full shard crawl
  • the full crawl is blocked by rate limits
  • you've verified the DIDs exist on their PDS (call describeRepo yourself first)

limitations

  • only indexes what you give it. if you miss a DID, it won't be indexed.
  • the collectiondir treats this like any other crawl — it writes to the main pebble DB via the normal ingestCrawl pathway. no special code paths, no risk to existing data.
  • the data you gather is a point-in-time snapshot. if a user adds a new collection after you gathered the data, the micro-PDS won't know about it (but the firehose will catch it going forward).