import { Jetstream } from "@skyware/jetstream"; import { type Database } from "@atbb/db"; import type { Logger } from "@atbb/logger"; import { Indexer } from "./indexer.js"; import { CursorManager } from "./cursor-manager.js"; import { CircuitBreaker } from "./circuit-breaker.js"; import { ReconnectionManager } from "./reconnection-manager.js"; import { EventHandlerRegistry } from "./event-handler-registry.js"; import type { BackfillManager } from "./backfill-manager.js"; import { BackfillStatus } from "./backfill-manager.js"; /** * Firehose service that subscribes to AT Proto Jetstream * and indexes space.atbb.* records into the database. * * Responsibilities: * - WebSocket connection management via Jetstream * - Event routing to indexer handlers * - Health status monitoring * * Delegates to: * - CursorManager: cursor persistence * - CircuitBreaker: failure tracking and circuit breaking * - ReconnectionManager: reconnection with exponential backoff */ export class FirehoseService { private jetstream: Jetstream; private indexer: Indexer; private running = false; private lastEventTime: Date | null = null; private cursorManager: CursorManager; private circuitBreaker: CircuitBreaker; private reconnectionManager: ReconnectionManager; // Event handler registry private handlerRegistry: EventHandlerRegistry; private backfillManager: BackfillManager | null = null; // Guard: only run startup backfill on the initial start, not on reconnects. private isInitialStart = true; // Collections we're interested in (full lexicon IDs) private readonly wantedCollections: string[]; constructor( private db: Database, private jetstreamUrl: string, private logger: Logger ) { // Initialize the indexer instance with the database this.indexer = new Indexer(db, logger); // Initialize helper classes this.cursorManager = new CursorManager(db, logger); this.circuitBreaker = new CircuitBreaker(100, () => this.stop(), logger); this.reconnectionManager = new ReconnectionManager(10, 5000, logger); // Build handler registry this.handlerRegistry = this.createHandlerRegistry(); this.wantedCollections = this.handlerRegistry.getCollections(); // Initialize with a placeholder - will be recreated with cursor in start() this.jetstream = this.createJetstream(); this.setupEventHandlers(); } /** * Create a new Jetstream instance with optional cursor */ private createJetstream(cursor?: number): Jetstream { return new Jetstream({ wantedCollections: this.wantedCollections, endpoint: this.jetstreamUrl, cursor, }); } /** * Factory method that creates a wrapped handler for a given Indexer method. * The returned handler delegates to the indexer method with circuit breaker protection. */ private createWrappedHandler(methodName: M) { return async (event: any) => { await this.circuitBreaker.execute( () => (this.indexer[methodName] as any).call(this.indexer, event), methodName as string ); }; } /** * Create and configure the event handler registry */ private createHandlerRegistry(): EventHandlerRegistry { return new EventHandlerRegistry() .register({ collection: "space.atbb.post", onCreate: this.createWrappedHandler("handlePostCreate"), onUpdate: this.createWrappedHandler("handlePostUpdate"), onDelete: this.createWrappedHandler("handlePostDelete"), }) .register({ collection: "space.atbb.forum.forum", onCreate: this.createWrappedHandler("handleForumCreate"), onUpdate: this.createWrappedHandler("handleForumUpdate"), onDelete: this.createWrappedHandler("handleForumDelete"), }) .register({ collection: "space.atbb.forum.category", onCreate: this.createWrappedHandler("handleCategoryCreate"), onUpdate: this.createWrappedHandler("handleCategoryUpdate"), onDelete: this.createWrappedHandler("handleCategoryDelete"), }) .register({ collection: "space.atbb.forum.board", onCreate: this.createWrappedHandler("handleBoardCreate"), onUpdate: this.createWrappedHandler("handleBoardUpdate"), onDelete: this.createWrappedHandler("handleBoardDelete"), }) .register({ collection: "space.atbb.forum.role", onCreate: this.createWrappedHandler("handleRoleCreate"), onUpdate: this.createWrappedHandler("handleRoleUpdate"), onDelete: this.createWrappedHandler("handleRoleDelete"), }) .register({ collection: "space.atbb.membership", onCreate: this.createWrappedHandler("handleMembershipCreate"), onUpdate: this.createWrappedHandler("handleMembershipUpdate"), onDelete: this.createWrappedHandler("handleMembershipDelete"), }) .register({ collection: "space.atbb.modAction", onCreate: this.createWrappedHandler("handleModActionCreate"), onUpdate: this.createWrappedHandler("handleModActionUpdate"), onDelete: this.createWrappedHandler("handleModActionDelete"), }) .register({ collection: "space.atbb.reaction", onCreate: this.createWrappedHandler("handleReactionCreate"), onUpdate: this.createWrappedHandler("handleReactionUpdate"), onDelete: this.createWrappedHandler("handleReactionDelete"), }) .register({ collection: "space.atbb.forum.theme", onCreate: this.createWrappedHandler("handleThemeCreate"), onUpdate: this.createWrappedHandler("handleThemeUpdate"), onDelete: this.createWrappedHandler("handleThemeDelete"), }) .register({ collection: "space.atbb.forum.themePolicy", onCreate: this.createWrappedHandler("handleThemePolicyCreate"), onUpdate: this.createWrappedHandler("handleThemePolicyUpdate"), onDelete: this.createWrappedHandler("handleThemePolicyDelete"), }); } /** * Set up event handlers using the registry */ private setupEventHandlers() { // Apply all handlers from the registry this.handlerRegistry.applyTo(this.jetstream); // Listen to all commits to track cursor and last event time this.jetstream.on("commit", async (event) => { this.lastEventTime = new Date(); await this.cursorManager.update(event.time_us); }); // Handle errors and disconnections this.jetstream.on("error", (error) => { this.logger.error("Jetstream error", { error: error instanceof Error ? error.message : String(error) }); this.handleReconnect(); }); } /** * Start the firehose subscription */ async start() { if (this.running) { this.logger.warn("Firehose service is already running"); return; } // Check for backfill before starting firehose — only on the initial start. // Reconnects skip this block to avoid re-running a completed backfill every // time the Jetstream WebSocket drops and recovers. // Wrapped in try-catch so a transient DB error at startup doesn't kill the process — // stale data served from the firehose is better than no data at all. if (this.isInitialStart && this.backfillManager) { this.isInitialStart = false; try { const interrupted = await this.backfillManager.checkForInterruptedBackfill(); if (interrupted) { this.logger.info("Resuming interrupted backfill", { event: "firehose.backfill.resuming_interrupted", backfillId: interrupted.id.toString(), lastProcessedDid: interrupted.lastProcessedDid, }); await this.backfillManager.resumeBackfill(interrupted); this.logger.info("Interrupted backfill resumed", { event: "firehose.backfill.resumed", backfillId: interrupted.id.toString(), }); } else { const savedCursorForCheck = await this.cursorManager.load(); const backfillStatus = await this.backfillManager.checkIfNeeded(savedCursorForCheck); if (backfillStatus !== BackfillStatus.NotNeeded) { this.logger.info("Starting backfill", { event: "firehose.backfill.starting", type: backfillStatus, }); await this.backfillManager.performBackfill(backfillStatus); this.logger.info("Backfill completed", { event: "firehose.backfill.completed", type: backfillStatus, }); } } } catch (error) { this.logger.error("Backfill skipped due to startup error — firehose will start without it", { event: "firehose.backfill.startup_error", error: error instanceof Error ? error.message : String(error), }); // Continue to start firehose — stale data is better than no data } } try { // Load the last cursor from database const savedCursor = await this.cursorManager.load(); if (savedCursor) { this.logger.info("Resuming from cursor", { cursor: savedCursor.toString() }); // Rewind by 10 seconds to ensure we don't miss any events const rewindedCursor = this.cursorManager.rewind(savedCursor, 10_000_000); // Recreate Jetstream instance with cursor this.jetstream = this.createJetstream(Number(rewindedCursor)); this.setupEventHandlers(); } this.logger.info("Starting Jetstream firehose subscription", { url: this.jetstreamUrl }); await this.jetstream.start(); this.running = true; this.reconnectionManager.reset(); this.logger.info("Jetstream firehose subscription started successfully"); } catch (error) { this.logger.error("Failed to start Jetstream firehose", { error: error instanceof Error ? error.message : String(error) }); this.handleReconnect(); } } /** * Stop the firehose subscription */ async stop() { if (!this.running) { return; } this.logger.info("Stopping Jetstream firehose subscription"); await this.jetstream.close(); this.running = false; this.logger.info("Jetstream firehose subscription stopped"); } /** * Check if the firehose is currently running */ isRunning(): boolean { return this.running; } /** * Get the timestamp of the last received event */ getLastEventTime(): Date | null { return this.lastEventTime; } /** * Inject the BackfillManager. Called during AppContext wiring. */ setBackfillManager(manager: BackfillManager): void { this.backfillManager = manager; } /** * Expose the Indexer instance for BackfillManager wiring. */ getIndexer(): Indexer { return this.indexer; } /** * Handle reconnection with exponential backoff */ private async handleReconnect() { try { await this.reconnectionManager.attemptReconnect(async () => { this.running = false; await this.start(); }); } catch (error) { this.logger.fatal("Firehose indexing has stopped. The appview will continue serving stale data.", { event: "firehose.reconnect.exhausted", error: error instanceof Error ? error.message : String(error), }); this.running = false; } } }