an attempt to make a lightweight, easily self-hostable, scoped bluesky appview
at main 266 lines 8.2 kB view raw
1import { config } from "../config.ts"; 2 3function getShardId(params: URLSearchParams): string { 4 params.sort(); 5 return params.toString(); 6} 7 8class ShardedConnectionManager<T extends Record<string, string[]>> { 9 private baseURL: string; 10 private paramLimits: Record<keyof T, number>; 11 private onMessage: (msg: any) => void; 12 private rebalanceInterval: number; 13 14 private subscriptions: T; 15 private activeConnections: Map<string, WebSocket> = new Map(); 16 private rebalanceTimer?: number; 17 18 constructor( 19 baseURL: string, 20 paramLimits: Record<keyof T, number>, 21 onMessage: (msg: any) => void, 22 rebalanceIntervalMs: number = 60 * 60 * 1000, 23 ) { 24 this.baseURL = baseURL; 25 this.paramLimits = paramLimits; 26 this.onMessage = onMessage; 27 this.rebalanceInterval = Math.max(rebalanceIntervalMs, 30 * 60 * 1000); 28 29 this.subscriptions = Object.keys(paramLimits).reduce((acc, key) => { 30 acc[key as keyof T] = [] as any; 31 return acc; 32 }, {} as T); 33 } 34 35 public start(initialSubscriptions: T) { 36 console.log('[Manager] Starting with initial subscriptions...'); 37 for (const key in initialSubscriptions) { 38 this.subscriptions[key] = initialSubscriptions[key]; 39 } 40 41 this._rebalance(); 42 43 this.rebalanceTimer = setInterval(() => this._rebalance(), this.rebalanceInterval); 44 console.log(`[Manager] Rebalance job scheduled to run every ${this.rebalanceInterval / 1000 / 60} minutes.`); 45 } 46 47 public stop() { 48 console.log('[Manager] Stopping all connections...'); 49 if (this.rebalanceTimer) { 50 clearInterval(this.rebalanceTimer); 51 } 52 for (const ws of this.activeConnections.values()) { 53 ws.close(); 54 } 55 this.activeConnections.clear(); 56 console.log('[Manager] Stopped.'); 57 } 58 59 public add(newItems: Partial<T>) { 60 console.log('[Manager] Adding new items:', newItems); 61 let hasNewItems = false; 62 63 for (const key in newItems) { 64 const itemsToAdd = newItems[key as keyof T]; 65 if (itemsToAdd?.length) { 66 if (!this.subscriptions[key as keyof T]) { 67 this.subscriptions[key as keyof T] = [] as any; 68 } 69 70 const currentItems = new Set(this.subscriptions[key as keyof T]); 71 const newUniqueItems = itemsToAdd.filter(item => !currentItems.has(item)); 72 73 if (newUniqueItems.length > 0) { 74 (this.subscriptions[key as keyof T] as string[]).push(...newUniqueItems); 75 hasNewItems = true; 76 } 77 } 78 } 79 80 if (!hasNewItems) { 81 console.log('[Manager] No new unique items to add.'); 82 return; 83 } 84 85 console.log('[Manager] Creating temporary express connections for new items...'); 86 const shards = this._calculateShards(newItems as T); 87 for (const shard of shards) { 88 const onOpenHandler = (ws: WebSocket) => { 89 const shardId = getShardId(new URL(ws.url).searchParams); 90 this.activeConnections.set(shardId, ws); 91 }; 92 this._createConnection(shard, onOpenHandler); 93 } 94 } 95 96 private async _rebalance() { 97 console.log('[Manager] Starting rebalance/compaction cycle...'); 98 const oldConnections = new Map(this.activeConnections); 99 const newConnections: Map<string, WebSocket> = new Map(); 100 101 const newShards = this._calculateShards(this.subscriptions); 102 if (newShards.length === 0 && this.activeConnections.size > 0) { 103 console.log('[Manager] No subscriptions, shutting down all connections.'); 104 const timer = this.rebalanceTimer; 105 this.stop(); 106 this.rebalanceTimer = timer; 107 return; 108 } 109 110 const connectionPromises = newShards.map(shard => { 111 return new Promise<void>((resolve, reject) => { 112 const query = this._buildQuery(shard); 113 const shardId = getShardId(query); 114 115 if(oldConnections.has(shardId)) { 116 console.log(`[Manager] Re-using existing connection for shard: ${shardId}`); 117 newConnections.set(shardId, oldConnections.get(shardId)!); 118 oldConnections.delete(shardId); 119 resolve(); 120 return; 121 } 122 123 const onOpenHandler = (ws: WebSocket) => { 124 clearTimeout(timeout); 125 newConnections.set(shardId, ws); 126 resolve(); 127 }; 128 129 const ws = this._createConnection(shard, onOpenHandler); 130 131 const timeout = setTimeout(() => reject(new Error(`Connection timed out for ${ws.url}`)), 10000); 132 133 ws.onerror = (err) => { 134 clearTimeout(timeout); 135 reject(err); 136 }; 137 138 }); 139 }); 140 141 try { 142 await Promise.all(connectionPromises); 143 console.log('[Manager] All new connections are live.'); 144 145 this.activeConnections = newConnections; 146 147 console.log(`[Manager] Closing ${oldConnections.size} old/stale connections...`); 148 for (const [shardId, ws] of oldConnections.entries()) { 149 console.log(`[Manager] - Closing shard: ${shardId}`); 150 ws.close(); 151 } 152 console.log('[Manager] Rebalance cycle complete.'); 153 } catch (error) { 154 console.error('[Manager] Failed to establish new connections during rebalance. Aborting switchover.', error); 155 for(const ws of newConnections.values()) { 156 const url = new URL(ws.url); 157 if (!this.activeConnections.has(getShardId(url.searchParams))) { 158 ws.close(); 159 } 160 } 161 } 162 } 163 164 private _createConnection(shard: Partial<T>, onOpenHandler: (ws: WebSocket) => void): WebSocket { const query = this._buildQuery(shard); 165 const url = `${this.baseURL}?${query.toString()}`; 166 const ws = new WebSocket(url); 167 const shardId = getShardId(query); 168 169 ws.onopen = () => { 170 console.log(`[Manager] Shard connected: ${url}`); 171 onOpenHandler(ws); 172 }; 173 174 ws.onmessage = (e) => { 175 try { this.onMessage(JSON.parse(e.data)); } catch {} 176 }; 177 ws.onerror = (e) => console.error(`[Manager] Shard error: ${url}`, e); 178 ws.onclose = () => { 179 console.log(`[Manager] Shard disconnected: ${url}`); 180 this.activeConnections.delete(shardId); 181 }; 182 183 return ws; 184 } 185 186 187 private _buildQuery(params: Partial<T>): URLSearchParams { 188 const query = new URLSearchParams(); 189 for (const key in params) { 190 const values = params[key]; 191 if (values) { 192 for (const value of values) { 193 query.append(key, value); 194 } 195 } 196 } 197 return query; 198 } 199 200 private _calculateShards(params: T): Partial<T>[] { 201 const keys = Object.keys(params).filter(k => params[k as keyof T] && params[k as keyof T]!.length > 0) as (keyof T)[]; 202 if (keys.length === 0) return []; 203 204 const slicesPerKey = keys.map((key) => { 205 const values = params[key]! 206 const limit = this.paramLimits[key] ?? values.length; 207 const chunks: string[][] = []; 208 for (let i = 0; i < values.length; i += limit) { 209 chunks.push(values.slice(i, i + limit)); 210 } 211 return { key, chunks: chunks.length ? chunks : [[]] }; 212 }); 213 214 const out: Partial<T>[] = []; 215 function recurse(index = 0, acc: Partial<T> = {}) { 216 if (index === slicesPerKey.length) { 217 if (Object.values(acc).some(v => v.length > 0)) { 218 out.push({ ...acc }); 219 } 220 return; 221 } 222 223 const { key, chunks } = slicesPerKey[index]; 224 for (const chunk of chunks) { 225 acc[key] = chunk as any; 226 recurse(index + 1, acc); 227 } 228 } 229 230 recurse(); 231 return out; 232 } 233} 234 235interface JetstreamParams { 236 [key: string]: string[]; 237 wantedDids: string[]; 238 wantedCollections: string[]; 239} 240interface SpacedustParams { 241 [key: string]: string[]; 242 wantedSubjects: string[]; 243 wantedSubjectDids: string[]; 244 wantedSources: string[]; 245 instant: string[]; 246} 247 248export class JetstreamManager extends ShardedConnectionManager<JetstreamParams> { 249 constructor(onMessage: (msg: any) => void) { 250 super( 251 `${config.jetstream}/subscribe`, 252 { wantedDids: 10000, wantedCollections: 100 }, 253 onMessage 254 ); 255 } 256} 257 258export class SpacedustManager extends ShardedConnectionManager<SpacedustParams> { 259 constructor(onMessage: (msg: any) => void) { 260 super( 261 `${config.spacedust}/subscribe`, 262 { wantedSubjects: 100, wantedSubjectDids: 100, wantedSources: 100 }, 263 onMessage 264 ); 265 } 266}