// === CBOR ENCODING ===
// Minimal deterministic CBOR (RFC 8949) - sorted keys, minimal integers

/**
 * Order two UTF-8 encoded map keys the way deterministic CBOR requires:
 * bytewise order of the *encoded* key. For text-string keys the length is
 * part of the encoded head, so a shorter key always sorts before a longer one
 * and equal-length keys are compared byte by byte.
 *
 * @param {Uint8Array} a
 * @param {Uint8Array} b
 * @returns {number} negative, zero or positive, as expected by Array#sort
 */
function compareCborKeys(a, b) {
  if (a.length !== b.length) return a.length - b.length
  for (let i = 0; i < a.length; i++) {
    if (a[i] !== b[i]) return a[i] - b[i]
  }
  return 0
}

/**
 * Encode a JavaScript value as CBOR.
 *
 * Supported values: null, undefined, booleans, numbers (safe integers use the
 * shortest integer form, everything else is written as a float64), strings,
 * Uint8Array (byte string), arrays and plain objects (maps with sorted keys).
 *
 * @param {*} value - value to encode
 * @returns {Uint8Array} the encoded bytes
 * @throws {TypeError} for unsupported value types (bigint, function, symbol)
 * @throws {RangeError} if a length or integer does not fit in 64 bits safely
 */
function cborEncode(value) {
  const parts = []
  const encoder = new TextEncoder()

  // Appended one by one: `parts.push(...bytes)` passes every byte as a call
  // argument and overflows the engine's argument limit for large inputs.
  function pushBytes(bytes) {
    for (const byte of bytes) parts.push(byte)
  }

  // Write the initial byte plus the shortest big-endian argument.
  function encodeHead(majorType, length) {
    const mt = majorType << 5
    if (length < 24) {
      parts.push(mt | length)
    } else if (length < 0x100) {
      parts.push(mt | 24, length)
    } else if (length < 0x10000) {
      parts.push(mt | 25, length >> 8, length & 0xff)
    } else if (length < 0x100000000) {
      parts.push(
        mt | 26,
        (length >>> 24) & 0xff,
        (length >>> 16) & 0xff,
        (length >>> 8) & 0xff,
        length & 0xff
      )
    } else if (length <= Number.MAX_SAFE_INTEGER) {
      // Bitwise operators only see 32 bits, so split into two 32-bit halves.
      const high = Math.floor(length / 0x100000000)
      const low = length % 0x100000000
      parts.push(
        mt | 27,
        (high >>> 24) & 0xff,
        (high >>> 16) & 0xff,
        (high >>> 8) & 0xff,
        high & 0xff,
        (low >>> 24) & 0xff,
        (low >>> 16) & 0xff,
        (low >>> 8) & 0xff,
        low & 0xff
      )
    } else {
      throw new RangeError('CBOR argument exceeds the safe integer range')
    }
  }

  function encodeNumber(n) {
    if (Number.isSafeInteger(n)) {
      if (n >= 0) {
        encodeHead(0, n) // major type 0 = unsigned int
      } else {
        encodeHead(1, -n - 1) // major type 1 = negative int
      }
      return
    }
    // Fractions, NaN, infinities and integers beyond 2^53: IEEE 754 double
    const view = new DataView(new ArrayBuffer(8))
    view.setFloat64(0, n)
    parts.push(0xfb)
    pushBytes(new Uint8Array(view.buffer))
  }

  function encodeText(bytes) {
    encodeHead(3, bytes.length) // major type 3 = text string
    pushBytes(bytes)
  }

  function encode(val) {
    if (val === null) {
      parts.push(0xf6) // null
    } else if (val === undefined) {
      parts.push(0xf7) // undefined
    } else if (val === true) {
      parts.push(0xf5) // true
    } else if (val === false) {
      parts.push(0xf4) // false
    } else if (typeof val === 'number') {
      encodeNumber(val)
    } else if (typeof val === 'string') {
      encodeText(encoder.encode(val))
    } else if (val instanceof Uint8Array) {
      encodeHead(2, val.length) // major type 2 = byte string
      pushBytes(val)
    } else if (Array.isArray(val)) {
      encodeHead(4, val.length) // major type 4 = array
      for (const item of val) encode(item)
    } else if (typeof val === 'object') {
      // Sort keys by their encoded bytes for deterministic encoding
      const entries = Object.keys(val).map((key) => ({ key, bytes: encoder.encode(key) }))
      entries.sort((a, b) => compareCborKeys(a.bytes, b.bytes))
      encodeHead(5, entries.length) // major type 5 = map
      for (const entry of entries) {
        encodeText(entry.bytes)
        encode(val[entry.key])
      }
    } else {
      throw new TypeError(`cannot CBOR-encode value of type ${typeof val}`)
    }
  }

  encode(value)
  return new Uint8Array(parts)
}

