Fork of atp.tools as a universal profile for people on the ATmosphere
at main 488 lines 14 kB view raw
// Stolen from https://github.com/skyware-js/jetstream/
// MPL

import type {
  At,
  ComAtprotoSyncSubscribeRepos,
  Records as _Records,
} from "@atcute/client/lexicons";
import "@atcute/bluesky/lexicons";
import { EventEmitter } from "eventemitter3";

/** Record mappings. */
export interface Records extends _Records {}

/**
 * Options for the {@link Jetstream} class.
 */
export interface JetstreamOptions<
  WantedCollections extends Collection = Collection,
> {
  /**
   * The full subscription endpoint to connect to.
   * @default "wss://jetstream1.us-east.bsky.network/subscribe"
   */
  endpoint?: string;
  /**
   * The record collections that you want to receive updates for.
   * Leave this empty to receive updates for all record collections.
   */
  wantedCollections?: Array<WantedCollections>;
  /**
   * The DIDs that you want to receive updates for.
   * Leave this empty to receive updates for all DIDs.
   */
  wantedDids?: Array<string>;
  /**
   * The maximum size of a payload that this client would like to receive.
   * Zero means no limit, negative values are treated as zero.
   * @default 0
   */
  maxMessageSizeBytes?: number;
  /**
   * The Unix timestamp in microseconds that you want to receive updates from.
   */
  cursor?: number;
  /**
   * The WebSocket implementation to use (e.g. `import ws from "ws"`).
   * Not required if you are on Node 21.0.0 or newer, or another environment that provides a WebSocket implementation.
   */
  ws?: unknown;
}

/**
 * The events that are emitted by the {@link Jetstream} class.
 * @see {@link Jetstream#on}
 */
export type JetstreamEvents<WantedCollections extends Collection = Collection> =
  {
    open: [];
    close: [];
    commit: [event: CommitEvent<WantedCollections>];
    account: [event: AccountEvent];
    identity: [event: IdentityEvent];
    error: [error: Error, cursor?: number];
  };

/**
 * The Jetstream client.
 *
 * Wraps a single WebSocket subscription and re-emits the parsed messages as
 * typed events: `commit`, `account`, `identity`, plus one event per collection
 * name for commits.
 */
export class Jetstream<
  WantedCollections extends CollectionOrWildcard = CollectionOrWildcard,
  ResolvedCollections extends
    Collection = ResolveLexiconWildcard<WantedCollections>,
