// Serverless #atproto Jetstream-to-webhook connector, powered by Cloudflare Durable Objects.
1import { DurableObject } from "cloudflare:workers";
2import type { JetstreamEvent, StoredStats, QueueMessage } from "./types";
3
4/**
5 * Welcome to Cloudflare Workers! This is your first Durable Objects application.
6 *
7 * - Run `npm run dev` in your terminal to start a development server
8 * - Open a browser tab at http://localhost:8787/ to see your Durable Object in action
9 * - Run `npm run deploy` to publish your application
10 *
11 * Bind resources to your worker in `wrangler.jsonc`. After adding bindings, a type definition for the
12 * `Env` object can be regenerated with `npm run cf-typegen`.
13 *
14 * Learn more at https://developers.cloudflare.com/durable-objects
15 */
16
/**
 * Durable Object that owns the outbound WebSocket subscription to Jetstream,
 * forwards every commit event to the `JETSTREAM_QUEUE` queue, and keeps running
 * statistics (including the replay cursor) in durable storage under the key
 * `"stats"`.
 */
export class JetstreamProcessor extends DurableObject<Env> {
  /** Lower bound of the first reconnect delay, in milliseconds. */
  private static readonly RECONNECT_BASE_DELAY_MS = 1000;
  /** Upper bound for any reconnect delay, in milliseconds. */
  private static readonly RECONNECT_MAX_DELAY_MS = 30000;
  /** How far the cursor is rewound on reconnect (5 seconds, in microseconds). */
  private static readonly CURSOR_REWIND_US = 5 * 1000 * 1000;
  /** Stats are written to storage once per this many commit events. */
  private static readonly PERSIST_EVERY_N_EVENTS = 100;

  /** The currently active Jetstream socket, or null while disconnected. */
  private websocket: WebSocket | null = null;
  /** Handle of the pending reconnect timer, or null when none is scheduled. */
  private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
  /** Consecutive failed connection attempts; drives the exponential backoff. */
  private reconnectAttempts = 0;
  private stats: StoredStats = JetstreamProcessor.createEmptyStats();