/**
 * Decode the first CBOR data item found in `bytes`.
 *
 * Handles unsigned/negative integers, byte and text strings, arrays, maps,
 * tags (the tag number is dropped and the tagged value returned), booleans,
 * null and 32/64-bit floats. Other simple values decode to `undefined`.
 * 64-bit integers above 2^53 lose precision because they are returned as
 * JavaScript numbers.
 *
 * @param {Uint8Array|ArrayBuffer|number[]} bytes - encoded input
 * @returns {*} the decoded value
 * @throws {RangeError} if the input ends before the item is complete
 * @throws {Error} on indefinite-length or reserved encodings
 */
function cborDecode(bytes) {
  const data = bytes instanceof Uint8Array ? bytes : new Uint8Array(bytes)
  let offset = 0

  function readByte() {
    if (offset >= data.length) throw new RangeError('unexpected end of CBOR data')
    return data[offset++]
  }

  // Multiplication instead of `<<` so values >= 2^31 stay positive.
  function readUint(size) {
    let n = 0
    for (let i = 0; i < size; i++) n = n * 256 + readByte()
    return n
  }

  function take(count) {
    if (count > data.length - offset) throw new RangeError('unexpected end of CBOR data')
    const chunk = data.slice(offset, offset + count)
    offset += count
    return chunk
  }

  function readArgument(info) {
    if (info < 24) return info
    if (info === 24) return readUint(1)
    if (info === 25) return readUint(2)
    if (info === 26) return readUint(4)
    if (info === 27) return readUint(8)
    throw new Error('unsupported CBOR encoding (indefinite length or reserved)')
  }

  function readSpecial(info) {
    if (info === 20) return false
    if (info === 21) return true
    if (info === 22) return null
    if (info === 26 || info === 27) {
      const raw = take(info === 26 ? 4 : 8)
      const view = new DataView(raw.buffer, raw.byteOffset, raw.byteLength)
      return info === 26 ? view.getFloat32(0) : view.getFloat64(0)
    }
    // Skip the payload of values we do not interpret so the stream stays aligned
    if (info === 24) take(1)
    else if (info === 25) take(2)
    return undefined
  }

  function read() {
    const initial = readByte()
    const major = initial >> 5
    const info = initial & 0x1f

    if (major === 7) return readSpecial(info)

    const length = readArgument(info)
    switch (major) {
      case 0:
        return length // unsigned int
      case 1:
        return -1 - length // negative int
      case 2:
        return take(length) // byte string
      case 3:
        return new TextDecoder().decode(take(length)) // text string
      case 4: {
        // array
        const arr = []
        for (let i = 0; i < length; i++) arr.push(read())
        return arr
      }
      case 5: {
        // map
        const obj = {}
        for (let i = 0; i < length; i++) {
          const key = read()
          const val = read()
          if (key === '__proto__') {
            // Plain assignment would replace the prototype instead of adding a key
            Object.defineProperty(obj, key, {
              value: val,
              enumerable: true,
              writable: true,
              configurable: true
            })
          } else {
            obj[key] = val
          }
        }
        return obj
      }
      default:
        return read() // major type 6 = tag: return the tagged value
    }
  }

  return read()
}

// === CID GENERATION ===
// dag-cbor (0x71) + sha-256 (0x12) + 32 bytes

/**
 * Build a binary CIDv1 (dag-cbor codec, sha-256 multihash) for `bytes`.
 *
 * @param {Uint8Array} bytes - block content to hash
 * @returns {Promise<Uint8Array>} 36-byte CID: 4 header bytes + 32-byte digest
 */
async function createCid(bytes) {
  const digest = new Uint8Array(await crypto.subtle.digest('SHA-256', bytes))
  // CIDv1: version(1) + codec(dag-cbor=0x71) + multihash(sha256)
  // Multihash: hash-type(0x12) + length(0x20=32) + digest
  const cid = new Uint8Array(2 + 2 + 32)
  cid.set([0x01, 0x71, 0x12, 0x20])
  cid.set(digest, 4)
  return cid
}