> extends EventEmitter<JetstreamEvents<ResolvedCollections>> {
  /** WebSocket connection to the server. */
  public ws?: WebSocket;

  /** The full connection URL. */
  public url: URL;

  /** The current cursor. */
  public cursor?: number;

  /** The WebSocket implementation to use. */
  private wsImpl?: unknown;

  /**
   * Builds the subscription URL from the given options. No connection is
   * opened until {@link Jetstream#start} is called.
   * @param options Connection options; all fields are optional.
   * @throws {Error} If the environment has no global `WebSocket` and no `ws` option was provided.
   */
  constructor(options?: JetstreamOptions<WantedCollections>) {
    super();
    options ??= {};
    if (options.ws) this.wsImpl = options.ws;

    if (typeof globalThis.WebSocket === "undefined" && !this.wsImpl) {
      throw new Error(
        `No WebSocket implementation was found in your environment. You must provide an implementation as the \`ws\` option.

For example, in a Node.js environment, \`npm install ws\` and then:
import { Jetstream } from "@skyware/jetstream";
import WebSocket from "ws";

const jetstream = new Jetstream({
	ws: WebSocket,
});`,
      );
    }

    this.url = new URL(
      options.endpoint ?? "wss://jetstream1.us-east.bsky.network/subscribe",
    );
    options.wantedCollections?.forEach((collection) => {
      this.url.searchParams.append("wantedCollections", collection);
    });
    options.wantedDids?.forEach((did) => {
      this.url.searchParams.append("wantedDids", did);
    });
    // Zero means "no limit", which is also the default, so it is not sent.
    if (options.maxMessageSizeBytes) {
      this.url.searchParams.append(
        "maxMessageSizeBytes",
        `${options.maxMessageSizeBytes}`,
      );
    }
    if (options.cursor) this.cursor = options.cursor;
  }

  /**
   * Opens a WebSocket connection to the server.
   *
   * Uses the implementation passed as the `ws` option when there is one, and
   * the global `WebSocket` otherwise.
   */
  start() {
    // The constructor accepts a custom implementation precisely for
    // environments without a global WebSocket, so it must take precedence here.
    const WebSocketImpl = (this.wsImpl ??
      globalThis.WebSocket) as typeof WebSocket;
    this.ws = new WebSocketImpl(this.createUrl());

    this.ws.onopen = () => this.emit("open");
    this.ws.onclose = () => this.emit("close");
    this.ws.onerror = (event) => {
      // A plain Event stringifies to "[object Event]"; use the message when
      // the event carries one and fall back to the stringified event otherwise.
      const message = "message" in event ? event.message : undefined;
      const detail =
        typeof message === "string" && message !== "" ? message : String(event);
      this.emit("error", new Error("WebSocket error: " + detail), this.cursor);
    };

    this.ws.onmessage = (data) => {
      try {
        const event = JSON.parse(data.data) as
          | CommitEvent<ResolvedCollections>
          | AccountEvent
          | IdentityEvent;
        // Only ever move the cursor forward.
        if (event.time_us > (this.cursor ?? 0)) this.cursor = event.time_us;
        switch (event.kind) {
          case EventType.Commit:
            // Drop commits that are missing required fields.
            if (
              !event.commit?.collection ||
              !event.commit.rkey ||
              !event.commit.rev
            ) {
              return;
            }
            if (
              event.commit.operation === CommitType.Create &&
              !event.commit.record
            ) {
              return;
            }

            this.emit("commit", event);
            // @ts-expect-error – We know we can use collection name as an event.
            this.emit(event.commit.collection, event);
            break;
          case EventType.Account:
            if (!event.account?.did) return;
            this.emit("account", event);
            break;
          case EventType.Identity:
            if (!event.identity?.did) return;
            this.emit("identity", event);
            break;
        }
      } catch (e) {
        // Parse and listener failures are reported through the "error" event
        // together with the last known cursor.
        this.emit(
          "error",
          e instanceof Error ? e : new Error(String(e)),
          this.cursor,
        );
      }
    };
  }

  /**
   * Closes the WebSocket connection.
   */
  close() {
    this.ws?.close();
  }

  /**
   * Listen for records created in a specific collection.
   * @param collection The name of the collection to listen for.
   * @param listener A callback function that receives the commit event.
   */
  onCreate<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitCreateEvent<T>) => void,
  ) {
    this.on(collection, ({ commit, ...event }) => {
      if (commit.operation === CommitType.Create)
        listener({ commit, ...event });
    });
  }

  /**
   * Listen for records updated in a specific collection.
   * @param collection The name of the collection to listen for.
   * @param listener A callback function that receives the commit event.
   */
  onUpdate<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitUpdateEvent<T>) => void,
  ) {
    this.on(collection, ({ commit, ...event }) => {
      if (commit.operation === CommitType.Update)
        listener({ commit, ...event });
    });
  }

  /**
   * Listen for records deleted in a specific collection.
   * @param collection The name of the collection to listen for.
   * @param listener A callback function that receives the commit event.
   */
  onDelete<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitDeleteEvent<T>) => void,
  ) {
    this.on(collection, ({ commit, ...event }) => {
      if (commit.operation === CommitType.Delete)
        listener({ commit, ...event });
    });
  }

  /**
   * Send a message to update options for the duration of this connection.
   *
   * The same values are also written to {@link Jetstream#url}, so a later
   * {@link Jetstream#start} connects with the updated options.
   * @param payload The options to change; fields left undefined are not touched.
   * @throws {Error} If {@link Jetstream#start} has not been called yet.
   */
  updateOptions(
    payload: Pick<
      JetstreamOptions,
      "wantedDids" | "wantedCollections" | "maxMessageSizeBytes"
    >,
  ) {
    if (!this.ws) throw new Error("Not connected.");

    if (payload.wantedDids) {
      this.url.searchParams.delete("wantedDids");
      payload.wantedDids.forEach((did) => {
        this.url.searchParams.append("wantedDids", did);
      });
    }
    if (payload.wantedCollections) {
      this.url.searchParams.delete("wantedCollections");
      payload.wantedCollections.forEach((collection) => {
        this.url.searchParams.append("wantedCollections", collection);
      });
    }
    // Compare against undefined rather than truthiness: 0 ("no limit") is a
    // meaningful value and has to replace a previously configured limit.
    if (payload.maxMessageSizeBytes !== undefined) {
      this.url.searchParams.set(
        "maxMessageSizeBytes",
        payload.maxMessageSizeBytes.toString(),
      );
    }

    this.ws.send(JSON.stringify({ type: "options_update", payload }));
  }

  /**
   * Returns the connection URL as a string, first writing the current cursor
   * (when one is set) into its `cursor` query parameter.
   */
  private createUrl() {
    if (this.cursor)
      this.url.searchParams.set("cursor", this.cursor.toString());
    return this.url.toString();
  }

  /** Emitted when the connection is opened. */
  override on(event: "open", listener: () => void): this;
  /** Emitted when the connection is closed. */
  override on(event: "close", listener: () => void): this;
  /** Emitted when any commit is received. */
  override on(
    event: "commit",
    listener: (event: CommitEvent<ResolvedCollections>) => void,
  ): this;
  /** Emitted when an account is updated. */
  override on(event: "account", listener: (event: AccountEvent) => void): this;
  /** Emitted when an identity event is received. */
  override on(
    event: "identity",
    listener: (event: IdentityEvent) => void,
  ): this;
  /**
   * Emitted when a network error occurs.
   * @param listener A callback function that receives the error and the last known cursor.
   */
  override on(
    event: "error",
    listener: (error: Error, cursor?: number) => void,
  ): this;
  /**
   * Listen for all commits related to a specific collection.
   * @param collection The name of the collection.
   * @param listener A callback function that receives the commit event.
   */
  override on<T extends ResolvedCollections>(
    collection: T,
    listener: (event: CommitEvent<T>) => void,
  ): this;
  /**
   * @param event The event to listen for.
   * @param listener The callback function, called when the event is emitted.
   */
  override on(
    event: keyof JetstreamEvents<ResolvedCollections>,
    listener: (...args: any[]) => void,
  ) {
    return super.on(event, listener as never);
  }
}

