this repo has no description
1/** 2 * Minimal AT Protocol Personal Data Server (PDS) 3 * 4 * A single-file implementation of an ATProto PDS for Cloudflare Workers 5 * with Durable Objects. Implements the core protocol primitives: 6 * 7 * - CBOR/DAG-CBOR encoding for content-addressed data 8 * - CID generation (CIDv1 with dag-cbor + sha-256) 9 * - Merkle Search Tree (MST) for repository structure 10 * - P-256 signing with low-S normalization 11 * - CAR file building for repo sync 12 * - XRPC endpoints for repo operations and sync 13 * 14 * @see https://atproto.com 15 */ 16 17// === CONSTANTS === 18// CBOR primitive markers (RFC 8949) 19const CBOR_FALSE = 0xf4; 20const CBOR_TRUE = 0xf5; 21const CBOR_NULL = 0xf6; 22 23// DAG-CBOR CID link tag 24const CBOR_TAG_CID = 42; 25 26// === ERROR HELPER === 27function errorResponse(error, message, status) { 28 return Response.json({ error, message }, { status }); 29} 30 31// === CRAWLER NOTIFICATION === 32// Notify relays to come crawl us after writes (like official PDS) 33let lastCrawlNotify = 0; 34const CRAWL_NOTIFY_THRESHOLD = 20 * 60 * 1000; // 20 minutes (matches official PDS) 35 36async function notifyCrawlers(env, hostname) { 37 const now = Date.now(); 38 if (now - lastCrawlNotify < CRAWL_NOTIFY_THRESHOLD) { 39 return; // Throttle notifications 40 } 41 42 const relayHost = env.RELAY_HOST; 43 if (!relayHost) return; 44 45 lastCrawlNotify = now; 46 47 // Fire and forget - don't block writes on relay notification 48 fetch(`${relayHost}/xrpc/com.atproto.sync.requestCrawl`, { 49 method: 'POST', 50 headers: { 'Content-Type': 'application/json' }, 51 body: JSON.stringify({ hostname }), 52 }).catch((err) => { 53 console.log('Failed to notify relay:', err.message); 54 }); 55} 56 57// === CID WRAPPER === 58// Explicit CID type for DAG-CBOR encoding (avoids fragile heuristic detection) 59 60class CID { 61 constructor(bytes) { 62 if (!(bytes instanceof Uint8Array)) { 63 throw new Error('CID must be constructed with Uint8Array'); 64 } 65 this.bytes = bytes; 66 } 67} 68 69// === CBOR ENCODING === 70// Minimal deterministic CBOR (RFC 8949) - sorted keys, minimal integers 71 72/** 73 * Encode CBOR type header (major type + length) 74 * @param {number[]} parts - Array to push bytes to 75 * @param {number} majorType - CBOR major type (0-7) 76 * @param {number} length - Value or length to encode 77 */ 78function encodeHead(parts, majorType, length) { 79 const mt = majorType << 5; 80 if (length < 24) { 81 parts.push(mt | length); 82 } else if (length < 256) { 83 parts.push(mt | 24, length); 84 } else if (length < 65536) { 85 parts.push(mt | 25, length >> 8, length & 0xff); 86 } else if (length < 4294967296) { 87 // Use Math.floor instead of bitshift to avoid 32-bit signed integer overflow 88 parts.push( 89 mt | 26, 90 Math.floor(length / 0x1000000) & 0xff, 91 Math.floor(length / 0x10000) & 0xff, 92 Math.floor(length / 0x100) & 0xff, 93 length & 0xff, 94 ); 95 } 96} 97 98/** 99 * Encode a value as CBOR bytes (RFC 8949 deterministic encoding) 100 * @param {*} value - Value to encode (null, boolean, number, string, Uint8Array, array, or object) 101 * @returns {Uint8Array} CBOR-encoded bytes 102 */ 103export function cborEncode(value) { 104 const parts = []; 105 106 function encode(val) { 107 if (val === null) { 108 parts.push(CBOR_NULL); 109 } else if (val === true) { 110 parts.push(CBOR_TRUE); 111 } else if (val === false) { 112 parts.push(CBOR_FALSE); 113 } else if (typeof val === 'number') { 114 encodeInteger(val); 115 } else if (typeof val === 'string') { 116 const bytes = new TextEncoder().encode(val); 117 encodeHead(parts, 3, bytes.length); // major type 3 = text string 118 parts.push(...bytes); 119 } else if (val instanceof Uint8Array) { 120 encodeHead(parts, 2, val.length); // major type 2 = byte string 121 parts.push(...val); 122 } else if (Array.isArray(val)) { 123 encodeHead(parts, 4, val.length); // major type 4 = array 124 for (const item of val) encode(item); 125 } else if (typeof val === 'object') { 126 // Sort keys for deterministic encoding 127 const keys = Object.keys(val).sort(); 128 encodeHead(parts, 5, keys.length); // major type 5 = map 129 for (const key of keys) { 130 encode(key); 131 encode(val[key]); 132 } 133 } 134 } 135 136 function encodeInteger(n) { 137 if (n >= 0) { 138 encodeHead(parts, 0, n); // major type 0 = unsigned int 139 } else { 140 encodeHead(parts, 1, -n - 1); // major type 1 = negative int 141 } 142 } 143 144 encode(value); 145 return new Uint8Array(parts); 146} 147 148// DAG-CBOR encoder that handles CIDs with tag 42 149function cborEncodeDagCbor(value) { 150 const parts = []; 151 152 function encode(val) { 153 if (val === null) { 154 parts.push(CBOR_NULL); 155 } else if (val === true) { 156 parts.push(CBOR_TRUE); 157 } else if (val === false) { 158 parts.push(CBOR_FALSE); 159 } else if (typeof val === 'number') { 160 if (Number.isInteger(val) && val >= 0) { 161 encodeHead(parts, 0, val); 162 } else if (Number.isInteger(val) && val < 0) { 163 encodeHead(parts, 1, -val - 1); 164 } 165 } else if (typeof val === 'string') { 166 const bytes = new TextEncoder().encode(val); 167 encodeHead(parts, 3, bytes.length); 168 parts.push(...bytes); 169 } else if (val instanceof CID) { 170 // CID links in DAG-CBOR use tag 42 + 0x00 multibase prefix 171 // The 0x00 prefix indicates "identity" multibase (raw bytes) 172 parts.push(0xd8, CBOR_TAG_CID); 173 encodeHead(parts, 2, val.bytes.length + 1); // +1 for 0x00 prefix 174 parts.push(0x00); 175 parts.push(...val.bytes); 176 } else if (val instanceof Uint8Array) { 177 // Regular byte string 178 encodeHead(parts, 2, val.length); 179 parts.push(...val); 180 } else if (Array.isArray(val)) { 181 encodeHead(parts, 4, val.length); 182 for (const item of val) encode(item); 183 } else if (typeof val === 'object') { 184 // DAG-CBOR: sort keys by length first, then lexicographically 185 // (differs from standard CBOR which sorts lexicographically only) 186 const keys = Object.keys(val).filter((k) => val[k] !== undefined); 187 keys.sort((a, b) => { 188 if (a.length !== b.length) return a.length - b.length; 189 return a < b ? -1 : a > b ? 1 : 0; 190 }); 191 encodeHead(parts, 5, keys.length); 192 for (const key of keys) { 193 const keyBytes = new TextEncoder().encode(key); 194 encodeHead(parts, 3, keyBytes.length); 195 parts.push(...keyBytes); 196 encode(val[key]); 197 } 198 } 199 } 200 201 encode(value); 202 return new Uint8Array(parts); 203} 204 205/** 206 * Decode CBOR bytes to a JavaScript value 207 * @param {Uint8Array} bytes - CBOR-encoded bytes 208 * @returns {*} Decoded value 209 */ 210export function cborDecode(bytes) { 211 let offset = 0; 212 213 function read() { 214 const initial = bytes[offset++]; 215 const major = initial >> 5; 216 const info = initial & 0x1f; 217 218 let length = info; 219 if (info === 24) length = bytes[offset++]; 220 else if (info === 25) { 221 length = (bytes[offset++] << 8) | bytes[offset++]; 222 } else if (info === 26) { 223 // Use multiplication instead of bitshift to avoid 32-bit signed integer overflow 224 length = 225 bytes[offset++] * 0x1000000 + 226 bytes[offset++] * 0x10000 + 227 bytes[offset++] * 0x100 + 228 bytes[offset++]; 229 } 230 231 switch (major) { 232 case 0: 233 return length; // unsigned int 234 case 1: 235 return -1 - length; // negative int 236 case 2: { 237 // byte string 238 const data = bytes.slice(offset, offset + length); 239 offset += length; 240 return data; 241 } 242 case 3: { 243 // text string 244 const data = new TextDecoder().decode( 245 bytes.slice(offset, offset + length), 246 ); 247 offset += length; 248 return data; 249 } 250 case 4: { 251 // array 252 const arr = []; 253 for (let i = 0; i < length; i++) arr.push(read()); 254 return arr; 255 } 256 case 5: { 257 // map 258 const obj = {}; 259 for (let i = 0; i < length; i++) { 260 const key = read(); 261 obj[key] = read(); 262 } 263 return obj; 264 } 265 case 6: { 266 // tag 267 // length is the tag number 268 const taggedValue = read(); 269 if (length === CBOR_TAG_CID) { 270 // CID link: byte string with 0x00 multibase prefix, return raw CID bytes 271 return taggedValue.slice(1); // strip 0x00 prefix 272 } 273 return taggedValue; 274 } 275 case 7: { 276 // special 277 if (info === 20) return false; 278 if (info === 21) return true; 279 if (info === 22) return null; 280 return undefined; 281 } 282 } 283 } 284 285 return read(); 286} 287 288// === CID GENERATION === 289// dag-cbor (0x71) + sha-256 (0x12) + 32 bytes 290 291/** 292 * Create a CIDv1 (dag-cbor + sha-256) from raw bytes 293 * @param {Uint8Array} bytes - Content to hash 294 * @returns {Promise<Uint8Array>} CID bytes (36 bytes: version + codec + multihash) 295 */ 296export async function createCid(bytes) { 297 const hash = await crypto.subtle.digest('SHA-256', bytes); 298 const hashBytes = new Uint8Array(hash); 299 300 // CIDv1: version(1) + codec(dag-cbor=0x71) + multihash(sha256) 301 // Multihash: hash-type(0x12) + length(0x20=32) + digest 302 const cid = new Uint8Array(2 + 2 + 32); 303 cid[0] = 0x01; // CIDv1 304 cid[1] = 0x71; // dag-cbor codec 305 cid[2] = 0x12; // sha-256 306 cid[3] = 0x20; // 32 bytes 307 cid.set(hashBytes, 4); 308 309 return cid; 310} 311 312/** 313 * Convert CID bytes to base32lower string representation 314 * @param {Uint8Array} cid - CID bytes 315 * @returns {string} Base32lower-encoded CID with 'b' prefix 316 */ 317export function cidToString(cid) { 318 // base32lower encoding for CIDv1 319 return `b${base32Encode(cid)}`; 320} 321 322/** 323 * Encode bytes as base32lower string 324 * @param {Uint8Array} bytes - Bytes to encode 325 * @returns {string} Base32lower-encoded string 326 */ 327export function base32Encode(bytes) { 328 const alphabet = 'abcdefghijklmnopqrstuvwxyz234567'; 329 let result = ''; 330 let bits = 0; 331 let value = 0; 332 333 for (const byte of bytes) { 334 value = (value << 8) | byte; 335 bits += 8; 336 while (bits >= 5) { 337 bits -= 5; 338 result += alphabet[(value >> bits) & 31]; 339 } 340 } 341 342 if (bits > 0) { 343 result += alphabet[(value << (5 - bits)) & 31]; 344 } 345 346 return result; 347} 348 349// === TID GENERATION === 350// Timestamp-based IDs: base32-sort encoded microseconds + clock ID 351 352const TID_CHARS = '234567abcdefghijklmnopqrstuvwxyz'; 353let lastTimestamp = 0; 354const clockId = Math.floor(Math.random() * 1024); 355 356/** 357 * Generate a timestamp-based ID (TID) for record keys 358 * Monotonic within a process, sortable by time 359 * @returns {string} 13-character base32-sort encoded TID 360 */ 361export function createTid() { 362 let timestamp = Date.now() * 1000; // microseconds 363 364 // Ensure monotonic 365 if (timestamp <= lastTimestamp) { 366 timestamp = lastTimestamp + 1; 367 } 368 lastTimestamp = timestamp; 369 370 // 13 chars: 11 for timestamp (64 bits but only ~53 used), 2 for clock ID 371 let tid = ''; 372 373 // Encode timestamp (high bits first for sortability) 374 let ts = timestamp; 375 for (let i = 0; i < 11; i++) { 376 tid = TID_CHARS[ts & 31] + tid; 377 ts = Math.floor(ts / 32); 378 } 379 380 // Append clock ID (2 chars) 381 tid += TID_CHARS[(clockId >> 5) & 31]; 382 tid += TID_CHARS[clockId & 31]; 383 384 return tid; 385} 386 387// === P-256 SIGNING === 388// Web Crypto ECDSA with P-256 curve 389 390/** 391 * Import a raw P-256 private key for signing 392 * @param {Uint8Array} privateKeyBytes - 32-byte raw private key 393 * @returns {Promise<CryptoKey>} Web Crypto key handle 394 */ 395export async function importPrivateKey(privateKeyBytes) { 396 // Validate private key length (P-256 requires exactly 32 bytes) 397 if ( 398 !(privateKeyBytes instanceof Uint8Array) || 399 privateKeyBytes.length !== 32 400 ) { 401 throw new Error( 402 `Invalid private key: expected 32 bytes, got ${privateKeyBytes?.length ?? 'non-Uint8Array'}`, 403 ); 404 } 405 406 // PKCS#8 wrapper for raw P-256 private key 407 const pkcs8Prefix = new Uint8Array([ 408 0x30, 0x41, 0x02, 0x01, 0x00, 0x30, 0x13, 0x06, 0x07, 0x2a, 0x86, 0x48, 409 0xce, 0x3d, 0x02, 0x01, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03, 410 0x01, 0x07, 0x04, 0x27, 0x30, 0x25, 0x02, 0x01, 0x01, 0x04, 0x20, 411 ]); 412 413 const pkcs8 = new Uint8Array(pkcs8Prefix.length + 32); 414 pkcs8.set(pkcs8Prefix); 415 pkcs8.set(privateKeyBytes, pkcs8Prefix.length); 416 417 return crypto.subtle.importKey( 418 'pkcs8', 419 pkcs8, 420 { name: 'ECDSA', namedCurve: 'P-256' }, 421 false, 422 ['sign'], 423 ); 424} 425 426// P-256 curve order N 427const P256_N = BigInt( 428 '0xFFFFFFFF00000000FFFFFFFFFFFFFFFFBCE6FAADA7179E84F3B9CAC2FC632551', 429); 430const P256_N_DIV_2 = P256_N / 2n; 431 432function bytesToBigInt(bytes) { 433 let result = 0n; 434 for (const byte of bytes) { 435 result = (result << 8n) | BigInt(byte); 436 } 437 return result; 438} 439 440function bigIntToBytes(n, length) { 441 const bytes = new Uint8Array(length); 442 for (let i = length - 1; i >= 0; i--) { 443 bytes[i] = Number(n & 0xffn); 444 n >>= 8n; 445 } 446 return bytes; 447} 448 449/** 450 * Sign data with ECDSA P-256, returning low-S normalized signature 451 * @param {CryptoKey} privateKey - Web Crypto key from importPrivateKey 452 * @param {Uint8Array} data - Data to sign 453 * @returns {Promise<Uint8Array>} 64-byte signature (r || s) 454 */ 455export async function sign(privateKey, data) { 456 const signature = await crypto.subtle.sign( 457 { name: 'ECDSA', hash: 'SHA-256' }, 458 privateKey, 459 data, 460 ); 461 const sig = new Uint8Array(signature); 462 463 const r = sig.slice(0, 32); 464 const s = sig.slice(32, 64); 465 const sBigInt = bytesToBigInt(s); 466 467 // Low-S normalization: Bitcoin/ATProto require S <= N/2 to prevent 468 // signature malleability (two valid signatures for same message) 469 if (sBigInt > P256_N_DIV_2) { 470 const newS = P256_N - sBigInt; 471 const newSBytes = bigIntToBytes(newS, 32); 472 const normalized = new Uint8Array(64); 473 normalized.set(r, 0); 474 normalized.set(newSBytes, 32); 475 return normalized; 476 } 477 478 return sig; 479} 480 481/** 482 * Generate a new P-256 key pair 483 * @returns {Promise<{privateKey: Uint8Array, publicKey: Uint8Array}>} 32-byte private key, 33-byte compressed public key 484 */ 485export async function generateKeyPair() { 486 const keyPair = await crypto.subtle.generateKey( 487 { name: 'ECDSA', namedCurve: 'P-256' }, 488 true, 489 ['sign', 'verify'], 490 ); 491 492 // Export private key as raw bytes 493 const privateJwk = await crypto.subtle.exportKey('jwk', keyPair.privateKey); 494 const privateBytes = base64UrlDecode(privateJwk.d); 495 496 // Export public key as compressed point 497 const publicRaw = await crypto.subtle.exportKey('raw', keyPair.publicKey); 498 const publicBytes = new Uint8Array(publicRaw); 499 const compressed = compressPublicKey(publicBytes); 500 501 return { privateKey: privateBytes, publicKey: compressed }; 502} 503 504function compressPublicKey(uncompressed) { 505 // uncompressed is 65 bytes: 0x04 + x(32) + y(32) 506 // compressed is 33 bytes: prefix(02 or 03) + x(32) 507 const x = uncompressed.slice(1, 33); 508 const y = uncompressed.slice(33, 65); 509 const prefix = (y[31] & 1) === 0 ? 0x02 : 0x03; 510 const compressed = new Uint8Array(33); 511 compressed[0] = prefix; 512 compressed.set(x, 1); 513 return compressed; 514} 515 516/** 517 * Encode bytes as base64url string (no padding) 518 * @param {Uint8Array} bytes - Bytes to encode 519 * @returns {string} Base64url-encoded string 520 */ 521export function base64UrlEncode(bytes) { 522 let binary = ''; 523 for (const byte of bytes) { 524 binary += String.fromCharCode(byte); 525 } 526 const base64 = btoa(binary); 527 return base64.replace(/\+/g, '-').replace(/\//g, '_').replace(/=/g, ''); 528} 529 530/** 531 * Decode base64url string to bytes 532 * @param {string} str - Base64url-encoded string 533 * @returns {Uint8Array} Decoded bytes 534 */ 535export function base64UrlDecode(str) { 536 const base64 = str.replace(/-/g, '+').replace(/_/g, '/'); 537 const pad = base64.length % 4; 538 const padded = pad ? base64 + '='.repeat(4 - pad) : base64; 539 const binary = atob(padded); 540 const bytes = new Uint8Array(binary.length); 541 for (let i = 0; i < binary.length; i++) { 542 bytes[i] = binary.charCodeAt(i); 543 } 544 return bytes; 545} 546 547/** 548 * Create HMAC-SHA256 signature for JWT 549 * @param {string} data - Data to sign (header.payload) 550 * @param {string} secret - Secret key 551 * @returns {Promise<string>} Base64url-encoded signature 552 */ 553async function hmacSign(data, secret) { 554 const key = await crypto.subtle.importKey( 555 'raw', 556 new TextEncoder().encode(secret), 557 { name: 'HMAC', hash: 'SHA-256' }, 558 false, 559 ['sign'], 560 ); 561 const sig = await crypto.subtle.sign( 562 'HMAC', 563 key, 564 new TextEncoder().encode(data), 565 ); 566 return base64UrlEncode(new Uint8Array(sig)); 567} 568 569/** 570 * Create an access JWT for ATProto 571 * @param {string} did - User's DID (subject and audience) 572 * @param {string} secret - JWT signing secret 573 * @param {number} [expiresIn=7200] - Expiration in seconds (default 2 hours) 574 * @returns {Promise<string>} Signed JWT 575 */ 576export async function createAccessJwt(did, secret, expiresIn = 7200) { 577 const header = { typ: 'at+jwt', alg: 'HS256' }; 578 const now = Math.floor(Date.now() / 1000); 579 const payload = { 580 scope: 'com.atproto.access', 581 sub: did, 582 aud: did, 583 iat: now, 584 exp: now + expiresIn, 585 }; 586 587 const headerB64 = base64UrlEncode( 588 new TextEncoder().encode(JSON.stringify(header)), 589 ); 590 const payloadB64 = base64UrlEncode( 591 new TextEncoder().encode(JSON.stringify(payload)), 592 ); 593 const signature = await hmacSign(`${headerB64}.${payloadB64}`, secret); 594 595 return `${headerB64}.${payloadB64}.${signature}`; 596} 597 598/** 599 * Create a refresh JWT for ATProto 600 * @param {string} did - User's DID (subject and audience) 601 * @param {string} secret - JWT signing secret 602 * @param {number} [expiresIn=86400] - Expiration in seconds (default 24 hours) 603 * @returns {Promise<string>} Signed JWT 604 */ 605export async function createRefreshJwt(did, secret, expiresIn = 86400) { 606 const header = { typ: 'refresh+jwt', alg: 'HS256' }; 607 const now = Math.floor(Date.now() / 1000); 608 // Generate random jti (token ID) 609 const jtiBytes = new Uint8Array(32); 610 crypto.getRandomValues(jtiBytes); 611 const jti = base64UrlEncode(jtiBytes); 612 613 const payload = { 614 scope: 'com.atproto.refresh', 615 sub: did, 616 aud: did, 617 jti, 618 iat: now, 619 exp: now + expiresIn, 620 }; 621 622 const headerB64 = base64UrlEncode( 623 new TextEncoder().encode(JSON.stringify(header)), 624 ); 625 const payloadB64 = base64UrlEncode( 626 new TextEncoder().encode(JSON.stringify(payload)), 627 ); 628 const signature = await hmacSign(`${headerB64}.${payloadB64}`, secret); 629 630 return `${headerB64}.${payloadB64}.${signature}`; 631} 632 633/** 634 * Verify and decode a JWT (shared logic) 635 * @param {string} jwt - JWT string to verify 636 * @param {string} secret - JWT signing secret 637 * @param {string} expectedType - Expected token type (e.g., 'at+jwt', 'refresh+jwt') 638 * @returns {Promise<{header: Object, payload: Object}>} Decoded header and payload 639 * @throws {Error} If token is invalid, expired, or wrong type 640 */ 641async function verifyJwt(jwt, secret, expectedType) { 642 const parts = jwt.split('.'); 643 if (parts.length !== 3) { 644 throw new Error('Invalid JWT format'); 645 } 646 647 const [headerB64, payloadB64, signatureB64] = parts; 648 649 // Verify signature 650 const expectedSig = await hmacSign(`${headerB64}.${payloadB64}`, secret); 651 if (signatureB64 !== expectedSig) { 652 throw new Error('Invalid signature'); 653 } 654 655 // Decode header and payload 656 const header = JSON.parse( 657 new TextDecoder().decode(base64UrlDecode(headerB64)), 658 ); 659 const payload = JSON.parse( 660 new TextDecoder().decode(base64UrlDecode(payloadB64)), 661 ); 662 663 // Check token type 664 if (header.typ !== expectedType) { 665 throw new Error(`Invalid token type: expected ${expectedType}`); 666 } 667 668 // Check expiration 669 const now = Math.floor(Date.now() / 1000); 670 if (payload.exp && payload.exp < now) { 671 throw new Error('Token expired'); 672 } 673 674 return { header, payload }; 675} 676 677/** 678 * Verify and decode an access JWT 679 * @param {string} jwt - JWT string to verify 680 * @param {string} secret - JWT signing secret 681 * @returns {Promise<Object>} Decoded payload 682 * @throws {Error} If token is invalid, expired, or wrong type 683 */ 684export async function verifyAccessJwt(jwt, secret) { 685 const { payload } = await verifyJwt(jwt, secret, 'at+jwt'); 686 return payload; 687} 688 689/** 690 * Verify and decode a refresh JWT 691 * @param {string} jwt - JWT string to verify 692 * @param {string} secret - JWT signing secret 693 * @returns {Promise<Object>} Decoded payload 694 * @throws {Error} If token is invalid, expired, or wrong type 695 */ 696export async function verifyRefreshJwt(jwt, secret) { 697 const { payload } = await verifyJwt(jwt, secret, 'refresh+jwt'); 698 699 // Validate audience matches subject (token intended for this user) 700 if (payload.aud && payload.aud !== payload.sub) { 701 throw new Error('Invalid audience'); 702 } 703 704 return payload; 705} 706 707/** 708 * Create a service auth JWT signed with ES256 (P-256) 709 * Used for proxying requests to AppView 710 * @param {Object} params - JWT parameters 711 * @param {string} params.iss - Issuer DID (PDS DID) 712 * @param {string} params.aud - Audience DID (AppView DID) 713 * @param {string|null} params.lxm - Lexicon method being called 714 * @param {CryptoKey} params.signingKey - P-256 private key from importPrivateKey 715 * @returns {Promise<string>} Signed JWT 716 */ 717export async function createServiceJwt({ iss, aud, lxm, signingKey }) { 718 const header = { typ: 'JWT', alg: 'ES256' }; 719 const now = Math.floor(Date.now() / 1000); 720 721 // Generate random jti 722 const jtiBytes = new Uint8Array(16); 723 crypto.getRandomValues(jtiBytes); 724 const jti = bytesToHex(jtiBytes); 725 726 const payload = { 727 iss, 728 aud, 729 exp: now + 60, // 1 minute expiration 730 iat: now, 731 jti, 732 }; 733 if (lxm) payload.lxm = lxm; 734 735 const headerB64 = base64UrlEncode( 736 new TextEncoder().encode(JSON.stringify(header)), 737 ); 738 const payloadB64 = base64UrlEncode( 739 new TextEncoder().encode(JSON.stringify(payload)), 740 ); 741 const toSign = new TextEncoder().encode(`${headerB64}.${payloadB64}`); 742 743 const sig = await sign(signingKey, toSign); 744 const sigB64 = base64UrlEncode(sig); 745 746 return `${headerB64}.${payloadB64}.${sigB64}`; 747} 748 749/** 750 * Convert bytes to hexadecimal string 751 * @param {Uint8Array} bytes - Bytes to convert 752 * @returns {string} Hex string 753 */ 754export function bytesToHex(bytes) { 755 return Array.from(bytes) 756 .map((b) => b.toString(16).padStart(2, '0')) 757 .join(''); 758} 759 760/** 761 * Convert hexadecimal string to bytes 762 * @param {string} hex - Hex string 763 * @returns {Uint8Array} Decoded bytes 764 */ 765export function hexToBytes(hex) { 766 const bytes = new Uint8Array(hex.length / 2); 767 for (let i = 0; i < hex.length; i += 2) { 768 bytes[i / 2] = parseInt(hex.substr(i, 2), 16); 769 } 770 return bytes; 771} 772 773// === MERKLE SEARCH TREE === 774// ATProto-compliant MST implementation 775 776async function sha256(data) { 777 const hash = await crypto.subtle.digest('SHA-256', data); 778 return new Uint8Array(hash); 779} 780 781// Cache for key depths (SHA-256 is expensive) 782const keyDepthCache = new Map(); 783 784/** 785 * Get MST tree depth for a key based on leading zeros in SHA-256 hash 786 * @param {string} key - Record key (collection/rkey) 787 * @returns {Promise<number>} Tree depth (leading zeros / 2) 788 */ 789export async function getKeyDepth(key) { 790 // Count leading zeros in SHA-256 hash, divide by 2 791 if (keyDepthCache.has(key)) return keyDepthCache.get(key); 792 793 const keyBytes = new TextEncoder().encode(key); 794 const hash = await sha256(keyBytes); 795 796 let zeros = 0; 797 for (const byte of hash) { 798 if (byte === 0) { 799 zeros += 8; 800 } else { 801 // Count leading zeros in this byte 802 for (let i = 7; i >= 0; i--) { 803 if ((byte >> i) & 1) break; 804 zeros++; 805 } 806 break; 807 } 808 } 809 810 // MST depth = leading zeros in SHA-256 hash / 2 811 // This creates a probabilistic tree where ~50% of keys are at depth 0, 812 // ~25% at depth 1, etc., giving O(log n) lookups 813 const depth = Math.floor(zeros / 2); 814 keyDepthCache.set(key, depth); 815 return depth; 816} 817 818// Compute common prefix length between two byte arrays 819function commonPrefixLen(a, b) { 820 const minLen = Math.min(a.length, b.length); 821 for (let i = 0; i < minLen; i++) { 822 if (a[i] !== b[i]) return i; 823 } 824 return minLen; 825} 826 827class MST { 828 constructor(sql) { 829 this.sql = sql; 830 } 831 832 async computeRoot() { 833 const records = this.sql 834 .exec(` 835 SELECT collection, rkey, cid FROM records ORDER BY collection, rkey 836 `) 837 .toArray(); 838 839 if (records.length === 0) { 840 return null; 841 } 842 843 // Build entries with pre-computed depths (heights) 844 // In ATProto MST, "height" determines which layer a key belongs to 845 // Layer 0 is at the BOTTOM, root is at the highest layer 846 const entries = []; 847 let maxDepth = 0; 848 for (const r of records) { 849 const key = `${r.collection}/${r.rkey}`; 850 const depth = await getKeyDepth(key); 851 maxDepth = Math.max(maxDepth, depth); 852 entries.push({ 853 key, 854 keyBytes: new TextEncoder().encode(key), 855 cid: r.cid, 856 depth, 857 }); 858 } 859 860 // Start building from the root (highest layer) going down to layer 0 861 return this.buildTree(entries, maxDepth); 862 } 863 864 async buildTree(entries, layer) { 865 if (entries.length === 0) return null; 866 867 // Separate entries for this layer vs lower layers (subtrees) 868 // Keys with depth == layer stay at this node 869 // Keys with depth < layer go into subtrees (going down toward layer 0) 870 const thisLayer = []; 871 let leftSubtree = []; 872 873 for (const entry of entries) { 874 if (entry.depth < layer) { 875 // This entry belongs to a lower layer - accumulate for subtree 876 leftSubtree.push(entry); 877 } else { 878 // This entry belongs at current layer (depth == layer) 879 // Process accumulated left subtree first 880 if (leftSubtree.length > 0) { 881 const leftCid = await this.buildTree(leftSubtree, layer - 1); 882 thisLayer.push({ type: 'subtree', cid: leftCid }); 883 leftSubtree = []; 884 } 885 thisLayer.push({ type: 'entry', entry }); 886 } 887 } 888 889 // Handle remaining left subtree 890 if (leftSubtree.length > 0) { 891 const leftCid = await this.buildTree(leftSubtree, layer - 1); 892 thisLayer.push({ type: 'subtree', cid: leftCid }); 893 } 894 895 // Build node with proper ATProto format 896 const node = { e: [] }; 897 let leftCid = null; 898 let prevKeyBytes = new Uint8Array(0); 899 900 for (let i = 0; i < thisLayer.length; i++) { 901 const item = thisLayer[i]; 902 903 if (item.type === 'subtree') { 904 if (node.e.length === 0) { 905 leftCid = item.cid; 906 } else { 907 // Attach to previous entry's 't' field 908 node.e[node.e.length - 1].t = new CID(cidToBytes(item.cid)); 909 } 910 } else { 911 // Entry - compute prefix compression 912 const keyBytes = item.entry.keyBytes; 913 const prefixLen = commonPrefixLen(prevKeyBytes, keyBytes); 914 const keySuffix = keyBytes.slice(prefixLen); 915 916 // ATProto requires t field to be present (can be null) 917 const e = { 918 p: prefixLen, 919 k: keySuffix, 920 v: new CID(cidToBytes(item.entry.cid)), 921 t: null, // Will be updated if there's a subtree 922 }; 923 924 node.e.push(e); 925 prevKeyBytes = keyBytes; 926 } 927 } 928 929 // ATProto requires l field to be present (can be null) 930 node.l = leftCid ? new CID(cidToBytes(leftCid)) : null; 931 932 // Encode node with proper MST CBOR format 933 const nodeBytes = cborEncodeDagCbor(node); 934 const nodeCid = await createCid(nodeBytes); 935 const cidStr = cidToString(nodeCid); 936 937 this.sql.exec( 938 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`, 939 cidStr, 940 nodeBytes, 941 ); 942 943 return cidStr; 944 } 945} 946 947// === CAR FILE BUILDER === 948 949/** 950 * Encode integer as unsigned varint 951 * @param {number} n - Non-negative integer 952 * @returns {Uint8Array} Varint-encoded bytes 953 */ 954export function varint(n) { 955 const bytes = []; 956 while (n >= 0x80) { 957 bytes.push((n & 0x7f) | 0x80); 958 n >>>= 7; 959 } 960 bytes.push(n); 961 return new Uint8Array(bytes); 962} 963 964/** 965 * Convert base32lower CID string to raw bytes 966 * @param {string} cidStr - CID string with 'b' prefix 967 * @returns {Uint8Array} CID bytes 968 */ 969export function cidToBytes(cidStr) { 970 // Decode base32lower CID string to bytes 971 if (!cidStr.startsWith('b')) throw new Error('expected base32lower CID'); 972 return base32Decode(cidStr.slice(1)); 973} 974 975/** 976 * Decode base32lower string to bytes 977 * @param {string} str - Base32lower-encoded string 978 * @returns {Uint8Array} Decoded bytes 979 */ 980export function base32Decode(str) { 981 const alphabet = 'abcdefghijklmnopqrstuvwxyz234567'; 982 let bits = 0; 983 let value = 0; 984 const output = []; 985 986 for (const char of str) { 987 const idx = alphabet.indexOf(char); 988 if (idx === -1) continue; 989 value = (value << 5) | idx; 990 bits += 5; 991 if (bits >= 8) { 992 bits -= 8; 993 output.push((value >> bits) & 0xff); 994 } 995 } 996 997 return new Uint8Array(output); 998} 999 1000/** 1001 * Build a CAR (Content Addressable aRchive) file 1002 * @param {string} rootCid - Root CID string 1003 * @param {Array<{cid: string, data: Uint8Array}>} blocks - Blocks to include 1004 * @returns {Uint8Array} CAR file bytes 1005 */ 1006export function buildCarFile(rootCid, blocks) { 1007 const parts = []; 1008 1009 // Header: { version: 1, roots: [rootCid] } 1010 const rootCidBytes = cidToBytes(rootCid); 1011 const header = cborEncodeDagCbor({ 1012 version: 1, 1013 roots: [new CID(rootCidBytes)], 1014 }); 1015 parts.push(varint(header.length)); 1016 parts.push(header); 1017 1018 // Blocks: varint(len) + cid + data 1019 for (const block of blocks) { 1020 const cidBytes = cidToBytes(block.cid); 1021 const blockLen = cidBytes.length + block.data.length; 1022 parts.push(varint(blockLen)); 1023 parts.push(cidBytes); 1024 parts.push(block.data); 1025 } 1026 1027 // Concatenate all parts 1028 const totalLen = parts.reduce((sum, p) => sum + p.length, 0); 1029 const car = new Uint8Array(totalLen); 1030 let offset = 0; 1031 for (const part of parts) { 1032 car.set(part, offset); 1033 offset += part.length; 1034 } 1035 1036 return car; 1037} 1038 1039/** 1040 * Route handler function type 1041 * @callback RouteHandler 1042 * @param {PersonalDataServer} pds - PDS instance 1043 * @param {Request} request - HTTP request 1044 * @param {URL} url - Parsed URL 1045 * @returns {Promise<Response>} HTTP response 1046 */ 1047 1048/** 1049 * @typedef {Object} Route 1050 * @property {string} [method] - Required HTTP method (default: any) 1051 * @property {RouteHandler} handler - Handler function 1052 */ 1053 1054/** @type {Record<string, Route>} */ 1055const pdsRoutes = { 1056 '/.well-known/atproto-did': { 1057 handler: (pds, _req, _url) => pds.handleAtprotoDid(), 1058 }, 1059 '/init': { 1060 method: 'POST', 1061 handler: (pds, req, _url) => pds.handleInit(req), 1062 }, 1063 '/status': { 1064 handler: (pds, _req, _url) => pds.handleStatus(), 1065 }, 1066 '/reset-repo': { 1067 handler: (pds, _req, _url) => pds.handleResetRepo(), 1068 }, 1069 '/forward-event': { 1070 handler: (pds, req, _url) => pds.handleForwardEvent(req), 1071 }, 1072 '/register-did': { 1073 handler: (pds, req, _url) => pds.handleRegisterDid(req), 1074 }, 1075 '/get-registered-dids': { 1076 handler: (pds, _req, _url) => pds.handleGetRegisteredDids(), 1077 }, 1078 '/register-handle': { 1079 method: 'POST', 1080 handler: (pds, req, _url) => pds.handleRegisterHandle(req), 1081 }, 1082 '/resolve-handle': { 1083 handler: (pds, _req, url) => pds.handleResolveHandle(url), 1084 }, 1085 '/repo-info': { 1086 handler: (pds, _req, _url) => pds.handleRepoInfo(), 1087 }, 1088 '/xrpc/com.atproto.server.describeServer': { 1089 handler: (pds, req, _url) => pds.handleDescribeServer(req), 1090 }, 1091 '/xrpc/com.atproto.server.createSession': { 1092 method: 'POST', 1093 handler: (pds, req, _url) => pds.handleCreateSession(req), 1094 }, 1095 '/xrpc/com.atproto.server.getSession': { 1096 handler: (pds, req, _url) => pds.handleGetSession(req), 1097 }, 1098 '/xrpc/com.atproto.server.refreshSession': { 1099 method: 'POST', 1100 handler: (pds, req, _url) => pds.handleRefreshSession(req), 1101 }, 1102 '/xrpc/app.bsky.actor.getPreferences': { 1103 handler: (pds, req, _url) => pds.handleGetPreferences(req), 1104 }, 1105 '/xrpc/app.bsky.actor.putPreferences': { 1106 method: 'POST', 1107 handler: (pds, req, _url) => pds.handlePutPreferences(req), 1108 }, 1109 '/xrpc/com.atproto.sync.listRepos': { 1110 handler: (pds, _req, _url) => pds.handleListRepos(), 1111 }, 1112 '/xrpc/com.atproto.repo.createRecord': { 1113 method: 'POST', 1114 handler: (pds, req, _url) => pds.handleCreateRecord(req), 1115 }, 1116 '/xrpc/com.atproto.repo.deleteRecord': { 1117 method: 'POST', 1118 handler: (pds, req, _url) => pds.handleDeleteRecord(req), 1119 }, 1120 '/xrpc/com.atproto.repo.putRecord': { 1121 method: 'POST', 1122 handler: (pds, req, _url) => pds.handlePutRecord(req), 1123 }, 1124 '/xrpc/com.atproto.repo.applyWrites': { 1125 method: 'POST', 1126 handler: (pds, req, _url) => pds.handleApplyWrites(req), 1127 }, 1128 '/xrpc/com.atproto.repo.getRecord': { 1129 handler: (pds, _req, url) => pds.handleGetRecord(url), 1130 }, 1131 '/xrpc/com.atproto.repo.describeRepo': { 1132 handler: (pds, _req, _url) => pds.handleDescribeRepo(), 1133 }, 1134 '/xrpc/com.atproto.repo.listRecords': { 1135 handler: (pds, _req, url) => pds.handleListRecords(url), 1136 }, 1137 '/xrpc/com.atproto.sync.getLatestCommit': { 1138 handler: (pds, _req, _url) => pds.handleGetLatestCommit(), 1139 }, 1140 '/xrpc/com.atproto.sync.getRepoStatus': { 1141 handler: (pds, _req, _url) => pds.handleGetRepoStatus(), 1142 }, 1143 '/xrpc/com.atproto.sync.getRepo': { 1144 handler: (pds, _req, _url) => pds.handleGetRepo(), 1145 }, 1146 '/xrpc/com.atproto.sync.getRecord': { 1147 handler: (pds, _req, url) => pds.handleSyncGetRecord(url), 1148 }, 1149 '/xrpc/com.atproto.sync.subscribeRepos': { 1150 handler: (pds, req, url) => pds.handleSubscribeRepos(req, url), 1151 }, 1152}; 1153 1154export class PersonalDataServer { 1155 constructor(state, env) { 1156 this.state = state; 1157 this.sql = state.storage.sql; 1158 this.env = env; 1159 1160 // Initialize schema 1161 this.sql.exec(` 1162 CREATE TABLE IF NOT EXISTS blocks ( 1163 cid TEXT PRIMARY KEY, 1164 data BLOB NOT NULL 1165 ); 1166 1167 CREATE TABLE IF NOT EXISTS records ( 1168 uri TEXT PRIMARY KEY, 1169 cid TEXT NOT NULL, 1170 collection TEXT NOT NULL, 1171 rkey TEXT NOT NULL, 1172 value BLOB NOT NULL 1173 ); 1174 1175 CREATE TABLE IF NOT EXISTS commits ( 1176 seq INTEGER PRIMARY KEY AUTOINCREMENT, 1177 cid TEXT NOT NULL, 1178 rev TEXT NOT NULL, 1179 prev TEXT 1180 ); 1181 1182 CREATE TABLE IF NOT EXISTS seq_events ( 1183 seq INTEGER PRIMARY KEY AUTOINCREMENT, 1184 did TEXT NOT NULL, 1185 commit_cid TEXT NOT NULL, 1186 evt BLOB NOT NULL 1187 ); 1188 1189 CREATE INDEX IF NOT EXISTS idx_records_collection ON records(collection, rkey); 1190 `); 1191 } 1192 1193 async initIdentity(did, privateKeyHex, handle = null) { 1194 await this.state.storage.put('did', did); 1195 await this.state.storage.put('privateKey', privateKeyHex); 1196 if (handle) { 1197 await this.state.storage.put('handle', handle); 1198 } 1199 } 1200 1201 async getDid() { 1202 if (!this._did) { 1203 this._did = await this.state.storage.get('did'); 1204 } 1205 return this._did; 1206 } 1207 1208 async getHandle() { 1209 return this.state.storage.get('handle'); 1210 } 1211 1212 async getSigningKey() { 1213 const hex = await this.state.storage.get('privateKey'); 1214 if (!hex) return null; 1215 return importPrivateKey(hexToBytes(hex)); 1216 } 1217 1218 // Collect MST node blocks for a given root CID 1219 collectMstBlocks(rootCidStr) { 1220 const blocks = []; 1221 const visited = new Set(); 1222 1223 const collect = (cidStr) => { 1224 if (visited.has(cidStr)) return; 1225 visited.add(cidStr); 1226 1227 const rows = this.sql 1228 .exec(`SELECT data FROM blocks WHERE cid = ?`, cidStr) 1229 .toArray(); 1230 if (rows.length === 0) return; 1231 1232 const data = new Uint8Array(rows[0].data); 1233 blocks.push({ cid: cidStr, data }); // Keep as string, buildCarFile will convert 1234 1235 // Decode and follow child CIDs (MST nodes have 'l' and 'e' with 't' subtrees) 1236 try { 1237 const node = cborDecode(data); 1238 if (node.l) collect(cidToString(node.l)); 1239 if (node.e) { 1240 for (const entry of node.e) { 1241 if (entry.t) collect(cidToString(entry.t)); 1242 } 1243 } 1244 } catch (_e) { 1245 // Not an MST node, ignore 1246 } 1247 }; 1248 1249 collect(rootCidStr); 1250 return blocks; 1251 } 1252 1253 async createRecord(collection, record, rkey = null) { 1254 const did = await this.getDid(); 1255 if (!did) throw new Error('PDS not initialized'); 1256 1257 rkey = rkey || createTid(); 1258 const uri = `at://${did}/${collection}/${rkey}`; 1259 1260 // Encode and hash record (must use DAG-CBOR for proper key ordering) 1261 const recordBytes = cborEncodeDagCbor(record); 1262 const recordCid = await createCid(recordBytes); 1263 const recordCidStr = cidToString(recordCid); 1264 1265 // Store block 1266 this.sql.exec( 1267 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`, 1268 recordCidStr, 1269 recordBytes, 1270 ); 1271 1272 // Store record index 1273 this.sql.exec( 1274 `INSERT OR REPLACE INTO records (uri, cid, collection, rkey, value) VALUES (?, ?, ?, ?, ?)`, 1275 uri, 1276 recordCidStr, 1277 collection, 1278 rkey, 1279 recordBytes, 1280 ); 1281 1282 // Rebuild MST 1283 const mst = new MST(this.sql); 1284 const dataRoot = await mst.computeRoot(); 1285 1286 // Get previous commit 1287 const prevCommits = this.sql 1288 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`) 1289 .toArray(); 1290 const prevCommit = prevCommits.length > 0 ? prevCommits[0] : null; 1291 1292 // Create commit 1293 const rev = createTid(); 1294 // Build commit with CIDs wrapped in CID class (for dag-cbor tag 42 encoding) 1295 const commit = { 1296 did, 1297 version: 3, 1298 data: new CID(cidToBytes(dataRoot)), // CID wrapped for explicit encoding 1299 rev, 1300 prev: prevCommit?.cid ? new CID(cidToBytes(prevCommit.cid)) : null, 1301 }; 1302 1303 // Sign commit (using dag-cbor encoder for CIDs) 1304 const commitBytes = cborEncodeDagCbor(commit); 1305 const signingKey = await this.getSigningKey(); 1306 const sig = await sign(signingKey, commitBytes); 1307 1308 const signedCommit = { ...commit, sig }; 1309 const signedBytes = cborEncodeDagCbor(signedCommit); 1310 const commitCid = await createCid(signedBytes); 1311 const commitCidStr = cidToString(commitCid); 1312 1313 // Store commit block 1314 this.sql.exec( 1315 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`, 1316 commitCidStr, 1317 signedBytes, 1318 ); 1319 1320 // Store commit reference 1321 this.sql.exec( 1322 `INSERT INTO commits (cid, rev, prev) VALUES (?, ?, ?)`, 1323 commitCidStr, 1324 rev, 1325 prevCommit?.cid || null, 1326 ); 1327 1328 // Update head and rev for listRepos 1329 await this.state.storage.put('head', commitCidStr); 1330 await this.state.storage.put('rev', rev); 1331 1332 // Collect blocks for the event (record + commit + MST nodes) 1333 // Build a mini CAR with just the new blocks - use string CIDs 1334 const newBlocks = []; 1335 // Add record block 1336 newBlocks.push({ cid: recordCidStr, data: recordBytes }); 1337 // Add commit block 1338 newBlocks.push({ cid: commitCidStr, data: signedBytes }); 1339 // Add MST node blocks (get all blocks referenced by commit.data) 1340 const mstBlocks = this.collectMstBlocks(dataRoot); 1341 newBlocks.push(...mstBlocks); 1342 1343 // Sequence event with blocks - store complete event data including rev and time 1344 // blocks must be a full CAR file with header (roots = [commitCid]) 1345 const eventTime = new Date().toISOString(); 1346 const evt = cborEncode({ 1347 ops: [ 1348 { action: 'create', path: `${collection}/${rkey}`, cid: recordCidStr }, 1349 ], 1350 blocks: buildCarFile(commitCidStr, newBlocks), // Full CAR with header 1351 rev, // Store the actual commit revision 1352 time: eventTime, // Store the actual event time 1353 }); 1354 this.sql.exec( 1355 `INSERT INTO seq_events (did, commit_cid, evt) VALUES (?, ?, ?)`, 1356 did, 1357 commitCidStr, 1358 evt, 1359 ); 1360 1361 // Broadcast to subscribers (both local and via default DO for relay) 1362 const evtRows = this.sql 1363 .exec(`SELECT * FROM seq_events ORDER BY seq DESC LIMIT 1`) 1364 .toArray(); 1365 if (evtRows.length > 0) { 1366 this.broadcastEvent(evtRows[0]); 1367 // Also forward to default DO for relay subscribers 1368 if (this.env?.PDS) { 1369 const defaultId = this.env.PDS.idFromName('default'); 1370 const defaultPds = this.env.PDS.get(defaultId); 1371 // Convert ArrayBuffer to array for JSON serialization 1372 const row = evtRows[0]; 1373 const evtArray = Array.from(new Uint8Array(row.evt)); 1374 // Fire and forget but log errors 1375 defaultPds 1376 .fetch( 1377 new Request('http://internal/forward-event', { 1378 method: 'POST', 1379 body: JSON.stringify({ ...row, evt: evtArray }), 1380 }), 1381 ) 1382 .then((r) => r.json()) 1383 .then((r) => console.log('forward result:', r)) 1384 .catch((e) => console.log('forward error:', e)); 1385 } 1386 } 1387 1388 return { uri, cid: recordCidStr, commit: commitCidStr }; 1389 } 1390 1391 async deleteRecord(collection, rkey) { 1392 const did = await this.getDid(); 1393 if (!did) throw new Error('PDS not initialized'); 1394 1395 const uri = `at://${did}/${collection}/${rkey}`; 1396 1397 // Check if record exists 1398 const existing = this.sql 1399 .exec(`SELECT cid FROM records WHERE uri = ?`, uri) 1400 .toArray(); 1401 if (existing.length === 0) { 1402 return { error: 'RecordNotFound', message: 'record not found' }; 1403 } 1404 1405 // Delete from records table 1406 this.sql.exec(`DELETE FROM records WHERE uri = ?`, uri); 1407 1408 // Rebuild MST 1409 const mst = new MST(this.sql); 1410 const dataRoot = await mst.computeRoot(); 1411 1412 // Get previous commit 1413 const prevCommits = this.sql 1414 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`) 1415 .toArray(); 1416 const prevCommit = prevCommits.length > 0 ? prevCommits[0] : null; 1417 1418 // Create commit 1419 const rev = createTid(); 1420 const commit = { 1421 did, 1422 version: 3, 1423 data: dataRoot ? new CID(cidToBytes(dataRoot)) : null, 1424 rev, 1425 prev: prevCommit?.cid ? new CID(cidToBytes(prevCommit.cid)) : null, 1426 }; 1427 1428 // Sign commit 1429 const commitBytes = cborEncodeDagCbor(commit); 1430 const signingKey = await this.getSigningKey(); 1431 const sig = await sign(signingKey, commitBytes); 1432 1433 const signedCommit = { ...commit, sig }; 1434 const signedBytes = cborEncodeDagCbor(signedCommit); 1435 const commitCid = await createCid(signedBytes); 1436 const commitCidStr = cidToString(commitCid); 1437 1438 // Store commit block 1439 this.sql.exec( 1440 `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`, 1441 commitCidStr, 1442 signedBytes, 1443 ); 1444 1445 // Store commit reference 1446 this.sql.exec( 1447 `INSERT INTO commits (cid, rev, prev) VALUES (?, ?, ?)`, 1448 commitCidStr, 1449 rev, 1450 prevCommit?.cid || null, 1451 ); 1452 1453 // Update head and rev 1454 await this.state.storage.put('head', commitCidStr); 1455 await this.state.storage.put('rev', rev); 1456 1457 // Collect blocks for the event (commit + MST nodes, no record block) 1458 const newBlocks = []; 1459 newBlocks.push({ cid: commitCidStr, data: signedBytes }); 1460 if (dataRoot) { 1461 const mstBlocks = this.collectMstBlocks(dataRoot); 1462 newBlocks.push(...mstBlocks); 1463 } 1464 1465 // Sequence event with delete action 1466 const eventTime = new Date().toISOString(); 1467 const evt = cborEncode({ 1468 ops: [{ action: 'delete', path: `${collection}/${rkey}`, cid: null }], 1469 blocks: buildCarFile(commitCidStr, newBlocks), 1470 rev, 1471 time: eventTime, 1472 }); 1473 this.sql.exec( 1474 `INSERT INTO seq_events (did, commit_cid, evt) VALUES (?, ?, ?)`, 1475 did, 1476 commitCidStr, 1477 evt, 1478 ); 1479 1480 // Broadcast to subscribers 1481 const evtRows = this.sql 1482 .exec(`SELECT * FROM seq_events ORDER BY seq DESC LIMIT 1`) 1483 .toArray(); 1484 if (evtRows.length > 0) { 1485 this.broadcastEvent(evtRows[0]); 1486 // Forward to default DO for relay subscribers 1487 if (this.env?.PDS) { 1488 const defaultId = this.env.PDS.idFromName('default'); 1489 const defaultPds = this.env.PDS.get(defaultId); 1490 const row = evtRows[0]; 1491 const evtArray = Array.from(new Uint8Array(row.evt)); 1492 defaultPds 1493 .fetch( 1494 new Request('http://internal/forward-event', { 1495 method: 'POST', 1496 body: JSON.stringify({ ...row, evt: evtArray }), 1497 }), 1498 ) 1499 .catch((e) => console.log('forward error:', e)); 1500 } 1501 } 1502 1503 return { ok: true }; 1504 } 1505 1506 formatEvent(evt) { 1507 // AT Protocol frame format: header + body 1508 // Use DAG-CBOR encoding for body (CIDs need tag 42 + 0x00 prefix) 1509 const header = cborEncode({ op: 1, t: '#commit' }); 1510 1511 // Decode stored event to get ops, blocks, rev, and time 1512 const evtData = cborDecode(new Uint8Array(evt.evt)); 1513 const ops = evtData.ops.map((op) => ({ 1514 ...op, 1515 cid: op.cid ? new CID(cidToBytes(op.cid)) : null, // Wrap in CID class for tag 42 encoding 1516 })); 1517 // Get blocks from stored event (already in CAR format) 1518 const blocks = evtData.blocks || new Uint8Array(0); 1519 1520 const body = cborEncodeDagCbor({ 1521 seq: evt.seq, 1522 rebase: false, 1523 tooBig: false, 1524 repo: evt.did, 1525 commit: new CID(cidToBytes(evt.commit_cid)), // Wrap in CID class for tag 42 encoding 1526 rev: evtData.rev, // Use stored rev from commit creation 1527 since: null, 1528 blocks: blocks instanceof Uint8Array ? blocks : new Uint8Array(blocks), 1529 ops, 1530 blobs: [], 1531 time: evtData.time, // Use stored time from event creation 1532 }); 1533 1534 // Concatenate header + body 1535 const frame = new Uint8Array(header.length + body.length); 1536 frame.set(header); 1537 frame.set(body, header.length); 1538 return frame; 1539 } 1540 1541 async webSocketMessage(ws, message) { 1542 // Handle ping 1543 if (message === 'ping') ws.send('pong'); 1544 } 1545 1546 async webSocketClose(_ws, _code, _reason) { 1547 // Durable Object will hibernate when no connections remain 1548 } 1549 1550 broadcastEvent(evt) { 1551 const frame = this.formatEvent(evt); 1552 for (const ws of this.state.getWebSockets()) { 1553 try { 1554 ws.send(frame); 1555 } catch (_e) { 1556 // Client disconnected 1557 } 1558 } 1559 } 1560 1561 async handleAtprotoDid() { 1562 let did = await this.getDid(); 1563 if (!did) { 1564 const registeredDids = 1565 (await this.state.storage.get('registeredDids')) || []; 1566 did = registeredDids[0]; 1567 } 1568 if (!did) { 1569 return new Response('User not found', { status: 404 }); 1570 } 1571 return new Response(did, { headers: { 'Content-Type': 'text/plain' } }); 1572 } 1573 1574 async handleInit(request) { 1575 const body = await request.json(); 1576 if (!body.did || !body.privateKey) { 1577 return errorResponse('InvalidRequest', 'missing did or privateKey', 400); 1578 } 1579 await this.initIdentity(body.did, body.privateKey, body.handle || null); 1580 return Response.json({ 1581 ok: true, 1582 did: body.did, 1583 handle: body.handle || null, 1584 }); 1585 } 1586 1587 async handleStatus() { 1588 const did = await this.getDid(); 1589 return Response.json({ initialized: !!did, did: did || null }); 1590 } 1591 1592 async handleResetRepo() { 1593 this.sql.exec(`DELETE FROM blocks`); 1594 this.sql.exec(`DELETE FROM records`); 1595 this.sql.exec(`DELETE FROM commits`); 1596 this.sql.exec(`DELETE FROM seq_events`); 1597 await this.state.storage.delete('head'); 1598 await this.state.storage.delete('rev'); 1599 return Response.json({ ok: true, message: 'repo data cleared' }); 1600 } 1601 1602 async handleForwardEvent(request) { 1603 const evt = await request.json(); 1604 const numSockets = [...this.state.getWebSockets()].length; 1605 console.log( 1606 `forward-event: received event seq=${evt.seq}, ${numSockets} connected sockets`, 1607 ); 1608 this.broadcastEvent({ 1609 seq: evt.seq, 1610 did: evt.did, 1611 commit_cid: evt.commit_cid, 1612 evt: new Uint8Array(Object.values(evt.evt)), 1613 }); 1614 return Response.json({ ok: true, sockets: numSockets }); 1615 } 1616 1617 async handleRegisterDid(request) { 1618 const body = await request.json(); 1619 const registeredDids = 1620 (await this.state.storage.get('registeredDids')) || []; 1621 if (!registeredDids.includes(body.did)) { 1622 registeredDids.push(body.did); 1623 await this.state.storage.put('registeredDids', registeredDids); 1624 } 1625 return Response.json({ ok: true }); 1626 } 1627 1628 async handleGetRegisteredDids() { 1629 const registeredDids = 1630 (await this.state.storage.get('registeredDids')) || []; 1631 return Response.json({ dids: registeredDids }); 1632 } 1633 1634 async handleRegisterHandle(request) { 1635 const body = await request.json(); 1636 const { handle, did } = body; 1637 if (!handle || !did) { 1638 return errorResponse('InvalidRequest', 'missing handle or did', 400); 1639 } 1640 const handleMap = (await this.state.storage.get('handleMap')) || {}; 1641 handleMap[handle] = did; 1642 await this.state.storage.put('handleMap', handleMap); 1643 return Response.json({ ok: true }); 1644 } 1645 1646 async handleResolveHandle(url) { 1647 const handle = url.searchParams.get('handle'); 1648 if (!handle) { 1649 return errorResponse('InvalidRequest', 'missing handle', 400); 1650 } 1651 const handleMap = (await this.state.storage.get('handleMap')) || {}; 1652 const did = handleMap[handle]; 1653 if (!did) { 1654 return errorResponse('NotFound', 'handle not found', 404); 1655 } 1656 return Response.json({ did }); 1657 } 1658 1659 async handleRepoInfo() { 1660 const head = await this.state.storage.get('head'); 1661 const rev = await this.state.storage.get('rev'); 1662 return Response.json({ head: head || null, rev: rev || null }); 1663 } 1664 1665 handleDescribeServer(request) { 1666 const hostname = request.headers.get('x-hostname') || 'localhost'; 1667 return Response.json({ 1668 did: `did:web:${hostname}`, 1669 availableUserDomains: [`.${hostname}`], 1670 inviteCodeRequired: false, 1671 phoneVerificationRequired: false, 1672 links: {}, 1673 contact: {}, 1674 }); 1675 } 1676 1677 async handleCreateSession(request) { 1678 const body = await request.json(); 1679 const { identifier, password } = body; 1680 1681 if (!identifier || !password) { 1682 return errorResponse( 1683 'InvalidRequest', 1684 'Missing identifier or password', 1685 400, 1686 ); 1687 } 1688 1689 // Check password against env var 1690 const expectedPassword = this.env?.PDS_PASSWORD; 1691 if (!expectedPassword || password !== expectedPassword) { 1692 return errorResponse( 1693 'AuthRequired', 1694 'Invalid identifier or password', 1695 401, 1696 ); 1697 } 1698 1699 // Resolve identifier to DID 1700 let did = identifier; 1701 if (!identifier.startsWith('did:')) { 1702 // Try to resolve handle 1703 const handleMap = (await this.state.storage.get('handleMap')) || {}; 1704 did = handleMap[identifier]; 1705 if (!did) { 1706 return errorResponse('InvalidRequest', 'Unable to resolve handle', 400); 1707 } 1708 } 1709 1710 // Get handle for response 1711 const handle = await this.getHandleForDid(did); 1712 1713 // Create tokens 1714 const jwtSecret = this.env?.JWT_SECRET; 1715 if (!jwtSecret) { 1716 return errorResponse( 1717 'InternalServerError', 1718 'Server not configured for authentication', 1719 500, 1720 ); 1721 } 1722 1723 const accessJwt = await createAccessJwt(did, jwtSecret); 1724 const refreshJwt = await createRefreshJwt(did, jwtSecret); 1725 1726 return Response.json({ 1727 accessJwt, 1728 refreshJwt, 1729 handle: handle || did, 1730 did, 1731 active: true, 1732 }); 1733 } 1734 1735 async handleGetSession(request) { 1736 const authHeader = request.headers.get('Authorization'); 1737 if (!authHeader || !authHeader.startsWith('Bearer ')) { 1738 return errorResponse( 1739 'AuthRequired', 1740 'Missing or invalid authorization header', 1741 401, 1742 ); 1743 } 1744 1745 const token = authHeader.slice(7); // Remove 'Bearer ' 1746 const jwtSecret = this.env?.JWT_SECRET; 1747 if (!jwtSecret) { 1748 return errorResponse( 1749 'InternalServerError', 1750 'Server not configured for authentication', 1751 500, 1752 ); 1753 } 1754 1755 try { 1756 const payload = await verifyAccessJwt(token, jwtSecret); 1757 const did = payload.sub; 1758 const handle = await this.getHandleForDid(did); 1759 1760 return Response.json({ 1761 handle: handle || did, 1762 did, 1763 active: true, 1764 }); 1765 } catch (err) { 1766 return errorResponse('InvalidToken', err.message, 401); 1767 } 1768 } 1769 1770 async handleRefreshSession(request) { 1771 const authHeader = request.headers.get('Authorization'); 1772 if (!authHeader || !authHeader.startsWith('Bearer ')) { 1773 return errorResponse( 1774 'AuthRequired', 1775 'Missing or invalid authorization header', 1776 401, 1777 ); 1778 } 1779 1780 const token = authHeader.slice(7); // Remove 'Bearer ' 1781 const jwtSecret = this.env?.JWT_SECRET; 1782 if (!jwtSecret) { 1783 return errorResponse( 1784 'InternalServerError', 1785 'Server not configured for authentication', 1786 500, 1787 ); 1788 } 1789 1790 try { 1791 const payload = await verifyRefreshJwt(token, jwtSecret); 1792 const did = payload.sub; 1793 const handle = await this.getHandleForDid(did); 1794 1795 // Issue fresh tokens 1796 const accessJwt = await createAccessJwt(did, jwtSecret); 1797 const refreshJwt = await createRefreshJwt(did, jwtSecret); 1798 1799 return Response.json({ 1800 accessJwt, 1801 refreshJwt, 1802 handle: handle || did, 1803 did, 1804 active: true, 1805 }); 1806 } catch (err) { 1807 if (err.message === 'Token expired') { 1808 return errorResponse('ExpiredToken', 'Refresh token has expired', 400); 1809 } 1810 return errorResponse('InvalidToken', err.message, 400); 1811 } 1812 } 1813 1814 async handleGetPreferences(_request) { 1815 // Preferences are stored per-user in their DO 1816 const preferences = (await this.state.storage.get('preferences')) || []; 1817 return Response.json({ preferences }); 1818 } 1819 1820 async handlePutPreferences(request) { 1821 const body = await request.json(); 1822 const { preferences } = body; 1823 if (!Array.isArray(preferences)) { 1824 return errorResponse( 1825 'InvalidRequest', 1826 'preferences must be an array', 1827 400, 1828 ); 1829 } 1830 await this.state.storage.put('preferences', preferences); 1831 return Response.json({}); 1832 } 1833 1834 async getHandleForDid(did) { 1835 // Check if this DID has a handle registered 1836 const handleMap = (await this.state.storage.get('handleMap')) || {}; 1837 for (const [handle, mappedDid] of Object.entries(handleMap)) { 1838 if (mappedDid === did) return handle; 1839 } 1840 // Check instance's own handle 1841 const instanceDid = await this.getDid(); 1842 if (instanceDid === did) { 1843 return await this.state.storage.get('handle'); 1844 } 1845 return null; 1846 } 1847 1848 async createServiceAuthForAppView(did, lxm) { 1849 const signingKey = await this.getSigningKey(); 1850 return createServiceJwt({ 1851 iss: did, 1852 aud: 'did:web:api.bsky.app', 1853 lxm, 1854 signingKey, 1855 }); 1856 } 1857 1858 async handleAppViewProxy(request, userDid) { 1859 const url = new URL(request.url); 1860 // Extract lexicon method from path: /xrpc/app.bsky.actor.getPreferences -> app.bsky.actor.getPreferences 1861 const lxm = url.pathname.replace('/xrpc/', ''); 1862 1863 // Create service auth JWT 1864 const serviceJwt = await this.createServiceAuthForAppView(userDid, lxm); 1865 1866 // Build AppView URL 1867 const appViewUrl = new URL( 1868 url.pathname + url.search, 1869 'https://api.bsky.app', 1870 ); 1871 1872 // Forward request with service auth 1873 const headers = new Headers(); 1874 headers.set('Authorization', `Bearer ${serviceJwt}`); 1875 headers.set( 1876 'Content-Type', 1877 request.headers.get('Content-Type') || 'application/json', 1878 ); 1879 if (request.headers.get('Accept')) { 1880 headers.set('Accept', request.headers.get('Accept')); 1881 } 1882 if (request.headers.get('Accept-Language')) { 1883 headers.set('Accept-Language', request.headers.get('Accept-Language')); 1884 } 1885 1886 const proxyReq = new Request(appViewUrl.toString(), { 1887 method: request.method, 1888 headers, 1889 body: 1890 request.method !== 'GET' && request.method !== 'HEAD' 1891 ? request.body 1892 : undefined, 1893 }); 1894 1895 try { 1896 const response = await fetch(proxyReq); 1897 // Return the response with CORS headers 1898 const responseHeaders = new Headers(response.headers); 1899 responseHeaders.set('Access-Control-Allow-Origin', '*'); 1900 return new Response(response.body, { 1901 status: response.status, 1902 statusText: response.statusText, 1903 headers: responseHeaders, 1904 }); 1905 } catch (err) { 1906 return errorResponse( 1907 'UpstreamFailure', 1908 `Failed to reach AppView: ${err.message}`, 1909 502, 1910 ); 1911 } 1912 } 1913 1914 async handleListRepos() { 1915 const registeredDids = 1916 (await this.state.storage.get('registeredDids')) || []; 1917 const did = await this.getDid(); 1918 const repos = did 1919 ? [{ did, head: null, rev: null }] 1920 : registeredDids.map((d) => ({ did: d, head: null, rev: null })); 1921 return Response.json({ repos }); 1922 } 1923 1924 async handleCreateRecord(request) { 1925 const body = await request.json(); 1926 if (!body.collection || !body.record) { 1927 return errorResponse( 1928 'InvalidRequest', 1929 'missing collection or record', 1930 400, 1931 ); 1932 } 1933 try { 1934 const result = await this.createRecord( 1935 body.collection, 1936 body.record, 1937 body.rkey, 1938 ); 1939 const head = await this.state.storage.get('head'); 1940 const rev = await this.state.storage.get('rev'); 1941 return Response.json({ 1942 uri: result.uri, 1943 cid: result.cid, 1944 commit: { cid: head, rev }, 1945 validationStatus: 'valid', 1946 }); 1947 } catch (err) { 1948 return errorResponse('InternalError', err.message, 500); 1949 } 1950 } 1951 1952 async handleDeleteRecord(request) { 1953 const body = await request.json(); 1954 if (!body.collection || !body.rkey) { 1955 return errorResponse('InvalidRequest', 'missing collection or rkey', 400); 1956 } 1957 try { 1958 const result = await this.deleteRecord(body.collection, body.rkey); 1959 if (result.error) { 1960 return Response.json(result, { status: 404 }); 1961 } 1962 return Response.json({}); 1963 } catch (err) { 1964 return errorResponse('InternalError', err.message, 500); 1965 } 1966 } 1967 1968 async handlePutRecord(request) { 1969 const body = await request.json(); 1970 if (!body.collection || !body.rkey || !body.record) { 1971 return errorResponse( 1972 'InvalidRequest', 1973 'missing collection, rkey, or record', 1974 400, 1975 ); 1976 } 1977 try { 1978 // putRecord is like createRecord but with a specific rkey (upsert) 1979 const result = await this.createRecord( 1980 body.collection, 1981 body.record, 1982 body.rkey, 1983 ); 1984 const head = await this.state.storage.get('head'); 1985 const rev = await this.state.storage.get('rev'); 1986 return Response.json({ 1987 uri: result.uri, 1988 cid: result.cid, 1989 commit: { cid: head, rev }, 1990 validationStatus: 'valid', 1991 }); 1992 } catch (err) { 1993 return errorResponse('InternalError', err.message, 500); 1994 } 1995 } 1996 1997 async handleApplyWrites(request) { 1998 const body = await request.json(); 1999 if (!body.writes || !Array.isArray(body.writes)) { 2000 return errorResponse('InvalidRequest', 'missing writes array', 400); 2001 } 2002 try { 2003 const results = []; 2004 for (const write of body.writes) { 2005 const type = write.$type; 2006 if (type === 'com.atproto.repo.applyWrites#create') { 2007 const result = await this.createRecord( 2008 write.collection, 2009 write.value, 2010 write.rkey, 2011 ); 2012 results.push({ 2013 $type: 'com.atproto.repo.applyWrites#createResult', 2014 uri: result.uri, 2015 cid: result.cid, 2016 validationStatus: 'valid', 2017 }); 2018 } else if (type === 'com.atproto.repo.applyWrites#update') { 2019 const result = await this.createRecord( 2020 write.collection, 2021 write.value, 2022 write.rkey, 2023 ); 2024 results.push({ 2025 $type: 'com.atproto.repo.applyWrites#updateResult', 2026 uri: result.uri, 2027 cid: result.cid, 2028 validationStatus: 'valid', 2029 }); 2030 } else if (type === 'com.atproto.repo.applyWrites#delete') { 2031 await this.deleteRecord(write.collection, write.rkey); 2032 results.push({ 2033 $type: 'com.atproto.repo.applyWrites#deleteResult', 2034 }); 2035 } else { 2036 return errorResponse( 2037 'InvalidRequest', 2038 `Unknown write operation type: ${type}`, 2039 400, 2040 ); 2041 } 2042 } 2043 // Return commit info 2044 const head = await this.state.storage.get('head'); 2045 const rev = await this.state.storage.get('rev'); 2046 return Response.json({ commit: { cid: head, rev }, results }); 2047 } catch (err) { 2048 return errorResponse('InternalError', err.message, 500); 2049 } 2050 } 2051 2052 async handleGetRecord(url) { 2053 const collection = url.searchParams.get('collection'); 2054 const rkey = url.searchParams.get('rkey'); 2055 if (!collection || !rkey) { 2056 return errorResponse('InvalidRequest', 'missing collection or rkey', 400); 2057 } 2058 const did = await this.getDid(); 2059 const uri = `at://${did}/${collection}/${rkey}`; 2060 const rows = this.sql 2061 .exec(`SELECT cid, value FROM records WHERE uri = ?`, uri) 2062 .toArray(); 2063 if (rows.length === 0) { 2064 return errorResponse('RecordNotFound', 'record not found', 404); 2065 } 2066 const row = rows[0]; 2067 const value = cborDecode(new Uint8Array(row.value)); 2068 return Response.json({ uri, cid: row.cid, value }); 2069 } 2070 2071 async handleDescribeRepo() { 2072 const did = await this.getDid(); 2073 if (!did) { 2074 return errorResponse('RepoNotFound', 'repo not found', 404); 2075 } 2076 const handle = await this.state.storage.get('handle'); 2077 // Get unique collections 2078 const collections = this.sql 2079 .exec(`SELECT DISTINCT collection FROM records`) 2080 .toArray() 2081 .map((r) => r.collection); 2082 2083 return Response.json({ 2084 handle: handle || did, 2085 did, 2086 didDoc: {}, 2087 collections, 2088 handleIsCorrect: !!handle, 2089 }); 2090 } 2091 2092 async handleListRecords(url) { 2093 const collection = url.searchParams.get('collection'); 2094 if (!collection) { 2095 return errorResponse('InvalidRequest', 'missing collection', 400); 2096 } 2097 const limit = Math.min( 2098 parseInt(url.searchParams.get('limit') || '50', 10), 2099 100, 2100 ); 2101 const reverse = url.searchParams.get('reverse') === 'true'; 2102 const _cursor = url.searchParams.get('cursor'); 2103 2104 const _did = await this.getDid(); 2105 const query = `SELECT uri, cid, value FROM records WHERE collection = ? ORDER BY rkey ${reverse ? 'DESC' : 'ASC'} LIMIT ?`; 2106 const params = [collection, limit + 1]; 2107 2108 const rows = this.sql.exec(query, ...params).toArray(); 2109 const hasMore = rows.length > limit; 2110 const records = rows.slice(0, limit).map((r) => ({ 2111 uri: r.uri, 2112 cid: r.cid, 2113 value: cborDecode(new Uint8Array(r.value)), 2114 })); 2115 2116 return Response.json({ 2117 records, 2118 cursor: hasMore ? records[records.length - 1]?.uri : undefined, 2119 }); 2120 } 2121 2122 handleGetLatestCommit() { 2123 const commits = this.sql 2124 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`) 2125 .toArray(); 2126 if (commits.length === 0) { 2127 return errorResponse('RepoNotFound', 'repo not found', 404); 2128 } 2129 return Response.json({ cid: commits[0].cid, rev: commits[0].rev }); 2130 } 2131 2132 async handleGetRepoStatus() { 2133 const did = await this.getDid(); 2134 const commits = this.sql 2135 .exec(`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`) 2136 .toArray(); 2137 if (commits.length === 0 || !did) { 2138 return errorResponse('RepoNotFound', 'repo not found', 404); 2139 } 2140 return Response.json({ 2141 did, 2142 active: true, 2143 status: 'active', 2144 rev: commits[0].rev, 2145 }); 2146 } 2147 2148 handleGetRepo() { 2149 const commits = this.sql 2150 .exec(`SELECT cid FROM commits ORDER BY seq DESC LIMIT 1`) 2151 .toArray(); 2152 if (commits.length === 0) { 2153 return errorResponse('RepoNotFound', 'repo not found', 404); 2154 } 2155 2156 // Only include blocks reachable from the current commit 2157 const commitCid = commits[0].cid; 2158 const neededCids = new Set(); 2159 2160 // Helper to get block data 2161 const getBlock = (cid) => { 2162 const rows = this.sql 2163 .exec(`SELECT data FROM blocks WHERE cid = ?`, cid) 2164 .toArray(); 2165 return rows.length > 0 ? new Uint8Array(rows[0].data) : null; 2166 }; 2167 2168 // Collect all reachable blocks starting from commit 2169 const collectBlocks = (cid) => { 2170 if (neededCids.has(cid)) return; 2171 neededCids.add(cid); 2172 2173 const data = getBlock(cid); 2174 if (!data) return; 2175 2176 // Decode CBOR to find CID references 2177 try { 2178 const decoded = cborDecode(data); 2179 if (decoded && typeof decoded === 'object') { 2180 // Commit object - follow 'data' (MST root) 2181 if (decoded.data instanceof Uint8Array) { 2182 collectBlocks(cidToString(decoded.data)); 2183 } 2184 // MST node - follow 'l' and entries' 'v' and 't' 2185 if (decoded.l instanceof Uint8Array) { 2186 collectBlocks(cidToString(decoded.l)); 2187 } 2188 if (Array.isArray(decoded.e)) { 2189 for (const entry of decoded.e) { 2190 if (entry.v instanceof Uint8Array) { 2191 collectBlocks(cidToString(entry.v)); 2192 } 2193 if (entry.t instanceof Uint8Array) { 2194 collectBlocks(cidToString(entry.t)); 2195 } 2196 } 2197 } 2198 } 2199 } catch (_e) { 2200 // Not a structured block, that's fine 2201 } 2202 }; 2203 2204 collectBlocks(commitCid); 2205 2206 // Build CAR with only needed blocks 2207 const blocksForCar = []; 2208 for (const cid of neededCids) { 2209 const data = getBlock(cid); 2210 if (data) { 2211 blocksForCar.push({ cid, data }); 2212 } 2213 } 2214 2215 const car = buildCarFile(commitCid, blocksForCar); 2216 return new Response(car, { 2217 headers: { 'content-type': 'application/vnd.ipld.car' }, 2218 }); 2219 } 2220 2221 async handleSyncGetRecord(url) { 2222 const collection = url.searchParams.get('collection'); 2223 const rkey = url.searchParams.get('rkey'); 2224 if (!collection || !rkey) { 2225 return errorResponse('InvalidRequest', 'missing collection or rkey', 400); 2226 } 2227 const did = await this.getDid(); 2228 const uri = `at://${did}/${collection}/${rkey}`; 2229 const rows = this.sql 2230 .exec(`SELECT cid FROM records WHERE uri = ?`, uri) 2231 .toArray(); 2232 if (rows.length === 0) { 2233 return errorResponse('RecordNotFound', 'record not found', 404); 2234 } 2235 const recordCid = rows[0].cid; 2236 2237 // Get latest commit 2238 const commits = this.sql 2239 .exec(`SELECT cid FROM commits ORDER BY seq DESC LIMIT 1`) 2240 .toArray(); 2241 if (commits.length === 0) { 2242 return errorResponse('RepoNotFound', 'no commits', 404); 2243 } 2244 const commitCid = commits[0].cid; 2245 2246 // Build proof chain: commit -> MST path -> record 2247 // Include commit block, all MST nodes on path to record, and record block 2248 const blocks = []; 2249 const included = new Set(); 2250 2251 const addBlock = (cidStr) => { 2252 if (included.has(cidStr)) return; 2253 included.add(cidStr); 2254 const blockRows = this.sql 2255 .exec(`SELECT data FROM blocks WHERE cid = ?`, cidStr) 2256 .toArray(); 2257 if (blockRows.length > 0) { 2258 blocks.push({ cid: cidStr, data: new Uint8Array(blockRows[0].data) }); 2259 } 2260 }; 2261 2262 // Add commit block 2263 addBlock(commitCid); 2264 2265 // Get commit to find data root 2266 const commitRows = this.sql 2267 .exec(`SELECT data FROM blocks WHERE cid = ?`, commitCid) 2268 .toArray(); 2269 if (commitRows.length > 0) { 2270 const commit = cborDecode(new Uint8Array(commitRows[0].data)); 2271 if (commit.data) { 2272 const dataRootCid = cidToString(commit.data); 2273 // Collect MST path blocks (this includes all MST nodes) 2274 const mstBlocks = this.collectMstBlocks(dataRootCid); 2275 for (const block of mstBlocks) { 2276 addBlock(block.cid); 2277 } 2278 } 2279 } 2280 2281 // Add record block 2282 addBlock(recordCid); 2283 2284 const car = buildCarFile(commitCid, blocks); 2285 return new Response(car, { 2286 headers: { 'content-type': 'application/vnd.ipld.car' }, 2287 }); 2288 } 2289 2290 handleSubscribeRepos(request, url) { 2291 const upgradeHeader = request.headers.get('Upgrade'); 2292 if (upgradeHeader !== 'websocket') { 2293 return new Response('expected websocket', { status: 426 }); 2294 } 2295 const { 0: client, 1: server } = new WebSocketPair(); 2296 this.state.acceptWebSocket(server); 2297 const cursor = url.searchParams.get('cursor'); 2298 if (cursor) { 2299 const events = this.sql 2300 .exec( 2301 `SELECT * FROM seq_events WHERE seq > ? ORDER BY seq`, 2302 parseInt(cursor, 10), 2303 ) 2304 .toArray(); 2305 for (const evt of events) { 2306 server.send(this.formatEvent(evt)); 2307 } 2308 } 2309 return new Response(null, { status: 101, webSocket: client }); 2310 } 2311 2312 async fetch(request) { 2313 const url = new URL(request.url); 2314 const route = pdsRoutes[url.pathname]; 2315 2316 // Check for local route first 2317 if (route) { 2318 if (route.method && request.method !== route.method) { 2319 return errorResponse('MethodNotAllowed', 'method not allowed', 405); 2320 } 2321 return route.handler(this, request, url); 2322 } 2323 2324 // Handle app.bsky.* proxy requests (only if no local route) 2325 if (url.pathname.startsWith('/xrpc/app.bsky.')) { 2326 const userDid = request.headers.get('x-authed-did'); 2327 if (!userDid) { 2328 return errorResponse('Unauthorized', 'Missing auth context', 401); 2329 } 2330 return this.handleAppViewProxy(request, userDid); 2331 } 2332 2333 return errorResponse('NotFound', 'not found', 404); 2334 } 2335} 2336 2337const corsHeaders = { 2338 'Access-Control-Allow-Origin': '*', 2339 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS', 2340 'Access-Control-Allow-Headers': 2341 'Content-Type, Authorization, atproto-accept-labelers, atproto-proxy, x-bsky-topics', 2342}; 2343 2344function addCorsHeaders(response) { 2345 const newHeaders = new Headers(response.headers); 2346 for (const [key, value] of Object.entries(corsHeaders)) { 2347 newHeaders.set(key, value); 2348 } 2349 return new Response(response.body, { 2350 status: response.status, 2351 statusText: response.statusText, 2352 headers: newHeaders, 2353 }); 2354} 2355 2356export default { 2357 async fetch(request, env) { 2358 // Handle CORS preflight 2359 if (request.method === 'OPTIONS') { 2360 return new Response(null, { headers: corsHeaders }); 2361 } 2362 2363 const response = await handleRequest(request, env); 2364 // Don't wrap WebSocket upgrades - they need the webSocket property preserved 2365 if (response.status === 101) { 2366 return response; 2367 } 2368 return addCorsHeaders(response); 2369 }, 2370}; 2371 2372// Extract subdomain from hostname (e.g., "alice" from "alice.foo.workers.dev") 2373function getSubdomain(hostname) { 2374 const parts = hostname.split('.'); 2375 // workers.dev domains: [subdomain?].[worker-name].[account].workers.dev 2376 // If more than 4 parts, first part(s) are user subdomain 2377 if (parts.length > 4 && parts.slice(-2).join('.') === 'workers.dev') { 2378 return parts.slice(0, -4).join('.'); 2379 } 2380 // Custom domains: check if there's a subdomain before the base 2381 // For now, assume no subdomain on custom domains 2382 return null; 2383} 2384 2385/** 2386 * Verify auth and return DID from token 2387 * @param {Request} request - HTTP request with Authorization header 2388 * @param {Object} env - Environment with JWT_SECRET 2389 * @returns {Promise<{did: string} | {error: Response}>} DID or error response 2390 */ 2391async function requireAuth(request, env) { 2392 const authHeader = request.headers.get('Authorization'); 2393 if (!authHeader || !authHeader.startsWith('Bearer ')) { 2394 return { 2395 error: Response.json( 2396 { 2397 error: 'AuthRequired', 2398 message: 'Authentication required', 2399 }, 2400 { status: 401 }, 2401 ), 2402 }; 2403 } 2404 2405 const token = authHeader.slice(7); 2406 const jwtSecret = env?.JWT_SECRET; 2407 if (!jwtSecret) { 2408 return { 2409 error: Response.json( 2410 { 2411 error: 'InternalServerError', 2412 message: 'Server not configured for authentication', 2413 }, 2414 { status: 500 }, 2415 ), 2416 }; 2417 } 2418 2419 try { 2420 const payload = await verifyAccessJwt(token, jwtSecret); 2421 return { did: payload.sub }; 2422 } catch (err) { 2423 return { 2424 error: Response.json( 2425 { 2426 error: 'InvalidToken', 2427 message: err.message, 2428 }, 2429 { status: 401 }, 2430 ), 2431 }; 2432 } 2433} 2434 2435async function handleAuthenticatedRepoWrite(request, env) { 2436 const auth = await requireAuth(request, env); 2437 if (auth.error) return auth.error; 2438 2439 const body = await request.json(); 2440 const repo = body.repo; 2441 if (!repo) { 2442 return errorResponse('InvalidRequest', 'missing repo param', 400); 2443 } 2444 2445 if (auth.did !== repo) { 2446 return errorResponse('Forbidden', "Cannot modify another user's repo", 403); 2447 } 2448 2449 const id = env.PDS.idFromName(repo); 2450 const pds = env.PDS.get(id); 2451 const response = await pds.fetch( 2452 new Request(request.url, { 2453 method: 'POST', 2454 headers: request.headers, 2455 body: JSON.stringify(body), 2456 }), 2457 ); 2458 2459 // Notify relay of updates on successful writes 2460 if (response.ok) { 2461 const url = new URL(request.url); 2462 notifyCrawlers(env, url.hostname); 2463 } 2464 2465 return response; 2466} 2467 2468async function handleRequest(request, env) { 2469 const url = new URL(request.url); 2470 const subdomain = getSubdomain(url.hostname); 2471 2472 // Handle resolution via subdomain or bare domain 2473 if (url.pathname === '/.well-known/atproto-did') { 2474 // Look up handle -> DID in default DO 2475 // Use subdomain if present, otherwise try bare hostname as handle 2476 const handleToResolve = subdomain || url.hostname; 2477 const defaultId = env.PDS.idFromName('default'); 2478 const defaultPds = env.PDS.get(defaultId); 2479 const resolveRes = await defaultPds.fetch( 2480 new Request( 2481 `http://internal/resolve-handle?handle=${encodeURIComponent(handleToResolve)}`, 2482 ), 2483 ); 2484 if (!resolveRes.ok) { 2485 return new Response('Handle not found', { status: 404 }); 2486 } 2487 const { did } = await resolveRes.json(); 2488 return new Response(did, { headers: { 'Content-Type': 'text/plain' } }); 2489 } 2490 2491 // describeServer - works on bare domain 2492 if (url.pathname === '/xrpc/com.atproto.server.describeServer') { 2493 const defaultId = env.PDS.idFromName('default'); 2494 const defaultPds = env.PDS.get(defaultId); 2495 const newReq = new Request(request.url, { 2496 method: request.method, 2497 headers: { 2498 ...Object.fromEntries(request.headers), 2499 'x-hostname': url.hostname, 2500 }, 2501 }); 2502 return defaultPds.fetch(newReq); 2503 } 2504 2505 // createSession - handle on default DO (has handleMap for identifier resolution) 2506 if (url.pathname === '/xrpc/com.atproto.server.createSession') { 2507 const defaultId = env.PDS.idFromName('default'); 2508 const defaultPds = env.PDS.get(defaultId); 2509 return defaultPds.fetch(request); 2510 } 2511 2512 // getSession - route to default DO 2513 if (url.pathname === '/xrpc/com.atproto.server.getSession') { 2514 const defaultId = env.PDS.idFromName('default'); 2515 const defaultPds = env.PDS.get(defaultId); 2516 return defaultPds.fetch(request); 2517 } 2518 2519 // refreshSession - route to default DO 2520 if (url.pathname === '/xrpc/com.atproto.server.refreshSession') { 2521 const defaultId = env.PDS.idFromName('default'); 2522 const defaultPds = env.PDS.get(defaultId); 2523 return defaultPds.fetch(request); 2524 } 2525 2526 // Proxy app.bsky.* endpoints to Bluesky AppView 2527 if (url.pathname.startsWith('/xrpc/app.bsky.')) { 2528 // Authenticate the user first 2529 const auth = await requireAuth(request, env); 2530 if (auth.error) return auth.error; 2531 2532 // Route to the user's DO instance to create service auth and proxy 2533 const id = env.PDS.idFromName(auth.did); 2534 const pds = env.PDS.get(id); 2535 return pds.fetch( 2536 new Request(request.url, { 2537 method: request.method, 2538 headers: { 2539 ...Object.fromEntries(request.headers), 2540 'x-authed-did': auth.did, // Pass the authenticated DID 2541 }, 2542 body: 2543 request.method !== 'GET' && request.method !== 'HEAD' 2544 ? request.body 2545 : undefined, 2546 }), 2547 ); 2548 } 2549 2550 // Handle registration routes - go to default DO 2551 if ( 2552 url.pathname === '/register-handle' || 2553 url.pathname === '/resolve-handle' 2554 ) { 2555 const defaultId = env.PDS.idFromName('default'); 2556 const defaultPds = env.PDS.get(defaultId); 2557 return defaultPds.fetch(request); 2558 } 2559 2560 // resolveHandle XRPC endpoint 2561 if (url.pathname === '/xrpc/com.atproto.identity.resolveHandle') { 2562 const handle = url.searchParams.get('handle'); 2563 if (!handle) { 2564 return errorResponse('InvalidRequest', 'missing handle param', 400); 2565 } 2566 const defaultId = env.PDS.idFromName('default'); 2567 const defaultPds = env.PDS.get(defaultId); 2568 const resolveRes = await defaultPds.fetch( 2569 new Request( 2570 `http://internal/resolve-handle?handle=${encodeURIComponent(handle)}`, 2571 ), 2572 ); 2573 if (!resolveRes.ok) { 2574 return errorResponse('InvalidRequest', 'Unable to resolve handle', 400); 2575 } 2576 const { did } = await resolveRes.json(); 2577 return Response.json({ did }); 2578 } 2579 2580 // subscribeRepos WebSocket - route to default instance for firehose 2581 if (url.pathname === '/xrpc/com.atproto.sync.subscribeRepos') { 2582 const defaultId = env.PDS.idFromName('default'); 2583 const defaultPds = env.PDS.get(defaultId); 2584 return defaultPds.fetch(request); 2585 } 2586 2587 // listRepos needs to aggregate from all registered DIDs 2588 if (url.pathname === '/xrpc/com.atproto.sync.listRepos') { 2589 const defaultId = env.PDS.idFromName('default'); 2590 const defaultPds = env.PDS.get(defaultId); 2591 const regRes = await defaultPds.fetch( 2592 new Request('http://internal/get-registered-dids'), 2593 ); 2594 const { dids } = await regRes.json(); 2595 2596 const repos = []; 2597 for (const did of dids) { 2598 const id = env.PDS.idFromName(did); 2599 const pds = env.PDS.get(id); 2600 const infoRes = await pds.fetch(new Request('http://internal/repo-info')); 2601 const info = await infoRes.json(); 2602 if (info.head) { 2603 repos.push({ did, head: info.head, rev: info.rev, active: true }); 2604 } 2605 } 2606 return Response.json({ repos, cursor: undefined }); 2607 } 2608 2609 // Repo endpoints use ?repo= param instead of ?did= 2610 if ( 2611 url.pathname === '/xrpc/com.atproto.repo.describeRepo' || 2612 url.pathname === '/xrpc/com.atproto.repo.listRecords' || 2613 url.pathname === '/xrpc/com.atproto.repo.getRecord' 2614 ) { 2615 const repo = url.searchParams.get('repo'); 2616 if (!repo) { 2617 return errorResponse('InvalidRequest', 'missing repo param', 400); 2618 } 2619 const id = env.PDS.idFromName(repo); 2620 const pds = env.PDS.get(id); 2621 return pds.fetch(request); 2622 } 2623 2624 // Sync endpoints use ?did= param 2625 if ( 2626 url.pathname === '/xrpc/com.atproto.sync.getLatestCommit' || 2627 url.pathname === '/xrpc/com.atproto.sync.getRepoStatus' || 2628 url.pathname === '/xrpc/com.atproto.sync.getRepo' || 2629 url.pathname === '/xrpc/com.atproto.sync.getRecord' 2630 ) { 2631 const did = url.searchParams.get('did'); 2632 if (!did) { 2633 return errorResponse('InvalidRequest', 'missing did param', 400); 2634 } 2635 const id = env.PDS.idFromName(did); 2636 const pds = env.PDS.get(id); 2637 return pds.fetch(request); 2638 } 2639 2640 // Authenticated repo write endpoints 2641 const repoWriteEndpoints = [ 2642 '/xrpc/com.atproto.repo.createRecord', 2643 '/xrpc/com.atproto.repo.deleteRecord', 2644 '/xrpc/com.atproto.repo.putRecord', 2645 '/xrpc/com.atproto.repo.applyWrites', 2646 ]; 2647 if (repoWriteEndpoints.includes(url.pathname)) { 2648 return handleAuthenticatedRepoWrite(request, env); 2649 } 2650 2651 // Health check endpoint 2652 if (url.pathname === '/xrpc/_health') { 2653 return Response.json({ version: '0.1.0' }); 2654 } 2655 2656 // Root path - ASCII art 2657 if (url.pathname === '/') { 2658 const ascii = ` 2659 ██████╗ ██████╗ ███████╗ ██╗ ███████╗ 2660 ██╔══██╗ ██╔══██╗ ██╔════╝ ██║ ██╔════╝ 2661 ██████╔╝ ██║ ██║ ███████╗ ██║ ███████╗ 2662 ██╔═══╝ ██║ ██║ ╚════██║ ██ ██║ ╚════██║ 2663 ██║ ██████╔╝ ███████║ ██╗ ╚█████╔╝ ███████║ 2664 ╚═╝ ╚═════╝ ╚══════╝ ╚═╝ ╚════╝ ╚══════╝ 2665 2666 ATProto PDS on Cloudflare Workers 2667`; 2668 return new Response(ascii, { 2669 headers: { 'Content-Type': 'text/plain; charset=utf-8' }, 2670 }); 2671 } 2672 2673 // On init, register this DID with the default instance (requires ?did= param, no auth yet) 2674 if (url.pathname === '/init' && request.method === 'POST') { 2675 const did = url.searchParams.get('did'); 2676 if (!did) { 2677 return errorResponse('InvalidRequest', 'missing did param', 400); 2678 } 2679 const body = await request.json(); 2680 2681 // Register with default instance for discovery 2682 const defaultId = env.PDS.idFromName('default'); 2683 const defaultPds = env.PDS.get(defaultId); 2684 await defaultPds.fetch( 2685 new Request('http://internal/register-did', { 2686 method: 'POST', 2687 body: JSON.stringify({ did }), 2688 }), 2689 ); 2690 2691 // Register handle if provided 2692 if (body.handle) { 2693 await defaultPds.fetch( 2694 new Request('http://internal/register-handle', { 2695 method: 'POST', 2696 body: JSON.stringify({ did, handle: body.handle }), 2697 }), 2698 ); 2699 } 2700 2701 // Forward to the actual PDS instance 2702 const id = env.PDS.idFromName(did); 2703 const pds = env.PDS.get(id); 2704 return pds.fetch( 2705 new Request(request.url, { 2706 method: 'POST', 2707 headers: request.headers, 2708 body: JSON.stringify(body), 2709 }), 2710 ); 2711 } 2712 2713 // Unknown endpoint 2714 return errorResponse('NotFound', 'Endpoint not found', 404); 2715}