forked from
samuel.fm/statusphere-react
the statusphere demo reworked into a vite/react app in a monorepo
1import { XyzStatusphereStatus } from '@statusphere/lexicon'
2import pino from 'pino'
3import WebSocket from 'ws'
4
5import type { Database } from '#/db'
6import { env } from '#/lib/env'
7
/**
 * Builds a Jetstream subscriber that mirrors `xyz.statusphere.status`
 * commits into the `status` table.
 *
 * The resume position is read from the `cursor` table row with id 2 and is
 * written back to that same row at most once every 30 seconds while events
 * flow. The subscriber is only constructed here; this function never calls
 * `start()` on it.
 *
 * @param db - Kysely database handle used for the `cursor` and `status` tables.
 * @returns The configured (not yet started) `Jetstream` instance.
 */
export async function createJetstreamIngester(db: Database) {
  const STATUS_COLLECTION = 'xyz.statusphere.status'
  const CURSOR_ROW_ID = 2
  const CURSOR_WRITE_INTERVAL_MS = 30000

  const logger = pino({ name: 'jetstream ingestion' })

  const savedCursor = await db
    .selectFrom('cursor')
    .where('id', '=', CURSOR_ROW_ID)
    .select('seq')
    .executeTakeFirst()

  logger.info(`start cursor: ${savedCursor?.seq}`)

  // Millisecond timestamp of the latest cursor write; 0 until the first one.
  let lastWriteAt = 0

  return new Jetstream<XyzStatusphereStatus.Record>({
    instanceUrl: env.JETSTREAM_INSTANCE,
    logger,
    // A missing row and a falsy seq (e.g. 0) both mean "no cursor".
    cursor: savedCursor?.seq || undefined,
    wantedCollections: [STATUS_COLLECTION],

    setCursor: async (seq) => {
      const now = Date.now()
      // Throttle: skip the write when the previous one is too recent.
      if (now - lastWriteAt < CURSOR_WRITE_INTERVAL_MS) return

      lastWriteAt = now
      logger.info(`writing cursor: ${seq}`)
      await db
        .updateTable('cursor')
        .set({ seq })
        .where('id', '=', CURSOR_ROW_ID)
        .execute()
    },

    handleEvent: async (evt) => {
      // Only commits to the status collection are of interest; account and
      // identity events (and commits to other collections) are dropped.
      const isStatusCommit =
        evt.kind === 'commit' && evt.commit.collection === STATUS_COLLECTION
      if (!isStatusCommit) return

      const { did, commit } = evt
      const indexedAt = new Date().toISOString()
      const uri = `at://${did}/${commit.collection}/${commit.rkey}`

      if (commit.operation === 'delete') {
        await db.deleteFrom('status').where('uri', '=', uri).execute()
        return
      }

      const isUpsert =
        commit.operation === 'create' || commit.operation === 'update'
      if (!isUpsert || !XyzStatusphereStatus.isRecord(commit.record)) return

      const validated = XyzStatusphereStatus.validateRecord(commit.record)
      if (!validated.success) return

      const { status, createdAt } = validated.value

      // Insert the row, or refresh status/indexedAt when the uri already exists.
      await db
        .insertInto('status')
        .values({ uri, authorDid: did, status, createdAt, indexedAt })
        .onConflict((oc) => oc.column('uri').doUpdateSet({ status, indexedAt }))
        .execute()
    },

    onError: (err) => {
      logger.error({ err }, 'error during jetstream ingestion')
    },
  })
}
86
/**
 * Thin WebSocket client for a Jetstream `/subscribe` endpoint.
 *
 * Every incoming message is JSON-parsed and passed to `handleEvent`. Once the
 * handler resolves, the event's `time_us` becomes the resume cursor: it is kept
 * in memory (so a later `start()` resumes from it) and forwarded to the
 * optional `setCursor` callback. Failures while parsing, handling or persisting
 * the cursor, as well as socket errors, are reported through `onError`.
 *
 * There is no automatic reconnect: after the socket closes the instance stays
 * idle until `start()` is called again.
 *
 * @typeParam T - Record type carried by create/update commit events.
 */
export class Jetstream<T> {
  private readonly instanceUrl: string
  private readonly logger: pino.Logger
  private readonly handleEvent: (evt: JetstreamEvent<T>) => Promise<void>
  private readonly onError: (err: unknown) => void
  private readonly setCursor?: (seq: number) => Promise<void>
  private readonly wantedCollections: string[]
  private cursor?: number
  private ws?: WebSocket
  private isStarted = false
  private isDestroyed = false

