forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import { serve } from "@hono/node-server";
2import { createNodeWebSocket } from "@hono/node-ws";
3import { trace } from "@opentelemetry/api";
4import { equals } from "@xata.io/client";
5import { ctx } from "context";
6import { Hono } from "hono";
7import { cors } from "hono/cors";
8import jwt from "jsonwebtoken";
9import { createAgent } from "lib/agent";
10import {
11 getLovedTracks,
12 likeTrack,
13 unLikeTrack,
14} from "lovedtracks/lovedtracks.service";
15import { scrobbleTrack } from "nowplaying/nowplaying.service";
16import { rateLimiter } from "ratelimiter";
17import subscribe from "subscribers";
18import { saveTrack } from "tracks/tracks.service";
19import { trackSchema } from "types/track";
20import handleWebsocket from "websocket/handler";
21import apikeys from "./apikeys/app";
22import bsky from "./bsky/app";
23import dropbox from "./dropbox/app";
24import googledrive from "./googledrive/app";
25import { env } from "./lib/env";
26import { requestCounter, requestDuration } from "./metrics";
27import "./profiling";
28import search from "./search/app";
29import spotify from "./spotify/app";
30import "./tracing";
31import users from "./users/app";
32import webscrobbler from "./webscrobbler/app";
33
// Hand the shared context to the default export of the `subscribers` module.
subscribe(ctx);

// Root Hono application. The Node WebSocket adapter is created against this
// instance and exposes the two helpers used further down in the file.
const app = new Hono();
const nodeWebSocket = createNodeWebSocket({ app });
const { injectWebSocket, upgradeWebSocket } = nodeWebSocket;
38
// Rate limiting is registered first and applies to every path.
const globalRateLimit = rateLimiter({
  limit: 1000,
  window: 30, // 30 seconds
});

app.use("*", globalRateLimit);
46
// Tag the active OpenTelemetry span (when there is one) with the request
// path, then continue down the middleware chain.
app.use("*", async (c, next) => {
  trace.getActiveSpan()?.setAttribute("http.route", c.req.path);
  await next();
});
52
// Record how long the rest of the chain took, in seconds, labelled with the
// request path and HTTP method.
app.use("*", async (c, next) => {
  const startedAt = Date.now();
  await next();
  const elapsedSeconds = (Date.now() - startedAt) / 1000;
  const labels = {
    route: c.req.path,
    method: c.req.method,
  };
  requestDuration.record(elapsedSeconds, labels);
});
62
// CORS with the middleware's default options.
app.use(cors());

// Sub-applications, mounted in the same order as before.
const coreMounts = [
  ["/", bsky],
  ["/spotify", spotify],
  ["/dropbox", dropbox],
  ["/googledrive", googledrive],
  ["/apikeys", apikeys],
];
for (const [prefix, subApp] of coreMounts) {
  app.route(prefix, subApp);
}

