this repo has no description
1/**
2 * Minimal AT Protocol Personal Data Server (PDS)
3 *
4 * A single-file implementation of an ATProto PDS for Cloudflare Workers
5 * with Durable Objects. Implements the core protocol primitives:
6 *
7 * - CBOR/DAG-CBOR encoding for content-addressed data
8 * - CID generation (CIDv1 with dag-cbor + sha-256)
9 * - Merkle Search Tree (MST) for repository structure
10 * - P-256 signing with low-S normalization
11 * - CAR file building for repo sync
12 * - XRPC endpoints for repo operations and sync
13 *
14 * @see https://atproto.com
15 */
16
17// === CONSTANTS ===
18// CBOR primitive markers (RFC 8949)
19const CBOR_FALSE = 0xf4;
20const CBOR_TRUE = 0xf5;
21const CBOR_NULL = 0xf6;
22
23// DAG-CBOR CID link tag
24const CBOR_TAG_CID = 42;
25
26// === ERROR HELPER ===
27function errorResponse(error, message, status) {
28 return Response.json({ error, message }, { status });
29}
30
31// === CRAWLER NOTIFICATION ===
32// Notify relays to come crawl us after writes (like official PDS)
33let lastCrawlNotify = 0;
34const CRAWL_NOTIFY_THRESHOLD = 20 * 60 * 1000; // 20 minutes (matches official PDS)
35
36async function notifyCrawlers(env, hostname) {
37 const now = Date.now();
38 if (now - lastCrawlNotify < CRAWL_NOTIFY_THRESHOLD) {
39 return; // Throttle notifications
40 }
41
42 const relayHost = env.RELAY_HOST;
43 if (!relayHost) return;
44
45 lastCrawlNotify = now;
46
47 // Fire and forget - don't block writes on relay notification
48 fetch(`${relayHost}/xrpc/com.atproto.sync.requestCrawl`, {
49 method: 'POST',
50 headers: { 'Content-Type': 'application/json' },
51 body: JSON.stringify({ hostname }),
52 }).catch((err) => {
53 console.log('Failed to notify relay:', err.message);
54 });
55}
56
57// === CID WRAPPER ===
58// Explicit CID type for DAG-CBOR encoding (avoids fragile heuristic detection)
59
60class CID {
61 constructor(bytes) {
62 if (!(bytes instanceof Uint8Array)) {
63 throw new Error('CID must be constructed with Uint8Array');
64 }
65 this.bytes = bytes;
66 }
67}
68
69// === CBOR ENCODING ===
70// Minimal deterministic CBOR (RFC 8949) - sorted keys, minimal integers
71
72/**
73 * Encode CBOR type header (major type + length)
74 * @param {number[]} parts - Array to push bytes to
75 * @param {number} majorType - CBOR major type (0-7)
76 * @param {number} length - Value or length to encode
77 */
78function encodeHead(parts, majorType, length) {
79 const mt = majorType << 5;
80 if (length < 24) {
81 parts.push(mt | length);
82 } else if (length < 256) {
83 parts.push(mt | 24, length);
84 } else if (length < 65536) {
85 parts.push(mt | 25, length >> 8, length & 0xff);
86 } else if (length < 4294967296) {
87 // Use Math.floor instead of bitshift to avoid 32-bit signed integer overflow
88 parts.push(
89 mt | 26,
90 Math.floor(length / 0x1000000) & 0xff,
91 Math.floor(length / 0x10000) & 0xff,
92 Math.floor(length / 0x100) & 0xff,
93 length & 0xff,
94 );
95 }
96}
97
98/**
99 * Encode a value as CBOR bytes (RFC 8949 deterministic encoding)
100 * @param {*} value - Value to encode (null, boolean, number, string, Uint8Array, array, or object)
101 * @returns {Uint8Array} CBOR-encoded bytes
102 */
103export function cborEncode(value) {
104 const parts = [];
105
106 function encode(val) {
107 if (val === null) {
108 parts.push(CBOR_NULL);
109 } else if (val === true) {
110 parts.push(CBOR_TRUE);
111 } else if (val === false) {
112 parts.push(CBOR_FALSE);
113 } else if (typeof val === 'number') {
114 encodeInteger(val);
115 } else if (typeof val === 'string') {
116 const bytes = new TextEncoder().encode(val);
117 encodeHead(parts, 3, bytes.length); // major type 3 = text string
118 parts.push(...bytes);
119 } else if (val instanceof Uint8Array) {
120 encodeHead(parts, 2, val.length); // major type 2 = byte string
121 parts.push(...val);
122 } else if (Array.isArray(val)) {
123 encodeHead(parts, 4, val.length); // major type 4 = array
124 for (const item of val) encode(item);
125 } else if (typeof val === 'object') {
126 // Sort keys for deterministic encoding
127 const keys = Object.keys(val).sort();
128 encodeHead(parts, 5, keys.length); // major type 5 = map
129 for (const key of keys) {
130 encode(key);
131 encode(val[key]);
132 }
133 }
134 }
135
136 function encodeInteger(n) {
137 if (n >= 0) {
138 encodeHead(parts, 0, n); // major type 0 = unsigned int
139 } else {
140 encodeHead(parts, 1, -n - 1); // major type 1 = negative int
141 }
142 }
143
144 encode(value);
145 return new Uint8Array(parts);
146}
147
148// DAG-CBOR encoder that handles CIDs with tag 42
149function cborEncodeDagCbor(value) {
150 const parts = [];
151
152 function encode(val) {
153 if (val === null) {
154 parts.push(CBOR_NULL);
155 } else if (val === true) {
156 parts.push(CBOR_TRUE);
157 } else if (val === false) {
158 parts.push(CBOR_FALSE);
159 } else if (typeof val === 'number') {
160 if (Number.isInteger(val) && val >= 0) {
161 encodeHead(parts, 0, val);
162 } else if (Number.isInteger(val) && val < 0) {
163 encodeHead(parts, 1, -val - 1);
164 }
165 } else if (typeof val === 'string') {
166 const bytes = new TextEncoder().encode(val);
167 encodeHead(parts, 3, bytes.length);
168 parts.push(...bytes);
169 } else if (val instanceof CID) {
170 // CID links in DAG-CBOR use tag 42 + 0x00 multibase prefix
171 // The 0x00 prefix indicates "identity" multibase (raw bytes)
172 parts.push(0xd8, CBOR_TAG_CID);
173 encodeHead(parts, 2, val.bytes.length + 1); // +1 for 0x00 prefix
174 parts.push(0x00);
175 parts.push(...val.bytes);
176 } else if (val instanceof Uint8Array) {
177 // Regular byte string
178 encodeHead(parts, 2, val.length);
179 parts.push(...val);
180 } else if (Array.isArray(val)) {
181 encodeHead(parts, 4, val.length);
182 for (const item of val) encode(item);
183 } else if (typeof val === 'object') {
184 // DAG-CBOR: sort keys by length first, then lexicographically
185 // (differs from standard CBOR which sorts lexicographically only)
186 const keys = Object.keys(val).filter((k) => val[k] !== undefined);
187 keys.sort((a, b) => {
188 if (a.length !== b.length) return a.length - b.length;
189 return a < b ? -1 : a > b ? 1 : 0;
190 });
191 encodeHead(parts, 5, keys.length);
192 for (const key of keys) {
193 const keyBytes = new TextEncoder().encode(key);
194 encodeHead(parts, 3, keyBytes.length);
195 parts.push(...keyBytes);
196 encode(val[key]);
197 }
198 }
199 }
200
201 encode(value);
202 return new Uint8Array(parts);
203}
204
205/**
206 * Decode CBOR bytes to a JavaScript value
207 * @param {Uint8Array} bytes - CBOR-encoded bytes
208 * @returns {*} Decoded value
209 */
210export function cborDecode(bytes) {
211 let offset = 0;
212
213 function read() {
214 const initial = bytes[offset++];
215 const major = initial >> 5;
216 const info = initial & 0x1f;
217
218 let length = info;
219 if (info === 24) length = bytes[offset++];
220 else if (info === 25) {
221 length = (bytes[offset++] << 8) | bytes[offset++];
222 } else if (info === 26) {
223 // Use multiplication instead of bitshift to avoid 32-bit signed integer overflow
224 length =
225 bytes[offset++] * 0x1000000 +
226 bytes[offset++] * 0x10000 +
227 bytes[offset++] * 0x100 +
228 bytes[offset++];
229 }
230
231 switch (major) {
232 case 0:
233 return length; // unsigned int
234 case 1:
235 return -1 - length; // negative int
236 case 2: {
237 // byte string
238 const data = bytes.slice(offset, offset + length);
239 offset += length;
240 return data;
241 }
242 case 3: {
243 // text string
244 const data = new TextDecoder().decode(
245 bytes.slice(offset, offset + length),
246 );
247 offset += length;
248 return data;
249 }
250 case 4: {
251 // array
252 const arr = [];
253 for (let i = 0; i < length; i++) arr.push(read());
254 return arr;
255 }
256 case 5: {
257 // map
258 const obj = {};
259 for (let i = 0; i < length; i++) {
260 const key = read();
261 obj[key] = read();
262 }
263 return obj;
264 }
265 case 6: {
266 // tag
267 // length is the tag number
268 const taggedValue = read();
269 if (length === CBOR_TAG_CID) {
270 // CID link: byte string with 0x00 multibase prefix, return raw CID bytes
271 return taggedValue.slice(1); // strip 0x00 prefix
272 }
273 return taggedValue;
274 }
275 case 7: {
276 // special
277 if (info === 20) return false;
278 if (info === 21) return true;
279 if (info === 22) return null;
280 return undefined;
281 }
282 }
283 }
284
285 return read();
286}
287
288// === CID GENERATION ===
289// dag-cbor (0x71) + sha-256 (0x12) + 32 bytes
290
291/**
292 * Create a CIDv1 (dag-cbor + sha-256) from raw bytes
293 * @param {Uint8Array} bytes - Content to hash
294 * @returns {Promise<Uint8Array>} CID bytes (36 bytes: version + codec + multihash)
295 */
296export async function createCid(bytes) {
297 const hash = await crypto.subtle.digest('SHA-256', bytes);
298 const hashBytes = new Uint8Array(hash);
299
300 // CIDv1: version(1) + codec(dag-cbor=0x71) + multihash(sha256)
301 // Multihash: hash-type(0x12) + length(0x20=32) + digest
302 const cid = new Uint8Array(2 + 2 + 32);
303 cid[0] = 0x01; // CIDv1
304 cid[1] = 0x71; // dag-cbor codec
305 cid[2] = 0x12; // sha-256
306 cid[3] = 0x20; // 32 bytes
307 cid.set(hashBytes, 4);
308
309 return cid;
310}
311
312/**
313 * Convert CID bytes to base32lower string representation
314 * @param {Uint8Array} cid - CID bytes
315 * @returns {string} Base32lower-encoded CID with 'b' prefix
316 */
317export function cidToString(cid) {
318 // base32lower encoding for CIDv1
319 return `b${base32Encode(cid)}`;
320}
321
322/**
323 * Encode bytes as base32lower string
324 * @param {Uint8Array} bytes - Bytes to encode
325 * @returns {string} Base32lower-encoded string
326 */
327export function base32Encode(bytes) {
328 const alphabet = 'abcdefghijklmnopqrstuvwxyz234567';
329 let result = '';
330 let bits = 0;
331 let value = 0;
332
333 for (const byte of bytes) {
334 value = (value << 8) | byte;
335 bits += 8;
336 while (bits >= 5) {
337 bits -= 5;
338 result += alphabet[(value >> bits) & 31];
339 }
340 }
341
342 if (bits > 0) {
343 result += alphabet[(value << (5 - bits)) & 31];
344 }
345
346 return result;
347}
348
349// === TID GENERATION ===
350// Timestamp-based IDs: base32-sort encoded microseconds + clock ID
351
352const TID_CHARS = '234567abcdefghijklmnopqrstuvwxyz';
353let lastTimestamp = 0;
354const clockId = Math.floor(Math.random() * 1024);
355
356/**
357 * Generate a timestamp-based ID (TID) for record keys
358 * Monotonic within a process, sortable by time
359 * @returns {string} 13-character base32-sort encoded TID
360 */
361export function createTid() {
362 let timestamp = Date.now() * 1000; // microseconds
363
364 // Ensure monotonic
365 if (timestamp <= lastTimestamp) {
366 timestamp = lastTimestamp + 1;
367 }
368 lastTimestamp = timestamp;
369
370 // 13 chars: 11 for timestamp (64 bits but only ~53 used), 2 for clock ID
371 let tid = '';
372
373 // Encode timestamp (high bits first for sortability)
374 let ts = timestamp;
375 for (let i = 0; i < 11; i++) {
376 tid = TID_CHARS[ts & 31] + tid;
377 ts = Math.floor(ts / 32);
378 }
379
380 // Append clock ID (2 chars)
381 tid += TID_CHARS[(clockId >> 5) & 31];
382 tid += TID_CHARS[clockId & 31];
383
384 return tid;
385}
386
387// === P-256 SIGNING ===
388// Web Crypto ECDSA with P-256 curve
389
390/**
391 * Import a raw P-256 private key for signing
392 * @param {Uint8Array} privateKeyBytes - 32-byte raw private key
393 * @returns {Promise<CryptoKey>} Web Crypto key handle
394 */
395export async function importPrivateKey(privateKeyBytes) {
396 // Validate private key length (P-256 requires exactly 32 bytes)
397 if (
398 !(privateKeyBytes instanceof Uint8Array) ||
399 privateKeyBytes.length !== 32
400 ) {
401 throw new Error(
402 `Invalid private key: expected 32 bytes, got ${privateKeyBytes?.length ?? 'non-Uint8Array'}`,
403 );
404 }
405
406 // PKCS#8 wrapper for raw P-256 private key
407 const pkcs8Prefix = new Uint8Array([
408 0x30, 0x41, 0x02, 0x01, 0x00, 0x30, 0x13, 0x06, 0x07, 0x2a, 0x86, 0x48,
409 0xce, 0x3d, 0x02, 0x01, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03,
410 0x01, 0x07, 0x04, 0x27, 0x30, 0x25, 0x02, 0x01, 0x01, 0x04, 0x20,
411 ]);
412
413 const pkcs8 = new Uint8Array(pkcs8Prefix.length + 32);
414 pkcs8.set(pkcs8Prefix);
415 pkcs8.set(privateKeyBytes, pkcs8Prefix.length);
416
417 return crypto.subtle.importKey(
418 'pkcs8',
419 pkcs8,
420 { name: 'ECDSA', namedCurve: 'P-256' },
421 false,
422 ['sign'],
423 );
424}
425
426// P-256 curve order N
427const P256_N = BigInt(
428 '0xFFFFFFFF00000000FFFFFFFFFFFFFFFFBCE6FAADA7179E84F3B9CAC2FC632551',
429);
430const P256_N_DIV_2 = P256_N / 2n;
431
432function bytesToBigInt(bytes) {
433 let result = 0n;
434 for (const byte of bytes) {
435 result = (result << 8n) | BigInt(byte);
436 }
437 return result;
438}
439
440function bigIntToBytes(n, length) {
441 const bytes = new Uint8Array(length);
442 for (let i = length - 1; i >= 0; i--) {
443 bytes[i] = Number(n & 0xffn);
444 n >>= 8n;
445 }
446 return bytes;
447}
448
449/**
450 * Sign data with ECDSA P-256, returning low-S normalized signature
451 * @param {CryptoKey} privateKey - Web Crypto key from importPrivateKey
452 * @param {Uint8Array} data - Data to sign
453 * @returns {Promise<Uint8Array>} 64-byte signature (r || s)
454 */
455export async function sign(privateKey, data) {
456 const signature = await crypto.subtle.sign(
457 { name: 'ECDSA', hash: 'SHA-256' },
458 privateKey,
459 data,
460 );
461 const sig = new Uint8Array(signature);
462
463 const r = sig.slice(0, 32);
464 const s = sig.slice(32, 64);
465 const sBigInt = bytesToBigInt(s);
466
467 // Low-S normalization: Bitcoin/ATProto require S <= N/2 to prevent
468 // signature malleability (two valid signatures for same message)
469 if (sBigInt > P256_N_DIV_2) {
470 const newS = P256_N - sBigInt;
471 const newSBytes = bigIntToBytes(newS, 32);
472 const normalized = new Uint8Array(64);
473 normalized.set(r, 0);
474 normalized.set(newSBytes, 32);
475 return normalized;
476 }
477
478 return sig;
479}
480
481/**
482 * Generate a new P-256 key pair
483 * @returns {Promise<{privateKey: Uint8Array, publicKey: Uint8Array}>} 32-byte private key, 33-byte compressed public key
484 */
485export async function generateKeyPair() {
486 const keyPair = await crypto.subtle.generateKey(
487 { name: 'ECDSA', namedCurve: 'P-256' },
488 true,
489 ['sign', 'verify'],
490 );
491
492 // Export private key as raw bytes
493 const privateJwk = await crypto.subtle.exportKey('jwk', keyPair.privateKey);
494 const privateBytes = base64UrlDecode(privateJwk.d);
495
496 // Export public key as compressed point
497 const publicRaw = await crypto.subtle.exportKey('raw', keyPair.publicKey);
498 const publicBytes = new Uint8Array(publicRaw);
499 const compressed = compressPublicKey(publicBytes);
500
501 return { privateKey: privateBytes, publicKey: compressed };
502}
503
504function compressPublicKey(uncompressed) {
505 // uncompressed is 65 bytes: 0x04 + x(32) + y(32)
506 // compressed is 33 bytes: prefix(02 or 03) + x(32)
507 const x = uncompressed.slice(1, 33);
508 const y = uncompressed.slice(33, 65);
509 const prefix = (y[31] & 1) === 0 ? 0x02 : 0x03;
510 const compressed = new Uint8Array(33);
511 compressed[0] = prefix;
512 compressed.set(x, 1);
513 return compressed;
514}
515
516/**
517 * Encode bytes as base64url string (no padding)
518 * @param {Uint8Array} bytes - Bytes to encode
519 * @returns {string} Base64url-encoded string
520 */
521export function base64UrlEncode(bytes) {
522 let binary = '';
523 for (const byte of bytes) {
524 binary += String.fromCharCode(byte);
525 }
526 const base64 = btoa(binary);
527 return base64.replace(/\+/g, '-').replace(/\//g, '_').replace(/=/g, '');
528}
529
530/**
531 * Decode base64url string to bytes
532 * @param {string} str - Base64url-encoded string
533 * @returns {Uint8Array} Decoded bytes
534 */
535export function base64UrlDecode(str) {
536 const base64 = str.replace(/-/g, '+').replace(/_/g, '/');
537 const pad = base64.length % 4;
538 const padded = pad ? base64 + '='.repeat(4 - pad) : base64;
539 const binary = atob(padded);
540 const bytes = new Uint8Array(binary.length);
541 for (let i = 0; i < binary.length; i++) {
542 bytes[i] = binary.charCodeAt(i);
543 }
544 return bytes;
545}
546
547/**
548 * Create HMAC-SHA256 signature for JWT
549 * @param {string} data - Data to sign (header.payload)
550 * @param {string} secret - Secret key
551 * @returns {Promise<string>} Base64url-encoded signature
552 */
553async function hmacSign(data, secret) {
554 const key = await crypto.subtle.importKey(
555 'raw',
556 new TextEncoder().encode(secret),
557 { name: 'HMAC', hash: 'SHA-256' },
558 false,
559 ['sign'],
560 );
561 const sig = await crypto.subtle.sign(
562 'HMAC',
563 key,
564 new TextEncoder().encode(data),
565 );
566 return base64UrlEncode(new Uint8Array(sig));
567}
568
569/**
570 * Create an access JWT for ATProto
571 * @param {string} did - User's DID (subject and audience)
572 * @param {string} secret - JWT signing secret
573 * @param {number} [expiresIn=7200] - Expiration in seconds (default 2 hours)
574 * @returns {Promise<string>} Signed JWT
575 */
576export async function createAccessJwt(did, secret, expiresIn = 7200) {
577 const header = { typ: 'at+jwt', alg: 'HS256' };
578 const now = Math.floor(Date.now() / 1000);
579 const payload = {
580 scope: 'com.atproto.access',
581 sub: did,
582 aud: did,
583 iat: now,
584 exp: now + expiresIn,
585 };
586
587 const headerB64 = base64UrlEncode(
588 new TextEncoder().encode(JSON.stringify(header)),
589 );
590 const payloadB64 = base64UrlEncode(
591 new TextEncoder().encode(JSON.stringify(payload)),
592 );
593 const signature = await hmacSign(`${headerB64}.${payloadB64}`, secret);
594
595 return `${headerB64}.${payloadB64}.${signature}`;
596}
597
598/**
599 * Create a refresh JWT for ATProto
600 * @param {string} did - User's DID (subject and audience)
601 * @param {string} secret - JWT signing secret
602 * @param {number} [expiresIn=86400] - Expiration in seconds (default 24 hours)
603 * @returns {Promise<string>} Signed JWT
604 */
605export async function createRefreshJwt(did, secret, expiresIn = 86400) {
606 const header = { typ: 'refresh+jwt', alg: 'HS256' };
607 const now = Math.floor(Date.now() / 1000);
608 // Generate random jti (token ID)
609 const jtiBytes = new Uint8Array(32);
610 crypto.getRandomValues(jtiBytes);
611 const jti = base64UrlEncode(jtiBytes);
612
613 const payload = {
614 scope: 'com.atproto.refresh',
615 sub: did,
616 aud: did,
617 jti,
618 iat: now,
619 exp: now + expiresIn,
620 };
621
622 const headerB64 = base64UrlEncode(
623 new TextEncoder().encode(JSON.stringify(header)),
624 );
625 const payloadB64 = base64UrlEncode(
626 new TextEncoder().encode(JSON.stringify(payload)),
627 );
628 const signature = await hmacSign(`${headerB64}.${payloadB64}`, secret);
629
630 return `${headerB64}.${payloadB64}.${signature}`;
631}
632
633/**
634 * Verify and decode a JWT (shared logic)
635 * @param {string} jwt - JWT string to verify
636 * @param {string} secret - JWT signing secret
637 * @param {string} expectedType - Expected token type (e.g., 'at+jwt', 'refresh+jwt')
638 * @returns {Promise<{header: Object, payload: Object}>} Decoded header and payload
639 * @throws {Error} If token is invalid, expired, or wrong type
640 */
641async function verifyJwt(jwt, secret, expectedType) {
642 const parts = jwt.split('.');
643 if (parts.length !== 3) {
644 throw new Error('Invalid JWT format');
645 }
646
647 const [headerB64, payloadB64, signatureB64] = parts;
648
649 // Verify signature
650 const expectedSig = await hmacSign(`${headerB64}.${payloadB64}`, secret);
651 if (signatureB64 !== expectedSig) {
652 throw new Error('Invalid signature');
653 }
654
655 // Decode header and payload
656 const header = JSON.parse(
657 new TextDecoder().decode(base64UrlDecode(headerB64)),
658 );
659 const payload = JSON.parse(
660 new TextDecoder().decode(base64UrlDecode(payloadB64)),
661 );
662
663 // Check token type
664 if (header.typ !== expectedType) {
665 throw new Error(`Invalid token type: expected ${expectedType}`);
666 }
667
668 // Check expiration
669 const now = Math.floor(Date.now() / 1000);
670 if (payload.exp && payload.exp < now) {
671 throw new Error('Token expired');
672 }
673
674 return { header, payload };
675}
676
677/**
678 * Verify and decode an access JWT
679 * @param {string} jwt - JWT string to verify
680 * @param {string} secret - JWT signing secret
681 * @returns {Promise<Object>} Decoded payload
682 * @throws {Error} If token is invalid, expired, or wrong type
683 */
684export async function verifyAccessJwt(jwt, secret) {
685 const { payload } = await verifyJwt(jwt, secret, 'at+jwt');
686 return payload;
687}
688
689/**
690 * Verify and decode a refresh JWT
691 * @param {string} jwt - JWT string to verify
692 * @param {string} secret - JWT signing secret
693 * @returns {Promise<Object>} Decoded payload
694 * @throws {Error} If token is invalid, expired, or wrong type
695 */
696export async function verifyRefreshJwt(jwt, secret) {
697 const { payload } = await verifyJwt(jwt, secret, 'refresh+jwt');
698
699 // Validate audience matches subject (token intended for this user)
700 if (payload.aud && payload.aud !== payload.sub) {
701 throw new Error('Invalid audience');
702 }
703
704 return payload;
705}
706
707/**
708 * Create a service auth JWT signed with ES256 (P-256)
709 * Used for proxying requests to AppView
710 * @param {Object} params - JWT parameters
711 * @param {string} params.iss - Issuer DID (PDS DID)
712 * @param {string} params.aud - Audience DID (AppView DID)
713 * @param {string|null} params.lxm - Lexicon method being called
714 * @param {CryptoKey} params.signingKey - P-256 private key from importPrivateKey
715 * @returns {Promise<string>} Signed JWT
716 */
717export async function createServiceJwt({ iss, aud, lxm, signingKey }) {
718 const header = { typ: 'JWT', alg: 'ES256' };
719 const now = Math.floor(Date.now() / 1000);
720
721 // Generate random jti
722 const jtiBytes = new Uint8Array(16);
723 crypto.getRandomValues(jtiBytes);
724 const jti = bytesToHex(jtiBytes);
725
726 const payload = {
727 iss,
728 aud,
729 exp: now + 60, // 1 minute expiration
730 iat: now,
731 jti,
732 };
733 if (lxm) payload.lxm = lxm;
734
735 const headerB64 = base64UrlEncode(
736 new TextEncoder().encode(JSON.stringify(header)),
737 );
738 const payloadB64 = base64UrlEncode(
739 new TextEncoder().encode(JSON.stringify(payload)),
740 );
741 const toSign = new TextEncoder().encode(`${headerB64}.${payloadB64}`);
742
743 const sig = await sign(signingKey, toSign);
744 const sigB64 = base64UrlEncode(sig);
745
746 return `${headerB64}.${payloadB64}.${sigB64}`;
747}
748
749/**
750 * Convert bytes to hexadecimal string
751 * @param {Uint8Array} bytes - Bytes to convert
752 * @returns {string} Hex string
753 */
754export function bytesToHex(bytes) {
755 return Array.from(bytes)
756 .map((b) => b.toString(16).padStart(2, '0'))
757 .join('');
758}
759
760/**
761 * Convert hexadecimal string to bytes
762 * @param {string} hex - Hex string
763 * @returns {Uint8Array} Decoded bytes
764 */
765export function hexToBytes(hex) {
766 const bytes = new Uint8Array(hex.length / 2);
767 for (let i = 0; i < hex.length; i += 2) {
768 bytes[i / 2] = parseInt(hex.substr(i, 2), 16);
769 }
770 return bytes;
771}
772
773// === MERKLE SEARCH TREE ===
774// ATProto-compliant MST implementation
775
776async function sha256(data) {
777 const hash = await crypto.subtle.digest('SHA-256', data);
778 return new Uint8Array(hash);
779}
780
781// Cache for key depths (SHA-256 is expensive)
782const keyDepthCache = new Map();
783
784/**
785 * Get MST tree depth for a key based on leading zeros in SHA-256 hash
786 * @param {string} key - Record key (collection/rkey)
787 * @returns {Promise<number>} Tree depth (leading zeros / 2)
788 */
789export async function getKeyDepth(key) {
790 // Count leading zeros in SHA-256 hash, divide by 2
791 if (keyDepthCache.has(key)) return keyDepthCache.get(key);
792
793 const keyBytes = new TextEncoder().encode(key);
794 const hash = await sha256(keyBytes);
795
796 let zeros = 0;
797 for (const byte of hash) {
798 if (byte === 0) {
799 zeros += 8;
800 } else {
801 // Count leading zeros in this byte
802 for (let i = 7; i >= 0; i--) {
803 if ((byte >> i) & 1) break;
804 zeros++;
805 }
806 break;
807 }
808 }
809
810 // MST depth = leading zeros in SHA-256 hash / 2
811 // This creates a probabilistic tree where ~50% of keys are at depth 0,
812 // ~25% at depth 1, etc., giving O(log n) lookups
813 const depth = Math.floor(zeros / 2);
814 keyDepthCache.set(key, depth);
815 return depth;
816}
817
818// Compute common prefix length between two byte arrays
819function commonPrefixLen(a, b) {
820 const minLen = Math.min(a.length, b.length);
821 for (let i = 0; i < minLen; i++) {
822 if (a[i] !== b[i]) return i;
823 }
824 return minLen;
825}
826
827class MST {
828 constructor(sql) {
829 this.sql = sql;
830 }
831
832 async computeRoot() {
833 const records = this.sql
834 .exec(`
835 SELECT collection, rkey, cid FROM records ORDER BY collection, rkey
836 `)
837 .toArray();
838
839 if (records.length === 0) {
840 return null;
841 }
842
843 // Build entries with pre-computed depths (heights)
844 // In ATProto MST, "height" determines which layer a key belongs to
845 // Layer 0 is at the BOTTOM, root is at the highest layer
846 const entries = [];
847 let maxDepth = 0;
848 for (const r of records) {
849 const key = `${r.collection}/${r.rkey}`;
850 const depth = await getKeyDepth(key);
851 maxDepth = Math.max(maxDepth, depth);
852 entries.push({
853 key,
854 keyBytes: new TextEncoder().encode(key),
855 cid: r.cid,
856 depth,
857 });
858 }
859
860 // Start building from the root (highest layer) going down to layer 0
861 return this.buildTree(entries, maxDepth);
862 }
863
864 async buildTree(entries, layer) {
865 if (entries.length === 0) return null;
866
867 // Separate entries for this layer vs lower layers (subtrees)
868 // Keys with depth == layer stay at this node
869 // Keys with depth < layer go into subtrees (going down toward layer 0)
870 const thisLayer = [];
871 let leftSubtree = [];
872
873 for (const entry of entries) {
874 if (entry.depth < layer) {
875 // This entry belongs to a lower layer - accumulate for subtree
876 leftSubtree.push(entry);
877 } else {
878 // This entry belongs at current layer (depth == layer)
879 // Process accumulated left subtree first
880 if (leftSubtree.length > 0) {
881 const leftCid = await this.buildTree(leftSubtree, layer - 1);
882 thisLayer.push({ type: 'subtree', cid: leftCid });
883 leftSubtree = [];
884 }
885 thisLayer.push({ type: 'entry', entry });
886 }
887 }
888
889 // Handle remaining left subtree
890 if (leftSubtree.length > 0) {
891 const leftCid = await this.buildTree(leftSubtree, layer - 1);
892 thisLayer.push({ type: 'subtree', cid: leftCid });
893 }
894
895 // Build node with proper ATProto format
896 const node = { e: [] };
897 let leftCid = null;
898 let prevKeyBytes = new Uint8Array(0);
899
900 for (let i = 0; i < thisLayer.length; i++) {
901 const item = thisLayer[i];
902
903 if (item.type === 'subtree') {
904 if (node.e.length === 0) {
905 leftCid = item.cid;
906 } else {
907 // Attach to previous entry's 't' field
908 node.e[node.e.length - 1].t = new CID(cidToBytes(item.cid));
909 }
910 } else {
911 // Entry - compute prefix compression
912 const keyBytes = item.entry.keyBytes;
913 const prefixLen = commonPrefixLen(prevKeyBytes, keyBytes);
914 const keySuffix = keyBytes.slice(prefixLen);
915
916 // ATProto requires t field to be present (can be null)
917 const e = {
918 p: prefixLen,
919 k: keySuffix,
920 v: new CID(cidToBytes(item.entry.cid)),
921 t: null, // Will be updated if there's a subtree
922 };
923
924 node.e.push(e);
925 prevKeyBytes = keyBytes;
926 }
927 }
928
929 // ATProto requires l field to be present (can be null)
930 node.l = leftCid ? new CID(cidToBytes(leftCid)) : null;
931
932 // Encode node with proper MST CBOR format
933 const nodeBytes = cborEncodeDagCbor(node);
934 const nodeCid = await createCid(nodeBytes);
935 const cidStr = cidToString(nodeCid);
936
937 this.sql.exec(
938 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
939 cidStr,
940 nodeBytes,
941 );
942
943 return cidStr;
944 }
945}
946
947// === CAR FILE BUILDER ===
948
949/**
950 * Encode integer as unsigned varint
951 * @param {number} n - Non-negative integer
952 * @returns {Uint8Array} Varint-encoded bytes
953 */
954export function varint(n) {
955 const bytes = [];
956 while (n >= 0x80) {
957 bytes.push((n & 0x7f) | 0x80);
958 n >>>= 7;
959 }
960 bytes.push(n);
961 return new Uint8Array(bytes);
962}
963
964/**
965 * Convert base32lower CID string to raw bytes
966 * @param {string} cidStr - CID string with 'b' prefix
967 * @returns {Uint8Array} CID bytes
968 */
969export function cidToBytes(cidStr) {
970 // Decode base32lower CID string to bytes
971 if (!cidStr.startsWith('b')) throw new Error('expected base32lower CID');
972 return base32Decode(cidStr.slice(1));
973}
974
975/**
976 * Decode base32lower string to bytes
977 * @param {string} str - Base32lower-encoded string
978 * @returns {Uint8Array} Decoded bytes
979 */
980export function base32Decode(str) {
981 const alphabet = 'abcdefghijklmnopqrstuvwxyz234567';
982 let bits = 0;
983 let value = 0;
984 const output = [];
985
986 for (const char of str) {
987 const idx = alphabet.indexOf(char);
988 if (idx === -1) continue;
989 value = (value << 5) | idx;
990 bits += 5;
991 if (bits >= 8) {
992 bits -= 8;
993 output.push((value >> bits) & 0xff);
994 }
995 }
996
997 return new Uint8Array(output);
998}
999
1000/**
1001 * Build a CAR (Content Addressable aRchive) file
1002 * @param {string} rootCid - Root CID string
1003 * @param {Array<{cid: string, data: Uint8Array}>} blocks - Blocks to include
1004 * @returns {Uint8Array} CAR file bytes
1005 */
1006export function buildCarFile(rootCid, blocks) {
1007 const parts = [];
1008
1009 // Header: { version: 1, roots: [rootCid] }
1010 const rootCidBytes = cidToBytes(rootCid);
1011 const header = cborEncodeDagCbor({
1012 version: 1,
1013 roots: [new CID(rootCidBytes)],
1014 });
1015 parts.push(varint(header.length));
1016 parts.push(header);
1017
1018 // Blocks: varint(len) + cid + data
1019 for (const block of blocks) {
1020 const cidBytes = cidToBytes(block.cid);
1021 const blockLen = cidBytes.length + block.data.length;
1022 parts.push(varint(blockLen));
1023 parts.push(cidBytes);
1024 parts.push(block.data);
1025 }
1026
1027 // Concatenate all parts
1028 const totalLen = parts.reduce((sum, p) => sum + p.length, 0);
1029 const car = new Uint8Array(totalLen);
1030 let offset = 0;
1031 for (const part of parts) {
1032 car.set(part, offset);
1033 offset += part.length;
1034 }
1035
1036 return car;
1037}
1038
1039/**
1040 * Route handler function type
1041 * @callback RouteHandler
1042 * @param {PersonalDataServer} pds - PDS instance
1043 * @param {Request} request - HTTP request
1044 * @param {URL} url - Parsed URL
1045 * @returns {Promise<Response>} HTTP response
1046 */
1047
1048/**
1049 * @typedef {Object} Route
1050 * @property {string} [method] - Required HTTP method (default: any)
1051 * @property {RouteHandler} handler - Handler function
1052 */
1053
1054/** @type {Record<string, Route>} */
1055const pdsRoutes = {
1056 '/.well-known/atproto-did': {
1057 handler: (pds, _req, _url) => pds.handleAtprotoDid(),
1058 },
1059 '/init': {
1060 method: 'POST',
1061 handler: (pds, req, _url) => pds.handleInit(req),
1062 },
1063 '/status': {
1064 handler: (pds, _req, _url) => pds.handleStatus(),
1065 },
1066 '/reset-repo': {
1067 handler: (pds, _req, _url) => pds.handleResetRepo(),
1068 },
1069 '/forward-event': {
1070 handler: (pds, req, _url) => pds.handleForwardEvent(req),
1071 },
1072 '/register-did': {
1073 handler: (pds, req, _url) => pds.handleRegisterDid(req),
1074 },
1075 '/get-registered-dids': {
1076 handler: (pds, _req, _url) => pds.handleGetRegisteredDids(),
1077 },
1078 '/register-handle': {
1079 method: 'POST',
1080 handler: (pds, req, _url) => pds.handleRegisterHandle(req),
1081 },
1082 '/resolve-handle': {
1083 handler: (pds, _req, url) => pds.handleResolveHandle(url),
1084 },
1085 '/repo-info': {
1086 handler: (pds, _req, _url) => pds.handleRepoInfo(),
1087 },
1088 '/xrpc/com.atproto.server.describeServer': {
1089 handler: (pds, req, _url) => pds.handleDescribeServer(req),
1090 },
1091 '/xrpc/com.atproto.server.createSession': {
1092 method: 'POST',
1093 handler: (pds, req, _url) => pds.handleCreateSession(req),
1094 },
1095 '/xrpc/com.atproto.server.getSession': {
1096 handler: (pds, req, _url) => pds.handleGetSession(req),
1097 },
1098 '/xrpc/com.atproto.server.refreshSession': {
1099 method: 'POST',
1100 handler: (pds, req, _url) => pds.handleRefreshSession(req),
1101 },
1102 '/xrpc/app.bsky.actor.getPreferences': {
1103 handler: (pds, req, _url) => pds.handleGetPreferences(req),
1104 },
1105 '/xrpc/app.bsky.actor.putPreferences': {
1106 method: 'POST',
1107 handler: (pds, req, _url) => pds.handlePutPreferences(req),
1108 },
1109 '/xrpc/com.atproto.sync.listRepos': {
1110 handler: (pds, _req, _url) => pds.handleListRepos(),
1111 },
1112 '/xrpc/com.atproto.repo.createRecord': {
1113 method: 'POST',
1114 handler: (pds, req, _url) => pds.handleCreateRecord(req),
1115 },
1116 '/xrpc/com.atproto.repo.deleteRecord': {
1117 method: 'POST',
1118 handler: (pds, req, _url) => pds.handleDeleteRecord(req),
1119 },
1120 '/xrpc/com.atproto.repo.putRecord': {
1121 method: 'POST',
1122 handler: (pds, req, _url) => pds.handlePutRecord(req),
1123 },
1124 '/xrpc/com.atproto.repo.applyWrites': {
1125 method: 'POST',
1126 handler: (pds, req, _url) => pds.handleApplyWrites(req),
1127 },
1128 '/xrpc/com.atproto.repo.getRecord': {
1129 handler: (pds, _req, url) => pds.handleGetRecord(url),
1130 },
1131 '/xrpc/com.atproto.repo.describeRepo': {
1132 handler: (pds, _req, _url) => pds.handleDescribeRepo(),
1133 },
1134 '/xrpc/com.atproto.repo.listRecords': {
1135 handler: (pds, _req, url) => pds.handleListRecords(url),
1136 },
1137 '/xrpc/com.atproto.sync.getLatestCommit': {
1138 handler: (pds, _req, _url) => pds.handleGetLatestCommit(),
1139 },
1140 '/xrpc/com.atproto.sync.getRepoStatus': {
1141 handler: (pds, _req, _url) => pds.handleGetRepoStatus(),
1142 },
1143 '/xrpc/com.atproto.sync.getRepo': {
1144 handler: (pds, _req, _url) => pds.handleGetRepo(),
1145 },
1146 '/xrpc/com.atproto.sync.getRecord': {
1147 handler: (pds, _req, url) => pds.handleSyncGetRecord(url),
1148 },
1149 '/xrpc/com.atproto.sync.subscribeRepos': {
1150 handler: (pds, req, url) => pds.handleSubscribeRepos(req, url),
1151 },
1152};
1153
1154export class PersonalDataServer {
1155 constructor(state, env) {
1156 this.state = state;
1157 this.sql = state.storage.sql;
1158 this.env = env;
1159
1160 // Initialize schema
1161 this.sql.exec(`
1162 CREATE TABLE IF NOT EXISTS blocks (
1163 cid TEXT PRIMARY KEY,
1164 data BLOB NOT NULL
1165 );
1166
1167 CREATE TABLE IF NOT EXISTS records (
1168 uri TEXT PRIMARY KEY,
1169 cid TEXT NOT NULL,
1170 collection TEXT NOT NULL,
1171 rkey TEXT NOT NULL,
1172 value BLOB NOT NULL
1173 );
1174
1175 CREATE TABLE IF NOT EXISTS commits (
1176 seq INTEGER PRIMARY KEY AUTOINCREMENT,
1177 cid TEXT NOT NULL,
1178 rev TEXT NOT NULL,
1179 prev TEXT
1180 );
1181
1182 CREATE TABLE IF NOT EXISTS seq_events (
1183 seq INTEGER PRIMARY KEY AUTOINCREMENT,
1184 did TEXT NOT NULL,
1185 commit_cid TEXT NOT NULL,
1186 evt BLOB NOT NULL
1187 );
1188
1189 CREATE INDEX IF NOT EXISTS idx_records_collection ON records(collection, rkey);
1190 `);
1191 }
1192
1193 async initIdentity(did, privateKeyHex, handle = null) {
1194 await this.state.storage.put('did', did);
1195 await this.state.storage.put('privateKey', privateKeyHex);
1196 if (handle) {
1197 await this.state.storage.put('handle', handle);
1198 }
1199 }
1200
1201 async getDid() {
1202 if (!this._did) {
1203 this._did = await this.state.storage.get('did');
1204 }
1205 return this._did;
1206 }
1207
1208 async getHandle() {
1209 return this.state.storage.get('handle');
1210 }
1211
1212 async getSigningKey() {
1213 const hex = await this.state.storage.get('privateKey');
1214 if (!hex) return null;
1215 return importPrivateKey(hexToBytes(hex));
1216 }
1217
1218 // Collect MST node blocks for a given root CID
1219 collectMstBlocks(rootCidStr) {
1220 const blocks = [];
1221 const visited = new Set();
1222
1223 const collect = (cidStr) => {
1224 if (visited.has(cidStr)) return;
1225 visited.add(cidStr);
1226
1227 const rows = this.sql
1228 .exec(`SELECT data FROM blocks WHERE cid = ?`, cidStr)
1229 .toArray();
1230 if (rows.length === 0) return;
1231
1232 const data = new Uint8Array(rows[0].data);
1233 blocks.push({ cid: cidStr, data }); // Keep as string, buildCarFile will convert
1234
1235 // Decode and follow child CIDs (MST nodes have 'l' and 'e' with 't' subtrees)
1236 try {
1237 const node = cborDecode(data);
1238 if (node.l) collect(cidToString(node.l));
1239 if (node.e) {
1240 for (const entry of node.e) {
1241 if (entry.t) collect(cidToString(entry.t));
1242 }
1243 }
1244 } catch (_e) {
1245 // Not an MST node, ignore
1246 }
1247 };
1248
1249 collect(rootCidStr);
1250 return blocks;
1251 }
1252
1253 async createRecord(collection, record, rkey = null) {
1254 const did = await this.getDid();
1255 if (!did) throw new Error('PDS not initialized');
1256
1257 rkey = rkey || createTid();
1258 const uri = `at://${did}/${collection}/${rkey}`;
1259
1260 // Encode and hash record (must use DAG-CBOR for proper key ordering)
1261 const recordBytes = cborEncodeDagCbor(record);
1262 const recordCid = await createCid(recordBytes);
1263 const recordCidStr = cidToString(recordCid);
1264
1265 // Store block
1266 this.sql.exec(
1267 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
1268 recordCidStr,
1269 recordBytes,
1270 );
1271
1272 // Store record index
1273 this.sql.exec(
1274 `INSERT OR REPLACE INTO records (uri, cid, collection, rkey, value) VALUES (?, ?, ?, ?, ?)`,
1275 uri,
1276 recordCidStr,
1277 collection,
1278 rkey,
1279 recordBytes,
1280 );
1281
1282 // Rebuild MST
1283 const mst = new MST(this.sql);
1284 const dataRoot = await mst.computeRoot();
1285
1286 // Get previous commit
1287 const prevCommits = this.sql
1288 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`)
1289 .toArray();
1290 const prevCommit = prevCommits.length > 0 ? prevCommits[0] : null;
1291
1292 // Create commit
1293 const rev = createTid();
1294 // Build commit with CIDs wrapped in CID class (for dag-cbor tag 42 encoding)
1295 const commit = {
1296 did,
1297 version: 3,
1298 data: new CID(cidToBytes(dataRoot)), // CID wrapped for explicit encoding
1299 rev,
1300 prev: prevCommit?.cid ? new CID(cidToBytes(prevCommit.cid)) : null,
1301 };
1302
1303 // Sign commit (using dag-cbor encoder for CIDs)
1304 const commitBytes = cborEncodeDagCbor(commit);
1305 const signingKey = await this.getSigningKey();
1306 const sig = await sign(signingKey, commitBytes);
1307
1308 const signedCommit = { ...commit, sig };
1309 const signedBytes = cborEncodeDagCbor(signedCommit);
1310 const commitCid = await createCid(signedBytes);
1311 const commitCidStr = cidToString(commitCid);
1312
1313 // Store commit block
1314 this.sql.exec(
1315 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
1316 commitCidStr,
1317 signedBytes,
1318 );
1319
1320 // Store commit reference
1321 this.sql.exec(
1322 `INSERT INTO commits (cid, rev, prev) VALUES (?, ?, ?)`,
1323 commitCidStr,
1324 rev,
1325 prevCommit?.cid || null,
1326 );
1327
1328 // Update head and rev for listRepos
1329 await this.state.storage.put('head', commitCidStr);
1330 await this.state.storage.put('rev', rev);
1331
1332 // Collect blocks for the event (record + commit + MST nodes)
1333 // Build a mini CAR with just the new blocks - use string CIDs
1334 const newBlocks = [];
1335 // Add record block
1336 newBlocks.push({ cid: recordCidStr, data: recordBytes });
1337 // Add commit block
1338 newBlocks.push({ cid: commitCidStr, data: signedBytes });
1339 // Add MST node blocks (get all blocks referenced by commit.data)
1340 const mstBlocks = this.collectMstBlocks(dataRoot);
1341 newBlocks.push(...mstBlocks);
1342
1343 // Sequence event with blocks - store complete event data including rev and time
1344 // blocks must be a full CAR file with header (roots = [commitCid])
1345 const eventTime = new Date().toISOString();
1346 const evt = cborEncode({
1347 ops: [
1348 { action: 'create', path: `${collection}/${rkey}`, cid: recordCidStr },
1349 ],
1350 blocks: buildCarFile(commitCidStr, newBlocks), // Full CAR with header
1351 rev, // Store the actual commit revision
1352 time: eventTime, // Store the actual event time
1353 });
1354 this.sql.exec(
1355 `INSERT INTO seq_events (did, commit_cid, evt) VALUES (?, ?, ?)`,
1356 did,
1357 commitCidStr,
1358 evt,
1359 );
1360
1361 // Broadcast to subscribers (both local and via default DO for relay)
1362 const evtRows = this.sql
1363 .exec(`SELECT * FROM seq_events ORDER BY seq DESC LIMIT 1`)
1364 .toArray();
1365 if (evtRows.length > 0) {
1366 this.broadcastEvent(evtRows[0]);
1367 // Also forward to default DO for relay subscribers
1368 if (this.env?.PDS) {
1369 const defaultId = this.env.PDS.idFromName('default');
1370 const defaultPds = this.env.PDS.get(defaultId);
1371 // Convert ArrayBuffer to array for JSON serialization
1372 const row = evtRows[0];
1373 const evtArray = Array.from(new Uint8Array(row.evt));
1374 // Fire and forget but log errors
1375 defaultPds
1376 .fetch(
1377 new Request('http://internal/forward-event', {
1378 method: 'POST',
1379 body: JSON.stringify({ ...row, evt: evtArray }),
1380 }),
1381 )
1382 .then((r) => r.json())
1383 .then((r) => console.log('forward result:', r))
1384 .catch((e) => console.log('forward error:', e));
1385 }
1386 }
1387
1388 return { uri, cid: recordCidStr, commit: commitCidStr };
1389 }
1390
1391 async deleteRecord(collection, rkey) {
1392 const did = await this.getDid();
1393 if (!did) throw new Error('PDS not initialized');
1394
1395 const uri = `at://${did}/${collection}/${rkey}`;
1396
1397 // Check if record exists
1398 const existing = this.sql
1399 .exec(`SELECT cid FROM records WHERE uri = ?`, uri)
1400 .toArray();
1401 if (existing.length === 0) {
1402 return { error: 'RecordNotFound', message: 'record not found' };
1403 }
1404
1405 // Delete from records table
1406 this.sql.exec(`DELETE FROM records WHERE uri = ?`, uri);
1407
1408 // Rebuild MST
1409 const mst = new MST(this.sql);
1410 const dataRoot = await mst.computeRoot();
1411
1412 // Get previous commit
1413 const prevCommits = this.sql
1414 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`)
1415 .toArray();
1416 const prevCommit = prevCommits.length > 0 ? prevCommits[0] : null;
1417
1418 // Create commit
1419 const rev = createTid();
1420 const commit = {
1421 did,
1422 version: 3,
1423 data: dataRoot ? new CID(cidToBytes(dataRoot)) : null,
1424 rev,
1425 prev: prevCommit?.cid ? new CID(cidToBytes(prevCommit.cid)) : null,
1426 };
1427
1428 // Sign commit
1429 const commitBytes = cborEncodeDagCbor(commit);
1430 const signingKey = await this.getSigningKey();
1431 const sig = await sign(signingKey, commitBytes);
1432
1433 const signedCommit = { ...commit, sig };
1434 const signedBytes = cborEncodeDagCbor(signedCommit);
1435 const commitCid = await createCid(signedBytes);
1436 const commitCidStr = cidToString(commitCid);
1437
1438 // Store commit block
1439 this.sql.exec(
1440 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
1441 commitCidStr,
1442 signedBytes,
1443 );
1444
1445 // Store commit reference
1446 this.sql.exec(
1447 `INSERT INTO commits (cid, rev, prev) VALUES (?, ?, ?)`,
1448 commitCidStr,
1449 rev,
1450 prevCommit?.cid || null,
1451 );
1452
1453 // Update head and rev
1454 await this.state.storage.put('head', commitCidStr);
1455 await this.state.storage.put('rev', rev);
1456
1457 // Collect blocks for the event (commit + MST nodes, no record block)
1458 const newBlocks = [];
1459 newBlocks.push({ cid: commitCidStr, data: signedBytes });
1460 if (dataRoot) {
1461 const mstBlocks = this.collectMstBlocks(dataRoot);
1462 newBlocks.push(...mstBlocks);
1463 }
1464
1465 // Sequence event with delete action
1466 const eventTime = new Date().toISOString();
1467 const evt = cborEncode({
1468 ops: [{ action: 'delete', path: `${collection}/${rkey}`, cid: null }],
1469 blocks: buildCarFile(commitCidStr, newBlocks),
1470 rev,
1471 time: eventTime,
1472 });
1473 this.sql.exec(
1474 `INSERT INTO seq_events (did, commit_cid, evt) VALUES (?, ?, ?)`,
1475 did,
1476 commitCidStr,
1477 evt,
1478 );
1479
1480 // Broadcast to subscribers
1481 const evtRows = this.sql
1482 .exec(`SELECT * FROM seq_events ORDER BY seq DESC LIMIT 1`)
1483 .toArray();
1484 if (evtRows.length > 0) {
1485 this.broadcastEvent(evtRows[0]);
1486 // Forward to default DO for relay subscribers
1487 if (this.env?.PDS) {
1488 const defaultId = this.env.PDS.idFromName('default');
1489 const defaultPds = this.env.PDS.get(defaultId);
1490 const row = evtRows[0];
1491 const evtArray = Array.from(new Uint8Array(row.evt));
1492 defaultPds
1493 .fetch(
1494 new Request('http://internal/forward-event', {
1495 method: 'POST',
1496 body: JSON.stringify({ ...row, evt: evtArray }),
1497 }),
1498 )
1499 .catch((e) => console.log('forward error:', e));
1500 }
1501 }
1502
1503 return { ok: true };
1504 }
1505
1506 formatEvent(evt) {
1507 // AT Protocol frame format: header + body
1508 // Use DAG-CBOR encoding for body (CIDs need tag 42 + 0x00 prefix)
1509 const header = cborEncode({ op: 1, t: '#commit' });
1510
1511 // Decode stored event to get ops, blocks, rev, and time
1512 const evtData = cborDecode(new Uint8Array(evt.evt));
1513 const ops = evtData.ops.map((op) => ({
1514 ...op,
1515 cid: op.cid ? new CID(cidToBytes(op.cid)) : null, // Wrap in CID class for tag 42 encoding
1516 }));
1517 // Get blocks from stored event (already in CAR format)
1518 const blocks = evtData.blocks || new Uint8Array(0);
1519
1520 const body = cborEncodeDagCbor({
1521 seq: evt.seq,
1522 rebase: false,
1523 tooBig: false,
1524 repo: evt.did,
1525 commit: new CID(cidToBytes(evt.commit_cid)), // Wrap in CID class for tag 42 encoding
1526 rev: evtData.rev, // Use stored rev from commit creation
1527 since: null,
1528 blocks: blocks instanceof Uint8Array ? blocks : new Uint8Array(blocks),
1529 ops,
1530 blobs: [],
1531 time: evtData.time, // Use stored time from event creation
1532 });
1533
1534 // Concatenate header + body
1535 const frame = new Uint8Array(header.length + body.length);
1536 frame.set(header);
1537 frame.set(body, header.length);
1538 return frame;
1539 }
1540
1541 async webSocketMessage(ws, message) {
1542 // Handle ping
1543 if (message === 'ping') ws.send('pong');
1544 }
1545
1546 async webSocketClose(_ws, _code, _reason) {
1547 // Durable Object will hibernate when no connections remain
1548 }
1549
1550 broadcastEvent(evt) {
1551 const frame = this.formatEvent(evt);
1552 for (const ws of this.state.getWebSockets()) {
1553 try {
1554 ws.send(frame);
1555 } catch (_e) {
1556 // Client disconnected
1557 }
1558 }
1559 }
1560
1561 async handleAtprotoDid() {
1562 let did = await this.getDid();
1563 if (!did) {
1564 const registeredDids =
1565 (await this.state.storage.get('registeredDids')) || [];
1566 did = registeredDids[0];
1567 }
1568 if (!did) {
1569 return new Response('User not found', { status: 404 });
1570 }
1571 return new Response(did, { headers: { 'Content-Type': 'text/plain' } });
1572 }
1573
1574 async handleInit(request) {
1575 const body = await request.json();
1576 if (!body.did || !body.privateKey) {
1577 return errorResponse('InvalidRequest', 'missing did or privateKey', 400);
1578 }
1579 await this.initIdentity(body.did, body.privateKey, body.handle || null);
1580 return Response.json({
1581 ok: true,
1582 did: body.did,
1583 handle: body.handle || null,
1584 });
1585 }
1586
1587 async handleStatus() {
1588 const did = await this.getDid();
1589 return Response.json({ initialized: !!did, did: did || null });
1590 }
1591
1592 async handleResetRepo() {
1593 this.sql.exec(`DELETE FROM blocks`);
1594 this.sql.exec(`DELETE FROM records`);
1595 this.sql.exec(`DELETE FROM commits`);
1596 this.sql.exec(`DELETE FROM seq_events`);
1597 await this.state.storage.delete('head');
1598 await this.state.storage.delete('rev');
1599 return Response.json({ ok: true, message: 'repo data cleared' });
1600 }
1601
1602 async handleForwardEvent(request) {
1603 const evt = await request.json();
1604 const numSockets = [...this.state.getWebSockets()].length;
1605 console.log(
1606 `forward-event: received event seq=${evt.seq}, ${numSockets} connected sockets`,
1607 );
1608 this.broadcastEvent({
1609 seq: evt.seq,
1610 did: evt.did,
1611 commit_cid: evt.commit_cid,
1612 evt: new Uint8Array(Object.values(evt.evt)),
1613 });
1614 return Response.json({ ok: true, sockets: numSockets });
1615 }
1616
1617 async handleRegisterDid(request) {
1618 const body = await request.json();
1619 const registeredDids =
1620 (await this.state.storage.get('registeredDids')) || [];
1621 if (!registeredDids.includes(body.did)) {
1622 registeredDids.push(body.did);
1623 await this.state.storage.put('registeredDids', registeredDids);
1624 }
1625 return Response.json({ ok: true });
1626 }
1627
1628 async handleGetRegisteredDids() {
1629 const registeredDids =
1630 (await this.state.storage.get('registeredDids')) || [];
1631 return Response.json({ dids: registeredDids });
1632 }
1633
1634 async handleRegisterHandle(request) {
1635 const body = await request.json();
1636 const { handle, did } = body;
1637 if (!handle || !did) {
1638 return errorResponse('InvalidRequest', 'missing handle or did', 400);
1639 }
1640 const handleMap = (await this.state.storage.get('handleMap')) || {};
1641 handleMap[handle] = did;
1642 await this.state.storage.put('handleMap', handleMap);
1643 return Response.json({ ok: true });
1644 }
1645
1646 async handleResolveHandle(url) {
1647 const handle = url.searchParams.get('handle');
1648 if (!handle) {
1649 return errorResponse('InvalidRequest', 'missing handle', 400);
1650 }
1651 const handleMap = (await this.state.storage.get('handleMap')) || {};
1652 const did = handleMap[handle];
1653 if (!did) {
1654 return errorResponse('NotFound', 'handle not found', 404);
1655 }
1656 return Response.json({ did });
1657 }
1658
1659 async handleRepoInfo() {
1660 const head = await this.state.storage.get('head');
1661 const rev = await this.state.storage.get('rev');
1662 return Response.json({ head: head || null, rev: rev || null });
1663 }
1664
1665 handleDescribeServer(request) {
1666 const hostname = request.headers.get('x-hostname') || 'localhost';
1667 return Response.json({
1668 did: `did:web:${hostname}`,
1669 availableUserDomains: [`.${hostname}`],
1670 inviteCodeRequired: false,
1671 phoneVerificationRequired: false,
1672 links: {},
1673 contact: {},
1674 });
1675 }
1676
1677 async handleCreateSession(request) {
1678 const body = await request.json();
1679 const { identifier, password } = body;
1680
1681 if (!identifier || !password) {
1682 return errorResponse(
1683 'InvalidRequest',
1684 'Missing identifier or password',
1685 400,
1686 );
1687 }
1688
1689 // Check password against env var
1690 const expectedPassword = this.env?.PDS_PASSWORD;
1691 if (!expectedPassword || password !== expectedPassword) {
1692 return errorResponse(
1693 'AuthRequired',
1694 'Invalid identifier or password',
1695 401,
1696 );
1697 }
1698
1699 // Resolve identifier to DID
1700 let did = identifier;
1701 if (!identifier.startsWith('did:')) {
1702 // Try to resolve handle
1703 const handleMap = (await this.state.storage.get('handleMap')) || {};
1704 did = handleMap[identifier];
1705 if (!did) {
1706 return errorResponse('InvalidRequest', 'Unable to resolve handle', 400);
1707 }
1708 }
1709
1710 // Get handle for response
1711 const handle = await this.getHandleForDid(did);
1712
1713 // Create tokens
1714 const jwtSecret = this.env?.JWT_SECRET;
1715 if (!jwtSecret) {
1716 return errorResponse(
1717 'InternalServerError',
1718 'Server not configured for authentication',
1719 500,
1720 );
1721 }
1722
1723 const accessJwt = await createAccessJwt(did, jwtSecret);
1724 const refreshJwt = await createRefreshJwt(did, jwtSecret);
1725
1726 return Response.json({
1727 accessJwt,
1728 refreshJwt,
1729 handle: handle || did,
1730 did,
1731 active: true,
1732 });
1733 }
1734
1735 async handleGetSession(request) {
1736 const authHeader = request.headers.get('Authorization');
1737 if (!authHeader || !authHeader.startsWith('Bearer ')) {
1738 return errorResponse(
1739 'AuthRequired',
1740 'Missing or invalid authorization header',
1741 401,
1742 );
1743 }
1744
1745 const token = authHeader.slice(7); // Remove 'Bearer '
1746 const jwtSecret = this.env?.JWT_SECRET;
1747 if (!jwtSecret) {
1748 return errorResponse(
1749 'InternalServerError',
1750 'Server not configured for authentication',
1751 500,
1752 );
1753 }
1754
1755 try {
1756 const payload = await verifyAccessJwt(token, jwtSecret);
1757 const did = payload.sub;
1758 const handle = await this.getHandleForDid(did);
1759
1760 return Response.json({
1761 handle: handle || did,
1762 did,
1763 active: true,
1764 });
1765 } catch (err) {
1766 return errorResponse('InvalidToken', err.message, 401);
1767 }
1768 }
1769
1770 async handleRefreshSession(request) {
1771 const authHeader = request.headers.get('Authorization');
1772 if (!authHeader || !authHeader.startsWith('Bearer ')) {
1773 return errorResponse(
1774 'AuthRequired',
1775 'Missing or invalid authorization header',
1776 401,
1777 );
1778 }
1779
1780 const token = authHeader.slice(7); // Remove 'Bearer '
1781 const jwtSecret = this.env?.JWT_SECRET;
1782 if (!jwtSecret) {
1783 return errorResponse(
1784 'InternalServerError',
1785 'Server not configured for authentication',
1786 500,
1787 );
1788 }
1789
1790 try {
1791 const payload = await verifyRefreshJwt(token, jwtSecret);
1792 const did = payload.sub;
1793 const handle = await this.getHandleForDid(did);
1794
1795 // Issue fresh tokens
1796 const accessJwt = await createAccessJwt(did, jwtSecret);
1797 const refreshJwt = await createRefreshJwt(did, jwtSecret);
1798
1799 return Response.json({
1800 accessJwt,
1801 refreshJwt,
1802 handle: handle || did,
1803 did,
1804 active: true,
1805 });
1806 } catch (err) {
1807 if (err.message === 'Token expired') {
1808 return errorResponse('ExpiredToken', 'Refresh token has expired', 400);
1809 }
1810 return errorResponse('InvalidToken', err.message, 400);
1811 }
1812 }
1813
1814 async handleGetPreferences(_request) {
1815 // Preferences are stored per-user in their DO
1816 const preferences = (await this.state.storage.get('preferences')) || [];
1817 return Response.json({ preferences });
1818 }
1819
1820 async handlePutPreferences(request) {
1821 const body = await request.json();
1822 const { preferences } = body;
1823 if (!Array.isArray(preferences)) {
1824 return errorResponse(
1825 'InvalidRequest',
1826 'preferences must be an array',
1827 400,
1828 );
1829 }
1830 await this.state.storage.put('preferences', preferences);
1831 return Response.json({});
1832 }
1833
1834 async getHandleForDid(did) {
1835 // Check if this DID has a handle registered
1836 const handleMap = (await this.state.storage.get('handleMap')) || {};
1837 for (const [handle, mappedDid] of Object.entries(handleMap)) {
1838 if (mappedDid === did) return handle;
1839 }
1840 // Check instance's own handle
1841 const instanceDid = await this.getDid();
1842 if (instanceDid === did) {
1843 return await this.state.storage.get('handle');
1844 }
1845 return null;
1846 }
1847
1848 async createServiceAuthForAppView(did, lxm) {
1849 const signingKey = await this.getSigningKey();
1850 return createServiceJwt({
1851 iss: did,
1852 aud: 'did:web:api.bsky.app',
1853 lxm,
1854 signingKey,
1855 });
1856 }
1857
1858 async handleAppViewProxy(request, userDid) {
1859 const url = new URL(request.url);
1860 // Extract lexicon method from path: /xrpc/app.bsky.actor.getPreferences -> app.bsky.actor.getPreferences
1861 const lxm = url.pathname.replace('/xrpc/', '');
1862
1863 // Create service auth JWT
1864 const serviceJwt = await this.createServiceAuthForAppView(userDid, lxm);
1865
1866 // Build AppView URL
1867 const appViewUrl = new URL(
1868 url.pathname + url.search,
1869 'https://api.bsky.app',
1870 );
1871
1872 // Forward request with service auth
1873 const headers = new Headers();
1874 headers.set('Authorization', `Bearer ${serviceJwt}`);
1875 headers.set(
1876 'Content-Type',
1877 request.headers.get('Content-Type') || 'application/json',
1878 );
1879 if (request.headers.get('Accept')) {
1880 headers.set('Accept', request.headers.get('Accept'));
1881 }
1882 if (request.headers.get('Accept-Language')) {
1883 headers.set('Accept-Language', request.headers.get('Accept-Language'));
1884 }
1885
1886 const proxyReq = new Request(appViewUrl.toString(), {
1887 method: request.method,
1888 headers,
1889 body:
1890 request.method !== 'GET' && request.method !== 'HEAD'
1891 ? request.body
1892 : undefined,
1893 });
1894
1895 try {
1896 const response = await fetch(proxyReq);
1897 // Return the response with CORS headers
1898 const responseHeaders = new Headers(response.headers);
1899 responseHeaders.set('Access-Control-Allow-Origin', '*');
1900 return new Response(response.body, {
1901 status: response.status,
1902 statusText: response.statusText,
1903 headers: responseHeaders,
1904 });
1905 } catch (err) {
1906 return errorResponse(
1907 'UpstreamFailure',
1908 `Failed to reach AppView: ${err.message}`,
1909 502,
1910 );
1911 }
1912 }
1913
1914 async handleListRepos() {
1915 const registeredDids =
1916 (await this.state.storage.get('registeredDids')) || [];
1917 const did = await this.getDid();
1918 const repos = did
1919 ? [{ did, head: null, rev: null }]
1920 : registeredDids.map((d) => ({ did: d, head: null, rev: null }));
1921 return Response.json({ repos });
1922 }
1923
1924 async handleCreateRecord(request) {
1925 const body = await request.json();
1926 if (!body.collection || !body.record) {
1927 return errorResponse(
1928 'InvalidRequest',
1929 'missing collection or record',
1930 400,
1931 );
1932 }
1933 try {
1934 const result = await this.createRecord(
1935 body.collection,
1936 body.record,
1937 body.rkey,
1938 );
1939 const head = await this.state.storage.get('head');
1940 const rev = await this.state.storage.get('rev');
1941 return Response.json({
1942 uri: result.uri,
1943 cid: result.cid,
1944 commit: { cid: head, rev },
1945 validationStatus: 'valid',
1946 });
1947 } catch (err) {
1948 return errorResponse('InternalError', err.message, 500);
1949 }
1950 }
1951
1952 async handleDeleteRecord(request) {
1953 const body = await request.json();
1954 if (!body.collection || !body.rkey) {
1955 return errorResponse('InvalidRequest', 'missing collection or rkey', 400);
1956 }
1957 try {
1958 const result = await this.deleteRecord(body.collection, body.rkey);
1959 if (result.error) {
1960 return Response.json(result, { status: 404 });
1961 }
1962 return Response.json({});
1963 } catch (err) {
1964 return errorResponse('InternalError', err.message, 500);
1965 }
1966 }
1967
1968 async handlePutRecord(request) {
1969 const body = await request.json();
1970 if (!body.collection || !body.rkey || !body.record) {
1971 return errorResponse(
1972 'InvalidRequest',
1973 'missing collection, rkey, or record',
1974 400,
1975 );
1976 }
1977 try {
1978 // putRecord is like createRecord but with a specific rkey (upsert)
1979 const result = await this.createRecord(
1980 body.collection,
1981 body.record,
1982 body.rkey,
1983 );
1984 const head = await this.state.storage.get('head');
1985 const rev = await this.state.storage.get('rev');
1986 return Response.json({
1987 uri: result.uri,
1988 cid: result.cid,
1989 commit: { cid: head, rev },
1990 validationStatus: 'valid',
1991 });
1992 } catch (err) {
1993 return errorResponse('InternalError', err.message, 500);
1994 }
1995 }
1996
1997 async handleApplyWrites(request) {
1998 const body = await request.json();
1999 if (!body.writes || !Array.isArray(body.writes)) {
2000 return errorResponse('InvalidRequest', 'missing writes array', 400);
2001 }
2002 try {
2003 const results = [];
2004 for (const write of body.writes) {
2005 const type = write.$type;
2006 if (type === 'com.atproto.repo.applyWrites#create') {
2007 const result = await this.createRecord(
2008 write.collection,
2009 write.value,
2010 write.rkey,
2011 );
2012 results.push({
2013 $type: 'com.atproto.repo.applyWrites#createResult',
2014 uri: result.uri,
2015 cid: result.cid,
2016 validationStatus: 'valid',
2017 });
2018 } else if (type === 'com.atproto.repo.applyWrites#update') {
2019 const result = await this.createRecord(
2020 write.collection,
2021 write.value,
2022 write.rkey,
2023 );
2024 results.push({
2025 $type: 'com.atproto.repo.applyWrites#updateResult',
2026 uri: result.uri,
2027 cid: result.cid,
2028 validationStatus: 'valid',
2029 });
2030 } else if (type === 'com.atproto.repo.applyWrites#delete') {
2031 await this.deleteRecord(write.collection, write.rkey);
2032 results.push({
2033 $type: 'com.atproto.repo.applyWrites#deleteResult',
2034 });
2035 } else {
2036 return errorResponse(
2037 'InvalidRequest',
2038 `Unknown write operation type: ${type}`,
2039 400,
2040 );
2041 }
2042 }
2043 // Return commit info
2044 const head = await this.state.storage.get('head');
2045 const rev = await this.state.storage.get('rev');
2046 return Response.json({ commit: { cid: head, rev }, results });
2047 } catch (err) {
2048 return errorResponse('InternalError', err.message, 500);
2049 }
2050 }
2051
2052 async handleGetRecord(url) {
2053 const collection = url.searchParams.get('collection');
2054 const rkey = url.searchParams.get('rkey');
2055 if (!collection || !rkey) {
2056 return errorResponse('InvalidRequest', 'missing collection or rkey', 400);
2057 }
2058 const did = await this.getDid();
2059 const uri = `at://${did}/${collection}/${rkey}`;
2060 const rows = this.sql
2061 .exec(`SELECT cid, value FROM records WHERE uri = ?`, uri)
2062 .toArray();
2063 if (rows.length === 0) {
2064 return errorResponse('RecordNotFound', 'record not found', 404);
2065 }
2066 const row = rows[0];
2067 const value = cborDecode(new Uint8Array(row.value));
2068 return Response.json({ uri, cid: row.cid, value });
2069 }
2070
2071 async handleDescribeRepo() {
2072 const did = await this.getDid();
2073 if (!did) {
2074 return errorResponse('RepoNotFound', 'repo not found', 404);
2075 }
2076 const handle = await this.state.storage.get('handle');
2077 // Get unique collections
2078 const collections = this.sql
2079 .exec(`SELECT DISTINCT collection FROM records`)
2080 .toArray()
2081 .map((r) => r.collection);
2082
2083 return Response.json({
2084 handle: handle || did,
2085 did,
2086 didDoc: {},
2087 collections,
2088 handleIsCorrect: !!handle,
2089 });
2090 }
2091
2092 async handleListRecords(url) {
2093 const collection = url.searchParams.get('collection');
2094 if (!collection) {
2095 return errorResponse('InvalidRequest', 'missing collection', 400);
2096 }
2097 const limit = Math.min(
2098 parseInt(url.searchParams.get('limit') || '50', 10),
2099 100,
2100 );
2101 const reverse = url.searchParams.get('reverse') === 'true';
2102 const _cursor = url.searchParams.get('cursor');
2103
2104 const _did = await this.getDid();
2105 const query = `SELECT uri, cid, value FROM records WHERE collection = ? ORDER BY rkey ${reverse ? 'DESC' : 'ASC'} LIMIT ?`;
2106 const params = [collection, limit + 1];
2107
2108 const rows = this.sql.exec(query, ...params).toArray();
2109 const hasMore = rows.length > limit;
2110 const records = rows.slice(0, limit).map((r) => ({
2111 uri: r.uri,
2112 cid: r.cid,
2113 value: cborDecode(new Uint8Array(r.value)),
2114 }));
2115
2116 return Response.json({
2117 records,
2118 cursor: hasMore ? records[records.length - 1]?.uri : undefined,
2119 });
2120 }
2121
2122 handleGetLatestCommit() {
2123 const commits = this.sql
2124 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`)
2125 .toArray();
2126 if (commits.length === 0) {
2127 return errorResponse('RepoNotFound', 'repo not found', 404);
2128 }
2129 return Response.json({ cid: commits[0].cid, rev: commits[0].rev });
2130 }
2131
2132 async handleGetRepoStatus() {
2133 const did = await this.getDid();
2134 const commits = this.sql
2135 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`)
2136 .toArray();
2137 if (commits.length === 0 || !did) {
2138 return errorResponse('RepoNotFound', 'repo not found', 404);
2139 }
2140 return Response.json({
2141 did,
2142 active: true,
2143 status: 'active',
2144 rev: commits[0].rev,
2145 });
2146 }
2147
2148 handleGetRepo() {
2149 const commits = this.sql
2150 .exec(`SELECT cid FROM commits ORDER BY seq DESC LIMIT 1`)
2151 .toArray();
2152 if (commits.length === 0) {
2153 return errorResponse('RepoNotFound', 'repo not found', 404);
2154 }
2155
2156 // Only include blocks reachable from the current commit
2157 const commitCid = commits[0].cid;
2158 const neededCids = new Set();
2159
2160 // Helper to get block data
2161 const getBlock = (cid) => {
2162 const rows = this.sql
2163 .exec(`SELECT data FROM blocks WHERE cid = ?`, cid)
2164 .toArray();
2165 return rows.length > 0 ? new Uint8Array(rows[0].data) : null;
2166 };
2167
2168 // Collect all reachable blocks starting from commit
2169 const collectBlocks = (cid) => {
2170 if (neededCids.has(cid)) return;
2171 neededCids.add(cid);
2172
2173 const data = getBlock(cid);
2174 if (!data) return;
2175
2176 // Decode CBOR to find CID references
2177 try {
2178 const decoded = cborDecode(data);
2179 if (decoded && typeof decoded === 'object') {
2180 // Commit object - follow 'data' (MST root)
2181 if (decoded.data instanceof Uint8Array) {
2182 collectBlocks(cidToString(decoded.data));
2183 }
2184 // MST node - follow 'l' and entries' 'v' and 't'
2185 if (decoded.l instanceof Uint8Array) {
2186 collectBlocks(cidToString(decoded.l));
2187 }
2188 if (Array.isArray(decoded.e)) {
2189 for (const entry of decoded.e) {
2190 if (entry.v instanceof Uint8Array) {
2191 collectBlocks(cidToString(entry.v));
2192 }
2193 if (entry.t instanceof Uint8Array) {
2194 collectBlocks(cidToString(entry.t));
2195 }
2196 }
2197 }
2198 }
2199 } catch (_e) {
2200 // Not a structured block, that's fine
2201 }
2202 };
2203
2204 collectBlocks(commitCid);
2205
2206 // Build CAR with only needed blocks
2207 const blocksForCar = [];
2208 for (const cid of neededCids) {
2209 const data = getBlock(cid);
2210 if (data) {
2211 blocksForCar.push({ cid, data });
2212 }
2213 }
2214
2215 const car = buildCarFile(commitCid, blocksForCar);
2216 return new Response(car, {
2217 headers: { 'content-type': 'application/vnd.ipld.car' },
2218 });
2219 }
2220
2221 async handleSyncGetRecord(url) {
2222 const collection = url.searchParams.get('collection');
2223 const rkey = url.searchParams.get('rkey');
2224 if (!collection || !rkey) {
2225 return errorResponse('InvalidRequest', 'missing collection or rkey', 400);
2226 }
2227 const did = await this.getDid();
2228 const uri = `at://${did}/${collection}/${rkey}`;
2229 const rows = this.sql
2230 .exec(`SELECT cid FROM records WHERE uri = ?`, uri)
2231 .toArray();
2232 if (rows.length === 0) {
2233 return errorResponse('RecordNotFound', 'record not found', 404);
2234 }
2235 const recordCid = rows[0].cid;
2236
2237 // Get latest commit
2238 const commits = this.sql
2239 .exec(`SELECT cid FROM commits ORDER BY seq DESC LIMIT 1`)
2240 .toArray();
2241 if (commits.length === 0) {
2242 return errorResponse('RepoNotFound', 'no commits', 404);
2243 }
2244 const commitCid = commits[0].cid;
2245
2246 // Build proof chain: commit -> MST path -> record
2247 // Include commit block, all MST nodes on path to record, and record block
2248 const blocks = [];
2249 const included = new Set();
2250
2251 const addBlock = (cidStr) => {
2252 if (included.has(cidStr)) return;
2253 included.add(cidStr);
2254 const blockRows = this.sql
2255 .exec(`SELECT data FROM blocks WHERE cid = ?`, cidStr)
2256 .toArray();
2257 if (blockRows.length > 0) {
2258 blocks.push({ cid: cidStr, data: new Uint8Array(blockRows[0].data) });
2259 }
2260 };
2261
2262 // Add commit block
2263 addBlock(commitCid);
2264
2265 // Get commit to find data root
2266 const commitRows = this.sql
2267 .exec(`SELECT data FROM blocks WHERE cid = ?`, commitCid)
2268 .toArray();
2269 if (commitRows.length > 0) {
2270 const commit = cborDecode(new Uint8Array(commitRows[0].data));
2271 if (commit.data) {
2272 const dataRootCid = cidToString(commit.data);
2273 // Collect MST path blocks (this includes all MST nodes)
2274 const mstBlocks = this.collectMstBlocks(dataRootCid);
2275 for (const block of mstBlocks) {
2276 addBlock(block.cid);
2277 }
2278 }
2279 }
2280
2281 // Add record block
2282 addBlock(recordCid);
2283
2284 const car = buildCarFile(commitCid, blocks);
2285 return new Response(car, {
2286 headers: { 'content-type': 'application/vnd.ipld.car' },
2287 });
2288 }
2289
2290 handleSubscribeRepos(request, url) {
2291 const upgradeHeader = request.headers.get('Upgrade');
2292 if (upgradeHeader !== 'websocket') {
2293 return new Response('expected websocket', { status: 426 });
2294 }
2295 const { 0: client, 1: server } = new WebSocketPair();
2296 this.state.acceptWebSocket(server);
2297 const cursor = url.searchParams.get('cursor');
2298 if (cursor) {
2299 const events = this.sql
2300 .exec(
2301 `SELECT * FROM seq_events WHERE seq > ? ORDER BY seq`,
2302 parseInt(cursor, 10),
2303 )
2304 .toArray();
2305 for (const evt of events) {
2306 server.send(this.formatEvent(evt));
2307 }
2308 }
2309 return new Response(null, { status: 101, webSocket: client });
2310 }
2311
2312 async fetch(request) {
2313 const url = new URL(request.url);
2314 const route = pdsRoutes[url.pathname];
2315
2316 // Check for local route first
2317 if (route) {
2318 if (route.method && request.method !== route.method) {
2319 return errorResponse('MethodNotAllowed', 'method not allowed', 405);
2320 }
2321 return route.handler(this, request, url);
2322 }
2323
2324 // Handle app.bsky.* proxy requests (only if no local route)
2325 if (url.pathname.startsWith('/xrpc/app.bsky.')) {
2326 const userDid = request.headers.get('x-authed-did');
2327 if (!userDid) {
2328 return errorResponse('Unauthorized', 'Missing auth context', 401);
2329 }
2330 return this.handleAppViewProxy(request, userDid);
2331 }
2332
2333 return errorResponse('NotFound', 'not found', 404);
2334 }
2335}
2336
2337const corsHeaders = {
2338 'Access-Control-Allow-Origin': '*',
2339 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
2340 'Access-Control-Allow-Headers':
2341 'Content-Type, Authorization, atproto-accept-labelers, atproto-proxy, x-bsky-topics',
2342};
2343
2344function addCorsHeaders(response) {
2345 const newHeaders = new Headers(response.headers);
2346 for (const [key, value] of Object.entries(corsHeaders)) {
2347 newHeaders.set(key, value);
2348 }
2349 return new Response(response.body, {
2350 status: response.status,
2351 statusText: response.statusText,
2352 headers: newHeaders,
2353 });
2354}
2355
2356export default {
2357 async fetch(request, env) {
2358 // Handle CORS preflight
2359 if (request.method === 'OPTIONS') {
2360 return new Response(null, { headers: corsHeaders });
2361 }
2362
2363 const response = await handleRequest(request, env);
2364 // Don't wrap WebSocket upgrades - they need the webSocket property preserved
2365 if (response.status === 101) {
2366 return response;
2367 }
2368 return addCorsHeaders(response);
2369 },
2370};
2371
2372// Extract subdomain from hostname (e.g., "alice" from "alice.foo.workers.dev")
2373function getSubdomain(hostname) {
2374 const parts = hostname.split('.');
2375 // workers.dev domains: [subdomain?].[worker-name].[account].workers.dev
2376 // If more than 4 parts, first part(s) are user subdomain
2377 if (parts.length > 4 && parts.slice(-2).join('.') === 'workers.dev') {
2378 return parts.slice(0, -4).join('.');
2379 }
2380 // Custom domains: check if there's a subdomain before the base
2381 // For now, assume no subdomain on custom domains
2382 return null;
2383}
2384
2385/**
2386 * Verify auth and return DID from token
2387 * @param {Request} request - HTTP request with Authorization header
2388 * @param {Object} env - Environment with JWT_SECRET
2389 * @returns {Promise<{did: string} | {error: Response}>} DID or error response
2390 */
2391async function requireAuth(request, env) {
2392 const authHeader = request.headers.get('Authorization');
2393 if (!authHeader || !authHeader.startsWith('Bearer ')) {
2394 return {
2395 error: Response.json(
2396 {
2397 error: 'AuthRequired',
2398 message: 'Authentication required',
2399 },
2400 { status: 401 },
2401 ),
2402 };
2403 }
2404
2405 const token = authHeader.slice(7);
2406 const jwtSecret = env?.JWT_SECRET;
2407 if (!jwtSecret) {
2408 return {
2409 error: Response.json(
2410 {
2411 error: 'InternalServerError',
2412 message: 'Server not configured for authentication',
2413 },
2414 { status: 500 },
2415 ),
2416 };
2417 }
2418
2419 try {
2420 const payload = await verifyAccessJwt(token, jwtSecret);
2421 return { did: payload.sub };
2422 } catch (err) {
2423 return {
2424 error: Response.json(
2425 {
2426 error: 'InvalidToken',
2427 message: err.message,
2428 },
2429 { status: 401 },
2430 ),
2431 };
2432 }
2433}
2434
2435async function handleAuthenticatedRepoWrite(request, env) {
2436 const auth = await requireAuth(request, env);
2437 if (auth.error) return auth.error;
2438
2439 const body = await request.json();
2440 const repo = body.repo;
2441 if (!repo) {
2442 return errorResponse('InvalidRequest', 'missing repo param', 400);
2443 }
2444
2445 if (auth.did !== repo) {
2446 return errorResponse('Forbidden', "Cannot modify another user's repo", 403);
2447 }
2448
2449 const id = env.PDS.idFromName(repo);
2450 const pds = env.PDS.get(id);
2451 const response = await pds.fetch(
2452 new Request(request.url, {
2453 method: 'POST',
2454 headers: request.headers,
2455 body: JSON.stringify(body),
2456 }),
2457 );
2458
2459 // Notify relay of updates on successful writes
2460 if (response.ok) {
2461 const url = new URL(request.url);
2462 notifyCrawlers(env, url.hostname);
2463 }
2464
2465 return response;
2466}
2467
2468async function handleRequest(request, env) {
2469 const url = new URL(request.url);
2470 const subdomain = getSubdomain(url.hostname);
2471
2472 // Handle resolution via subdomain or bare domain
2473 if (url.pathname === '/.well-known/atproto-did') {
2474 // Look up handle -> DID in default DO
2475 // Use subdomain if present, otherwise try bare hostname as handle
2476 const handleToResolve = subdomain || url.hostname;
2477 const defaultId = env.PDS.idFromName('default');
2478 const defaultPds = env.PDS.get(defaultId);
2479 const resolveRes = await defaultPds.fetch(
2480 new Request(
2481 `http://internal/resolve-handle?handle=${encodeURIComponent(handleToResolve)}`,
2482 ),
2483 );
2484 if (!resolveRes.ok) {
2485 return new Response('Handle not found', { status: 404 });
2486 }
2487 const { did } = await resolveRes.json();
2488 return new Response(did, { headers: { 'Content-Type': 'text/plain' } });
2489 }
2490
2491 // describeServer - works on bare domain
2492 if (url.pathname === '/xrpc/com.atproto.server.describeServer') {
2493 const defaultId = env.PDS.idFromName('default');
2494 const defaultPds = env.PDS.get(defaultId);
2495 const newReq = new Request(request.url, {
2496 method: request.method,
2497 headers: {
2498 ...Object.fromEntries(request.headers),
2499 'x-hostname': url.hostname,
2500 },
2501 });
2502 return defaultPds.fetch(newReq);
2503 }
2504
2505 // createSession - handle on default DO (has handleMap for identifier resolution)
2506 if (url.pathname === '/xrpc/com.atproto.server.createSession') {
2507 const defaultId = env.PDS.idFromName('default');
2508 const defaultPds = env.PDS.get(defaultId);
2509 return defaultPds.fetch(request);
2510 }
2511
2512 // getSession - route to default DO
2513 if (url.pathname === '/xrpc/com.atproto.server.getSession') {
2514 const defaultId = env.PDS.idFromName('default');
2515 const defaultPds = env.PDS.get(defaultId);
2516 return defaultPds.fetch(request);
2517 }
2518
2519 // refreshSession - route to default DO
2520 if (url.pathname === '/xrpc/com.atproto.server.refreshSession') {
2521 const defaultId = env.PDS.idFromName('default');
2522 const defaultPds = env.PDS.get(defaultId);
2523 return defaultPds.fetch(request);
2524 }
2525
2526 // Proxy app.bsky.* endpoints to Bluesky AppView
2527 if (url.pathname.startsWith('/xrpc/app.bsky.')) {
2528 // Authenticate the user first
2529 const auth = await requireAuth(request, env);
2530 if (auth.error) return auth.error;
2531
2532 // Route to the user's DO instance to create service auth and proxy
2533 const id = env.PDS.idFromName(auth.did);
2534 const pds = env.PDS.get(id);
2535 return pds.fetch(
2536 new Request(request.url, {
2537 method: request.method,
2538 headers: {
2539 ...Object.fromEntries(request.headers),
2540 'x-authed-did': auth.did, // Pass the authenticated DID
2541 },
2542 body:
2543 request.method !== 'GET' && request.method !== 'HEAD'
2544 ? request.body
2545 : undefined,
2546 }),
2547 );
2548 }
2549
2550 // Handle registration routes - go to default DO
2551 if (
2552 url.pathname === '/register-handle' ||
2553 url.pathname === '/resolve-handle'
2554 ) {
2555 const defaultId = env.PDS.idFromName('default');
2556 const defaultPds = env.PDS.get(defaultId);
2557 return defaultPds.fetch(request);
2558 }
2559
2560 // resolveHandle XRPC endpoint
2561 if (url.pathname === '/xrpc/com.atproto.identity.resolveHandle') {
2562 const handle = url.searchParams.get('handle');
2563 if (!handle) {
2564 return errorResponse('InvalidRequest', 'missing handle param', 400);
2565 }
2566 const defaultId = env.PDS.idFromName('default');
2567 const defaultPds = env.PDS.get(defaultId);
2568 const resolveRes = await defaultPds.fetch(
2569 new Request(
2570 `http://internal/resolve-handle?handle=${encodeURIComponent(handle)}`,
2571 ),
2572 );
2573 if (!resolveRes.ok) {
2574 return errorResponse('InvalidRequest', 'Unable to resolve handle', 400);
2575 }
2576 const { did } = await resolveRes.json();
2577 return Response.json({ did });
2578 }
2579
2580 // subscribeRepos WebSocket - route to default instance for firehose
2581 if (url.pathname === '/xrpc/com.atproto.sync.subscribeRepos') {
2582 const defaultId = env.PDS.idFromName('default');
2583 const defaultPds = env.PDS.get(defaultId);
2584 return defaultPds.fetch(request);
2585 }
2586
2587 // listRepos needs to aggregate from all registered DIDs
2588 if (url.pathname === '/xrpc/com.atproto.sync.listRepos') {
2589 const defaultId = env.PDS.idFromName('default');
2590 const defaultPds = env.PDS.get(defaultId);
2591 const regRes = await defaultPds.fetch(
2592 new Request('http://internal/get-registered-dids'),
2593 );
2594 const { dids } = await regRes.json();
2595
2596 const repos = [];
2597 for (const did of dids) {
2598 const id = env.PDS.idFromName(did);
2599 const pds = env.PDS.get(id);
2600 const infoRes = await pds.fetch(new Request('http://internal/repo-info'));
2601 const info = await infoRes.json();
2602 if (info.head) {
2603 repos.push({ did, head: info.head, rev: info.rev, active: true });
2604 }
2605 }
2606 return Response.json({ repos, cursor: undefined });
2607 }
2608
2609 // Repo endpoints use ?repo= param instead of ?did=
2610 if (
2611 url.pathname === '/xrpc/com.atproto.repo.describeRepo' ||
2612 url.pathname === '/xrpc/com.atproto.repo.listRecords' ||
2613 url.pathname === '/xrpc/com.atproto.repo.getRecord'
2614 ) {
2615 const repo = url.searchParams.get('repo');
2616 if (!repo) {
2617 return errorResponse('InvalidRequest', 'missing repo param', 400);
2618 }
2619 const id = env.PDS.idFromName(repo);
2620 const pds = env.PDS.get(id);
2621 return pds.fetch(request);
2622 }
2623
2624 // Sync endpoints use ?did= param
2625 if (
2626 url.pathname === '/xrpc/com.atproto.sync.getLatestCommit' ||
2627 url.pathname === '/xrpc/com.atproto.sync.getRepoStatus' ||
2628 url.pathname === '/xrpc/com.atproto.sync.getRepo' ||
2629 url.pathname === '/xrpc/com.atproto.sync.getRecord'
2630 ) {
2631 const did = url.searchParams.get('did');
2632 if (!did) {
2633 return errorResponse('InvalidRequest', 'missing did param', 400);
2634 }
2635 const id = env.PDS.idFromName(did);
2636 const pds = env.PDS.get(id);
2637 return pds.fetch(request);
2638 }
2639
2640 // Authenticated repo write endpoints
2641 const repoWriteEndpoints = [
2642 '/xrpc/com.atproto.repo.createRecord',
2643 '/xrpc/com.atproto.repo.deleteRecord',
2644 '/xrpc/com.atproto.repo.putRecord',
2645 '/xrpc/com.atproto.repo.applyWrites',
2646 ];
2647 if (repoWriteEndpoints.includes(url.pathname)) {
2648 return handleAuthenticatedRepoWrite(request, env);
2649 }
2650
2651 // Health check endpoint
2652 if (url.pathname === '/xrpc/_health') {
2653 return Response.json({ version: '0.1.0' });
2654 }
2655
2656 // Root path - ASCII art
2657 if (url.pathname === '/') {
2658 const ascii = `
2659 ██████╗ ██████╗ ███████╗ ██╗ ███████╗
2660 ██╔══██╗ ██╔══██╗ ██╔════╝ ██║ ██╔════╝
2661 ██████╔╝ ██║ ██║ ███████╗ ██║ ███████╗
2662 ██╔═══╝ ██║ ██║ ╚════██║ ██ ██║ ╚════██║
2663 ██║ ██████╔╝ ███████║ ██╗ ╚█████╔╝ ███████║
2664 ╚═╝ ╚═════╝ ╚══════╝ ╚═╝ ╚════╝ ╚══════╝
2665
2666 ATProto PDS on Cloudflare Workers
2667`;
2668 return new Response(ascii, {
2669 headers: { 'Content-Type': 'text/plain; charset=utf-8' },
2670 });
2671 }
2672
2673 // On init, register this DID with the default instance (requires ?did= param, no auth yet)
2674 if (url.pathname === '/init' && request.method === 'POST') {
2675 const did = url.searchParams.get('did');
2676 if (!did) {
2677 return errorResponse('InvalidRequest', 'missing did param', 400);
2678 }
2679 const body = await request.json();
2680
2681 // Register with default instance for discovery
2682 const defaultId = env.PDS.idFromName('default');
2683 const defaultPds = env.PDS.get(defaultId);
2684 await defaultPds.fetch(
2685 new Request('http://internal/register-did', {
2686 method: 'POST',
2687 body: JSON.stringify({ did }),
2688 }),
2689 );
2690
2691 // Register handle if provided
2692 if (body.handle) {
2693 await defaultPds.fetch(
2694 new Request('http://internal/register-handle', {
2695 method: 'POST',
2696 body: JSON.stringify({ did, handle: body.handle }),
2697 }),
2698 );
2699 }
2700
2701 // Forward to the actual PDS instance
2702 const id = env.PDS.idFromName(did);
2703 const pds = env.PDS.get(id);
2704 return pds.fetch(
2705 new Request(request.url, {
2706 method: 'POST',
2707 headers: request.headers,
2708 body: JSON.stringify(body),
2709 }),
2710 );
2711 }
2712
2713 // Unknown endpoint
2714 return errorResponse('NotFound', 'Endpoint not found', 404);
2715}