WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
2import { FirehoseService } from "../firehose.js";
3import { createMockLogger } from "./mock-logger.js";
4import type { Database } from "@atbb/db";
5import { BackfillStatus } from "../backfill-manager.js";
6
// Swap ../backfill-manager.js for a stub so unit tests never pull in @atproto/api.
// BackfillStatus is provided as a plain object with real string values, which keeps
// comparisons against its members working inside the tests.
vi.mock("../backfill-manager.js", () => ({
  BackfillStatus: {
    NotNeeded: "not_needed",
    CatchUp: "catch_up",
    FullSync: "full_sync",
  },
}));
18
// Stand-in for @skyware/jetstream: each constructed client is a fresh set of spies,
// with start() and close() resolving to undefined.
vi.mock("@skyware/jetstream", () => {
  // The constructor config is accepted and ignored.
  const createJetstreamStub = (_config: unknown) => ({
    onCreate: vi.fn(),
    onUpdate: vi.fn(),
    onDelete: vi.fn(),
    on: vi.fn(),
    start: vi.fn().mockResolvedValue(undefined),
    close: vi.fn().mockResolvedValue(undefined),
  });

  return {
    Jetstream: vi.fn().mockImplementation(createJetstreamStub),
  };
});
34
// Stand-in for ../indexer.js: every Indexer instance exposes one spy per
// handle<Kind><Operation> method (8 record kinds x 3 operations = 24 handlers).
vi.mock("../indexer.js", () => {
  const recordKinds = [
    "Post",
    "Forum",
    "Category",
    "Board",
    "Role",
    "Membership",
    "ModAction",
    "Reaction",
  ] as const;
  const operations = ["Create", "Update", "Delete"] as const;

  // Builds a new handler table on each construction so spies are never shared
  // between Indexer instances.
  const buildIndexerStub = () => {
    const handlers: Record<string, ReturnType<typeof vi.fn>> = {};
    for (const kind of recordKinds) {
      for (const operation of operations) {
        handlers[`handle${kind}${operation}`] = vi.fn();
      }
    }
    return handlers;
  };

  return {
    Indexer: vi.fn().mockImplementation(buildIndexerStub),
  };
});
68
/**
 * Unit tests for FirehoseService: construction, start/stop lifecycle, cursor
 * resumption, and behaviour when the backfill manager fails during startup.
 *
 * Jetstream and Indexer are replaced by the module mocks declared earlier in this
 * file; the database is a minimal stub exposing only `insert` and `select`.
 */
