A decentralized music tracking and discovery platform built on AT Protocol 🎵
at feat/feed-generator 127 lines 3.8 kB view raw
import { configure, getConsoleSink, getLogger } from "@logtape/logtape";
import { JetStreamClient, JetStreamEvent } from "./jetstream.ts";
import getNowPlayings from "./services/getNowPlayings.ts";
import { ctx } from "./context.ts";
import getScrobbles from "./services/getScrobbles.ts";
import getScrobblesChart from "./services/getScrobblesChart.ts";
import getActorAlbums from "./services/getActorAlbums.ts";
import getActorArtists from "./services/getActorArtists.ts";
import getActorScrobbles from "./services/getActorScrobbles.ts";

// Route everything logged under the "ws" category to the console.
await configure({
  sinks: { console: getConsoleSink() },
  loggers: [{ category: "ws", lowestLevel: "debug", sinks: ["console"] }],
});

const logger = getLogger("ws");

/**
 * Connected WebSocket clients, each mapped to the set of collection names it
 * should receive updates for. Entries are added when a socket opens and
 * removed when it closes.
 */
const clients = new Map<WebSocket, Set<string>>();

/**
 * Resolves the JetStream endpoint URL.
 *
 * Uses the `JETSTREAM_SERVER` environment variable when it is set to a
 * non-empty value, otherwise a default public endpoint. Trailing slashes are
 * stripped and `/subscribe` is appended unless the URL already ends with it.
 *
 * @returns The endpoint URL ending in `/subscribe`.
 */
const getEndpoint = (): string => {
  // `||` (not `??`) on purpose: an empty env value falls back to the default.
  const configured = Deno.env.get("JETSTREAM_SERVER") ||
    "wss://jetstream1.us-west.bsky.network/subscribe";

  // Avoid producing "...//subscribe" when the configured URL ends with "/".
  const endpoint = configured.replace(/\/+$/, "");

  if (endpoint.endsWith("/subscribe")) {
    return endpoint;
  }

  return `${endpoint}/subscribe`;
};

const client = new JetStreamClient({
  wantedCollections: ["app.rocksky.scrobble"],
  endpoint: getEndpoint(),

  // Optional: filter by specific DIDs
  // wantedDids: ["did:plc:example123"],

  // Reconnection settings
  maxReconnectAttempts: 10,
  reconnectDelay: 1000,
  maxReconnectDelay: 30000,
  backoffMultiplier: 1.5,

  // Enable debug logging
  debug: true,
});

client.on("open", () => {
  logger.info`✅ Connected to JetStream!`;
});

// For every commit event: log it, fetch fresh data from the services, and
// broadcast one JSON message to each open client subscribed to the collection.
client.on("message", async (data) => {
  const event = data as JetStreamEvent;

  // Only commit events that actually carry a commit are processed.
  if (event.kind !== "commit" || !event.commit) {
    return;
  }

  const { operation, collection, record, rkey } = event.commit;
  const uri = `at://${event.did}/${collection}/${rkey}`;

  logger.info`\n📡 New event:`;
  logger.info` Operation: ${operation}`;
  logger.info` Collection: ${collection}`;
  logger.info` DID: ${event.did}`;
  logger.info` Uri: ${uri}`;

  if (operation === "create" && record) {
    console.log(JSON.stringify(record, null, 2));
  }

  logger.info` Cursor: ${event.time_us}`;

  try {
    // NOTE(review): these lookups look independent and could probably run via
    // Promise.all — confirm that `ctx` tolerates concurrent queries first.
    const nowPlayings = await getNowPlayings(ctx);
    const scrobbles = await getScrobbles(ctx);
    const scrobblesChart = await getScrobblesChart(ctx);
    const actorScrobbles = await getActorScrobbles(ctx, event.did);
    const actorAlbums = await getActorAlbums(ctx, event.did);
    const actorArtists = await getActorArtists(ctx, event.did);

    // Serialize once; the payload is identical for every recipient.
    const payload = JSON.stringify({
      nowPlayings,
      scrobbles,
      scrobblesChart,
      actorScrobbles,
      actorAlbums,
      actorArtists,
      uri,
      did: event.did,
    });

    for (const [socket, channels] of clients) {
      if (!channels.has(collection) || socket.readyState !== WebSocket.OPEN) {
        continue;
      }

      // Isolate failures so one broken socket does not stop delivery to the
      // remaining clients.
      try {
        socket.send(payload);
      } catch (error) {
        logger.error`Failed to send data to client: ${error}`;
      }
    }
  } catch (error) {
    // Reached when one of the service lookups or the serialization fails.
    logger.error`Failed to send data to client: ${error}`;
  }
});

client.on("error", (error) => {
  logger.error`❌ Error: ${error}`;
});

client.on("reconnect", (data) => {
  const { attempt } = data as { attempt: number };
  logger.info`🔄 Reconnecting... (attempt ${attempt})`;
});

client.connect();

// WebSocket endpoint: listens on WS_PORT (default 8002), upgrades requests,
// registers each socket for "app.rocksky.scrobble" updates and answers
// "ping" text frames with "pong".
Deno.serve({ port: parseInt(Deno.env.get("WS_PORT") || "8002", 10) }, (req) => {
  // The Upgrade header value is case-insensitive (RFC 6455), so normalize it
  // before comparing; non-WebSocket requests get 426 Upgrade Required.
  if (req.headers.get("upgrade")?.toLowerCase() !== "websocket") {
    return new Response(null, { status: 426 });
  }

  const { socket, response } = Deno.upgradeWebSocket(req);

  socket.addEventListener("open", () => {
    logger.info`a client connected!`;
    clients.set(socket, new Set(["app.rocksky.scrobble"]));
  });

  socket.addEventListener("message", (event) => {
    if (event.data === "ping") {
      socket.send("pong");
    }
  });

  // Drop the socket from the registry once it closes; otherwise the map grows
  // with dead sockets for the lifetime of the process.
  socket.addEventListener("close", () => {
    clients.delete(socket);
    logger.info`a client disconnected`;
  });

  return response;
});