WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import { Jetstream } from "@skyware/jetstream";
2import { type Database } from "@atbb/db";
3import type { Logger } from "@atbb/logger";
4import { Indexer } from "./indexer.js";
5import { CursorManager } from "./cursor-manager.js";
6import { CircuitBreaker } from "./circuit-breaker.js";
7import { ReconnectionManager } from "./reconnection-manager.js";
8import { EventHandlerRegistry } from "./event-handler-registry.js";
9import type { BackfillManager } from "./backfill-manager.js";
10import { BackfillStatus } from "./backfill-manager.js";
11
/**
 * Firehose service that subscribes to AT Proto Jetstream
 * and indexes space.atbb.* records into the database.
 *
 * Responsibilities:
 * - WebSocket connection management via Jetstream
 * - Event routing to indexer handlers
 * - Health status monitoring
 *
 * Delegates to:
 * - CursorManager: cursor persistence
 * - CircuitBreaker: failure tracking and circuit breaking
 * - ReconnectionManager: reconnection with exponential backoff
 * - EventHandlerRegistry: per-collection create/update/delete handler wiring
 * - BackfillManager (optional, injected via setBackfillManager): startup backfill
 */
export class FirehoseService {
  /** How far the saved cursor is rewound on resume: 10 seconds, in microseconds. */
  private static readonly CURSOR_REWIND_US = 10_000_000;

  private jetstream: Jetstream;
  private readonly indexer: Indexer;
  private running = false;
  private lastEventTime: Date | null = null;
  private readonly cursorManager: CursorManager;
  private readonly circuitBreaker: CircuitBreaker;
  private readonly reconnectionManager: ReconnectionManager;

  // Event handler registry
  private readonly handlerRegistry: EventHandlerRegistry;

  private backfillManager: BackfillManager | null = null;

  // Guard: only run startup backfill on the initial start, not on reconnects.
  private isInitialStart = true;

  // True once start() has been invoked on the current Jetstream instance, i.e.
  // there may be a live connection that must be closed before starting again.
  private connectionOpened = false;

  // Collections we're interested in (full lexicon IDs)
  private readonly wantedCollections: string[];

  /**
   * @param db - Database handle passed to the Indexer and the CursorManager.
   * @param jetstreamUrl - Jetstream endpoint to subscribe to.
   * @param logger - Logger shared with every helper created here.
   */
  constructor(
    private readonly db: Database,
    private readonly jetstreamUrl: string,
    private readonly logger: Logger
  ) {
    // Initialize the indexer instance with the database
    this.indexer = new Indexer(db, logger);

    // Initialize helper classes.
    // NOTE(review): the numeric arguments look like a failure threshold (100)
    // and max attempts / base delay in ms (10, 5000) — confirm against the
    // CircuitBreaker and ReconnectionManager definitions.
    this.cursorManager = new CursorManager(db, logger);
    this.circuitBreaker = new CircuitBreaker(100, () => this.stop(), logger);
    this.reconnectionManager = new ReconnectionManager(10, 5000, logger);

    // Build handler registry; the subscribed collections are derived from it
    this.handlerRegistry = this.createHandlerRegistry();
    this.wantedCollections = this.handlerRegistry.getCollections();

    // Initialize with a placeholder - will be recreated with cursor in start()
    this.jetstream = this.createJetstream();
    this.setupEventHandlers();
  }

  /**
   * Render a caught value as a log-friendly string: the message for Error
   * instances, String(value) for anything else.
   */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Create a new Jetstream instance with optional cursor
   */
  private createJetstream(cursor?: number): Jetstream {
    return new Jetstream({
      wantedCollections: this.wantedCollections,
      endpoint: this.jetstreamUrl,
      cursor,
    });
  }

  /**
   * Factory method that creates a wrapped handler for a given Indexer method.
   * The returned handler invokes that method (bound to the indexer) through
   * circuitBreaker.execute, passing the method name as the operation label.
   */
  private createWrappedHandler<M extends keyof Indexer>(methodName: M) {
    // `any` is kept deliberately: the concrete event type differs per handler
    // and is dictated by the Indexer method being wrapped.
    return async (event: any) => {
      await this.circuitBreaker.execute(
        () => (this.indexer[methodName] as any).call(this.indexer, event),
        methodName as string
      );
    };
  }

