A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at fff48ea3213bb11efcfcb7db85be1dfcd2bebc5e 608 lines 15 kB view raw
1import { serve } from "@hono/node-server"; 2import { createNodeWebSocket } from "@hono/node-ws"; 3import { trace } from "@opentelemetry/api"; 4import { consola } from "consola"; 5import { ctx } from "context"; 6import { and, desc, eq, isNotNull, or } from "drizzle-orm"; 7import { Hono } from "hono"; 8import { cors } from "hono/cors"; 9import jwt from "jsonwebtoken"; 10import { createAgent } from "lib/agent"; 11import { 12 getLovedTracks, 13 likeTrack, 14 unLikeTrack, 15} from "lovedtracks/lovedtracks.service"; 16import dns from "node:dns"; 17import { scrobbleTrack } from "nowplaying/nowplaying.service"; 18import { rateLimiter } from "ratelimiter"; 19import subscribe from "subscribers"; 20import { saveTrack } from "tracks/tracks.service"; 21import { trackSchema } from "types/track"; 22import handleWebsocket from "websocket/handler"; 23import apikeys from "./apikeys/app"; 24import bsky from "./bsky/app"; 25import dropbox from "./dropbox/app"; 26import googledrive from "./googledrive/app"; 27import { env } from "./lib/env"; 28import { requestCounter, requestDuration } from "./metrics"; 29import "./profiling"; 30import albumTracks from "./schema/album-tracks"; 31import albums from "./schema/albums"; 32import artistTracks from "./schema/artist-tracks"; 33import artists from "./schema/artists"; 34import scrobbles from "./schema/scrobbles"; 35import tracks from "./schema/tracks"; 36import users from "./schema/users"; 37import spotify from "./spotify/app"; 38import "./tracing"; 39import usersApp from "./users/app"; 40import webscrobbler from "./webscrobbler/app"; 41 42dns.setDefaultResultOrder("ipv4first"); 43 44subscribe(ctx); 45 46const app = new Hono(); 47const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 48 49app.use( 50 "*", 51 rateLimiter({ 52 limit: 1000, 53 window: 30, // 馃憟 30 seconds 54 }), 55); 56 57app.use("*", async (c, next) => { 58 const span = trace.getActiveSpan(); 59 span?.setAttribute("http.route", c.req.path); 60 await next(); 61}); 62 63app.use("*", async (c, next) => { 64 const start = Date.now(); 65 await next(); 66 const duration = (Date.now() - start) / 1000; 67 requestDuration.record(duration, { 68 route: c.req.path, 69 method: c.req.method, 70 }); 71}); 72 73app.use(cors()); 74 75app.route("/", bsky); 76 77app.route("/spotify", spotify); 78 79app.route("/dropbox", dropbox); 80 81app.route("/googledrive", googledrive); 82 83app.route("/apikeys", apikeys); 84 85app.get("/ws", upgradeWebSocket(handleWebsocket)); 86 87app.get("/", async (c) => { 88 return c.json({ status: "ok" }); 89}); 90 91app.post("/now-playing", async (c) => { 92 requestCounter.add(1, { method: "POST", route: "/now-playing" }); 93 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 94 95 if (!bearer || bearer === "null") { 96 c.status(401); 97 return c.text("Unauthorized"); 98 } 99 100 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 101 ignoreExpiration: true, 102 }); 103 104 const user = await ctx.db 105 .select() 106 .from(users) 107 .where(eq(users.did, did)) 108 .limit(1) 109 .then((rows) => rows[0]); 110 111 if (!user) { 112 c.status(401); 113 return c.text("Unauthorized"); 114 } 115 116 const body = await c.req.json(); 117 const parsed = trackSchema.safeParse(body); 118 119 if (parsed.error) { 120 c.status(400); 121 return c.text("Invalid track data: " + parsed.error.message); 122 } 123 const track = parsed.data; 124 125 const agent = await createAgent(ctx.oauthClient, did); 126 if (!agent) { 127 c.status(401); 128 return c.text("Unauthorized"); 129 } 130 131 await scrobbleTrack(ctx, track, agent, user.did); 132 133 return c.json({ status: "ok" }); 134}); 135 136app.get("/now-playing", async (c) => { 137 requestCounter.add(1, { method: "GET", route: "/now-playing" }); 138 139 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 140 141 const payload = 142 bearer && bearer !== "null" 143 ? jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }) 144 : {}; 145 const did = c.req.query("did") || payload.did; 146 147 if (!did) { 148 c.status(401); 149 return c.text("Unauthorized"); 150 } 151 152 const user = await ctx.db 153 .select() 154 .from(users) 155 .where(or(eq(users.did, did), eq(users.handle, did))) 156 .limit(1) 157 .then((rows) => rows[0]); 158 159 if (!user) { 160 c.status(401); 161 return c.text("Unauthorized"); 162 } 163 164 const [nowPlaying, status] = await Promise.all([ 165 ctx.redis.get(`nowplaying:${user.did}`), 166 ctx.redis.get(`nowplaying:${user.did}:status`), 167 ]); 168 return c.json( 169 nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {}, 170 ); 171}); 172 173app.get("/now-playings", async (c) => { 174 requestCounter.add(1, { method: "GET", route: "/now-playings" }); 175 const size = +c.req.query("size") || 10; 176 const offset = +c.req.query("offset") || 0; 177 const { data } = await ctx.analytics.post("library.getDistinctScrobbles", { 178 pagination: { 179 skip: offset, 180 take: size, 181 }, 182 }); 183 return c.json(data); 184}); 185 186app.post("/likes", async (c) => { 187 requestCounter.add(1, { method: "POST", route: "/likes" }); 188 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 189 190 if (!bearer || bearer === "null") { 191 c.status(401); 192 return c.text("Unauthorized"); 193 } 194 195 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 196 ignoreExpiration: true, 197 }); 198 const agent = await createAgent(ctx.oauthClient, did); 199 200 const user = await ctx.db 201 .select() 202 .from(users) 203 .where(eq(users.did, did)) 204 .limit(1) 205 .then((rows) => rows[0]); 206 207 if (!user) { 208 c.status(401); 209 return c.text("Unauthorized"); 210 } 211 212 const body = await c.req.json(); 213 const parsed = trackSchema.safeParse(body); 214 215 if (parsed.error) { 216 c.status(400); 217 return c.text("Invalid track data: " + parsed.error.message); 218 } 219 const track = parsed.data; 220 await likeTrack(ctx, track, user, agent); 221 222 return c.json({ status: "ok" }); 223}); 224 225app.delete("/likes/:sha256", async (c) => { 226 requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" }); 227 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 228 229 if (!bearer || bearer === "null") { 230 c.status(401); 231 return c.text("Unauthorized"); 232 } 233 234 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 235 ignoreExpiration: true, 236 }); 237 const agent = await createAgent(ctx.oauthClient, did); 238 239 const user = await ctx.db 240 .select() 241 .from(users) 242 .where(eq(users.did, did)) 243 .limit(1) 244 .then((rows) => rows[0]); 245 246 if (!user) { 247 c.status(401); 248 return c.text("Unauthorized"); 249 } 250 251 const sha256 = c.req.param("sha256"); 252 await unLikeTrack(ctx, sha256, user, agent); 253 return c.json({ status: "ok" }); 254}); 255 256app.get("/likes", async (c) => { 257 requestCounter.add(1, { method: "GET", route: "/likes" }); 258 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 259 260 if (!bearer || bearer === "null") { 261 c.status(401); 262 return c.text("Unauthorized"); 263 } 264 265 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 266 ignoreExpiration: true, 267 }); 268 269 const user = await ctx.db 270 .select() 271 .from(users) 272 .where(eq(users.did, did)) 273 .limit(1) 274 .then((rows) => rows[0]); 275 276 if (!user) { 277 c.status(401); 278 return c.text("Unauthorized"); 279 } 280 281 const size = +c.req.query("size") || 10; 282 const offset = +c.req.query("offset") || 0; 283 284 const lovedTracks = await getLovedTracks(ctx, user, size, offset); 285 return c.json(lovedTracks); 286}); 287 288app.get("/public/scrobbles", async (c) => { 289 requestCounter.add(1, { method: "GET", route: "/public/scrobbles" }); 290 291 const size = +c.req.query("size") || 10; 292 const offset = +c.req.query("offset") || 0; 293 294 const scrobbleRecords = await ctx.db 295 .select({ 296 scrobble: scrobbles, 297 track: tracks, 298 user: users, 299 }) 300 .from(scrobbles) 301 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 302 .innerJoin(users, eq(scrobbles.userId, users.id)) 303 .orderBy(desc(scrobbles.timestamp)) 304 .limit(size) 305 .offset(offset); 306 307 return c.json( 308 scrobbleRecords.map((item) => ({ 309 cover: item.track.albumArt, 310 artist: item.track.artist, 311 title: item.track.title, 312 date: item.scrobble.timestamp, 313 user: item.user.handle, 314 uri: item.scrobble.uri, 315 albumUri: item.track.albumUri, 316 artistUri: item.track.artistUri, 317 tags: [], 318 listeners: 1, 319 sha256: item.track.sha256, 320 id: item.scrobble.id, 321 })), 322 ); 323}); 324 325app.get("/public/scrobbleschart", async (c) => { 326 requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" }); 327 328 const did = c.req.query("did"); 329 const artisturi = c.req.query("artisturi"); 330 const albumuri = c.req.query("albumuri"); 331 const songuri = c.req.query("songuri"); 332 333 if (did) { 334 const chart = await ctx.analytics.post("library.getScrobblesPerDay", { 335 user_did: did, 336 }); 337 return c.json(chart.data); 338 } 339 340 if (artisturi) { 341 const chart = await ctx.analytics.post("library.getArtistScrobbles", { 342 artist_id: artisturi, 343 }); 344 return c.json(chart.data); 345 } 346 347 if (albumuri) { 348 const chart = await ctx.analytics.post("library.getAlbumScrobbles", { 349 album_id: albumuri, 350 }); 351 return c.json(chart.data); 352 } 353 354 if (songuri) { 355 let uri = songuri; 356 if (songuri.includes("app.rocksky.scrobble")) { 357 const scrobble = await ctx.db 358 .select({ 359 scrobble: scrobbles, 360 track: tracks, 361 }) 362 .from(scrobbles) 363 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 364 .where(eq(scrobbles.uri, songuri)) 365 .limit(1) 366 .then((rows) => rows[0]); 367 368 uri = scrobble.track.uri; 369 } 370 const chart = await ctx.analytics.post("library.getTrackScrobbles", { 371 track_id: uri, 372 }); 373 return c.json(chart.data); 374 } 375 376 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {}); 377 return c.json(chart.data); 378}); 379 380app.get("/scrobbles", async (c) => { 381 requestCounter.add(1, { method: "GET", route: "/scrobbles" }); 382 383 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 384 385 if (!bearer || bearer === "null") { 386 c.status(401); 387 return c.text("Unauthorized"); 388 } 389 390 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 391 ignoreExpiration: true, 392 }); 393 394 const user = await ctx.db 395 .select() 396 .from(users) 397 .where(eq(users.did, did)) 398 .limit(1) 399 .then((rows) => rows[0]); 400 401 if (!user) { 402 c.status(401); 403 return c.text("Unauthorized"); 404 } 405 406 const size = +c.req.query("size") || 10; 407 const offset = +c.req.query("offset") || 0; 408 409 const userScrobbles = await ctx.db 410 .select({ 411 scrobble: scrobbles, 412 track: tracks, 413 }) 414 .from(scrobbles) 415 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 416 .where(and(eq(scrobbles.userId, user.id), isNotNull(scrobbles.uri))) 417 .orderBy(desc(scrobbles.createdAt)) 418 .limit(size) 419 .offset(offset); 420 421 return c.json(userScrobbles); 422}); 423 424app.post("/tracks", async (c) => { 425 requestCounter.add(1, { method: "POST", route: "/tracks" }); 426 427 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 428 429 if (!bearer || bearer === "null") { 430 c.status(401); 431 return c.text("Unauthorized"); 432 } 433 434 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 435 ignoreExpiration: true, 436 }); 437 438 const user = await ctx.db 439 .select() 440 .from(users) 441 .where(eq(users.did, did)) 442 .limit(1) 443 .then((rows) => rows[0]); 444 445 if (!user) { 446 c.status(401); 447 return c.text("Unauthorized"); 448 } 449 450 const body = await c.req.json(); 451 const parsed = trackSchema.safeParse(body); 452 453 if (parsed.error) { 454 c.status(400); 455 return c.text("Invalid track data: " + parsed.error.message); 456 } 457 458 const track = parsed.data; 459 460 const agent = await createAgent(ctx.oauthClient, did); 461 if (!agent) { 462 c.status(401); 463 return c.text("Unauthorized"); 464 } 465 466 try { 467 await saveTrack(ctx, track, agent); 468 } catch (e) { 469 if (!e.message.includes("duplicate key value violates unique constraint")) { 470 consola.error("[tracks]", e.message); 471 } 472 } 473 474 return c.json({ status: "ok" }); 475}); 476 477app.get("/tracks", async (c) => { 478 requestCounter.add(1, { method: "GET", route: "/tracks" }); 479 480 const size = +c.req.query("size") || 100; 481 const offset = +c.req.query("offset") || 0; 482 483 const tracksData = await ctx.analytics.post("library.getTracks", { 484 pagination: { 485 skip: offset, 486 take: size, 487 }, 488 }); 489 490 return c.json(tracksData.data); 491}); 492 493app.get("/albums", async (c) => { 494 requestCounter.add(1, { method: "GET", route: "/albums" }); 495 496 const size = +c.req.query("size") || 100; 497 const offset = +c.req.query("offset") || 0; 498 499 const albumsData = await ctx.analytics.post("library.getAlbums", { 500 pagination: { 501 skip: offset, 502 take: size, 503 }, 504 }); 505 506 return c.json(albumsData.data); 507}); 508 509app.get("/artists", async (c) => { 510 requestCounter.add(1, { method: "GET", route: "/artists" }); 511 512 const size = +c.req.query("size") || 100; 513 const offset = +c.req.query("offset") || 0; 514 515 const artistsData = await ctx.analytics.post("library.getArtists", { 516 pagination: { 517 skip: offset, 518 take: size, 519 }, 520 }); 521 522 return c.json(artistsData.data); 523}); 524 525app.get("/tracks/:sha256", async (c) => { 526 requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" }); 527 528 const sha256 = c.req.param("sha256"); 529 const track = await ctx.db 530 .select() 531 .from(tracks) 532 .where(eq(tracks.sha256, sha256)) 533 .limit(1) 534 .then((rows) => rows[0]); 535 536 return c.json(track); 537}); 538 539app.get("/albums/:sha256", async (c) => { 540 requestCounter.add(1, { method: "GET", route: "/albums/:sha256" }); 541 542 const sha256 = c.req.param("sha256"); 543 const album = await ctx.db 544 .select() 545 .from(albums) 546 .where(eq(albums.sha256, sha256)) 547 .limit(1) 548 .then((rows) => rows[0]); 549 550 return c.json(album); 551}); 552 553app.get("/artists/:sha256", async (c) => { 554 requestCounter.add(1, { method: "GET", route: "/artists/:sha256" }); 555 556 const sha256 = c.req.param("sha256"); 557 const artist = await ctx.db 558 .select() 559 .from(artists) 560 .where(eq(artists.sha256, sha256)) 561 .limit(1) 562 .then((rows) => rows[0]); 563 564 return c.json(artist); 565}); 566 567app.get("/artists/:sha256/tracks", async (c) => { 568 requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" }); 569 const sha256 = c.req.param("sha256"); 570 571 const artistTracksData = await ctx.db 572 .select({ 573 track: tracks, 574 }) 575 .from(artistTracks) 576 .innerJoin(tracks, eq(artistTracks.trackId, tracks.id)) 577 .innerJoin(artists, eq(artistTracks.artistId, artists.id)) 578 .where(eq(artists.sha256, sha256)); 579 580 return c.json(artistTracksData.map((item) => item.track)); 581}); 582 583app.get("/albums/:sha256/tracks", async (c) => { 584 requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" }); 585 const sha256 = c.req.param("sha256"); 586 587 const albumTracksData = await ctx.db 588 .select({ 589 track: tracks, 590 }) 591 .from(albumTracks) 592 .innerJoin(tracks, eq(albumTracks.trackId, tracks.id)) 593 .innerJoin(albums, eq(albumTracks.albumId, albums.id)) 594 .where(eq(albums.sha256, sha256)); 595 596 return c.json(albumTracksData.map((item) => item.track)); 597}); 598 599app.route("/users", usersApp); 600 601app.route("/webscrobbler", webscrobbler); 602 603const server = serve({ 604 fetch: app.fetch, 605 port: 8000, 606}); 607 608injectWebSocket(server);