WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at root/atb-56-theme-caching-layer 317 lines 11 kB view raw
import { Jetstream } from "@skyware/jetstream";
import { type Database } from "@atbb/db";
import type { Logger } from "@atbb/logger";
import { Indexer } from "./indexer.js";
import { CursorManager } from "./cursor-manager.js";
import { CircuitBreaker } from "./circuit-breaker.js";
import { ReconnectionManager } from "./reconnection-manager.js";
import { EventHandlerRegistry } from "./event-handler-registry.js";
import type { BackfillManager } from "./backfill-manager.js";
import { BackfillStatus } from "./backfill-manager.js";

/**
 * How far to rewind the saved cursor when resuming, so events that arrived
 * around the time of the last persisted cursor are replayed rather than missed.
 * Expressed in microseconds (10 seconds), matching the `time_us` event field
 * that is persisted as the cursor.
 */
const CURSOR_REWIND_US = 10_000_000;

/**
 * Render an unknown thrown/emitted value as a log-friendly string.
 */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Firehose service that subscribes to AT Proto Jetstream
 * and indexes space.atbb.* records into the database.
 *
 * Responsibilities:
 * - WebSocket connection management via Jetstream
 * - Event routing to indexer handlers
 * - Health status monitoring
 *
 * Delegates to:
 * - CursorManager: cursor persistence
 * - CircuitBreaker: failure tracking and circuit breaking
 * - ReconnectionManager: reconnection with exponential backoff
 */
export class FirehoseService {
  private jetstream: Jetstream;
  private indexer: Indexer;
  private running = false;
  private lastEventTime: Date | null = null;
  private cursorManager: CursorManager;
  private circuitBreaker: CircuitBreaker;
  private reconnectionManager: ReconnectionManager;

  // Event handler registry
  private handlerRegistry: EventHandlerRegistry;

  private backfillManager: BackfillManager | null = null;

  // Guard: only run startup backfill on the initial start, not on reconnects.
  private isInitialStart = true;

  // True once start() has been invoked on the current `jetstream` instance.
  // A started instance must never be started again or silently dropped: it is
  // replaced and closed instead (see replaceJetstream).
  private jetstreamStarted = false;

  // Collections we're interested in (full lexicon IDs)
  private readonly wantedCollections: string[];

  /**
   * @param db - Database handle passed to the indexer and cursor manager.
   * @param jetstreamUrl - Jetstream endpoint to subscribe to.
   * @param logger - Logger shared with all helper classes.
   */
  constructor(
    private db: Database,
    private jetstreamUrl: string,
    private logger: Logger
  ) {
    // Initialize the indexer instance with the database
    this.indexer = new Indexer(db, logger);

    // Initialize helper classes
    this.cursorManager = new CursorManager(db, logger);
    this.circuitBreaker = new CircuitBreaker(100, () => this.stop(), logger);
    this.reconnectionManager = new ReconnectionManager(10, 5000, logger);

    // Build handler registry
    this.handlerRegistry = this.createHandlerRegistry();
    this.wantedCollections = this.handlerRegistry.getCollections();

    // Initialize with a placeholder - will be recreated with cursor in start()
    this.jetstream = this.createJetstream();
    this.setupEventHandlers();
  }

  /**
   * Create a new Jetstream instance with optional cursor
   */
  private createJetstream(cursor?: number): Jetstream {
    return new Jetstream({
      wantedCollections: this.wantedCollections,
      endpoint: this.jetstreamUrl,
      cursor,
    });
  }

  /**
   * Swap in a fresh Jetstream instance (optionally positioned at `cursor`),
   * wire the handlers onto it, and close the instance it supersedes if that
   * instance had been started.
   *
   * The new instance is installed BEFORE the old one is closed so that any
   * event the old instance emits while closing is recognised as stale by the
   * listeners registered in setupEventHandlers() and ignored.
   */
  private async replaceJetstream(cursor?: number): Promise<void> {
    const previous = this.jetstream;
    const previousWasStarted = this.jetstreamStarted;

    this.jetstream = this.createJetstream(cursor);
    this.jetstreamStarted = false;
    this.setupEventHandlers();

    if (!previousWasStarted) {
      // Never-started placeholder: nothing to release.
      return;
    }

    try {
      // NOTE(review): assumes close() is safe on an instance whose socket has
      // already errored or closed — confirm against @skyware/jetstream.
      await previous.close();
    } catch (error) {
      this.logger.warn("Failed to close superseded Jetstream instance", {
        error: errorMessage(error),
      });
    }
  }

  /**
   * Factory method that creates a wrapped handler for a given Indexer method.
   * The returned handler delegates to the indexer method with circuit breaker protection.
   */
  private createWrappedHandler<M extends keyof Indexer>(methodName: M) {
    // `any` is kept deliberately: the event shape differs per collection and is
    // defined by the Indexer method being dispatched to.
    return async (event: any) => {
      await this.circuitBreaker.execute(
        () => (this.indexer[methodName] as any).call(this.indexer, event),
        methodName as string
      );
    };
  }

