import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
import { FirehoseService } from "../firehose.js";
import { createMockLogger } from "./mock-logger.js";
import type { Database } from "@atbb/db";
import { BackfillStatus } from "../backfill-manager.js";

// Mock backfill-manager to prevent @atproto/api from loading in unit tests.
// BackfillStatus is re-exported as a real object so it can be used in comparisons.
vi.mock("../backfill-manager.js", () => {
  return {
    BackfillStatus: {
      NotNeeded: "not_needed",
      CatchUp: "catch_up",
      FullSync: "full_sync",
    },
  };
});

// Mock Jetstream so no real WebSocket connection is opened.
vi.mock("@skyware/jetstream", () => {
  return {
    Jetstream: vi.fn().mockImplementation((_config) => {
      return {
        onCreate: vi.fn(),
        onUpdate: vi.fn(),
        onDelete: vi.fn(),
        on: vi.fn(),
        start: vi.fn().mockResolvedValue(undefined),
        close: vi.fn().mockResolvedValue(undefined),
      };
    }),
  };
});

// Mock indexer: every record handler is a bare spy.
vi.mock("../indexer.js", () => {
  return {
    Indexer: vi.fn().mockImplementation(() => {
      return {
        handlePostCreate: vi.fn(),
        handlePostUpdate: vi.fn(),
        handlePostDelete: vi.fn(),
        handleForumCreate: vi.fn(),
        handleForumUpdate: vi.fn(),
        handleForumDelete: vi.fn(),
        handleCategoryCreate: vi.fn(),
        handleCategoryUpdate: vi.fn(),
        handleCategoryDelete: vi.fn(),
        handleBoardCreate: vi.fn(),
        handleBoardUpdate: vi.fn(),
        handleBoardDelete: vi.fn(),
        handleRoleCreate: vi.fn(),
        handleRoleUpdate: vi.fn(),
        handleRoleDelete: vi.fn(),
        handleMembershipCreate: vi.fn(),
        handleMembershipUpdate: vi.fn(),
        handleMembershipDelete: vi.fn(),
        handleModActionCreate: vi.fn(),
        handleModActionUpdate: vi.fn(),
        handleModActionDelete: vi.fn(),
        handleReactionCreate: vi.fn(),
        handleReactionUpdate: vi.fn(),
        handleReactionDelete: vi.fn(),
      };
    }),
  };
});

const JETSTREAM_URL = "wss://jetstream.example.com";

/**
 * Builds a minimal stand-in for the database handle used by these tests.
 *
 * Only the two call chains exercised here are stubbed:
 * - `insert().values().onConflictDoUpdate()` resolves to `undefined`
 * - `select().from().where().limit()` resolves to `[]` (no saved cursor)
 *
 * A fresh object is returned on every call so spies never leak between tests.
 */
function createMockDb(): Database {
  const mockInsert = vi.fn().mockReturnValue({
    values: vi.fn().mockReturnValue({
      onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
    }),
  });

  const mockSelect = vi.fn().mockReturnValue({
    from: vi.fn().mockReturnValue({
      where: vi.fn().mockReturnValue({
        limit: vi.fn().mockResolvedValue([]),
      }),
    }),
  });

  // Deliberately partial: only `insert` and `select` are stubbed.
  return {
    insert: mockInsert,
    select: mockSelect,
  } as unknown as Database;
}

describe("FirehoseService", () => {
  let mockDb: Database;
  let firehoseService: FirehoseService;
  let mockLogger: ReturnType<typeof createMockLogger>;

  beforeEach(() => {
    mockLogger = createMockLogger();
    mockDb = createMockDb();
  });

  afterEach(() => {
    vi.clearAllMocks();
  });

  describe("Construction", () => {
    it("should initialize with database and Jetstream URL", () => {
      expect(() => {
        firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
      }).not.toThrow();
    });

    it("should create Indexer with database instance", async () => {
      const { Indexer } = await import("../indexer.js");

      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);

      expect(Indexer).toHaveBeenCalledWith(mockDb, mockLogger);
    });
  });

  describe("Lifecycle", () => {
    beforeEach(() => {
      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
    });

    it("should start the firehose subscription", async () => {
      await firehoseService.start();

      // Assert on observable state rather than mere existence of the instance.
      expect(firehoseService.isRunning()).toBe(true);
    });

    it("should stop the firehose subscription", async () => {
      await firehoseService.start();
      await firehoseService.stop();

      // After a graceful stop the service must no longer report as running.
      expect(firehoseService.isRunning()).toBe(false);
    });

    it("should not start if already running", async () => {
      await firehoseService.start();
      await firehoseService.start(); // Second call

      expect(mockLogger.warn).toHaveBeenCalledWith(
        "Firehose service is already running"
      );
    });
  });

  describe("Cursor Management", () => {
    beforeEach(() => {
      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
    });

    it("should resume from saved cursor on start", async () => {
      // Mock cursor retrieval
      const savedCursor = 1234567890000000n;
      vi.spyOn(mockDb, "select").mockReturnValue({
        from: vi.fn().mockReturnValue({
          where: vi.fn().mockReturnValue({
            limit: vi.fn().mockResolvedValue([{ cursor: savedCursor }]),
          }),
        }),
      } as any);

      await firehoseService.start();

      // Verify cursor was loaded and logged
      expect(mockLogger.info).toHaveBeenCalledWith(
        "Resuming from cursor",
        expect.any(Object)
      );
    });

    it("should start from beginning if no cursor exists", async () => {
      // Mock no cursor found
      vi.spyOn(mockDb, "select").mockReturnValue({
        from: vi.fn().mockReturnValue({
          where: vi.fn().mockReturnValue({
            limit: vi.fn().mockResolvedValue([]),
          }),
        }),
      } as any);

      await firehoseService.start();

      // Service should start without error even though no cursor row exists
      expect(firehoseService.isRunning()).toBe(true);
    });
  });

  describe("Error Handling", () => {
    beforeEach(() => {
      firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
    });

    it("continues to start firehose when backfill throws on startup", async () => {
      const mockBackfillManager = {
        checkForInterruptedBackfill: vi
          .fn()
          .mockRejectedValue(new Error("DB connection lost")),
        checkIfNeeded: vi.fn(),
        performBackfill: vi.fn(),
        resumeBackfill: vi.fn(),
        getIsRunning: vi.fn().mockReturnValue(false),
      };

      firehoseService.setBackfillManager(mockBackfillManager as any);
      await firehoseService.start();

      expect(firehoseService.isRunning()).toBe(true);
      expect(mockLogger.error).toHaveBeenCalledWith(
        "Backfill skipped due to startup error — firehose will start without it",
        expect.objectContaining({ event: "firehose.backfill.startup_error" })
      );
    });
  });
});

