forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1import { serve } from "@hono/node-server";
2import { createNodeWebSocket } from "@hono/node-ws";
3import { trace } from "@opentelemetry/api";
4import { ctx } from "context";
5import { and, desc, eq, isNotNull, or } from "drizzle-orm";
6import { Hono } from "hono";
7import { cors } from "hono/cors";
8import jwt from "jsonwebtoken";
9import { createAgent } from "lib/agent";
10import {
11 getLovedTracks,
12 likeTrack,
13 unLikeTrack,
14} from "lovedtracks/lovedtracks.service";
15import dns from "node:dns";
16import { scrobbleTrack } from "nowplaying/nowplaying.service";
17import { rateLimiter } from "ratelimiter";
18import subscribe from "subscribers";
19import { saveTrack } from "tracks/tracks.service";
20import { trackSchema } from "types/track";
21import handleWebsocket from "websocket/handler";
22import apikeys from "./apikeys/app";
23import bsky from "./bsky/app";
24import dropbox from "./dropbox/app";
25import googledrive from "./googledrive/app";
26import { env } from "./lib/env";
27import { requestCounter, requestDuration } from "./metrics";
28import "./profiling";
29import albumTracks from "./schema/album-tracks";
30import albums from "./schema/albums";
31import artistTracks from "./schema/artist-tracks";
32import artists from "./schema/artists";
33import scrobbles from "./schema/scrobbles";
34import tracks from "./schema/tracks";
35import users from "./schema/users";
36import spotify from "./spotify/app";
37import "./tracing";
38import usersApp from "./users/app";
39import webscrobbler from "./webscrobbler/app";
40
41dns.setDefaultResultOrder("ipv4first");
42
43subscribe(ctx);
44
45const app = new Hono();
46const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });
47
48app.use(
49 "*",
50 rateLimiter({
51 limit: 1000,
52 window: 30, // 馃憟 30 seconds
53 }),
54);
55
56app.use("*", async (c, next) => {
57 const span = trace.getActiveSpan();
58 span?.setAttribute("http.route", c.req.path);
59 await next();
60});
61
62app.use("*", async (c, next) => {
63 const start = Date.now();
64 await next();
65 const duration = (Date.now() - start) / 1000;
66 requestDuration.record(duration, {
67 route: c.req.path,
68 method: c.req.method,
69 });
70});
71
72app.use(cors());
73
74app.route("/", bsky);
75
76app.route("/spotify", spotify);
77
78app.route("/dropbox", dropbox);
79
80app.route("/googledrive", googledrive);
81
82app.route("/apikeys", apikeys);
83
84app.get("/ws", upgradeWebSocket(handleWebsocket));
85
86app.get("/", async (c) => {
87 return c.json({ status: "ok" });
88});
89
90app.post("/now-playing", async (c) => {
91 requestCounter.add(1, { method: "POST", route: "/now-playing" });
92 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
93
94 if (!bearer || bearer === "null") {
95 c.status(401);
96 return c.text("Unauthorized");
97 }
98
99 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
100 ignoreExpiration: true,
101 });
102
103 const user = await ctx.db
104 .select()
105 .from(users)
106 .where(eq(users.did, did))
107 .limit(1)
108 .then((rows) => rows[0]);
109
110 if (!user) {
111 c.status(401);
112 return c.text("Unauthorized");
113 }
114
115 const body = await c.req.json();
116 const parsed = trackSchema.safeParse(body);
117
118 if (parsed.error) {
119 c.status(400);
120 return c.text("Invalid track data: " + parsed.error.message);
121 }
122 const track = parsed.data;
123
124 const agent = await createAgent(ctx.oauthClient, did);
125 if (!agent) {
126 c.status(401);
127 return c.text("Unauthorized");
128 }
129
130 await scrobbleTrack(ctx, track, agent, user.did);
131
132 return c.json({ status: "ok" });
133});
134
135app.get("/now-playing", async (c) => {
136 requestCounter.add(1, { method: "GET", route: "/now-playing" });
137
138 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
139
140 const payload =
141 bearer && bearer !== "null"
142 ? jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true })
143 : {};
144 const did = c.req.query("did") || payload.did;
145
146 if (!did) {
147 c.status(401);
148 return c.text("Unauthorized");
149 }
150
151 const user = await ctx.db
152 .select()
153 .from(users)
154 .where(or(eq(users.did, did), eq(users.handle, did)))
155 .limit(1)
156 .then((rows) => rows[0]);
157
158 if (!user) {
159 c.status(401);
160 return c.text("Unauthorized");
161 }
162
163 const [nowPlaying, status] = await Promise.all([
164 ctx.redis.get(`nowplaying:${user.did}`),
165 ctx.redis.get(`nowplaying:${user.did}:status`),
166 ]);
167 return c.json(
168 nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {},
169 );
170});
171
172app.get("/now-playings", async (c) => {
173 requestCounter.add(1, { method: "GET", route: "/now-playings" });
174 const size = +c.req.query("size") || 10;
175 const offset = +c.req.query("offset") || 0;
176 const { data } = await ctx.analytics.post("library.getDistinctScrobbles", {
177 pagination: {
178 skip: offset,
179 take: size,
180 },
181 });
182 return c.json(data);
183});
184
185app.post("/likes", async (c) => {
186 requestCounter.add(1, { method: "POST", route: "/likes" });
187 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
188
189 if (!bearer || bearer === "null") {
190 c.status(401);
191 return c.text("Unauthorized");
192 }
193
194 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
195 ignoreExpiration: true,
196 });
197 const agent = await createAgent(ctx.oauthClient, did);
198
199 const user = await ctx.db
200 .select()
201 .from(users)
202 .where(eq(users.did, did))
203 .limit(1)
204 .then((rows) => rows[0]);
205
206 if (!user) {
207 c.status(401);
208 return c.text("Unauthorized");
209 }
210
211 const body = await c.req.json();
212 const parsed = trackSchema.safeParse(body);
213
214 if (parsed.error) {
215 c.status(400);
216 return c.text("Invalid track data: " + parsed.error.message);
217 }
218 const track = parsed.data;
219 await likeTrack(ctx, track, user, agent);
220
221 return c.json({ status: "ok" });
222});
223
224app.delete("/likes/:sha256", async (c) => {
225 requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" });
226 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
227
228 if (!bearer || bearer === "null") {
229 c.status(401);
230 return c.text("Unauthorized");
231 }
232
233 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
234 ignoreExpiration: true,
235 });
236 const agent = await createAgent(ctx.oauthClient, did);
237
238 const user = await ctx.db
239 .select()
240 .from(users)
241 .where(eq(users.did, did))
242 .limit(1)
243 .then((rows) => rows[0]);
244
245 if (!user) {
246 c.status(401);
247 return c.text("Unauthorized");
248 }
249
250 const sha256 = c.req.param("sha256");
251 await unLikeTrack(ctx, sha256, user, agent);
252 return c.json({ status: "ok" });
253});
254
255app.get("/likes", async (c) => {
256 requestCounter.add(1, { method: "GET", route: "/likes" });
257 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
258
259 if (!bearer || bearer === "null") {
260 c.status(401);
261 return c.text("Unauthorized");
262 }
263
264 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
265 ignoreExpiration: true,
266 });
267
268 const user = await ctx.db
269 .select()
270 .from(users)
271 .where(eq(users.did, did))
272 .limit(1)
273 .then((rows) => rows[0]);
274
275 if (!user) {
276 c.status(401);
277 return c.text("Unauthorized");
278 }
279
280 const size = +c.req.query("size") || 10;
281 const offset = +c.req.query("offset") || 0;
282
283 const lovedTracks = await getLovedTracks(ctx, user, size, offset);
284 return c.json(lovedTracks);
285});
286
287app.get("/public/scrobbles", async (c) => {
288 requestCounter.add(1, { method: "GET", route: "/public/scrobbles" });
289
290 const size = +c.req.query("size") || 10;
291 const offset = +c.req.query("offset") || 0;
292
293 const scrobbleRecords = await ctx.db
294 .select({
295 scrobble: scrobbles,
296 track: tracks,
297 user: users,
298 })
299 .from(scrobbles)
300 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
301 .innerJoin(users, eq(scrobbles.userId, users.id))
302 .orderBy(desc(scrobbles.timestamp))
303 .limit(size)
304 .offset(offset);
305
306 return c.json(
307 scrobbleRecords.map((item) => ({
308 cover: item.track.albumArt,
309 artist: item.track.artist,
310 title: item.track.title,
311 date: item.scrobble.timestamp,
312 user: item.user.handle,
313 uri: item.scrobble.uri,
314 albumUri: item.track.albumUri,
315 artistUri: item.track.artistUri,
316 tags: [],
317 listeners: 1,
318 sha256: item.track.sha256,
319 id: item.scrobble.id,
320 })),
321 );
322});
323
324app.get("/public/scrobbleschart", async (c) => {
325 requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" });
326
327 const did = c.req.query("did");
328 const artisturi = c.req.query("artisturi");
329 const albumuri = c.req.query("albumuri");
330 const songuri = c.req.query("songuri");
331
332 if (did) {
333 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {
334 user_did: did,
335 });
336 return c.json(chart.data);
337 }
338
339 if (artisturi) {
340 const chart = await ctx.analytics.post("library.getArtistScrobbles", {
341 artist_id: artisturi,
342 });
343 return c.json(chart.data);
344 }
345
346 if (albumuri) {
347 const chart = await ctx.analytics.post("library.getAlbumScrobbles", {
348 album_id: albumuri,
349 });
350 return c.json(chart.data);
351 }
352
353 if (songuri) {
354 let uri = songuri;
355 if (songuri.includes("app.rocksky.scrobble")) {
356 const scrobble = await ctx.db
357 .select({
358 scrobble: scrobbles,
359 track: tracks,
360 })
361 .from(scrobbles)
362 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
363 .where(eq(scrobbles.uri, songuri))
364 .limit(1)
365 .then((rows) => rows[0]);
366
367 uri = scrobble.track.uri;
368 }
369 const chart = await ctx.analytics.post("library.getTrackScrobbles", {
370 track_id: uri,
371 });
372 return c.json(chart.data);
373 }
374
375 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {});
376 return c.json(chart.data);
377});
378
379app.get("/scrobbles", async (c) => {
380 requestCounter.add(1, { method: "GET", route: "/scrobbles" });
381
382 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
383
384 if (!bearer || bearer === "null") {
385 c.status(401);
386 return c.text("Unauthorized");
387 }
388
389 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
390 ignoreExpiration: true,
391 });
392
393 const user = await ctx.db
394 .select()
395 .from(users)
396 .where(eq(users.did, did))
397 .limit(1)
398 .then((rows) => rows[0]);
399
400 if (!user) {
401 c.status(401);
402 return c.text("Unauthorized");
403 }
404
405 const size = +c.req.query("size") || 10;
406 const offset = +c.req.query("offset") || 0;
407
408 const userScrobbles = await ctx.db
409 .select({
410 scrobble: scrobbles,
411 track: tracks,
412 })
413 .from(scrobbles)
414 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
415 .where(and(eq(scrobbles.userId, user.id), isNotNull(scrobbles.uri)))
416 .orderBy(desc(scrobbles.createdAt))
417 .limit(size)
418 .offset(offset);
419
420 return c.json(userScrobbles);
421});
422
423app.post("/tracks", async (c) => {
424 requestCounter.add(1, { method: "POST", route: "/tracks" });
425
426 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim();
427
428 if (!bearer || bearer === "null") {
429 c.status(401);
430 return c.text("Unauthorized");
431 }
432
433 const { did } = jwt.verify(bearer, env.JWT_SECRET, {
434 ignoreExpiration: true,
435 });
436
437 const user = await ctx.db
438 .select()
439 .from(users)
440 .where(eq(users.did, did))
441 .limit(1)
442 .then((rows) => rows[0]);
443
444 if (!user) {
445 c.status(401);
446 return c.text("Unauthorized");
447 }
448
449 const body = await c.req.json();
450 const parsed = trackSchema.safeParse(body);
451
452 if (parsed.error) {
453 c.status(400);
454 return c.text("Invalid track data: " + parsed.error.message);
455 }
456
457 const track = parsed.data;
458
459 const agent = await createAgent(ctx.oauthClient, did);
460 if (!agent) {
461 c.status(401);
462 return c.text("Unauthorized");
463 }
464
465 try {
466 await saveTrack(ctx, track, agent);
467 } catch (e) {
468 if (!e.message.includes("duplicate key value violates unique constraint")) {
469 console.error("[tracks]", e.message);
470 }
471 }
472
473 return c.json({ status: "ok" });
474});
475
476app.get("/tracks", async (c) => {
477 requestCounter.add(1, { method: "GET", route: "/tracks" });
478
479 const size = +c.req.query("size") || 100;
480 const offset = +c.req.query("offset") || 0;
481
482 const tracksData = await ctx.analytics.post("library.getTracks", {
483 pagination: {
484 skip: offset,
485 take: size,
486 },
487 });
488
489 return c.json(tracksData.data);
490});
491
492app.get("/albums", async (c) => {
493 requestCounter.add(1, { method: "GET", route: "/albums" });
494
495 const size = +c.req.query("size") || 100;
496 const offset = +c.req.query("offset") || 0;
497
498 const albumsData = await ctx.analytics.post("library.getAlbums", {
499 pagination: {
500 skip: offset,
501 take: size,
502 },
503 });
504
505 return c.json(albumsData.data);
506});
507
508app.get("/artists", async (c) => {
509 requestCounter.add(1, { method: "GET", route: "/artists" });
510
511 const size = +c.req.query("size") || 100;
512 const offset = +c.req.query("offset") || 0;
513
514 const artistsData = await ctx.analytics.post("library.getArtists", {
515 pagination: {
516 skip: offset,
517 take: size,
518 },
519 });
520
521 return c.json(artistsData.data);
522});
523
524app.get("/tracks/:sha256", async (c) => {
525 requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" });
526
527 const sha256 = c.req.param("sha256");
528 const track = await ctx.db
529 .select()
530 .from(tracks)
531 .where(eq(tracks.sha256, sha256))
532 .limit(1)
533 .then((rows) => rows[0]);
534
535 return c.json(track);
536});
537
538app.get("/albums/:sha256", async (c) => {
539 requestCounter.add(1, { method: "GET", route: "/albums/:sha256" });
540
541 const sha256 = c.req.param("sha256");
542 const album = await ctx.db
543 .select()
544 .from(albums)
545 .where(eq(albums.sha256, sha256))
546 .limit(1)
547 .then((rows) => rows[0]);
548
549 return c.json(album);
550});
551
552app.get("/artists/:sha256", async (c) => {
553 requestCounter.add(1, { method: "GET", route: "/artists/:sha256" });
554
555 const sha256 = c.req.param("sha256");
556 const artist = await ctx.db
557 .select()
558 .from(artists)
559 .where(eq(artists.sha256, sha256))
560 .limit(1)
561 .then((rows) => rows[0]);
562
563 return c.json(artist);
564});
565
566app.get("/artists/:sha256/tracks", async (c) => {
567 requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" });
568 const sha256 = c.req.param("sha256");
569
570 const artistTracksData = await ctx.db
571 .select({
572 track: tracks,
573 })
574 .from(artistTracks)
575 .innerJoin(tracks, eq(artistTracks.trackId, tracks.id))
576 .innerJoin(artists, eq(artistTracks.artistId, artists.id))
577 .where(eq(artists.sha256, sha256));
578
579 return c.json(artistTracksData.map((item) => item.track));
580});
581
582app.get("/albums/:sha256/tracks", async (c) => {
583 requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" });
584 const sha256 = c.req.param("sha256");
585
586 const albumTracksData = await ctx.db
587 .select({
588 track: tracks,
589 })
590 .from(albumTracks)
591 .innerJoin(tracks, eq(albumTracks.trackId, tracks.id))
592 .innerJoin(albums, eq(albumTracks.albumId, albums.id))
593 .where(eq(albums.sha256, sha256));
594
595 return c.json(albumTracksData.map((item) => item.track));
596});
597
598app.route("/users", usersApp);
599
600app.route("/webscrobbler", webscrobbler);
601
602const server = serve({
603 fetch: app.fetch,
604 port: 8000,
605});
606
607injectWebSocket(server);