A minimal AT Protocol Personal Data Server written in JavaScript.
atproto
pds
1// @pds/node - Node.js HTTP server adapter
2
3import { createServer as createHttpServer } from 'node:http';
4import { createFsBlobs } from '@pds/blobs-fs';
5import { PersonalDataServer } from '@pds/core';
6import { LexiconResolver } from '@pds/lexicon-resolver';
7import { createActorStorage, createSharedStorage } from '@pds/storage-sqlite';
8import { WebSocketServer } from 'ws';
9
10/**
11 * @typedef {Object} PdsServer
12 * @property {PersonalDataServer} pds - The PDS instance
13 * @property {import('node:http').Server} server - HTTP server
14 * @property {Object} db - SQLite database instance
15 * @property {() => Promise<import('node:http').Server>} listen - Start server
16 * @property {() => Promise<void>} close - Close server
17 */
18
/**
 * Build the WebSocket port object handed to the PDS on Node.js.
 *
 * Sockets are looked up in `upgradeMap`, keyed by the Web `Request` they
 * belong to; the map itself is populated by the caller.
 *
 * @param {WeakMap<Request, import('ws').WebSocket>} upgradeMap - Request → socket lookup
 * @param {import('ws').WebSocketServer} wss - WebSocket server whose `clients` receive broadcasts
 * @returns {import('@pds/core/ports').WebSocketPort}
 */
function createWebSocket(upgradeMap, wss) {
  // A request counts as an upgrade only when a socket is registered for it.
  const isUpgrade = (request) => upgradeMap.has(request);

  const upgrade = (request, onConnect) => {
    const socket = upgradeMap.get(request);

    if (socket) {
      const connection = {
        // Drop the message instead of sending on a socket that is not open.
        send: (data) => {
          if (socket.readyState === socket.OPEN) {
            socket.send(data);
          }
        },
        close: () => {
          socket.close();
        },
      };
      onConnect(connection);

      // The real protocol switch is done by the server's 'upgrade' event.
      // A Web Response cannot carry status 101, so a marker header on a
      // 200 response signals the upgrade instead.
      return new Response(null, {
        status: 200,
        headers: { 'X-WebSocket-Upgraded': 'true' },
      });
    }

    return new Response('WebSocket upgrade failed', { status: 500 });
  };

  // Fan the payload out to every client that is currently open.
  const broadcast = (data) => {
    wss.clients.forEach((peer) => {
      if (peer.readyState !== peer.OPEN) {
        return;
      }
      try {
        peer.send(data);
      } catch {
        // Best effort: the peer may have gone away mid-send.
      }
    });
  };

  return { isUpgrade, upgrade, broadcast };
}
70
/**
 * Create Node.js PDS server
 *
 * Wires SQLite-backed storage, blob storage, a `ws` WebSocket server and a
 * `PersonalDataServer` behind a Node.js HTTP server. The server is created
 * but not started; call `listen()` on the returned object.
 *
 * @param {Object} options
 * @param {string} options.dbPath - Path to SQLite database
 * @param {string} [options.blobsDir] - Directory for blob storage (used if blobs not provided)
 * @param {import('@pds/core/ports').BlobPort} [options.blobs] - Custom blob storage adapter
 * @param {string} options.jwtSecret - JWT signing secret
 * @param {number} [options.port=3000] - Server port
 * @param {string} [options.hostname] - PDS hostname
 * @param {string} [options.appviewUrl] - AppView URL for proxying
 * @param {string} [options.appviewDid] - AppView DID for service auth
 * @param {string} [options.relayUrl] - Relay URL for firehose notifications (e.g., http://localhost:2470)
 * @param {string} [options.password] - Password for createSession
 * @param {import('@pds/core/ports').LexiconResolverPort} [options.lexiconResolver] - Lexicon resolver for record validation
 * @returns {Promise<PdsServer>}
 * @throws {Error} If neither `blobs` nor `blobsDir` is provided
 */
