A decentralized music tracking and discovery platform built on AT Protocol ๐ŸŽต

Batch events for WebSocket streaming

Send events as JSON arrays to improve throughput (batch size 50).
Increase PAGE_SIZE to 500 and send queued events in batches.
Update client to handle batched messages and reduce ping interval
to 30s. Adjust progress/logging to report events every 500.

+57 -39
+16 -5
tap/scripts/test-client.ts
··· 23 console.log("๐Ÿ“ค Sending ping..."); 24 ws.send("ping"); 25 26 - // Send ping every 5 seconds 27 setInterval(() => { 28 if (ws.readyState === WebSocket.OPEN) { 29 const now = Date.now(); ··· 33 ); 34 ws.send("ping"); 35 } 36 - }, 5000); 37 }; 38 39 ws.onmessage = async (event) => { ··· 43 44 try { 45 const data = JSON.parse(event.data); 46 - if (data.type === "connected") { 47 console.log(`๐Ÿ“จ [${elapsed}s] Connection confirmed: ${data.message}`); 48 } else if (data.type === "heartbeat") { 49 console.log(`๐Ÿ’“ [${elapsed}s] Heartbeat received`); ··· 59 console.log(`๐Ÿ“จ [${elapsed}s] Message #${messageCount}: ${event.data}`); 60 } 61 62 - if (messageCount % 100 === 0) { 63 const rate = (messageCount / parseFloat(elapsed)).toFixed(2); 64 console.log( 65 - `๐Ÿ“Š Progress: ${messageCount} messages received in ${elapsed}s (${rate} msg/s)`, 66 ); 67 } 68
··· 23 console.log("๐Ÿ“ค Sending ping..."); 24 ws.send("ping"); 25 26 + // Send ping every 30 seconds (less frequent to not interfere with fast streaming) 27 setInterval(() => { 28 if (ws.readyState === WebSocket.OPEN) { 29 const now = Date.now(); ··· 33 ); 34 ws.send("ping"); 35 } 36 + }, 30000); 37 }; 38 39 ws.onmessage = async (event) => { ··· 43 44 try { 45 const data = JSON.parse(event.data); 46 + 47 + // Handle batched messages (array of events) 48 + if (Array.isArray(data)) { 49 + messageCount += data.length; 50 + if (messageCount % 500 === 0 || messageCount <= 50) { 51 + console.log( 52 + `๐Ÿ“จ [${elapsed}s] Batch received: ${data.length} events (total: ${messageCount})`, 53 + ); 54 + } 55 + } 56 + // Handle single messages 57 + else if (data.type === "connected") { 58 console.log(`๐Ÿ“จ [${elapsed}s] Connection confirmed: ${data.message}`); 59 } else if (data.type === "heartbeat") { 60 console.log(`๐Ÿ’“ [${elapsed}s] Heartbeat received`); ··· 70 console.log(`๐Ÿ“จ [${elapsed}s] Message #${messageCount}: ${event.data}`); 71 } 72 73 + if (messageCount % 500 === 0) { 74 const rate = (messageCount / parseFloat(elapsed)).toFixed(2); 75 console.log( 76 + `๐Ÿ“Š Progress: ${messageCount} events received in ${elapsed}s (${rate} events/s)`, 77 ); 78 } 79
+41 -34
tap/src/main.ts
··· 2 import logger from "./logger.ts"; 3 import schema from "./schema/mod.ts"; 4 import { asc, inArray } from "drizzle-orm"; 5 - import { omit } from "@es-toolkit/es-toolkit/compat"; 6 import type { SelectEvent } from "./schema/event.ts"; 7 import { assureAdminAuth, parseTapEvent } from "@atproto/tap"; 8 import { addToBatch, flushBatch } from "./batch.ts"; 9 10 - const PAGE_SIZE = 100; 11 - const YIELD_EVERY_N_PAGES = 5; 12 - const YIELD_DELAY_MS = 100; 13 const ADMIN_PASSWORD = Deno.env.get("TAP_ADMIN_PASSWORD")!; 14 15 interface ClientState { ··· 43 return false; 44 } 45 46 export function broadcastEvent(evt: SelectEvent) { 47 - const message = JSON.stringify({ 48 - ...omit(evt, "createdAt", "record"), 49 - ...(evt.record && { 50 - record: JSON.parse(evt.record), 51 - }), 52 - }); 53 54 for (const [socket, state] of connectedClients.entries()) { 55 if (socket.readyState === WebSocket.OPEN) { ··· 201 logger.info`๐Ÿ“„ Fetching page ${page}... (${totalEvents} events sent so far)`; 202 } 203 204 for (let i = 0; i < events.length; i++) { 205 const evt = events[i]; 206 ··· 209 return; 210 } 211 212 - const success = safeSend( 213 - socket, 214 - JSON.stringify({ 215 - ...omit(evt, "createdAt", "record"), 216 - ...(evt.record && { 217 - record: JSON.parse(evt.record), 218 - }), 219 - }), 220 - totalEvents, 221 - ); 222 223 - if (success) { 224 - totalEvents++; 225 - } else { 226 - logger.error`โŒ Failed to send event at index ${totalEvents}, stopping pagination`; 227 - return; 228 } 229 } 230 ··· 247 if (queuedCount > 0) { 248 logger.info`๐Ÿ“ฆ Sending ${queuedCount} queued events...`; 249 250 for (const evt of clientState.queue) { 251 if (socket.readyState !== WebSocket.OPEN) break; 252 253 - safeSend( 254 - socket, 255 - JSON.stringify({ 256 - ...omit(evt, "createdAt", "record"), 257 - ...(evt.record && { 258 - record: JSON.parse(evt.record), 259 - }), 260 - }), 261 - ); 262 } 263 264 clientState.queue = [];
··· 2 import logger from "./logger.ts"; 3 import schema from "./schema/mod.ts"; 4 import { asc, inArray } from "drizzle-orm"; 5 import type { SelectEvent } from "./schema/event.ts"; 6 import { assureAdminAuth, parseTapEvent } from "@atproto/tap"; 7 import { addToBatch, flushBatch } from "./batch.ts"; 8 9 + const PAGE_SIZE = 500; 10 + const BATCH_SEND_SIZE = 50; 11 const ADMIN_PASSWORD = Deno.env.get("TAP_ADMIN_PASSWORD")!; 12 13 interface ClientState { ··· 41 return false; 42 } 43 44 + function formatEvent(evt: SelectEvent): string { 45 + const { createdAt: _createdAt, record, ...rest } = evt; 46 + if (record) { 47 + return JSON.stringify({ ...rest, record: JSON.parse(record) }); 48 + } 49 + return JSON.stringify(rest); 50 + } 51 + 52 export function broadcastEvent(evt: SelectEvent) { 53 + const message = formatEvent(evt); 54 55 for (const [socket, state] of connectedClients.entries()) { 56 if (socket.readyState === WebSocket.OPEN) { ··· 202 logger.info`๐Ÿ“„ Fetching page ${page}... (${totalEvents} events sent so far)`; 203 } 204 205 + // Batch send events for better performance 206 + const batchMessages: string[] = []; 207 for (let i = 0; i < events.length; i++) { 208 const evt = events[i]; 209 ··· 212 return; 213 } 214 215 + batchMessages.push(formatEvent(evt)); 216 + 217 + // Send batch when full or at end of page 218 + if ( 219 + batchMessages.length >= BATCH_SEND_SIZE || 220 + i === events.length - 1 221 + ) { 222 + const batchMessage = `[${batchMessages.join(",")}]`; 223 + const success = safeSend(socket, batchMessage, totalEvents); 224 225 + if (success) { 226 + totalEvents += batchMessages.length; 227 + batchMessages.length = 0; // Clear batch 228 + } else { 229 + logger.error`โŒ Failed to send batch at ${totalEvents}, stopping pagination`; 230 + return; 231 + } 232 } 233 } 234 ··· 251 if (queuedCount > 0) { 252 logger.info`๐Ÿ“ฆ Sending ${queuedCount} queued events...`; 253 254 + // Batch send queued events 255 + const queueMessages: string[] = []; 256 for (const evt of clientState.queue) { 257 if (socket.readyState !== WebSocket.OPEN) break; 258 259 + queueMessages.push(formatEvent(evt)); 260 + 261 + if (queueMessages.length >= BATCH_SEND_SIZE) { 262 + safeSend(socket, `[${queueMessages.join(",")}]`); 263 + queueMessages.length = 0; 264 + } 265 + } 266 + 267 + if (queueMessages.length > 0) { 268 + safeSend(socket, `[${queueMessages.join(",")}]`); 269 } 270 271 clientState.queue = [];