A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 611 lines 15 kB view raw
import { serve } from "@hono/node-server";
import { createNodeWebSocket } from "@hono/node-ws";
import { trace } from "@opentelemetry/api";
import { consola } from "consola";
import { ctx } from "context";
import { and, desc, eq, isNotNull, or } from "drizzle-orm";
import { Hono } from "hono";
import { cors } from "hono/cors";
import jwt from "jsonwebtoken";
import { createAgent } from "lib/agent";
import {
  getLovedTracks,
  likeTrack,
  unLikeTrack,
} from "lovedtracks/lovedtracks.service";
import dns from "node:dns";
import { scrobbleTrack } from "nowplaying/nowplaying.service";
import { rateLimiter } from "ratelimiter";
import subscribe from "subscribers";
import { saveTrack } from "tracks/tracks.service";
import { trackSchema } from "types/track";
import handleWebsocket from "websocket/handler";
import apikeys from "./apikeys/app";
import bsky from "./bsky/app";
import dropbox from "./dropbox/app";
import googledrive from "./googledrive/app";
import { env } from "./lib/env";
import { requestCounter, requestDuration } from "./metrics";
import "./profiling";
import albumTracks from "./schema/album-tracks";
import albums from "./schema/albums";
import artistTracks from "./schema/artist-tracks";
import artists from "./schema/artists";
import scrobbles from "./schema/scrobbles";
import tracks from "./schema/tracks";
import users from "./schema/users";
import spotify from "./spotify/app";
import "./tracing";
import usersApp from "./users/app";
import webscrobbler from "./webscrobbler/app";
import opengraph from "./opengraph/app";

dns.setDefaultResultOrder("ipv4first");

subscribe(ctx);

const app = new Hono();
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });

// Error names jsonwebtoken uses for token-validation failures. Anything else
// thrown by jwt.verify is rethrown instead of being reported as a 401.
const JWT_ERROR_NAMES = new Set([
  "JsonWebTokenError",
  "NotBeforeError",
  "TokenExpiredError",
]);

/**
 * Sends the plain-text 401 response used by every authenticated route.
 *
 * @param c - Hono request context.
 * @returns The 401 "Unauthorized" response.
 */
function unauthorized(c) {
  c.status(401);
  return c.text("Unauthorized");
}

/**
 * Sends the plain-text 400 response used when the request body does not
 * satisfy `trackSchema`.
 *
 * @param c - Hono request context.
 * @param error - The `error` of a failed `trackSchema.safeParse` result.
 * @returns The 400 response.
 */
function invalidTrack(c, error) {
  c.status(400);
  return c.text(`Invalid track data: ${error.message}`);
}

/**
 * Extracts the credential part of the `Authorization` header
 * (the second space-separated word, trimmed).
 *
 * @param c - Hono request context.
 * @returns {string | undefined} The token, or `undefined` when the header is
 *   missing, has no second word, or carries the literal string "null".
 */
function getBearerToken(c) {
  const token = (c.req.header("authorization") || "").split(" ")[1]?.trim();
  return token && token !== "null" ? token : undefined;
}

/**
 * Verifies a JWT against `env.JWT_SECRET`.
 *
 * NOTE(review): `ignoreExpiration: true` means expired tokens are accepted.
 * Kept as-is because existing clients may rely on it — confirm this is
 * intentional.
 *
 * @param {string} token - Raw JWT.
 * @returns {object | null} The decoded payload, or `null` when the token is
 *   rejected by jsonwebtoken or its payload is not an object.
 * @throws Rethrows any error that is not a jsonwebtoken validation error.
 */
function verifyToken(token) {
  try {
    const payload = jwt.verify(token, env.JWT_SECRET, {
      ignoreExpiration: true,
    });
    return typeof payload === "object" && payload !== null ? payload : null;
  } catch (err) {
    // An invalid token is an expected client error, not a server failure.
    if (JWT_ERROR_NAMES.has(err?.name)) {
      return null;
    }
    throw err;
  }
}

/**
 * Loads the first user row whose `did` column equals `did`.
 *
 * @param {string} did - DID to look up.
 * @returns {Promise<object | undefined>} The row, or `undefined` if none.
 */