// WebSocket endpoint; the upgrade is delegated to the shared handler.
app.get("/ws", upgradeWebSocket(handleWebsocket));
76
// GET / — static health-style response.
app.get("/", async (c) => c.json({ status: "ok" }));
80
// POST /now-playing — validate the submitted track and scrobble it for the
// user identified by the bearer JWT.
//
// Responses:
//   401 "Unauthorized"         missing/invalid token, unknown user, or no agent
//   400 "Invalid track data…"  body is not JSON or fails `trackSchema`
//   200 { status: "ok" }       track was passed to `scrobbleTrack`
app.post("/now-playing", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/now-playing" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    c.status(401);
    return c.text("Unauthorized");
  }

  let did;
  try {
    // Expiration is still not enforced (ignoreExpiration), as before.
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    // A malformed or wrongly-signed token is an auth failure, not a 500;
    // `did` stays undefined and is rejected just below.
  }
  if (!did) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const user = await ctx.client.db.users.filter("did", equals(did)).getFirst();
  if (!user) {
    c.status(401);
    return c.text("Unauthorized");
  }

  let body;
  try {
    body = await c.req.json();
  } catch {
    // Unparseable JSON is a client error.
    c.status(400);
    return c.text("Invalid track data: request body is not valid JSON");
  }

  const parsed = trackSchema.safeParse(body);
  if (parsed.error) {
    c.status(400);
    return c.text("Invalid track data: " + parsed.error.message);
  }
  const track = parsed.data;

  const agent = await createAgent(ctx.oauthClient, did);
  if (!agent) {
    c.status(401);
    return c.text("Unauthorized");
  }

  await scrobbleTrack(ctx, track, agent, user.did);

  return c.json({ status: "ok" });
});
119
// GET /now-playing — return the cached "now playing" entry for a user.
//
// The user is taken from the `did` query parameter (matched against either
// the `did` or the `handle` column) or, failing that, from the bearer JWT.
//
// Responses:
//   401 "Unauthorized"  no identifier could be determined, or no such user
//   200 {}              nothing cached for the user
//   200 {...entry, is_playing}  cached entry plus the playing flag
app.get("/now-playing", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playing" });

  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  let payload = {};
  if (bearer && bearer !== "null") {
    try {
      // Expiration is still not enforced (ignoreExpiration), as before.
      payload = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true });
    } catch {
      // An invalid token must not crash the request: treat it as absent so a
      // `did` query parameter still works and everything else gets a 401.
    }
  }
  const did = c.req.query("did") || payload.did;

  if (!did) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const user = await ctx.client.db.users
    .filter({
      $any: [{ did }, { handle: did }],
    })
    .getFirst();

  if (!user) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const [nowPlaying, status] = await Promise.all([
    ctx.redis.get(`nowplaying:${user.did}`),
    ctx.redis.get(`nowplaying:${user.did}:status`),
  ]);
  return c.json(
    nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {},
  );
});
155
// GET /now-playings — paginated distinct scrobbles from the analytics
// service. `size` defaults to 10 and `offset` to 0.
app.get("/now-playings", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playings" });

  const take = Number(c.req.query("size")) || 10;
  const skip = Number(c.req.query("offset")) || 0;

  const response = await ctx.analytics.post("library.getDistinctScrobbles", {
    pagination: { skip, take },
  });

  return c.json(response.data);
});
168
// POST /likes — mark the submitted track as liked for the authenticated user.
//
// Responses:
//   401 "Unauthorized"         missing/invalid token, unknown user, or no agent
//   400 "Invalid track data…"  body is not JSON or fails `trackSchema`
//   200 { status: "ok" }       track was passed to `likeTrack`
app.post("/likes", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/likes" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    c.status(401);
    return c.text("Unauthorized");
  }

  let did;
  try {
    // Expiration is still not enforced (ignoreExpiration), as before.
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    // Invalid token: `did` stays undefined and is rejected just below.
  }
  if (!did) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const agent = await createAgent(ctx.oauthClient, did);

  const user = await ctx.client.db.users.filter("did", equals(did)).getFirst();
  // The agent check mirrors POST /now-playing and POST /tracks, which both
  // refuse to continue without an agent.
  if (!user || !agent) {
    c.status(401);
    return c.text("Unauthorized");
  }

  let body;
  try {
    body = await c.req.json();
  } catch {
    // Unparseable JSON is a client error.
    c.status(400);
    return c.text("Invalid track data: request body is not valid JSON");
  }

  const parsed = trackSchema.safeParse(body);
  if (parsed.error) {
    c.status(400);
    return c.text("Invalid track data: " + parsed.error.message);
  }
  const track = parsed.data;
  await likeTrack(ctx, track, user, agent);

  return c.json({ status: "ok" });
});
201
// DELETE /likes/:sha256 — remove the like on the track identified by the
// `sha256` path parameter for the authenticated user.
//
// Responses:
//   401 "Unauthorized"    missing/invalid token, unknown user, or no agent
//   200 { status: "ok" }  request was passed to `unLikeTrack`
app.delete("/likes/:sha256", async (c) => {
  requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    c.status(401);
    return c.text("Unauthorized");
  }

  let did;
  try {
    // Expiration is still not enforced (ignoreExpiration), as before.
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    // Invalid token: `did` stays undefined and is rejected just below.
  }
  if (!did) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const agent = await createAgent(ctx.oauthClient, did);

  const user = await ctx.client.db.users.filter("did", equals(did)).getFirst();
  // The agent check mirrors POST /now-playing and POST /tracks, which both
  // refuse to continue without an agent.
  if (!user || !agent) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const sha256 = c.req.param("sha256");
  await unLikeTrack(ctx, sha256, user, agent);
  return c.json({ status: "ok" });
});
226
// GET /likes — paginated loved tracks of the authenticated user.
// `size` defaults to 10 and `offset` to 0.
//
// Responses:
//   401 "Unauthorized"  missing/invalid token or unknown user
//   200 <result of getLovedTracks>
app.get("/likes", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/likes" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    c.status(401);
    return c.text("Unauthorized");
  }

  let did;
  try {
    // Expiration is still not enforced (ignoreExpiration), as before.
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    // Invalid token: `did` stays undefined and is rejected just below.
  }
  if (!did) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const user = await ctx.client.db.users.filter("did", equals(did)).getFirst();
  if (!user) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const size = +c.req.query("size") || 10;
  const offset = +c.req.query("offset") || 0;

  const lovedTracks = await getLovedTracks(ctx, user, size, offset);
  return c.json(lovedTracks);
});
252
// GET /public/scrobbles — most recent scrobbles (newest `timestamp` first),
// flattened into the public response shape. `size` defaults to 10 and
// `offset` to 0.
app.get("/public/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbles" });

  const size = Number(c.req.query("size")) || 10;
  const offset = Number(c.req.query("offset")) || 0;

  const columns = [
    "track_id.*",
    "user_id.*",
    "timestamp",
    "xata_createdat",
    "uri",
  ];
  const page = await ctx.client.db.scrobbles
    .select(columns)
    .sort("timestamp", "desc")
    .getPaginated({ pagination: { size, offset } });

  // `tags` and `listeners` are fixed placeholder values in this endpoint.
  const toPublicScrobble = (record) => {
    const track = record.track_id;
    return {
      cover: track.album_art,
      artist: track.artist,
      title: track.title,
      date: record.timestamp,
      user: record.user_id.handle,
      uri: record.uri,
      albumUri: track.album_uri,
      artistUri: track.artist_uri,
      tags: [],
      listeners: 1,
      sha256: track.sha256,
      id: record.xata_id,
    };
  };

  return c.json(page.records.map((record) => toPublicScrobble(record)));
});
286
// GET /public/scrobbleschart — scrobble chart data from the analytics service.
//
// The first query parameter present, in this order, selects the chart:
//   did        -> library.getScrobblesPerDay for that user
//   artisturi  -> library.getArtistScrobbles
//   albumuri   -> library.getAlbumScrobbles
//   songuri    -> library.getTrackScrobbles (a scrobble URI is first resolved
//                 to the URI of its track)
//   (none)     -> library.getScrobblesPerDay with no filter
//
// Responds 404 "Scrobble not found" when `songuri` is a scrobble URI that
// cannot be resolved to a track URI.
app.get("/public/scrobbleschart", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" });

  const did = c.req.query("did");
  const artisturi = c.req.query("artisturi");
  const albumuri = c.req.query("albumuri");
  const songuri = c.req.query("songuri");

  if (did) {
    const chart = await ctx.analytics.post("library.getScrobblesPerDay", {
      user_did: did,
    });
    return c.json(chart.data);
  }

  if (artisturi) {
    const chart = await ctx.analytics.post("library.getArtistScrobbles", {
      artist_id: artisturi,
    });
    return c.json(chart.data);
  }

  if (albumuri) {
    const chart = await ctx.analytics.post("library.getAlbumScrobbles", {
      album_id: albumuri,
    });
    return c.json(chart.data);
  }

  if (songuri) {
    let uri = songuri;
    if (songuri.includes("app.rocksky.scrobble")) {
      const scrobble = await ctx.client.db.scrobbles
        .select(["track_id.*", "uri"])
        .filter("uri", equals(songuri))
        .getFirst();

      // `getFirst()` yields null when nothing matches; previously that
      // crashed on `scrobble.track_id.uri`.
      const trackUri = scrobble?.track_id?.uri;
      if (!trackUri) {
        c.status(404);
        return c.text("Scrobble not found");
      }
      uri = trackUri;
    }
    const chart = await ctx.analytics.post("library.getTrackScrobbles", {
      track_id: uri,
    });
    return c.json(chart.data);
  }

  const chart = await ctx.analytics.post("library.getScrobblesPerDay", {});
  return c.json(chart.data);
});
335
// GET /scrobbles — paginated scrobbles of the authenticated user that have a
// non-null `uri`, newest `xata_createdat` first. `size` defaults to 10 and
// `offset` to 0.
//
// Responses:
//   401 "Unauthorized"  missing/invalid token or unknown user
//   200 [records]
app.get("/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/scrobbles" });

  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    c.status(401);
    return c.text("Unauthorized");
  }

  let did;
  try {
    // Expiration is still not enforced (ignoreExpiration), as before.
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    // Invalid token: `did` stays undefined and is rejected just below.
  }
  if (!did) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const user = await ctx.client.db.users.filter("did", equals(did)).getFirst();
  if (!user) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const size = +c.req.query("size") || 10;
  const offset = +c.req.query("offset") || 0;

  const scrobbles = await ctx.client.db.scrobbles
    .select(["track_id.*", "uri"])
    .filter("user_id", equals(user.xata_id))
    .filter({
      $not: [
        {
          uri: null,
        },
      ],
    })
    .sort("xata_createdat", "desc")
    .getPaginated({
      pagination: {
        size,
        offset,
      },
    });

  return c.json(scrobbles.records);
});
379
// POST /tracks — validate the submitted track and save it on behalf of the
// authenticated user.
//
// Saving is best-effort: a failure inside `saveTrack` is logged (except the
// duplicate-sha256 case, which is ignored) and the response is still 200.
//
// Responses:
//   401 "Unauthorized"         missing/invalid token, unknown user, or no agent
//   400 "Invalid track data…"  body is not JSON or fails `trackSchema`
//   200 { status: "ok" }
app.post("/tracks", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/tracks" });

  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    c.status(401);
    return c.text("Unauthorized");
  }

  let did;
  try {
    // Expiration is still not enforced (ignoreExpiration), as before.
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    // Invalid token: `did` stays undefined and is rejected just below.
  }
  if (!did) {
    c.status(401);
    return c.text("Unauthorized");
  }

  const user = await ctx.client.db.users.filter("did", equals(did)).getFirst();
  if (!user) {
    c.status(401);
    return c.text("Unauthorized");
  }

  let body;
  try {
    body = await c.req.json();
  } catch {
    // Unparseable JSON is a client error.
    c.status(400);
    return c.text("Invalid track data: request body is not valid JSON");
  }

  const parsed = trackSchema.safeParse(body);
  if (parsed.error) {
    c.status(400);
    return c.text("Invalid track data: " + parsed.error.message);
  }

  const track = parsed.data;

  const agent = await createAgent(ctx.oauthClient, did);
  if (!agent) {
    c.status(401);
    return c.text("Unauthorized");
  }

  try {
    await saveTrack(ctx, track, agent);
  } catch (e) {
    // Thrown values are not guaranteed to be Error instances; normalise
    // before inspecting the message so the handler itself cannot throw here.
    const message = e instanceof Error ? e.message : String(e);
    if (!message.includes("invalid record: column [sha256]: is not unique")) {
      // NOTE(review): the "[spotify user]" prefix looks copied from another
      // module; left unchanged so existing log searches keep working.
      console.error("[spotify user]", message);
    }
  }

  return c.json({ status: "ok" });
});
426
// GET /tracks — paginated track list from the analytics service.
// `size` defaults to 100 and `offset` to 0.
app.get("/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks" });

  const take = Number(c.req.query("size")) || 100;
  const skip = Number(c.req.query("offset")) || 0;

  const { data } = await ctx.analytics.post("library.getTracks", {
    pagination: { skip, take },
  });

  return c.json(data);
});
442
// GET /albums — paginated album list from the analytics service.
// `size` defaults to 100 and `offset` to 0.
app.get("/albums", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums" });

  const take = Number(c.req.query("size")) || 100;
  const skip = Number(c.req.query("offset")) || 0;

  const { data } = await ctx.analytics.post("library.getAlbums", {
    pagination: { skip, take },
  });

  return c.json(data);
});
458
// GET /artists — paginated artist list from the analytics service.
// `size` defaults to 100 and `offset` to 0.
app.get("/artists", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists" });

  const take = Number(c.req.query("size")) || 100;
  const skip = Number(c.req.query("offset")) || 0;

  const { data } = await ctx.analytics.post("library.getArtists", {
    pagination: { skip, take },
  });

  return c.json(data);
});
474
// GET /tracks/:sha256 — first track whose `sha256` column equals the path
// parameter; the result of `getFirst()` is serialised as-is.
app.get("/tracks/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" });

  const bySha256 = equals(c.req.param("sha256"));
  const match = await ctx.client.db.tracks.filter("sha256", bySha256).getFirst();

  return c.json(match);
});
484
// GET /albums/:sha256 — first album whose `sha256` column equals the path
// parameter; the result of `getFirst()` is serialised as-is.
app.get("/albums/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256" });

  const bySha256 = equals(c.req.param("sha256"));
  const match = await ctx.client.db.albums.filter("sha256", bySha256).getFirst();

  return c.json(match);
});
495
// GET /artists/:sha256 — first artist whose `sha256` column equals the path
// parameter; the result of `getFirst()` is serialised as-is.
app.get("/artists/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256" });

  const bySha256 = equals(c.req.param("sha256"));
  const match = await ctx.client.db.artists
    .filter("sha256", bySha256)
    .getFirst();

  return c.json(match);
});
506
// GET /artists/:sha256/tracks — every `artist_tracks` row whose linked artist
// has the given sha256, with the linked track's columns selected.
app.get("/artists/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" });

  const artistSha256 = c.req.param("sha256");
  const rows = await ctx.client.db.artist_tracks
    .select(["track_id.*"])
    .filter("artist_id.sha256", equals(artistSha256))
    .getAll();

  return c.json(rows);
});
518
// GET /albums/:sha256/tracks — every `album_tracks` row whose linked album
// has the given sha256, with the linked track's columns selected.
app.get("/albums/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" });

  const albumSha256 = c.req.param("sha256");
  const rows = await ctx.client.db.album_tracks
    .select(["track_id.*"])
    .filter("album_id.sha256", equals(albumSha256))
    .getAll();

  return c.json(rows);
});
529
// Remaining sub-applications, mounted in the same order as before.
const trailingMounts = [
  ["/users", users],
  ["/search", search],
  ["/webscrobbler", webscrobbler],
];
for (const [prefix, subApp] of trailingMounts) {
  app.route(prefix, subApp);
}

// Start the HTTP server on port 8000, then attach WebSocket handling to it.
const serverOptions = {
  fetch: app.fetch,
  port: 8000,
};
const server = serve(serverOptions);

injectWebSocket(server);