an attempt to make a lightweight, easily self-hostable, scoped bluesky appview
1import { config } from "../config.ts";
2
3function getShardId(params: URLSearchParams): string {
4 params.sort();
5 return params.toString();
6}
7
8class ShardedConnectionManager<T extends Record<string, string[]>> {
9 private baseURL: string;
10 private paramLimits: Record<keyof T, number>;
11 private onMessage: (msg: any) => void;
12 private rebalanceInterval: number;
13
14 private subscriptions: T;
15 private activeConnections: Map<string, WebSocket> = new Map();
16 private rebalanceTimer?: number;
17
18 constructor(
19 baseURL: string,
20 paramLimits: Record<keyof T, number>,
21 onMessage: (msg: any) => void,
22 rebalanceIntervalMs: number = 60 * 60 * 1000,
23 ) {
24 this.baseURL = baseURL;
25 this.paramLimits = paramLimits;
26 this.onMessage = onMessage;
27 this.rebalanceInterval = Math.max(rebalanceIntervalMs, 30 * 60 * 1000);
28
29 this.subscriptions = Object.keys(paramLimits).reduce((acc, key) => {
30 acc[key as keyof T] = [] as any;
31 return acc;
32 }, {} as T);
33 }
34
35 public start(initialSubscriptions: T) {
36 console.log('[Manager] Starting with initial subscriptions...');
37 for (const key in initialSubscriptions) {
38 this.subscriptions[key] = initialSubscriptions[key];
39 }
40
41 this._rebalance();
42
43 this.rebalanceTimer = setInterval(() => this._rebalance(), this.rebalanceInterval);
44 console.log(`[Manager] Rebalance job scheduled to run every ${this.rebalanceInterval / 1000 / 60} minutes.`);
45 }
46
47 public stop() {
48 console.log('[Manager] Stopping all connections...');
49 if (this.rebalanceTimer) {
50 clearInterval(this.rebalanceTimer);
51 }
52 for (const ws of this.activeConnections.values()) {
53 ws.close();
54 }
55 this.activeConnections.clear();
56 console.log('[Manager] Stopped.');
57 }
58
59 public add(newItems: Partial<T>) {
60 console.log('[Manager] Adding new items:', newItems);
61 let hasNewItems = false;
62
63 for (const key in newItems) {
64 const itemsToAdd = newItems[key as keyof T];
65 if (itemsToAdd?.length) {
66 if (!this.subscriptions[key as keyof T]) {
67 this.subscriptions[key as keyof T] = [] as any;
68 }
69
70 const currentItems = new Set(this.subscriptions[key as keyof T]);
71 const newUniqueItems = itemsToAdd.filter(item => !currentItems.has(item));
72
73 if (newUniqueItems.length > 0) {
74 (this.subscriptions[key as keyof T] as string[]).push(...newUniqueItems);
75 hasNewItems = true;
76 }
77 }
78 }
79
80 if (!hasNewItems) {
81 console.log('[Manager] No new unique items to add.');
82 return;
83 }
84
85 console.log('[Manager] Creating temporary express connections for new items...');
86 const shards = this._calculateShards(newItems as T);
87 for (const shard of shards) {
88 const onOpenHandler = (ws: WebSocket) => {
89 const shardId = getShardId(new URL(ws.url).searchParams);
90 this.activeConnections.set(shardId, ws);
91 };
92 this._createConnection(shard, onOpenHandler);
93 }
94 }
95
96 private async _rebalance() {
97 console.log('[Manager] Starting rebalance/compaction cycle...');
98 const oldConnections = new Map(this.activeConnections);
99 const newConnections: Map<string, WebSocket> = new Map();
100
101 const newShards = this._calculateShards(this.subscriptions);
102 if (newShards.length === 0 && this.activeConnections.size > 0) {
103 console.log('[Manager] No subscriptions, shutting down all connections.');
104 const timer = this.rebalanceTimer;
105 this.stop();
106 this.rebalanceTimer = timer;
107 return;
108 }
109
110 const connectionPromises = newShards.map(shard => {
111 return new Promise<void>((resolve, reject) => {
112 const query = this._buildQuery(shard);
113 const shardId = getShardId(query);
114
115 if(oldConnections.has(shardId)) {
116 console.log(`[Manager] Re-using existing connection for shard: ${shardId}`);
117 newConnections.set(shardId, oldConnections.get(shardId)!);
118 oldConnections.delete(shardId);
119 resolve();
120 return;
121 }
122
123 const onOpenHandler = (ws: WebSocket) => {
124 clearTimeout(timeout);
125 newConnections.set(shardId, ws);
126 resolve();
127 };
128
129 const ws = this._createConnection(shard, onOpenHandler);
130
131 const timeout = setTimeout(() => reject(new Error(`Connection timed out for ${ws.url}`)), 10000);
132
133 ws.onerror = (err) => {
134 clearTimeout(timeout);
135 reject(err);
136 };
137
138 });
139 });
140
141 try {
142 await Promise.all(connectionPromises);
143 console.log('[Manager] All new connections are live.');
144
145 this.activeConnections = newConnections;
146
147 console.log(`[Manager] Closing ${oldConnections.size} old/stale connections...`);
148 for (const [shardId, ws] of oldConnections.entries()) {
149 console.log(`[Manager] - Closing shard: ${shardId}`);
150 ws.close();
151 }
152 console.log('[Manager] Rebalance cycle complete.');
153 } catch (error) {
154 console.error('[Manager] Failed to establish new connections during rebalance. Aborting switchover.', error);
155 for(const ws of newConnections.values()) {
156 const url = new URL(ws.url);
157 if (!this.activeConnections.has(getShardId(url.searchParams))) {
158 ws.close();
159 }
160 }
161 }
162 }
163
164 private _createConnection(shard: Partial<T>, onOpenHandler: (ws: WebSocket) => void): WebSocket { const query = this._buildQuery(shard);
165 const url = `${this.baseURL}?${query.toString()}`;
166 const ws = new WebSocket(url);
167 const shardId = getShardId(query);
168
169 ws.onopen = () => {
170 console.log(`[Manager] Shard connected: ${url}`);
171 onOpenHandler(ws);
172 };
173
174 ws.onmessage = (e) => {
175 try { this.onMessage(JSON.parse(e.data)); } catch {}
176 };
177 ws.onerror = (e) => console.error(`[Manager] Shard error: ${url}`, e);
178 ws.onclose = () => {
179 console.log(`[Manager] Shard disconnected: ${url}`);
180 this.activeConnections.delete(shardId);
181 };
182
183 return ws;
184 }
185
186
187 private _buildQuery(params: Partial<T>): URLSearchParams {
188 const query = new URLSearchParams();
189 for (const key in params) {
190 const values = params[key];
191 if (values) {
192 for (const value of values) {
193 query.append(key, value);
194 }
195 }
196 }
197 return query;
198 }
199
200 private _calculateShards(params: T): Partial<T>[] {
201 const keys = Object.keys(params).filter(k => params[k as keyof T] && params[k as keyof T]!.length > 0) as (keyof T)[];
202 if (keys.length === 0) return [];
203
204 const slicesPerKey = keys.map((key) => {
205 const values = params[key]!
206 const limit = this.paramLimits[key] ?? values.length;
207 const chunks: string[][] = [];
208 for (let i = 0; i < values.length; i += limit) {
209 chunks.push(values.slice(i, i + limit));
210 }
211 return { key, chunks: chunks.length ? chunks : [[]] };
212 });
213
214 const out: Partial<T>[] = [];
215 function recurse(index = 0, acc: Partial<T> = {}) {
216 if (index === slicesPerKey.length) {
217 if (Object.values(acc).some(v => v.length > 0)) {
218 out.push({ ...acc });
219 }
220 return;
221 }
222
223 const { key, chunks } = slicesPerKey[index];
224 for (const chunk of chunks) {
225 acc[key] = chunk as any;
226 recurse(index + 1, acc);
227 }
228 }
229
230 recurse();
231 return out;
232 }
233}
234
235interface JetstreamParams {
236 [key: string]: string[];
237 wantedDids: string[];
238 wantedCollections: string[];
239}
240interface SpacedustParams {
241 [key: string]: string[];
242 wantedSubjects: string[];
243 wantedSubjectDids: string[];
244 wantedSources: string[];
245 instant: string[];
246}
247
248export class JetstreamManager extends ShardedConnectionManager<JetstreamParams> {
249 constructor(onMessage: (msg: any) => void) {
250 super(
251 `${config.jetstream}/subscribe`,
252 { wantedDids: 10000, wantedCollections: 100 },
253 onMessage
254 );
255 }
256}
257
258export class SpacedustManager extends ShardedConnectionManager<SpacedustParams> {
259 constructor(onMessage: (msg: any) => void) {
260 super(
261 `${config.spacedust}/subscribe`,
262 { wantedSubjects: 100, wantedSubjectDids: 100, wantedSources: 100 },
263 onMessage
264 );
265 }
266}