function findUserByDid(did) {
  return ctx.db
    .select()
    .from(users)
    .where(eq(users.did, did))
    .limit(1)
    .then((rows) => rows[0]);
}

/**
 * Resolves the caller from the bearer token: extracts the token, verifies
 * it, and loads the matching user row.
 *
 * @param c - Hono request context.
 * @returns {Promise<{ did: string, user: object } | null>} `null` when the
 *   token is missing or invalid, carries no `did`, or no user matches.
 */
async function authenticate(c) {
  const bearer = getBearerToken(c);
  if (!bearer) {
    return null;
  }

  const did = verifyToken(bearer)?.did;
  if (!did) {
    return null;
  }

  const user = await findUserByDid(did);
  return user ? { did, user } : null;
}

/**
 * Converts a query-string value to a positive integer.
 *
 * @param {string | undefined} raw - Raw query value.
 * @param {number} fallback - Value used when `raw` is missing, zero,
 *   negative, fractional, or not a finite number.
 * @returns {number} The parsed integer or `fallback`.
 */
function parseCount(raw, fallback) {
  const value = Number(raw);
  return Number.isInteger(value) && value > 0 ? value : fallback;
}

/**
 * Reads the `size` and `offset` query parameters.
 *
 * @param c - Hono request context.
 * @param {number} defaultSize - Page size used when `size` is absent/invalid.
 * @returns {{ size: number, offset: number }} Sanitised pagination values.
 */
function getPagination(c, defaultSize) {
  return {
    size: parseCount(c.req.query("size"), defaultSize),
    offset: parseCount(c.req.query("offset"), 0),
  };
}

/**
 * Reads the JSON request body and validates it with `trackSchema`.
 * A body that is not valid JSON is validated as `undefined`, so it is
 * reported through the same schema-error path instead of throwing.
 *
 * @param c - Hono request context.
 * @returns The `trackSchema.safeParse` result.
 */
async function parseTrackBody(c) {
  const body = await c.req.json().catch(() => undefined);
  return trackSchema.safeParse(body);
}

app.use(
  "*",
  rateLimiter({
    limit: 1000,
    window: 30, // 👈 30 seconds
  }),
);

// Tag the active OpenTelemetry span (if any) with the request path.
app.use("*", async (c, next) => {
  const span = trace.getActiveSpan();
  span?.setAttribute("http.route", c.req.path);
  await next();
});

// Record how long each request took, in seconds.
app.use("*", async (c, next) => {
  const start = Date.now();
  await next();
  const duration = (Date.now() - start) / 1000;
  requestDuration.record(duration, {
    route: c.req.path,
    method: c.req.method,
  });
});

app.use(cors());

app.route("/", bsky);

app.route("/spotify", spotify);

app.route("/dropbox", dropbox);

app.route("/googledrive", googledrive);

app.route("/apikeys", apikeys);

app.route("/public/og", opengraph);

app.get("/ws", upgradeWebSocket(handleWebsocket));

app.get("/", async (c) => {
  return c.json({ status: "ok" });
});

// Scrobble the posted track on behalf of the authenticated user.
app.post("/now-playing", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/now-playing" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const parsed = await parseTrackBody(c);
  if (parsed.error) {
    return invalidTrack(c, parsed.error);
  }

  const agent = await createAgent(ctx.oauthClient, auth.did);
  if (!agent) {
    return unauthorized(c);
  }

  await scrobbleTrack(ctx, parsed.data, agent, auth.user.did);

  return c.json({ status: "ok" });
});

// Return the cached now-playing entry for a user. The user is identified by
// the `did` query parameter (matched against DID or handle) or, failing
// that, by the bearer token.
app.get("/now-playing", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playing" });

  // The token is optional here; an unusable token is treated as absent.
  const bearer = getBearerToken(c);
  const payload = (bearer && verifyToken(bearer)) || {};
  const did = c.req.query("did") || payload.did;

  if (!did) {
    return unauthorized(c);
  }

  const user = await ctx.db
    .select()
    .from(users)
    .where(or(eq(users.did, did), eq(users.handle, did)))
    .limit(1)
    .then((rows) => rows[0]);

  if (!user) {
    return unauthorized(c);
  }

  const [nowPlaying, status] = await Promise.all([
    ctx.redis.get(`nowplaying:${user.did}`),
    ctx.redis.get(`nowplaying:${user.did}:status`),
  ]);
  return c.json(
    nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {},
  );
});