export async function createServer({
  dbPath,
  blobsDir,
  blobs: blobsArg,
  jwtSecret,
  port = 3000,
  hostname,
  appviewUrl,
  appviewDid,
  relayUrl,
  password,
  lexiconResolver,
}) {
  // Validate options before acquiring any resource, so an invalid call
  // cannot leave an open database handle behind.
  if (!blobsArg && !blobsDir) {
    throw new Error('Either blobs or blobsDir must be provided');
  }
  const blobs = blobsArg ?? createFsBlobs(/** @type {string} */ (blobsDir));

  // Dynamic import for better-sqlite3 (optional peer dependency)
  const Database = (await import('better-sqlite3')).default;

  const db = new Database(dbPath);

  // Create actor storage (per-DID data)
  const actorStorage = createActorStorage(db);

  // Create shared storage (global data) - can use same db for single-tenant
  const sharedStorage = createSharedStorage(db);

  // WebSocket infrastructure
  const upgradeMap = new WeakMap();
  const wss = new WebSocketServer({ noServer: true });
  const webSocket = createWebSocket(upgradeMap, wss);

  // Create default lexicon resolver if not provided
  const resolver = lexiconResolver ?? new LexiconResolver();

  // Create PDS with both storages
  const pds = new PersonalDataServer({
    actorStorage,
    sharedStorage,
    blobs,
    webSocket,
    jwtSecret,
    hostname,
    appviewUrl,
    appviewDid,
    relayUrl,
    password,
    lexiconResolver: resolver,
  });

  const server = createHttpServer(async (req, res) => {
    try {
      const request = nodeToWebRequest(req);
      const response = await pds.fetch(request);

      // Skip normal response for WebSocket upgrades
      if (
        response.headers.get('X-WebSocket-Upgraded') &&
        upgradeMap.has(request)
      ) {
        return; // WebSocket handled by upgrade event
      }

      await webToNodeResponse(response, res);
    } catch (error) {
      console.error('Server error:', error);

      if (res.headersSent) {
        // Part of the response is already on the wire, so a 500 can no
        // longer be sent; abort the connection instead of throwing again.
        res.destroy();
        return;
      }

      // Drop headers left over from the failed response (e.g. a stale
      // Content-Length) so they cannot corrupt the error body.
      for (const name of res.getHeaderNames()) {
        res.removeHeader(name);
      }
      res.statusCode = 500;
      res.setHeader('Content-Type', 'application/json');
      res.end(
        JSON.stringify({
          error: 'InternalServerError',
          message: error instanceof Error ? error.message : String(error),
        }),
      );
    }
  });

  // Handle WebSocket upgrades
  server.on('upgrade', (req, socket, head) => {
    if (!req.url?.startsWith('/xrpc/com.atproto.sync.subscribeRepos')) {
      socket.destroy();
      return;
    }

    wss.handleUpgrade(req, socket, head, async (ws) => {
      let request;
      try {
        request = nodeToWebRequest(req);
        upgradeMap.set(request, ws);
        await pds.fetch(request);
      } catch (error) {
        // This callback's promise is not awaited by anyone, so an uncaught
        // error here would surface as an unhandled rejection.
        console.error('WebSocket upgrade error:', error);
        ws.close(1011);
      } finally {
        if (request) {
          upgradeMap.delete(request);
        }
      }
    });
  });

  // Start cleanup interval (every hour). Each task is isolated so a failure
  // in one neither skips the other nor escapes as an unhandled rejection.
  const cleanupTasks = [
    () => pds.runBlobCleanup(),
    () => sharedStorage.cleanupExpiredDpopJtis(),
  ];
  const cleanupInterval = setInterval(
    async () => {
      for (const task of cleanupTasks) {
        try {
          await task();
        } catch (error) {
          console.error('Cleanup error:', error);
        }
      }
    },
    60 * 60 * 1000,
  );

  return {
    pds,
    server,
    db,

    /**
     * Start listening on configured port
     * @returns {Promise<import('node:http').Server>} Resolves once listening;
     *   rejects if the server emits 'error' first (e.g. port already in use)
     */
    listen() {
      return new Promise((resolve, reject) => {
        const onError = (err) => reject(err);
        server.once('error', onError);
        server.listen(port, () => {
          server.off('error', onError);
          console.log(`PDS listening on http://localhost:${port}`);
          resolve(server);
        });
      });
    },

    /**
     * Close server and cleanup
     * @returns {Promise<void>} Rejects with the first error reported while
     *   closing the HTTP server or the database
     */
    close() {
      clearInterval(cleanupInterval);
      // Close all WebSocket connections first
      for (const client of wss.clients) {
        client.terminate();
      }
      wss.close();
      return new Promise((resolve, reject) => {
        server.close((serverErr) => {
          // Close the database only after the HTTP server has stopped, so
          // requests still in flight do not run against a closed handle.
          let failure = serverErr;
          try {
            db.close();
          } catch (dbErr) {
            failure = failure ?? dbErr;
          }
          if (failure) reject(failure);
          else resolve();
        });
      });
    },
  };
}
231
/**
 * Convert Node.js IncomingMessage to Web Request
 *
 * The URL is built from the socket's TLS state, the `Host` header (falling
 * back to `localhost`) and the request target. Header values that Node
 * delivers as arrays are joined with `, `. For methods other than GET and
 * HEAD the incoming message itself is passed as the streaming body.
 *
 * @param {import('node:http').IncomingMessage} req
 * @returns {Request}
 */