/**
 * Render a binary CID as a multibase string ('b' prefix + base32 lowercase).
 *
 * @param {Uint8Array} cid
 * @returns {string}
 */
function cidToString(cid) {
  return 'b' + base32Encode(cid)
}

/**
 * Encode bytes as unpadded lowercase base32 (alphabet a-z, 2-7).
 *
 * @param {Uint8Array} bytes
 * @returns {string}
 */
function base32Encode(bytes) {
  const alphabet = 'abcdefghijklmnopqrstuvwxyz234567'
  let result = ''
  let bits = 0
  let value = 0
  for (const byte of bytes) {
    // Only the low `bits` bits of `value` are ever read, so 32-bit wraparound
    // of the shift is harmless.
    value = (value << 8) | byte
    bits += 8
    while (bits >= 5) {
      bits -= 5
      result += alphabet[(value >> bits) & 31]
    }
  }
  if (bits > 0) {
    // Left-align the remaining bits in a final 5-bit group
    result += alphabet[(value << (5 - bits)) & 31]
  }
  return result
}

// === TID GENERATION ===
// Timestamp-based IDs: base32-sort encoded microseconds + clock ID

const TID_CHARS = '234567abcdefghijklmnopqrstuvwxyz'
let lastTimestamp = 0
let clockId = Math.floor(Math.random() * 1024)

/**
 * Create a 13-character timestamp identifier: 11 characters for the
 * microsecond timestamp followed by 2 characters for the 10-bit clock ID.
 * Successive calls within this module instance always return increasing values.
 *
 * @returns {string}
 */
function createTid() {
  let timestamp = Date.now() * 1000 // microseconds
  // Ensure monotonic
  if (timestamp <= lastTimestamp) {
    timestamp = lastTimestamp + 1
  }
  lastTimestamp = timestamp

  // Encode timestamp (high bits first for sortability)
  let tid = ''
  let ts = timestamp
  for (let i = 0; i < 11; i++) {
    tid = TID_CHARS[ts & 31] + tid
    ts = Math.floor(ts / 32) // division, not >>, because ts exceeds 32 bits
  }

  // Append clock ID (2 chars)
  tid += TID_CHARS[(clockId >> 5) & 31]
  tid += TID_CHARS[clockId & 31]
  return tid
}

// === P-256 SIGNING ===
// Web Crypto ECDSA with P-256 curve

/**
 * Import a raw 32-byte P-256 private scalar as a non-extractable signing key.
 *
 * @param {Uint8Array} privateKeyBytes - exactly 32 bytes
 * @returns {Promise<CryptoKey>}
 * @throws {Error} if the key is not 32 bytes long
 */
async function importPrivateKey(privateKeyBytes) {
  if (!privateKeyBytes || privateKeyBytes.length !== 32) {
    throw new Error('P-256 private key must be exactly 32 bytes')
  }
  // PKCS#8 wrapper for raw P-256 private key
  const pkcs8Prefix = new Uint8Array([
    0x30, 0x41, 0x02, 0x01, 0x00, 0x30, 0x13, 0x06, 0x07, 0x2a, 0x86, 0x48,
    0xce, 0x3d, 0x02, 0x01, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03,
    0x01, 0x07, 0x04, 0x27, 0x30, 0x25, 0x02, 0x01, 0x01, 0x04, 0x20
  ])
  const pkcs8 = new Uint8Array(pkcs8Prefix.length + 32)
  pkcs8.set(pkcs8Prefix)
  pkcs8.set(privateKeyBytes, pkcs8Prefix.length)

  return crypto.subtle.importKey(
    'pkcs8',
    pkcs8,
    { name: 'ECDSA', namedCurve: 'P-256' },
    false,
    ['sign']
  )
}

/**
 * Sign `data` with ECDSA over SHA-256.
 *
 * @param {CryptoKey} privateKey
 * @param {Uint8Array} data
 * @returns {Promise<Uint8Array>} the signature bytes as returned by Web Crypto
 */
async function sign(privateKey, data) {
  const signature = await crypto.subtle.sign(
    { name: 'ECDSA', hash: 'SHA-256' },
    privateKey,
    data
  )
  return new Uint8Array(signature)
}

/**
 * Generate a fresh P-256 key pair.
 *
 * @returns {Promise<{privateKey: Uint8Array, publicKey: Uint8Array}>}
 *   raw private scalar and 33-byte compressed public point
 */