app.get("/now-playings", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playings" });
  const { size, offset } = getPagination(c, 10);
  const { data } = await ctx.analytics.post("library.getDistinctScrobbles", {
    pagination: {
      skip: offset,
      take: size,
    },
  });
  return c.json(data);
});

app.post("/likes", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/likes" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  // NOTE(review): unlike POST /now-playing and POST /tracks, a falsy agent is
  // not rejected here; it is passed to likeTrack as-is — confirm intended.
  const agent = await createAgent(ctx.oauthClient, auth.did);

  const parsed = await parseTrackBody(c);
  if (parsed.error) {
    return invalidTrack(c, parsed.error);
  }

  await likeTrack(ctx, parsed.data, auth.user, agent);

  return c.json({ status: "ok" });
});

app.delete("/likes/:sha256", async (c) => {
  requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  // NOTE(review): a falsy agent is passed through to unLikeTrack unchecked.
  const agent = await createAgent(ctx.oauthClient, auth.did);

  const sha256 = c.req.param("sha256");
  await unLikeTrack(ctx, sha256, auth.user, agent);
  return c.json({ status: "ok" });
});

app.get("/likes", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/likes" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const { size, offset } = getPagination(c, 10);

  const lovedTracks = await getLovedTracks(ctx, auth.user, size, offset);
  return c.json(lovedTracks);
});

// Latest scrobbles across all users, newest first.
app.get("/public/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbles" });

  const { size, offset } = getPagination(c, 10);

  const scrobbleRecords = await ctx.db
    .select({
      scrobble: scrobbles,
      track: tracks,
      user: users,
    })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .orderBy(desc(scrobbles.timestamp))
    .limit(size)
    .offset(offset);

  return c.json(
    scrobbleRecords.map((item) => ({
      cover: item.track.albumArt,
      artist: item.track.artist,
      title: item.track.title,
      date: item.scrobble.timestamp,
      user: item.user.handle,
      uri: item.scrobble.uri,
      albumUri: item.track.albumUri,
      artistUri: item.track.artistUri,
      tags: [],
      listeners: 1,
      sha256: item.track.sha256,
      id: item.scrobble.id,
    })),
  );
});

// Scrobble chart data. The first query parameter present, in the order
// did → artisturi → albumuri → songuri, selects the analytics call; with
// none of them the unfiltered per-day chart is returned.
app.get("/public/scrobbleschart", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" });

  const did = c.req.query("did");
  const artisturi = c.req.query("artisturi");
  const albumuri = c.req.query("albumuri");
  const songuri = c.req.query("songuri");

  if (did) {
    const chart = await ctx.analytics.post("library.getScrobblesPerDay", {
      user_did: did,
    });
    return c.json(chart.data);
  }

  if (artisturi) {
    const chart = await ctx.analytics.post("library.getArtistScrobbles", {
      artist_id: artisturi,
    });
    return c.json(chart.data);
  }

  if (albumuri) {
    const chart = await ctx.analytics.post("library.getAlbumScrobbles", {
      album_id: albumuri,
    });
    return c.json(chart.data);
  }

  if (songuri) {
    let uri = songuri;
    // A scrobble URI is first resolved to the URI of the track it refers to.
    if (songuri.includes("app.rocksky.scrobble")) {
      const scrobble = await ctx.db
        .select({
          scrobble: scrobbles,
          track: tracks,
        })
        .from(scrobbles)
        .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
        .where(eq(scrobbles.uri, songuri))
        .limit(1)
        .then((rows) => rows[0]);

      // Without this guard an unknown scrobble URI crashed the handler.
      if (!scrobble) {
        c.status(404);
        return c.text("Scrobble not found");
      }

      uri = scrobble.track.uri;
    }
    const chart = await ctx.analytics.post("library.getTrackScrobbles", {
      track_id: uri,
    });
    return c.json(chart.data);
  }

  const chart = await ctx.analytics.post("library.getScrobblesPerDay", {});
  return c.json(chart.data);
});

