the statusphere demo reworked into a vite/react app in a monorepo
at main 93 lines 2.6 kB view raw
1import { IdResolver } from '@atproto/identity' 2import { Firehose, MemoryRunner, type Event } from '@atproto/sync' 3import { XyzStatusphereStatus } from '@statusphere/lexicon' 4import pino from 'pino' 5 6import type { Database } from '#/db' 7 8export async function createFirehoseIngester( 9 db: Database, 10 idResolver: IdResolver, 11) { 12 const logger = pino({ name: 'firehose ingestion' }) 13 14 const cursor = await db 15 .selectFrom('cursor') 16 .where('id', '=', 1) 17 .select('seq') 18 .executeTakeFirst() 19 20 logger.info(`start cursor: ${cursor?.seq}`) 21 22 // For throttling cursor writes 23 let lastCursorWrite = 0 24 25 const runner = new MemoryRunner({ 26 startCursor: cursor?.seq || undefined, 27 setCursor: async (seq) => { 28 const now = Date.now() 29 30 if (now - lastCursorWrite >= 10000) { 31 lastCursorWrite = now 32 await db 33 .updateTable('cursor') 34 .set({ seq }) 35 .where('id', '=', 1) 36 .execute() 37 } 38 }, 39 }) 40 41 return new Firehose({ 42 idResolver, 43 runner, 44 handleEvent: async (evt: Event) => { 45 // Watch for write events 46 if (evt.event === 'create' || evt.event === 'update') { 47 const now = new Date() 48 const record = evt.record 49 50 // If the write is a valid status update 51 if ( 52 evt.collection === 'xyz.statusphere.status' && 53 XyzStatusphereStatus.isRecord(record) 54 ) { 55 const validatedRecord = XyzStatusphereStatus.validateRecord(record) 56 if (!validatedRecord.success) return 57 // Store the status in our SQLite 58 await db 59 .insertInto('status') 60 .values({ 61 uri: evt.uri.toString(), 62 authorDid: evt.did, 63 status: validatedRecord.value.status, 64 createdAt: validatedRecord.value.createdAt, 65 indexedAt: now.toISOString(), 66 }) 67 .onConflict((oc) => 68 oc.column('uri').doUpdateSet({ 69 status: validatedRecord.value.status, 70 indexedAt: now.toISOString(), 71 }), 72 ) 73 .execute() 74 } 75 } else if ( 76 evt.event === 'delete' && 77 evt.collection === 'xyz.statusphere.status' 78 ) { 79 // Remove the status from our SQLite 80 await db 81 .deleteFrom('status') 82 .where('uri', '=', evt.uri.toString()) 83 .execute() 84 } 85 }, 86 onError: (err: Error) => { 87 logger.error({ err }, 'error on firehose ingestion') 88 }, 89 filterCollections: ['xyz.statusphere.status'], 90 excludeIdentity: true, 91 excludeAccount: true, 92 }) 93}