forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import { configure, getConsoleSink, getLogger } from "@logtape/logtape";
2import { JetStreamClient, JetStreamEvent } from "./jetstream.ts";
3import getNowPlayings from "./services/getNowPlayings.ts";
4import { ctx } from "./context.ts";
5import getScrobbles from "./services/getScrobbles.ts";
6import getScrobblesChart from "./services/getScrobblesChart.ts";
7import getActorAlbums from "./services/getActorAlbums.ts";
8import getActorArtists from "./services/getActorArtists.ts";
9import getActorScrobbles from "./services/getActorScrobbles.ts";
10
// Logging setup: one console sink receives every record logged under the
// "ws" category, from the debug level upwards.
const consoleSink = getConsoleSink();
await configure({
  sinks: { console: consoleSink },
  loggers: [
    {
      category: "ws",
      sinks: ["console"],
      lowestLevel: "debug",
    },
  ],
});

// Module-wide logger for the "ws" category.
const logger = getLogger("ws");

// Registry of WebSocket clients, each mapped to the set of collection names
// it receives updates for.
const clients = new Map<WebSocket, Set<string>>();
19
/**
 * Resolves the JetStream WebSocket URL the client should connect to.
 *
 * Reads the `JETSTREAM_SERVER` environment variable and falls back to
 * `wss://jetstream1.us-west.bsky.network/subscribe` when it is unset or empty.
 * Trailing slashes on the configured value are dropped before the path check,
 * so `wss://host/` resolves to `wss://host/subscribe` rather than
 * `wss://host//subscribe`.
 *
 * @returns The endpoint URL, always ending in `/subscribe`.
 */
const getEndpoint = () => {
  // `||` (not `??`) on purpose: an empty variable also selects the default.
  const configured = Deno.env.get("JETSTREAM_SERVER") ||
    "wss://jetstream1.us-west.bsky.network/subscribe";

  const base = configured.replace(/\/+$/, "");

  return base.endsWith("/subscribe") ? base : `${base}/subscribe`;
};
31
// JetStream subscription limited to the "app.rocksky.scrobble" collection.
const client = new JetStreamClient({
  endpoint: getEndpoint(),
  wantedCollections: ["app.rocksky.scrobble"],

  // To restrict the stream to particular accounts, list their DIDs here:
  // wantedDids: ["did:plc:example123"],

  // Reconnection settings
  reconnectDelay: 1000,
  maxReconnectDelay: 30000,
  backoffMultiplier: 1.5,
  maxReconnectAttempts: 10,

  // Turn on the client's debug logging
  debug: true,
});

// Log once the JetStream connection is established.
const handleOpen = () => {
  logger.info`✅ Connected to JetStream!`;
};
client.on("open", handleOpen);
52
/**
 * Handles every message received from JetStream.
 *
 * Non-commit events are ignored. For a commit event the handler logs the
 * event details, loads the global and per-actor listings, and sends them as a
 * single JSON message to every open client subscribed to the event's
 * collection.
 */
client.on("message", async (data) => {
  const event = data as JetStreamEvent;

  if (event.kind !== "commit" || !event.commit) {
    return;
  }

  const { operation, collection, record, rkey } = event.commit;

  logger.info`\n📡 New event:`;
  logger.info` Operation: ${operation}`;
  logger.info` Collection: ${collection}`;
  logger.info` DID: ${event.did}`;
  logger.info` Uri: at://${event.did}/${collection}/${rkey}`;

  if (operation === "create" && record) {
    console.log(JSON.stringify(record, null, 2));
  }

  logger.info` Cursor: ${event.time_us}`;

  try {
    // None of the lookups depends on another's result, so run them
    // concurrently instead of one after the other.
    const [
      nowPlayings,
      scrobbles,
      scrobblesChart,
      actorScrobbles,
      actorAlbums,
      actorArtists,
    ] = await Promise.all([
      getNowPlayings(ctx),
      getScrobbles(ctx),
      getScrobblesChart(ctx),
      getActorScrobbles(ctx, event.did),
      getActorAlbums(ctx, event.did),
      getActorArtists(ctx, event.did),
    ]);

    // Recipients are picked after the lookups so clients that connected in
    // the meantime are included.
    const recipients = [...clients]
      .filter(([socket, channels]) =>
        channels.has(collection) && socket.readyState === WebSocket.OPEN
      )
      .map(([socket]) => socket);

    if (recipients.length === 0) {
      return;
    }

    // Every recipient gets the same message, so serialize it only once.
    const payload = JSON.stringify({
      ...nowPlayings,
      scrobbles,
      scrobblesChart,
      actorScrobbles,
      actorAlbums,
      actorArtists,
      uri: `at://${event.did}/${collection}/${rkey}`,
      did: event.did,
    });

    for (const socket of recipients) {
      // Isolate each send: one broken socket must not stop the broadcast
      // to the remaining clients.
      try {
        socket.send(payload);
      } catch (error) {
        logger.error`Failed to send data to client: ${error}`;
      }
    }
  } catch (error) {
    // Reached when a lookup or the serialization fails, before any send.
    logger.error`Failed to prepare update for clients: ${error}`;
  }
});
100
// Record JetStream client errors in the log.
client.on("error", (err) => {
  logger.error`❌ Error: ${err}`;
});

// Log each reconnection attempt together with its attempt number.
client.on("reconnect", (payload) => {
  const attempt = (payload as { attempt: number }).attempt;
  logger.info`🔄 Reconnecting... (attempt ${attempt})`;
});

// Open the JetStream connection.
client.connect();
111
/**
 * WebSocket server for frontend clients.
 *
 * Listens on `WS_PORT` (default 8002). Requests that do not ask for a
 * WebSocket upgrade are answered with 426. Each upgraded socket is registered
 * in `clients` for the "app.rocksky.scrobble" collection when it opens, is
 * answered with "pong" whenever it sends "ping", and is removed from
 * `clients` again when it closes or errors.
 */
Deno.serve(
  { port: parseInt(Deno.env.get("WS_PORT") || "8002", 10) },
  (req) => {
    // The Upgrade token is matched case-insensitively, so "WebSocket" is
    // accepted as well as "websocket".
    if (req.headers.get("upgrade")?.toLowerCase() !== "websocket") {
      return new Response(null, { status: 426 });
    }

    const { socket, response } = Deno.upgradeWebSocket(req);

    // Deleting is idempotent, so it is safe for both "error" and "close" to
    // trigger it for the same socket.
    const unregister = () => {
      clients.delete(socket);
    };

    socket.addEventListener("open", () => {
      logger.info`a client connected!`;
      clients.set(socket, new Set(["app.rocksky.scrobble"]));
    });
    socket.addEventListener("message", (event) => {
      if (event.data === "ping") {
        socket.send("pong");
      }
    });
    // Without these the registry would keep every socket that ever connected.
    socket.addEventListener("close", () => {
      logger.info`a client disconnected!`;
      unregister();
    });
    socket.addEventListener("error", unregister);

    return response;
  },
);