  /**
   * Validates configuration, then loads persisted stats and opens the
   * Jetstream connection.
   *
   * @param ctx - The interface for interacting with Durable Object state
   * @param env - The interface to reference bindings declared in wrangler.jsonc
   * @throws Error when `WEBHOOK_URL` or `JETSTREAM_COLLECTIONS` is not set
   */
  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);

    // Validate required environment variables
    if (!env.WEBHOOK_URL) {
      throw new Error("WEBHOOK_URL environment variable is required");
    }

    if (!env.JETSTREAM_COLLECTIONS) {
      throw new Error("JETSTREAM_COLLECTIONS environment variable is required");
    }

    // Run async initialization under blockConcurrencyWhile (the documented
    // pattern for constructor-time setup) instead of leaving a floating
    // promise whose rejection nobody observes.
    void this.ctx.blockConcurrencyWhile(() => this.initializeProcessor());
  }

  /** Builds a zeroed statistics record stamped with the current time. */
  private static createEmptyStats(): StoredStats {
    return {
      cursor: 0,
      eventCounts: {},
      totalEvents: 0,
      totalReceived: 0,
      lastEventTime: new Date().toISOString()
    };
  }

  /** Minimal shape check for parsed messages: an object with a numeric `time_us`. */
  private static isJetstreamEvent(value: unknown): value is JetstreamEvent {
    return (
      typeof value === "object" &&
      value !== null &&
      typeof (value as { time_us?: unknown }).time_us === "number"
    );
  }

  /** Restores persisted stats (if any) and starts the Jetstream connection. */
  private async initializeProcessor(): Promise<void> {
    const storedStats = await this.ctx.storage.get<StoredStats>("stats");
    if (storedStats) {
      // Merge over defaults so a record missing a field still yields complete stats.
      this.stats = { ...JetstreamProcessor.createEmptyStats(), ...storedStats };
    }

    await this.connectToJetstream();
  }

  /**
   * Opens a new WebSocket to Jetstream for the configured collections and wires
   * up its event handlers. Never rejects: any failure is logged and a reconnect
   * is scheduled instead.
   */
  private async connectToJetstream(): Promise<void> {
    try {
      // Parse collections from environment variable (comma-separated)
      const collections = this.env.JETSTREAM_COLLECTIONS
        .split(",")
        .map(c => c.trim())
        .filter(c => c.length > 0);

      const url = new URL("wss://jetstream1.us-west.bsky.network/subscribe");
      for (const collection of collections) {
        url.searchParams.append("wantedCollections", collection);
      }

      // Resume from the stored cursor (reconnection scenario), rewound by a
      // small buffer so no events are skipped across the gap.
      if (this.stats.cursor > 0) {
        const cursorWithBuffer = this.stats.cursor - JetstreamProcessor.CURSOR_REWIND_US;
        url.searchParams.set("cursor", cursorWithBuffer.toString());
      }

      console.log("JETSTREAM_COLLECTIONS", this.env.JETSTREAM_COLLECTIONS);
      console.log(`Connecting to Jetstream: ${url.toString()}`);
      console.log(`Watching collections: ${collections.join(", ")}`);

      // Handlers close over `socket` rather than reading `this.websocket`, so
      // events from a socket that has since been replaced can be recognised
      // and ignored.
      const socket = new WebSocket(url.toString());
      this.websocket = socket;

      socket.addEventListener("open", () => {
        if (this.websocket !== socket) return;
        console.log("Jetstream WebSocket connected");
        this.reconnectAttempts = 0;
        // Clear any existing reconnect timeout
        if (this.reconnectTimeout) {
          clearTimeout(this.reconnectTimeout);
          this.reconnectTimeout = null;
        }
      });

      socket.addEventListener("message", async (event) => {
        if (this.websocket !== socket) return;
        try {
          // WebSocket message data can be string or ArrayBuffer, we expect JSON string
          const data = typeof event.data === "string" ? event.data : new TextDecoder().decode(event.data);
          const parsed: unknown = JSON.parse(data);
          if (!JetstreamProcessor.isJetstreamEvent(parsed)) {
            console.error("Ignoring malformed Jetstream event (missing numeric time_us)");
            return;
          }
          await this.processEvent(parsed);
        } catch (error) {
          console.error("Error processing Jetstream event:", error);
        }
      });

      socket.addEventListener("close", (event) => {
        console.log(`Jetstream WebSocket closed: ${event.code} ${event.reason}`);
        this.handleSocketGone(socket);
      });

      socket.addEventListener("error", (event) => {
        console.error("Jetstream WebSocket error:", event);
        this.handleSocketGone(socket);
      });
    } catch (error) {
      console.error("Error connecting to Jetstream:", error);
      this.scheduleReconnect();
    }
  }

  /**
   * Reacts to a socket closing or erroring. Only the currently active socket
   * triggers a reconnect; events from a socket that was already replaced (for
   * example by `forceReconnect`) or already handled are ignored, so they can
   * neither clobber the new connection nor start a duplicate one.
   */
  private handleSocketGone(socket: WebSocket): void {
    if (this.websocket !== socket) return;
    this.websocket = null;
    this.scheduleReconnect();
  }

  /**
   * Schedules a reconnect using exponential backoff with jitter: the delay is
   * drawn from [1s, 2s) on the first attempt and the window doubles on every
   * consecutive failure, capped at 30 seconds. No-op if one is already pending.
   */
  private scheduleReconnect(): void {
    if (this.reconnectTimeout) return;

    const ceiling = Math.min(
      JetstreamProcessor.RECONNECT_BASE_DELAY_MS * 2 ** (this.reconnectAttempts + 1),
      JetstreamProcessor.RECONNECT_MAX_DELAY_MS
    );
    const floor = ceiling / 2;
    const delay = Math.round(floor + Math.random() * (ceiling - floor));
    this.reconnectAttempts++;

    console.log(`Scheduling Jetstream reconnect in ${delay}ms`);
    this.reconnectTimeout = setTimeout(() => {
      this.reconnectTimeout = null;
      void this.connectToJetstream();
    }, delay);
  }

  /**
   * Updates statistics for one Jetstream event and, for commit events, enqueues
   * it on `JETSTREAM_QUEUE`.
   *
   * NOTE(review): the cursor advances even when the queue send fails, so a
   * failed event is only logged, not retried — confirm this is acceptable.
   */
  private async processEvent(event: JetstreamEvent): Promise<void> {
    // Always update cursor and received count for all events
    this.stats.cursor = event.time_us;
    this.stats.totalReceived++;

    // Skip identity and account events - only process commits
    if (event.kind !== "commit") {
      return;
    }

    // Update stats for commit events only
    this.stats.totalEvents++;
    this.stats.lastEventTime = new Date().toISOString();

    // Track collection-specific stats for commits only
    if (event.commit?.collection) {
      const collection = event.commit.collection;
      this.stats.eventCounts[collection] = (this.stats.eventCounts[collection] ?? 0) + 1;
      console.log(`Processing ${event.commit.operation} event for collection: ${collection}`);
    }

    // Hand the event to the queue; the queue consumer delivers it to the webhook.
    try {
      const queueMessage: QueueMessage = {
        event: event,
        queuedAt: new Date().toISOString(),
        retryCount: 0
      };

      await this.env.JETSTREAM_QUEUE.send(queueMessage);

      console.log(`Event queued successfully: ${event.time_us}`);
    } catch (error) {
      console.error("Error sending to queue:", error);
    }

    // Persist stats periodically to avoid too frequent writes
    if (this.stats.totalEvents % JetstreamProcessor.PERSIST_EVERY_N_EVENTS === 0) {
      await this.ctx.storage.put("stats", this.stats);
    }
  }

  /**
   * Get current processing statistics. Also flushes the in-memory stats to
   * storage so the persisted copy matches what is returned.
   */
  async getStats(): Promise<StoredStats> {
    await this.ctx.storage.put("stats", this.stats);
    return this.stats;
  }

  /**
   * Reset statistics (useful for testing). This also resets the cursor to 0,
   * so the next connection starts without a cursor.
   */
  async resetStats(): Promise<void> {
    this.stats = JetstreamProcessor.createEmptyStats();
    await this.ctx.storage.put("stats", this.stats);
  }

  /**
   * Get connection status. `connected` is true only when the socket is OPEN;
   * `readyState` is undefined while there is no socket.
   */
  getConnectionStatus(): { connected: boolean; readyState?: number } {
    return {
      connected: this.websocket?.readyState === WebSocket.OPEN,
      readyState: this.websocket?.readyState
    };
  }

  /**
   * Force reconnection (useful for debugging): drops the current socket and any
   * pending reconnect timer, resets the backoff, and connects immediately.
   */
  async forceReconnect(): Promise<void> {
    // Detach the old socket first so its upcoming close/error events are
    // treated as stale and cannot tear down the connection opened below.
    const previous = this.websocket;
    this.websocket = null;
    if (previous) {
      try {
        previous.close();
      } catch (error) {
        console.error("Error closing previous Jetstream WebSocket:", error);
      }
    }
    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
      this.reconnectTimeout = null;
    }
    this.reconnectAttempts = 0;
    await this.connectToJetstream();
  }
}
232
/** Escapes HTML-significant characters so untrusted text can be embedded in markup. */
function escapeHtml(value: string): string {
  return value
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&#39;");
}