  /**
   * Create and configure the event handler registry: one registration per
   * space.atbb.* collection, each with create/update/delete handlers.
   */
  private createHandlerRegistry(): EventHandlerRegistry {
    return new EventHandlerRegistry()
      .register({
        collection: "space.atbb.post",
        onCreate: this.createWrappedHandler("handlePostCreate"),
        onUpdate: this.createWrappedHandler("handlePostUpdate"),
        onDelete: this.createWrappedHandler("handlePostDelete"),
      })
      .register({
        collection: "space.atbb.forum.forum",
        onCreate: this.createWrappedHandler("handleForumCreate"),
        onUpdate: this.createWrappedHandler("handleForumUpdate"),
        onDelete: this.createWrappedHandler("handleForumDelete"),
      })
      .register({
        collection: "space.atbb.forum.category",
        onCreate: this.createWrappedHandler("handleCategoryCreate"),
        onUpdate: this.createWrappedHandler("handleCategoryUpdate"),
        onDelete: this.createWrappedHandler("handleCategoryDelete"),
      })
      .register({
        collection: "space.atbb.forum.board",
        onCreate: this.createWrappedHandler("handleBoardCreate"),
        onUpdate: this.createWrappedHandler("handleBoardUpdate"),
        onDelete: this.createWrappedHandler("handleBoardDelete"),
      })
      .register({
        collection: "space.atbb.forum.role",
        onCreate: this.createWrappedHandler("handleRoleCreate"),
        onUpdate: this.createWrappedHandler("handleRoleUpdate"),
        onDelete: this.createWrappedHandler("handleRoleDelete"),
      })
      .register({
        collection: "space.atbb.membership",
        onCreate: this.createWrappedHandler("handleMembershipCreate"),
        onUpdate: this.createWrappedHandler("handleMembershipUpdate"),
        onDelete: this.createWrappedHandler("handleMembershipDelete"),
      })
      .register({
        collection: "space.atbb.modAction",
        onCreate: this.createWrappedHandler("handleModActionCreate"),
        onUpdate: this.createWrappedHandler("handleModActionUpdate"),
        onDelete: this.createWrappedHandler("handleModActionDelete"),
      })
      .register({
        collection: "space.atbb.reaction",
        onCreate: this.createWrappedHandler("handleReactionCreate"),
        onUpdate: this.createWrappedHandler("handleReactionUpdate"),
        onDelete: this.createWrappedHandler("handleReactionDelete"),
      });
  }

  /**
   * Attach all listeners to the current Jetstream instance. Must be called
   * again whenever this.jetstream is replaced.
   */
  private setupEventHandlers(): void {
    // Apply all handlers from the registry
    this.handlerRegistry.applyTo(this.jetstream);

    // Listen to all commits to track cursor and last event time
    this.jetstream.on("commit", async (event) => {
      this.lastEventTime = new Date();
      try {
        await this.cursorManager.update(event.time_us);
      } catch (error) {
        // Nothing awaits this listener, so a rejection here would surface as
        // an unhandled promise rejection. Log it instead.
        this.logger.error("Failed to persist firehose cursor", {
          event: "firehose.cursor.update_failed",
          error: FirehoseService.describeError(error),
        });
      }
    });

    // Handle errors and disconnections
    this.jetstream.on("error", (error) => {
      this.logger.error("Jetstream error", { error: FirehoseService.describeError(error) });
      // Fire-and-forget: handleReconnect() handles its own failures.
      void this.handleReconnect();
    });
  }

  /**
   * Start the firehose subscription.
   *
   * On the first call with a BackfillManager injected, runs the startup
   * backfill check first. Then closes any connection left from a previous
   * attempt, resumes from the saved cursor (rewound by 10 seconds) if one
   * exists, and starts Jetstream. A failure while connecting is logged and
   * handed to the reconnection logic rather than thrown.
   */
  async start(): Promise<void> {
    if (this.running) {
      this.logger.warn("Firehose service is already running");
      return;
    }

    // Check for backfill before starting firehose — only on the initial start.
    // Reconnects skip this block to avoid re-running a completed backfill every
    // time the Jetstream WebSocket drops and recovers.
    if (this.isInitialStart && this.backfillManager) {
      this.isInitialStart = false;
      await this.runStartupBackfill(this.backfillManager);
    }

    try {
      // A reconnect reaches this point with the previous connection possibly
      // still open. Close it first so it cannot keep delivering events
      // alongside the new one.
      await this.closeStaleConnection();

      // Load the last cursor from database
      const savedCursor = await this.cursorManager.load();
      if (savedCursor) {
        this.logger.info("Resuming from cursor", { cursor: savedCursor.toString() });
        // Rewind by 10 seconds to ensure we don't miss any events
        const rewoundCursor = this.cursorManager.rewind(
          savedCursor,
          FirehoseService.CURSOR_REWIND_US
        );

        // Recreate Jetstream instance with cursor
        this.jetstream = this.createJetstream(Number(rewoundCursor));
        this.setupEventHandlers();
      }

      this.logger.info("Starting Jetstream firehose subscription", { url: this.jetstreamUrl });
      // Set before start() so a partially opened connection is still cleaned
      // up by the next attempt if start() throws.
      this.connectionOpened = true;
      await this.jetstream.start();
      this.running = true;
      this.reconnectionManager.reset();
      this.logger.info("Jetstream firehose subscription started successfully");
    } catch (error) {
      this.logger.error("Failed to start Jetstream firehose", {
        error: FirehoseService.describeError(error),
      });
      // Fire-and-forget, as before: start() resolves without waiting for the
      // retry chain to finish.
      void this.handleReconnect();
    }
  }

