A minimal AT Protocol Personal Data Server written in JavaScript.
atproto pds
at main 287 lines 8.0 kB view raw
// @pds/node - Node.js HTTP server adapter

import { createServer as createHttpServer } from 'node:http';
import { createFsBlobs } from '@pds/blobs-fs';
import { PersonalDataServer } from '@pds/core';
import { LexiconResolver } from '@pds/lexicon-resolver';
import { createActorStorage, createSharedStorage } from '@pds/storage-sqlite';
import { WebSocketServer } from 'ws';

/**
 * @typedef {Object} PdsServer
 * @property {PersonalDataServer} pds - The PDS instance
 * @property {import('node:http').Server} server - HTTP server
 * @property {Object} db - SQLite database instance
 * @property {() => Promise<import('node:http').Server>} listen - Start server
 * @property {() => Promise<void>} close - Close server
 */

/** Only upgrade requests whose URL starts with this path are accepted. */
const SUBSCRIBE_REPOS_PATH = '/xrpc/com.atproto.sync.subscribeRepos';

/** Period of the background cleanup task (one hour). */
const CLEANUP_INTERVAL_MS = 60 * 60 * 1000;

/**
 * Create WebSocket port for Node.js
 *
 * The returned port looks up the already-upgraded `ws` socket for a request in
 * `upgradeMap` (populated by the HTTP server's 'upgrade' handler) and hands a
 * small send/close facade to the caller.
 *
 * @param {WeakMap<Request, import('ws').WebSocket>} upgradeMap
 * @param {import('ws').WebSocketServer} wss - WebSocket server for broadcast
 * @returns {import('@pds/core/ports').WebSocketPort}
 */
function createWebSocket(upgradeMap, wss) {
  return {
    /** True when `request` has an upgraded socket registered in `upgradeMap`. */
    isUpgrade(request) {
      return upgradeMap.has(request);
    },

    /**
     * Bind the socket registered for `request` to `onConnect`.
     * Returns a 500 response when no socket is registered for the request.
     */
    upgrade(request, onConnect) {
      const ws = upgradeMap.get(request);
      if (!ws) {
        return new Response('WebSocket upgrade failed', { status: 500 });
      }

      onConnect({
        send(data) {
          // Drop the message rather than throw when the socket is not open.
          if (ws.readyState === ws.OPEN) {
            ws.send(data);
          }
        },
        close() {
          ws.close();
        },
      });

      // Return 200 - actual upgrade handled by server's 'upgrade' event
      // Node.js Response doesn't support status 101
      return new Response(null, {
        status: 200,
        headers: { 'X-WebSocket-Upgraded': 'true' },
      });
    },

    /** Send `data` to every client of `wss` that is currently open. */
    broadcast(data) {
      for (const client of wss.clients) {
        if (client.readyState === client.OPEN) {
          try {
            client.send(data);
          } catch {
            // Deliberate best-effort: the client may have disconnected between
            // the readyState check and the send; other clients must still be served.
          }
        }
      }
    },
  };
}

/**
 * Create Node.js PDS server
 *
 * Wires a `PersonalDataServer` to a Node HTTP server, a `ws` WebSocket server
 * (for `subscribeRepos` upgrades) and an hourly cleanup timer. The server is
 * not listening until `listen()` is called on the returned object.
 *
 * @param {Object} options
 * @param {string} options.dbPath - Path to SQLite database
 * @param {string} [options.blobsDir] - Directory for blob storage (used if blobs not provided)
 * @param {import('@pds/core/ports').BlobPort} [options.blobs] - Custom blob storage adapter
 * @param {string} options.jwtSecret - JWT signing secret
 * @param {number} [options.port=3000] - Server port
 * @param {string} [options.hostname] - PDS hostname
 * @param {string} [options.appviewUrl] - AppView URL for proxying
 * @param {string} [options.appviewDid] - AppView DID for service auth
 * @param {string} [options.relayUrl] - Relay URL for firehose notifications (e.g., http://localhost:2470)
 * @param {string} [options.password] - Password for createSession
 * @param {import('@pds/core/ports').LexiconResolverPort} [options.lexiconResolver] - Lexicon resolver for record validation
 * @returns {Promise<PdsServer>}
 * @throws {Error} When neither `blobs` nor `blobsDir` is provided
 */