/** Builds a JSON response; `pretty` selects 2-space indentation. */
function jsonResponse(payload: unknown, pretty = false): Response {
  const body = pretty ? JSON.stringify(payload, null, 2) : JSON.stringify(payload);
  return new Response(body, {
    headers: { "Content-Type": "application/json" }
  });
}

/**
 * Worker entry points: HTTP routes for inspecting/controlling the Jetstream
 * processor, a scheduled keepalive, and the queue consumer that delivers
 * queued events to the webhook.
 */
const worker = {
  /**
   * HTTP handler. Routes:
   * - `/stats` (JSON) and `/stats/html` (dashboard): processing statistics
   * - `/status`: WebSocket connection status
   * - `POST /reset`, `POST /reconnect`: control operations
   * - `/health`: static health payload
   * - anything else: a JSON index of the endpoints
   *
   * @param request - The request submitted to the Worker from the client
   * @param env - The interface to reference bindings declared in wrangler.jsonc
   * @param ctx - The execution context of the Worker
   * @returns The response to be sent back to the client
   */
  async fetch(request, env, ctx): Promise<Response> {
    const url = new URL(request.url);

    // All requests address the same named Durable Object instance.
    const id: DurableObjectId = env.JETSTREAM_PROCESSOR.idFromName("main");
    const stub = env.JETSTREAM_PROCESSOR.get(id);

    if (url.pathname === "/stats") {
      const stats = await stub.getStats();
      return jsonResponse(stats, true);
    }

    if (url.pathname === "/stats/html") {
      const stats = await stub.getStats();

      // Collection names originate from upstream events, so they are escaped
      // before being placed into the page.
      const collectionRows = Object.entries(stats.eventCounts)
        .sort(([, a], [, b]) => (b as number) - (a as number))
        .map(([collection, count]) => `
      <div class="collection-item">
        <span>${escapeHtml(String(collection))}</span>
        <span><strong>${(count as number).toLocaleString()}</strong></span>
      </div>
      `)
        .join('');

      const html = `
<!DOCTYPE html>
<html>
<head>
  <title>Jetstream Statistics</title>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <style>
    body {
      font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
      max-width: 800px;
      margin: 2rem auto;
      padding: 0 1rem;
      background: #f5f5f5;
    }
    .container {
      background: white;
      border-radius: 8px;
      padding: 2rem;
      box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    h1 { color: #333; margin-bottom: 2rem; }
    .stat-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 1rem; margin-bottom: 2rem; }
    .stat-card { background: #f8f9fa; padding: 1rem; border-radius: 6px; border-left: 4px solid #007bff; }
    .stat-value { font-size: 1.5rem; font-weight: bold; color: #007bff; }
    .stat-label { color: #666; font-size: 0.9rem; }
    .collections { margin-top: 2rem; }
    .collection-item { padding: 0.5rem; margin: 0.25rem 0; background: #e9ecef; border-radius: 4px; display: flex; justify-content: space-between; }
    .refresh-btn {
      background: #007bff;
      color: white;
      border: none;
      padding: 0.5rem 1rem;
      border-radius: 4px;
      cursor: pointer;
      margin-bottom: 1rem;
    }
    .refresh-btn:hover { background: #0056b3; }
  </style>
  <script>
    function refreshStats() {
      window.location.reload();
    }
    setInterval(refreshStats, 30000); // Auto-refresh every 30 seconds
  </script>
</head>
<body>
  <div class="container">
    <h1>🚀 Jetstream Event Processor</h1>
    <button class="refresh-btn" onclick="refreshStats()">Refresh Stats</button>

    <div class="stat-grid">
      <div class="stat-card">
        <div class="stat-value">${stats.totalEvents.toLocaleString()}</div>
        <div class="stat-label">Commit Events Processed</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${stats.totalReceived.toLocaleString()}</div>
        <div class="stat-label">Total Events Received</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${stats.totalReceived > 0 ? ((stats.totalEvents / stats.totalReceived) * 100).toFixed(1) + '%' : '0%'}</div>
        <div class="stat-label">Processing Efficiency</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${Object.keys(stats.eventCounts).length}</div>
        <div class="stat-label">Unique Collections</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${stats.cursor > 0 ? new Date(stats.cursor / 1000).toLocaleString() : 'N/A'}</div>
        <div class="stat-label">Last Event Time</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${new Date(stats.lastEventTime).toLocaleString()}</div>
        <div class="stat-label">Last Processed</div>
      </div>
    </div>

    <div class="collections">
      <h3>Events by Collection</h3>
      ${collectionRows}
    </div>
  </div>
</body>
</html>`;
      return new Response(html, {
        headers: { "Content-Type": "text/html" }
      });
    }

    if (url.pathname === "/status") {
      const status = await stub.getConnectionStatus();
      return jsonResponse(status, true);
    }

    if (url.pathname === "/reset" && request.method === "POST") {
      await stub.resetStats();
      return jsonResponse({ message: "Stats reset successfully" });
    }

    if (url.pathname === "/reconnect" && request.method === "POST") {
      await stub.forceReconnect();
      return jsonResponse({ message: "Reconnection initiated" });
    }

    if (url.pathname === "/health") {
      return jsonResponse({
        status: "healthy",
        worker: "jetstream-unified",
        timestamp: new Date().toISOString()
      });
    }

    // Default route - show basic info
    return jsonResponse({
      message: "Jetstream Event Processor (Unified)",
      endpoints: {
        "/stats": "Get processing statistics (JSON)",
        "/stats/html": "Get processing statistics (HTML dashboard)",
        "/status": "Get WebSocket connection status",
        "/health": "Health check endpoint",
        "POST /reset": "Reset statistics",
        "POST /reconnect": "Force WebSocket reconnection"
      }
    }, true);
  },

  /**
   * Scheduled handler: checks the Durable Object's WebSocket status and forces
   * a reconnection when it reports not connected.
   */
  async scheduled(controller: ScheduledController, env: Env, ctx: ExecutionContext): Promise<void> {
    console.log("Scheduled keepalive triggered");

    const id: DurableObjectId = env.JETSTREAM_PROCESSOR.idFromName("main");
    const stub = env.JETSTREAM_PROCESSOR.get(id);

    // Check connection status to ensure it's healthy
    const status = await stub.getConnectionStatus();
    console.log("Keepalive check - WebSocket connected:", status.connected);

    // If not connected, force a reconnection attempt
    if (!status.connected) {
      console.log("WebSocket disconnected, forcing reconnection");
      await stub.forceReconnect();
    }
  },

  /**
   * Queue consumer: posts each queued event to the webhook. Messages are
   * acknowledged individually on success and marked for retry on failure, so
   * one failing delivery does not affect the rest of the batch.
   */
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {
    console.log(`Processing batch of ${batch.messages.length} messages`);

    // Deliver all messages of the batch concurrently.
    const webhookPromises = batch.messages.map(async (message) => {
      try {
        // Cast the unknown message body to our QueueMessage type
        const queueMessage = message.body as QueueMessage;
        await sendToWebhook(queueMessage.event, env);

        // Acknowledge successful processing
        message.ack();

        console.log(`Successfully processed event ${queueMessage.event.time_us} for collection: ${queueMessage.event.commit?.collection || 'non-commit'}`);
      } catch (error) {
        console.error(`Failed to process queue message:`, error);

        // Mark the message for redelivery according to the queue's retry configuration.
        message.retry();
      }
    });

    // Every callback handles its own errors; allSettled just waits for all of them.
    await Promise.allSettled(webhookPromises);
  }
} satisfies ExportedHandler<Env>;

export default worker;
450
/**
 * POSTs a single Jetstream event as JSON to the URL in `env.WEBHOOK_URL`.
 * An `Authorization: Bearer …` header is attached only when
 * `env.WEBHOOK_BEARER_TOKEN` is set.
 *
 * @param event - The event to deliver; serialized with `JSON.stringify`
 * @param env - Bindings providing the webhook URL and optional bearer token
 * @throws Error when the webhook answers with a non-2xx status, which lets the
 *   caller decide to retry
 */
async function sendToWebhook(event: JetstreamEvent, env: Env): Promise<void> {
  const { WEBHOOK_URL: target, WEBHOOK_BEARER_TOKEN: token } = env;

  const requestHeaders: Record<string, string> = {
    "Content-Type": "application/json",
    "User-Agent": "Jetstream-Unified/1.0",
    // Only authenticate when a token has been configured.
    ...(token ? { Authorization: `Bearer ${token}` } : {}),
  };

  const reply = await fetch(target, {
    method: "POST",
    headers: requestHeaders,
    body: JSON.stringify(event),
  });

  if (reply.ok) {
    return;
  }

  // Surface the failure to the caller so the delivery can be retried.
  throw new Error(`Webhook request failed: ${reply.status} ${reply.statusText}`);
}