forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import { serve } from "@hono/node-server";
2import { createNodeWebSocket } from "@hono/node-ws";
3import { trace } from "@opentelemetry/api";
4import { ctx } from "context";
5import { and, desc, eq, isNotNull, or } from "drizzle-orm";
6import { Hono } from "hono";
7import { cors } from "hono/cors";
8import jwt from "jsonwebtoken";
9import { createAgent } from "lib/agent";
10import {
11 getLovedTracks,
12 likeTrack,
13 unLikeTrack,
14} from "lovedtracks/lovedtracks.service";
15import { scrobbleTrack } from "nowplaying/nowplaying.service";
16import { rateLimiter } from "ratelimiter";
17import subscribe from "subscribers";
18import { saveTrack } from "tracks/tracks.service";
19import { trackSchema } from "types/track";
20import handleWebsocket from "websocket/handler";
21import apikeys from "./apikeys/app";
22import bsky from "./bsky/app";
23import dropbox from "./dropbox/app";
24import googledrive from "./googledrive/app";
25import { env } from "./lib/env";
26import { requestCounter, requestDuration } from "./metrics";
27import "./profiling";
28import albumTracks from "./schema/album-tracks";
29import albums from "./schema/albums";
30import artistTracks from "./schema/artist-tracks";
31import artists from "./schema/artists";
32import scrobbles from "./schema/scrobbles";
33import tracks from "./schema/tracks";
34import users from "./schema/users";
35import spotify from "./spotify/app";
36import "./tracing";
37import usersApp from "./users/app";
38import webscrobbler from "./webscrobbler/app";
39
// Hand the shared application context to the default export of `subscribers`.
subscribe(ctx);

// Root Hono application, plus the helpers used further down to expose the
// WebSocket endpoint and to attach WebSocket handling to the Node server.
const app = new Hono();
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });

// Rate limiting applied to every path: limit of 1000 per window of 30 seconds.
const globalRateLimit = rateLimiter({ limit: 1000, window: 30 });
app.use("*", globalRateLimit);
52
// Tag the active OpenTelemetry span (when one exists) with the request path,
// then continue with the rest of the middleware chain.
app.use("*", async (c, next) => {
  const activeSpan = trace.getActiveSpan();
  if (activeSpan != null) {
    activeSpan.setAttribute("http.route", c.req.path);
  }
  await next();
});
58
// Measure how long the downstream handlers take and record it, in seconds,
// on the `requestDuration` metric labelled with the request path and method.
app.use("*", async (c, next) => {
  const startedAt = Date.now();
  await next();
  const elapsedSeconds = (Date.now() - startedAt) / 1000;
  const { path: route, method } = c.req;
  requestDuration.record(elapsedSeconds, { route, method });
});
68
// CORS middleware with its default options.
app.use(cors());

// Mount the feature sub-applications; registration order is kept as-is.
app
  .route("/", bsky)
  .route("/spotify", spotify)
  .route("/dropbox", dropbox)
  .route("/googledrive", googledrive)
  .route("/apikeys", apikeys);