/** Resolves a lexicon name to its record type. */
export type ResolveLexicon<T extends string> = T extends keyof Records
  ? Records[T]
  : { $type: T };

/** Checks if any member of a union is assignable to a given type. */
type UnionMemberIsAssignableTo<Union, AssignableTo> =
  // Distribute over union members
  Union extends Union
    ? // `Union` here refers to a given union member
      Union extends AssignableTo
      ? true
      : never
    : never;

/** Resolves a wildcard string to the record types it matches. */
export type ResolveLexiconWildcard<T extends string> =
  // Match the prefix
  T extends `${infer Prefix}*`
    ? // Check that at least one collection name matches the prefix (we use `true extends` because `never` extends everything)
      true extends UnionMemberIsAssignableTo<
        keyof Records,
        `${Prefix}${string}`
      >
      ? // If so, return known matching collection names
        keyof Records & `${Prefix}${string}` extends infer Lexicon extends
          string
        ? Lexicon
        : never
      : // If no collection name matches the prefix, return as a type-level wildcard string
        `${Prefix}${string}`
    : // If there's no wildcard, return the original string
      T;

/** The name of a collection. */
export type Collection = keyof Records | (string & {});

/** Generates all possible wildcard strings that match a given collection name. */
type PossibleCollectionWildcards<CollectionName extends string> =
  CollectionName extends `${infer Prefix}.${infer Suffix}`
    ? `${Prefix}.*` | `${Prefix}.${PossibleCollectionWildcards<Suffix>}`
    : never;

