WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at root/atb-56-theme-caching-layer 371 lines 12 kB view raw
/**
 * Unit tests for FirehoseService.
 *
 * Jetstream, the Indexer, and the backfill-manager module are replaced with
 * Vitest module mocks, and the database is a hand-rolled stub that only
 * implements the `insert(...).values(...).onConflictDoUpdate(...)` and
 * `select().from(...).where(...).limit(...)` call chains, so no network or
 * database connection is opened by this file.
 */
import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
import { FirehoseService } from "../firehose.js";
import { createMockLogger } from "./mock-logger.js";
import type { Database } from "@atbb/db";
import { BackfillStatus } from "../backfill-manager.js";

// Mock backfill-manager to prevent @atproto/api from loading in unit tests
// BackfillStatus enum is re-exported as a real object so it can be used in comparisons
// NOTE: vi.mock factories are hoisted by Vitest, so they must stay self-contained
// and must not reference the helper functions declared further down.
vi.mock("../backfill-manager.js", () => {
  return {
    BackfillStatus: {
      NotNeeded: "not_needed",
      CatchUp: "catch_up",
      FullSync: "full_sync",
    },
  };
});

// Mock Jetstream
vi.mock("@skyware/jetstream", () => {
  return {
    Jetstream: vi.fn().mockImplementation((_config) => {
      return {
        onCreate: vi.fn(),
        onUpdate: vi.fn(),
        onDelete: vi.fn(),
        on: vi.fn(),
        start: vi.fn().mockResolvedValue(undefined),
        close: vi.fn().mockResolvedValue(undefined),
      };
    }),
  };
});

// Mock indexer
vi.mock("../indexer.js", () => {
  return {
    Indexer: vi.fn().mockImplementation(() => {
      return {
        handlePostCreate: vi.fn(),
        handlePostUpdate: vi.fn(),
        handlePostDelete: vi.fn(),
        handleForumCreate: vi.fn(),
        handleForumUpdate: vi.fn(),
        handleForumDelete: vi.fn(),
        handleCategoryCreate: vi.fn(),
        handleCategoryUpdate: vi.fn(),
        handleCategoryDelete: vi.fn(),
        handleBoardCreate: vi.fn(),
        handleBoardUpdate: vi.fn(),
        handleBoardDelete: vi.fn(),
        handleRoleCreate: vi.fn(),
        handleRoleUpdate: vi.fn(),
        handleRoleDelete: vi.fn(),
        handleMembershipCreate: vi.fn(),
        handleMembershipUpdate: vi.fn(),
        handleMembershipDelete: vi.fn(),
        handleModActionCreate: vi.fn(),
        handleModActionUpdate: vi.fn(),
        handleModActionDelete: vi.fn(),
        handleReactionCreate: vi.fn(),
        handleReactionUpdate: vi.fn(),
        handleReactionDelete: vi.fn(),
      };
    }),
  };
});

/** URL handed to every FirehoseService built in this file; never dialled because Jetstream is mocked. */
const JETSTREAM_URL = "wss://jetstream.example.com";

/**
 * Builds the object returned by a stubbed `db.select()` call.
 *
 * @param rows - rows that the terminal `.limit()` call resolves with
 * @returns an object exposing the `from(...).where(...).limit(...)` chain
 */
function createSelectChain(rows: unknown[]) {
  return {
    from: vi.fn().mockReturnValue({
      where: vi.fn().mockReturnValue({
        limit: vi.fn().mockResolvedValue(rows),
      }),
    }),
  };
}

/**
 * Builds a minimal Database stub.
 *
 * Only `insert` and `select` exist; `select` resolves to an empty row list
 * until a test overrides it. The double cast is required because the stub
 * deliberately implements just a sliver of the real Database type.
 */
function createMockDb(): Database {
  const mockInsert = vi.fn().mockReturnValue({
    values: vi.fn().mockReturnValue({
      onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
    }),
  });

  const mockSelect = vi.fn().mockReturnValue(createSelectChain([]));

  return {
    insert: mockInsert,
    select: mockSelect,
  } as unknown as Database;
}

