forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1import { serve } from "@hono/node-server";
2import { createNodeWebSocket } from "@hono/node-ws";
3import { trace } from "@opentelemetry/api";
4import { consola } from "consola";
5import { ctx } from "context";
6import { and, desc, eq, isNotNull, or } from "drizzle-orm";
7import { Hono } from "hono";
8import { cors } from "hono/cors";
9import jwt from "jsonwebtoken";
10import { createAgent } from "lib/agent";
11import {
12 getLovedTracks,
13 likeTrack,
14 unLikeTrack,
15} from "lovedtracks/lovedtracks.service";
16import dns from "node:dns";
17import { scrobbleTrack } from "nowplaying/nowplaying.service";
18import { rateLimiter } from "ratelimiter";
19import subscribe from "subscribers";
20import { saveTrack } from "tracks/tracks.service";
21import { trackSchema } from "types/track";
22import handleWebsocket from "websocket/handler";
23import apikeys from "./apikeys/app";
24import bsky from "./bsky/app";
25import dropbox from "./dropbox/app";
26import googledrive from "./googledrive/app";
27import { env } from "./lib/env";
28import { requestCounter, requestDuration } from "./metrics";
29import "./profiling";
30import albumTracks from "./schema/album-tracks";
31import albums from "./schema/albums";
32import artistTracks from "./schema/artist-tracks";
33import artists from "./schema/artists";
34import scrobbles from "./schema/scrobbles";
35import tracks from "./schema/tracks";
36import users from "./schema/users";
37import spotify from "./spotify/app";
38import "./tracing";
39import usersApp from "./users/app";
40import webscrobbler from "./webscrobbler/app";
41
42dns.setDefaultResultOrder("ipv4first");
43
44subscribe(ctx);
45
46const app = new Hono();
47const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });
48
49app.use(
50 "*",
51 rateLimiter({
52 limit: 1000,
53 window: 30, // 馃憟 30 seconds
54 }),
55);
56
57app.use("*", async (c, next) => {
58 const span = trace.getActiveSpan();
59 span?.setAttribute("http.route", c.req.path);
60 await next();
61});
62
63app.use("*", async (c, next) => {
64 const start = Date.now();
65 await next();
66 const duration = (Date.now() - start) / 1000;
67 requestDuration.record(duration, {
68 route: c.req.path,
69 method: c.req.method,
70 });
71});
72
73app.use(cors());
74
75app.route("/", bsky);
76
77app.route("/spotify", spotify);
78
79app.route("/dropbox", dropbox);
80
81app.route("/googledrive", googledrive);
82
83app.route("/apikeys", apikeys);
84
85app.get("/ws", upgradeWebSocket(handleWebsocket));
86
87app.get("/", async (c) => {
88 return c.json({ status: "ok" });
89});
90
91app.post("/now-playing", async (c) => {
92 requestCounter.add(1, { method: "POST", route: "/now-playing" });
93 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
94
95 if (!bearer || bearer === "null") {
96 c.status(401);
97 return c.text("Unauthorized");
98 }
99
100 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
101 ignoreExpiration: true,
102 });
103
104 const user = await ctx.db
105 .select()
106 .from(users)
107 .where(eq(users.did, did))
108 .limit(1)
109 .then((rows) => rows[0]);
110
111 if (!user) {
112 c.status(401);
113 return c.text("Unauthorized");
114 }
115
116 const body = await c.req.json();
117 const parsed = trackSchema.safeParse(body);
118
119 if (parsed.error) {
120 c.status(400);
121 return c.text("Invalid track data: " + parsed.error.message);
122 }
123 const track = parsed.data;
124
125 const agent = await createAgent(ctx.oauthClient, did);
126 if (!agent) {
127 c.status(401);
128 return c.text("Unauthorized");
129 }
130
131 await scrobbleTrack(ctx, track, agent, user.did);
132
133 return c.json({ status: "ok" });
134});
135
136app.get("/now-playing", async (c) => {
137 requestCounter.add(1, { method: "GET", route: "/now-playing" });
138
139 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
140
141 const payload =
142 bearer && bearer !== "null"
143 ? jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true })
144 : {};
145 const did = c.req.query("did") || payload.did;
146
147 if (!did) {
148 c.status(401);
149 return c.text("Unauthorized");
150 }
151
152 const user = await ctx.db
153 .select()
154 .from(users)
155 .where(or(eq(users.did, did), eq(users.handle, did)))
156 .limit(1)
157 .then((rows) => rows[0]);
158
159 if (!user) {
160 c.status(401);
161 return c.text("Unauthorized");
162 }
163
164 const [nowPlaying, status] = await Promise.all([
165 ctx.redis.get(`nowplaying:${user.did}`),
166 ctx.redis.get(`nowplaying:${user.did}:status`),
167 ]);
168 return c.json(
169 nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {},
170 );
171});
172
173app.get("/now-playings", async (c) => {
174 requestCounter.add(1, { method: "GET", route: "/now-playings" });
175 const size = +c.req.query("size") || 10;
176 const offset = +c.req.query("offset") || 0;
177 const { data } = await ctx.analytics.post("library.getDistinctScrobbles", {
178 pagination: {
179 skip: offset,
180 take: size,
181 },
182 });
183 return c.json(data);
184});
185
186app.post("/likes", async (c) => {
187 requestCounter.add(1, { method: "POST", route: "/likes" });
188 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
189
190 if (!bearer || bearer === "null") {
191 c.status(401);
192 return c.text("Unauthorized");
193 }
194
195 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
196 ignoreExpiration: true,
197 });
198 const agent = await createAgent(ctx.oauthClient, did);
199
200 const user = await ctx.db
201 .select()
202 .from(users)
203 .where(eq(users.did, did))
204 .limit(1)
205 .then((rows) => rows[0]);
206
207 if (!user) {
208 c.status(401);
209 return c.text("Unauthorized");
210 }
211
212 const body = await c.req.json();
213 const parsed = trackSchema.safeParse(body);
214
215 if (parsed.error) {
216 c.status(400);
217 return c.text("Invalid track data: " + parsed.error.message);
218 }
219 const track = parsed.data;
220 await likeTrack(ctx, track, user, agent);
221
222 return c.json({ status: "ok" });
223});
224
225app.delete("/likes/:sha256", async (c) => {
226 requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" });
227 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
228
229 if (!bearer || bearer === "null") {
230 c.status(401);
231 return c.text("Unauthorized");
232 }
233
234 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
235 ignoreExpiration: true,
236 });
237 const agent = await createAgent(ctx.oauthClient, did);
238
239 const user = await ctx.db
240 .select()
241 .from(users)
242 .where(eq(users.did, did))
243 .limit(1)
244 .then((rows) => rows[0]);
245
246 if (!user) {
247 c.status(401);
248 return c.text("Unauthorized");
249 }
250
251 const sha256 = c.req.param("sha256");
252 await unLikeTrack(ctx, sha256, user, agent);
253 return c.json({ status: "ok" });
254});
255
256app.get("/likes", async (c) => {
257 requestCounter.add(1, { method: "GET", route: "/likes" });
258 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
259
260 if (!bearer || bearer === "null") {
261 c.status(401);
262 return c.text("Unauthorized");
263 }
264
265 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
266 ignoreExpiration: true,
267 });
268
269 const user = await ctx.db
270 .select()
271 .from(users)
272 .where(eq(users.did, did))
273 .limit(1)
274 .then((rows) => rows[0]);
275
276 if (!user) {
277 c.status(401);
278 return c.text("Unauthorized");
279 }
280
281 const size = +c.req.query("size") || 10;
282 const offset = +c.req.query("offset") || 0;
283
284 const lovedTracks = await getLovedTracks(ctx, user, size, offset);
285 return c.json(lovedTracks);
286});
287
288app.get("/public/scrobbles", async (c) => {
289 requestCounter.add(1, { method: "GET", route: "/public/scrobbles" });
290
291 const size = +c.req.query("size") || 10;
292 const offset = +c.req.query("offset") || 0;
293
294 const scrobbleRecords = await ctx.db
295 .select({
296 scrobble: scrobbles,
297 track: tracks,
298 user: users,
299 })
300 .from(scrobbles)
301 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
302 .innerJoin(users, eq(scrobbles.userId, users.id))
303 .orderBy(desc(scrobbles.timestamp))
304 .limit(size)
305 .offset(offset);
306
307 return c.json(
308 scrobbleRecords.map((item) => ({
309 cover: item.track.albumArt,
310 artist: item.track.artist,
311 title: item.track.title,
312 date: item.scrobble.timestamp,
313 user: item.user.handle,
314 uri: item.scrobble.uri,
315 albumUri: item.track.albumUri,
316 artistUri: item.track.artistUri,
317 tags: [],
318 listeners: 1,
319 sha256: item.track.sha256,
320 id: item.scrobble.id,
321 })),
322 );
323});
324
325app.get("/public/scrobbleschart", async (c) => {
326 requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" });
327
328 const did = c.req.query("did");
329 const artisturi = c.req.query("artisturi");
330 const albumuri = c.req.query("albumuri");
331 const songuri = c.req.query("songuri");
332
333 if (did) {
334 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {
335 user_did: did,
336 });
337 return c.json(chart.data);
338 }
339
340 if (artisturi) {
341 const chart = await ctx.analytics.post("library.getArtistScrobbles", {
342 artist_id: artisturi,
343 });
344 return c.json(chart.data);
345 }
346
347 if (albumuri) {
348 const chart = await ctx.analytics.post("library.getAlbumScrobbles", {
349 album_id: albumuri,
350 });
351 return c.json(chart.data);
352 }
353
354 if (songuri) {
355 let uri = songuri;
356 if (songuri.includes("app.rocksky.scrobble")) {
357 const scrobble = await ctx.db
358 .select({
359 scrobble: scrobbles,
360 track: tracks,
361 })
362 .from(scrobbles)
363 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
364 .where(eq(scrobbles.uri, songuri))
365 .limit(1)
366 .then((rows) => rows[0]);
367
368 uri = scrobble.track.uri;
369 }
370 const chart = await ctx.analytics.post("library.getTrackScrobbles", {
371 track_id: uri,
372 });
373 return c.json(chart.data);
374 }
375
376 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {});
377 return c.json(chart.data);
378});
379
380app.get("/scrobbles", async (c) => {
381 requestCounter.add(1, { method: "GET", route: "/scrobbles" });
382
383 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
384
385 if (!bearer || bearer === "null") {
386 c.status(401);
387 return c.text("Unauthorized");
388 }
389
390 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
391 ignoreExpiration: true,
392 });
393
394 const user = await ctx.db
395 .select()
396 .from(users)
397 .where(eq(users.did, did))
398 .limit(1)
399 .then((rows) => rows[0]);
400
401 if (!user) {
402 c.status(401);
403 return c.text("Unauthorized");
404 }
405
406 const size = +c.req.query("size") || 10;
407 const offset = +c.req.query("offset") || 0;
408
409 const userScrobbles = await ctx.db
410 .select({
411 scrobble: scrobbles,
412 track: tracks,
413 })
414 .from(scrobbles)
415 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
416 .where(and(eq(scrobbles.userId, user.id), isNotNull(scrobbles.uri)))
417 .orderBy(desc(scrobbles.createdAt))
418 .limit(size)
419 .offset(offset);
420
421 return c.json(userScrobbles);
422});
423
424app.post("/tracks", async (c) => {
425 requestCounter.add(1, { method: "POST", route: "/tracks" });
426
427 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
428
429 if (!bearer || bearer === "null") {
430 c.status(401);
431 return c.text("Unauthorized");
432 }
433
434 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
435 ignoreExpiration: true,
436 });
437
438 const user = await ctx.db
439 .select()
440 .from(users)
441 .where(eq(users.did, did))
442 .limit(1)
443 .then((rows) => rows[0]);
444
445 if (!user) {
446 c.status(401);
447 return c.text("Unauthorized");
448 }
449
450 const body = await c.req.json();
451 const parsed = trackSchema.safeParse(body);
452
453 if (parsed.error) {
454 c.status(400);
455 return c.text("Invalid track data: " + parsed.error.message);
456 }
457
458 const track = parsed.data;
459
460 const agent = await createAgent(ctx.oauthClient, did);
461 if (!agent) {
462 c.status(401);
463 return c.text("Unauthorized");
464 }
465
466 try {
467 await saveTrack(ctx, track, agent);
468 } catch (e) {
469 if (!e.message.includes("duplicate key value violates unique constraint")) {
470 consola.error("[tracks]", e.message);
471 }
472 }
473
474 return c.json({ status: "ok" });
475});
476
477app.get("/tracks", async (c) => {
478 requestCounter.add(1, { method: "GET", route: "/tracks" });
479
480 const size = +c.req.query("size") || 100;
481 const offset = +c.req.query("offset") || 0;
482
483 const tracksData = await ctx.analytics.post("library.getTracks", {
484 pagination: {
485 skip: offset,
486 take: size,
487 },
488 });
489
490 return c.json(tracksData.data);
491});
492
493app.get("/albums", async (c) => {
494 requestCounter.add(1, { method: "GET", route: "/albums" });
495
496 const size = +c.req.query("size") || 100;
497 const offset = +c.req.query("offset") || 0;
498
499 const albumsData = await ctx.analytics.post("library.getAlbums", {
500 pagination: {
501 skip: offset,
502 take: size,
503 },
504 });
505
506 return c.json(albumsData.data);
507});
508
509app.get("/artists", async (c) => {
510 requestCounter.add(1, { method: "GET", route: "/artists" });
511
512 const size = +c.req.query("size") || 100;
513 const offset = +c.req.query("offset") || 0;
514
515 const artistsData = await ctx.analytics.post("library.getArtists", {
516 pagination: {
517 skip: offset,
518 take: size,
519 },
520 });
521
522 return c.json(artistsData.data);
523});
524
525app.get("/tracks/:sha256", async (c) => {
526 requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" });
527
528 const sha256 = c.req.param("sha256");
529 const track = await ctx.db
530 .select()
531 .from(tracks)
532 .where(eq(tracks.sha256, sha256))
533 .limit(1)
534 .then((rows) => rows[0]);
535
536 return c.json(track);
537});
538
539app.get("/albums/:sha256", async (c) => {
540 requestCounter.add(1, { method: "GET", route: "/albums/:sha256" });
541
542 const sha256 = c.req.param("sha256");
543 const album = await ctx.db
544 .select()
545 .from(albums)
546 .where(eq(albums.sha256, sha256))
547 .limit(1)
548 .then((rows) => rows[0]);
549
550 return c.json(album);
551});
552
553app.get("/artists/:sha256", async (c) => {
554 requestCounter.add(1, { method: "GET", route: "/artists/:sha256" });
555
556 const sha256 = c.req.param("sha256");
557 const artist = await ctx.db
558 .select()
559 .from(artists)
560 .where(eq(artists.sha256, sha256))
561 .limit(1)
562 .then((rows) => rows[0]);
563
564 return c.json(artist);
565});
566
567app.get("/artists/:sha256/tracks", async (c) => {
568 requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" });
569 const sha256 = c.req.param("sha256");
570
571 const artistTracksData = await ctx.db
572 .select({
573 track: tracks,
574 })
575 .from(artistTracks)
576 .innerJoin(tracks, eq(artistTracks.trackId, tracks.id))
577 .innerJoin(artists, eq(artistTracks.artistId, artists.id))
578 .where(eq(artists.sha256, sha256));
579
580 return c.json(artistTracksData.map((item) => item.track));
581});
582
583app.get("/albums/:sha256/tracks", async (c) => {
584 requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" });
585 const sha256 = c.req.param("sha256");
586
587 const albumTracksData = await ctx.db
588 .select({
589 track: tracks,
590 })
591 .from(albumTracks)
592 .innerJoin(tracks, eq(albumTracks.trackId, tracks.id))
593 .innerJoin(albums, eq(albumTracks.albumId, albums.id))
594 .where(eq(albums.sha256, sha256));
595
596 return c.json(albumTracksData.map((item) => item.track));
597});
598
599app.route("/users", usersApp);
600
601app.route("/webscrobbler", webscrobbler);
602
603const server = serve({
604 fetch: app.fetch,
605 port: 8000,
606});
607
608injectWebSocket(server);