async function generateKeyPair() {
  const keyPair = await crypto.subtle.generateKey(
    { name: 'ECDSA', namedCurve: 'P-256' },
    true,
    ['sign', 'verify']
  )

  // Export private key as raw bytes
  const privateJwk = await crypto.subtle.exportKey('jwk', keyPair.privateKey)
  const privateBytes = base64UrlDecode(privateJwk.d)

  // Export public key as compressed point
  const publicRaw = await crypto.subtle.exportKey('raw', keyPair.publicKey)
  const compressed = compressPublicKey(new Uint8Array(publicRaw))

  return { privateKey: privateBytes, publicKey: compressed }
}

/**
 * Compress an uncompressed P-256 public point.
 *
 * @param {Uint8Array} uncompressed - 65 bytes: 0x04 + x(32) + y(32)
 * @returns {Uint8Array} 33 bytes: prefix (0x02 even y, 0x03 odd y) + x(32)
 * @throws {Error} if the input is not a 65-byte uncompressed point
 */
function compressPublicKey(uncompressed) {
  if (uncompressed.length !== 65 || uncompressed[0] !== 0x04) {
    throw new Error('expected a 65-byte uncompressed P-256 public key')
  }
  const x = uncompressed.slice(1, 33)
  const lastYByte = uncompressed[64]
  const compressed = new Uint8Array(33)
  compressed[0] = (lastYByte & 1) === 0 ? 0x02 : 0x03
  compressed.set(x, 1)
  return compressed
}

/**
 * Decode a base64url string (no padding required) into bytes.
 *
 * @param {string} str
 * @returns {Uint8Array}
 */
function base64UrlDecode(str) {
  const base64 = str.replace(/-/g, '+').replace(/_/g, '/')
  const binary = atob(base64)
  const bytes = new Uint8Array(binary.length)
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i)
  }
  return bytes
}

/**
 * Render bytes as a lowercase hex string.
 *
 * @param {Uint8Array} bytes
 * @returns {string}
 */
function bytesToHex(bytes) {
  return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join('')
}

/**
 * Parse a hex string into bytes.
 *
 * @param {string} hex - even number of hex digits
 * @returns {Uint8Array}
 * @throws {Error} on odd length or non-hex characters (previously these
 *   silently produced truncated or zeroed bytes)
 */
function hexToBytes(hex) {
  if (typeof hex !== 'string' || hex.length % 2 !== 0 || !/^[0-9a-fA-F]*$/.test(hex)) {
    throw new Error('invalid hex string')
  }
  const bytes = new Uint8Array(hex.length / 2)
  for (let i = 0; i < bytes.length; i++) {
    bytes[i] = Number.parseInt(hex.slice(i * 2, i * 2 + 2), 16)
  }
  return bytes
}

// === MERKLE SEARCH TREE ===
// Simple rebuild-on-write implementation

/**
 * SHA-256 digest of `data`.
 *
 * @param {Uint8Array} data
 * @returns {Promise<Uint8Array>}
 */
async function sha256(data) {
  const hash = await crypto.subtle.digest('SHA-256', data)
  return new Uint8Array(hash)
}

/**
 * Derive a tree depth for a key from the number of leading zero bits.
 *
 * NOTE: this inspects the UTF-8 bytes of the key itself, not a hash of the
 * key (a synchronous stand-in for hashing). Typical ASCII keys therefore start
 * with at most three zero bits and get depth 0.
 *
 * @param {string} key
 * @returns {number} floor(leadingZeroBits / 4)
 */
function getKeyDepth(key) {
  const keyBytes = new TextEncoder().encode(key)
  let zeros = 0
  for (const byte of keyBytes) {
    if (byte === 0) {
      zeros += 8
      continue
    }
    // First non-zero byte: count its leading zero bits and stop
    for (let bit = 7; bit >= 0 && ((byte >> bit) & 1) === 0; bit--) {
      zeros++
    }
    break
  }
  return Math.floor(zeros / 4)
}

/**
 * Rebuilds the whole tree from the `records` table on every call and stores
 * each node in the `blocks` table.
 */
class MST {
  /**
   * @param {object} sql - SQL handle exposing `exec(query, ...params).toArray()`
   */
  constructor(sql) {
    this.sql = sql
  }

