WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import { Jetstream } from "@skyware/jetstream";
2import { type Database } from "@atbb/db";
3import type { Logger } from "@atbb/logger";
4import { Indexer } from "./indexer.js";
5import { CursorManager } from "./cursor-manager.js";
6import { CircuitBreaker } from "./circuit-breaker.js";
7import { ReconnectionManager } from "./reconnection-manager.js";
8import { EventHandlerRegistry } from "./event-handler-registry.js";
9import type { BackfillManager } from "./backfill-manager.js";
10import { BackfillStatus } from "./backfill-manager.js";
11
12/**
13 * Firehose service that subscribes to AT Proto Jetstream
14 * and indexes space.atbb.* records into the database.
15 *
16 * Responsibilities:
17 * - WebSocket connection management via Jetstream
18 * - Event routing to indexer handlers
19 * - Health status monitoring
20 *
21 * Delegates to:
22 * - CursorManager: cursor persistence
23 * - CircuitBreaker: failure tracking and circuit breaking
24 * - ReconnectionManager: reconnection with exponential backoff
25 */
26export class FirehoseService {
27 private jetstream: Jetstream;
28 private indexer: Indexer;
29 private running = false;
30 private lastEventTime: Date | null = null;
31 private cursorManager: CursorManager;
32 private circuitBreaker: CircuitBreaker;
33 private reconnectionManager: ReconnectionManager;
34
35 // Event handler registry
36 private handlerRegistry: EventHandlerRegistry;
37
38 private backfillManager: BackfillManager | null = null;
39
40 // Guard: only run startup backfill on the initial start, not on reconnects.
41 private isInitialStart = true;
42
43 // Collections we're interested in (full lexicon IDs)
44 private readonly wantedCollections: string[];
45
46 constructor(
47 private db: Database,
48 private jetstreamUrl: string,
49 private logger: Logger
50 ) {
51 // Initialize the indexer instance with the database
52 this.indexer = new Indexer(db, logger);
53
54 // Initialize helper classes
55 this.cursorManager = new CursorManager(db, logger);
56 this.circuitBreaker = new CircuitBreaker(100, () => this.stop(), logger);
57 this.reconnectionManager = new ReconnectionManager(10, 5000, logger);
58
59 // Build handler registry
60 this.handlerRegistry = this.createHandlerRegistry();
61 this.wantedCollections = this.handlerRegistry.getCollections();
62
63 // Initialize with a placeholder - will be recreated with cursor in start()
64 this.jetstream = this.createJetstream();
65 this.setupEventHandlers();
66 }
67
68 /**
69 * Create a new Jetstream instance with optional cursor
70 */
71 private createJetstream(cursor?: number): Jetstream {
72 return new Jetstream({
73 wantedCollections: this.wantedCollections,
74 endpoint: this.jetstreamUrl,
75 cursor,
76 });
77 }
78
79 /**
80 * Factory method that creates a wrapped handler for a given Indexer method.
81 * The returned handler delegates to the indexer method with circuit breaker protection.
82 */
83 private createWrappedHandler<M extends keyof Indexer>(methodName: M) {
84 return async (event: any) => {
85 await this.circuitBreaker.execute(
86 () => (this.indexer[methodName] as any).call(this.indexer, event),
87 methodName as string
88 );
89 };
90 }
91
92 /**
93 * Create and configure the event handler registry
94 */
95 private createHandlerRegistry(): EventHandlerRegistry {
96 return new EventHandlerRegistry()
97 .register({
98 collection: "space.atbb.post",
99 onCreate: this.createWrappedHandler("handlePostCreate"),
100 onUpdate: this.createWrappedHandler("handlePostUpdate"),
101 onDelete: this.createWrappedHandler("handlePostDelete"),
102 })
103 .register({
104 collection: "space.atbb.forum.forum",
105 onCreate: this.createWrappedHandler("handleForumCreate"),
106 onUpdate: this.createWrappedHandler("handleForumUpdate"),
107 onDelete: this.createWrappedHandler("handleForumDelete"),
108 })
109 .register({
110 collection: "space.atbb.forum.category",
111 onCreate: this.createWrappedHandler("handleCategoryCreate"),
112 onUpdate: this.createWrappedHandler("handleCategoryUpdate"),
113 onDelete: this.createWrappedHandler("handleCategoryDelete"),
114 })
115 .register({
116 collection: "space.atbb.forum.board",
117 onCreate: this.createWrappedHandler("handleBoardCreate"),
118 onUpdate: this.createWrappedHandler("handleBoardUpdate"),
119 onDelete: this.createWrappedHandler("handleBoardDelete"),
120 })
121 .register({
122 collection: "space.atbb.forum.role",
123 onCreate: this.createWrappedHandler("handleRoleCreate"),
124 onUpdate: this.createWrappedHandler("handleRoleUpdate"),
125 onDelete: this.createWrappedHandler("handleRoleDelete"),
126 })
127 .register({
128 collection: "space.atbb.membership",
129 onCreate: this.createWrappedHandler("handleMembershipCreate"),
130 onUpdate: this.createWrappedHandler("handleMembershipUpdate"),
131 onDelete: this.createWrappedHandler("handleMembershipDelete"),
132 })
133 .register({
134 collection: "space.atbb.modAction",
135 onCreate: this.createWrappedHandler("handleModActionCreate"),
136 onUpdate: this.createWrappedHandler("handleModActionUpdate"),
137 onDelete: this.createWrappedHandler("handleModActionDelete"),
138 })
139 .register({
140 collection: "space.atbb.reaction",
141 onCreate: this.createWrappedHandler("handleReactionCreate"),
142 onUpdate: this.createWrappedHandler("handleReactionUpdate"),
143 onDelete: this.createWrappedHandler("handleReactionDelete"),
144 })
145 .register({
146 collection: "space.atbb.forum.theme",
147 onCreate: this.createWrappedHandler("handleThemeCreate"),
148 onUpdate: this.createWrappedHandler("handleThemeUpdate"),
149 onDelete: this.createWrappedHandler("handleThemeDelete"),
150 })
151 .register({
152 collection: "space.atbb.forum.themePolicy",
153 onCreate: this.createWrappedHandler("handleThemePolicyCreate"),
154 onUpdate: this.createWrappedHandler("handleThemePolicyUpdate"),
155 onDelete: this.createWrappedHandler("handleThemePolicyDelete"),
156 });
157 }
158
159 /**
160 * Set up event handlers using the registry
161 */
162 private setupEventHandlers() {
163 // Apply all handlers from the registry
164 this.handlerRegistry.applyTo(this.jetstream);
165
166 // Listen to all commits to track cursor and last event time
167 this.jetstream.on("commit", async (event) => {
168 this.lastEventTime = new Date();
169 await this.cursorManager.update(event.time_us);
170 });
171
172 // Handle errors and disconnections
173 this.jetstream.on("error", (error) => {
174 this.logger.error("Jetstream error", { error: error instanceof Error ? error.message : String(error) });
175 this.handleReconnect();
176 });
177 }
178
179 /**
180 * Start the firehose subscription
181 */
182 async start() {
183 if (this.running) {
184 this.logger.warn("Firehose service is already running");
185 return;
186 }
187
188 // Check for backfill before starting firehose — only on the initial start.
189 // Reconnects skip this block to avoid re-running a completed backfill every
190 // time the Jetstream WebSocket drops and recovers.
191 // Wrapped in try-catch so a transient DB error at startup doesn't kill the process —
192 // stale data served from the firehose is better than no data at all.
193 if (this.isInitialStart && this.backfillManager) {
194 this.isInitialStart = false;
195 try {
196 const interrupted = await this.backfillManager.checkForInterruptedBackfill();
197 if (interrupted) {
198 this.logger.info("Resuming interrupted backfill", {
199 event: "firehose.backfill.resuming_interrupted",
200 backfillId: interrupted.id.toString(),
201 lastProcessedDid: interrupted.lastProcessedDid,
202 });
203 await this.backfillManager.resumeBackfill(interrupted);
204 this.logger.info("Interrupted backfill resumed", {
205 event: "firehose.backfill.resumed",
206 backfillId: interrupted.id.toString(),
207 });
208 } else {
209 const savedCursorForCheck = await this.cursorManager.load();
210 const backfillStatus = await this.backfillManager.checkIfNeeded(savedCursorForCheck);
211
212 if (backfillStatus !== BackfillStatus.NotNeeded) {
213 this.logger.info("Starting backfill", {
214 event: "firehose.backfill.starting",
215 type: backfillStatus,
216 });
217 await this.backfillManager.performBackfill(backfillStatus);
218 this.logger.info("Backfill completed", {
219 event: "firehose.backfill.completed",
220 type: backfillStatus,
221 });
222 }
223 }
224 } catch (error) {
225 this.logger.error("Backfill skipped due to startup error — firehose will start without it", {
226 event: "firehose.backfill.startup_error",
227 error: error instanceof Error ? error.message : String(error),
228 });
229 // Continue to start firehose — stale data is better than no data
230 }
231 }
232
233 try {
234 // Load the last cursor from database
235 const savedCursor = await this.cursorManager.load();
236 if (savedCursor) {
237 this.logger.info("Resuming from cursor", { cursor: savedCursor.toString() });
238 // Rewind by 10 seconds to ensure we don't miss any events
239 const rewindedCursor = this.cursorManager.rewind(savedCursor, 10_000_000);
240
241 // Recreate Jetstream instance with cursor
242 this.jetstream = this.createJetstream(Number(rewindedCursor));
243 this.setupEventHandlers();
244 }
245
246 this.logger.info("Starting Jetstream firehose subscription", { url: this.jetstreamUrl });
247 await this.jetstream.start();
248 this.running = true;
249 this.reconnectionManager.reset();
250 this.logger.info("Jetstream firehose subscription started successfully");
251 } catch (error) {
252 this.logger.error("Failed to start Jetstream firehose", { error: error instanceof Error ? error.message : String(error) });
253 this.handleReconnect();
254 }
255 }
256
257 /**
258 * Stop the firehose subscription
259 */
260 async stop() {
261 if (!this.running) {
262 return;
263 }
264
265 this.logger.info("Stopping Jetstream firehose subscription");
266 await this.jetstream.close();
267 this.running = false;
268 this.logger.info("Jetstream firehose subscription stopped");
269 }
270
271 /**
272 * Check if the firehose is currently running
273 */
274 isRunning(): boolean {
275 return this.running;
276 }
277
278 /**
279 * Get the timestamp of the last received event
280 */
281 getLastEventTime(): Date | null {
282 return this.lastEventTime;
283 }
284
285 /**
286 * Inject the BackfillManager. Called during AppContext wiring.
287 */
288 setBackfillManager(manager: BackfillManager): void {
289 this.backfillManager = manager;
290 }
291
292 /**
293 * Expose the Indexer instance for BackfillManager wiring.
294 */
295 getIndexer(): Indexer {
296 return this.indexer;
297 }
298
299 /**
300 * Handle reconnection with exponential backoff
301 */
302 private async handleReconnect() {
303 try {
304 await this.reconnectionManager.attemptReconnect(async () => {
305 this.running = false;
306 await this.start();
307 });
308 } catch (error) {
309 this.logger.fatal("Firehose indexing has stopped. The appview will continue serving stale data.", {
310 event: "firehose.reconnect.exhausted",
311 error: error instanceof Error ? error.message : String(error),
312 });
313 this.running = false;
314 }
315 }
316
317}