// Serverless #atproto Jetstream-to-webhook connector, powered by Cloudflare Durable Objects.
at main 475 lines 16 kB view raw
import { DurableObject } from "cloudflare:workers";
import type { JetstreamEvent, StoredStats, QueueMessage } from "./types";

/**
 * Jetstream -> Cloudflare Queue -> webhook bridge.
 *
 * - `JetstreamProcessor` (Durable Object) holds the Jetstream WebSocket, keeps
 *   running statistics and pushes every commit event onto `JETSTREAM_QUEUE`.
 * - The default export exposes HTTP routes for stats/diagnostics, a scheduled
 *   handler that checks the connection, and the queue consumer that forwards
 *   queued events to `WEBHOOK_URL`.
 *
 * Bindings are declared in `wrangler.jsonc`; regenerate the `Env` type with
 * `npm run cf-typegen` after changing them.
 */

/** Jetstream subscription endpoint. */
const JETSTREAM_ENDPOINT = "wss://jetstream1.us-west.bsky.network/subscribe";

/** Cursor rewind applied on reconnect (5 seconds, in microseconds). */
const CURSOR_REWIND_US = 5 * 1000 * 1000;

/** Lower bound of the first reconnect delay, in milliseconds. */
const RECONNECT_BASE_DELAY_MS = 1000;

/** Upper bound for any reconnect delay, in milliseconds. */
const RECONNECT_MAX_DELAY_MS = 30000;

/** Stats are written to storage once per this many processed commit events. */
const STATS_PERSIST_INTERVAL = 100;

/** Build a fresh, zeroed statistics record. */
function createEmptyStats(): StoredStats {
  return {
    cursor: 0,
    eventCounts: {},
    totalEvents: 0,
    totalReceived: 0,
    lastEventTime: new Date().toISOString()
  };
}

/**
 * Exponential backoff with jitter.
 *
 * The delay for attempt `n` is `base * 2^(n + r)` with `r` uniform in [0, 1),
 * capped at `RECONNECT_MAX_DELAY_MS`. Attempt 0 therefore yields 1-2 seconds
 * and every further consecutive failure doubles the window.
 *
 * @param attempt - Number of consecutive failed attempts so far (0-based)
 * @returns Delay in whole milliseconds
 */
function computeReconnectDelay(attempt: number): number {
  const exponent = Math.max(0, attempt) + Math.random();
  const uncapped = RECONNECT_BASE_DELAY_MS * Math.pow(2, exponent);
  return Math.round(Math.min(uncapped, RECONNECT_MAX_DELAY_MS));
}

/** Escape a value for safe interpolation into HTML text or attribute content. */
function escapeHtml(value: string): string {
  return value
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&#39;");
}

/**
 * Build a JSON response.
 *
 * @param body - Value to serialize
 * @param pretty - When true, indent the JSON with two spaces
 */
function jsonResponse(body: unknown, pretty = false): Response {
  const payload = pretty ? JSON.stringify(body, null, 2) : JSON.stringify(body);
  return new Response(payload, {
    headers: { "Content-Type": "application/json" }
  });
}

/**
 * Render the HTML statistics dashboard.
 *
 * Collection names originate from the network, so they are HTML-escaped
 * before being placed into the page.
 */