export async function createServer({
  dbPath,
  blobsDir,
  blobs: blobsArg,
  jwtSecret,
  port = 3000,
  hostname,
  appviewUrl,
  appviewDid,
  relayUrl,
  password,
  lexiconResolver,
}) {
  // Validate configuration before acquiring any resource, so a bad call does
  // not leave an open database handle behind.
  if (!blobsArg && !blobsDir) {
    throw new Error('Either blobs or blobsDir must be provided');
  }

  // Dynamic import for better-sqlite3 (optional peer dependency)
  const Database = (await import('better-sqlite3')).default;

  const db = new Database(dbPath);

  // Create actor storage (per-DID data)
  const actorStorage = createActorStorage(db);

  // Create shared storage (global data) - can use same db for single-tenant
  const sharedStorage = createSharedStorage(db);

  const blobs = blobsArg ?? createFsBlobs(/** @type {string} */ (blobsDir));

  // WebSocket infrastructure
  const upgradeMap = new WeakMap();
  const wss = new WebSocketServer({ noServer: true });
  const webSocket = createWebSocket(upgradeMap, wss);

  // Create default lexicon resolver if not provided
  const resolver = lexiconResolver ?? new LexiconResolver();

  // Create PDS with both storages
  const pds = new PersonalDataServer({
    actorStorage,
    sharedStorage,
    blobs,
    webSocket,
    jwtSecret,
    hostname,
    appviewUrl,
    appviewDid,
    relayUrl,
    password,
    lexiconResolver: resolver,
  });

  const server = createHttpServer(async (req, res) => {
    try {
      const request = nodeToWebRequest(req);
      const response = await pds.fetch(request);

      // Skip normal response for WebSocket upgrades
      if (
        response.headers.get('X-WebSocket-Upgraded') &&
        upgradeMap.has(request)
      ) {
        return; // WebSocket handled by upgrade event
      }

      await webToNodeResponse(response, res);
    } catch (error) {
      console.error('Server error:', error);

      if (res.headersSent) {
        // The failure happened after the status line went out (e.g. while
        // streaming the body). setHeader() would throw here, so the only
        // honest signal left is to abort the connection.
        res.destroy();
        return;
      }

      res.statusCode = 500;
      res.setHeader('Content-Type', 'application/json');
      res.end(
        JSON.stringify({
          error: 'InternalServerError',
          // `error` may be any thrown value, not necessarily an Error.
          message: error?.message ?? String(error),
        }),
      );
    }
  });

  // Handle WebSocket upgrades
  server.on('upgrade', (req, socket, head) => {
    if (!req.url?.startsWith(SUBSCRIBE_REPOS_PATH)) {
      socket.destroy();
      return;
    }

    wss.handleUpgrade(req, socket, head, async (ws) => {
      let request;
      try {
        request = nodeToWebRequest(req);
        // Register the socket so the WebSocket port can find it during fetch.
        upgradeMap.set(request, ws);
        await pds.fetch(request);
      } catch (error) {
        // Nothing awaits this callback, so an escaping rejection would be
        // unhandled. Log it and close the socket instead of leaving it dangling.
        console.error('WebSocket upgrade error:', error);
        ws.close(1011);
      } finally {
        if (request) {
          upgradeMap.delete(request);
        }
      }
    });
  });

  // Start cleanup interval (every hour)
  const cleanupInterval = setInterval(async () => {
    try {
      await pds.runBlobCleanup();
      await sharedStorage.cleanupExpiredDpopJtis();
    } catch (error) {
      // A failed cleanup pass must not surface as an unhandled rejection;
      // the next tick of the interval will retry.
      console.error('Cleanup error:', error);
    }
  }, CLEANUP_INTERVAL_MS);

  return {
    pds,
    server,
    db,

    /**
     * Start listening on configured port
     * @returns {Promise<import('node:http').Server>} Resolves once the server
     *   is listening; rejects if binding fails (e.g. port already in use).
     */
    listen() {
      return new Promise((resolve, reject) => {
        const onError = (err) => {
          reject(err);
        };
        server.once('error', onError);
        server.listen(port, () => {
          server.off('error', onError);
          console.log(`PDS listening on http://localhost:${port}`);
          resolve(server);
        });
      });
    },

    /**
     * Close server and cleanup
     *
     * Stops the cleanup timer, terminates WebSocket clients, waits for the
     * HTTP server to close and only then closes the database, so requests
     * still in flight are not run against a closed handle.
     *
     * @returns {Promise<void>} Rejects with the error reported by
     *   `server.close` (or, failing that, by `db.close`).
     */
    close() {
      clearInterval(cleanupInterval);

      // Close all WebSocket connections first
      for (const client of wss.clients) {
        client.terminate();
      }
      wss.close();

      return new Promise((resolve, reject) => {
        server.close((serverErr) => {
          let failure = serverErr;
          try {
            // Always release the database, even when server.close reported an error.
            db.close();
          } catch (dbErr) {
            failure ??= dbErr;
          }

          if (failure) {
            reject(failure);
          } else {
            resolve();
          }
        });
      });
    },
  };
}

