A decentralized music tracking and discovery platform built on AT Protocol 🎵

Support per-client DID filters for WS stream

Accept a comma-separated "dids" query param on WebSocket connect and
store per-client DID lists. Apply the DID filter to the initial
paginated query (using drizzle-orm inArray) and to broadcastEvent so
clients only receive matching events. Also fix bufferedAmount typing in
backpressure checks, reduce PAGE_DELAY_MS to 3ms, and remove the
periodic heartbeat timer.

+35 -19
+35 -19
tap/src/main.ts
··· 2 2 import logger from "./logger.ts"; 3 3 import connectToTap from "./tap.ts"; 4 4 import schema from "./schema/mod.ts"; 5 - import { asc } from "drizzle-orm"; 5 + import { asc, inArray } from "drizzle-orm"; 6 6 import { omit } from "@es-toolkit/es-toolkit/compat"; 7 7 import type { SelectEvent } from "./schema/event.ts"; 8 8 9 9 const PAGE_SIZE = 50; 10 - const PAGE_DELAY_MS = 10; 10 + const PAGE_DELAY_MS = 3; 11 11 const YIELD_EVERY_N_PAGES = 1; // Yield after every page 12 - const MAX_BUFFER_SIZE = 64 * 1024; // 64KB buffer limit (more conservative) 12 + const MAX_BUFFER_SIZE = 64 * 1024; // 64KB buffer limit 13 13 const BACKPRESSURE_CHECK_INTERVAL = 10; // Check every 10 events 14 14 const MESSAGE_DELAY_MS = 1; // Add tiny delay between messages 15 15 const VERBOSE_LOGGING = false; // Set to true for detailed message tracking ··· 18 18 socket: WebSocket; 19 19 isPaginating: boolean; 20 20 queue: SelectEvent[]; 21 + dids?: string[]; 21 22 } 22 23 23 24 const connectedClients = new Map<WebSocket, ClientState>(); 24 25 25 - // Helper function to safely send message with error handling 26 26 function safeSend( 27 27 socket: WebSocket, 28 28 message: string, ··· 50 50 } 51 51 52 52 async function waitForBackpressure(socket: WebSocket): Promise<void> { 53 - const bufferedAmount = (socket as any).bufferedAmount; 53 + const bufferedAmount = (socket as unknown as { bufferedAmount?: number }) 54 + .bufferedAmount; 54 55 if (bufferedAmount && bufferedAmount > MAX_BUFFER_SIZE) { 55 56 logger.info`⏸️ Backpressure detected (${bufferedAmount} bytes buffered), waiting...`; 56 57 // Wait for buffer to drain ··· 68 69 69 70 for (const [socket, state] of connectedClients.entries()) { 70 71 if (socket.readyState === WebSocket.OPEN) { 72 + if ( 73 + state.dids && 74 + state.dids.length > 0 && 75 + !state.dids.includes(evt.did) 76 + ) { 77 + continue; // Skip events not matching the DID filter 78 + } 79 + 71 80 if (state.isPaginating) { 72 81 state.queue.push(evt); 73 82 } else { ··· 86 95 87 96 const { socket, response } = Deno.upgradeWebSocket(req); 88 97 98 + const url = new URL(req.url); 99 + const didsParam = url.searchParams.get("dids"); 100 + const dids = didsParam 101 + ? didsParam 102 + .split(",") 103 + .map((d) => d.trim()) 104 + .filter((d) => d.length > 0) 105 + : undefined; 106 + 89 107 socket.addEventListener("open", () => { 90 108 logger.info`✅ Client connected! Socket state: ${socket.readyState}`; 109 + if (dids && dids.length > 0) { 110 + logger.info`🔍 Filtering by DIDs: ${dids.join(", ")}`; 111 + } 91 112 92 113 connectedClients.set(socket, { 93 114 socket, 94 115 isPaginating: true, 95 116 queue: [], 117 + dids, 96 118 }); 97 119 98 120 safeSend( ··· 104 126 ); 105 127 logger.info`📤 Sent connection confirmation`; 106 128 107 - const heartbeatInterval = setInterval(() => { 108 - if (socket.readyState === WebSocket.OPEN) { 109 - safeSend( 110 - socket, 111 - JSON.stringify({ type: "heartbeat", timestamp: Date.now() }), 112 - ); 113 - } else { 114 - clearInterval(heartbeatInterval); 115 - } 116 - }, 10000); 117 - 118 129 (async () => { 119 130 try { 120 131 let page = 0; ··· 136 147 } 137 148 138 149 while (hasMore && socket.readyState === WebSocket.OPEN) { 139 - const events = await ctx.db 140 - .select() 141 - .from(schema.events) 150 + let query = ctx.db.select().from(schema.events).$dynamic(); 151 + 152 + // Apply DID filter if specified 153 + if (dids && dids.length > 0) { 154 + query = query.where(inArray(schema.events.did, dids)); 155 + } 156 + 157 + const events = await query 142 158 .orderBy(asc(schema.events.createdAt)) 143 159 .offset(page * PAGE_SIZE) 144 160 .limit(PAGE_SIZE)