Fork of atp.tools as a universal profile for people on the ATmosphere
1// Stolen from https://github.com/skyware-js/jetstream/
2// MPL
3
4import type {
5 At,
6 ComAtprotoSyncSubscribeRepos,
7 Records as _Records,
8} from "@atcute/client/lexicons";
9import "@atcute/bluesky/lexicons";
10import { EventEmitter } from "eventemitter3";
11
/**
 * Map from collection name to the record type stored in that collection.
 * Inherits every entry of the `Records` map imported from `@atcute/client/lexicons`.
 */
export interface Records extends _Records {}
14
/**
 * Configuration accepted by the {@link Jetstream} constructor.
 * Every field is optional.
 */
export interface JetstreamOptions<
  WantedCollections extends Collection = Collection,
> {
  /**
   * Full URL of the subscription endpoint.
   * @default "wss://jetstream1.us-east.bsky.network/subscribe"
   */
  endpoint?: string;

  /**
   * Record collections to receive updates for.
   * Leave empty (or omit) to receive updates for all record collections.
   */
  wantedCollections?: WantedCollections[];

  /**
   * DIDs to receive updates for.
   * Leave empty (or omit) to receive updates for all DIDs.
   */
  wantedDids?: string[];

  /**
   * Largest payload, in bytes, that this client would like to receive.
   * Zero means no limit; negative values are treated as zero.
   * @default 0
   */
  maxMessageSizeBytes?: number;

  /**
   * Unix timestamp, in microseconds, to start receiving updates from.
   */
  cursor?: number;

  /**
   * The WebSocket implementation to use (e.g. `import ws from "ws"`).
   * Not required on Node 21.0.0 or newer, or in any other environment that
   * already provides a WebSocket implementation.
   */
  ws?: unknown;
}
52
/**
 * Maps every built-in event name of the {@link Jetstream} class to the
 * argument tuple handed to its listeners.
 * @see {@link Jetstream#on}
 */
export type JetstreamEvents<
  WantedCollections extends Collection = Collection,
> = {
  /** No arguments. */
  open: [];
  /** No arguments. */
  close: [];
  /** The commit event that was received. */
  commit: [event: CommitEvent<WantedCollections>];
  /** The account event that was received. */
  account: [event: AccountEvent];
  /** The identity event that was received. */
  identity: [event: IdentityEvent];
  /** The error, plus the client's cursor at the time (if any). */
  error: [error: Error, cursor?: number];
};
66
/**
 * The Jetstream client.
 *
 * Opens a WebSocket to a Jetstream endpoint, parses every incoming message as
 * JSON and re-emits it as a `commit`, `account` or `identity` event. Commit
 * events are additionally emitted under the name of their collection. The
 * largest `time_us` seen so far is kept in {@link Jetstream.cursor} and is sent
 * as the `cursor` query parameter by subsequent {@link Jetstream.start} calls.
 */
export class Jetstream<
  WantedCollections extends CollectionOrWildcard = CollectionOrWildcard,
  ResolvedCollections extends
    Collection = ResolveLexiconWildcard<WantedCollections>,