/** The name of a collection or a wildcard string matching multiple collections. */
export type CollectionOrWildcard =
  | PossibleCollectionWildcards<keyof Records>
  | Collection;

/**
 * The types of events that are emitted by {@link Jetstream}.
 * @enum
 */
export const EventType = {
  /** A new commit. */
  Commit: "commit",
  /** An account's status was updated. */
  Account: "account",
  /** An account's identity was updated. */
  Identity: "identity",
} as const;
export type EventType = (typeof EventType)[keyof typeof EventType];

/**
 * The types of commits that can be received.
 * @enum
 */
export const CommitType = {
  /** A record was created. */
  Create: "create",
  /** A record was updated. */
  Update: "update",
  /** A record was deleted. */
  Delete: "delete",
} as const;
export type CommitType = (typeof CommitType)[keyof typeof CommitType];

/**
 * The base type for events emitted by the {@link Jetstream} class.
 */
export interface EventBase {
  did: At.DID;
  time_us: number;
  kind: EventType;
}

/**
 * A commit event. Represents a commit to a user repository.
 */
export interface CommitEvent<RecordType extends string> extends EventBase {
  kind: typeof EventType.Commit;
  commit: Commit<RecordType>;
}

/** A commit event where a record was created. */
export interface CommitCreateEvent<RecordType extends string>
  extends CommitEvent<RecordType> {
  commit: CommitCreate<RecordType>;
}

/** A commit event where a record was updated. */
export interface CommitUpdateEvent<RecordType extends string>
  extends CommitEvent<RecordType> {
  commit: CommitUpdate<RecordType>;
}

/** A commit event where a record was deleted. */
export interface CommitDeleteEvent<RecordType extends string>
  extends CommitEvent<RecordType> {
  commit: CommitDelete<RecordType>;
}

/**
 * An account event. Represents a change to an account's status on a host (e.g. PDS or Relay).
 */
export interface AccountEvent extends EventBase {
  kind: typeof EventType.Account;
  account: ComAtprotoSyncSubscribeRepos.Account;
}

/**
 * An identity event. Represents a change to an account's identity.
 */
export interface IdentityEvent extends EventBase {
  kind: typeof EventType.Identity;
  identity: ComAtprotoSyncSubscribeRepos.Identity;
}

/**
 * The base type for commit events.
 */
export interface CommitBase<RecordType extends string> {
  operation: CommitType;
  rev: string;
  collection: RecordType;
  rkey: string;
}

/**
 * A commit event representing a new record.
 */
export interface CommitCreate<RecordType extends string>
  extends CommitBase<RecordType> {
  operation: typeof CommitType.Create;
  record: ResolveLexicon<RecordType>;
  cid: At.CID;
}

/**
 * A commit event representing an update to an existing record.
 */
export interface CommitUpdate<RecordType extends string>
  extends CommitBase<RecordType> {
  operation: typeof CommitType.Update;
  record: ResolveLexicon<RecordType>;
  cid: At.CID;
}

/**
 * A commit event representing a deletion of an existing record.
 */
export interface CommitDelete<RecordType extends string>
  extends CommitBase<RecordType> {
  operation: typeof CommitType.Delete;
}

/**
 * A commit event.
 */
export type Commit<RecordType extends string> =
  | CommitCreate<RecordType>
  | CommitUpdate<RecordType>
  | CommitDelete<RecordType>;