import { config } from "../config.ts"; function getShardId(params: URLSearchParams): string { params.sort(); return params.toString(); } class ShardedConnectionManager> { private baseURL: string; private paramLimits: Record; private onMessage: (msg: any) => void; private rebalanceInterval: number; private subscriptions: T; private activeConnections: Map = new Map(); private rebalanceTimer?: number; constructor( baseURL: string, paramLimits: Record, onMessage: (msg: any) => void, rebalanceIntervalMs: number = 60 * 60 * 1000, ) { this.baseURL = baseURL; this.paramLimits = paramLimits; this.onMessage = onMessage; this.rebalanceInterval = Math.max(rebalanceIntervalMs, 30 * 60 * 1000); this.subscriptions = Object.keys(paramLimits).reduce((acc, key) => { acc[key as keyof T] = [] as any; return acc; }, {} as T); } public start(initialSubscriptions: T) { console.log('[Manager] Starting with initial subscriptions...'); for (const key in initialSubscriptions) { this.subscriptions[key] = initialSubscriptions[key]; } this._rebalance(); this.rebalanceTimer = setInterval(() => this._rebalance(), this.rebalanceInterval); console.log(`[Manager] Rebalance job scheduled to run every ${this.rebalanceInterval / 1000 / 60} minutes.`); } public stop() { console.log('[Manager] Stopping all connections...'); if (this.rebalanceTimer) { clearInterval(this.rebalanceTimer); } for (const ws of this.activeConnections.values()) { ws.close(); } this.activeConnections.clear(); console.log('[Manager] Stopped.'); } public add(newItems: Partial) { console.log('[Manager] Adding new items:', newItems); let hasNewItems = false; for (const key in newItems) { const itemsToAdd = newItems[key as keyof T]; if (itemsToAdd?.length) { if (!this.subscriptions[key as keyof T]) { this.subscriptions[key as keyof T] = [] as any; } const currentItems = new Set(this.subscriptions[key as keyof T]); const newUniqueItems = itemsToAdd.filter(item => !currentItems.has(item)); if (newUniqueItems.length > 0) { (this.subscriptions[key as keyof T] as string[]).push(...newUniqueItems); hasNewItems = true; } } } if (!hasNewItems) { console.log('[Manager] No new unique items to add.'); return; } console.log('[Manager] Creating temporary express connections for new items...'); const shards = this._calculateShards(newItems as T); for (const shard of shards) { const onOpenHandler = (ws: WebSocket) => { const shardId = getShardId(new URL(ws.url).searchParams); this.activeConnections.set(shardId, ws); }; this._createConnection(shard, onOpenHandler); } } private async _rebalance() { console.log('[Manager] Starting rebalance/compaction cycle...'); const oldConnections = new Map(this.activeConnections); const newConnections: Map = new Map(); const newShards = this._calculateShards(this.subscriptions); if (newShards.length === 0 && this.activeConnections.size > 0) { console.log('[Manager] No subscriptions, shutting down all connections.'); const timer = this.rebalanceTimer; this.stop(); this.rebalanceTimer = timer; return; } const connectionPromises = newShards.map(shard => { return new Promise((resolve, reject) => { const query = this._buildQuery(shard); const shardId = getShardId(query); if(oldConnections.has(shardId)) { console.log(`[Manager] Re-using existing connection for shard: ${shardId}`); newConnections.set(shardId, oldConnections.get(shardId)!); oldConnections.delete(shardId); resolve(); return; } const onOpenHandler = (ws: WebSocket) => { clearTimeout(timeout); newConnections.set(shardId, ws); resolve(); }; const ws = this._createConnection(shard, onOpenHandler); const timeout = setTimeout(() => reject(new Error(`Connection timed out for ${ws.url}`)), 10000); ws.onerror = (err) => { clearTimeout(timeout); reject(err); }; }); }); try { await Promise.all(connectionPromises); console.log('[Manager] All new connections are live.'); this.activeConnections = newConnections; console.log(`[Manager] Closing ${oldConnections.size} old/stale connections...`); for (const [shardId, ws] of oldConnections.entries()) { console.log(`[Manager] - Closing shard: ${shardId}`); ws.close(); } console.log('[Manager] Rebalance cycle complete.'); } catch (error) { console.error('[Manager] Failed to establish new connections during rebalance. Aborting switchover.', error); for(const ws of newConnections.values()) { const url = new URL(ws.url); if (!this.activeConnections.has(getShardId(url.searchParams))) { ws.close(); } } } } private _createConnection(shard: Partial, onOpenHandler: (ws: WebSocket) => void): WebSocket { const query = this._buildQuery(shard); const url = `${this.baseURL}?${query.toString()}`; const ws = new WebSocket(url); const shardId = getShardId(query); ws.onopen = () => { console.log(`[Manager] Shard connected: ${url}`); onOpenHandler(ws); }; ws.onmessage = (e) => { try { this.onMessage(JSON.parse(e.data)); } catch {} }; ws.onerror = (e) => console.error(`[Manager] Shard error: ${url}`, e); ws.onclose = () => { console.log(`[Manager] Shard disconnected: ${url}`); this.activeConnections.delete(shardId); }; return ws; } private _buildQuery(params: Partial): URLSearchParams { const query = new URLSearchParams(); for (const key in params) { const values = params[key]; if (values) { for (const value of values) { query.append(key, value); } } } return query; } private _calculateShards(params: T): Partial[] { const keys = Object.keys(params).filter(k => params[k as keyof T] && params[k as keyof T]!.length > 0) as (keyof T)[]; if (keys.length === 0) return []; const slicesPerKey = keys.map((key) => { const values = params[key]! const limit = this.paramLimits[key] ?? values.length; const chunks: string[][] = []; for (let i = 0; i < values.length; i += limit) { chunks.push(values.slice(i, i + limit)); } return { key, chunks: chunks.length ? chunks : [[]] }; }); const out: Partial[] = []; function recurse(index = 0, acc: Partial = {}) { if (index === slicesPerKey.length) { if (Object.values(acc).some(v => v.length > 0)) { out.push({ ...acc }); } return; } const { key, chunks } = slicesPerKey[index]; for (const chunk of chunks) { acc[key] = chunk as any; recurse(index + 1, acc); } } recurse(); return out; } } interface JetstreamParams { [key: string]: string[]; wantedDids: string[]; wantedCollections: string[]; } interface SpacedustParams { [key: string]: string[]; wantedSubjects: string[]; wantedSubjectDids: string[]; wantedSources: string[]; instant: string[]; } export class JetstreamManager extends ShardedConnectionManager { constructor(onMessage: (msg: any) => void) { super( `${config.jetstream}/subscribe`, { wantedDids: 10000, wantedCollections: 100 }, onMessage ); } } export class SpacedustManager extends ShardedConnectionManager { constructor(onMessage: (msg: any) => void) { super( `${config.spacedust}/subscribe`, { wantedSubjects: 100, wantedSubjectDids: 100, wantedSources: 100 }, onMessage ); } }