> extends EventEmitter<JetstreamEvents<ResolvedCollections>> {
  /** WebSocket connection to the server. Undefined until {@link Jetstream.start} is called. */
  public ws?: WebSocket;

  /** The full connection URL, including the subscription query parameters. */
  public url: URL;

  /** The current cursor (`time_us` of the newest event seen, or the initial option). */
  public cursor?: number;

  /** The WebSocket implementation supplied through the `ws` option, if any. */
  private wsImpl?: unknown;

  /**
   * @param options Connection options; see {@link JetstreamOptions}.
   * @throws {Error} If the environment has no global `WebSocket` and no `ws` option was given.
   */
  constructor(options?: JetstreamOptions<WantedCollections>) {
    super();
    options ??= {};
    if (options.ws) this.wsImpl = options.ws;

    if (typeof globalThis.WebSocket === "undefined" && !this.wsImpl) {
      throw new Error(
        `No WebSocket implementation was found in your environment. You must provide an implementation as the \`ws\` option.

For example, in a Node.js environment, \`npm install ws\` and then:
import { Jetstream } from "@skyware/jetstream";
import WebSocket from "ws";

const jetstream = new Jetstream({
 ws: WebSocket,
});`,
      );
    }

    this.url = new URL(
      options.endpoint ?? "wss://jetstream1.us-east.bsky.network/subscribe",
    );
    options.wantedCollections?.forEach((collection) => {
      this.url.searchParams.append("wantedCollections", collection);
    });
    options.wantedDids?.forEach((did) => {
      this.url.searchParams.append("wantedDids", did);
    });
    // Zero means "no limit", which is also what omitting the parameter means.
    if (options.maxMessageSizeBytes) {
      this.url.searchParams.append(
        "maxMessageSizeBytes",
        `${options.maxMessageSizeBytes}`,
      );
    }
    if (options.cursor) this.cursor = options.cursor;
  }

  /**
   * Opens a WebSocket connection to the server and wires its callbacks to this emitter.
   */
  start(): void {
    // Honour the implementation passed as the `ws` option; only fall back to
    // the global when none was supplied. Using the bare global unconditionally
    // would throw in exactly the environments the option exists for.
    const WebSocketImpl = (this.wsImpl ??
      globalThis.WebSocket) as typeof WebSocket;
    this.ws = new WebSocketImpl(this.createUrl());

    this.ws.onopen = () => this.emit("open");
    this.ws.onclose = () => this.emit("close");
    this.ws.onerror = (event) => {
      // A plain `Event` stringifies to "[object Event]"; prefer the `message`
      // field when the implementation provides one.
      const { message } = event as Event & { message?: unknown };
      const detail =
        typeof message === "string" && message !== ""
          ? message
          : String(event);
      this.emit("error", new Error("WebSocket error: " + detail), this.cursor);
    };

    this.ws.onmessage = (data) => {
      try {
        const event = JSON.parse(data.data) as
          | CommitEvent<ResolvedCollections>
          | AccountEvent
          | IdentityEvent;
        // Only ever move the cursor forwards.
        if (event.time_us > (this.cursor ?? 0)) this.cursor = event.time_us;
        switch (event.kind) {
          case EventType.Commit:
            // Silently drop commits that lack the fields listeners rely on.
            if (
              !event.commit?.collection ||
              !event.commit.rkey ||
              !event.commit.rev
            ) {
              return;
            }
            if (
              event.commit.operation === CommitType.Create &&
              !event.commit.record
            ) {
              return;
            }

            this.emit("commit", event);
            // @ts-expect-error – We know we can use collection name as an event.
            this.emit(event.commit.collection, event);
            break;
          case EventType.Account:
            if (!event.account?.did) return;
            this.emit("account", event);
            break;
          case EventType.Identity:
            if (!event.identity?.did) return;
            this.emit("identity", event);
            break;
        }
      } catch (e) {
        // Covers JSON parse failures as well as anything thrown by listeners.
        this.emit(
          "error",
          e instanceof Error ? e : new Error(String(e)),
          this.cursor,
        );
      }
    };
  }

  /**
   * Closes the WebSocket connection, if one was opened.
   */
  close(): void {
    this.ws?.close();
  }

  /**
   * Listen for records created in a specific collection.
   * @param collection The name of the collection to listen for.
   * @param listener A callback function that receives the commit event.
   */
  onCreate<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitCreateEvent<T>) => void,
  ) {
    this.on(collection, ({ commit, ...event }) => {
      if (commit.operation === CommitType.Create)
        listener({ commit, ...event });
    });
  }

  /**
   * Listen for records updated in a specific collection.
   * @param collection The name of the collection to listen for.
   * @param listener A callback function that receives the commit event.
   */
  onUpdate<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitUpdateEvent<T>) => void,
  ) {
    this.on(collection, ({ commit, ...event }) => {
      if (commit.operation === CommitType.Update)
        listener({ commit, ...event });
    });
  }

  /**
   * Listen for records deleted in a specific collection.
   * @param collection The name of the collection to listen for.
   * @param listener A callback function that receives the commit event.
   */
  onDelete<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitDeleteEvent<T>) => void,
  ) {
    this.on(collection, ({ commit, ...event }) => {
      if (commit.operation === CommitType.Delete)
        listener({ commit, ...event });
    });
  }

  /**
   * Send a message to update options for the duration of this connection.
   * The same values are mirrored into {@link Jetstream.url} so that a later
   * {@link Jetstream.start} uses them as well.
   * @param payload The options to replace; omitted fields are left untouched in the URL.
   * @throws {Error} If {@link Jetstream.start} has not been called yet.
   */
  updateOptions(
    payload: Pick<
      JetstreamOptions,
      "wantedDids" | "wantedCollections" | "maxMessageSizeBytes"
    >,
  ): void {
    if (!this.ws) throw new Error("Not connected.");

    if (payload.wantedDids) {
      this.url.searchParams.delete("wantedDids");
      payload.wantedDids.forEach((did) => {
        this.url.searchParams.append("wantedDids", did);
      });
    }
    if (payload.wantedCollections) {
      this.url.searchParams.delete("wantedCollections");
      payload.wantedCollections.forEach((collection) => {
        this.url.searchParams.append("wantedCollections", collection);
      });
    }
    if (payload.maxMessageSizeBytes !== undefined) {
      if (payload.maxMessageSizeBytes > 0) {
        this.url.searchParams.set(
          "maxMessageSizeBytes",
          payload.maxMessageSizeBytes.toString(),
        );
      } else {
        // Zero or negative means "no limit": remove any earlier limit so it is
        // not reapplied the next time the URL is used.
        this.url.searchParams.delete("maxMessageSizeBytes");
      }
    }

    this.ws.send(JSON.stringify({ type: "options_update", payload }));
  }

  /** Returns the connection URL, with the `cursor` parameter set when a cursor is known. */
  private createUrl(): string {
    if (this.cursor)
      this.url.searchParams.set("cursor", this.cursor.toString());
    return this.url.toString();
  }

  /** Emitted when the connection is opened. */
  override on(event: "open", listener: () => void): this;
  /** Emitted when the connection is closed. */
  override on(event: "close", listener: () => void): this;
  /** Emitted when any commit is received. */
  override on(
    event: "commit",
    listener: (event: CommitEvent<ResolvedCollections>) => void,
  ): this;
  /** Emitted when an account is updated. */
  override on(event: "account", listener: (event: AccountEvent) => void): this;
  /** Emitted when an identity event is received. */
  override on(
    event: "identity",
    listener: (event: IdentityEvent) => void,
  ): this;
  /**
   * Emitted when a WebSocket error occurs or an incoming message cannot be processed.
   * @param listener A callback function that receives the error and the last known cursor.
   */
  override on(
    event: "error",
    listener: (error: Error, cursor?: number) => void,
  ): this;
  /**
   * Listen for all commits related to a specific collection.
   * @param collection The name of the collection.
   * @param listener A callback function that receives the commit event.
   */
  override on<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitEvent<T>) => void,
  ): this;
  /**
   * @param event The event to listen for.
   * @param listener The callback function, called when the event is emitted.
   */
  override on(
    event: keyof JetstreamEvents<ResolvedCollections>,
    listener: (...args: any[]) => void,
  ) {
    return super.on(event, listener as never);
  }
}
316
/**
 * Resolves a lexicon name to its record type.
 * Names that are keys of {@link Records} resolve to the mapped record type;
 * any other name resolves to a minimal `{ $type: T }` shape.
 */
