// @pds/node - Node.js HTTP server adapter import { createServer as createHttpServer } from 'node:http'; import { createFsBlobs } from '@pds/blobs-fs'; import { PersonalDataServer } from '@pds/core'; import { LexiconResolver } from '@pds/lexicon-resolver'; import { createActorStorage, createSharedStorage } from '@pds/storage-sqlite'; import { WebSocketServer } from 'ws'; /** * @typedef {Object} PdsServer * @property {PersonalDataServer} pds - The PDS instance * @property {import('node:http').Server} server - HTTP server * @property {Object} db - SQLite database instance * @property {() => Promise} listen - Start server * @property {() => Promise} close - Close server */ /** * Create WebSocket port for Node.js * @param {WeakMap} upgradeMap * @param {import('ws').WebSocketServer} wss - WebSocket server for broadcast * @returns {import('@pds/core/ports').WebSocketPort} */ function createWebSocket(upgradeMap, wss) { return { isUpgrade(request) { return upgradeMap.has(request); }, upgrade(request, onConnect) { const ws = upgradeMap.get(request); if (!ws) { return new Response('WebSocket upgrade failed', { status: 500 }); } onConnect({ send(data) { if (ws.readyState === ws.OPEN) { ws.send(data); } }, close() { ws.close(); }, }); // Return 200 - actual upgrade handled by server's 'upgrade' event // Node.js Response doesn't support status 101 return new Response(null, { status: 200, headers: { 'X-WebSocket-Upgraded': 'true' }, }); }, broadcast(data) { // Send to all connected WebSocket clients for (const client of wss.clients) { if (client.readyState === client.OPEN) { try { client.send(data); } catch { // Client may have disconnected } } } }, }; } /** * Create Node.js PDS server * @param {Object} options * @param {string} options.dbPath - Path to SQLite database * @param {string} [options.blobsDir] - Directory for blob storage (used if blobs not provided) * @param {import('@pds/core/ports').BlobPort} [options.blobs] - Custom blob storage adapter * @param {string} options.jwtSecret - JWT signing secret * @param {number} [options.port=3000] - Server port * @param {string} [options.hostname] - PDS hostname * @param {string} [options.appviewUrl] - AppView URL for proxying * @param {string} [options.appviewDid] - AppView DID for service auth * @param {string} [options.relayUrl] - Relay URL for firehose notifications (e.g., http://localhost:2470) * @param {string} [options.password] - Password for createSession * @param {import('@pds/core/ports').LexiconResolverPort} [options.lexiconResolver] - Lexicon resolver for record validation * @returns {Promise} */ export async function createServer({ dbPath, blobsDir, blobs: blobsArg, jwtSecret, port = 3000, hostname, appviewUrl, appviewDid, relayUrl, password, lexiconResolver, }) { // Dynamic import for better-sqlite3 (optional peer dependency) const Database = (await import('better-sqlite3')).default; const db = new Database(dbPath); // Create actor storage (per-DID data) const actorStorage = createActorStorage(db); // Create shared storage (global data) - can use same db for single-tenant const sharedStorage = createSharedStorage(db); if (!blobsArg && !blobsDir) { throw new Error('Either blobs or blobsDir must be provided'); } const blobs = blobsArg ?? createFsBlobs(/** @type {string} */ (blobsDir)); // WebSocket infrastructure const upgradeMap = new WeakMap(); const wss = new WebSocketServer({ noServer: true }); const webSocket = createWebSocket(upgradeMap, wss); // Create default lexicon resolver if not provided const resolver = lexiconResolver ?? new LexiconResolver(); // Create PDS with both storages const pds = new PersonalDataServer({ actorStorage, sharedStorage, blobs, webSocket, jwtSecret, hostname, appviewUrl, appviewDid, relayUrl, password, lexiconResolver: resolver, }); const server = createHttpServer(async (req, res) => { try { const request = nodeToWebRequest(req); const response = await pds.fetch(request); // Skip normal response for WebSocket upgrades if ( response.headers.get('X-WebSocket-Upgraded') && upgradeMap.has(request) ) { return; // WebSocket handled by upgrade event } await webToNodeResponse(response, res); } catch (error) { console.error('Server error:', error); res.statusCode = 500; res.setHeader('Content-Type', 'application/json'); res.end( JSON.stringify({ error: 'InternalServerError', message: error.message, }), ); } }); // Handle WebSocket upgrades server.on('upgrade', (req, socket, head) => { if (req.url?.startsWith('/xrpc/com.atproto.sync.subscribeRepos')) { wss.handleUpgrade(req, socket, head, async (ws) => { const request = nodeToWebRequest(req); upgradeMap.set(request, ws); try { await pds.fetch(request); } finally { upgradeMap.delete(request); } }); } else { socket.destroy(); } }); // Start cleanup interval (every hour) const cleanupInterval = setInterval( async () => { await pds.runBlobCleanup(); await sharedStorage.cleanupExpiredDpopJtis(); }, 60 * 60 * 1000, ); return { pds, server, db, /** * Start listening on configured port * @returns {Promise} */ listen() { return new Promise((resolve) => { server.listen(port, () => { console.log(`PDS listening on http://localhost:${port}`); resolve(server); }); }); }, /** * Close server and cleanup * @returns {Promise} */ close() { clearInterval(cleanupInterval); // Close all WebSocket connections first wss.clients.forEach((client) => { client.terminate(); }); wss.close(); db.close(); return new Promise((resolve, reject) => { server.close((err) => { if (err) reject(err); else resolve(); }); }); }, }; } /** * Convert Node.js IncomingMessage to Web Request * @param {import('node:http').IncomingMessage} req * @returns {Request} */ function nodeToWebRequest(req) { // Check for encrypted socket (HTTPS) const socket = /** @type {import('node:tls').TLSSocket} */ (req.socket); const protocol = socket.encrypted ? 'https' : 'http'; const host = req.headers.host || 'localhost'; const url = new URL(req.url || '/', `${protocol}://${host}`); const headers = new Headers(); for (const [key, value] of Object.entries(req.headers)) { if (value) { headers.set(key, Array.isArray(value) ? value.join(', ') : value); } } const hasBody = req.method !== 'GET' && req.method !== 'HEAD'; return new Request( url.toString(), /** @type {RequestInit} */ ({ method: req.method, headers, body: hasBody ? req : null, duplex: hasBody ? 'half' : undefined, }), ); } /** * Write Web Response to Node.js ServerResponse * @param {Response} response * @param {import('node:http').ServerResponse} res */ async function webToNodeResponse(response, res) { res.statusCode = response.status; res.statusMessage = response.statusText || ''; for (const [key, value] of response.headers) { res.setHeader(key, value); } if (response.body) { const reader = response.body.getReader(); while (true) { const { done, value } = await reader.read(); if (done) break; res.write(value); } } res.end(); }