A decentralized music tracking and discovery platform built on AT Protocol 🎵
at main 285 lines 7.1 kB view raw
/**
 * A single event as parsed from a JetStream WebSocket message.
 *
 * `kind` names which of the optional payloads (`commit`, `identity`,
 * `account`) the event is expected to carry.
 */
export interface JetStreamEvent {
  did: string;
  /**
   * Event timestamp; `JetStreamClient` stores the latest value as its resume
   * cursor. Presumably microseconds, going by the `_us` suffix.
   */
  time_us: number;
  kind: "commit" | "identity" | "account";
  commit?: {
    rev: string;
    operation: "create" | "update" | "delete";
    collection: string;
    rkey: string;
    record?: Record<string, unknown>;
    cid?: string;
  };
  identity?: {
    did: string;
    handle?: string;
    seq?: number;
    time?: string;
  };
  account?: {
    active: boolean;
    did: string;
    seq: number;
    time: string;
  };
}

/**
 * Construction options for `JetStreamClient`. Every field is optional; the
 * defaults listed here are the ones applied by the constructor.
 */
export interface JetStreamClientOptions {
  /** WebSocket URL. Default: `wss://jetstream1.us-east.bsky.network/subscribe`. */
  endpoint?: string;
  /** Sent as repeated `wantedCollections` query parameters. Default: `[]`. */
  wantedCollections?: string[];
  /** Sent as repeated `wantedDids` query parameters. Default: `[]`. */
  wantedDids?: string[];
  /** Reconnect attempts allowed before giving up. Default: `Infinity`. */
  maxReconnectAttempts?: number;
  /** Delay in milliseconds before the first reconnect attempt. Default: `1000`. */
  reconnectDelay?: number;
  /** Upper bound in milliseconds for the backoff delay. Default: `30000`. */
  maxReconnectDelay?: number;
  /** Factor the delay grows by on each further attempt. Default: `1.5`. */
  backoffMultiplier?: number;
  /** When true, info and warn messages are logged as well as errors. Default: `false`. */
  debug?: boolean;
}

/** Names of the events a `JetStreamClient` can emit. */
export type JetStreamEventType =
  | "open"
  | "message"
  | "error"
  | "close"
  | "reconnect";

/**
 * WebSocket client for a JetStream endpoint with automatic reconnection.
 *
 * The client remembers the `time_us` of the last message it parsed and sends
 * it as the `cursor` query parameter on the next connection. Reconnects use
 * exponential backoff capped at `maxReconnectDelay`.
 */
export class JetStreamClient {
  private ws: WebSocket | null = null;
  private readonly options: Required<JetStreamClientOptions>;
  private reconnectAttempts = 0;
  // ReturnType keeps this correct whether setTimeout yields a number or a
  // Timeout object in the host runtime.
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private isManualClose = false;
  private readonly eventHandlers: Map<
    JetStreamEventType,
    Set<(data?: unknown) => void>
  > = new Map();
  private cursor: number | null = null;

  /**
   * @param options Connection and reconnection settings; omitted fields fall
   *   back to the defaults documented on `JetStreamClientOptions`.
   */
  constructor(options: JetStreamClientOptions = {}) {
    this.options = {
      endpoint:
        options.endpoint || "wss://jetstream1.us-east.bsky.network/subscribe",
      wantedCollections: options.wantedCollections || [],
      wantedDids: options.wantedDids || [],
      maxReconnectAttempts: options.maxReconnectAttempts ?? Infinity,
      reconnectDelay: options.reconnectDelay ?? 1000,
      maxReconnectDelay: options.maxReconnectDelay ?? 30000,
      backoffMultiplier: options.backoffMultiplier ?? 1.5,
      debug: options.debug ?? false,
    };

    // Initialize event handler sets
    ["open", "message", "error", "close", "reconnect"].forEach((event) => {
      this.eventHandlers.set(event as JetStreamEventType, new Set());
    });
  }

  /**
   * Register an event handler.
   *
   * @returns This client, so calls can be chained.
   */
  on(event: JetStreamEventType, handler: (data?: unknown) => void): this {
    this.eventHandlers.get(event)?.add(handler);
    return this;
  }

  /**
   * Remove a previously registered event handler.
   *
   * @returns This client, so calls can be chained.
   */
  off(event: JetStreamEventType, handler: (data?: unknown) => void): this {
    this.eventHandlers.get(event)?.delete(handler);
    return this;
  }

  /**
   * Emit an event to all registered handlers. A handler that throws is logged
   * and does not prevent the remaining handlers from running.
   */
  private emit(event: JetStreamEventType, data?: unknown): void {
    this.eventHandlers.get(event)?.forEach((handler) => {
      try {
        handler(data);
      } catch (error) {
        this.log("error", `Handler error for ${event}:`, error);
      }
    });
  }

