import { type Database, firehoseCursor } from "@atbb/db";
import type { Logger } from "@atbb/logger";
import { eq } from "drizzle-orm";

/**
 * Manages firehose cursor persistence in the database.
 *
 * The cursor tracks the last processed event timestamp (in microseconds)
 * to enable resumption after restart or reconnection.
 *
 * Database failures are logged and swallowed by both `load` and `update`,
 * so neither method rejects because of a failed query.
 */
export class CursorManager {
  constructor(
    private readonly db: Database,
    private readonly logger: Logger
  ) {}

  /**
   * Load the last cursor from database
   *
   * @param service - Service name (default: "jetstream")
   * @returns The last cursor value, or null if none exists or the query failed
   */
  async load(service: string = "jetstream"): Promise<bigint | null> {
    try {
      // At most one row is requested; an empty result means no cursor has
      // been stored for this service yet.
      const [row] = await this.db
        .select()
        .from(firehoseCursor)
        .where(eq(firehoseCursor.service, service))
        .limit(1);

      return row === undefined ? null : row.cursor;
    } catch (error) {
      this.logger.error("Failed to load cursor from database", {
        error: this.describeError(error),
      });
      return null;
    }
  }

  /**
   * Update the cursor in database
   *
   * Inserts a row for the service, or overwrites the existing row's cursor
   * and timestamp when one with the same service name already exists.
   *
   * @param timeUs - Timestamp in microseconds
   * @param service - Service name (default: "jetstream")
   */
  async update(timeUs: number, service: string = "jetstream"): Promise<void> {
    try {
      // Converted inside the try block on purpose: BigInt() throws a
      // RangeError for non-integer numbers, and that must be logged rather
      // than propagated to the caller.
      const cursor = BigInt(timeUs);
      const updatedAt = new Date();

      await this.db
        .insert(firehoseCursor)
        .values({ service, cursor, updatedAt })
        .onConflictDoUpdate({
          target: firehoseCursor.service,
          set: { cursor, updatedAt },
        });
    } catch (error) {
      // Don't throw - we don't want cursor updates to break the stream
      this.logger.error("Failed to update cursor", {
        error: this.describeError(error),
      });
    }
  }

  /**
   * Rewind cursor by specified microseconds for safety margin
   *
   * @param cursor - Current cursor value
   * @param microseconds - Amount to rewind in microseconds; must be an integer
   * @returns Rewound cursor value
   * @throws RangeError if `microseconds` is not an integer (raised by BigInt())
   */
  rewind(cursor: bigint, microseconds: number): bigint {
    return cursor - BigInt(microseconds);
  }

  /**
   * Calculate cursor age in hours.
   * Cursor values are Jetstream timestamps in microseconds since epoch.
   *
   * @param cursor - Cursor value (microseconds), or null
   * @returns Age in hours, or null if cursor is null
   */
  getCursorAgeHours(cursor: bigint | null): number | null {
    if (cursor === null) return null;

    // Integer division drops the sub-millisecond part so the value can be
    // compared against Date.now(), which is in milliseconds.
    const cursorMs = Number(cursor / 1000n);
    const ageMs = Date.now() - cursorMs;
    return ageMs / (1000 * 60 * 60);
  }

  /** Turn a caught value into a loggable message string. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}