// Scrobbles of the authenticated user that have a URI, newest first.
app.get("/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/scrobbles" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const { size, offset } = getPagination(c, 10);

  const userScrobbles = await ctx.db
    .select({
      scrobble: scrobbles,
      track: tracks,
    })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .where(and(eq(scrobbles.userId, auth.user.id), isNotNull(scrobbles.uri)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(size)
    .offset(offset);

  return c.json(userScrobbles);
});

app.post("/tracks", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/tracks" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const parsed = await parseTrackBody(c);
  if (parsed.error) {
    return invalidTrack(c, parsed.error);
  }

  const agent = await createAgent(ctx.oauthClient, auth.did);
  if (!agent) {
    return unauthorized(c);
  }

  try {
    await saveTrack(ctx, parsed.data, agent);
  } catch (e) {
    // Best effort: the request still answers "ok". Duplicate-key failures
    // are not logged; everything else is.
    const message = e instanceof Error ? e.message : String(e);
    if (!message.includes("duplicate key value violates unique constraint")) {
      consola.error("[tracks]", message);
    }
  }

  return c.json({ status: "ok" });
});

app.get("/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks" });

  const { size, offset } = getPagination(c, 100);

  const tracksData = await ctx.analytics.post("library.getTracks", {
    pagination: {
      skip: offset,
      take: size,
    },
  });

  return c.json(tracksData.data);
});

app.get("/albums", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums" });

  const { size, offset } = getPagination(c, 100);

  const albumsData = await ctx.analytics.post("library.getAlbums", {
    pagination: {
      skip: offset,
      take: size,
    },
  });

  return c.json(albumsData.data);
});

app.get("/artists", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists" });

  const { size, offset } = getPagination(c, 100);

  const artistsData = await ctx.analytics.post("library.getArtists", {
    pagination: {
      skip: offset,
      take: size,
    },
  });

  return c.json(artistsData.data);
});

app.get("/tracks/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" });

  const sha256 = c.req.param("sha256");
  // NOTE(review): when no row matches, `track` is undefined and is handed to
  // c.json unchanged — consider an explicit 404 once clients are checked.
  const track = await ctx.db
    .select()
    .from(tracks)
    .where(eq(tracks.sha256, sha256))
    .limit(1)
    .then((rows) => rows[0]);

  return c.json(track);
});

app.get("/albums/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256" });

  const sha256 = c.req.param("sha256");
  // NOTE(review): `album` is undefined when no row matches (see above).
  const album = await ctx.db
    .select()
    .from(albums)
    .where(eq(albums.sha256, sha256))
    .limit(1)
    .then((rows) => rows[0]);

  return c.json(album);
});

app.get("/artists/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256" });

  const sha256 = c.req.param("sha256");
  // NOTE(review): `artist` is undefined when no row matches (see above).
  const artist = await ctx.db
    .select()
    .from(artists)
    .where(eq(artists.sha256, sha256))
    .limit(1)
    .then((rows) => rows[0]);

  return c.json(artist);
});

app.get("/artists/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" });
  const sha256 = c.req.param("sha256");

  const artistTracksData = await ctx.db
    .select({
      track: tracks,
    })
    .from(artistTracks)
    .innerJoin(tracks, eq(artistTracks.trackId, tracks.id))
    .innerJoin(artists, eq(artistTracks.artistId, artists.id))
    .where(eq(artists.sha256, sha256));

  return c.json(artistTracksData.map((item) => item.track));
});

app.get("/albums/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" });
  const sha256 = c.req.param("sha256");

  const albumTracksData = await ctx.db
    .select({
      track: tracks,
    })
    .from(albumTracks)
    .innerJoin(tracks, eq(albumTracks.trackId, tracks.id))
    .innerJoin(albums, eq(albumTracks.albumId, albums.id))
    .where(eq(albums.sha256, sha256));

  return c.json(albumTracksData.map((item) => item.track));
});

app.route("/users", usersApp);

app.route("/webscrobbler", webscrobbler);

const server = serve({
  fetch: app.fetch,
  port: 8000,
});

injectWebSocket(server);