// WebSocket endpoint: the upgrade is delegated to `handleWebsocket`.
app.get("/ws", upgradeWebSocket(handleWebsocket));
82
// GET / — static status payload.
app.get("/", (c) => c.json({ status: "ok" }));
86
// POST /now-playing — scrobble the track in the request body for the caller.
//
// Auth: `Authorization: <scheme> <jwt>`; the token payload must carry a `did`
// that matches a row in `users`.
// Responses:
//   401 "Unauthorized"          missing/invalid token, unknown user, or no agent
//   400 "Invalid track data: …" body is not JSON or fails `trackSchema`
//   200 { status: "ok" }        after `scrobbleTrack` resolves
app.post("/now-playing", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/now-playing" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    return c.text("Unauthorized", 401);
  }

  // jwt.verify throws on a malformed or wrongly signed token; answer 401
  // instead of letting the exception surface as a 500.
  // NOTE(review): `ignoreExpiration: true` means expired tokens are accepted —
  // confirm this is intentional.
  let did;
  try {
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    return c.text("Unauthorized", 401);
  }
  if (!did) {
    return c.text("Unauthorized", 401);
  }

  const [user] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, did))
    .limit(1);

  if (!user) {
    return c.text("Unauthorized", 401);
  }

  // c.req.json() rejects on a malformed body; report it as a client error.
  let body;
  try {
    body = await c.req.json();
  } catch {
    return c.text("Invalid track data: request body is not valid JSON", 400);
  }

  const parsed = trackSchema.safeParse(body);
  if (parsed.error) {
    return c.text("Invalid track data: " + parsed.error.message, 400);
  }
  const track = parsed.data;

  const agent = await createAgent(ctx.oauthClient, did);
  if (!agent) {
    return c.text("Unauthorized", 401);
  }

  await scrobbleTrack(ctx, track, agent, user.did);

  return c.json({ status: "ok" });
});
131
// GET /now-playing — return the cached "now playing" entry for a user.
//
// The user is identified by the `did` query parameter (matched against either
// `users.did` or `users.handle`) or, when that is absent, by the `did` in the
// bearer token payload.
// Responses:
//   401 "Unauthorized"  a presented token fails verification, no identifier is
//                       available, or no user matches
//   200 {}              nothing cached under `nowplaying:<did>`
//   200 {…, is_playing} cached JSON plus whether the status key equals "1"
app.get("/now-playing", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playing" });

  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  // jwt.verify throws on a malformed or wrongly signed token; reject the
  // request with 401 rather than failing with a 500.
  let payload = {};
  if (bearer && bearer !== "null") {
    try {
      payload = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true });
    } catch {
      return c.text("Unauthorized", 401);
    }
  }
  const did = c.req.query("did") || payload.did;

  if (!did) {
    return c.text("Unauthorized", 401);
  }

  const [user] = await ctx.db
    .select()
    .from(users)
    .where(or(eq(users.did, did), eq(users.handle, did)))
    .limit(1);

  if (!user) {
    return c.text("Unauthorized", 401);
  }

  const [nowPlaying, status] = await Promise.all([
    ctx.redis.get(`nowplaying:${user.did}`),
    ctx.redis.get(`nowplaying:${user.did}:status`),
  ]);

  if (!nowPlaying) {
    return c.json({});
  }
  return c.json({ ...JSON.parse(nowPlaying), is_playing: status === "1" });
});
168
// GET /now-playings — page through `library.getDistinctScrobbles` on the
// analytics client. `size` defaults to 10 and `offset` to 0 when missing,
// non-numeric or zero.
app.get("/now-playings", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playings" });

  const take = Number(c.req.query("size")) || 10;
  const skip = Number(c.req.query("offset")) || 0;

  const response = await ctx.analytics.post("library.getDistinctScrobbles", {
    pagination: { skip, take },
  });
  return c.json(response.data);
});
181
// POST /likes — like the track in the request body on behalf of the caller.
//
// Auth: `Authorization: <scheme> <jwt>`; the token payload must carry a `did`
// that matches a row in `users`.
// Responses:
//   401 "Unauthorized"          missing/invalid token, unknown user, or no agent
//   400 "Invalid track data: …" body is not JSON or fails `trackSchema`
//   200 { status: "ok" }        after `likeTrack` resolves
app.post("/likes", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/likes" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    return c.text("Unauthorized", 401);
  }

  // jwt.verify throws on a malformed or wrongly signed token; answer 401
  // instead of letting the exception surface as a 500.
  let did;
  try {
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    return c.text("Unauthorized", 401);
  }
  if (!did) {
    return c.text("Unauthorized", 401);
  }

  const [user] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, did))
    .limit(1);

  if (!user) {
    return c.text("Unauthorized", 401);
  }

  // c.req.json() rejects on a malformed body; report it as a client error.
  let body;
  try {
    body = await c.req.json();
  } catch {
    return c.text("Invalid track data: request body is not valid JSON", 400);
  }

  const parsed = trackSchema.safeParse(body);
  if (parsed.error) {
    return c.text("Invalid track data: " + parsed.error.message, 400);
  }
  const track = parsed.data;

  // Create the agent only once the request is known to be valid, and reject
  // when none is available — same handling as POST /now-playing and /tracks.
  const agent = await createAgent(ctx.oauthClient, did);
  if (!agent) {
    return c.text("Unauthorized", 401);
  }

  await likeTrack(ctx, track, user, agent);

  return c.json({ status: "ok" });
});
220
// DELETE /likes/:sha256 — remove the caller's like for the given track hash.
//
// Auth: `Authorization: <scheme> <jwt>`; the token payload must carry a `did`
// that matches a row in `users`.
// Responses:
//   401 "Unauthorized"    missing/invalid token, unknown user, or no agent
//   200 { status: "ok" }  after `unLikeTrack` resolves
app.delete("/likes/:sha256", async (c) => {
  requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    return c.text("Unauthorized", 401);
  }

  // jwt.verify throws on a malformed or wrongly signed token; answer 401
  // instead of letting the exception surface as a 500.
  let did;
  try {
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    return c.text("Unauthorized", 401);
  }
  if (!did) {
    return c.text("Unauthorized", 401);
  }

  const [user] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, did))
    .limit(1);

  if (!user) {
    return c.text("Unauthorized", 401);
  }

  // Reject when no agent is available — same handling as POST /now-playing
  // and POST /tracks.
  const agent = await createAgent(ctx.oauthClient, did);
  if (!agent) {
    return c.text("Unauthorized", 401);
  }

  const sha256 = c.req.param("sha256");
  await unLikeTrack(ctx, sha256, user, agent);
  return c.json({ status: "ok" });
});
251
// GET /likes — list the caller's loved tracks.
//
// Auth: `Authorization: <scheme> <jwt>`; the token payload must carry a `did`
// that matches a row in `users`.
// Query: `size` (default 10) and `offset` (default 0).
// Responses:
//   401 "Unauthorized"  missing/invalid token or unknown user
//   200 <json>          whatever `getLovedTracks` resolves to
app.get("/likes", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/likes" });
  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    return c.text("Unauthorized", 401);
  }

  // jwt.verify throws on a malformed or wrongly signed token; answer 401
  // instead of letting the exception surface as a 500.
  let did;
  try {
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    return c.text("Unauthorized", 401);
  }
  if (!did) {
    return c.text("Unauthorized", 401);
  }

  const [user] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, did))
    .limit(1);

  if (!user) {
    return c.text("Unauthorized", 401);
  }

  const size = +c.req.query("size") || 10;
  const offset = +c.req.query("offset") || 0;

  const lovedTracks = await getLovedTracks(ctx, user, size, offset);
  return c.json(lovedTracks);
});
283
// GET /public/scrobbles — newest-first scrobble feed across all users.
// Joins each scrobble with its track and user, orders by scrobble timestamp
// descending, and flattens every row into the public response shape.
// `size` defaults to 10 and `offset` to 0.
app.get("/public/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbles" });

  const take = Number(c.req.query("size")) || 10;
  const skip = Number(c.req.query("offset")) || 0;

  const rows = await ctx.db
    .select({ scrobble: scrobbles, track: tracks, user: users })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .orderBy(desc(scrobbles.timestamp))
    .limit(take)
    .offset(skip);

  const feed = rows.map(({ scrobble, track, user }) => ({
    cover: track.albumArt,
    artist: track.artist,
    title: track.title,
    date: scrobble.timestamp,
    user: user.handle,
    uri: scrobble.uri,
    albumUri: track.albumUri,
    artistUri: track.artistUri,
    // `tags` and `listeners` are fixed placeholder values here.
    tags: [],
    listeners: 1,
    sha256: track.sha256,
    id: scrobble.id,
  }));

  return c.json(feed);
});
320
// GET /public/scrobbleschart — scrobble chart data from the analytics client.
//
// The first query parameter present, in this order, selects the chart:
//   did       -> library.getScrobblesPerDay  ({ user_did })
//   artisturi -> library.getArtistScrobbles  ({ artist_id })
//   albumuri  -> library.getAlbumScrobbles   ({ album_id })
//   songuri   -> library.getTrackScrobbles   ({ track_id })
//   (none)    -> library.getScrobblesPerDay  ({})
// A `songuri` containing "app.rocksky.scrobble" is treated as a scrobble URI
// and resolved to its track URI first; 404 is returned when no such scrobble
// exists.
app.get("/public/scrobbleschart", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" });

  const did = c.req.query("did");
  const artisturi = c.req.query("artisturi");
  const albumuri = c.req.query("albumuri");
  const songuri = c.req.query("songuri");

  if (did) {
    const chart = await ctx.analytics.post("library.getScrobblesPerDay", {
      user_did: did,
    });
    return c.json(chart.data);
  }

  if (artisturi) {
    const chart = await ctx.analytics.post("library.getArtistScrobbles", {
      artist_id: artisturi,
    });
    return c.json(chart.data);
  }

  if (albumuri) {
    const chart = await ctx.analytics.post("library.getAlbumScrobbles", {
      album_id: albumuri,
    });
    return c.json(chart.data);
  }

  if (songuri) {
    let uri = songuri;
    if (songuri.includes("app.rocksky.scrobble")) {
      const [scrobble] = await ctx.db
        .select({
          scrobble: scrobbles,
          track: tracks,
        })
        .from(scrobbles)
        .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
        .where(eq(scrobbles.uri, songuri))
        .limit(1);

      // No matching row: previously this dereferenced `undefined` and the
      // request failed with a TypeError.
      if (!scrobble) {
        return c.text("Scrobble not found", 404);
      }

      uri = scrobble.track.uri;
    }
    const chart = await ctx.analytics.post("library.getTrackScrobbles", {
      track_id: uri,
    });
    return c.json(chart.data);
  }

  const chart = await ctx.analytics.post("library.getScrobblesPerDay", {});
  return c.json(chart.data);
});
375
// GET /scrobbles — list the caller's scrobbles that have a non-null `uri`,
// each joined with its track, ordered by `createdAt` descending.
//
// Auth: `Authorization: <scheme> <jwt>`; the token payload must carry a `did`
// that matches a row in `users`.
// Query: `size` (default 10) and `offset` (default 0).
// Responses:
//   401 "Unauthorized"  missing/invalid token or unknown user
//   200 [{ scrobble, track }, …]
app.get("/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/scrobbles" });

  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    return c.text("Unauthorized", 401);
  }

  // jwt.verify throws on a malformed or wrongly signed token; answer 401
  // instead of letting the exception surface as a 500.
  let did;
  try {
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    return c.text("Unauthorized", 401);
  }
  if (!did) {
    return c.text("Unauthorized", 401);
  }

  const [user] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, did))
    .limit(1);

  if (!user) {
    return c.text("Unauthorized", 401);
  }

  const size = +c.req.query("size") || 10;
  const offset = +c.req.query("offset") || 0;

  const userScrobbles = await ctx.db
    .select({
      scrobble: scrobbles,
      track: tracks,
    })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .where(and(eq(scrobbles.userId, user.id), isNotNull(scrobbles.uri)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(size)
    .offset(offset);

  return c.json(userScrobbles);
});
419
// POST /tracks — save the track in the request body using the caller's agent.
//
// Auth: `Authorization: <scheme> <jwt>`; the token payload must carry a `did`
// that matches a row in `users`.
// Responses:
//   401 "Unauthorized"          missing/invalid token, unknown user, or no agent
//   400 "Invalid track data: …" body is not JSON or fails `trackSchema`
//   200 { status: "ok" }        always once validation passes — failures of
//                               `saveTrack` are logged (except duplicate-key
//                               violations) but never fail the request
app.post("/tracks", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/tracks" });

  const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();

  if (!bearer || bearer === "null") {
    return c.text("Unauthorized", 401);
  }

  // jwt.verify throws on a malformed or wrongly signed token; answer 401
  // instead of letting the exception surface as a 500.
  let did;
  try {
    ({ did } = jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }));
  } catch {
    return c.text("Unauthorized", 401);
  }
  if (!did) {
    return c.text("Unauthorized", 401);
  }

  const [user] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, did))
    .limit(1);

  if (!user) {
    return c.text("Unauthorized", 401);
  }

  // c.req.json() rejects on a malformed body; report it as a client error.
  let body;
  try {
    body = await c.req.json();
  } catch {
    return c.text("Invalid track data: request body is not valid JSON", 400);
  }

  const parsed = trackSchema.safeParse(body);
  if (parsed.error) {
    return c.text("Invalid track data: " + parsed.error.message, 400);
  }

  const track = parsed.data;

  const agent = await createAgent(ctx.oauthClient, did);
  if (!agent) {
    return c.text("Unauthorized", 401);
  }

  try {
    await saveTrack(ctx, track, agent);
  } catch (e) {
    // Best-effort save: duplicate-key violations are expected and stay
    // silent; anything else is logged. Normalise the message first so a
    // non-Error throwable cannot make this handler itself throw.
    const message = String(e?.message ?? e);
    if (!message.includes("duplicate key value violates unique constraint")) {
      console.error("[tracks]", message);
    }
  }

  return c.json({ status: "ok" });
});
472
// GET /tracks — page through `library.getTracks` on the analytics client.
// `size` defaults to 100 and `offset` to 0.
app.get("/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks" });

  const take = Number(c.req.query("size")) || 100;
  const skip = Number(c.req.query("offset")) || 0;

  const { data } = await ctx.analytics.post("library.getTracks", {
    pagination: { skip, take },
  });

  return c.json(data);
});
488
// GET /albums — page through `library.getAlbums` on the analytics client.
// `size` defaults to 100 and `offset` to 0.
app.get("/albums", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums" });

  const take = Number(c.req.query("size")) || 100;
  const skip = Number(c.req.query("offset")) || 0;

  const { data } = await ctx.analytics.post("library.getAlbums", {
    pagination: { skip, take },
  });

  return c.json(data);
});
504
// GET /artists — page through `library.getArtists` on the analytics client.
// `size` defaults to 100 and `offset` to 0.
app.get("/artists", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists" });

  const take = Number(c.req.query("size")) || 100;
  const skip = Number(c.req.query("offset")) || 0;

  const { data } = await ctx.analytics.post("library.getArtists", {
    pagination: { skip, take },
  });

  return c.json(data);
});
520
// GET /tracks/:sha256 — look up a single track row by its `sha256` column.
// When no row matches, the value passed to `c.json` is `undefined`.
app.get("/tracks/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" });

  const hash = c.req.param("sha256");
  const [match] = await ctx.db
    .select()
    .from(tracks)
    .where(eq(tracks.sha256, hash))
    .limit(1);

  return c.json(match);
});
534
// GET /albums/:sha256 — look up a single album row by its `sha256` column.
// When no row matches, the value passed to `c.json` is `undefined`.
app.get("/albums/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256" });

  const hash = c.req.param("sha256");
  const [match] = await ctx.db
    .select()
    .from(albums)
    .where(eq(albums.sha256, hash))
    .limit(1);

  return c.json(match);
});
548
// GET /artists/:sha256 — look up a single artist row by its `sha256` column.
// When no row matches, the value passed to `c.json` is `undefined`.
app.get("/artists/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256" });

  const hash = c.req.param("sha256");
  const [match] = await ctx.db
    .select()
    .from(artists)
    .where(eq(artists.sha256, hash))
    .limit(1);

  return c.json(match);
});
562
// GET /artists/:sha256/tracks — all tracks linked, through the artist-tracks
// table, to the artist whose `sha256` matches the path parameter.
app.get("/artists/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" });

  const artistHash = c.req.param("sha256");
  const rows = await ctx.db
    .select({ track: tracks })
    .from(artistTracks)
    .innerJoin(tracks, eq(artistTracks.trackId, tracks.id))
    .innerJoin(artists, eq(artistTracks.artistId, artists.id))
    .where(eq(artists.sha256, artistHash));

  // Unwrap the `{ track }` projection so the response is a plain track list.
  return c.json(rows.map(({ track }) => track));
});
578
// GET /albums/:sha256/tracks — all tracks linked, through the album-tracks
// table, to the album whose `sha256` matches the path parameter.
app.get("/albums/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" });

  const albumHash = c.req.param("sha256");
  const rows = await ctx.db
    .select({ track: tracks })
    .from(albumTracks)
    .innerJoin(tracks, eq(albumTracks.trackId, tracks.id))
    .innerJoin(albums, eq(albumTracks.albumId, albums.id))
    .where(eq(albums.sha256, albumHash));

  // Unwrap the `{ track }` projection so the response is a plain track list.
  return c.json(rows.map(({ track }) => track));
});
594
// Mount the remaining sub-applications.
app.route("/users", usersApp).route("/webscrobbler", webscrobbler);

// Serve the application on port 8000 and attach WebSocket handling to the
// resulting Node server.
const server = serve({ fetch: app.fetch, port: 8000 });
injectWebSocket(server);