forked from
samuel.fm/statusphere-react
The Statusphere demo, reworked into a Vite/React app in a monorepo.
1import { IdResolver } from '@atproto/identity'
2import { Firehose, MemoryRunner, type Event } from '@atproto/sync'
3import { XyzStatusphereStatus } from '@statusphere/lexicon'
4import pino from 'pino'
5
6import type { Database } from '#/db'
7
/**
 * Builds a `Firehose` that mirrors `xyz.statusphere.status` records into the
 * `status` table of the given database.
 *
 * The sequence stored in the `cursor` table row with `id = 1` is read once up
 * front and handed to the runner as its start cursor. While running, the
 * runner's cursor is written back to that same row, but no more often than
 * once every ten seconds.
 *
 * @param db - Database handle used for the `cursor` and `status` tables.
 * @param idResolver - Identity resolver passed straight through to `Firehose`.
 * @returns A configured `Firehose` instance; this function only constructs it.
 */
export async function createFirehoseIngester(
  db: Database,
  idResolver: IdResolver,
) {
  const STATUS_COLLECTION = 'xyz.statusphere.status'
  // Minimum gap, in milliseconds, between two persisted cursor writes.
  const CURSOR_WRITE_INTERVAL_MS = 10000

  const logger = pino({ name: 'firehose ingestion' })

  // Look up the previously persisted sequence (row id = 1).
  const cursorQuery = db.selectFrom('cursor').where('id', '=', 1).select('seq')
  const cursor = await cursorQuery.executeTakeFirst()

  logger.info(`start cursor: ${cursor?.seq}`)

  // Timestamp (ms since epoch) of the most recent cursor write; 0 = never.
  let lastCursorWrite = 0

  // Writes the sequence to the cursor row unless a write happened recently.
  const persistCursor = async (seq: number) => {
    const now = Date.now()
    if (now - lastCursorWrite < CURSOR_WRITE_INTERVAL_MS) return

    // Mark the write before awaiting so that calls arriving while the
    // update is still in flight are throttled as well.
    lastCursorWrite = now
    await db.updateTable('cursor').set({ seq }).where('id', '=', 1).execute()
  }

  const runner = new MemoryRunner({
    // A falsy stored sequence (missing row, or 0) means "no start cursor".
    startCursor: cursor?.seq || undefined,
    setCursor: async (seq) => {
      await persistCursor(seq)
    },
  })

  return new Firehose({
    idResolver,
    runner,
    handleEvent: async (evt: Event) => {
      // Deletions: drop the matching row from the status table.
      if (evt.event === 'delete') {
        if (evt.collection !== STATUS_COLLECTION) return
        await db
          .deleteFrom('status')
          .where('uri', '=', evt.uri.toString())
          .execute()
        return
      }

      // Everything other than create/update is ignored.
      if (evt.event !== 'create' && evt.event !== 'update') return

      const indexedAt = new Date().toISOString()
      const candidate = evt.record

      // Only records in the status collection that pass the lexicon type
      // guard are considered.
      if (evt.collection !== STATUS_COLLECTION) return
      if (!XyzStatusphereStatus.isRecord(candidate)) return

      const validation = XyzStatusphereStatus.validateRecord(candidate)
      if (!validation.success) return

      const { status, createdAt } = validation.value

      // Insert the status; if the URI already exists, refresh only its
      // status and indexedAt columns.
      await db
        .insertInto('status')
        .values({
          uri: evt.uri.toString(),
          authorDid: evt.did,
          status,
          createdAt,
          indexedAt,
        })
        .onConflict((conflict) =>
          conflict.column('uri').doUpdateSet({ status, indexedAt }),
        )
        .execute()
    },
    onError: (err: Error) => {
      logger.error({ err }, 'error on firehose ingestion')
    },
    filterCollections: [STATUS_COLLECTION],
    excludeIdentity: true,
    excludeAccount: true,
  })
}