  /**
   * Build the WebSocket URL: the endpoint plus one `wantedCollections` /
   * `wantedDids` parameter per filter entry and, once a message has been
   * seen, the `cursor`.
   */
  private buildUrl(): string {
    const url = new URL(this.options.endpoint);

    for (const collection of this.options.wantedCollections) {
      url.searchParams.append("wantedCollections", collection);
    }

    for (const did of this.options.wantedDids) {
      url.searchParams.append("wantedDids", did);
    }

    if (this.cursor !== null) {
      url.searchParams.set("cursor", this.cursor.toString());
    }

    return url.toString();
  }

  /**
   * Connect to the JetStream WebSocket.
   *
   * Does nothing when a socket is already open or still connecting, so
   * repeated calls never leave an orphaned second connection behind.
   */
  connect(): void {
    const state = this.ws?.readyState;
    if (state === WebSocket.OPEN) {
      this.log("warn", "Already connected");
      return;
    }
    if (state === WebSocket.CONNECTING) {
      this.log("warn", "Connection already in progress");
      return;
    }

    this.isManualClose = false;
    const url = this.buildUrl();
    this.log("info", `Connecting to ${url}`);

    try {
      // Keep a local reference so the handlers below can tell whether they
      // belong to the socket the client currently owns.
      const socket = new WebSocket(url);
      this.ws = socket;

      socket.onopen = () => {
        this.log("info", "Connected successfully");
        this.reconnectAttempts = 0;
        this.emit("open");
      };

      socket.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data) as JetStreamEvent;

          // Update cursor for resumption
          if (data.time_us) {
            this.cursor = data.time_us;
          }

          this.emit("message", data);
        } catch (error) {
          this.log("error", "Failed to parse message:", error);
          this.emit("error", { type: "parse_error", error });
        }
      };

      socket.onerror = (event) => {
        this.log("error", "WebSocket error:", event);
        this.emit("error", event);
      };

      socket.onclose = (event) => {
        this.log("info", `Connection closed: ${event.code} ${event.reason}`);
        this.emit("close", event);

        // A socket that was replaced (e.g. by updateFilters) or dropped by
        // disconnect() delivers its close event later; it must not trigger a
        // reconnect on behalf of the connection that superseded it.
        if (!this.isManualClose && this.ws === socket) {
          this.scheduleReconnect();
        }
      };
    } catch (error) {
      this.log("error", "Failed to create WebSocket:", error);
      this.emit("error", { type: "connection_error", error });
      this.scheduleReconnect();
    }
  }

  /**
   * Schedule a reconnection attempt with exponential backoff:
   * `reconnectDelay * backoffMultiplier ** attempts`, capped at
   * `maxReconnectDelay`. Gives up once `maxReconnectAttempts` is reached.
   */
  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
      this.log("error", "Max reconnection attempts reached");
      return;
    }

    const delay = Math.min(
      this.options.reconnectDelay *
        Math.pow(this.options.backoffMultiplier, this.reconnectAttempts),
      this.options.maxReconnectDelay,
    );

    this.reconnectAttempts++;
    this.log(
      "info",
      `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`,
    );

    // Never keep two reconnect timers alive at once.
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
    }

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.emit("reconnect", { attempt: this.reconnectAttempts });
      this.connect();
    }, delay);
  }

  /**
   * Manually disconnect from the WebSocket and cancel any pending reconnect.
   * The socket's eventual close event is still emitted as `"close"`, but it
   * does not trigger a reconnect.
   */
  disconnect(): void {
    this.isManualClose = true;

    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }

    this.log("info", "Disconnected");
  }

  /**
   * Update subscription filters. If a socket currently exists, it is closed
   * and a new connection is opened with the new filters.
   *
   * @param options Replacement filter lists; an omitted list is left as is.
   */
  updateFilters(options: {
    wantedCollections?: string[];
    wantedDids?: string[];
  }): void {
    if (options.wantedCollections) {
      this.options.wantedCollections = options.wantedCollections;
    }
    if (options.wantedDids) {
      this.options.wantedDids = options.wantedDids;
    }

    // Reconnect with new filters
    if (this.ws) {
      this.disconnect();
      this.connect();
    }
  }

  /**
   * Current `readyState` of the underlying socket, or `WebSocket.CLOSED` when
   * there is no socket.
   */
  get readyState(): number {
    return this.ws?.readyState ?? WebSocket.CLOSED;
  }

  /** True when the underlying socket is open. */
  get isConnected(): boolean {
    return this.ws?.readyState === WebSocket.OPEN;
  }

  /** `time_us` of the last parsed message, or null if none has been seen. */
  get currentCursor(): number | null {
    return this.cursor;
  }

  /**
   * Logging utility. Errors are always printed; info and warn messages only
   * when the `debug` option is set.
   */
  private log(level: "info" | "warn" | "error", ...args: unknown[]): void {
    if (this.options.debug || level === "error") {
      const prefix = `[JetStream ${level.toUpperCase()}]`;
      console[level](prefix, ...args);
    }
  }
}