A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at 39a2afa0bd9ca4cd2b9047b8a94751ba5dcf7b38 604 lines 15 kB view raw
1import { serve } from "@hono/node-server"; 2import { createNodeWebSocket } from "@hono/node-ws"; 3import { trace } from "@opentelemetry/api"; 4import { ctx } from "context"; 5import { and, desc, eq, isNotNull, or } from "drizzle-orm"; 6import { Hono } from "hono"; 7import { cors } from "hono/cors"; 8import jwt from "jsonwebtoken"; 9import { createAgent } from "lib/agent"; 10import { 11 getLovedTracks, 12 likeTrack, 13 unLikeTrack, 14} from "lovedtracks/lovedtracks.service"; 15import { scrobbleTrack } from "nowplaying/nowplaying.service"; 16import { rateLimiter } from "ratelimiter"; 17import subscribe from "subscribers"; 18import { saveTrack } from "tracks/tracks.service"; 19import { trackSchema } from "types/track"; 20import handleWebsocket from "websocket/handler"; 21import apikeys from "./apikeys/app"; 22import bsky from "./bsky/app"; 23import dropbox from "./dropbox/app"; 24import googledrive from "./googledrive/app"; 25import { env } from "./lib/env"; 26import { requestCounter, requestDuration } from "./metrics"; 27import "./profiling"; 28import albumTracks from "./schema/album-tracks"; 29import albums from "./schema/albums"; 30import artistTracks from "./schema/artist-tracks"; 31import artists from "./schema/artists"; 32import scrobbles from "./schema/scrobbles"; 33import tracks from "./schema/tracks"; 34import users from "./schema/users"; 35import spotify from "./spotify/app"; 36import "./tracing"; 37import usersApp from "./users/app"; 38import webscrobbler from "./webscrobbler/app"; 39 40subscribe(ctx); 41 42const app = new Hono(); 43const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 44 45app.use( 46 "*", 47 rateLimiter({ 48 limit: 1000, 49 window: 30, // 馃憟 30 seconds 50 }), 51); 52 53app.use("*", async (c, next) => { 54 const span = trace.getActiveSpan(); 55 span?.setAttribute("http.route", c.req.path); 56 await next(); 57}); 58 59app.use("*", async (c, next) => { 60 const start = Date.now(); 61 await next(); 62 const duration = (Date.now() - start) / 1000; 63 requestDuration.record(duration, { 64 route: c.req.path, 65 method: c.req.method, 66 }); 67}); 68 69app.use(cors()); 70 71app.route("/", bsky); 72 73app.route("/spotify", spotify); 74 75app.route("/dropbox", dropbox); 76 77app.route("/googledrive", googledrive); 78 79app.route("/apikeys", apikeys); 80 81app.get("/ws", upgradeWebSocket(handleWebsocket)); 82 83app.get("/", async (c) => { 84 return c.json({ status: "ok" }); 85}); 86 87app.post("/now-playing", async (c) => { 88 requestCounter.add(1, { method: "POST", route: "/now-playing" }); 89 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 90 91 if (!bearer || bearer === "null") { 92 c.status(401); 93 return c.text("Unauthorized"); 94 } 95 96 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 97 ignoreExpiration: true, 98 }); 99 100 const user = await ctx.db 101 .select() 102 .from(users) 103 .where(eq(users.did, did)) 104 .limit(1) 105 .then((rows) => rows[0]); 106 107 if (!user) { 108 c.status(401); 109 return c.text("Unauthorized"); 110 } 111 112 const body = await c.req.json(); 113 const parsed = trackSchema.safeParse(body); 114 115 if (parsed.error) { 116 c.status(400); 117 return c.text("Invalid track data: " + parsed.error.message); 118 } 119 const track = parsed.data; 120 121 const agent = await createAgent(ctx.oauthClient, did); 122 if (!agent) { 123 c.status(401); 124 return c.text("Unauthorized"); 125 } 126 127 await scrobbleTrack(ctx, track, agent, user.did); 128 129 return c.json({ status: "ok" }); 130}); 131 132app.get("/now-playing", async (c) => { 133 requestCounter.add(1, { method: "GET", route: "/now-playing" }); 134 135 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 136 137 const payload = 138 bearer && bearer !== "null" 139 ? jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }) 140 : {}; 141 const did = c.req.query("did") || payload.did; 142 143 if (!did) { 144 c.status(401); 145 return c.text("Unauthorized"); 146 } 147 148 const user = await ctx.db 149 .select() 150 .from(users) 151 .where(or(eq(users.did, did), eq(users.handle, did))) 152 .limit(1) 153 .then((rows) => rows[0]); 154 155 if (!user) { 156 c.status(401); 157 return c.text("Unauthorized"); 158 } 159 160 const [nowPlaying, status] = await Promise.all([ 161 ctx.redis.get(`nowplaying:${user.did}`), 162 ctx.redis.get(`nowplaying:${user.did}:status`), 163 ]); 164 return c.json( 165 nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {}, 166 ); 167}); 168 169app.get("/now-playings", async (c) => { 170 requestCounter.add(1, { method: "GET", route: "/now-playings" }); 171 const size = +c.req.query("size") || 10; 172 const offset = +c.req.query("offset") || 0; 173 const { data } = await ctx.analytics.post("library.getDistinctScrobbles", { 174 pagination: { 175 skip: offset, 176 take: size, 177 }, 178 }); 179 return c.json(data); 180}); 181 182app.post("/likes", async (c) => { 183 requestCounter.add(1, { method: "POST", route: "/likes" }); 184 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 185 186 if (!bearer || bearer === "null") { 187 c.status(401); 188 return c.text("Unauthorized"); 189 } 190 191 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 192 ignoreExpiration: true, 193 }); 194 const agent = await createAgent(ctx.oauthClient, did); 195 196 const user = await ctx.db 197 .select() 198 .from(users) 199 .where(eq(users.did, did)) 200 .limit(1) 201 .then((rows) => rows[0]); 202 203 if (!user) { 204 c.status(401); 205 return c.text("Unauthorized"); 206 } 207 208 const body = await c.req.json(); 209 const parsed = trackSchema.safeParse(body); 210 211 if (parsed.error) { 212 c.status(400); 213 return c.text("Invalid track data: " + parsed.error.message); 214 } 215 const track = parsed.data; 216 await likeTrack(ctx, track, user, agent); 217 218 return c.json({ status: "ok" }); 219}); 220 221app.delete("/likes/:sha256", async (c) => { 222 requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" }); 223 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 224 225 if (!bearer || bearer === "null") { 226 c.status(401); 227 return c.text("Unauthorized"); 228 } 229 230 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 231 ignoreExpiration: true, 232 }); 233 const agent = await createAgent(ctx.oauthClient, did); 234 235 const user = await ctx.db 236 .select() 237 .from(users) 238 .where(eq(users.did, did)) 239 .limit(1) 240 .then((rows) => rows[0]); 241 242 if (!user) { 243 c.status(401); 244 return c.text("Unauthorized"); 245 } 246 247 const sha256 = c.req.param("sha256"); 248 await unLikeTrack(ctx, sha256, user, agent); 249 return c.json({ status: "ok" }); 250}); 251 252app.get("/likes", async (c) => { 253 requestCounter.add(1, { method: "GET", route: "/likes" }); 254 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 255 256 if (!bearer || bearer === "null") { 257 c.status(401); 258 return c.text("Unauthorized"); 259 } 260 261 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 262 ignoreExpiration: true, 263 }); 264 265 const user = await ctx.db 266 .select() 267 .from(users) 268 .where(eq(users.did, did)) 269 .limit(1) 270 .then((rows) => rows[0]); 271 272 if (!user) { 273 c.status(401); 274 return c.text("Unauthorized"); 275 } 276 277 const size = +c.req.query("size") || 10; 278 const offset = +c.req.query("offset") || 0; 279 280 const lovedTracks = await getLovedTracks(ctx, user, size, offset); 281 return c.json(lovedTracks); 282}); 283 284app.get("/public/scrobbles", async (c) => { 285 requestCounter.add(1, { method: "GET", route: "/public/scrobbles" }); 286 287 const size = +c.req.query("size") || 10; 288 const offset = +c.req.query("offset") || 0; 289 290 const scrobbleRecords = await ctx.db 291 .select({ 292 scrobble: scrobbles, 293 track: tracks, 294 user: users, 295 }) 296 .from(scrobbles) 297 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 298 .innerJoin(users, eq(scrobbles.userId, users.id)) 299 .orderBy(desc(scrobbles.timestamp)) 300 .limit(size) 301 .offset(offset); 302 303 return c.json( 304 scrobbleRecords.map((item) => ({ 305 cover: item.track.albumArt, 306 artist: item.track.artist, 307 title: item.track.title, 308 date: item.scrobble.timestamp, 309 user: item.user.handle, 310 uri: item.scrobble.uri, 311 albumUri: item.track.albumUri, 312 artistUri: item.track.artistUri, 313 tags: [], 314 listeners: 1, 315 sha256: item.track.sha256, 316 id: item.scrobble.id, 317 })), 318 ); 319}); 320 321app.get("/public/scrobbleschart", async (c) => { 322 requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" }); 323 324 const did = c.req.query("did"); 325 const artisturi = c.req.query("artisturi"); 326 const albumuri = c.req.query("albumuri"); 327 const songuri = c.req.query("songuri"); 328 329 if (did) { 330 const chart = await ctx.analytics.post("library.getScrobblesPerDay", { 331 user_did: did, 332 }); 333 return c.json(chart.data); 334 } 335 336 if (artisturi) { 337 const chart = await ctx.analytics.post("library.getArtistScrobbles", { 338 artist_id: artisturi, 339 }); 340 return c.json(chart.data); 341 } 342 343 if (albumuri) { 344 const chart = await ctx.analytics.post("library.getAlbumScrobbles", { 345 album_id: albumuri, 346 }); 347 return c.json(chart.data); 348 } 349 350 if (songuri) { 351 let uri = songuri; 352 if (songuri.includes("app.rocksky.scrobble")) { 353 const scrobble = await ctx.db 354 .select({ 355 scrobble: scrobbles, 356 track: tracks, 357 }) 358 .from(scrobbles) 359 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 360 .where(eq(scrobbles.uri, songuri)) 361 .limit(1) 362 .then((rows) => rows[0]); 363 364 uri = scrobble.track.uri; 365 } 366 const chart = await ctx.analytics.post("library.getTrackScrobbles", { 367 track_id: uri, 368 }); 369 return c.json(chart.data); 370 } 371 372 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {}); 373 return c.json(chart.data); 374}); 375 376app.get("/scrobbles", async (c) => { 377 requestCounter.add(1, { method: "GET", route: "/scrobbles" }); 378 379 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 380 381 if (!bearer || bearer === "null") { 382 c.status(401); 383 return c.text("Unauthorized"); 384 } 385 386 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 387 ignoreExpiration: true, 388 }); 389 390 const user = await ctx.db 391 .select() 392 .from(users) 393 .where(eq(users.did, did)) 394 .limit(1) 395 .then((rows) => rows[0]); 396 397 if (!user) { 398 c.status(401); 399 return c.text("Unauthorized"); 400 } 401 402 const size = +c.req.query("size") || 10; 403 const offset = +c.req.query("offset") || 0; 404 405 const userScrobbles = await ctx.db 406 .select({ 407 scrobble: scrobbles, 408 track: tracks, 409 }) 410 .from(scrobbles) 411 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id)) 412 .where(and(eq(scrobbles.userId, user.id), isNotNull(scrobbles.uri))) 413 .orderBy(desc(scrobbles.createdAt)) 414 .limit(size) 415 .offset(offset); 416 417 return c.json(userScrobbles); 418}); 419 420app.post("/tracks", async (c) => { 421 requestCounter.add(1, { method: "POST", route: "/tracks" }); 422 423 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 424 425 if (!bearer || bearer === "null") { 426 c.status(401); 427 return c.text("Unauthorized"); 428 } 429 430 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 431 ignoreExpiration: true, 432 }); 433 434 const user = await ctx.db 435 .select() 436 .from(users) 437 .where(eq(users.did, did)) 438 .limit(1) 439 .then((rows) => rows[0]); 440 441 if (!user) { 442 c.status(401); 443 return c.text("Unauthorized"); 444 } 445 446 const body = await c.req.json(); 447 const parsed = trackSchema.safeParse(body); 448 449 if (parsed.error) { 450 c.status(400); 451 return c.text("Invalid track data: " + parsed.error.message); 452 } 453 454 const track = parsed.data; 455 456 const agent = await createAgent(ctx.oauthClient, did); 457 if (!agent) { 458 c.status(401); 459 return c.text("Unauthorized"); 460 } 461 462 try { 463 await saveTrack(ctx, track, agent); 464 } catch (e) { 465 if (!e.message.includes("duplicate key value violates unique constraint")) { 466 console.error("[tracks]", e.message); 467 } 468 } 469 470 return c.json({ status: "ok" }); 471}); 472 473app.get("/tracks", async (c) => { 474 requestCounter.add(1, { method: "GET", route: "/tracks" }); 475 476 const size = +c.req.query("size") || 100; 477 const offset = +c.req.query("offset") || 0; 478 479 const tracksData = await ctx.analytics.post("library.getTracks", { 480 pagination: { 481 skip: offset, 482 take: size, 483 }, 484 }); 485 486 return c.json(tracksData.data); 487}); 488 489app.get("/albums", async (c) => { 490 requestCounter.add(1, { method: "GET", route: "/albums" }); 491 492 const size = +c.req.query("size") || 100; 493 const offset = +c.req.query("offset") || 0; 494 495 const albumsData = await ctx.analytics.post("library.getAlbums", { 496 pagination: { 497 skip: offset, 498 take: size, 499 }, 500 }); 501 502 return c.json(albumsData.data); 503}); 504 505app.get("/artists", async (c) => { 506 requestCounter.add(1, { method: "GET", route: "/artists" }); 507 508 const size = +c.req.query("size") || 100; 509 const offset = +c.req.query("offset") || 0; 510 511 const artistsData = await ctx.analytics.post("library.getArtists", { 512 pagination: { 513 skip: offset, 514 take: size, 515 }, 516 }); 517 518 return c.json(artistsData.data); 519}); 520 521app.get("/tracks/:sha256", async (c) => { 522 requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" }); 523 524 const sha256 = c.req.param("sha256"); 525 const track = await ctx.db 526 .select() 527 .from(tracks) 528 .where(eq(tracks.sha256, sha256)) 529 .limit(1) 530 .then((rows) => rows[0]); 531 532 return c.json(track); 533}); 534 535app.get("/albums/:sha256", async (c) => { 536 requestCounter.add(1, { method: "GET", route: "/albums/:sha256" }); 537 538 const sha256 = c.req.param("sha256"); 539 const album = await ctx.db 540 .select() 541 .from(albums) 542 .where(eq(albums.sha256, sha256)) 543 .limit(1) 544 .then((rows) => rows[0]); 545 546 return c.json(album); 547}); 548 549app.get("/artists/:sha256", async (c) => { 550 requestCounter.add(1, { method: "GET", route: "/artists/:sha256" }); 551 552 const sha256 = c.req.param("sha256"); 553 const artist = await ctx.db 554 .select() 555 .from(artists) 556 .where(eq(artists.sha256, sha256)) 557 .limit(1) 558 .then((rows) => rows[0]); 559 560 return c.json(artist); 561}); 562 563app.get("/artists/:sha256/tracks", async (c) => { 564 requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" }); 565 const sha256 = c.req.param("sha256"); 566 567 const artistTracksData = await ctx.db 568 .select({ 569 track: tracks, 570 }) 571 .from(artistTracks) 572 .innerJoin(tracks, eq(artistTracks.trackId, tracks.id)) 573 .innerJoin(artists, eq(artistTracks.artistId, artists.id)) 574 .where(eq(artists.sha256, sha256)); 575 576 return c.json(artistTracksData.map((item) => item.track)); 577}); 578 579app.get("/albums/:sha256/tracks", async (c) => { 580 requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" }); 581 const sha256 = c.req.param("sha256"); 582 583 const albumTracksData = await ctx.db 584 .select({ 585 track: tracks, 586 }) 587 .from(albumTracks) 588 .innerJoin(tracks, eq(albumTracks.trackId, tracks.id)) 589 .innerJoin(albums, eq(albumTracks.albumId, albums.id)) 590 .where(eq(albums.sha256, sha256)); 591 592 return c.json(albumTracksData.map((item) => item.track)); 593}); 594 595app.route("/users", usersApp); 596 597app.route("/webscrobbler", webscrobbler); 598 599const server = serve({ 600 fetch: app.fetch, 601 port: 8000, 602}); 603 604injectWebSocket(server);