  /**
   * Build the tree over all records ordered by collection and rkey.
   *
   * @returns {Promise<string|null>} CID string of the root node, or null when
   *   there are no records
   */
  async computeRoot() {
    const records = this.sql.exec(`
      SELECT collection, rkey, cid FROM records ORDER BY collection, rkey
    `).toArray()

    if (records.length === 0) {
      return null
    }

    const entries = records.map((r) => ({
      key: `${r.collection}/${r.rkey}`,
      cid: r.cid
    }))

    return this.buildTree(entries, 0)
  }

  /**
   * Build and persist the node for `entries` at `depth`. Entries whose key
   * depth is greater than `depth` are pushed into a subtree one level down;
   * that subtree is linked from `l` when it precedes every entry of this node,
   * otherwise from the `t` field of the entry it follows.
   *
   * @param {{key: string, cid: string}[]} entries - sorted entries
   * @param {number} depth
   * @returns {Promise<string|null>} CID string of the stored node, or null for
   *   an empty entry list
   */
  async buildTree(entries, depth) {
    if (entries.length === 0) return null

    const node = { l: null, e: [] }
    let pending = []

    // Turn the accumulated deeper entries into a subtree and link it
    const attachPending = async () => {
      if (pending.length === 0) return
      const subtreeCid = await this.buildTree(pending, depth + 1)
      if (node.e.length === 0) {
        node.l = subtreeCid
      } else {
        node.e[node.e.length - 1].t = subtreeCid
      }
      pending = []
    }

    for (const entry of entries) {
      if (getKeyDepth(entry.key) > depth) {
        pending.push(entry)
        continue
      }
      await attachPending()
      node.e.push({ k: entry.key, v: entry.cid, t: null })
    }
    // Handle remaining deeper entries
    await attachPending()

    // Encode and store node
    const nodeBytes = cborEncode(node)
    const cidStr = cidToString(await createCid(nodeBytes))
    this.sql.exec(
      `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
      cidStr,
      nodeBytes
    )
    return cidStr
  }
}

// === CAR FILE BUILDER ===

/**
 * Encode a non-negative integer as an unsigned LEB128 varint.
 *
 * @param {number} n - non-negative safe integer
 * @returns {Uint8Array}
 */
function varint(n) {
  const bytes = []
  let rest = n
  while (rest >= 0x80) {
    // Arithmetic instead of bitwise operators so values >= 2^32 work
    bytes.push((rest % 0x80) | 0x80)
    rest = Math.floor(rest / 0x80)
  }
  bytes.push(rest)
  return new Uint8Array(bytes)
}

/**
 * Decode a base32lower multibase CID string into its bytes.
 *
 * @param {string} cidStr - must start with the 'b' multibase prefix
 * @returns {Uint8Array}
 * @throws {Error} if the prefix is not 'b'
 */
function cidToBytes(cidStr) {
  if (!cidStr.startsWith('b')) throw new Error('expected base32lower CID')
  return base32Decode(cidStr.slice(1))
}

/**
 * Decode unpadded lowercase base32. Characters outside the alphabet are
 * skipped.
 *
 * @param {string} str
 * @returns {Uint8Array}
 */
function base32Decode(str) {
  const alphabet = 'abcdefghijklmnopqrstuvwxyz234567'
  let bits = 0
  let value = 0
  const output = []
  for (const char of str) {
    const idx = alphabet.indexOf(char)
    if (idx === -1) continue
    value = (value << 5) | idx
    bits += 5
    if (bits >= 8) {
      bits -= 8
      output.push((value >> bits) & 0xff)
    }
  }
  return new Uint8Array(output)
}

/**
 * Assemble a CAR-style byte stream: a length-prefixed CBOR header followed by
 * length-prefixed (cid + data) sections.
 *
 * @param {string} rootCid - base32lower CID string of the root block
 * @param {{cid: string, data: Uint8Array}[]} blocks
 * @returns {Uint8Array}
 */
function buildCarFile(rootCid, blocks) {
  const parts = []

  // Header: { version: 1, roots: [rootCid] }
  const header = cborEncode({ version: 1, roots: [cidToBytes(rootCid)] })
  parts.push(varint(header.length), header)

  // Blocks: varint(len) + cid + data
  for (const block of blocks) {
    const cidBytes = cidToBytes(block.cid)
    parts.push(varint(cidBytes.length + block.data.length), cidBytes, block.data)
  }

  // Concatenate all parts
  const totalLen = parts.reduce((sum, part) => sum + part.length, 0)
  const car = new Uint8Array(totalLen)
  let offset = 0
  for (const part of parts) {
    car.set(part, offset)
    offset += part.length
  }
  return car
}

/**
 * Parse a request body as a JSON object.
 *
 * @param {Request} request
 * @returns {Promise<object|null>} the parsed object, or null when the body is
 *   not valid JSON or is not an object (callers answer with HTTP 400)
 */
async function readJsonObject(request) {
  try {
    const body = await request.json()
    return body !== null && typeof body === 'object' ? body : null
  } catch {
    return null
  }
}

/**
 * Durable Object holding one repository: its identity, records, commits,
 * blocks and the sequenced event log.
 */
export class PersonalDataServer {
  constructor(state, env) {
    this.state = state
    this.sql = state.storage.sql
    this.env = env

    // Initialize schema
    this.sql.exec(`
      CREATE TABLE IF NOT EXISTS blocks (
        cid TEXT PRIMARY KEY,
        data BLOB NOT NULL
      );

      CREATE TABLE IF NOT EXISTS records (
        uri TEXT PRIMARY KEY,
        cid TEXT NOT NULL,
        collection TEXT NOT NULL,
        rkey TEXT NOT NULL,
        value BLOB NOT NULL
      );

      CREATE TABLE IF NOT EXISTS commits (
        seq INTEGER PRIMARY KEY AUTOINCREMENT,
        cid TEXT NOT NULL,
        rev TEXT NOT NULL,
        prev TEXT
      );

      CREATE TABLE IF NOT EXISTS seq_events (
        seq INTEGER PRIMARY KEY AUTOINCREMENT,
        did TEXT NOT NULL,
        commit_cid TEXT NOT NULL,
        evt BLOB NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_records_collection ON records(collection, rkey);
    `)
  }

  /**
   * Persist the repository identity.
   *
   * @param {string} did
   * @param {string} privateKeyHex - hex-encoded raw P-256 private key
   * @param {string|null} [handle]
   */
  async initIdentity(did, privateKeyHex, handle = null) {
    await this.state.storage.put('did', did)
    await this.state.storage.put('privateKey', privateKeyHex)
    if (handle) {
      await this.state.storage.put('handle', handle)
    }
    // Keep the in-memory cache used by getDid() in sync with storage
    this._did = did
  }

  /** @returns {Promise<string|undefined>} the stored DID (cached once read) */
  async getDid() {
    if (!this._did) {
      this._did = await this.state.storage.get('did')
    }
    return this._did
  }

  /** @returns {Promise<string|undefined>} the stored handle */
  async getHandle() {
    return this.state.storage.get('handle')
  }

  /** @returns {Promise<CryptoKey|null>} the signing key, or null if none is stored */
  async getSigningKey() {
    const hex = await this.state.storage.get('privateKey')
    if (!hex) return null
    return importPrivateKey(hexToBytes(hex))
  }

  /**
   * Store a record, rebuild the MST, create and sign a new commit, append a
   * sequenced event and broadcast it to connected subscribers.
   *
   * @param {string} collection
   * @param {object} record
   * @param {string|null} [rkey] - record key; a TID is generated when omitted
   * @returns {Promise<{uri: string, cid: string, commit: string}>}
   * @throws {Error} if the DID or the signing key has not been initialized
   */
  async createRecord(collection, record, rkey = null) {
    const did = await this.getDid()
    if (!did) throw new Error('PDS not initialized')

    // Resolve the key before writing anything so a missing or invalid key
    // cannot leave a stored record without a commit.
    const signingKey = await this.getSigningKey()
    if (!signingKey) throw new Error('PDS signing key not initialized')

    const recordKey = rkey || createTid()
    const uri = `at://${did}/${collection}/${recordKey}`

    // Encode and hash record
    const recordBytes = cborEncode(record)
    const recordCidStr = cidToString(await createCid(recordBytes))

    // Store block
    this.sql.exec(
      `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
      recordCidStr,
      recordBytes
    )

    // Store record index
    this.sql.exec(
      `INSERT OR REPLACE INTO records (uri, cid, collection, rkey, value) VALUES (?, ?, ?, ?, ?)`,
      uri,
      recordCidStr,
      collection,
      recordKey,
      recordBytes
    )

    // Rebuild MST
    const dataRoot = await new MST(this.sql).computeRoot()

    // Get previous commit
    const prevCommits = this.sql.exec(
      `SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`
    ).toArray()
    const prevCid = prevCommits.length > 0 ? prevCommits[0].cid || null : null

    // Create commit
    const rev = createTid()
    const commit = {
      did,
      version: 3,
      data: dataRoot,
      rev,
      prev: prevCid
    }

    // Sign the unsigned commit, then hash the signed form
    const sig = await sign(signingKey, cborEncode(commit))
    const signedBytes = cborEncode({ ...commit, sig })
    const commitCidStr = cidToString(await createCid(signedBytes))

    // Store commit block
    this.sql.exec(
      `INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
      commitCidStr,
      signedBytes
    )

    // Store commit reference
    this.sql.exec(
      `INSERT INTO commits (cid, rev, prev) VALUES (?, ?, ?)`,
      commitCidStr,
      rev,
      prevCid
    )

    // Sequence event
    const evt = cborEncode({
      ops: [{ action: 'create', path: `${collection}/${recordKey}`, cid: recordCidStr }]
    })
    this.sql.exec(
      `INSERT INTO seq_events (did, commit_cid, evt) VALUES (?, ?, ?)`,
      did,
      commitCidStr,
      evt
    )

    // Broadcast to subscribers
    const evtRows = this.sql.exec(
      `SELECT * FROM seq_events ORDER BY seq DESC LIMIT 1`
    ).toArray()
    if (evtRows.length > 0) {
      this.broadcastEvent(evtRows[0])
    }

    return { uri, cid: recordCidStr, commit: commitCidStr }
  }

  /**
   * Build the binary frame (CBOR header followed by CBOR body) for a
   * `seq_events` row.
   *
   * @param {{seq: number, did: string, commit_cid: string, evt: ArrayBuffer|Uint8Array}} evt
   * @returns {Uint8Array}
   */
  formatEvent(evt) {
    // Report the rev recorded for this commit; a freshly generated TID would
    // differ on every call and never match the commit. Fall back to a new TID
    // only when the commit row cannot be found.
    const commitRows = this.sql.exec(
      `SELECT rev FROM commits WHERE cid = ? ORDER BY seq DESC LIMIT 1`,
      evt.commit_cid
    ).toArray()
    const rev = commitRows.length > 0 ? commitRows[0].rev : createTid()

    // AT Protocol frame format: header + body
    const header = cborEncode({ op: 1, t: '#commit' })
    const body = cborEncode({
      seq: evt.seq,
      rebase: false,
      tooBig: false,
      repo: evt.did,
      commit: cidToBytes(evt.commit_cid),
      rev,
      since: null,
      blocks: new Uint8Array(0), // Simplified - real impl includes CAR slice
      ops: cborDecode(new Uint8Array(evt.evt)).ops,
      blobs: [],
      time: new Date().toISOString()
    })

    // Concatenate header + body
    const frame = new Uint8Array(header.length + body.length)
    frame.set(header)
    frame.set(body, header.length)
    return frame
  }

  async webSocketMessage(ws, message) {
    // Handle ping
    if (message === 'ping') ws.send('pong')
  }

  async webSocketClose(ws, code, reason) {
    // Durable Object will hibernate when no connections remain
  }

  /**
   * Send an event frame to every accepted WebSocket.
   *
   * @param {object} evt - a `seq_events` row
   */
  broadcastEvent(evt) {
    const frame = this.formatEvent(evt)
    for (const ws of this.state.getWebSockets()) {
      try {
        ws.send(frame)
      } catch (e) {
        // Best effort: a failed send (e.g. client disconnected) must not stop
        // delivery to the remaining sockets.
      }
    }
  }

  /**
   * HTTP entry point of the Durable Object.
   *
   * @param {Request} request
   * @returns {Promise<Response>}
   */
  async fetch(request) {
    const url = new URL(request.url)

    // Handle resolution - doesn't require ?did= param
    if (url.pathname === '/.well-known/atproto-did') {
      const did = await this.getDid()
      if (!did) {
        return new Response('User not found', { status: 404 })
      }
      return new Response(did, {
        headers: { 'Content-Type': 'text/plain' }
      })
    }

    if (url.pathname === '/init') {
      const body = await readJsonObject(request)
      if (!body) {
        return Response.json({ error: 'invalid JSON body' }, { status: 400 })
      }
      if (!body.did || !body.privateKey) {
        return Response.json({ error: 'missing did or privateKey' }, { status: 400 })
      }
      await this.initIdentity(body.did, body.privateKey, body.handle || null)
      return Response.json({ ok: true, did: body.did, handle: body.handle || null })
    }

    if (url.pathname === '/status') {
      const did = await this.getDid()
      return Response.json({ initialized: !!did, did: did || null })
    }

    if (url.pathname === '/xrpc/com.atproto.repo.createRecord') {
      if (request.method !== 'POST') {
        return Response.json({ error: 'method not allowed' }, { status: 405 })
      }
      const body = await readJsonObject(request)
      if (!body) {
        return Response.json({ error: 'invalid JSON body' }, { status: 400 })
      }
      if (!body.collection || !body.record) {
        return Response.json({ error: 'missing collection or record' }, { status: 400 })
      }
      try {
        const result = await this.createRecord(body.collection, body.record, body.rkey)
        return Response.json(result)
      } catch (err) {
        return Response.json({ error: err.message }, { status: 500 })
      }
    }

    if (url.pathname === '/xrpc/com.atproto.repo.getRecord') {
      const collection = url.searchParams.get('collection')
      const rkey = url.searchParams.get('rkey')
      if (!collection || !rkey) {
        return Response.json({ error: 'missing collection or rkey' }, { status: 400 })
      }
      const did = await this.getDid()
      const uri = `at://${did}/${collection}/${rkey}`
      const rows = this.sql.exec(
        `SELECT cid, value FROM records WHERE uri = ?`,
        uri
      ).toArray()
      if (rows.length === 0) {
        return Response.json({ error: 'record not found' }, { status: 404 })
      }
      const row = rows[0]
      // Decode CBOR for response (convert ArrayBuffer to Uint8Array)
      const value = cborDecode(new Uint8Array(row.value))
      return Response.json({ uri, cid: row.cid, value })
    }

    if (url.pathname === '/xrpc/com.atproto.sync.getRepo') {
      const commits = this.sql.exec(
        `SELECT cid FROM commits ORDER BY seq DESC LIMIT 1`
      ).toArray()
      if (commits.length === 0) {
        return Response.json({ error: 'repo not found' }, { status: 404 })
      }
      const blocks = this.sql.exec(`SELECT cid, data FROM blocks`).toArray()
      // Convert ArrayBuffer data to Uint8Array
      const blocksForCar = blocks.map((b) => ({
        cid: b.cid,
        data: new Uint8Array(b.data)
      }))
      const car = buildCarFile(commits[0].cid, blocksForCar)
      return new Response(car, {
        headers: { 'content-type': 'application/vnd.ipld.car' }
      })
    }

    if (url.pathname === '/xrpc/com.atproto.sync.subscribeRepos') {
      const upgradeHeader = request.headers.get('Upgrade')
      if (upgradeHeader !== 'websocket') {
        return new Response('expected websocket', { status: 426 })
      }

      // Validate the cursor before accepting the socket
      const cursorParam = url.searchParams.get('cursor')
      let cursor = null
      if (cursorParam) {
        cursor = Number.parseInt(cursorParam, 10)
        if (Number.isNaN(cursor)) {
          return Response.json({ error: 'invalid cursor' }, { status: 400 })
        }
      }

      const { 0: client, 1: server } = new WebSocketPair()
      this.state.acceptWebSocket(server)

      // Send backlog if cursor provided
      if (cursor !== null) {
        const events = this.sql.exec(
          `SELECT * FROM seq_events WHERE seq > ? ORDER BY seq`,
          cursor
        ).toArray()
        for (const evt of events) {
          server.send(this.formatEvent(evt))
        }
      }

      return new Response(null, { status: 101, webSocket: client })
    }

    return Response.json({ error: 'not found' }, { status: 404 })
  }
}

export default {
  /**
   * Worker entry point: route the request to the Durable Object named after
   * the `did` query parameter.
   */
  async fetch(request, env) {
    const url = new URL(request.url)
    let did = url.searchParams.get('did')

    if (url.pathname === '/.well-known/atproto-did') {
      // Handle resolution works without ?did=. Subdomain-based lookup is not
      // implemented yet, so fall back to the single 'default' object.
      did = did || 'default'
    } else if (!did) {
      return new Response('missing did param', { status: 400 })
    }

    const pds = env.PDS.get(env.PDS.idFromName(did))
    return pds.fetch(request)
  }
}

// Export utilities for testing
export {
  cborEncode,
  cborDecode,
  createCid,
  cidToString,
  base32Encode,
  createTid,
  generateKeyPair,
  importPrivateKey,
  sign,
  bytesToHex,
  hexToBytes,
  getKeyDepth,
  varint,
  base32Decode,
  buildCarFile
}