  /**
   * Resume an interrupted backfill if one exists; otherwise ask the manager
   * whether a backfill is needed for the saved cursor and perform it.
   *
   * Wrapped in try-catch so a transient DB error at startup doesn't kill the
   * process — stale data served from the firehose is better than no data at all.
   */
  private async runStartupBackfill(manager: BackfillManager): Promise<void> {
    try {
      const interrupted = await manager.checkForInterruptedBackfill();
      if (interrupted) {
        this.logger.info("Resuming interrupted backfill", {
          event: "firehose.backfill.resuming_interrupted",
          backfillId: interrupted.id.toString(),
          lastProcessedDid: interrupted.lastProcessedDid,
        });
        await manager.resumeBackfill(interrupted);
        this.logger.info("Interrupted backfill resumed", {
          event: "firehose.backfill.resumed",
          backfillId: interrupted.id.toString(),
        });
        return;
      }

      const savedCursorForCheck = await this.cursorManager.load();
      const backfillStatus = await manager.checkIfNeeded(savedCursorForCheck);
      if (backfillStatus === BackfillStatus.NotNeeded) {
        return;
      }

      this.logger.info("Starting backfill", {
        event: "firehose.backfill.starting",
        type: backfillStatus,
      });
      await manager.performBackfill(backfillStatus);
      this.logger.info("Backfill completed", {
        event: "firehose.backfill.completed",
        type: backfillStatus,
      });
    } catch (error) {
      this.logger.error("Backfill skipped due to startup error — firehose will start without it", {
        event: "firehose.backfill.startup_error",
        error: FirehoseService.describeError(error),
      });
      // Continue to start firehose — stale data is better than no data
    }
  }

  /**
   * Close the connection opened by a previous start() attempt, if any.
   * Best-effort: a failure to close is logged and does not block the restart.
   */
  private async closeStaleConnection(): Promise<void> {
    if (!this.connectionOpened) {
      return;
    }
    this.connectionOpened = false;
    try {
      await this.jetstream.close();
    } catch (error) {
      this.logger.warn("Failed to close previous Jetstream connection", {
        event: "firehose.connection.close_failed",
        error: FirehoseService.describeError(error),
      });
    }
  }

  /**
   * Stop the firehose subscription. No-op when the service is not running.
   */
  async stop(): Promise<void> {
    if (!this.running) {
      return;
    }

    this.logger.info("Stopping Jetstream firehose subscription");
    await this.jetstream.close();
    this.connectionOpened = false;
    this.running = false;
    this.logger.info("Jetstream firehose subscription stopped");
  }

  /**
   * Check if the firehose is currently running
   */
  isRunning(): boolean {
    return this.running;
  }

  /**
   * Get the timestamp of the last received commit event, or null if none has
   * been received yet.
   */
  getLastEventTime(): Date | null {
    return this.lastEventTime;
  }

  /**
   * Inject the BackfillManager. Called during AppContext wiring.
   */
  setBackfillManager(manager: BackfillManager): void {
    this.backfillManager = manager;
  }

  /**
   * Expose the Indexer instance for BackfillManager wiring.
   */
  getIndexer(): Indexer {
    return this.indexer;
  }

  /**
   * Ask the ReconnectionManager to retry start(). If it rejects, log a fatal
   * message and leave the service stopped. Never rejects itself, so callers
   * may fire-and-forget.
   */
  private async handleReconnect(): Promise<void> {
    try {
      await this.reconnectionManager.attemptReconnect(async () => {
        // Clear the flag so start() does not bail out as "already running".
        this.running = false;
        await this.start();
      });
    } catch (error) {
      this.logger.fatal("Firehose indexing has stopped. The appview will continue serving stale data.", {
        event: "firehose.reconnect.exhausted",
        error: FirehoseService.describeError(error),
      });
      this.running = false;
    }
  }
}
305}