WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at atb-52-css-token-extraction 305 lines 11 kB view raw
1import { Jetstream } from "@skyware/jetstream"; 2import { type Database } from "@atbb/db"; 3import type { Logger } from "@atbb/logger"; 4import { Indexer } from "./indexer.js"; 5import { CursorManager } from "./cursor-manager.js"; 6import { CircuitBreaker } from "./circuit-breaker.js"; 7import { ReconnectionManager } from "./reconnection-manager.js"; 8import { EventHandlerRegistry } from "./event-handler-registry.js"; 9import type { BackfillManager } from "./backfill-manager.js"; 10import { BackfillStatus } from "./backfill-manager.js"; 11 12/** 13 * Firehose service that subscribes to AT Proto Jetstream 14 * and indexes space.atbb.* records into the database. 15 * 16 * Responsibilities: 17 * - WebSocket connection management via Jetstream 18 * - Event routing to indexer handlers 19 * - Health status monitoring 20 * 21 * Delegates to: 22 * - CursorManager: cursor persistence 23 * - CircuitBreaker: failure tracking and circuit breaking 24 * - ReconnectionManager: reconnection with exponential backoff 25 */ 26export class FirehoseService { 27 private jetstream: Jetstream; 28 private indexer: Indexer; 29 private running = false; 30 private lastEventTime: Date | null = null; 31 private cursorManager: CursorManager; 32 private circuitBreaker: CircuitBreaker; 33 private reconnectionManager: ReconnectionManager; 34 35 // Event handler registry 36 private handlerRegistry: EventHandlerRegistry; 37 38 private backfillManager: BackfillManager | null = null; 39 40 // Guard: only run startup backfill on the initial start, not on reconnects. 41 private isInitialStart = true; 42 43 // Collections we're interested in (full lexicon IDs) 44 private readonly wantedCollections: string[]; 45 46 constructor( 47 private db: Database, 48 private jetstreamUrl: string, 49 private logger: Logger 50 ) { 51 // Initialize the indexer instance with the database 52 this.indexer = new Indexer(db, logger); 53 54 // Initialize helper classes 55 this.cursorManager = new CursorManager(db, logger); 56 this.circuitBreaker = new CircuitBreaker(100, () => this.stop(), logger); 57 this.reconnectionManager = new ReconnectionManager(10, 5000, logger); 58 59 // Build handler registry 60 this.handlerRegistry = this.createHandlerRegistry(); 61 this.wantedCollections = this.handlerRegistry.getCollections(); 62 63 // Initialize with a placeholder - will be recreated with cursor in start() 64 this.jetstream = this.createJetstream(); 65 this.setupEventHandlers(); 66 } 67 68 /** 69 * Create a new Jetstream instance with optional cursor 70 */ 71 private createJetstream(cursor?: number): Jetstream { 72 return new Jetstream({ 73 wantedCollections: this.wantedCollections, 74 endpoint: this.jetstreamUrl, 75 cursor, 76 }); 77 } 78 79 /** 80 * Factory method that creates a wrapped handler for a given Indexer method. 81 * The returned handler delegates to the indexer method with circuit breaker protection. 82 */ 83 private createWrappedHandler<M extends keyof Indexer>(methodName: M) { 84 return async (event: any) => { 85 await this.circuitBreaker.execute( 86 () => (this.indexer[methodName] as any).call(this.indexer, event), 87 methodName as string 88 ); 89 }; 90 } 91 92 /** 93 * Create and configure the event handler registry 94 */ 95 private createHandlerRegistry(): EventHandlerRegistry { 96 return new EventHandlerRegistry() 97 .register({ 98 collection: "space.atbb.post", 99 onCreate: this.createWrappedHandler("handlePostCreate"), 100 onUpdate: this.createWrappedHandler("handlePostUpdate"), 101 onDelete: this.createWrappedHandler("handlePostDelete"), 102 }) 103 .register({ 104 collection: "space.atbb.forum.forum", 105 onCreate: this.createWrappedHandler("handleForumCreate"), 106 onUpdate: this.createWrappedHandler("handleForumUpdate"), 107 onDelete: this.createWrappedHandler("handleForumDelete"), 108 }) 109 .register({ 110 collection: "space.atbb.forum.category", 111 onCreate: this.createWrappedHandler("handleCategoryCreate"), 112 onUpdate: this.createWrappedHandler("handleCategoryUpdate"), 113 onDelete: this.createWrappedHandler("handleCategoryDelete"), 114 }) 115 .register({ 116 collection: "space.atbb.forum.board", 117 onCreate: this.createWrappedHandler("handleBoardCreate"), 118 onUpdate: this.createWrappedHandler("handleBoardUpdate"), 119 onDelete: this.createWrappedHandler("handleBoardDelete"), 120 }) 121 .register({ 122 collection: "space.atbb.forum.role", 123 onCreate: this.createWrappedHandler("handleRoleCreate"), 124 onUpdate: this.createWrappedHandler("handleRoleUpdate"), 125 onDelete: this.createWrappedHandler("handleRoleDelete"), 126 }) 127 .register({ 128 collection: "space.atbb.membership", 129 onCreate: this.createWrappedHandler("handleMembershipCreate"), 130 onUpdate: this.createWrappedHandler("handleMembershipUpdate"), 131 onDelete: this.createWrappedHandler("handleMembershipDelete"), 132 }) 133 .register({ 134 collection: "space.atbb.modAction", 135 onCreate: this.createWrappedHandler("handleModActionCreate"), 136 onUpdate: this.createWrappedHandler("handleModActionUpdate"), 137 onDelete: this.createWrappedHandler("handleModActionDelete"), 138 }) 139 .register({ 140 collection: "space.atbb.reaction", 141 onCreate: this.createWrappedHandler("handleReactionCreate"), 142 onUpdate: this.createWrappedHandler("handleReactionUpdate"), 143 onDelete: this.createWrappedHandler("handleReactionDelete"), 144 }); 145 } 146 147 /** 148 * Set up event handlers using the registry 149 */ 150 private setupEventHandlers() { 151 // Apply all handlers from the registry 152 this.handlerRegistry.applyTo(this.jetstream); 153 154 // Listen to all commits to track cursor and last event time 155 this.jetstream.on("commit", async (event) => { 156 this.lastEventTime = new Date(); 157 await this.cursorManager.update(event.time_us); 158 }); 159 160 // Handle errors and disconnections 161 this.jetstream.on("error", (error) => { 162 this.logger.error("Jetstream error", { error: error instanceof Error ? error.message : String(error) }); 163 this.handleReconnect(); 164 }); 165 } 166 167 /** 168 * Start the firehose subscription 169 */ 170 async start() { 171 if (this.running) { 172 this.logger.warn("Firehose service is already running"); 173 return; 174 } 175 176 // Check for backfill before starting firehose — only on the initial start. 177 // Reconnects skip this block to avoid re-running a completed backfill every 178 // time the Jetstream WebSocket drops and recovers. 179 // Wrapped in try-catch so a transient DB error at startup doesn't kill the process — 180 // stale data served from the firehose is better than no data at all. 181 if (this.isInitialStart && this.backfillManager) { 182 this.isInitialStart = false; 183 try { 184 const interrupted = await this.backfillManager.checkForInterruptedBackfill(); 185 if (interrupted) { 186 this.logger.info("Resuming interrupted backfill", { 187 event: "firehose.backfill.resuming_interrupted", 188 backfillId: interrupted.id.toString(), 189 lastProcessedDid: interrupted.lastProcessedDid, 190 }); 191 await this.backfillManager.resumeBackfill(interrupted); 192 this.logger.info("Interrupted backfill resumed", { 193 event: "firehose.backfill.resumed", 194 backfillId: interrupted.id.toString(), 195 }); 196 } else { 197 const savedCursorForCheck = await this.cursorManager.load(); 198 const backfillStatus = await this.backfillManager.checkIfNeeded(savedCursorForCheck); 199 200 if (backfillStatus !== BackfillStatus.NotNeeded) { 201 this.logger.info("Starting backfill", { 202 event: "firehose.backfill.starting", 203 type: backfillStatus, 204 }); 205 await this.backfillManager.performBackfill(backfillStatus); 206 this.logger.info("Backfill completed", { 207 event: "firehose.backfill.completed", 208 type: backfillStatus, 209 }); 210 } 211 } 212 } catch (error) { 213 this.logger.error("Backfill skipped due to startup error — firehose will start without it", { 214 event: "firehose.backfill.startup_error", 215 error: error instanceof Error ? error.message : String(error), 216 }); 217 // Continue to start firehose — stale data is better than no data 218 } 219 } 220 221 try { 222 // Load the last cursor from database 223 const savedCursor = await this.cursorManager.load(); 224 if (savedCursor) { 225 this.logger.info("Resuming from cursor", { cursor: savedCursor.toString() }); 226 // Rewind by 10 seconds to ensure we don't miss any events 227 const rewindedCursor = this.cursorManager.rewind(savedCursor, 10_000_000); 228 229 // Recreate Jetstream instance with cursor 230 this.jetstream = this.createJetstream(Number(rewindedCursor)); 231 this.setupEventHandlers(); 232 } 233 234 this.logger.info("Starting Jetstream firehose subscription", { url: this.jetstreamUrl }); 235 await this.jetstream.start(); 236 this.running = true; 237 this.reconnectionManager.reset(); 238 this.logger.info("Jetstream firehose subscription started successfully"); 239 } catch (error) { 240 this.logger.error("Failed to start Jetstream firehose", { error: error instanceof Error ? error.message : String(error) }); 241 this.handleReconnect(); 242 } 243 } 244 245 /** 246 * Stop the firehose subscription 247 */ 248 async stop() { 249 if (!this.running) { 250 return; 251 } 252 253 this.logger.info("Stopping Jetstream firehose subscription"); 254 await this.jetstream.close(); 255 this.running = false; 256 this.logger.info("Jetstream firehose subscription stopped"); 257 } 258 259 /** 260 * Check if the firehose is currently running 261 */ 262 isRunning(): boolean { 263 return this.running; 264 } 265 266 /** 267 * Get the timestamp of the last received event 268 */ 269 getLastEventTime(): Date | null { 270 return this.lastEventTime; 271 } 272 273 /** 274 * Inject the BackfillManager. Called during AppContext wiring. 275 */ 276 setBackfillManager(manager: BackfillManager): void { 277 this.backfillManager = manager; 278 } 279 280 /** 281 * Expose the Indexer instance for BackfillManager wiring. 282 */ 283 getIndexer(): Indexer { 284 return this.indexer; 285 } 286 287 /** 288 * Handle reconnection with exponential backoff 289 */ 290 private async handleReconnect() { 291 try { 292 await this.reconnectionManager.attemptReconnect(async () => { 293 this.running = false; 294 await this.start(); 295 }); 296 } catch (error) { 297 this.logger.fatal("Firehose indexing has stopped. The appview will continue serving stale data.", { 298 event: "firehose.reconnect.exhausted", 299 error: error instanceof Error ? error.message : String(error), 300 }); 301 this.running = false; 302 } 303 } 304 305}