the statusphere demo reworked into a vite/react app in a monorepo
at main 223 lines 5.5 kB view raw
import { XyzStatusphereStatus } from '@statusphere/lexicon'
import pino from 'pino'
import WebSocket from 'ws'

import type { Database } from '#/db'
import { env } from '#/lib/env'

/**
 * Builds a {@link Jetstream} client that mirrors `xyz.statusphere.status`
 * records into the `status` table and persists its position in the `cursor`
 * table.
 *
 * The returned client is not started; the caller must invoke `start()`.
 *
 * @param db - Database handle used for the cursor row and the status rows.
 * @returns A configured, not-yet-started Jetstream client.
 */
export async function createJetstreamIngester(db: Database) {
  const logger = pino({ name: 'jetstream ingestion' })

  // NOTE(review): the cursor lives in the row with id = 2; presumably other ids
  // belong to other consumers — confirm against the migrations.
  const cursor = await db
    .selectFrom('cursor')
    .where('id', '=', 2)
    .select('seq')
    .executeTakeFirst()

  logger.info(`start cursor: ${cursor?.seq}`)

  // For throttling cursor writes: minimum gap between two cursor UPDATEs.
  const cursorWriteIntervalMs = 30000
  let lastCursorWrite = 0

  return new Jetstream<XyzStatusphereStatus.Record>({
    instanceUrl: env.JETSTREAM_INSTANCE,
    logger,
    // `||` (not `??`) on purpose: a missing row and a falsy seq (0) both mean
    // "connect without a cursor".
    cursor: cursor?.seq || undefined,
    setCursor: async (seq) => {
      const now = Date.now()

      if (now - lastCursorWrite >= cursorWriteIntervalMs) {
        lastCursorWrite = now
        logger.info(`writing cursor: ${seq}`)
        // This is an UPDATE, so it only has an effect if the row already exists.
        await db
          .updateTable('cursor')
          .set({ seq })
          .where('id', '=', 2)
          .execute()
      }
    },
    handleEvent: async (evt) => {
      // ignore account and identity events
      if (
        evt.kind !== 'commit' ||
        evt.commit.collection !== 'xyz.statusphere.status'
      )
        return

      const now = new Date()
      const uri = `at://${evt.did}/${evt.commit.collection}/${evt.commit.rkey}`

      if (
        (evt.commit.operation === 'create' ||
          evt.commit.operation === 'update') &&
        XyzStatusphereStatus.isRecord(evt.commit.record)
      ) {
        const validatedRecord = XyzStatusphereStatus.validateRecord(
          evt.commit.record,
        )
        // Records that fail lexicon validation are dropped silently.
        if (!validatedRecord.success) return

        // Upsert keyed on the record URI; on conflict only the status and the
        // indexing timestamp are refreshed.
        await db
          .insertInto('status')
          .values({
            uri,
            authorDid: evt.did,
            status: validatedRecord.value.status,
            createdAt: validatedRecord.value.createdAt,
            indexedAt: now.toISOString(),
          })
          .onConflict((oc) =>
            oc.column('uri').doUpdateSet({
              status: validatedRecord.value.status,
              indexedAt: now.toISOString(),
            }),
          )
          .execute()
      } else if (evt.commit.operation === 'delete') {
        await db.deleteFrom('status').where('uri', '=', uri).execute()
      }
    },
    onError: (err) => {
      logger.error({ err }, 'error during jetstream ingestion')
    },
    wantedCollections: ['xyz.statusphere.status'],
  })
}

/**
 * Minimal Jetstream websocket consumer.
 *
 * Connects to `<instanceUrl>/subscribe`, parses every message as JSON and
 * hands it to `handleEvent`. Errors thrown while parsing or handling a message,
 * as well as socket errors, are routed to `onError`. The client does not
 * reconnect by itself; after the socket closes, `start()` may be called again.
 *
 * @typeParam T - Shape of the record carried by create/update commit events.
 */
export class Jetstream<T> {
  private instanceUrl: string
  private logger: pino.Logger
  private handleEvent: (evt: JetstreamEvent<T>) => Promise<void>
  private onError: (err: unknown) => void
  private setCursor?: (seq: number) => Promise<void>
  private cursor?: number
  private ws?: WebSocket
  private isStarted = false
  private isDestroyed = false
  private wantedCollections: string[]

