A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at 2b044aaed2c433aca9688cbd767408dc80f1fc4f 607 lines 15 kB view raw
1import { serve } from "@hono/node-server"; 2import { createNodeWebSocket } from "@hono/node-ws"; 3import { trace } from "@opentelemetry/api"; 4import { ctx } from "context"; 5import { and, desc, eq, isNotNull, or } from "drizzle-orm"; 6import { Hono } from "hono"; 7import { cors } from "hono/cors"; 8import jwt from "jsonwebtoken"; 9import { createAgent } from "lib/agent"; 10import { 11 getLovedTracks, 12 likeTrack, 13 unLikeTrack, 14} from "lovedtracks/lovedtracks.service"; 15import dns from "node:dns"; 16import { scrobbleTrack } from "nowplaying/nowplaying.service"; 17import { rateLimiter } from "ratelimiter"; 18import subscribe from "subscribers"; 19import { saveTrack } from "tracks/tracks.service"; 20import { trackSchema } from "types/track"; 21import handleWebsocket from "websocket/handler"; 22import apikeys from "./apikeys/app"; 23import bsky from "./bsky/app"; 24import dropbox from "./dropbox/app"; 25import googledrive from "./googledrive/app"; 26import { env } from "./lib/env"; 27import { requestCounter, requestDuration } from "./metrics"; 28import "./profiling"; 29import albumTracks from "./schema/album-tracks"; 30import albums from "./schema/albums"; 31import artistTracks from "./schema/artist-tracks"; 32import artists from "./schema/artists"; 33import scrobbles from "./schema/scrobbles"; 34import tracks from "./schema/tracks"; 35import users from "./schema/users"; 36import spotify from "./spotify/app"; 37import "./tracing"; 38import usersApp from "./users/app"; 39import webscrobbler from "./webscrobbler/app"; 40 41dns.setDefaultResultOrder("ipv4first"); 42 43subscribe(ctx); 44 45const app = new Hono(); 46const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 47 48app.use( 49 "*", 50 rateLimiter({ 51 limit: 1000, 52 window: 30, // 馃憟 30 seconds 53 }), 54); 55 56app.use("*", async (c, next) => { 57 const span = trace.getActiveSpan(); 58 span?.setAttribute("http.route", c.req.path); 59 await next(); 60}); 61 62app.use("*", async (c, next) => { 63 const start = Date.now(); 64 await next(); 65 const duration = (Date.now() - start) / 1000; 66 requestDuration.record(duration, { 67 route: c.req.path, 68 method: c.req.method, 69 }); 70}); 71 72app.use(cors()); 73 74app.route("/", bsky); 75 76app.route("/spotify", spotify); 77 78app.route("/dropbox", dropbox); 79 80app.route("/googledrive", googledrive); 81 82app.route("/apikeys", apikeys); 83 84app.get("/ws", upgradeWebSocket(handleWebsocket)); 85 86app.get("/", async (c) => { 87 return c.json({ status: "ok" }); 88}); 89 90app.post("/now-playing", async (c) => { 91 requestCounter.add(1, { method: "POST", route: "/now-playing" }); 92 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 93 94 if (!bearer || bearer === "null") { 95 c.status(401); 96 return c.text("Unauthorized"); 97 } 98 99 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 100 ignoreExpiration: true, 101 }); 102 103 const user = await ctx.db 104 .select() 105 .from(users) 106 .where(eq(users.did, did)) 107 .limit(1) 108 .then((rows) => rows[0]); 109 110 if (!user) { 111 c.status(401); 112 return c.text("Unauthorized"); 113 } 114 115 const body = await c.req.json(); 116 const parsed = trackSchema.safeParse(body); 117 118 if (parsed.error) { 119 c.status(400); 120 return c.text("Invalid track data: " + parsed.error.message); 121 } 122 const track = parsed.data; 123 124 const agent = await createAgent(ctx.oauthClient, did); 125 if (!agent) { 126 c.status(401); 127 return c.text("Unauthorized"); 128 } 129 130 await scrobbleTrack(ctx, track, agent, user.did); 131 132 return c.json({ status: "ok" }); 133}); 134 135app.get("/now-playing", async (c) => { 136 requestCounter.add(1, { method: "GET", route: "/now-playing" }); 137 138 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 139 140 const payload = 141 bearer && bearer !== "null" 142 ? jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }) 143 : {}; 144 const did = c.req.query("did") || payload.did; 145 146 if (!did) { 147 c.status(401); 148 return c.text("Unauthorized"); 149 } 150 151 const user = await ctx.db 152 .select() 153 .from(users) 154 .where(or(eq(users.did, did), eq(users.handle, did))) 155 .limit(1) 156 .then((rows) => rows[0]); 157 158 if (!user) { 159 c.status(401); 160 return c.text("Unauthorized"); 161 } 162 163 const [nowPlaying, status] = await Promise.all([ 164 ctx.redis.get(`nowplaying:${user.did}`), 165 ctx.redis.get(`nowplaying:${user.did}:status`), 166 ]); 167 return c.json( 168 nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {}, 169 ); 170}); 171 172app.get("/now-playings", async (c) => { 173 requestCounter.add(1, { method: "GET", route: "/now-playings" }); 174 const size = +c.req.query("size") || 10; 175 const offset = +c.req.query("offset") || 0; 176 const { data } = await ctx.analytics.post("library.getDistinctScrobbles", { 177 pagination: { 178 skip: offset, 179 take: size, 180 }, 181 }); 182 return c.json(data); 183}); 184 185app.post("/likes", async (c) => { 186 requestCounter.add(1, { method: "POST", route: "/likes" }); 187 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 188 189 if (!bearer || bearer === "null") { 190 c.status(401); 191 return c.text("Unauthorized"); 192 } 193 194 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 195 ignoreExpiration: true, 196 }); 197 const agent = await createAgent(ctx.oauthClient, did); 198 199 const user = await ctx.db 200 .select() 201 .from(users) 202 .where(eq(users.did, did)) 203 .limit(1) 204 .then((rows) => rows[0]); 205 206 if (!user) { 207 c.status(401); 208 return c.text("Unauthorized"); 209 } 210 211 const body = await c.req.json(); 212 const parsed = trackSchema.safeParse(body); 213 214 if (parsed.error) { 215 c.status(400); 216 return c.text("Invalid track data: " + parsed.error.message); 217 } 218 const track = parsed.data; 219 await likeTrack(ctx, track, user, agent); 220 221 return c.json({ status: "ok" }); 222}); 223 224app.delete("/likes/:sha256", async (c) => { 225 requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" }); 226 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 227 228 if (!bearer || bearer === "null") { 229 c.status(401); 230 return c.text("Unauthorized"); 231 } 232 233 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 234 ignoreExpiration: true, 235 }); 236 const agent = await createAgent(ctx.oauthClient, did); 237 238 const user = await ctx.db 239 .select() 240 .from(users) 241 .where(eq(users.did, did)) 242 .limit(1) 243 .then((rows) => rows[0]); 244 245 if (!user) { 246 c.status(401); 247 return c.text("Unauthorized"); 248 } 249 250 const sha256 = c.req.param("sha256"); 251 await unLikeTrack(ctx, sha256, user, agent); 252 return c.json({ status: "ok" }); 253}); 254 255app.get("/likes", async (c) => { 256 requestCounter.add(1, { method: "GET", route: "/likes" }); 257 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 258 259 if (!bearer || bearer === "null") { 260 c.status(401); 261 return c.text("Unauthorized"); 262 } 263 264 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 265 ignoreExpiration: true, 266 }); 267 268 const user = await ctx.db 269 .select() 270 .from(users) 271 .where(eq(users.did, did)) 272 .limit(1) 273 .then((rows) => rows[0]); 274 275 if (!user) { 276 c.status(401); 277 return c.text("Unauthorized"); 278 } 279 280 const size = +c.req.query("size") || 10; 281 const offset = +c.req.query("offset") || 0; 282 283 const lovedTracks = await getLovedTracks(ctx, user, size, offset); 284 return c.json(lovedTracks); 285}); 286 287app.get("/public/scrobbles", async (c) => { 288 requestCounter.add(1, { method: "GET", route: "/public/scrobbles" }); 289 290 const size = +c.req.query("size") || 10; 291 const offset = +c.req.query("offset") || 0; 292 293 const scrobbleRecords = await ctx.db 294 .select({ 295 scrobble: scrobbles, 296 track: tracks, 297 user: users, 298 }) 299 .from(scrobbles) 300 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 301 .innerJoin(users, eq(scrobbles.userId, users.id)) 302 .orderBy(desc(scrobbles.timestamp)) 303 .limit(size) 304 .offset(offset); 305 306 return c.json( 307 scrobbleRecords.map((item) => ({ 308 cover: item.track.albumArt, 309 artist: item.track.artist, 310 title: item.track.title, 311 date: item.scrobble.timestamp, 312 user: item.user.handle, 313 uri: item.scrobble.uri, 314 albumUri: item.track.albumUri, 315 artistUri: item.track.artistUri, 316 tags: [], 317 listeners: 1, 318 sha256: item.track.sha256, 319 id: item.scrobble.id, 320 })), 321 ); 322}); 323 324app.get("/public/scrobbleschart", async (c) => { 325 requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" }); 326 327 const did = c.req.query("did"); 328 const artisturi = c.req.query("artisturi"); 329 const albumuri = c.req.query("albumuri"); 330 const songuri = c.req.query("songuri"); 331 332 if (did) { 333 const chart = await ctx.analytics.post("library.getScrobblesPerDay", { 334 user_did: did, 335 }); 336 return c.json(chart.data); 337 } 338 339 if (artisturi) { 340 const chart = await ctx.analytics.post("library.getArtistScrobbles", { 341 artist_id: artisturi, 342 }); 343 return c.json(chart.data); 344 } 345 346 if (albumuri) { 347 const chart = await ctx.analytics.post("library.getAlbumScrobbles", { 348 album_id: albumuri, 349 }); 350 return c.json(chart.data); 351 } 352 353 if (songuri) { 354 let uri = songuri; 355 if (songuri.includes("app.rocksky.scrobble")) { 356 const scrobble = await ctx.db 357 .select({ 358 scrobble: scrobbles, 359 track: tracks, 360 }) 361 .from(scrobbles) 362 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 363 .where(eq(scrobbles.uri, songuri)) 364 .limit(1) 365 .then((rows) => rows[0]); 366 367 uri = scrobble.track.uri; 368 } 369 const chart = await ctx.analytics.post("library.getTrackScrobbles", { 370 track_id: uri, 371 }); 372 return c.json(chart.data); 373 } 374 375 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {}); 376 return c.json(chart.data); 377}); 378 379app.get("/scrobbles", async (c) => { 380 requestCounter.add(1, { method: "GET", route: "/scrobbles" }); 381 382 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 383 384 if (!bearer || bearer === "null") { 385 c.status(401); 386 return c.text("Unauthorized"); 387 } 388 389 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 390 ignoreExpiration: true, 391 }); 392 393 const user = await ctx.db 394 .select() 395 .from(users) 396 .where(eq(users.did, did)) 397 .limit(1) 398 .then((rows) => rows[0]); 399 400 if (!user) { 401 c.status(401); 402 return c.text("Unauthorized"); 403 } 404 405 const size = +c.req.query("size") || 10; 406 const offset = +c.req.query("offset") || 0; 407 408 const userScrobbles = await ctx.db 409 .select({ 410 scrobble: scrobbles, 411 track: tracks, 412 }) 413 .from(scrobbles) 414 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 415 .where(and(eq(scrobbles.userId, user.id), isNotNull(scrobbles.uri))) 416 .orderBy(desc(scrobbles.createdAt)) 417 .limit(size) 418 .offset(offset); 419 420 return c.json(userScrobbles); 421}); 422 423app.post("/tracks", async (c) => { 424 requestCounter.add(1, { method: "POST", route: "/tracks" }); 425 426 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 427 428 if (!bearer || bearer === "null") { 429 c.status(401); 430 return c.text("Unauthorized"); 431 } 432 433 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 434 ignoreExpiration: true, 435 }); 436 437 const user = await ctx.db 438 .select() 439 .from(users) 440 .where(eq(users.did, did)) 441 .limit(1) 442 .then((rows) => rows[0]); 443 444 if (!user) { 445 c.status(401); 446 return c.text("Unauthorized"); 447 } 448 449 const body = await c.req.json(); 450 const parsed = trackSchema.safeParse(body); 451 452 if (parsed.error) { 453 c.status(400); 454 return c.text("Invalid track data: " + parsed.error.message); 455 } 456 457 const track = parsed.data; 458 459 const agent = await createAgent(ctx.oauthClient, did); 460 if (!agent) { 461 c.status(401); 462 return c.text("Unauthorized"); 463 } 464 465 try { 466 await saveTrack(ctx, track, agent); 467 } catch (e) { 468 if (!e.message.includes("duplicate key value violates unique constraint")) { 469 console.error("[tracks]", e.message); 470 } 471 } 472 473 return c.json({ status: "ok" }); 474}); 475 476app.get("/tracks", async (c) => { 477 requestCounter.add(1, { method: "GET", route: "/tracks" }); 478 479 const size = +c.req.query("size") || 100; 480 const offset = +c.req.query("offset") || 0; 481 482 const tracksData = await ctx.analytics.post("library.getTracks", { 483 pagination: { 484 skip: offset, 485 take: size, 486 }, 487 }); 488 489 return c.json(tracksData.data); 490}); 491 492app.get("/albums", async (c) => { 493 requestCounter.add(1, { method: "GET", route: "/albums" }); 494 495 const size = +c.req.query("size") || 100; 496 const offset = +c.req.query("offset") || 0; 497 498 const albumsData = await ctx.analytics.post("library.getAlbums", { 499 pagination: { 500 skip: offset, 501 take: size, 502 }, 503 }); 504 505 return c.json(albumsData.data); 506}); 507 508app.get("/artists", async (c) => { 509 requestCounter.add(1, { method: "GET", route: "/artists" }); 510 511 const size = +c.req.query("size") || 100; 512 const offset = +c.req.query("offset") || 0; 513 514 const artistsData = await ctx.analytics.post("library.getArtists", { 515 pagination: { 516 skip: offset, 517 take: size, 518 }, 519 }); 520 521 return c.json(artistsData.data); 522}); 523 524app.get("/tracks/:sha256", async (c) => { 525 requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" }); 526 527 const sha256 = c.req.param("sha256"); 528 const track = await ctx.db 529 .select() 530 .from(tracks) 531 .where(eq(tracks.sha256, sha256)) 532 .limit(1) 533 .then((rows) => rows[0]); 534 535 return c.json(track); 536}); 537 538app.get("/albums/:sha256", async (c) => { 539 requestCounter.add(1, { method: "GET", route: "/albums/:sha256" }); 540 541 const sha256 = c.req.param("sha256"); 542 const album = await ctx.db 543 .select() 544 .from(albums) 545 .where(eq(albums.sha256, sha256)) 546 .limit(1) 547 .then((rows) => rows[0]); 548 549 return c.json(album); 550}); 551 552app.get("/artists/:sha256", async (c) => { 553 requestCounter.add(1, { method: "GET", route: "/artists/:sha256" }); 554 555 const sha256 = c.req.param("sha256"); 556 const artist = await ctx.db 557 .select() 558 .from(artists) 559 .where(eq(artists.sha256, sha256)) 560 .limit(1) 561 .then((rows) => rows[0]); 562 563 return c.json(artist); 564}); 565 566app.get("/artists/:sha256/tracks", async (c) => { 567 requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" }); 568 const sha256 = c.req.param("sha256"); 569 570 const artistTracksData = await ctx.db 571 .select({ 572 track: tracks, 573 }) 574 .from(artistTracks) 575 .innerJoin(tracks, eq(artistTracks.trackId, tracks.id)) 576 .innerJoin(artists, eq(artistTracks.artistId, artists.id)) 577 .where(eq(artists.sha256, sha256)); 578 579 return c.json(artistTracksData.map((item) => item.track)); 580}); 581 582app.get("/albums/:sha256/tracks", async (c) => { 583 requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" }); 584 const sha256 = c.req.param("sha256"); 585 586 const albumTracksData = await ctx.db 587 .select({ 588 track: tracks, 589 }) 590 .from(albumTracks) 591 .innerJoin(tracks, eq(albumTracks.trackId, tracks.id)) 592 .innerJoin(albums, eq(albumTracks.albumId, albums.id)) 593 .where(eq(albums.sha256, sha256)); 594 595 return c.json(albumTracksData.map((item) => item.track)); 596}); 597 598app.route("/users", usersApp); 599 600app.route("/webscrobbler", webscrobbler); 601 602const server = serve({ 603 fetch: app.fetch, 604 port: 8000, 605}); 606 607injectWebSocket(server);