the micro-PDS trick#
a technique for injecting specific (DID, collection) pairs into the collectiondir without crawling entire PDS hosts.
the problem#
the collectiondir indexes (DID, collection) pairs via two paths:
- firehose — sees new `create`/`update` commits in real time. only indexes the specific collection in each commit.
- requestCrawl — crawls a PDS by paginating `com.atproto.sync.listRepos` then calling `com.atproto.repo.describeRepo` for each DID. indexes all collections for every repo on that host.
the backfill uses path 2. but bsky PDS shards have ~500K repos each and enforce a shared IP-based rate limit of 3,000 requests per 300 seconds (~10 QPS). the crawl code has no retry on 429 — a single rate limit response kills the entire crawl. at the default 100 QPS, the crawl dies after processing ~2-3K repos (0.6% of the shard). even at 8 QPS, running multiple shard crawls in parallel exhausts the shared budget.
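the arithmetic behind that, as a quick sanity check (the listRepos page size of 1000 is an assumption, matching the lexicon's max):

```python
# back-of-envelope: why a full shard crawl can't fit in the rate limit.
# the budget is shared across every crawler behind the same IP.
REPOS_PER_SHARD = 500_000
BUDGET_REQS = 3_000    # requests per window
WINDOW_SECS = 300      # 5 minutes

sustained_qps = BUDGET_REQS / WINDOW_SECS
print(f"sustained budget: {sustained_qps:.0f} QPS")  # 10 QPS

# one describeRepo call per repo, plus listRepos pages of 1000
total_reqs = REPOS_PER_SHARD + REPOS_PER_SHARD // 1000
hours = total_reqs / sustained_qps / 3600
print(f"full shard crawl at the budget: ~{hours:.0f} hours")  # ~14 hours
```

fourteen hours per shard, with zero margin for anything else sharing the IP — and that's the best case where nothing ever 429s.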
this leaves a gap: repos the relay knows about (firehose is current, revs match) but the collectiondir never indexed because the crawl died before reaching them.
the trick#
the collectiondir's requestCrawl doesn't know or care what it's crawling. it talks XRPC to whatever hostname you give it. so: stand up a tiny HTTP server that implements just the two required endpoints — listRepos and describeRepo — with only the specific DIDs you need, and point requestCrawl at it.
22 DIDs with 784 collection pairs, indexed in 3 seconds. no rate limits, because it's our server talking to our server.
how to do it#
1. gather the data#
for each missing DID, resolve its PDS via plc.directory and call describeRepo to get its collections:
import json, urllib.request
did = "did:plc:example"
doc = json.loads(urllib.request.urlopen(f"https://plc.directory/{did}").read())
pds = [s["serviceEndpoint"] for s in doc["service"] if s["id"] == "#atproto_pds"][0]
desc = json.loads(urllib.request.urlopen(f"{pds}/xrpc/com.atproto.repo.describeRepo?repo={did}").read())
# desc["collections"] is what we need
save all results to a JSON file keyed by DID:
{
"did:plc:example": {
"handle": "user.bsky.social",
"did": "did:plc:example",
"collections": ["app.bsky.feed.post", "io.atcr.sailor.profile", "..."]
}
}
2. deploy as a k8s Job#
create a ConfigMap with the data and a Job that:
- starts an HTTP server implementing `listRepos` and `describeRepo`
- calls `requestCrawl` on the collectiondir, pointing at itself (http://POD_IP:8080)
- polls `crawlStatus` until the crawl drains
- exits
# create the configmap
kubectl create configmap micro-pds-data \
--namespace relay \
--from-file=dids.json=missing-dids.json
# create the job (see below for manifest)
kubectl apply -f micro-pds-job.yaml
# watch it run
kubectl logs -n relay job/micro-pds-crawl -f
# clean up
kubectl delete job micro-pds-crawl configmap micro-pds-data -n relay
3. the Job manifest#
apiVersion: batch/v1
kind: Job
metadata:
name: micro-pds-crawl
namespace: relay
spec:
backoffLimit: 0
activeDeadlineSeconds: 120
template:
spec:
restartPolicy: Never
containers:
- name: micro-pds
image: python:3.12-alpine
env:
- name: PYTHONUNBUFFERED
value: "1"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: ADMIN_TOKEN
valueFrom:
secretKeyRef:
name: collectiondir-secret
key: COLLECTIONS_ADMIN_TOKEN
volumeMounts:
- name: data
mountPath: /data
command:
- python3
- -c
- |
import json, os, sys, time, urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from threading import Thread
PORT = 8080
POD_IP = os.environ["POD_IP"]
ADMIN_TOKEN = os.environ["ADMIN_TOKEN"]
COLLECTIONDIR = "http://collectiondir.relay.svc.cluster.local:2510"
with open("/data/dids.json") as f:
DID_DATA = json.load(f)
DIDS = list(DID_DATA.keys())
print(f"micro-pds: serving {len(DIDS)} DIDs")
class Handler(BaseHTTPRequestHandler):
def log_message(self, *args):
pass
def do_GET(self):
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
if parsed.path == "/xrpc/com.atproto.sync.listRepos":
cursor = params.get("cursor", [""])[0]
limit = int(params.get("limit", ["1000"])[0])
start = int(cursor) if cursor else 0
batch = DIDS[start:start + limit]
repos = [{"did": d, "head": "baf", "rev": "0", "active": True} for d in batch]
resp = {"repos": repos}
if start + limit < len(DIDS):
resp["cursor"] = str(start + limit)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(resp).encode())
elif parsed.path == "/xrpc/com.atproto.repo.describeRepo":
repo = params.get("repo", [""])[0]
if repo in DID_DATA:
info = DID_DATA[repo]
resp = {
"handle": info.get("handle", "unknown"),
"did": repo,
"didDoc": {},
"collections": info["collections"],
"handleIsCorrect": True,
}
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(resp).encode())
else:
self.send_response(404)
self.end_headers()
else:
self.send_response(404)
self.end_headers()
server = HTTPServer(("0.0.0.0", PORT), Handler)
Thread(target=server.serve_forever, daemon=True).start()
print(f"micro-pds: listening on {POD_IP}:{PORT}")
time.sleep(1)
# trigger crawl
payload = json.dumps({"hostname": f"http://{POD_IP}:{PORT}"}).encode()
req = urllib.request.Request(
f"{COLLECTIONDIR}/admin/pds/requestCrawl",
data=payload,
headers={"Content-Type": "application/json", "Authorization": f"Bearer {ADMIN_TOKEN}"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
print(f"micro-pds: requestCrawl -> {resp.status}")
# wait for drain
while True:
time.sleep(2)
req = urllib.request.Request(
f"{COLLECTIONDIR}/admin/crawlStatus",
headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
)
with urllib.request.urlopen(req, timeout=10) as resp:
status = json.loads(resp.read())
if not status.get("host_starts"):
break
print("micro-pds: done")
server.shutdown()
volumes:
- name: data
configMap:
name: micro-pds-data
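the only logic in the Job worth getting wrong is the listRepos cursor handling, and it's easy to check in isolation before burning a run on it. this mirrors the handler above as a pure function, with fake DIDs:

```python
# same pagination logic as the Job's listRepos handler, minus the HTTP
def list_repos(dids, cursor="", limit=1000):
    start = int(cursor) if cursor else 0
    batch = dids[start:start + limit]
    resp = {"repos": [{"did": d, "head": "baf", "rev": "0", "active": True} for d in batch]}
    if start + limit < len(dids):
        resp["cursor"] = str(start + limit)
    return resp

dids = [f"did:plc:fake{i}" for i in range(22)]
page1 = list_repos(dids, limit=10)             # 10 repos, cursor "10"
page2 = list_repos(dids, cursor=page1["cursor"], limit=10)
page3 = list_repos(dids, cursor=page2["cursor"], limit=10)
assert len(page3["repos"]) == 2 and "cursor" not in page3  # last page: no cursor, crawl terminates
```

the crawler stops when a page comes back without a cursor, so the one bug that matters is emitting a cursor on the final page — that would loop the crawl forever.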
when to use this#
- you have a specific set of DIDs that need indexing and can't wait for a full shard crawl
- the full crawl is blocked by rate limits
- you've verified the DIDs exist on their PDS (call `describeRepo` yourself first)
limitations#
- only indexes what you give it. if you miss a DID, it won't be indexed.
- the collectiondir treats this like any other crawl — it writes to the main pebble DB via the normal `ingestCrawl` pathway. no special code paths, no risk to existing data.
- the data you gather is a point-in-time snapshot. if a user adds a new collection after you gathered the data, the micro-PDS won't know about it (but the firehose will catch it going forward).