function nodeToWebRequest(req) {
  // Check for encrypted socket (HTTPS)
  const socket = /** @type {import('node:tls').TLSSocket} */ (req.socket);
  const protocol = socket.encrypted ? 'https' : 'http';
  const host = req.headers.host || 'localhost';
  const base = new URL(`${protocol}://${host}`);

  // An origin-form target ("/path") must stay a path on this host. Resolving
  // it as a relative reference would treat a leading "//" or "/\" as
  // scheme-relative and let the request target replace the host, so it is
  // appended to the origin instead. Any other form keeps normal resolution.
  const target = req.url || '/';
  const url = target.startsWith('/')
    ? new URL(`${base.origin}${target}`)
    : new URL(target, base);

  const headers = new Headers();
  for (const [key, value] of Object.entries(req.headers)) {
    if (value) {
      headers.set(key, Array.isArray(value) ? value.join(', ') : value);
    }
  }

  /** @type {RequestInit & { duplex?: 'half' }} */
  const init = { method: req.method, headers };
  if (req.method !== 'GET' && req.method !== 'HEAD') {
    init.body = /** @type {any} */ (req);
    // A streaming request body requires half-duplex mode.
    init.duplex = 'half';
  }

  return new Request(url.toString(), init);
}
263
/**
 * Write Web Response to Node.js ServerResponse
 *
 * Copies status, status text and headers, then streams the body chunk by
 * chunk and ends the response. Resolves early, without calling `end()`, if
 * `res` is found destroyed while the body is still being read.
 *
 * @param {Response} response
 * @param {import('node:http').ServerResponse} res
 * @returns {Promise<void>}
 */
async function webToNodeResponse(response, res) {
  res.statusCode = response.status;
  res.statusMessage = response.statusText || '';

  // Headers iteration can yield one entry per Set-Cookie value. Calling
  // setHeader for each would keep only the last cookie, so they are
  // collected and written once as an array.
  const setCookies = [];
  for (const [key, value] of response.headers) {
    if (key.toLowerCase() === 'set-cookie') {
      setCookies.push(value);
    } else {
      res.setHeader(key, value);
    }
  }
  if (setCookies.length > 0) {
    res.setHeader('set-cookie', setCookies);
  }

  if (response.body) {
    const reader = response.body.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      if (res.destroyed) {
        // The client is gone: stop pulling from the source. A failure while
        // cancelling is irrelevant because nothing can be delivered anymore.
        await reader.cancel().catch(() => {});
        return;
      }

      if (!res.write(value)) {
        // Respect backpressure: wait until the buffer drains, or until the
        // connection closes so a disconnect cannot leave this pending.
        await new Promise((resolve) => {
          const settle = () => {
            res.off('drain', settle);
            res.off('close', settle);
            resolve();
          };
          res.once('drain', settle);
          res.once('close', settle);
        });
      }
    }
  }

  res.end();
}