export type ResolveLexicon<T extends string> =
  T extends keyof Records ? Records[T] : { $type: T };
321
/**
 * Checks if any member of a union is assignable to a given type.
 * Evaluates to `true` when at least one member matches and to `never` otherwise.
 */
type UnionMemberIsAssignableTo<Union, AssignableTo> =
  // A conditional on a bare type parameter distributes over union members,
  // so each member is tested on its own.
  Union extends AssignableTo ? true : never;
331
/**
 * Resolves a wildcard string (one ending in `*`) to the collection names it matches.
 * Strings without a trailing `*` are returned unchanged.
 */
export type ResolveLexiconWildcard<T extends string> =
  // Split off everything before the trailing `*`.
  T extends `${infer Prefix}*`
    ? // `true extends …` is required because `never` extends everything:
      // this asks whether at least one known collection starts with the prefix.
      true extends UnionMemberIsAssignableTo<
        keyof Records,
        `${Prefix}${string}`
      >
      ? // Known collections match: narrow to exactly those names.
        keyof Records & `${Prefix}${string}` extends infer Lexicon extends
          string
        ? Lexicon
        : never
      : // Nothing known matches: keep a type-level wildcard string.
        `${Prefix}${string}`
    : // Not a wildcard.
      T;
350
/**
 * The name of a collection: either a key of {@link Records} or any other string.
 * The `string & {}` member keeps the known keys visible as literal suggestions.
 */
export type Collection = keyof Records | (string & {});
353
/**
 * Generates all possible wildcard strings that match a given collection name,
 * one per dot-separated prefix (e.g. `a.b.c` yields `a.*` and `a.b.*`).
 */
type PossibleCollectionWildcards<CollectionName extends string> =
  CollectionName extends `${infer Prefix}.${infer Suffix}`
    ? // Wildcard at this segment, plus the wildcards of the remainder re-prefixed.
      `${Prefix}.*` | `${Prefix}.${PossibleCollectionWildcards<Suffix>}`
    : // No dot left: nothing further to wildcard.
      never;
