A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at feat/scrobble-user-avatar 541 lines 14 kB view raw
1import { serve } from "@hono/node-server"; 2import { createNodeWebSocket } from "@hono/node-ws"; 3import { trace } from "@opentelemetry/api"; 4import { equals } from "@xata.io/client"; 5import { ctx } from "context"; 6import { Hono } from "hono"; 7import { cors } from "hono/cors"; 8import jwt from "jsonwebtoken"; 9import { createAgent } from "lib/agent"; 10import { 11 getLovedTracks, 12 likeTrack, 13 unLikeTrack, 14} from "lovedtracks/lovedtracks.service"; 15import { scrobbleTrack } from "nowplaying/nowplaying.service"; 16import { rateLimiter } from "ratelimiter"; 17import subscribe from "subscribers"; 18import { saveTrack } from "tracks/tracks.service"; 19import { trackSchema } from "types/track"; 20import handleWebsocket from "websocket/handler"; 21import apikeys from "./apikeys/app"; 22import bsky from "./bsky/app"; 23import dropbox from "./dropbox/app"; 24import googledrive from "./googledrive/app"; 25import { env } from "./lib/env"; 26import { requestCounter, requestDuration } from "./metrics"; 27import "./profiling"; 28import search from "./search/app"; 29import spotify from "./spotify/app"; 30import "./tracing"; 31import users from "./users/app"; 32import webscrobbler from "./webscrobbler/app"; 33 34subscribe(ctx); 35 36const app = new Hono(); 37const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); 38 39app.use( 40 "*", 41 rateLimiter({ 42 limit: 1000, 43 window: 30, // 馃憟 30 seconds 44 }), 45); 46 47app.use("*", async (c, next) => { 48 const span = trace.getActiveSpan(); 49 span?.setAttribute("http.route", c.req.path); 50 await next(); 51}); 52 53app.use("*", async (c, next) => { 54 const start = Date.now(); 55 await next(); 56 const duration = (Date.now() - start) / 1000; 57 requestDuration.record(duration, { 58 route: c.req.path, 59 method: c.req.method, 60 }); 61}); 62 63app.use(cors()); 64 65app.route("/", bsky); 66 67app.route("/spotify", spotify); 68 69app.route("/dropbox", dropbox); 70 71app.route("/googledrive", googledrive); 72 73app.route("/apikeys", apikeys); 74 75app.get("/ws", upgradeWebSocket(handleWebsocket)); 76 77app.get("/", async (c) => { 78 return c.json({ status: "ok" }); 79}); 80 81app.post("/now-playing", async (c) => { 82 requestCounter.add(1, { method: "POST", route: "/now-playing" }); 83 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 84 85 if (!bearer || bearer === "null") { 86 c.status(401); 87 return c.text("Unauthorized"); 88 } 89 90 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 91 ignoreExpiration: true, 92 }); 93 94 const user = await ctx.client.db.users.filter("did", equals(did)).getFirst(); 95 if (!user) { 96 c.status(401); 97 return c.text("Unauthorized"); 98 } 99 100 const body = await c.req.json(); 101 const parsed = trackSchema.safeParse(body); 102 103 if (parsed.error) { 104 c.status(400); 105 return c.text("Invalid track data: " + parsed.error.message); 106 } 107 const track = parsed.data; 108 109 const agent = await createAgent(ctx.oauthClient, did); 110 if (!agent) { 111 c.status(401); 112 return c.text("Unauthorized"); 113 } 114 115 await scrobbleTrack(ctx, track, agent, user.did); 116 117 return c.json({ status: "ok" }); 118}); 119 120app.get("/now-playing", async (c) => { 121 requestCounter.add(1, { method: "GET", route: "/now-playing" }); 122 123 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 124 125 const payload = 126 bearer && bearer !== "null" 127 ? jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }) 128 : {}; 129 const did = c.req.query("did") || payload.did; 130 131 if (!did) { 132 c.status(401); 133 return c.text("Unauthorized"); 134 } 135 136 const user = await ctx.client.db.users 137 .filter({ 138 $any: [{ did }, { handle: did }], 139 }) 140 .getFirst(); 141 142 if (!user) { 143 c.status(401); 144 return c.text("Unauthorized"); 145 } 146 147 const [nowPlaying, status] = await Promise.all([ 148 ctx.redis.get(`nowplaying:${user.did}`), 149 ctx.redis.get(`nowplaying:${user.did}:status`), 150 ]); 151 return c.json( 152 nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {}, 153 ); 154}); 155 156app.get("/now-playings", async (c) => { 157 requestCounter.add(1, { method: "GET", route: "/now-playings" }); 158 const size = +c.req.query("size") || 10; 159 const offset = +c.req.query("offset") || 0; 160 const { data } = await ctx.analytics.post("library.getDistinctScrobbles", { 161 pagination: { 162 skip: offset, 163 take: size, 164 }, 165 }); 166 return c.json(data); 167}); 168 169app.post("/likes", async (c) => { 170 requestCounter.add(1, { method: "POST", route: "/likes" }); 171 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 172 173 if (!bearer || bearer === "null") { 174 c.status(401); 175 return c.text("Unauthorized"); 176 } 177 178 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 179 ignoreExpiration: true, 180 }); 181 const agent = await createAgent(ctx.oauthClient, did); 182 183 const user = await ctx.client.db.users.filter("did", equals(did)).getFirst(); 184 if (!user) { 185 c.status(401); 186 return c.text("Unauthorized"); 187 } 188 189 const body = await c.req.json(); 190 const parsed = trackSchema.safeParse(body); 191 192 if (parsed.error) { 193 c.status(400); 194 return c.text("Invalid track data: " + parsed.error.message); 195 } 196 const track = parsed.data; 197 await likeTrack(ctx, track, user, agent); 198 199 return c.json({ status: "ok" }); 200}); 201 202app.delete("/likes/:sha256", async (c) => { 203 requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" }); 204 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 205 206 if (!bearer || bearer === "null") { 207 c.status(401); 208 return c.text("Unauthorized"); 209 } 210 211 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 212 ignoreExpiration: true, 213 }); 214 const agent = await createAgent(ctx.oauthClient, did); 215 216 const user = await ctx.client.db.users.filter("did", equals(did)).getFirst(); 217 if (!user) { 218 c.status(401); 219 return c.text("Unauthorized"); 220 } 221 222 const sha256 = c.req.param("sha256"); 223 await unLikeTrack(ctx, sha256, user, agent); 224 return c.json({ status: "ok" }); 225}); 226 227app.get("/likes", async (c) => { 228 requestCounter.add(1, { method: "GET", route: "/likes" }); 229 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 230 231 if (!bearer || bearer === "null") { 232 c.status(401); 233 return c.text("Unauthorized"); 234 } 235 236 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 237 ignoreExpiration: true, 238 }); 239 240 const user = await ctx.client.db.users.filter("did", equals(did)).getFirst(); 241 if (!user) { 242 c.status(401); 243 return c.text("Unauthorized"); 244 } 245 246 const size = +c.req.query("size") || 10; 247 const offset = +c.req.query("offset") || 0; 248 249 const lovedTracks = await getLovedTracks(ctx, user, size, offset); 250 return c.json(lovedTracks); 251}); 252 253app.get("/public/scrobbles", async (c) => { 254 requestCounter.add(1, { method: "GET", route: "/public/scrobbles" }); 255 256 const size = +c.req.query("size") || 10; 257 const offset = +c.req.query("offset") || 0; 258 259 const scrobbles = await ctx.client.db.scrobbles 260 .select(["track_id.*", "user_id.*", "timestamp", "xata_createdat", "uri"]) 261 .sort("timestamp", "desc") 262 .getPaginated({ 263 pagination: { 264 size, 265 offset, 266 }, 267 }); 268 269 return c.json( 270 scrobbles.records.map((item) => ({ 271 cover: item.track_id.album_art, 272 artist: item.track_id.artist, 273 title: item.track_id.title, 274 date: item.timestamp, 275 user: item.user_id.handle, 276 uri: item.uri, 277 albumUri: item.track_id.album_uri, 278 artistUri: item.track_id.artist_uri, 279 tags: [], 280 listeners: 1, 281 sha256: item.track_id.sha256, 282 id: item.xata_id, 283 })), 284 ); 285}); 286 287app.get("/public/scrobbleschart", async (c) => { 288 requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" }); 289 290 const did = c.req.query("did"); 291 const artisturi = c.req.query("artisturi"); 292 const albumuri = c.req.query("albumuri"); 293 const songuri = c.req.query("songuri"); 294 295 if (did) { 296 const chart = await ctx.analytics.post("library.getScrobblesPerDay", { 297 user_did: did, 298 }); 299 return c.json(chart.data); 300 } 301 302 if (artisturi) { 303 const chart = await ctx.analytics.post("library.getArtistScrobbles", { 304 artist_id: artisturi, 305 }); 306 return c.json(chart.data); 307 } 308 309 if (albumuri) { 310 const chart = await ctx.analytics.post("library.getAlbumScrobbles", { 311 album_id: albumuri, 312 }); 313 return c.json(chart.data); 314 } 315 316 if (songuri) { 317 let uri = songuri; 318 if (songuri.includes("app.rocksky.scrobble")) { 319 const scrobble = await ctx.client.db.scrobbles 320 .select(["track_id.*", "uri"]) 321 .filter("uri", equals(songuri)) 322 .getFirst(); 323 324 uri = scrobble.track_id.uri; 325 } 326 const chart = await ctx.analytics.post("library.getTrackScrobbles", { 327 track_id: uri, 328 }); 329 return c.json(chart.data); 330 } 331 332 const chart = await ctx.analytics.post("library.getScrobblesPerDay", {}); 333 return c.json(chart.data); 334}); 335 336app.get("/scrobbles", async (c) => { 337 requestCounter.add(1, { method: "GET", route: "/scrobbles" }); 338 339 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 340 341 if (!bearer || bearer === "null") { 342 c.status(401); 343 return c.text("Unauthorized"); 344 } 345 346 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 347 ignoreExpiration: true, 348 }); 349 350 const user = await ctx.client.db.users.filter("did", equals(did)).getFirst(); 351 if (!user) { 352 c.status(401); 353 return c.text("Unauthorized"); 354 } 355 356 const size = +c.req.query("size") || 10; 357 const offset = +c.req.query("offset") || 0; 358 359 const scrobbles = await ctx.client.db.scrobbles 360 .select(["track_id.*", "uri"]) 361 .filter("user_id", equals(user.xata_id)) 362 .filter({ 363 $not: [ 364 { 365 uri: null, 366 }, 367 ], 368 }) 369 .sort("xata_createdat", "desc") 370 .getPaginated({ 371 pagination: { 372 size, 373 offset, 374 }, 375 }); 376 377 return c.json(scrobbles.records); 378}); 379 380app.post("/tracks", async (c) => { 381 requestCounter.add(1, { method: "POST", route: "/tracks" }); 382 383 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 384 385 if (!bearer || bearer === "null") { 386 c.status(401); 387 return c.text("Unauthorized"); 388 } 389 390 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 391 ignoreExpiration: true, 392 }); 393 394 const user = await ctx.client.db.users.filter("did", equals(did)).getFirst(); 395 if (!user) { 396 c.status(401); 397 return c.text("Unauthorized"); 398 } 399 400 const body = await c.req.json(); 401 const parsed = trackSchema.safeParse(body); 402 403 if (parsed.error) { 404 c.status(400); 405 return c.text("Invalid track data: " + parsed.error.message); 406 } 407 408 const track = parsed.data; 409 410 const agent = await createAgent(ctx.oauthClient, did); 411 if (!agent) { 412 c.status(401); 413 return c.text("Unauthorized"); 414 } 415 416 try { 417 await saveTrack(ctx, track, agent); 418 } catch (e) { 419 if (!e.message.includes("invalid record: column [sha256]: is not unique")) { 420 console.error("[spotify user]", e.message); 421 } 422 } 423 424 return c.json({ status: "ok" }); 425}); 426 427app.get("/tracks", async (c) => { 428 requestCounter.add(1, { method: "GET", route: "/tracks" }); 429 430 const size = +c.req.query("size") || 100; 431 const offset = +c.req.query("offset") || 0; 432 433 const tracks = await ctx.analytics.post("library.getTracks", { 434 pagination: { 435 skip: offset, 436 take: size, 437 }, 438 }); 439 440 return c.json(tracks.data); 441}); 442 443app.get("/albums", async (c) => { 444 requestCounter.add(1, { method: "GET", route: "/albums" }); 445 446 const size = +c.req.query("size") || 100; 447 const offset = +c.req.query("offset") || 0; 448 449 const albums = await ctx.analytics.post("library.getAlbums", { 450 pagination: { 451 skip: offset, 452 take: size, 453 }, 454 }); 455 456 return c.json(albums.data); 457}); 458 459app.get("/artists", async (c) => { 460 requestCounter.add(1, { method: "GET", route: "/artists" }); 461 462 const size = +c.req.query("size") || 100; 463 const offset = +c.req.query("offset") || 0; 464 465 const artists = await ctx.analytics.post("library.getArtists", { 466 pagination: { 467 skip: offset, 468 take: size, 469 }, 470 }); 471 472 return c.json(artists.data); 473}); 474 475app.get("/tracks/:sha256", async (c) => { 476 requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" }); 477 478 const sha256 = c.req.param("sha256"); 479 const track = await ctx.client.db.tracks 480 .filter("sha256", equals(sha256)) 481 .getFirst(); 482 return c.json(track); 483}); 484 485app.get("/albums/:sha256", async (c) => { 486 requestCounter.add(1, { method: "GET", route: "/albums/:sha256" }); 487 488 const sha256 = c.req.param("sha256"); 489 const album = await ctx.client.db.albums 490 .filter("sha256", equals(sha256)) 491 .getFirst(); 492 493 return c.json(album); 494}); 495 496app.get("/artists/:sha256", async (c) => { 497 requestCounter.add(1, { method: "GET", route: "/artists/:sha256" }); 498 499 const sha256 = c.req.param("sha256"); 500 const artist = await ctx.client.db.artists 501 .filter("sha256", equals(sha256)) 502 .getFirst(); 503 504 return c.json(artist); 505}); 506 507app.get("/artists/:sha256/tracks", async (c) => { 508 requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" }); 509 const sha256 = c.req.param("sha256"); 510 511 const tracks = await ctx.client.db.artist_tracks 512 .select(["track_id.*"]) 513 .filter("artist_id.sha256", equals(sha256)) 514 .getAll(); 515 516 return c.json(tracks); 517}); 518 519app.get("/albums/:sha256/tracks", async (c) => { 520 requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" }); 521 const sha256 = c.req.param("sha256"); 522 const tracks = await ctx.client.db.album_tracks 523 .select(["track_id.*"]) 524 .filter("album_id.sha256", equals(sha256)) 525 .getAll(); 526 527 return c.json(tracks); 528}); 529 530app.route("/users", users); 531 532app.route("/search", search); 533 534app.route("/webscrobbler", webscrobbler); 535 536const server = serve({ 537 fetch: app.fetch, 538 port: 8000, 539}); 540 541injectWebSocket(server);