function renderStatsHtml(stats: StoredStats): string {
  const efficiency =
    stats.totalReceived > 0
      ? ((stats.totalEvents / stats.totalReceived) * 100).toFixed(1) + '%'
      : '0%';

  // `cursor` is in microseconds; Date expects milliseconds.
  const lastEventTime =
    stats.cursor > 0 ? new Date(stats.cursor / 1000).toLocaleString() : 'N/A';

  const collectionRows = Object.entries(stats.eventCounts)
    .sort(([, a], [, b]) => (b as number) - (a as number))
    .map(([collection, count]) => `
      <div class="collection-item">
        <span>${escapeHtml(collection)}</span>
        <span><strong>${(count as number).toLocaleString()}</strong></span>
      </div>
    `)
    .join('');

  return `
<!DOCTYPE html>
<html>
<head>
  <title>Jetstream Statistics</title>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <style>
    body {
      font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
      max-width: 800px;
      margin: 2rem auto;
      padding: 0 1rem;
      background: #f5f5f5;
    }
    .container {
      background: white;
      border-radius: 8px;
      padding: 2rem;
      box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    h1 { color: #333; margin-bottom: 2rem; }
    .stat-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 1rem; margin-bottom: 2rem; }
    .stat-card { background: #f8f9fa; padding: 1rem; border-radius: 6px; border-left: 4px solid #007bff; }
    .stat-value { font-size: 1.5rem; font-weight: bold; color: #007bff; }
    .stat-label { color: #666; font-size: 0.9rem; }
    .collections { margin-top: 2rem; }
    .collection-item { padding: 0.5rem; margin: 0.25rem 0; background: #e9ecef; border-radius: 4px; display: flex; justify-content: space-between; }
    .refresh-btn {
      background: #007bff;
      color: white;
      border: none;
      padding: 0.5rem 1rem;
      border-radius: 4px;
      cursor: pointer;
      margin-bottom: 1rem;
    }
    .refresh-btn:hover { background: #0056b3; }
  </style>
  <script>
    function refreshStats() {
      window.location.reload();
    }
    setInterval(refreshStats, 30000); // Auto-refresh every 30 seconds
  </script>
</head>
<body>
  <div class="container">
    <h1>🚀 Jetstream Event Processor</h1>
    <button class="refresh-btn" onclick="refreshStats()">Refresh Stats</button>

    <div class="stat-grid">
      <div class="stat-card">
        <div class="stat-value">${stats.totalEvents.toLocaleString()}</div>
        <div class="stat-label">Commit Events Processed</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${stats.totalReceived.toLocaleString()}</div>
        <div class="stat-label">Total Events Received</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${efficiency}</div>
        <div class="stat-label">Processing Efficiency</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${Object.keys(stats.eventCounts).length}</div>
        <div class="stat-label">Unique Collections</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${lastEventTime}</div>
        <div class="stat-label">Last Event Time</div>
      </div>
      <div class="stat-card">
        <div class="stat-value">${new Date(stats.lastEventTime).toLocaleString()}</div>
        <div class="stat-label">Last Processed</div>
      </div>
    </div>

    <div class="collections">
      <h3>Events by Collection</h3>
      ${collectionRows}
    </div>
  </div>
</body>
</html>`;
}

/** Durable Object for managing Jetstream connection and event processing */
export class JetstreamProcessor extends DurableObject<Env> {
  /** The currently active Jetstream socket, or null while disconnected. */
  private websocket: WebSocket | null = null;

  /** Handle of the pending reconnect timer, or null when none is scheduled. */
  private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;

  /** Consecutive reconnects scheduled since the last successful `open`. */
  private reconnectAttempts = 0;

  /** In-memory statistics; periodically persisted under the "stats" key. */
  private stats: StoredStats = createEmptyStats();

