forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1export interface JetStreamEvent {
2 did: string;
3 time_us: number;
4 kind: "commit" | "identity" | "account";
5 commit?: {
6 rev: string;
7 operation: "create" | "update" | "delete";
8 collection: string;
9 rkey: string;
10 record?: Record<string, unknown>;
11 cid?: string;
12 };
13 identity?: {
14 did: string;
15 handle?: string;
16 seq?: number;
17 time?: string;
18 };
19 account?: {
20 active: boolean;
21 did: string;
22 seq: number;
23 time: string;
24 };
25}
26
27export interface JetStreamClientOptions {
28 endpoint?: string;
29 wantedCollections?: string[];
30 wantedDids?: string[];
31 maxReconnectAttempts?: number;
32 reconnectDelay?: number;
33 maxReconnectDelay?: number;
34 backoffMultiplier?: number;
35 debug?: boolean;
36}
37
38export type JetStreamEventType =
39 | "open"
40 | "message"
41 | "error"
42 | "close"
43 | "reconnect";
44
45export class JetStreamClient {
46 private ws: WebSocket | null = null;
47 private options: Required<JetStreamClientOptions>;
48 private reconnectAttempts = 0;
49 private reconnectTimer: number | null = null;
50 private isManualClose = false;
51 private eventHandlers: Map<
52 JetStreamEventType,
53 Set<(data?: unknown) => void>
54 > = new Map();
55 private cursor: number | null = null;
56
57 constructor(options: JetStreamClientOptions = {}) {
58 this.options = {
59 endpoint:
60 options.endpoint || "wss://jetstream1.us-east.bsky.network/subscribe",
61 wantedCollections: options.wantedCollections || [],
62 wantedDids: options.wantedDids || [],
63 maxReconnectAttempts: options.maxReconnectAttempts ?? Infinity,
64 reconnectDelay: options.reconnectDelay ?? 1000,
65 maxReconnectDelay: options.maxReconnectDelay ?? 30000,
66 backoffMultiplier: options.backoffMultiplier ?? 1.5,
67 debug: options.debug ?? false,
68 };
69
70 // Initialize event handler sets
71 ["open", "message", "error", "close", "reconnect"].forEach((event) => {
72 this.eventHandlers.set(event as JetStreamEventType, new Set());
73 });
74 }
75
76 /**
77 * Register an event handler
78 */
79 on(event: JetStreamEventType, handler: (data?: unknown) => void): this {
80 this.eventHandlers.get(event)?.add(handler);
81 return this;
82 }
83
84 /**
85 * Remove an event handler
86 */
87 off(event: JetStreamEventType, handler: (data?: unknown) => void): this {
88 this.eventHandlers.get(event)?.delete(handler);
89 return this;
90 }
91
92 /**
93 * Emit an event to all registered handlers
94 */
95 private emit(event: JetStreamEventType, data?: unknown): void {
96 this.eventHandlers.get(event)?.forEach((handler) => {
97 try {
98 handler(data);
99 } catch (error) {
100 this.log("error", `Handler error for ${event}:`, error);
101 }
102 });
103 }
104
105 /**
106 * Build the WebSocket URL with query parameters
107 */
108 private buildUrl(): string {
109 const url = new URL(this.options.endpoint);
110
111 if (this.options.wantedCollections.length > 0) {
112 this.options.wantedCollections.forEach((collection) => {
113 url.searchParams.append("wantedCollections", collection);
114 });
115 }
116
117 if (this.options.wantedDids.length > 0) {
118 this.options.wantedDids.forEach((did) => {
119 url.searchParams.append("wantedDids", did);
120 });
121 }
122
123 if (this.cursor !== null) {
124 url.searchParams.set("cursor", this.cursor.toString());
125 }
126
127 return url.toString();
128 }
129
130 /**
131 * Connect to the JetStream WebSocket
132 */
133 connect(): void {
134 if (this.ws && this.ws.readyState === WebSocket.OPEN) {
135 this.log("warn", "Already connected");
136 return;
137 }
138
139 this.isManualClose = false;
140 const url = this.buildUrl();
141 this.log("info", `Connecting to ${url}`);
142
143 try {
144 this.ws = new WebSocket(url);
145
146 this.ws.onopen = () => {
147 this.log("info", "Connected successfully");
148 this.reconnectAttempts = 0;
149 this.emit("open");
150 };
151
152 this.ws.onmessage = (event) => {
153 try {
154 const data = JSON.parse(event.data) as JetStreamEvent;
155
156 // Update cursor for resumption
157 if (data.time_us) {
158 this.cursor = data.time_us;
159 }
160
161 this.emit("message", data);
162 } catch (error) {
163 this.log("error", "Failed to parse message:", error);
164 this.emit("error", { type: "parse_error", error });
165 }
166 };
167
168 this.ws.onerror = (event) => {
169 this.log("error", "WebSocket error:", event);
170 this.emit("error", event);
171 };
172
173 this.ws.onclose = (event) => {
174 this.log("info", `Connection closed: ${event.code} ${event.reason}`);
175 this.emit("close", event);
176
177 if (!this.isManualClose) {
178 this.scheduleReconnect();
179 }
180 };
181 } catch (error) {
182 this.log("error", "Failed to create WebSocket:", error);
183 this.emit("error", { type: "connection_error", error });
184 this.scheduleReconnect();
185 }
186 }
187
188 /**
189 * Schedule a reconnection attempt with exponential backoff
190 */
191 private scheduleReconnect(): void {
192 if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
193 this.log("error", "Max reconnection attempts reached");
194 return;
195 }
196
197 const delay = Math.min(
198 this.options.reconnectDelay *
199 Math.pow(this.options.backoffMultiplier, this.reconnectAttempts),
200 this.options.maxReconnectDelay,
201 );
202
203 this.reconnectAttempts++;
204 this.log(
205 "info",
206 `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`,
207 );
208
209 this.reconnectTimer = setTimeout(() => {
210 this.emit("reconnect", { attempt: this.reconnectAttempts });
211 this.connect();
212 }, delay);
213 }
214
215 /**
216 * Manually disconnect from the WebSocket
217 */
218 disconnect(): void {
219 this.isManualClose = true;
220
221 if (this.reconnectTimer !== null) {
222 clearTimeout(this.reconnectTimer);
223 this.reconnectTimer = null;
224 }
225
226 if (this.ws) {
227 this.ws.close();
228 this.ws = null;
229 }
230
231 this.log("info", "Disconnected");
232 }
233
234 /**
235 * Update subscription filters (requires reconnection)
236 */
237 updateFilters(options: {
238 wantedCollections?: string[];
239 wantedDids?: string[];
240 }): void {
241 if (options.wantedCollections) {
242 this.options.wantedCollections = options.wantedCollections;
243 }
244 if (options.wantedDids) {
245 this.options.wantedDids = options.wantedDids;
246 }
247
248 // Reconnect with new filters
249 if (this.ws) {
250 this.disconnect();
251 this.connect();
252 }
253 }
254
255 /**
256 * Get current connection state
257 */
258 get readyState(): number {
259 return this.ws?.readyState ?? WebSocket.CLOSED;
260 }
261
262 /**
263 * Check if currently connected
264 */
265 get isConnected(): boolean {
266 return this.ws?.readyState === WebSocket.OPEN;
267 }
268
269 /**
270 * Get current cursor position
271 */
272 get currentCursor(): number | null {
273 return this.cursor;
274 }
275
276 /**
277 * Logging utility
278 */
279 private log(level: "info" | "warn" | "error", ...args: unknown[]): void {
280 if (this.options.debug || level === "error") {
281 const prefix = `[JetStream ${level.toUpperCase()}]`;
282 console[level](prefix, ...args);
283 }
284 }
285}