describe("Backfill Integration", () => {
  let mockDb: Database;
  let firehoseService: FirehoseService;

  beforeEach(() => {
    mockDb = createMockDb();
    const mockLogger = createMockLogger();
    firehoseService = new FirehoseService(mockDb, JETSTREAM_URL, mockLogger);
  });

  afterEach(() => {
    // Use clearAllMocks (not restoreAllMocks) to preserve module mock implementations
    vi.clearAllMocks();
  });

  it("runs backfill before starting jetstream when checkIfNeeded returns CatchUp", async () => {
    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(null),
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.CatchUp),
      performBackfill: vi.fn().mockResolvedValue({
        backfillId: 1n,
        type: BackfillStatus.CatchUp,
        didsProcessed: 10,
        recordsIndexed: 100,
        errors: 0,
        durationMs: 5000,
      }),
      resumeBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);
    await firehoseService.start();

    expect(mockBackfillManager.checkForInterruptedBackfill).toHaveBeenCalled();
    expect(mockBackfillManager.performBackfill).toHaveBeenCalledWith(
      BackfillStatus.CatchUp
    );
  });

  it("skips backfill when checkIfNeeded returns NotNeeded", async () => {
    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(null),
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.NotNeeded),
      performBackfill: vi.fn(),
      resumeBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);
    await firehoseService.start();

    expect(mockBackfillManager.performBackfill).not.toHaveBeenCalled();
  });

  it("resumes interrupted backfill before gap detection", async () => {
    const interruptedRow = {
      id: 5n,
      status: "in_progress",
      backfillType: "catch_up",
      lastProcessedDid: "did:plc:halfway",
      didsTotal: 100,
      didsProcessed: 50,
      recordsIndexed: 250,
      startedAt: new Date(),
      completedAt: null,
      errorMessage: null,
    };

    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(interruptedRow),
      resumeBackfill: vi.fn().mockResolvedValue({
        backfillId: 5n,
        type: BackfillStatus.CatchUp,
        didsProcessed: 100,
        recordsIndexed: 500,
        errors: 0,
        durationMs: 3000,
      }),
      checkIfNeeded: vi.fn(),
      performBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);
    await firehoseService.start();

    expect(mockBackfillManager.resumeBackfill).toHaveBeenCalledWith(interruptedRow);
    // Gap detection should NOT run when there's an interrupted backfill
    expect(mockBackfillManager.checkIfNeeded).not.toHaveBeenCalled();
  });

  it("starts firehose normally when no backfillManager is set", async () => {
    // No setBackfillManager call — should start without errors
    await firehoseService.start();

    expect(firehoseService.isRunning()).toBe(true);
  });

  it("exposes indexer via getIndexer()", () => {
    // Verify getIndexer() returns the internal indexer instance
    const indexer = firehoseService.getIndexer();

    expect(indexer).toBeDefined();
    expect(typeof indexer.handlePostCreate).toBe("function");
  });

  it("does not re-run backfill on reconnect", async () => {
    const mockBackfillManager = {
      checkForInterruptedBackfill: vi.fn().mockResolvedValue(null),
      checkIfNeeded: vi.fn().mockResolvedValue(BackfillStatus.NotNeeded),
      performBackfill: vi.fn(),
      resumeBackfill: vi.fn(),
      getIsRunning: vi.fn().mockReturnValue(false),
    };

    firehoseService.setBackfillManager(mockBackfillManager as any);

    // Initial start: backfill check runs once
    await firehoseService.start();
    expect(mockBackfillManager.checkForInterruptedBackfill).toHaveBeenCalledTimes(1);

    // Simulate reconnect: handleReconnect sets running=false then calls start() again
    await firehoseService.stop();
    await firehoseService.start();

    // Backfill check must NOT run again after the initial start
    expect(mockBackfillManager.checkForInterruptedBackfill).toHaveBeenCalledTimes(1);
  });
});