359
/**
 * Either the name of a single collection, or a wildcard string matching
 * multiple collections.
 */
export type CollectionOrWildcard =
  | Collection
  | PossibleCollectionWildcards<keyof Records>;
364
/**
 * The kinds of events that {@link Jetstream} emits, keyed by a readable name.
 * @enum
 */
export const EventType = {
  /** A commit to a repository. */
  Commit: "commit",

  /** A change to an account's status. */
  Account: "account",

  /** A change to an account's identity. */
  Identity: "identity",
} as const;

/** Union of the string values of {@link EventType}. */
export type EventType = (typeof EventType)[keyof typeof EventType];
378
/**
 * The operations a received commit can describe, keyed by a readable name.
 * @enum
 */
export const CommitType = {
  /** A record was created. */
  Create: "create",

  /** A record was updated. */
  Update: "update",

  /** A record was deleted. */
  Delete: "delete",
} as const;

/** Union of the string values of {@link CommitType}. */
export type CommitType = (typeof CommitType)[keyof typeof CommitType];
392
/**
 * Fields shared by every event emitted by the {@link Jetstream} class.
 */
export interface EventBase {
  /** Discriminator telling the concrete event types apart. */
  kind: EventType;
  /** DID carried by the event. */
  did: At.DID;
  /** Event timestamp; the client uses it as its cursor. */
  time_us: number;
}
401
/**
 * A commit event. Represents a commit to a user repository.
 * `RecordType` is the collection name the commit belongs to.
 */
export interface CommitEvent<RecordType extends string> extends EventBase {
  kind: typeof EventType.Commit;
  /** The create, update or delete operation carried by this event. */
  commit: Commit<RecordType>;
}
409
/** A {@link CommitEvent} whose commit is narrowed to a record creation. */
export interface CommitCreateEvent<RecordType extends string>
  extends CommitEvent<RecordType> {
  commit: CommitCreate<RecordType>;
}
415
/** A {@link CommitEvent} whose commit is narrowed to a record update. */
export interface CommitUpdateEvent<RecordType extends string>
  extends CommitEvent<RecordType> {
  commit: CommitUpdate<RecordType>;
}
421
/** A {@link CommitEvent} whose commit is narrowed to a record deletion. */
export interface CommitDeleteEvent<RecordType extends string>
  extends CommitEvent<RecordType> {
  commit: CommitDelete<RecordType>;
}
427
/**
 * An account event. Represents a change to an account's status on a host
 * (e.g. PDS or Relay).
 */
export interface AccountEvent extends EventBase {
  kind: typeof EventType.Account;
  /** Account payload, typed by the `com.atproto.sync.subscribeRepos` lexicon. */
  account: ComAtprotoSyncSubscribeRepos.Account;
}
435
/**
 * An identity event. Represents a change to an account's identity.
 */
export interface IdentityEvent extends EventBase {
  kind: typeof EventType.Identity;
  /** Identity payload, typed by the `com.atproto.sync.subscribeRepos` lexicon. */
  identity: ComAtprotoSyncSubscribeRepos.Identity;
}
443
/**
 * Fields shared by every kind of commit.
 */
export interface CommitBase<RecordType extends string> {
  /** Which operation this commit describes. */
  operation: CommitType;
  /** Name of the collection the record belongs to. */
  collection: RecordType;
  /** Record key. */
  rkey: string;
  /** Revision string of the commit. */
  rev: string;
}
453
/**
 * A commit representing a newly created record.
 */
export interface CommitCreate<RecordType extends string>
  extends CommitBase<RecordType> {
  operation: typeof CommitType.Create;
  /** CID carried alongside the record. */
  cid: At.CID;
  /** The created record, typed according to its collection. */
  record: ResolveLexicon<RecordType>;
}
463
/**
 * A commit representing an update to an existing record.
 */
export interface CommitUpdate<RecordType extends string>
  extends CommitBase<RecordType> {
  operation: typeof CommitType.Update;
  /** CID carried alongside the record. */
  cid: At.CID;
  /** The updated record, typed according to its collection. */
  record: ResolveLexicon<RecordType>;
}
473
/**
 * A commit representing the deletion of an existing record.
 * Unlike create and update commits, it declares no `record` or `cid`.
 */
export interface CommitDelete<RecordType extends string>
  extends CommitBase<RecordType> {
  operation: typeof CommitType.Delete;
}
481
/**
 * Any commit: a union discriminated by its `operation` field.
 */
export type Commit<RecordType extends string> =
  | CommitCreate<RecordType>
  | CommitUpdate<RecordType>
  | CommitDelete<RecordType>;