  /**
   * Create and configure the event handler registry
   */
  private createHandlerRegistry(): EventHandlerRegistry {
    return new EventHandlerRegistry()
      .register({
        collection: "space.atbb.post",
        onCreate: this.createWrappedHandler("handlePostCreate"),
        onUpdate: this.createWrappedHandler("handlePostUpdate"),
        onDelete: this.createWrappedHandler("handlePostDelete"),
      })
      .register({
        collection: "space.atbb.forum.forum",
        onCreate: this.createWrappedHandler("handleForumCreate"),
        onUpdate: this.createWrappedHandler("handleForumUpdate"),
        onDelete: this.createWrappedHandler("handleForumDelete"),
      })
      .register({
        collection: "space.atbb.forum.category",
        onCreate: this.createWrappedHandler("handleCategoryCreate"),
        onUpdate: this.createWrappedHandler("handleCategoryUpdate"),
        onDelete: this.createWrappedHandler("handleCategoryDelete"),
      })
      .register({
        collection: "space.atbb.forum.board",
        onCreate: this.createWrappedHandler("handleBoardCreate"),
        onUpdate: this.createWrappedHandler("handleBoardUpdate"),
        onDelete: this.createWrappedHandler("handleBoardDelete"),
      })
      .register({
        collection: "space.atbb.forum.role",
        onCreate: this.createWrappedHandler("handleRoleCreate"),
        onUpdate: this.createWrappedHandler("handleRoleUpdate"),
        onDelete: this.createWrappedHandler("handleRoleDelete"),
      })
      .register({
        collection: "space.atbb.membership",
        onCreate: this.createWrappedHandler("handleMembershipCreate"),
        onUpdate: this.createWrappedHandler("handleMembershipUpdate"),
        onDelete: this.createWrappedHandler("handleMembershipDelete"),
      })
      .register({
        collection: "space.atbb.modAction",
        onCreate: this.createWrappedHandler("handleModActionCreate"),
        onUpdate: this.createWrappedHandler("handleModActionUpdate"),
        onDelete: this.createWrappedHandler("handleModActionDelete"),
      })
      .register({
        collection: "space.atbb.reaction",
        onCreate: this.createWrappedHandler("handleReactionCreate"),
        onUpdate: this.createWrappedHandler("handleReactionUpdate"),
        onDelete: this.createWrappedHandler("handleReactionDelete"),
      })
      .register({
        collection: "space.atbb.forum.theme",
        onCreate: this.createWrappedHandler("handleThemeCreate"),
        onUpdate: this.createWrappedHandler("handleThemeUpdate"),
        onDelete: this.createWrappedHandler("handleThemeDelete"),
      })
      .register({
        collection: "space.atbb.forum.themePolicy",
        onCreate: this.createWrappedHandler("handleThemePolicyCreate"),
        onUpdate: this.createWrappedHandler("handleThemePolicyUpdate"),
        onDelete: this.createWrappedHandler("handleThemePolicyDelete"),
      });
  }

  /**
   * Set up event handlers using the registry
   *
   * Handlers are bound to the instance that is current at the time of the call.
   * The `commit` and `error` listeners ignore events from an instance that has
   * since been superseded, so a replaced connection can neither move the cursor
   * nor trigger another reconnect cycle.
   */
  private setupEventHandlers() {
    const source = this.jetstream;

    // Apply all handlers from the registry
    this.handlerRegistry.applyTo(source);

    // Listen to all commits to track cursor and last event time
    source.on("commit", async (event) => {
      if (source !== this.jetstream) {
        return;
      }

      this.lastEventTime = new Date();
      try {
        await this.cursorManager.update(event.time_us);
      } catch (error) {
        // This callback runs from an event emitter: a rejection here would
        // otherwise surface as an unhandled promise rejection.
        this.logger.error("Failed to persist firehose cursor", {
          error: errorMessage(error),
        });
      }
    });

    // Handle errors and disconnections
    source.on("error", (error) => {
      if (source !== this.jetstream) {
        this.logger.warn("Ignoring error from superseded Jetstream instance", {
          error: errorMessage(error),
        });
        return;
      }

      this.logger.error("Jetstream error", { error: errorMessage(error) });
      void this.handleReconnect();
    });
  }