/**
 * Convert Node.js IncomingMessage to Web Request
 *
 * The URL is built from the request path, the Host header (falling back to
 * 'localhost') and the scheme implied by whether the socket is encrypted.
 *
 * @param {import('node:http').IncomingMessage} req
 * @returns {Request}
 */
function nodeToWebRequest(req) {
  // Check for encrypted socket (HTTPS)
  const socket = /** @type {import('node:tls').TLSSocket} */ (req.socket);
  const protocol = socket.encrypted ? 'https' : 'http';
  const host = req.headers.host || 'localhost';
  const url = new URL(req.url || '/', `${protocol}://${host}`);

  const headers = new Headers();
  for (const [key, value] of Object.entries(req.headers)) {
    // Skip only absent headers; a header sent with an empty value is kept.
    if (value === undefined) {
      continue;
    }
    headers.set(key, Array.isArray(value) ? value.join(', ') : value);
  }

  const hasBody = req.method !== 'GET' && req.method !== 'HEAD';

  return new Request(
    url.toString(),
    /** @type {RequestInit} */ ({
      method: req.method,
      headers,
      // The incoming message itself is passed as a streaming body;
      // a streaming body requires `duplex: 'half'`.
      body: hasBody ? req : null,
      duplex: hasBody ? 'half' : undefined,
    }),
  );
}

/**
 * Resolve once `res` can accept more data or has been closed.
 * Waiting on 'close' as well prevents hanging forever when the client
 * disconnects while the write buffer is full.
 * @param {import('node:http').ServerResponse} res
 * @returns {Promise<void>}
 */
function waitForDrain(res) {
  return new Promise((resolve) => {
    const settle = () => {
      res.off('drain', settle);
      res.off('close', settle);
      resolve();
    };
    res.on('drain', settle);
    res.on('close', settle);
  });
}

/**
 * Write Web Response to Node.js ServerResponse
 *
 * Copies status and headers, then streams the body chunk by chunk, pausing
 * when the response's write buffer is full and stopping early if the
 * response has been destroyed.
 *
 * @param {Response} response
 * @param {import('node:http').ServerResponse} res
 * @returns {Promise<void>}
 */
async function webToNodeResponse(response, res) {
  res.statusCode = response.status;
  res.statusMessage = response.statusText || '';

  // Repeated Set-Cookie entries must be sent as separate header lines;
  // calling setHeader once per entry would keep only the last one.
  const setCookies = [];
  for (const [key, value] of response.headers) {
    if (key === 'set-cookie') {
      setCookies.push(value);
    } else {
      res.setHeader(key, value);
    }
  }
  if (setCookies.length > 0) {
    res.setHeader('set-cookie', setCookies);
  }

  if (response.body) {
    const reader = response.body.getReader();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        if (res.destroyed) {
          // Client went away: stop pulling from the source.
          await reader.cancel();
          return;
        }

        // write() returning false means the buffer is full; wait before
        // reading more so a slow client cannot make us buffer the whole body.
        if (!res.write(value)) {
          await waitForDrain(res);
        }
      }
    } finally {
      reader.releaseLock();
    }
  }

  res.end();
}