  /**
   * @param options.instanceUrl - Base URL of the Jetstream instance.
   * @param options.logger - Logger for connection lifecycle messages.
   * @param options.cursor - Optional `time_us` value to resume from.
   * @param options.setCursor - Optional callback invoked with the `time_us` of
   *   each successfully handled event.
   * @param options.handleEvent - Callback invoked with each parsed event.
   * @param options.onError - Callback invoked with any parse, handler or
   *   socket error.
   * @param options.wantedCollections - Collection NSIDs to subscribe to.
   */
  constructor({
    instanceUrl,
    logger,
    cursor,
    setCursor,
    handleEvent,
    onError,
    wantedCollections,
  }: {
    instanceUrl: string
    logger: pino.Logger
    cursor?: number
    setCursor?: (seq: number) => Promise<void>
    handleEvent: (evt: any) => Promise<void>
    onError: (err: any) => void
    wantedCollections: string[]
  }) {
    this.instanceUrl = instanceUrl
    this.logger = logger
    this.cursor = cursor
    this.setCursor = setCursor
    this.handleEvent = handleEvent
    this.onError = onError
    this.wantedCollections = wantedCollections
  }

  /**
   * Builds the subscription URL from the instance URL, the wanted
   * collections and the current cursor (when one is set).
   *
   * @returns The full `/subscribe` URL including its query string.
   */
  constructUrlWithQuery = (): string => {
    const params = new URLSearchParams()
    // Jetstream takes one `wantedCollections` parameter per collection; a
    // comma-joined value would be read as a single collection name.
    for (const collection of this.wantedCollections) {
      params.append('wantedCollections', collection)
    }
    if (this.cursor !== undefined) {
      params.append('cursor', this.cursor.toString())
    }

    // Drop trailing slashes so the path never becomes `//subscribe`.
    const base = this.instanceUrl.replace(/\/+$/, '')
    const query = params.toString()
    return query ? `${base}/subscribe?${query}` : `${base}/subscribe`
  }

  /**
   * Opens the WebSocket connection and wires up the event handlers.
   * Does nothing when the client is already started.
   */
  start() {
    if (this.isStarted) return

    // Create the socket before flipping any flags: if the constructor throws,
    // the instance is left untouched and `start()` can be retried.
    const ws = new WebSocket(this.constructUrlWithQuery())
    this.ws = ws
    this.isStarted = true
    this.isDestroyed = false

    ws.on('open', () => {
      this.logger.info('Jetstream connection opened.')
    })

    ws.on('message', async (data) => {
      try {
        const event: JetstreamEvent<T> = JSON.parse(data.toString())

        // Handle first, then advance the cursor, so the cursor never moves
        // past an event whose handler failed.
        await this.handleEvent(event)

        if (typeof event.time_us === 'number') {
          // Keep the in-memory cursor current (and monotonic) so a later
          // start() resumes from here instead of the constructor-time value.
          if (this.cursor === undefined || event.time_us > this.cursor) {
            this.cursor = event.time_us
          }
          if (this.setCursor) {
            await this.setCursor(event.time_us)
          }
        }
      } catch (err) {
        this.onError(err)
      }
    })

    ws.on('error', (err) => {
      this.onError(err)
    })

    ws.on('close', (code, reason) => {
      // A socket replaced by a newer start() must not touch the state of the
      // connection that superseded it.
      if (this.ws !== ws) return

      if (!this.isDestroyed) {
        this.logger.error(`Jetstream closed. Code: ${code}, Reason: ${reason}`)
      }
      this.isStarted = false
    })
  }

  /**
   * Closes the current socket, if any, and marks the close as intentional so
   * the close handler does not log it as an error.
   */
  destroy() {
    if (this.ws) {
      this.isDestroyed = true
      this.ws.close()
      this.isStarted = false
      this.logger.info('jetstream destroyed gracefully')
    }
  }
}
180
/** Fields shared by both shapes of a commit payload. */
type CommitMeta = {
  rev: string
  collection: string
  rkey: string
}

/**
 * A repository commit. Create/update commits carry the record and its cid;
 * delete commits carry neither.
 */
type CommitEvent<T> = {
  kind: 'commit'
  commit:
    | (CommitMeta & {
        operation: 'create' | 'update'
        record: T
        cid: string
      })
    | (CommitMeta & {
        operation: 'delete'
      })
}

/** Payload of an `identity` event. */
interface IdentityEvent {
  kind: 'identity'
  identity: {
    did: string
    handle: string
    seq: number
    time: string
  }
}

/** Payload of an `account` event. */
interface AccountEvent {
  kind: 'account'
  account: {
    active: boolean
    did: string
    seq: number
    time: string
  }
}

/**
 * Any event received from Jetstream, discriminated by `kind`. Every event
 * also carries the originating `did` and `time_us`, the event timestamp
 * (Unix microseconds in the Jetstream wire format).
 */
type JetstreamEvent<T> = (CommitEvent<T> | AccountEvent | IdentityEvent) & {
  did: string
  time_us: number
}
223}