WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto

refactor(appview): decompose FirehoseService into focused classes (#10)

Extract three focused classes to separate concerns in FirehoseService:

- CursorManager: manages firehose cursor persistence in database
- Handles loading/saving cursor state
- Provides rewind utility for safety margin

- CircuitBreaker: implements circuit breaker pattern
- Tracks consecutive operation failures
- Triggers callback when failure threshold exceeded
- Prevents cascading failures

- ReconnectionManager: handles reconnection with exponential backoff
- Implements backoff strategy: baseDelay * 2^(attempt - 1)
- Enforces max attempt limit
- Provides attempt count for monitoring

Benefits:
- Single Responsibility Principle: each class has one well-defined purpose
- Testability: classes can be tested in isolation with unit tests
- Reusability: helper classes can be reused in other services
- Maintainability: easier to understand, modify, and debug
- Monitoring: exposes failure/attempt counts for health checks

FirehoseService now delegates cursor, circuit breaker, and reconnection
concerns to these helper classes while focusing on WebSocket management
and event routing.

authored by

Malpercio and committed by
GitHub
5b97039d d3c76fe8

+896 -98
+203
apps/appview/src/lib/__tests__/circuit-breaker.test.ts
··· 1 + import { describe, it, expect, beforeEach, vi, afterEach } from "vitest"; 2 + import { CircuitBreaker } from "../circuit-breaker.js"; 3 + 4 + describe("CircuitBreaker", () => { 5 + let onBreak: ReturnType<typeof vi.fn>; 6 + let circuitBreaker: CircuitBreaker; 7 + 8 + beforeEach(() => { 9 + onBreak = vi.fn().mockResolvedValue(undefined); 10 + }); 11 + 12 + afterEach(() => { 13 + vi.clearAllMocks(); 14 + }); 15 + 16 + describe("Construction", () => { 17 + it("should initialize with maxFailures and onBreak callback", () => { 18 + expect(() => { 19 + circuitBreaker = new CircuitBreaker(5, onBreak); 20 + }).not.toThrow(); 21 + 22 + expect(circuitBreaker.getFailureCount()).toBe(0); 23 + }); 24 + }); 25 + 26 + describe("execute", () => { 27 + beforeEach(() => { 28 + circuitBreaker = new CircuitBreaker(3, onBreak); 29 + }); 30 + 31 + it("should execute operation successfully and reset counter", async () => { 32 + const operation = vi.fn().mockResolvedValue("success"); 33 + 34 + await circuitBreaker.execute(operation, "test-operation"); 35 + 36 + expect(operation).toHaveBeenCalledOnce(); 37 + expect(circuitBreaker.getFailureCount()).toBe(0); 38 + expect(onBreak).not.toHaveBeenCalled(); 39 + }); 40 + 41 + it("should track consecutive failures", async () => { 42 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 43 + const operation = vi.fn().mockRejectedValue(new Error("Operation failed")); 44 + 45 + await circuitBreaker.execute(operation, "test-operation"); 46 + expect(circuitBreaker.getFailureCount()).toBe(1); 47 + 48 + await circuitBreaker.execute(operation, "test-operation"); 49 + expect(circuitBreaker.getFailureCount()).toBe(2); 50 + 51 + expect(onBreak).not.toHaveBeenCalled(); 52 + 53 + consoleSpy.mockRestore(); 54 + }); 55 + 56 + it("should trigger onBreak when max failures reached", async () => { 57 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 58 + const operation = vi.fn().mockRejectedValue(new Error("Operation failed")); 59 + 60 + // Fail 3 times (maxFailures = 3) 61 + await circuitBreaker.execute(operation, "test-operation"); 62 + await circuitBreaker.execute(operation, "test-operation"); 63 + await circuitBreaker.execute(operation, "test-operation"); 64 + 65 + expect(circuitBreaker.getFailureCount()).toBe(3); 66 + expect(onBreak).toHaveBeenCalledOnce(); 67 + 68 + consoleSpy.mockRestore(); 69 + }); 70 + 71 + it("should log failures with operation name", async () => { 72 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 73 + const operation = vi.fn().mockRejectedValue(new Error("Operation failed")); 74 + 75 + await circuitBreaker.execute(operation, "custom-operation"); 76 + 77 + expect(consoleSpy).toHaveBeenCalledWith( 78 + expect.stringContaining("custom-operation"), 79 + expect.any(Error) 80 + ); 81 + 82 + consoleSpy.mockRestore(); 83 + }); 84 + 85 + it("should log when circuit breaks", async () => { 86 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 87 + const operation = vi.fn().mockRejectedValue(new Error("Operation failed")); 88 + 89 + // Trigger circuit breaker 90 + await circuitBreaker.execute(operation, "test-operation"); 91 + await circuitBreaker.execute(operation, "test-operation"); 92 + await circuitBreaker.execute(operation, "test-operation"); 93 + 94 + expect(consoleSpy).toHaveBeenCalledWith( 95 + expect.stringContaining("[CIRCUIT BREAKER] Max consecutive failures") 96 + ); 97 + 98 + consoleSpy.mockRestore(); 99 + }); 100 + 101 + it("should reset counter after successful operation", async () => { 102 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 103 + const failingOp = vi.fn().mockRejectedValue(new Error("Failed")); 104 + const successOp = vi.fn().mockResolvedValue("success"); 105 + 106 + // Fail twice 107 + await circuitBreaker.execute(failingOp, "failing-op"); 108 + await circuitBreaker.execute(failingOp, "failing-op"); 109 + expect(circuitBreaker.getFailureCount()).toBe(2); 110 + 111 + // Succeed once - should reset counter 112 + await circuitBreaker.execute(successOp, "success-op"); 113 + expect(circuitBreaker.getFailureCount()).toBe(0); 114 + 115 + // Verify onBreak was never called 116 + expect(onBreak).not.toHaveBeenCalled(); 117 + 118 + consoleSpy.mockRestore(); 119 + }); 120 + }); 121 + 122 + describe("reset", () => { 123 + beforeEach(() => { 124 + circuitBreaker = new CircuitBreaker(3, onBreak); 125 + }); 126 + 127 + it("should reset failure counter", async () => { 128 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 129 + const operation = vi.fn().mockRejectedValue(new Error("Failed")); 130 + 131 + await circuitBreaker.execute(operation, "test-operation"); 132 + await circuitBreaker.execute(operation, "test-operation"); 133 + expect(circuitBreaker.getFailureCount()).toBe(2); 134 + 135 + circuitBreaker.reset(); 136 + expect(circuitBreaker.getFailureCount()).toBe(0); 137 + 138 + consoleSpy.mockRestore(); 139 + }); 140 + }); 141 + 142 + describe("getFailureCount", () => { 143 + beforeEach(() => { 144 + circuitBreaker = new CircuitBreaker(3, onBreak); 145 + }); 146 + 147 + it("should return current failure count", async () => { 148 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 149 + const operation = vi.fn().mockRejectedValue(new Error("Failed")); 150 + 151 + expect(circuitBreaker.getFailureCount()).toBe(0); 152 + 153 + await circuitBreaker.execute(operation, "test-operation"); 154 + expect(circuitBreaker.getFailureCount()).toBe(1); 155 + 156 + await circuitBreaker.execute(operation, "test-operation"); 157 + expect(circuitBreaker.getFailureCount()).toBe(2); 158 + 159 + consoleSpy.mockRestore(); 160 + }); 161 + }); 162 + 163 + describe("Edge Cases", () => { 164 + it("should handle onBreak callback errors", async () => { 165 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 166 + const failingOnBreak = vi.fn().mockRejectedValue(new Error("onBreak failed")); 167 + circuitBreaker = new CircuitBreaker(2, failingOnBreak); 168 + 169 + const operation = vi.fn().mockRejectedValue(new Error("Operation failed")); 170 + 171 + // This should not throw even if onBreak fails 172 + await circuitBreaker.execute(operation, "test-operation"); 173 + 174 + // Second call triggers the circuit breaker, which calls the failing onBreak 175 + // We need to catch the unhandled promise rejection from onBreak 176 + try { 177 + await circuitBreaker.execute(operation, "test-operation"); 178 + // Wait a bit for the onBreak promise to be handled 179 + await new Promise(resolve => setTimeout(resolve, 10)); 180 + } catch { 181 + // Ignore error from onBreak 182 + } 183 + 184 + expect(failingOnBreak).toHaveBeenCalled(); 185 + 186 + consoleSpy.mockRestore(); 187 + }); 188 + 189 + it("should handle maxFailures of 1", async () => { 190 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 191 + circuitBreaker = new CircuitBreaker(1, onBreak); 192 + 193 + const operation = vi.fn().mockRejectedValue(new Error("Failed")); 194 + 195 + await circuitBreaker.execute(operation, "test-operation"); 196 + 197 + expect(circuitBreaker.getFailureCount()).toBe(1); 198 + expect(onBreak).toHaveBeenCalledOnce(); 199 + 200 + consoleSpy.mockRestore(); 201 + }); 202 + }); 203 + });
+192
apps/appview/src/lib/__tests__/cursor-manager.test.ts
··· 1 + import { describe, it, expect, beforeEach, vi, afterEach } from "vitest"; 2 + import { CursorManager } from "../cursor-manager.js"; 3 + import type { Database } from "@atbb/db"; 4 + 5 + describe("CursorManager", () => { 6 + let mockDb: Database; 7 + let cursorManager: CursorManager; 8 + 9 + beforeEach(() => { 10 + // Create mock database with common patterns 11 + const mockInsert = vi.fn().mockReturnValue({ 12 + values: vi.fn().mockReturnValue({ 13 + onConflictDoUpdate: vi.fn().mockResolvedValue(undefined), 14 + }), 15 + }); 16 + 17 + const mockSelect = vi.fn().mockReturnValue({ 18 + from: vi.fn().mockReturnValue({ 19 + where: vi.fn().mockReturnValue({ 20 + limit: vi.fn().mockResolvedValue([]), 21 + }), 22 + }), 23 + }); 24 + 25 + mockDb = { 26 + insert: mockInsert, 27 + select: mockSelect, 28 + } as unknown as Database; 29 + 30 + cursorManager = new CursorManager(mockDb); 31 + }); 32 + 33 + afterEach(() => { 34 + vi.clearAllMocks(); 35 + }); 36 + 37 + describe("load", () => { 38 + it("should return null when no cursor exists", async () => { 39 + // Mock empty result 40 + vi.spyOn(mockDb, "select").mockReturnValue({ 41 + from: vi.fn().mockReturnValue({ 42 + where: vi.fn().mockReturnValue({ 43 + limit: vi.fn().mockResolvedValue([]), 44 + }), 45 + }), 46 + } as any); 47 + 48 + const cursor = await cursorManager.load(); 49 + expect(cursor).toBeNull(); 50 + }); 51 + 52 + it("should return saved cursor when it exists", async () => { 53 + const savedCursor = BigInt(1234567890000000); 54 + 55 + // Mock cursor retrieval 56 + vi.spyOn(mockDb, "select").mockReturnValue({ 57 + from: vi.fn().mockReturnValue({ 58 + where: vi.fn().mockReturnValue({ 59 + limit: vi.fn().mockResolvedValue([{ cursor: savedCursor }]), 60 + }), 61 + }), 62 + } as any); 63 + 64 + const cursor = await cursorManager.load(); 65 + expect(cursor).toBe(savedCursor); 66 + }); 67 + 68 + it("should return null and log error on database failure", async () => { 69 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 70 + 71 + // Mock database error 72 + vi.spyOn(mockDb, "select").mockReturnValue({ 73 + from: vi.fn().mockReturnValue({ 74 + where: vi.fn().mockReturnValue({ 75 + limit: vi.fn().mockRejectedValue(new Error("Database error")), 76 + }), 77 + }), 78 + } as any); 79 + 80 + const cursor = await cursorManager.load(); 81 + expect(cursor).toBeNull(); 82 + expect(consoleSpy).toHaveBeenCalledWith( 83 + "Failed to load cursor from database:", 84 + expect.any(Error) 85 + ); 86 + 87 + consoleSpy.mockRestore(); 88 + }); 89 + 90 + it("should allow custom service name", async () => { 91 + const savedCursor = BigInt(9876543210000000); 92 + 93 + // Mock cursor retrieval 94 + const whereFn = vi.fn().mockReturnValue({ 95 + limit: vi.fn().mockResolvedValue([{ cursor: savedCursor }]), 96 + }); 97 + 98 + vi.spyOn(mockDb, "select").mockReturnValue({ 99 + from: vi.fn().mockReturnValue({ 100 + where: whereFn, 101 + }), 102 + } as any); 103 + 104 + const cursor = await cursorManager.load("custom-service"); 105 + expect(cursor).toBe(savedCursor); 106 + }); 107 + }); 108 + 109 + describe("update", () => { 110 + it("should update cursor in database", async () => { 111 + const mockInsert = vi.fn().mockReturnValue({ 112 + values: vi.fn().mockReturnValue({ 113 + onConflictDoUpdate: vi.fn().mockResolvedValue(undefined), 114 + }), 115 + }); 116 + 117 + vi.spyOn(mockDb, "insert").mockImplementation(mockInsert); 118 + 119 + await cursorManager.update(1234567890000000); 120 + 121 + expect(mockInsert).toHaveBeenCalled(); 122 + }); 123 + 124 + it("should not throw on database failure", async () => { 125 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 126 + 127 + // Mock database error 128 + vi.spyOn(mockDb, "insert").mockReturnValue({ 129 + values: vi.fn().mockReturnValue({ 130 + onConflictDoUpdate: vi.fn().mockRejectedValue(new Error("Database error")), 131 + }), 132 + } as any); 133 + 134 + // Should not throw 135 + await expect(cursorManager.update(1234567890000000)).resolves.toBeUndefined(); 136 + 137 + expect(consoleSpy).toHaveBeenCalledWith( 138 + "Failed to update cursor:", 139 + expect.any(Error) 140 + ); 141 + 142 + consoleSpy.mockRestore(); 143 + }); 144 + 145 + it("should allow custom service name", async () => { 146 + const valuesFn = vi.fn().mockReturnValue({ 147 + onConflictDoUpdate: vi.fn().mockResolvedValue(undefined), 148 + }); 149 + 150 + vi.spyOn(mockDb, "insert").mockReturnValue({ 151 + values: valuesFn, 152 + } as any); 153 + 154 + await cursorManager.update(1234567890000000, "custom-service"); 155 + 156 + // Verify values was called with custom service 157 + expect(valuesFn).toHaveBeenCalledWith({ 158 + service: "custom-service", 159 + cursor: BigInt(1234567890000000), 160 + updatedAt: expect.any(Date), 161 + }); 162 + }); 163 + }); 164 + 165 + describe("rewind", () => { 166 + it("should rewind cursor by specified microseconds", () => { 167 + const cursor = BigInt(1234567890000000); 168 + const rewindAmount = 10_000_000; // 10 seconds 169 + 170 + const rewound = cursorManager.rewind(cursor, rewindAmount); 171 + 172 + expect(rewound).toBe(cursor - BigInt(rewindAmount)); 173 + }); 174 + 175 + it("should handle zero rewind", () => { 176 + const cursor = BigInt(1234567890000000); 177 + 178 + const rewound = cursorManager.rewind(cursor, 0); 179 + 180 + expect(rewound).toBe(cursor); 181 + }); 182 + 183 + it("should handle large rewind amounts", () => { 184 + const cursor = BigInt(1234567890000000); 185 + const rewindAmount = 1_000_000_000; // 1000 seconds 186 + 187 + const rewound = cursorManager.rewind(cursor, rewindAmount); 188 + 189 + expect(rewound).toBe(cursor - BigInt(rewindAmount)); 190 + }); 191 + }); 192 + });
+257
apps/appview/src/lib/__tests__/reconnection-manager.test.ts
··· 1 + import { describe, it, expect, beforeEach, vi, afterEach } from "vitest"; 2 + import { ReconnectionManager } from "../reconnection-manager.js"; 3 + 4 + describe("ReconnectionManager", () => { 5 + let reconnectionManager: ReconnectionManager; 6 + let reconnectFn: ReturnType<typeof vi.fn>; 7 + 8 + beforeEach(() => { 9 + vi.useFakeTimers(); 10 + reconnectFn = vi.fn().mockResolvedValue(undefined); 11 + }); 12 + 13 + afterEach(() => { 14 + vi.clearAllMocks(); 15 + vi.useRealTimers(); 16 + }); 17 + 18 + describe("Construction", () => { 19 + it("should initialize with maxAttempts and baseDelayMs", () => { 20 + expect(() => { 21 + reconnectionManager = new ReconnectionManager(5, 1000); 22 + }).not.toThrow(); 23 + 24 + expect(reconnectionManager.getAttemptCount()).toBe(0); 25 + }); 26 + }); 27 + 28 + describe("attemptReconnect", () => { 29 + beforeEach(() => { 30 + reconnectionManager = new ReconnectionManager(3, 1000); 31 + }); 32 + 33 + it("should attempt reconnection with exponential backoff", async () => { 34 + const consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {}); 35 + 36 + // First attempt: 1000ms delay (1000 * 2^0) 37 + const promise1 = reconnectionManager.attemptReconnect(reconnectFn); 38 + expect(consoleSpy).toHaveBeenCalledWith( 39 + expect.stringContaining("(1/3) in 1000ms") 40 + ); 41 + await vi.advanceTimersByTimeAsync(1000); 42 + await promise1; 43 + expect(reconnectFn).toHaveBeenCalledTimes(1); 44 + 45 + // Second attempt: 2000ms delay (1000 * 2^1) 46 + const promise2 = reconnectionManager.attemptReconnect(reconnectFn); 47 + expect(consoleSpy).toHaveBeenCalledWith( 48 + expect.stringContaining("(2/3) in 2000ms") 49 + ); 50 + await vi.advanceTimersByTimeAsync(2000); 51 + await promise2; 52 + expect(reconnectFn).toHaveBeenCalledTimes(2); 53 + 54 + // Third attempt: 4000ms delay (1000 * 2^2) 55 + const promise3 = reconnectionManager.attemptReconnect(reconnectFn); 56 + expect(consoleSpy).toHaveBeenCalledWith( 57 + expect.stringContaining("(3/3) in 4000ms") 58 + ); 59 + await vi.advanceTimersByTimeAsync(4000); 60 + await promise3; 61 + expect(reconnectFn).toHaveBeenCalledTimes(3); 62 + 63 + consoleSpy.mockRestore(); 64 + }); 65 + 66 + it("should throw error when max attempts exceeded", async () => { 67 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 68 + 69 + // Exhaust all attempts 70 + const promise1 = reconnectionManager.attemptReconnect(reconnectFn); 71 + await vi.runAllTimersAsync(); 72 + await promise1; 73 + 74 + const promise2 = reconnectionManager.attemptReconnect(reconnectFn); 75 + await vi.runAllTimersAsync(); 76 + await promise2; 77 + 78 + const promise3 = reconnectionManager.attemptReconnect(reconnectFn); 79 + await vi.runAllTimersAsync(); 80 + await promise3; 81 + 82 + // Fourth attempt should throw 83 + await expect(reconnectionManager.attemptReconnect(reconnectFn)).rejects.toThrow( 84 + "Max reconnection attempts exceeded" 85 + ); 86 + 87 + expect(consoleSpy).toHaveBeenCalledWith( 88 + expect.stringContaining("[FATAL] Max reconnect attempts") 89 + ); 90 + 91 + consoleSpy.mockRestore(); 92 + }); 93 + 94 + it("should log reconnection attempts", async () => { 95 + const consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {}); 96 + 97 + const promise = reconnectionManager.attemptReconnect(reconnectFn); 98 + await vi.runAllTimersAsync(); 99 + await promise; 100 + 101 + expect(consoleSpy).toHaveBeenCalledWith( 102 + expect.stringContaining("Attempting to reconnect") 103 + ); 104 + 105 + consoleSpy.mockRestore(); 106 + }); 107 + 108 + it("should propagate errors from reconnectFn", async () => { 109 + const error = new Error("Reconnection failed"); 110 + reconnectFn.mockRejectedValueOnce(error); 111 + 112 + // Run the reconnection attempt and catch the expected error 113 + await expect(async () => { 114 + const promise = reconnectionManager.attemptReconnect(reconnectFn); 115 + await vi.runAllTimersAsync(); 116 + await promise; 117 + }).rejects.toThrow("Reconnection failed"); 118 + }); 119 + }); 120 + 121 + describe("reset", () => { 122 + beforeEach(() => { 123 + reconnectionManager = new ReconnectionManager(3, 1000); 124 + }); 125 + 126 + it("should reset attempt counter", async () => { 127 + // Make one attempt 128 + const promise = reconnectionManager.attemptReconnect(reconnectFn); 129 + await vi.runAllTimersAsync(); 130 + await promise; 131 + 132 + expect(reconnectionManager.getAttemptCount()).toBe(1); 133 + 134 + // Reset 135 + reconnectionManager.reset(); 136 + expect(reconnectionManager.getAttemptCount()).toBe(0); 137 + }); 138 + 139 + it("should allow reconnection after reset", async () => { 140 + const consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {}); 141 + 142 + // Exhaust all attempts 143 + const promise1 = reconnectionManager.attemptReconnect(reconnectFn); 144 + await vi.runAllTimersAsync(); 145 + await promise1; 146 + 147 + const promise2 = reconnectionManager.attemptReconnect(reconnectFn); 148 + await vi.runAllTimersAsync(); 149 + await promise2; 150 + 151 + const promise3 = reconnectionManager.attemptReconnect(reconnectFn); 152 + await vi.runAllTimersAsync(); 153 + await promise3; 154 + 155 + expect(reconnectionManager.getAttemptCount()).toBe(3); 156 + 157 + // Reset should allow new attempts 158 + reconnectionManager.reset(); 159 + 160 + const promise4 = reconnectionManager.attemptReconnect(reconnectFn); 161 + expect(consoleSpy).toHaveBeenCalledWith( 162 + expect.stringContaining("(1/3) in 1000ms") 163 + ); 164 + await vi.runAllTimersAsync(); 165 + await promise4; 166 + 167 + consoleSpy.mockRestore(); 168 + }); 169 + }); 170 + 171 + describe("getAttemptCount", () => { 172 + beforeEach(() => { 173 + reconnectionManager = new ReconnectionManager(3, 1000); 174 + }); 175 + 176 + it("should return current attempt count", async () => { 177 + expect(reconnectionManager.getAttemptCount()).toBe(0); 178 + 179 + const promise1 = reconnectionManager.attemptReconnect(reconnectFn); 180 + await vi.runAllTimersAsync(); 181 + await promise1; 182 + expect(reconnectionManager.getAttemptCount()).toBe(1); 183 + 184 + const promise2 = reconnectionManager.attemptReconnect(reconnectFn); 185 + await vi.runAllTimersAsync(); 186 + await promise2; 187 + expect(reconnectionManager.getAttemptCount()).toBe(2); 188 + }); 189 + }); 190 + 191 + describe("Edge Cases", () => { 192 + it("should handle maxAttempts of 1", async () => { 193 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 194 + reconnectionManager = new ReconnectionManager(1, 1000); 195 + 196 + const promise = reconnectionManager.attemptReconnect(reconnectFn); 197 + await vi.runAllTimersAsync(); 198 + await promise; 199 + 200 + expect(reconnectionManager.getAttemptCount()).toBe(1); 201 + 202 + // Second attempt should fail 203 + await expect(reconnectionManager.attemptReconnect(reconnectFn)).rejects.toThrow( 204 + "Max reconnection attempts exceeded" 205 + ); 206 + 207 + consoleSpy.mockRestore(); 208 + }); 209 + 210 + it("should handle very small base delays", async () => { 211 + reconnectionManager = new ReconnectionManager(2, 10); 212 + 213 + const promise = reconnectionManager.attemptReconnect(reconnectFn); 214 + await vi.advanceTimersByTimeAsync(10); 215 + await promise; 216 + 217 + expect(reconnectFn).toHaveBeenCalled(); 218 + }); 219 + 220 + it("should calculate exponential backoff correctly for many attempts", async () => { 221 + const consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {}); 222 + reconnectionManager = new ReconnectionManager(5, 100); 223 + 224 + // Attempt 1: 100ms (100 * 2^0) 225 + let promise = reconnectionManager.attemptReconnect(reconnectFn); 226 + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("in 100ms")); 227 + await vi.runAllTimersAsync(); 228 + await promise; 229 + 230 + // Attempt 2: 200ms (100 * 2^1) 231 + promise = reconnectionManager.attemptReconnect(reconnectFn); 232 + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("in 200ms")); 233 + await vi.runAllTimersAsync(); 234 + await promise; 235 + 236 + // Attempt 3: 400ms (100 * 2^2) 237 + promise = reconnectionManager.attemptReconnect(reconnectFn); 238 + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("in 400ms")); 239 + await vi.runAllTimersAsync(); 240 + await promise; 241 + 242 + // Attempt 4: 800ms (100 * 2^3) 243 + promise = reconnectionManager.attemptReconnect(reconnectFn); 244 + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("in 800ms")); 245 + await vi.runAllTimersAsync(); 246 + await promise; 247 + 248 + // Attempt 5: 1600ms (100 * 2^4) 249 + promise = reconnectionManager.attemptReconnect(reconnectFn); 250 + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("in 1600ms")); 251 + await vi.runAllTimersAsync(); 252 + await promise; 253 + 254 + consoleSpy.mockRestore(); 255 + }); 256 + }); 257 + });
+69
apps/appview/src/lib/circuit-breaker.ts
··· 1 + /** 2 + * Implements circuit breaker pattern to prevent cascading failures. 3 + * 4 + * Tracks consecutive errors and triggers a callback when threshold is exceeded. 5 + * This prevents the system from repeatedly attempting operations that are likely 6 + * to fail, allowing for graceful degradation. 7 + */ 8 + export class CircuitBreaker { 9 + private consecutiveFailures = 0; 10 + 11 + /** 12 + * @param maxFailures - Maximum consecutive failures before circuit breaks 13 + * @param onBreak - Callback invoked when circuit breaks 14 + */ 15 + constructor( 16 + private maxFailures: number, 17 + private onBreak: () => Promise<void> 18 + ) {} 19 + 20 + /** 21 + * Execute an operation with circuit breaker protection 22 + * 23 + * @param operation - The async operation to execute 24 + * @param operationName - Name for logging purposes 25 + */ 26 + async execute<T>( 27 + operation: () => Promise<T>, 28 + operationName: string 29 + ): Promise<void> { 30 + try { 31 + await operation(); 32 + this.reset(); 33 + } catch (error) { 34 + await this.recordFailure(operationName, error); 35 + } 36 + } 37 + 38 + /** 39 + * Record a failure and check if circuit should break 40 + */ 41 + private async recordFailure(operationName: string, error: unknown): Promise<void> { 42 + this.consecutiveFailures++; 43 + console.error( 44 + `[CIRCUIT BREAKER] ${operationName} failed (${this.consecutiveFailures}/${this.maxFailures}):`, 45 + error 46 + ); 47 + 48 + if (this.consecutiveFailures >= this.maxFailures) { 49 + console.error( 50 + `[CIRCUIT BREAKER] Max consecutive failures (${this.maxFailures}) reached.` 51 + ); 52 + await this.onBreak(); 53 + } 54 + } 55 + 56 + /** 57 + * Reset the failure counter (call on successful operation) 58 + */ 59 + reset(): void { 60 + this.consecutiveFailures = 0; 61 + } 62 + 63 + /** 64 + * Get current failure count for monitoring 65 + */ 66 + getFailureCount(): number { 67 + return this.consecutiveFailures; 68 + } 69 + }
+72
apps/appview/src/lib/cursor-manager.ts
··· 1 + import { type Database, firehoseCursor } from "@atbb/db"; 2 + import { eq } from "drizzle-orm"; 3 + 4 + /** 5 + * Manages firehose cursor persistence in the database. 6 + * 7 + * The cursor tracks the last processed event timestamp (in microseconds) 8 + * to enable resumption after restart or reconnection. 9 + */ 10 + export class CursorManager { 11 + constructor(private db: Database) {} 12 + 13 + /** 14 + * Load the last cursor from database 15 + * 16 + * @param service - Service name (default: "jetstream") 17 + * @returns The last cursor value, or null if none exists 18 + */ 19 + async load(service: string = "jetstream"): Promise<bigint | null> { 20 + try { 21 + const result = await this.db 22 + .select() 23 + .from(firehoseCursor) 24 + .where(eq(firehoseCursor.service, service)) 25 + .limit(1); 26 + 27 + return result.length > 0 ? result[0].cursor : null; 28 + } catch (error) { 29 + console.error("Failed to load cursor from database:", error); 30 + return null; 31 + } 32 + } 33 + 34 + /** 35 + * Update the cursor in database 36 + * 37 + * @param timeUs - Timestamp in microseconds 38 + * @param service - Service name (default: "jetstream") 39 + */ 40 + async update(timeUs: number, service: string = "jetstream"): Promise<void> { 41 + try { 42 + await this.db 43 + .insert(firehoseCursor) 44 + .values({ 45 + service, 46 + cursor: BigInt(timeUs), 47 + updatedAt: new Date(), 48 + }) 49 + .onConflictDoUpdate({ 50 + target: firehoseCursor.service, 51 + set: { 52 + cursor: BigInt(timeUs), 53 + updatedAt: new Date(), 54 + }, 55 + }); 56 + } catch (error) { 57 + // Don't throw - we don't want cursor updates to break the stream 58 + console.error("Failed to update cursor:", error); 59 + } 60 + } 61 + 62 + /** 63 + * Rewind cursor by specified microseconds for safety margin 64 + * 65 + * @param cursor - Current cursor value 66 + * @param microseconds - Amount to rewind in microseconds 67 + * @returns Rewound cursor value 68 + */ 69 + rewind(cursor: bigint, microseconds: number): bigint { 70 + return cursor - BigInt(microseconds); 71 + } 72 + }
+38 -98
apps/appview/src/lib/firehose.ts
··· 1 1 import { Jetstream } from "@skyware/jetstream"; 2 - import { type Database, firehoseCursor } from "@atbb/db"; 3 - import { eq } from "drizzle-orm"; 2 + import { type Database } from "@atbb/db"; 4 3 import { Indexer } from "./indexer.js"; 4 + import { CursorManager } from "./cursor-manager.js"; 5 + import { CircuitBreaker } from "./circuit-breaker.js"; 6 + import { ReconnectionManager } from "./reconnection-manager.js"; 5 7 6 8 /** 7 9 * Firehose service that subscribes to AT Proto Jetstream 8 10 * and indexes space.atbb.* records into the database. 11 + * 12 + * Responsibilities: 13 + * - WebSocket connection management via Jetstream 14 + * - Event routing to indexer handlers 15 + * - Health status monitoring 16 + * 17 + * Delegates to: 18 + * - CursorManager: cursor persistence 19 + * - CircuitBreaker: failure tracking and circuit breaking 20 + * - ReconnectionManager: reconnection with exponential backoff 9 21 */ 10 22 export class FirehoseService { 11 23 private jetstream: Jetstream; 12 24 private indexer: Indexer; 13 25 private isRunning = false; 14 - private reconnectAttempts = 0; 15 - private readonly maxReconnectAttempts = 10; 16 - private readonly reconnectDelayMs = 5000; 17 - 18 - // Circuit breaker for handler failures 19 - private consecutiveFailures = 0; 20 - private readonly maxConsecutiveFailures = 100; 26 + private cursorManager: CursorManager; 27 + private circuitBreaker: CircuitBreaker; 28 + private reconnectionManager: ReconnectionManager; 21 29 22 30 // Collections we're interested in (full lexicon IDs) 23 31 private readonly wantedCollections = [ ··· 35 43 ) { 36 44 // Initialize the indexer instance with the database 37 45 this.indexer = new Indexer(db); 46 + 47 + // Initialize helper classes 48 + this.cursorManager = new CursorManager(db); 49 + this.circuitBreaker = new CircuitBreaker(100, () => this.stop()); 50 + this.reconnectionManager = new ReconnectionManager(10, 5000); 38 51 39 52 // Initialize with a placeholder - will be recreated with cursor in start() 40 53 this.jetstream = this.createJetstream(); ··· 133 146 134 147 // Listen to all commits to track cursor 135 148 this.jetstream.on("commit", async (event) => { 136 - await this.updateCursor(event.time_us); 149 + await this.cursorManager.update(event.time_us); 137 150 }); 138 151 139 152 // Handle errors and disconnections ··· 154 167 155 168 try { 156 169 // Load the last cursor from database 157 - const savedCursor = await this.loadCursor(); 170 + const savedCursor = await this.cursorManager.load(); 158 171 if (savedCursor) { 159 172 console.log(`Resuming from cursor: ${savedCursor}`); 160 173 // Rewind by 10 seconds to ensure we don't miss any events 161 - const rewindedCursor = savedCursor - BigInt(10_000_000); // 10 seconds in microseconds 174 + const rewindedCursor = this.cursorManager.rewind(savedCursor, 10_000_000); 162 175 163 176 // Recreate Jetstream instance with cursor 164 177 this.jetstream = this.createJetstream(Number(rewindedCursor)); ··· 168 181 console.log(`Starting Jetstream firehose subscription to ${this.jetstreamUrl}`); 169 182 await this.jetstream.start(); 170 183 this.isRunning = true; 171 - this.reconnectAttempts = 0; 184 + this.reconnectionManager.reset(); 172 185 console.log("Jetstream firehose subscription started successfully"); 173 186 } catch (error) { 174 187 console.error("Failed to start Jetstream firehose:", error); ··· 209 222 } { 210 223 return { 211 224 isRunning: this.isRunning, 212 - reconnectAttempts: this.reconnectAttempts, 213 - consecutiveFailures: this.consecutiveFailures, 214 - maxReconnectAttempts: this.maxReconnectAttempts, 215 - maxConsecutiveFailures: this.maxConsecutiveFailures, 225 + reconnectAttempts: this.reconnectionManager.getAttemptCount(), 226 + consecutiveFailures: this.circuitBreaker.getFailureCount(), 227 + maxReconnectAttempts: 10, 228 + maxConsecutiveFailures: 100, 216 229 }; 217 230 } 218 231 ··· 220 233 * Handle reconnection with exponential backoff 221 234 */ 222 235 private async handleReconnect() { 223 - if (this.reconnectAttempts >= this.maxReconnectAttempts) { 224 - console.error( 225 - `[FATAL] Max reconnect attempts (${this.maxReconnectAttempts}) reached. Firehose indexing has stopped.` 226 - ); 236 + try { 237 + await this.reconnectionManager.attemptReconnect(async () => { 238 + this.isRunning = false; 239 + await this.start(); 240 + }); 241 + } catch (error) { 227 242 console.error( 228 - `[FATAL] The appview will continue serving stale data. Manual intervention required.` 243 + `[FATAL] Firehose indexing has stopped. The appview will continue serving stale data.` 229 244 ); 230 245 this.isRunning = false; 231 - return; 232 246 } 233 - 234 - this.reconnectAttempts++; 235 - const delay = this.reconnectDelayMs * Math.pow(2, this.reconnectAttempts - 1); 236 - console.log( 237 - `Attempting to reconnect (${this.reconnectAttempts}/${this.maxReconnectAttempts}) in ${delay}ms` 238 - ); 239 - 240 - setTimeout(async () => { 241 - this.isRunning = false; 242 - await this.start(); 243 - }, delay); 244 247 } 245 248 246 249 /** 247 - * Load the last cursor from database 248 - */ 249 - private async loadCursor(): Promise<bigint | null> { 250 - try { 251 - const result = await this.db 252 - .select() 253 - .from(firehoseCursor) 254 - .where(eq(firehoseCursor.service, "jetstream")) 255 - .limit(1); 256 - 257 - return result.length > 0 ? result[0].cursor : null; 258 - } catch (error) { 259 - console.error("Failed to load cursor from database:", error); 260 - return null; 261 - } 262 - } 263 - 264 - /** 265 - * Update the cursor in database 266 - */ 267 - private async updateCursor(timeUs: number) { 268 - try { 269 - await this.db 270 - .insert(firehoseCursor) 271 - .values({ 272 - service: "jetstream", 273 - cursor: BigInt(timeUs), 274 - updatedAt: new Date(), 275 - }) 276 - .onConflictDoUpdate({ 277 - target: firehoseCursor.service, 278 - set: { 279 - cursor: BigInt(timeUs), 280 - updatedAt: new Date(), 281 - }, 282 - }); 283 - } catch (error) { 284 - // Don't throw - we don't want cursor updates to break the stream 285 - console.error("Failed to update cursor:", error); 286 - } 287 - } 288 - 289 - // ── Circuit Breaker ───────────────────────────────────── 290 - 291 - /** 292 250 * Wrap handler to track failures and stop firehose on excessive errors 293 251 */ 294 252 private async wrapHandler<T>( ··· 296 254 event: T, 297 255 handlerName: string 298 256 ): Promise<void> { 299 - try { 300 - await handler(event); 301 - // Success - reset failure counter 302 - this.consecutiveFailures = 0; 303 - } catch (error) { 304 - this.consecutiveFailures++; 305 - console.error( 306 - `[HANDLER ERROR] ${handlerName} failed (${this.consecutiveFailures}/${this.maxConsecutiveFailures}):`, 307 - error 308 - ); 309 - 310 - // Check circuit breaker threshold 311 - if (this.consecutiveFailures >= this.maxConsecutiveFailures) { 312 - console.error( 313 - `[CIRCUIT BREAKER] Max consecutive failures (${this.maxConsecutiveFailures}) reached. Stopping firehose to prevent data loss.` 314 - ); 315 - await this.stop(); 316 - } 317 - } 257 + await this.circuitBreaker.execute(() => handler(event), handlerName); 318 258 } 319 259 320 260 // ── Event Handlers ──────────────────────────────────────
+65
apps/appview/src/lib/reconnection-manager.ts
··· 1 + /** 2 + * Manages reconnection attempts with exponential backoff. 3 + * 4 + * Implements a backoff strategy where each retry is delayed by 5 + * baseDelay * 2^(attempt - 1), providing increasing delays between 6 + * reconnection attempts to avoid overwhelming the service. 7 + */ 8 + export class ReconnectionManager { 9 + private attempts = 0; 10 + 11 + /** 12 + * @param maxAttempts - Maximum number of reconnection attempts 13 + * @param baseDelayMs - Base delay in milliseconds (will be exponentially increased) 14 + */ 15 + constructor( 16 + private maxAttempts: number, 17 + private baseDelayMs: number 18 + ) {} 19 + 20 + /** 21 + * Attempt to reconnect with exponential backoff 22 + * 23 + * @param reconnectFn - Function to call to perform the reconnection 24 + * @throws Error if max attempts exceeded 25 + */ 26 + async attemptReconnect(reconnectFn: () => Promise<void>): Promise<void> { 27 + if (this.attempts >= this.maxAttempts) { 28 + console.error( 29 + `[FATAL] Max reconnect attempts (${this.maxAttempts}) reached. Manual intervention required.` 30 + ); 31 + throw new Error('Max reconnection attempts exceeded'); 32 + } 33 + 34 + this.attempts++; 35 + const delay = this.baseDelayMs * Math.pow(2, this.attempts - 1); 36 + 37 + console.log( 38 + `Attempting to reconnect (${this.attempts}/${this.maxAttempts}) in ${delay}ms` 39 + ); 40 + 41 + await this.sleep(delay); 42 + await reconnectFn(); 43 + } 44 + 45 + /** 46 + * Reset the attempt counter (call on successful connection) 47 + */ 48 + reset(): void { 49 + this.attempts = 0; 50 + } 51 + 52 + /** 53 + * Get current attempt count for monitoring 54 + */ 55 + getAttemptCount(): number { 56 + return this.attempts; 57 + } 58 + 59 + /** 60 + * Sleep for specified milliseconds 61 + */ 62 + private sleep(ms: number): Promise<void> { 63 + return new Promise(resolve => setTimeout(resolve, ms)); 64 + } 65 + }