WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at root/atb-56-theme-caching-layer 91 lines 2.7 kB view raw
1import { type Database, firehoseCursor } from "@atbb/db"; 2import type { Logger } from "@atbb/logger"; 3import { eq } from "drizzle-orm"; 4 5/** 6 * Manages firehose cursor persistence in the database. 7 * 8 * The cursor tracks the last processed event timestamp (in microseconds) 9 * to enable resumption after restart or reconnection. 10 */ 11export class CursorManager { 12 constructor(private db: Database, private logger: Logger) {} 13 14 /** 15 * Load the last cursor from database 16 * 17 * @param service - Service name (default: "jetstream") 18 * @returns The last cursor value, or null if none exists 19 */ 20 async load(service: string = "jetstream"): Promise<bigint | null> { 21 try { 22 const result = await this.db 23 .select() 24 .from(firehoseCursor) 25 .where(eq(firehoseCursor.service, service)) 26 .limit(1); 27 28 return result.length > 0 ? result[0].cursor : null; 29 } catch (error) { 30 this.logger.error("Failed to load cursor from database", { 31 error: error instanceof Error ? error.message : String(error), 32 }); 33 return null; 34 } 35 } 36 37 /** 38 * Update the cursor in database 39 * 40 * @param timeUs - Timestamp in microseconds 41 * @param service - Service name (default: "jetstream") 42 */ 43 async update(timeUs: number, service: string = "jetstream"): Promise<void> { 44 try { 45 await this.db 46 .insert(firehoseCursor) 47 .values({ 48 service, 49 cursor: BigInt(timeUs), 50 updatedAt: new Date(), 51 }) 52 .onConflictDoUpdate({ 53 target: firehoseCursor.service, 54 set: { 55 cursor: BigInt(timeUs), 56 updatedAt: new Date(), 57 }, 58 }); 59 } catch (error) { 60 // Don't throw - we don't want cursor updates to break the stream 61 this.logger.error("Failed to update cursor", { 62 error: error instanceof Error ? error.message : String(error), 63 }); 64 } 65 } 66 67 /** 68 * Rewind cursor by specified microseconds for safety margin 69 * 70 * @param cursor - Current cursor value 71 * @param microseconds - Amount to rewind in microseconds 72 * @returns Rewound cursor value 73 */ 74 rewind(cursor: bigint, microseconds: number): bigint { 75 return cursor - BigInt(microseconds); 76 } 77 78 /** 79 * Calculate cursor age in hours. 80 * Cursor values are Jetstream timestamps in microseconds since epoch. 81 * 82 * @param cursor - Cursor value (microseconds), or null 83 * @returns Age in hours, or null if cursor is null 84 */ 85 getCursorAgeHours(cursor: bigint | null): number | null { 86 if (cursor === null) return null; 87 const cursorMs = Number(cursor / 1000n); 88 const ageMs = Date.now() - cursorMs; 89 return ageMs / (1000 * 60 * 60); 90 } 91}