  /**
   * Validates configuration, then loads persisted stats and opens the
   * Jetstream connection.
   *
   * @param ctx - The interface for interacting with Durable Object state
   * @param env - The interface to reference bindings declared in wrangler.jsonc
   * @throws Error when `WEBHOOK_URL` or `JETSTREAM_COLLECTIONS` is not set
   */
  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);

    // Validate required environment variables
    if (!env.WEBHOOK_URL) {
      throw new Error("WEBHOOK_URL environment variable is required");
    }

    if (!env.JETSTREAM_COLLECTIONS) {
      throw new Error("JETSTREAM_COLLECTIONS environment variable is required");
    }

    // Hold back incoming requests until the stored stats are loaded. Without
    // this, an early getStats() would persist the zeroed defaults over the
    // stored cursor and counters.
    void this.ctx.blockConcurrencyWhile(() => this.initializeProcessor());
  }

  /** Restore persisted statistics (if any) and start the Jetstream connection. */
  private async initializeProcessor(): Promise<void> {
    const storedStats = await this.ctx.storage.get<StoredStats>("stats");
    if (storedStats) {
      this.stats = storedStats;
    }

    await this.connectToJetstream();
  }

  /**
   * Open a WebSocket to Jetstream for the configured collections.
   *
   * When a cursor is known, the subscription resumes slightly before it.
   * Failures to create the socket are logged and a reconnect is scheduled.
   */
  private async connectToJetstream(): Promise<void> {
    try {
      // Parse collections from environment variable (comma-separated)
      const collections = this.env.JETSTREAM_COLLECTIONS
        .split(",")
        .map((c) => c.trim())
        .filter((c) => c.length > 0);

      if (collections.length === 0) {
        console.warn("JETSTREAM_COLLECTIONS contains no collection names; no wantedCollections filter will be sent");
      }

      const url = new URL(JETSTREAM_ENDPOINT);
      for (const collection of collections) {
        url.searchParams.append("wantedCollections", collection);
      }

      // Add cursor if we have one (reconnection scenario)
      if (this.stats.cursor > 0) {
        // Rewind a few seconds as buffer to ensure gapless playback
        const cursorWithBuffer = this.stats.cursor - CURSOR_REWIND_US;
        url.searchParams.set("cursor", cursorWithBuffer.toString());
      }

      console.log("JETSTREAM_COLLECTIONS", this.env.JETSTREAM_COLLECTIONS);
      console.log(`Connecting to Jetstream: ${url.toString()}`);
      console.log(`Watching collections: ${collections.join(", ")}`);

      const socket = new WebSocket(url.toString());
      this.websocket = socket;

      // Every handler first checks that `socket` is still the active one.
      // After forceReconnect() replaces the socket, late events from the old
      // one must not clear the new reference or trigger a second connection.
      const isCurrent = (): boolean => this.websocket === socket;

      socket.addEventListener("open", () => {
        if (!isCurrent()) return;
        console.log("Jetstream WebSocket connected");
        this.reconnectAttempts = 0;
        // Clear any existing reconnect timeout
        if (this.reconnectTimeout) {
          clearTimeout(this.reconnectTimeout);
          this.reconnectTimeout = null;
        }
      });

      socket.addEventListener("message", async (event) => {
        if (!isCurrent()) return;
        try {
          // WebSocket message data can be string or ArrayBuffer, we expect JSON string
          const data = typeof event.data === "string" ? event.data : new TextDecoder().decode(event.data);
          const jetstreamEvent: JetstreamEvent = JSON.parse(data);
          await this.processEvent(jetstreamEvent);
        } catch (error) {
          console.error("Error processing Jetstream event:", error);
        }
      });

      socket.addEventListener("close", (event) => {
        console.log(`Jetstream WebSocket closed: ${event.code} ${event.reason}`);
        if (!isCurrent()) return;
        this.websocket = null;
        this.scheduleReconnect();
      });

      socket.addEventListener("error", (event) => {
        console.error("Jetstream WebSocket error:", event);
        if (!isCurrent()) return;
        this.websocket = null;
        this.scheduleReconnect();
      });
    } catch (error) {
      console.error("Error connecting to Jetstream:", error);
      this.scheduleReconnect();
    }
  }

  /**
   * Schedule a reconnect unless one is already pending.
   *
   * The delay grows exponentially with the number of consecutive attempts
   * (with jitter) and is capped at `RECONNECT_MAX_DELAY_MS`.
   */
  private scheduleReconnect(): void {
    if (this.reconnectTimeout) return;

    const delay = computeReconnectDelay(this.reconnectAttempts);
    this.reconnectAttempts++;

    console.log(`Scheduling Jetstream reconnect in ${delay}ms`);
    this.reconnectTimeout = setTimeout(() => {
      this.reconnectTimeout = null;
      void this.connectToJetstream();
    }, delay);
  }

  /**
   * Account for one Jetstream event and enqueue it if it is a commit.
   *
   * The cursor and `totalReceived` advance for every event kind; the remaining
   * counters only track commits. Queue failures are logged, not rethrown.
   */
  private async processEvent(event: JetstreamEvent): Promise<void> {
    // The payload comes straight from JSON.parse; refuse anything that would
    // corrupt the cursor.
    if (typeof event?.time_us !== "number" || !Number.isFinite(event.time_us)) {
      console.warn("Ignoring Jetstream message without a numeric time_us");
      return;
    }

    // Always update cursor and received count for all events
    this.stats.cursor = event.time_us;
    this.stats.totalReceived++;

    // Skip identity and account events - only process commits
    if (event.kind !== "commit") {
      return;
    }

    // Update stats for commit events only
    this.stats.totalEvents++;
    this.stats.lastEventTime = new Date().toISOString();

    // Track collection-specific stats for commits only
    if (event.commit?.collection) {
      const collection = event.commit.collection;
      this.stats.eventCounts[collection] = (this.stats.eventCounts[collection] ?? 0) + 1;
      console.log(`Processing ${event.commit.operation} event for collection: ${collection}`);
    }

    // Send to Cloudflare Queue instead of webhook
    try {
      const queueMessage: QueueMessage = {
        event: event,
        queuedAt: new Date().toISOString(),
        retryCount: 0
      };

      await this.env.JETSTREAM_QUEUE.send(queueMessage);

      console.log(`Event queued successfully: ${event.time_us}`);
    } catch (error) {
      console.error("Error sending to queue:", error);
      // Note: Queue failures are more serious than webhook failures
      // You might want to implement additional error handling here
    }

    // Persist stats periodically to avoid too frequent writes
    if (this.stats.totalEvents % STATS_PERSIST_INTERVAL === 0) {
      await this.ctx.storage.put("stats", this.stats);
    }
  }

  /**
   * Get current processing statistics.
   *
   * Also writes the current in-memory stats to storage.
   */
  async getStats(): Promise<StoredStats> {
    await this.ctx.storage.put("stats", this.stats);
    return this.stats;
  }

  /**
   * Reset statistics (useful for testing). This also clears the cursor.
   */
  async resetStats(): Promise<void> {
    this.stats = createEmptyStats();
    await this.ctx.storage.put("stats", this.stats);
  }

  /**
   * Get connection status.
   *
   * @returns `connected` is true only when the socket is OPEN; `readyState`
   *   is undefined when there is no socket.
   */
  getConnectionStatus(): { connected: boolean; readyState?: number } {
    return {
      connected: this.websocket?.readyState === WebSocket.OPEN,
      readyState: this.websocket?.readyState
    };
  }

  /**
   * Force reconnection (useful for debugging).
   *
   * Drops the current socket, cancels any pending reconnect timer, resets the
   * backoff and connects immediately.
   */
  async forceReconnect(): Promise<void> {
    const previous = this.websocket;
    // Detach first so the old socket's close/error events are ignored.
    this.websocket = null;
    if (previous) {
      try {
        previous.close();
      } catch (error) {
        console.error("Error closing previous Jetstream WebSocket:", error);
      }
    }

    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
      this.reconnectTimeout = null;
    }

    this.reconnectAttempts = 0;
    await this.connectToJetstream();
  }
}

