A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

Make Deno.serve handler async and parse POST JSON

Add pagination error handling and logging, ensure client paginating
state is cleared and errors are sent to clients. Minor refactor and
formatting cleanup.

+220 -215
+220 -215
tap/src/main.ts
··· 70 70 } 71 71 } 72 72 73 - Deno.serve({ port: parseInt(Deno.env.get("WS_PORT") || "2481") }, (req) => { 74 - if (req.method === "POST") { 75 - try { 76 - assureAdminAuth(ADMIN_PASSWORD, req.headers.get("authorization")!); 77 - } catch { 78 - logger.warn`Unauthorized access attempt ${req.headers.get("authorization")}`; 79 - return new Response(null, { status: 401 }); 80 - } 81 - const evt = parseTapEvent(req.body); 82 - switch (evt.type) { 83 - case "identity": { 84 - addToBatch({ 85 - id: evt.id, 86 - type: evt.type, 87 - did: evt.did, 88 - handle: evt.handle, 89 - status: evt.status, 90 - isActive: evt.isActive, 91 - action: null, 92 - rev: null, 93 - collection: null, 94 - rkey: null, 95 - record: null, 96 - cid: null, 97 - live: null, 98 - }); 99 - logger.info`New identity: ${evt.did} ${evt.handle} ${evt.status}`; 100 - break; 73 + Deno.serve( 74 + { port: parseInt(Deno.env.get("WS_PORT") || "2481") }, 75 + async (req) => { 76 + if (req.method === "POST") { 77 + try { 78 + assureAdminAuth(ADMIN_PASSWORD, req.headers.get("authorization")!); 79 + } catch { 80 + logger.warn`Unauthorized access attempt ${req.headers.get("authorization")}`; 81 + return new Response(null, { status: 401 }); 101 82 } 102 - case "record": { 103 - addToBatch({ 104 - id: evt.id, 105 - type: evt.type, 106 - action: evt.action, 107 - did: evt.did, 108 - rev: evt.rev, 109 - collection: evt.collection, 110 - rkey: evt.rkey, 111 - record: JSON.stringify(evt.record), 112 - cid: evt.cid, 113 - live: evt.live, 114 - handle: null, 115 - status: null, 116 - isActive: null, 117 - }); 118 - const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`; 119 - logger.info`New record: ${uri}`; 120 - break; 83 + const evt = parseTapEvent(await req.json()); 84 + switch (evt.type) { 85 + case "identity": { 86 + addToBatch({ 87 + id: evt.id, 88 + type: evt.type, 89 + did: evt.did, 90 + handle: evt.handle, 91 + status: evt.status, 92 + isActive: evt.isActive, 93 + action: null, 94 + rev: null, 95 + collection: null, 96 + rkey: null, 97 + record: null, 98 + cid: null, 99 + live: null, 100 + }); 101 + logger.info`New identity: ${evt.did} ${evt.handle} ${evt.status}`; 102 + break; 103 + } 104 + case "record": { 105 + addToBatch({ 106 + id: evt.id, 107 + type: evt.type, 108 + action: evt.action, 109 + did: evt.did, 110 + rev: evt.rev, 111 + collection: evt.collection, 112 + rkey: evt.rkey, 113 + record: JSON.stringify(evt.record), 114 + cid: evt.cid, 115 + live: evt.live, 116 + handle: null, 117 + status: null, 118 + isActive: null, 119 + }); 120 + const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`; 121 + logger.info`New record: ${uri}`; 122 + break; 123 + } 121 124 } 122 - } 123 125 124 - return new Response(null, { status: 200 }); 125 - } 126 - 127 - if (req.headers.get("upgrade") != "websocket") { 128 - return new Response(null, { status: 426 }); 129 - } 130 - 131 - const { socket, response } = Deno.upgradeWebSocket(req); 132 - 133 - const url = new URL(req.url); 134 - const didsParam = url.searchParams.get("dids"); 135 - const dids = didsParam 136 - ? didsParam 137 - .split(",") 138 - .map((d) => d.trim()) 139 - .filter((d) => d.length > 0) 140 - : undefined; 126 + return new Response(null, { status: 200 }); 127 + } 141 128 142 - socket.addEventListener("open", () => { 143 - logger.info`✅ Client connected! Socket state: ${socket.readyState}`; 144 - if (dids && dids.length > 0) { 145 - logger.info`🔍 Filtering by DIDs: ${dids.join(", ")}`; 129 + if (req.headers.get("upgrade") != "websocket") { 130 + return new Response(null, { status: 426 }); 146 131 } 147 132 148 - connectedClients.set(socket, { 149 - socket, 150 - isPaginating: true, 151 - queue: [], 152 - dids, 153 - }); 133 + const { socket, response } = Deno.upgradeWebSocket(req); 134 + 135 + const url = new URL(req.url); 136 + const didsParam = url.searchParams.get("dids"); 137 + const dids = didsParam 138 + ? didsParam 139 + .split(",") 140 + .map((d) => d.trim()) 141 + .filter((d) => d.length > 0) 142 + : undefined; 154 143 155 - safeSend( 156 - socket, 157 - JSON.stringify({ 158 - type: "connected", 159 - message: "Ready to stream events", 160 - }), 161 - ); 162 - logger.info`📤 Sent connection confirmation`; 144 + socket.addEventListener("open", () => { 145 + logger.info`✅ Client connected! Socket state: ${socket.readyState}`; 146 + if (dids && dids.length > 0) { 147 + logger.info`🔍 Filtering by DIDs: ${dids.join(", ")}`; 148 + } 163 149 164 - (async () => { 165 - try { 166 - let page = 0; 167 - let hasMore = true; 168 - let totalEvents = 0; 150 + connectedClients.set(socket, { 151 + socket, 152 + isPaginating: true, 153 + queue: [], 154 + dids, 155 + }); 169 156 170 - logger.info`📖 Starting pagination...`; 157 + safeSend( 158 + socket, 159 + JSON.stringify({ 160 + type: "connected", 161 + message: "Ready to stream events", 162 + }), 163 + ); 164 + logger.info`📤 Sent connection confirmation`; 171 165 166 + (async () => { 172 167 try { 173 - const testQuery = await ctx.db 174 - .select() 175 - .from(schema.events) 176 - .limit(1) 177 - .execute(); 178 - logger.info`✅ Database test query successful, found ${testQuery.length} sample event(s)`; 179 - } catch (dbError) { 180 - logger.error`❌ Database test query failed: ${dbError}`; 181 - throw dbError; 182 - } 168 + let page = 0; 169 + let hasMore = true; 170 + let totalEvents = 0; 183 171 184 - while (hasMore && socket.readyState === WebSocket.OPEN) { 185 - let query = ctx.db.select().from(schema.events).$dynamic(); 172 + logger.info`📖 Starting pagination...`; 186 173 187 - // Apply DID filter if specified 188 - if (dids && dids.length > 0) { 189 - query = query.where(inArray(schema.events.did, dids)); 174 + try { 175 + const testQuery = await ctx.db 176 + .select() 177 + .from(schema.events) 178 + .limit(1) 179 + .execute(); 180 + logger.info`✅ Database test query successful, found ${testQuery.length} sample event(s)`; 181 + } catch (dbError) { 182 + logger.error`❌ Database test query failed: ${dbError}`; 183 + throw dbError; 190 184 } 191 185 192 - const events = await query 193 - .orderBy(asc(schema.events.createdAt)) 194 - .offset(page * PAGE_SIZE) 195 - .limit(PAGE_SIZE) 196 - .execute(); 186 + while (hasMore && socket.readyState === WebSocket.OPEN) { 187 + let query = ctx.db.select().from(schema.events).$dynamic(); 197 188 198 - if (page % 10 === 0) { 199 - logger.info`📄 Fetching page ${page}... (${totalEvents} events sent so far)`; 200 - } 189 + // Apply DID filter if specified 190 + if (dids && dids.length > 0) { 191 + query = query.where(inArray(schema.events.did, dids)); 192 + } 201 193 202 - for (let i = 0; i < events.length; i++) { 203 - const evt = events[i]; 194 + const events = await query 195 + .orderBy(asc(schema.events.createdAt)) 196 + .offset(page * PAGE_SIZE) 197 + .limit(PAGE_SIZE) 198 + .execute(); 204 199 205 - if (socket.readyState !== WebSocket.OPEN) { 206 - logger.info`⚠️ Socket closed during pagination at event ${totalEvents}`; 207 - return; 200 + if (page % 10 === 0) { 201 + logger.info`📄 Fetching page ${page}... (${totalEvents} events sent so far)`; 208 202 } 209 203 210 - const success = safeSend( 211 - socket, 212 - JSON.stringify({ 213 - ...omit(evt, "createdAt", "record"), 214 - ...(evt.record && { 215 - record: JSON.parse(evt.record), 204 + for (let i = 0; i < events.length; i++) { 205 + const evt = events[i]; 206 + 207 + if (socket.readyState !== WebSocket.OPEN) { 208 + logger.info`⚠️ Socket closed during pagination at event ${totalEvents}`; 209 + return; 210 + } 211 + 212 + const success = safeSend( 213 + socket, 214 + JSON.stringify({ 215 + ...omit(evt, "createdAt", "record"), 216 + ...(evt.record && { 217 + record: JSON.parse(evt.record), 218 + }), 216 219 }), 217 - }), 218 - totalEvents, 219 - ); 220 + totalEvents, 221 + ); 220 222 221 - if (success) { 222 - totalEvents++; 223 - } else { 224 - logger.error`❌ Failed to send event at index ${totalEvents}, stopping pagination`; 225 - return; 223 + if (success) { 224 + totalEvents++; 225 + } else { 226 + logger.error`❌ Failed to send event at index ${totalEvents}, stopping pagination`; 227 + return; 228 + } 226 229 } 227 - } 228 230 229 - hasMore = events.length === PAGE_SIZE; 230 - page++; 231 + hasMore = events.length === PAGE_SIZE; 232 + page++; 231 233 232 - if (hasMore && page % YIELD_EVERY_N_PAGES === 0) { 233 - await new Promise((resolve) => setTimeout(resolve, YIELD_DELAY_MS)); 234 + if (hasMore && page % YIELD_EVERY_N_PAGES === 0) { 235 + await new Promise((resolve) => 236 + setTimeout(resolve, YIELD_DELAY_MS), 237 + ); 238 + } 234 239 } 235 - } 236 240 237 - logger.info`📤 Sent all historical events: ${totalEvents} total (${page} pages)`; 241 + logger.info`📤 Sent all historical events: ${totalEvents} total (${page} pages)`; 238 242 239 - const clientState = connectedClients.get(socket); 240 - if (clientState && socket.readyState === WebSocket.OPEN) { 241 - const queuedCount = clientState.queue.length; 243 + const clientState = connectedClients.get(socket); 244 + if (clientState && socket.readyState === WebSocket.OPEN) { 245 + const queuedCount = clientState.queue.length; 242 246 243 - if (queuedCount > 0) { 244 - logger.info`📦 Sending ${queuedCount} queued events...`; 247 + if (queuedCount > 0) { 248 + logger.info`📦 Sending ${queuedCount} queued events...`; 245 249 246 - for (const evt of clientState.queue) { 247 - if (socket.readyState !== WebSocket.OPEN) break; 250 + for (const evt of clientState.queue) { 251 + if (socket.readyState !== WebSocket.OPEN) break; 248 252 249 - safeSend( 250 - socket, 251 - JSON.stringify({ 252 - ...omit(evt, "createdAt", "record"), 253 - ...(evt.record && { 254 - record: JSON.parse(evt.record), 253 + safeSend( 254 + socket, 255 + JSON.stringify({ 256 + ...omit(evt, "createdAt", "record"), 257 + ...(evt.record && { 258 + record: JSON.parse(evt.record), 259 + }), 255 260 }), 256 - }), 257 - ); 261 + ); 262 + } 263 + 264 + clientState.queue = []; 258 265 } 259 266 260 - clientState.queue = []; 267 + clientState.isPaginating = false; 268 + logger.info`🔄 Now streaming real-time events...`; 261 269 } 270 + } catch (error) { 271 + logger.error`Pagination error: ${error}`; 272 + logger.error`Stack: ${error instanceof Error ? error.stack : ""}`; 273 + 274 + safeSend( 275 + socket, 276 + JSON.stringify({ 277 + type: "error", 278 + message: "Failed to load historical events", 279 + }), 280 + ); 262 281 263 - clientState.isPaginating = false; 264 - logger.info`🔄 Now streaming real-time events...`; 282 + const clientState = connectedClients.get(socket); 283 + if (clientState) { 284 + clientState.isPaginating = false; 285 + } 265 286 } 266 - } catch (error) { 267 - logger.error`Pagination error: ${error}`; 268 - logger.error`Stack: ${error instanceof Error ? error.stack : ""}`; 287 + })().catch((err) => { 288 + logger.error`Unhandled error in pagination loop: ${err}`; 289 + logger.error`Stack: ${err instanceof Error ? err.stack : ""}`; 290 + }); 291 + }); 269 292 270 - safeSend( 271 - socket, 272 - JSON.stringify({ 273 - type: "error", 274 - message: "Failed to load historical events", 275 - }), 276 - ); 277 - 278 - const clientState = connectedClients.get(socket); 279 - if (clientState) { 280 - clientState.isPaginating = false; 293 + socket.addEventListener("message", (event) => { 294 + try { 295 + if (event.data === "ping") { 296 + safeSend(socket, "pong"); 281 297 } 298 + } catch (error) { 299 + logger.error`Error handling message: ${error}`; 282 300 } 283 - })().catch((err) => { 284 - logger.error`Unhandled error in pagination loop: ${err}`; 285 - logger.error`Stack: ${err instanceof Error ? err.stack : ""}`; 286 301 }); 287 - }); 288 302 289 - socket.addEventListener("message", (event) => { 290 - try { 291 - if (event.data === "ping") { 292 - safeSend(socket, "pong"); 293 - } 294 - } catch (error) { 295 - logger.error`Error handling message: ${error}`; 296 - } 297 - }); 303 + socket.addEventListener("close", (event) => { 304 + const clientState = connectedClients.get(socket); 305 + connectedClients.delete(socket); 298 306 299 - socket.addEventListener("close", (event) => { 300 - const clientState = connectedClients.get(socket); 301 - connectedClients.delete(socket); 302 - 303 - logger.info`❌ Client disconnected. Code: ${event.code}, Reason: ${event.reason || "none"}, Clean: ${event.wasClean}`; 304 - logger.info` Active clients: ${connectedClients.size}`; 307 + logger.info`❌ Client disconnected. Code: ${event.code}, Reason: ${event.reason || "none"}, Clean: ${event.wasClean}`; 308 + logger.info` Active clients: ${connectedClients.size}`; 305 309 306 - if (clientState) { 307 - logger.info` Was paginating: ${clientState.isPaginating}`; 308 - logger.info` Queued events: ${clientState.queue.length}`; 309 - } 310 + if (clientState) { 311 + logger.info` Was paginating: ${clientState.isPaginating}`; 312 + logger.info` Queued events: ${clientState.queue.length}`; 313 + } 310 314 311 - if (event.code === 1006) { 312 - logger.error`⚠️ Abnormal closure (1006) detected - connection dropped unexpectedly`; 313 - logger.error` Possible causes:`; 314 - logger.error` - Client overwhelmed with messages (try reducing PAGE_SIZE)`; 315 - logger.error` - Network timeout or interruption`; 316 - logger.error` - Server sent messages too fast`; 317 - logger.error` - Uncaught exception in message handling`; 318 - } 319 - }); 315 + if (event.code === 1006) { 316 + logger.error`⚠️ Abnormal closure (1006) detected - connection dropped unexpectedly`; 317 + logger.error` Possible causes:`; 318 + logger.error` - Client overwhelmed with messages (try reducing PAGE_SIZE)`; 319 + logger.error` - Network timeout or interruption`; 320 + logger.error` - Server sent messages too fast`; 321 + logger.error` - Uncaught exception in message handling`; 322 + } 323 + }); 320 324 321 - socket.addEventListener("error", (error) => { 322 - logger.error`❌ WebSocket error occurred`; 323 - logger.error` Error: ${error}`; 324 - logger.error` ReadyState: ${socket.readyState}`; 325 - const clientState = connectedClients.get(socket); 326 - if (clientState) { 327 - logger.error` Was paginating: ${clientState.isPaginating}`; 328 - logger.error` Queued events: ${clientState.queue.length}`; 329 - } 330 - connectedClients.delete(socket); 331 - }); 325 + socket.addEventListener("error", (error) => { 326 + logger.error`❌ WebSocket error occurred`; 327 + logger.error` Error: ${error}`; 328 + logger.error` ReadyState: ${socket.readyState}`; 329 + const clientState = connectedClients.get(socket); 330 + if (clientState) { 331 + logger.error` Was paginating: ${clientState.isPaginating}`; 332 + logger.error` Queued events: ${clientState.queue.length}`; 333 + } 334 + connectedClients.delete(socket); 335 + }); 332 336 333 - return response; 334 - }); 337 + return response; 338 + }, 339 + ); 335 340 336 341 globalThis.addEventListener("beforeunload", () => { 337 342 flushBatch();