WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
import { type Database, firehoseCursor } from "@atbb/db";
import type { Logger } from "@atbb/logger";
import { eq } from "drizzle-orm";

/**
 * Manages firehose cursor persistence in the database.
 *
 * The cursor tracks the last processed event timestamp (in microseconds)
 * to enable resumption after restart or reconnection.
 *
 * Database failures in {@link CursorManager.load} and
 * {@link CursorManager.update} are logged and swallowed rather than thrown,
 * so a persistence problem never interrupts event processing.
 */
export class CursorManager {
  constructor(
    private readonly db: Database,
    private readonly logger: Logger
  ) {}

  /**
   * Load the last cursor from database.
   *
   * @param service - Service name (default: "jetstream")
   * @returns The last cursor value, or null if none exists or the query failed
   */
  async load(service: string = "jetstream"): Promise<bigint | null> {
    try {
      // At most one row is requested, so only the first element matters.
      const [row] = await this.db
        .select()
        .from(firehoseCursor)
        .where(eq(firehoseCursor.service, service))
        .limit(1);

      // `row` is undefined when no cursor has been stored for this service.
      return row?.cursor ?? null;
    } catch (error) {
      this.logger.error("Failed to load cursor from database", {
        error: error instanceof Error ? error.message : String(error),
      });
      return null;
    }
  }

  /**
   * Update the cursor in database, inserting the row for `service` if it
   * does not exist yet and overwriting it otherwise.
   *
   * Never throws: any failure (including an invalid `timeUs`) is logged.
   *
   * @param timeUs - Timestamp in microseconds; a `number` must be an integer
   * @param service - Service name (default: "jetstream")
   */
  async update(
    timeUs: number | bigint,
    service: string = "jetstream"
  ): Promise<void> {
    try {
      // Convert and timestamp once so the insert and the conflict-update
      // payloads are guaranteed to carry identical values.
      const cursor = BigInt(timeUs);
      const updatedAt = new Date();

      await this.db
        .insert(firehoseCursor)
        .values({ service, cursor, updatedAt })
        .onConflictDoUpdate({
          target: firehoseCursor.service,
          set: { cursor, updatedAt },
        });
    } catch (error) {
      // Don't throw - we don't want cursor updates to break the stream
      this.logger.error("Failed to update cursor", {
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  /**
   * Rewind cursor by specified microseconds for safety margin.
   *
   * The result is not clamped: rewinding by more than the cursor value
   * yields a negative cursor.
   *
   * @param cursor - Current cursor value
   * @param microseconds - Amount to rewind in microseconds
   * @returns Rewound cursor value
   * @throws RangeError if `microseconds` is a non-integer number
   */
  rewind(cursor: bigint, microseconds: number | bigint): bigint {
    return cursor - BigInt(microseconds);
  }

  /**
   * Calculate cursor age in hours.
   * Cursor values are Jetstream timestamps in microseconds since epoch.
   *
   * @param cursor - Cursor value (microseconds), or null
   * @returns Age in hours relative to `Date.now()`, or null if cursor is null
   */
  getCursorAgeHours(cursor: bigint | null): number | null {
    if (cursor === null) return null;

    const msPerHour = 1000 * 60 * 60;
    // Integer-divide in bigint space first (microseconds -> milliseconds),
    // then convert to number for the Date arithmetic.
    const cursorMs = Number(cursor / 1000n);
    return (Date.now() - cursorMs) / msPerHour;
  }
}