export default {
  /**
   * HTTP entry point: statistics, connection status and maintenance routes.
   *
   * @param request - The request submitted to the Worker from the client
   * @param env - The interface to reference bindings declared in wrangler.jsonc
   * @param ctx - The execution context of the Worker
   * @returns The response to be sent back to the client
   */
  async fetch(request, env, ctx): Promise<Response> {
    const url = new URL(request.url);

    // All routes talk to the single, named Jetstream processor instance
    const id: DurableObjectId = env.JETSTREAM_PROCESSOR.idFromName("main");
    const stub = env.JETSTREAM_PROCESSOR.get(id);

    if (url.pathname === "/stats") {
      const stats = await stub.getStats();
      return jsonResponse(stats, true);
    }

    if (url.pathname === "/stats/html") {
      const stats = await stub.getStats();
      return new Response(renderStatsHtml(stats), {
        headers: { "Content-Type": "text/html; charset=utf-8" }
      });
    }

    if (url.pathname === "/status") {
      const status = await stub.getConnectionStatus();
      return jsonResponse(status, true);
    }

    if (url.pathname === "/reset" && request.method === "POST") {
      await stub.resetStats();
      return jsonResponse({ message: "Stats reset successfully" });
    }

    if (url.pathname === "/reconnect" && request.method === "POST") {
      await stub.forceReconnect();
      return jsonResponse({ message: "Reconnection initiated" });
    }

    if (url.pathname === "/health") {
      return jsonResponse({
        status: "healthy",
        worker: "jetstream-unified",
        timestamp: new Date().toISOString()
      });
    }

    // Default route - show basic info
    return jsonResponse({
      message: "Jetstream Event Processor (Unified)",
      endpoints: {
        "/stats": "Get processing statistics (JSON)",
        "/stats/html": "Get processing statistics (HTML dashboard)",
        "/status": "Get WebSocket connection status",
        "/health": "Health check endpoint",
        "POST /reset": "Reset statistics",
        "POST /reconnect": "Force WebSocket reconnection"
      }
    }, true);
  },

  /**
   * Scheduled event handler: touches the Durable Object and forces a
   * reconnect when its WebSocket is not connected.
   */
  async scheduled(controller: ScheduledController, env: Env, ctx: ExecutionContext): Promise<void> {
    console.log("Scheduled keepalive triggered");

    const id: DurableObjectId = env.JETSTREAM_PROCESSOR.idFromName("main");
    const stub = env.JETSTREAM_PROCESSOR.get(id);

    // Check connection status to ensure it's healthy
    const status = await stub.getConnectionStatus();
    console.log("Keepalive check - WebSocket connected:", status.connected);

    // If not connected, force a reconnection attempt
    if (!status.connected) {
      console.log("WebSocket disconnected, forcing reconnection");
      await stub.forceReconnect();
    }
  },

  /**
   * Queue consumer handler: forwards each queued event to the webhook.
   *
   * Messages are acknowledged individually on success and marked for retry on
   * failure, so one failing delivery does not affect the rest of the batch.
   */
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {
    console.log(`Processing batch of ${batch.messages.length} messages`);

    const webhookPromises = batch.messages.map(async (message) => {
      try {
        // Cast the unknown message body to our QueueMessage type
        const queueMessage = message.body as QueueMessage;
        await sendToWebhook(queueMessage.event, env);

        // Acknowledge successful processing
        message.ack();

        console.log(`Successfully processed event ${queueMessage.event.time_us} for collection: ${queueMessage.event.commit?.collection || 'non-commit'}`);
      } catch (error) {
        console.error(`Failed to process queue message:`, error);

        // Ask the queue to redeliver this message
        message.retry();
      }
    });

    // Wait for all webhook calls to complete
    await Promise.allSettled(webhookPromises);
  }
} satisfies ExportedHandler<Env>;

/**
 * POST a Jetstream event to the configured webhook as JSON.
 *
 * @param event - Event to deliver
 * @param env - Bindings; reads `WEBHOOK_URL` and the optional `WEBHOOK_BEARER_TOKEN`
 * @throws Error when the webhook answers with a non-2xx status, which makes
 *   the queue consumer retry the message
 */
async function sendToWebhook(event: JetstreamEvent, env: Env): Promise<void> {
  const webhookUrl = env.WEBHOOK_URL;
  const bearerToken = env.WEBHOOK_BEARER_TOKEN;

  const headers: Record<string, string> = {
    "Content-Type": "application/json",
    "User-Agent": "Jetstream-Unified/1.0"
  };

  // Add Authorization header if bearer token is available
  if (bearerToken) {
    headers["Authorization"] = `Bearer ${bearerToken}`;
  }

  const response = await fetch(webhookUrl, {
    method: "POST",
    headers,
    body: JSON.stringify(event),
  });

  if (!response.ok) {
    // This will cause the message to retry
    throw new Error(`Webhook request failed: ${response.status} ${response.statusText}`);
  }
}