describe("FirehoseService", () => {
  const JETSTREAM_URL = "wss://jetstream.example.com";

  let mockDb: Database;
  let firehoseService: FirehoseService;
  let mockLogger: ReturnType<typeof createMockLogger>;

  /** Constructs the service under test from the current per-test doubles. */
  const createService = (): FirehoseService =>
    new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);

  /**
   * Makes the select().from().where().limit() chain on the stub database
   * resolve to `rows`.
   */
  const stubCursorRows = (rows: Array<{ cursor: bigint }>): void => {
    vi.spyOn(mockDb, "select").mockReturnValue({
      from: vi.fn().mockReturnValue({
        where: vi.fn().mockReturnValue({
          limit: vi.fn().mockResolvedValue(rows),
        }),
      }),
      // Only the part of the query builder used by these tests is stubbed.
    } as any);
  };

  beforeEach(() => {
    mockLogger = createMockLogger();

    // Create mock database
    const mockInsert = vi.fn().mockReturnValue({
      values: vi.fn().mockReturnValue({
        onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
      }),
    });

    const mockSelect = vi.fn().mockReturnValue({
      from: vi.fn().mockReturnValue({
        where: vi.fn().mockReturnValue({
          limit: vi.fn().mockResolvedValue([]),
        }),
      }),
    });

    mockDb = {
      insert: mockInsert,
      select: mockSelect,
    } as unknown as Database;
  });

  afterEach(() => {
    // clearAllMocks resets recorded calls but keeps the module mock implementations.
    vi.clearAllMocks();
  });

  describe("Construction", () => {
    it("should initialize with database and Jetstream URL", () => {
      expect(() => {
        firehoseService = createService();
      }).not.toThrow();
    });

    it("should create Indexer with database instance", async () => {
      const { Indexer } = await import("../indexer.js");

      firehoseService = createService();

      expect(Indexer).toHaveBeenCalledWith(mockDb, mockLogger);
    });
  });

  describe("Lifecycle", () => {
    beforeEach(() => {
      firehoseService = createService();
    });

    it("should start the firehose subscription", async () => {
      await firehoseService.start();

      // Assert on observable state; a bare toBeDefined() could never fail here.
      expect(firehoseService.isRunning()).toBe(true);
    });

    it("should stop the firehose subscription", async () => {
      await firehoseService.start();
      await firehoseService.stop();

      // The service must report that it is no longer running after stop().
      expect(firehoseService.isRunning()).toBe(false);
    });

    it("should not start if already running", async () => {
      await firehoseService.start();
      await firehoseService.start(); // Second call

      expect(mockLogger.warn).toHaveBeenCalledWith(
        "Firehose service is already running"
      );
      expect(firehoseService.isRunning()).toBe(true);
    });
  });

  describe("Cursor Management", () => {
    beforeEach(() => {
      firehoseService = createService();
    });

    it("should resume from saved cursor on start", async () => {
      // Mock cursor retrieval
      stubCursorRows([{ cursor: 1234567890000000n }]);

      await firehoseService.start();

      // Verify cursor was loaded and logged
      expect(mockLogger.info).toHaveBeenCalledWith(
        "Resuming from cursor",
        expect.any(Object)
      );
    });

    it("should start from beginning if no cursor exists", async () => {
      // Mock no cursor found
      stubCursorRows([]);

      await firehoseService.start();

      // Service should be running, and must not claim to resume from a cursor.
      expect(firehoseService.isRunning()).toBe(true);
      expect(mockLogger.info).not.toHaveBeenCalledWith(
        "Resuming from cursor",
        expect.any(Object)
      );
    });
  });

  describe("Error Handling", () => {
    beforeEach(() => {
      firehoseService = createService();
    });

    it("continues to start firehose when backfill throws on startup", async () => {
      const mockBackfillManager = {
        checkForInterruptedBackfill: vi.fn().mockRejectedValue(new Error("DB connection lost")),
        checkIfNeeded: vi.fn(),
        performBackfill: vi.fn(),
        resumeBackfill: vi.fn(),
        getIsRunning: vi.fn().mockReturnValue(false),
      };

      firehoseService.setBackfillManager(mockBackfillManager as any);
      await firehoseService.start();

      expect(firehoseService.isRunning()).toBe(true);
      expect(mockLogger.error).toHaveBeenCalledWith(
        "Backfill skipped due to startup error — firehose will start without it",
        expect.objectContaining({ event: "firehose.backfill.startup_error" })
      );
    });
  });
});
235
// Exercises how FirehoseService.start() interacts with an attached backfill manager.
describe("Backfill Integration", () => {
  let mockDb: Database;
  let firehoseService: FirehoseService;

  type BackfillManagerStub = Record<
    | "checkForInterruptedBackfill"
    | "checkIfNeeded"
    | "performBackfill"
    | "resumeBackfill"
    | "getIsRunning",
    ReturnType<typeof vi.fn>
  >;

  /**
   * Builds a backfill-manager double. By default no interrupted backfill is
   * reported, getIsRunning() returns false, and the remaining methods are bare
   * spies; individual methods can be replaced through `overrides`.
   */
  const stubBackfillManager = (
    overrides: Partial<BackfillManagerStub> = {}
  ): BackfillManagerStub => ({
    checkForInterruptedBackfill: vi.fn().mockResolvedValue(null),
    checkIfNeeded: vi.fn(),
    performBackfill: vi.fn(),
    resumeBackfill: vi.fn(),
    getIsRunning: vi.fn().mockReturnValue(false),
    ...overrides,
  });

  beforeEach(() => {
    // insert(...).values(...).onConflictDoUpdate(...) resolves to undefined
    const onConflictDoUpdate = vi.fn().mockResolvedValue(undefined);
    const values = vi.fn().mockReturnValue({ onConflictDoUpdate });
    const insert = vi.fn().mockReturnValue({ values });

    // select().from(...).where(...).limit(...) resolves to an empty result set
    const limit = vi.fn().mockResolvedValue([]);
    const where = vi.fn().mockReturnValue({ limit });
    const from = vi.fn().mockReturnValue({ where });
    const select = vi.fn().mockReturnValue({ from });

    mockDb = { insert, select } as unknown as Database;

    const logger = createMockLogger();
    firehoseService = new FirehoseService(
      mockDb,
      "wss://jetstream.example.com",
      logger
    );
  });

  afterEach(() => {
    // clearAllMocks rather than restoreAllMocks: the module mock implementations must survive
    vi.clearAllMocks();
  });

  it("runs backfill before starting jetstream when checkIfNeeded returns CatchUp", async () => {
    const backfillResult = {
      backfillId: 1n,
      type: BackfillStatus.CatchUp,
      didsProcessed: 10,
      recordsIndexed: 100,
      errors: 0,
      durationMs: 5000,
    };
    const manager = stubBackfillManager({
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.CatchUp),
      performBackfill: vi.fn().mockResolvedValue(backfillResult),
    });

    firehoseService.setBackfillManager(manager as any);
    await firehoseService.start();

    expect(manager.checkForInterruptedBackfill).toHaveBeenCalled();
    expect(manager.performBackfill).toHaveBeenCalledWith(BackfillStatus.CatchUp);
  });

  it("skips backfill when checkIfNeeded returns NotNeeded", async () => {
    const manager = stubBackfillManager({
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.NotNeeded),
    });

    firehoseService.setBackfillManager(manager as any);
    await firehoseService.start();

    expect(manager.performBackfill).not.toHaveBeenCalled();
  });

  it("resumes interrupted backfill before gap detection", async () => {
    const interruptedRow = {
      id: 5n,
      status: "in_progress",
      backfillType: "catch_up",
      lastProcessedDid: "did:plc:halfway",
      didsTotal: 100,
      didsProcessed: 50,
      recordsIndexed: 250,
      startedAt: new Date(),
      completedAt: null,
      errorMessage: null,
    };
    const resumeResult = {
      backfillId: 5n,
      type: BackfillStatus.CatchUp,
      didsProcessed: 100,
      recordsIndexed: 500,
      errors: 0,
      durationMs: 3000,
    };
    const manager = stubBackfillManager({
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(interruptedRow),
      resumeBackfill: vi.fn().mockResolvedValue(resumeResult),
    });

    firehoseService.setBackfillManager(manager as any);
    await firehoseService.start();

    expect(manager.resumeBackfill).toHaveBeenCalledWith(interruptedRow);
    // With an interrupted backfill present, gap detection must be skipped
    expect(manager.checkIfNeeded).not.toHaveBeenCalled();
  });

  it("starts firehose normally when no backfillManager is set", async () => {
    // setBackfillManager is deliberately not called here
    await firehoseService.start();
    expect(firehoseService.isRunning()).toBe(true);
  });

  it("exposes indexer via getIndexer()", () => {
    // getIndexer() should hand back the service's indexer instance
    const exposedIndexer = firehoseService.getIndexer();
    expect(exposedIndexer).toBeDefined();
    expect(typeof exposedIndexer.handlePostCreate).toBe("function");
  });

  it("does not re-run backfill on reconnect", async () => {
    const manager = stubBackfillManager({
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.NotNeeded),
    });

    firehoseService.setBackfillManager(manager as any);

    // First start: the interrupted-backfill check runs exactly once
    await firehoseService.start();
    expect(manager.checkForInterruptedBackfill).toHaveBeenCalledTimes(1);

    // Simulate reconnect: handleReconnect sets running=false then calls start() again
    await firehoseService.stop();
    await firehoseService.start();

    // The check count must still be one after the restart
    expect(manager.checkForInterruptedBackfill).toHaveBeenCalledTimes(1);
  });
});