  /**
   * Run the startup backfill check: resume an interrupted backfill if one
   * exists, otherwise perform a new backfill when the manager reports one is
   * needed.
   *
   * Never throws. Wrapped in try-catch so a transient DB error at startup
   * doesn't kill the process — stale data served from the firehose is better
   * than no data at all.
   */
  private async runStartupBackfill(backfillManager: BackfillManager): Promise<void> {
    try {
      const interrupted = await backfillManager.checkForInterruptedBackfill();
      if (interrupted) {
        this.logger.info("Resuming interrupted backfill", {
          event: "firehose.backfill.resuming_interrupted",
          backfillId: interrupted.id.toString(),
          lastProcessedDid: interrupted.lastProcessedDid,
        });
        await backfillManager.resumeBackfill(interrupted);
        this.logger.info("Interrupted backfill resumed", {
          event: "firehose.backfill.resumed",
          backfillId: interrupted.id.toString(),
        });
        return;
      }

      const savedCursorForCheck = await this.cursorManager.load();
      const backfillStatus = await backfillManager.checkIfNeeded(savedCursorForCheck);

      if (backfillStatus !== BackfillStatus.NotNeeded) {
        this.logger.info("Starting backfill", {
          event: "firehose.backfill.starting",
          type: backfillStatus,
        });
        await backfillManager.performBackfill(backfillStatus);
        this.logger.info("Backfill completed", {
          event: "firehose.backfill.completed",
          type: backfillStatus,
        });
      }
    } catch (error) {
      this.logger.error("Backfill skipped due to startup error — firehose will start without it", {
        event: "firehose.backfill.startup_error",
        error: errorMessage(error),
      });
      // Continue to start firehose — stale data is better than no data
    }
  }

  /**
   * Start the firehose subscription
   *
   * No-op (with a warning) when already running. Failures to start are logged
   * and handed to the reconnection logic rather than thrown.
   */
  async start() {
    if (this.running) {
      this.logger.warn("Firehose service is already running");
      return;
    }

    // Check for backfill before starting firehose — only on the initial start.
    // Reconnects skip this block to avoid re-running a completed backfill every
    // time the Jetstream WebSocket drops and recovers.
    if (this.isInitialStart && this.backfillManager) {
      this.isInitialStart = false;
      await this.runStartupBackfill(this.backfillManager);
    }

    try {
      // Load the last cursor from database
      const savedCursor = await this.cursorManager.load();
      if (savedCursor) {
        this.logger.info("Resuming from cursor", { cursor: savedCursor.toString() });
        // Rewind by 10 seconds to ensure we don't miss any events
        const rewindedCursor = this.cursorManager.rewind(savedCursor, CURSOR_REWIND_US);

        // Recreate Jetstream instance with cursor (closing the one it replaces
        // if that one had been started)
        await this.replaceJetstream(Number(rewindedCursor));
      } else if (this.jetstreamStarted) {
        // Restart without a cursor: do not call start() a second time on an
        // instance that was already started — replace it so its connection
        // is released.
        await this.replaceJetstream();
      }

      this.logger.info("Starting Jetstream firehose subscription", { url: this.jetstreamUrl });
      // Flag first: even if start() throws part-way, the instance may hold a
      // connection and must be closed when it is replaced.
      this.jetstreamStarted = true;
      await this.jetstream.start();
      this.running = true;
      this.reconnectionManager.reset();
      this.logger.info("Jetstream firehose subscription started successfully");
    } catch (error) {
      this.logger.error("Failed to start Jetstream firehose", { error: errorMessage(error) });
      void this.handleReconnect();
    }
  }

  /**
   * Stop the firehose subscription
   *
   * No-op when the service is not currently running.
   */
  async stop() {
    if (!this.running) {
      return;
    }

    this.logger.info("Stopping Jetstream firehose subscription");
    await this.jetstream.close();
    this.running = false;
    this.logger.info("Jetstream firehose subscription stopped");
  }

  /**
   * Check if the firehose is currently running
   */
  isRunning(): boolean {
    return this.running;
  }

  /**
   * Get the timestamp of the last received event
   *
   * @returns Time the most recent commit event was received, or null if none
   * has been received yet.
   */
  getLastEventTime(): Date | null {
    return this.lastEventTime;
  }

  /**
   * Inject the BackfillManager. Called during AppContext wiring.
   *
   * Must be called before the first start() for the startup backfill check
   * to run.
   */
  setBackfillManager(manager: BackfillManager): void {
    this.backfillManager = manager;
  }

  /**
   * Expose the Indexer instance for BackfillManager wiring.
   */
  getIndexer(): Indexer {
    return this.indexer;
  }

  /**
   * Handle reconnection with exponential backoff
   *
   * Never rejects: when the reconnection manager gives up, the failure is
   * logged as fatal and the service is left stopped.
   */
  private async handleReconnect() {
    try {
      await this.reconnectionManager.attemptReconnect(async () => {
        // Clear the flag so start() does not short-circuit as "already running".
        this.running = false;
        await this.start();
      });
    } catch (error) {
      this.logger.fatal("Firehose indexing has stopped. The appview will continue serving stale data.", {
        event: "firehose.reconnect.exhausted",
        error: errorMessage(error),
      });
      this.running = false;
    }
  }
}