describe("FirehoseService", () => {
  let mockDb: Database;
  let firehoseService: FirehoseService;
  let mockLogger: ReturnType<typeof createMockLogger>;

  beforeEach(() => {
    mockLogger = createMockLogger();

    // Create mock database
    mockDb = createMockDb();
  });

  afterEach(() => {
    vi.clearAllMocks();
  });

  describe("Construction", () => {
    it("should initialize with database and Jetstream URL", () => {
      expect(() => {
        firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
      }).not.toThrow();
    });

    it("should create Indexer with database instance", async () => {
      const { Indexer } = await import("../indexer.js");

      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);

      expect(Indexer).toHaveBeenCalledWith(mockDb, mockLogger);
    });
  });

  describe("Lifecycle", () => {
    beforeEach(() => {
      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
    });

    it("should start the firehose subscription", async () => {
      await firehoseService.start();

      // Assert the observable running flag instead of a tautological
      // toBeDefined() on a variable that beforeEach always assigns.
      expect(firehoseService.isRunning()).toBe(true);
    });

    it("should stop the firehose subscription", async () => {
      await firehoseService.start();
      await firehoseService.stop();

      // NOTE(review): assumes stop() clears the running flag, as the reconnect
      // test at the bottom of this file relies on — confirm against firehose.ts.
      expect(firehoseService.isRunning()).toBe(false);
    });

    it("should not start if already running", async () => {
      await firehoseService.start();
      await firehoseService.start(); // Second call

      expect(mockLogger.warn).toHaveBeenCalledWith(
        "Firehose service is already running"
      );
    });
  });

  describe("Cursor Management", () => {
    beforeEach(() => {
      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
    });

    it("should resume from saved cursor on start", async () => {
      // Mock cursor retrieval
      const savedCursor = BigInt(1234567890000000);
      vi.spyOn(mockDb, "select").mockReturnValue(
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- partial query-builder stub
        createSelectChain([{ cursor: savedCursor }]) as any
      );

      await firehoseService.start();

      // Verify cursor was loaded and logged
      expect(mockLogger.info).toHaveBeenCalledWith(
        "Resuming from cursor",
        expect.any(Object)
      );
    });

    it("should start from beginning if no cursor exists", async () => {
      // Mock no cursor found
      vi.spyOn(mockDb, "select").mockReturnValue(
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- partial query-builder stub
        createSelectChain([]) as any
      );

      await firehoseService.start();

      // Service should start without error and report itself as running
      expect(firehoseService.isRunning()).toBe(true);
    });
  });

  describe("Error Handling", () => {
    beforeEach(() => {
      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
    });

    it("continues to start firehose when backfill throws on startup", async () => {
      const mockBackfillManager = {
        checkForInterruptedBackfill: vi.fn().mockRejectedValue(new Error("DB connection lost")),
        checkIfNeeded: vi.fn(),
        performBackfill: vi.fn(),
        resumeBackfill: vi.fn(),
        getIsRunning: vi.fn().mockReturnValue(false),
      };

      firehoseService.setBackfillManager(mockBackfillManager as any);
      await firehoseService.start();

      expect(firehoseService.isRunning()).toBe(true);
      expect(mockLogger.error).toHaveBeenCalledWith(
        "Backfill skipped due to startup error — firehose will start without it",
        expect.objectContaining({ event: "firehose.backfill.startup_error" })
      );
    });
  });
});

describe("Backfill Integration", () => {
  let mockDb: Database;
  let firehoseService: FirehoseService;

  beforeEach(() => {
    mockDb = createMockDb();

    const mockLogger = createMockLogger();
    firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
  });

  afterEach(() => {
    // Use clearAllMocks (not restoreAllMocks) to preserve module mock implementations
    vi.clearAllMocks();
  });

  it("runs backfill before starting jetstream when checkIfNeeded returns CatchUp", async () => {
    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(null),
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.CatchUp),
      performBackfill: vi.fn().mockResolvedValue({
        backfillId: 1n, type: BackfillStatus.CatchUp, didsProcessed: 10,
        recordsIndexed: 100, errors: 0, durationMs: 5000,
      }),
      resumeBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);
    await firehoseService.start();

    expect(mockBackfillManager.checkForInterruptedBackfill).toHaveBeenCalled();
    expect(mockBackfillManager.performBackfill).toHaveBeenCalledWith(BackfillStatus.CatchUp);
  });

  it("skips backfill when checkIfNeeded returns NotNeeded", async () => {
    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(null),
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.NotNeeded),
      performBackfill: vi.fn(),
      resumeBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);
    await firehoseService.start();

    expect(mockBackfillManager.performBackfill).not.toHaveBeenCalled();
  });

  it("resumes interrupted backfill before gap detection", async () => {
    const interruptedRow = {
      id: 5n,
      status: "in_progress",
      backfillType: "catch_up",
      lastProcessedDid: "did:plc:halfway",
      didsTotal: 100,
      didsProcessed: 50,
      recordsIndexed: 250,
      startedAt: new Date(),
      completedAt: null,
      errorMessage: null,
    };

    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(interruptedRow),
      resumeBackfill: vi.fn().mockResolvedValue({
        backfillId: 5n, type: BackfillStatus.CatchUp, didsProcessed: 100,
        recordsIndexed: 500, errors: 0, durationMs: 3000,
      }),
      checkIfNeeded: vi.fn(),
      performBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);
    await firehoseService.start();

    expect(mockBackfillManager.resumeBackfill).toHaveBeenCalledWith(interruptedRow);
    // Gap detection should NOT run when there's an interrupted backfill
    expect(mockBackfillManager.checkIfNeeded).not.toHaveBeenCalled();
  });

  it("starts firehose normally when no backfillManager is set", async () => {
    // No setBackfillManager call — should start without errors
    await firehoseService.start();
    expect(firehoseService.isRunning()).toBe(true);
  });

  it("exposes indexer via getIndexer()", () => {
    // Verify getIndexer() returns the internal indexer instance
    const indexer = firehoseService.getIndexer();
    expect(indexer).toBeDefined();
    expect(typeof indexer.handlePostCreate).toBe("function");
  });

  it("does not re-run backfill on reconnect", async () => {
    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(null),
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.NotNeeded),
      performBackfill: vi.fn(),
      resumeBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);

    // Initial start: backfill check runs once
    await firehoseService.start();
    expect(mockBackfillManager.checkForInterruptedBackfill).toHaveBeenCalledTimes(1);

    // Simulate reconnect: handleReconnect sets running=false then calls start() again
    await firehoseService.stop();
    await firehoseService.start();

    // Backfill check must NOT run again after the initial start
    expect(mockBackfillManager.checkForInterruptedBackfill).toHaveBeenCalledTimes(1);
  });
});