  /**
   * @param options.instanceUrl - Base URL of the Jetstream instance (without `/subscribe`).
   * @param options.logger - Logger used for connection lifecycle messages.
   * @param options.cursor - Optional `time_us` value to resume from.
   * @param options.setCursor - Optional callback invoked with each event's `time_us`.
   * @param options.handleEvent - Callback invoked with every parsed event.
   * @param options.onError - Callback invoked with socket, parse and handler errors.
   * @param options.wantedCollections - Collection NSIDs to subscribe to.
   */
  constructor({
    instanceUrl,
    logger,
    cursor,
    setCursor,
    handleEvent,
    onError,
    wantedCollections,
  }: {
    instanceUrl: string
    logger: pino.Logger
    cursor?: number
    setCursor?: (seq: number) => Promise<void>
    handleEvent: (evt: any) => Promise<void>
    onError: (err: any) => void
    wantedCollections: string[]
  }) {
    this.instanceUrl = instanceUrl
    this.logger = logger
    this.cursor = cursor
    this.setCursor = setCursor
    this.handleEvent = handleEvent
    this.onError = onError
    this.wantedCollections = wantedCollections
  }

  /**
   * Builds the subscription URL from the configured collections and the
   * current cursor.
   *
   * @returns `<instanceUrl>/subscribe?<query>`
   */
  constructUrlWithQuery = (): string => {
    const params = new URLSearchParams()
    // Jetstream expects `wantedCollections` repeated once per collection; a
    // comma-joined value would be read as a single (invalid) NSID.
    for (const collection of this.wantedCollections) {
      params.append('wantedCollections', collection)
    }
    if (this.cursor !== undefined) {
      params.append('cursor', this.cursor.toString())
    }
    return `${this.instanceUrl}/subscribe?${params.toString()}`
  }

  /**
   * Opens the websocket and wires up the event handlers. Calling `start()`
   * while already started is a no-op.
   */
  start() {
    if (this.isStarted) return
    this.isStarted = true
    this.isDestroyed = false
    this.ws = new WebSocket(this.constructUrlWithQuery())

    this.ws.on('open', () => {
      this.logger.info('Jetstream connection opened.')
    })

    this.ws.on('message', async (data) => {
      try {
        const event: JetstreamEvent<T> = JSON.parse(data.toString())

        if (event.time_us !== undefined) {
          // Remember the position in memory so that a later start() resumes
          // from the last event seen instead of the construction-time cursor.
          this.cursor = event.time_us

          // Update cursor if provided
          if (this.setCursor) {
            await this.setCursor(event.time_us)
          }
        }

        await this.handleEvent(event)
      } catch (err) {
        this.onError(err)
      }
    })

    this.ws.on('error', (err) => {
      this.onError(err)
    })

    this.ws.on('close', (code, reason) => {
      // A close triggered by destroy() is expected and not logged as an error.
      if (!this.isDestroyed) {
        this.logger.error(`Jetstream closed. Code: ${code}, Reason: ${reason}`)
      }
      this.isStarted = false
    })
  }

  /**
   * Closes the websocket, if one was ever opened, and marks the client as
   * stopped so the resulting close event is not reported as an error.
   */
  destroy() {
    if (this.ws) {
      this.isDestroyed = true
      this.ws.close()
      this.isStarted = false
      this.logger.info('jetstream destroyed gracefully')
    }
  }
}

/** Fields shared by every Jetstream event, combined with one event variant. */
type JetstreamEvent<T> = {
  did: string
  time_us: number
} & (CommitEvent<T> | AccountEvent | IdentityEvent)

/** Repository commit: a record was created, updated or deleted. */
type CommitEvent<T> = {
  kind: 'commit'
  commit:
    | {
        operation: 'create' | 'update'
        record: T
        rev: string
        collection: string
        rkey: string
        cid: string
      }
    | {
        operation: 'delete'
        rev: string
        collection: string
        rkey: string
      }
}

/** Identity event variant (`kind: 'identity'`). */
type IdentityEvent = {
  kind: 'identity'
  identity: {
    did: string
    handle: string
    seq: number
    time: string
  }
}

/** Account event variant (`kind: 'account'`). */
type AccountEvent = {
  kind: 'account'
  account: {
    active: boolean
    did: string
    seq: number
    time: string
  }
}