A decentralized music tracking and discovery platform built on AT Protocol 🎵
at main 944 lines 26 kB view raw
import type { Agent } from "@atproto/api";
import { TID } from "@atproto/common";
import chalk from "chalk";
import { consola } from "consola";
import type { Context } from "context";
import dayjs from "dayjs";
import { and, eq, gte, lte, or } from "drizzle-orm";
import * as Album from "lexicon/types/app/rocksky/album";
import * as Artist from "lexicon/types/app/rocksky/artist";
import * as Scrobble from "lexicon/types/app/rocksky/scrobble";
import * as Song from "lexicon/types/app/rocksky/song";
import { deepSnakeCaseKeys } from "lib";
import { createHash } from "node:crypto";
import type { MusicbrainzTrack, Track } from "types/track";
import albumTracks from "../schema/album-tracks";
import albums from "../schema/albums";
import artistAlbums from "../schema/artist-albums";
import artistTracks from "../schema/artist-tracks";
import artists from "../schema/artists";
import scrobbles from "../schema/scrobbles";
import tracks from "../schema/tracks";
import userAlbums from "../schema/user-albums";
import userArtists from "../schema/user-artists";
import userTracks from "../schema/user-tracks";
import users from "../schema/users";
import tealfm from "../tealfm";

/** Maximum number of polling attempts used by every wait loop in this module. */
const MAX_TRIES = 30;

/** Delay between two polling attempts, in milliseconds. */
const RETRY_DELAY_MS = 1000;

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/** Hex-encoded SHA-256 digest of `value`. */
function sha256Hex(value: string): string {
  return createHash("sha256").update(value).digest("hex");
}

/** Hash used to look a track up: lower-cased "title - artist - album". */
function trackSha256(track: Track): string {
  return sha256Hex(
    `${track.title} - ${track.artist} - ${track.album}`.toLowerCase(),
  );
}

/** Hash used to look an album up: lower-cased "album - albumArtist". */
function albumSha256(track: Track): string {
  return sha256Hex(`${track.album} - ${track.albumArtist}`.toLowerCase());
}

/** Hash used to look an artist up: the lower-cased artist name. */
function artistSha256(name: string): string {
  return sha256Hex(name.toLowerCase());
}

/** Returns the first track row whose `sha256` column equals `sha256`, if any. */
function findTrackBySha256(ctx: Context, sha256: string) {
  return ctx.db
    .select()
    .from(tracks)
    .where(eq(tracks.sha256, sha256))
    .limit(1)
    .then((rows) => rows[0]);
}

/** Returns the first album row whose `sha256` column equals `sha256`, if any. */
function findAlbumBySha256(ctx: Context, sha256: string) {
  return ctx.db
    .select()
    .from(albums)
    .where(eq(albums.sha256, sha256))
    .limit(1)
    .then((rows) => rows[0]);
}

/** Returns the first artist row whose `sha256` column equals `sha256`, if any. */
function findArtistBySha256(ctx: Context, sha256: string) {
  return ctx.db
    .select()
    .from(artists)
    .where(eq(artists.sha256, sha256))
    .limit(1)
    .then((rows) => rows[0]);
}

/**
 * Copies the album's uri onto the track row when an album matching the
 * track's album hash exists. Returns the album row that was found, if any.
 */
async function backfillTrackAlbumUri(
  ctx: Context,
  trackId: string,
  track: Track,
) {
  const album = await findAlbumBySha256(ctx, albumSha256(track));
  if (album) {
    await ctx.db
      .update(tracks)
      .set({ albumUri: album.uri })
      .where(eq(tracks.id, trackId));
  }
  return album;
}

/**
 * Copies the album artist's uri onto the track row when an artist matching
 * the album-artist hash exists.
 */
async function backfillTrackArtistUri(
  ctx: Context,
  trackId: string,
  track: Track,
): Promise<void> {
  const artist = await findArtistBySha256(
    ctx,
    artistSha256(track.albumArtist),
  );
  if (artist) {
    await ctx.db
      .update(tracks)
      .set({ artistUri: artist.uri })
      .where(eq(tracks.id, trackId));
  }
}

/**
 * Loads one scrobble joined with its track, album, artist and user rows.
 * Returns `undefined` when no row satisfies `condition`.
 */
function findScrobbleWithRelations(
  ctx: Context,
  condition: ReturnType<typeof eq>,
) {
  return ctx.db
    .select({
      scrobble: scrobbles,
      track: tracks,
      album: albums,
      artist: artists,
      user: users,
    })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .innerJoin(albums, eq(scrobbles.albumId, albums.id))
    .innerJoin(artists, eq(scrobbles.artistId, artists.id))
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(condition)
    .limit(1)
    .then((rows) => rows[0]);
}

/** Builds the `xata_*` identity/timestamp fields attached to every published row. */
function xataFields(row: { id: string; createdAt: Date; updatedAt: Date }) {
  return {
    xata_id: row.id,
    xata_createdat: row.createdAt.toISOString(),
    xata_updatedat: row.updatedAt.toISOString(),
  };
}

/**
 * Writes `record` to the agent's repository under a freshly generated TID
 * record key.
 *
 * @param label Capitalised record kind used in log messages (e.g. "Artist").
 * @returns The uri of the created record, or `null` when the write failed
 *   (the failure is logged, not thrown).
 */
async function writeRecord(
  agent: Agent,
  collection: string,
  label: string,
  record: Record<string, unknown>,
): Promise<string | null> {
  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection,
      rkey: TID.nextStr(),
      record,
      validate: false,
    });
    const uri = res.data.uri;
    consola.info(`${label} record created at ${uri}`);
    return uri;
  } catch (e) {
    consola.error(`Error creating ${label.toLowerCase()} record`, e);
    return null;
  }
}

/**
 * Creates an `app.rocksky.artist` record for the track's album artist.
 *
 * @returns The uri of the created record, or `null` when the write failed.
 * @throws Error when the record does not pass lexicon validation.
 */
export async function putArtistRecord(
  track: Track,
  agent: Agent,
): Promise<string | null> {
  const record: Artist.Record = {
    $type: "app.rocksky.artist",
    name: track.albumArtist,
    createdAt: new Date().toISOString(),
    pictureUrl: track.artistPicture || undefined,
    tags: track.genres || [],
  };

  const validation = Artist.validateRecord(record);
  if (!validation.success) {
    consola.info(validation);
    consola.info(JSON.stringify(record, null, 2));
    throw new Error("Invalid record");
  }

  return writeRecord(agent, "app.rocksky.artist", "Artist", record);
}

/**
 * Creates an `app.rocksky.album` record for the track's album.
 *
 * @returns The uri of the created record, or `null` when the write failed.
 * @throws Error when the record does not pass lexicon validation.
 */
export async function putAlbumRecord(
  track: Track,
  agent: Agent,
): Promise<string | null> {
  const record = {
    $type: "app.rocksky.album",
    title: track.album,
    artist: track.albumArtist,
    year: track.year === null ? undefined : track.year,
    releaseDate: track.releaseDate
      ? track.releaseDate.toISOString()
      : undefined,
    createdAt: new Date().toISOString(),
    albumArtUrl: track.albumArt,
  };

  const validation = Album.validateRecord(record);
  if (!validation.success) {
    consola.info(validation);
    consola.info(JSON.stringify(record, null, 2));
    throw new Error("Invalid record");
  }

  return writeRecord(agent, "app.rocksky.album", "Album", record);
}

/**
 * Creates an `app.rocksky.song` record for the track.
 *
 * @returns The uri of the created record, or `null` when the write failed.
 * @throws Error when the record does not pass lexicon validation.
 */
export async function putSongRecord(
  track: Track,
  agent: Agent,
): Promise<string | null> {
  const record: Song.Record = {
    $type: "app.rocksky.song",
    title: track.title,
    artist: track.artist,
    artists: track.artists === null ? undefined : track.artists,
    album: track.album,
    albumArtist: track.albumArtist,
    duration: track.duration,
    releaseDate: track.releaseDate
      ? track.releaseDate.toISOString()
      : undefined,
    year: track.year === null ? undefined : track.year,
    albumArtUrl: track.albumArt,
    composer: track.composer ? track.composer : undefined,
    lyrics: track.lyrics ? track.lyrics : undefined,
    trackNumber: track.trackNumber,
    // A disc number of 0 is stored as disc 1.
    discNumber: track.discNumber === 0 ? 1 : track.discNumber,
    copyrightMessage: track.copyrightMessage
      ? track.copyrightMessage
      : undefined,
    createdAt: new Date().toISOString(),
    spotifyLink: track.spotifyLink ? track.spotifyLink : undefined,
    tags: track.genres || [],
    mbid: track.mbId,
  };

  const validation = Song.validateRecord(record);
  if (!validation.success) {
    consola.info(validation);
    consola.info(chalk.cyan(JSON.stringify(record, null, 2)));
    throw new Error("Invalid record");
  }

  return writeRecord(agent, "app.rocksky.song", "Song", record);
}

/**
 * Creates an `app.rocksky.scrobble` record for the track. `createdAt` is the
 * track's own timestamp (unix seconds) when present, otherwise the current time.
 *
 * @returns The uri of the created record, or `null` when the write failed.
 * @throws Error when the record does not pass lexicon validation.
 */
async function putScrobbleRecord(
  track: Track,
  agent: Agent,
): Promise<string | null> {
  const record: Scrobble.Record = {
    $type: "app.rocksky.scrobble",
    title: track.title,
    albumArtist: track.albumArtist,
    albumArtUrl: track.albumArt,
    artist: track.artist,
    artists: track.artists === null ? undefined : track.artists,
    album: track.album,
    duration: track.duration,
    trackNumber: track.trackNumber,
    // A disc number of 0 is stored as disc 1.
    discNumber: track.discNumber === 0 ? 1 : track.discNumber,
    releaseDate: track.releaseDate
      ? track.releaseDate.toISOString()
      : undefined,
    year: track.year === null ? undefined : track.year,
    composer: track.composer ? track.composer : undefined,
    lyrics: track.lyrics ? track.lyrics : undefined,
    copyrightMessage: track.copyrightMessage
      ? track.copyrightMessage
      : undefined,
    createdAt: track.timestamp
      ? dayjs.unix(track.timestamp).toISOString()
      : new Date().toISOString(),
    spotifyLink: track.spotifyLink ? track.spotifyLink : undefined,
    tags: track.genres || [],
    mbid: track.mbId,
  };

  const validation = Scrobble.validateRecord(record);
  if (!validation.success) {
    consola.info(validation);
    consola.info(JSON.stringify(record, null, 2));
    throw new Error("Invalid record");
  }

  return writeRecord(agent, "app.rocksky.scrobble", "Scrobble", record);
}

/**
 * Publishes a stored scrobble, together with its related rows, on the
 * `rocksky.scrobble` and `rocksky.track` subjects of `ctx.nc`.
 *
 * Missing `user_albums` / `user_artists` / `user_tracks` rows for the
 * scrobbling user are created with `scrobbles: 1` before publishing. Those
 * rows are looked up per user so that another user's row is never reused.
 *
 * @param id Database id of the scrobble to publish.
 * @throws Error when the scrobble, or one of the album/artist/track link rows
 *   the payload requires, does not exist.
 */
export async function publishScrobble(ctx: Context, id: string): Promise<void> {
  const scrobble = await findScrobbleWithRelations(ctx, eq(scrobbles.id, id));
  if (!scrobble) {
    throw new Error(`Scrobble ${id} not found`);
  }

  const userAlbumMatch = and(
    eq(userAlbums.userId, scrobble.user.id),
    eq(userAlbums.albumId, scrobble.album.id),
  );
  const userArtistMatch = and(
    eq(userArtists.userId, scrobble.user.id),
    eq(userArtists.artistId, scrobble.artist.id),
  );
  const userTrackMatch = and(
    eq(userTracks.userId, scrobble.user.id),
    eq(userTracks.trackId, scrobble.track.id),
  );

  const findUserAlbum = () =>
    ctx.db
      .select()
      .from(userAlbums)
      .where(userAlbumMatch)
      .limit(1)
      .then((rows) => rows[0]);
  const findUserArtist = () =>
    ctx.db
      .select()
      .from(userArtists)
      .where(userArtistMatch)
      .limit(1)
      .then((rows) => rows[0]);
  const findUserTrack = () =>
    ctx.db
      .select()
      .from(userTracks)
      .where(userTrackMatch)
      .limit(1)
      .then((rows) => rows[0]);

  const [
    existingUserAlbum,
    existingUserArtist,
    existingUserTrack,
    albumTrack,
    artistTrack,
    artistAlbum,
  ] = await Promise.all([
    findUserAlbum(),
    findUserArtist(),
    findUserTrack(),
    ctx.db
      .select()
      .from(albumTracks)
      .where(eq(albumTracks.trackId, scrobble.track.id))
      .limit(1)
      .then((rows) => rows[0]),
    ctx.db
      .select()
      .from(artistTracks)
      .where(eq(artistTracks.trackId, scrobble.track.id))
      .limit(1)
      .then((rows) => rows[0]),
    ctx.db
      .select()
      .from(artistAlbums)
      .where(
        and(
          eq(artistAlbums.albumId, scrobble.album.id),
          eq(artistAlbums.artistId, scrobble.artist.id),
        ),
      )
      .limit(1)
      .then((rows) => rows[0]),
  ]);

  let userArtist = existingUserArtist;
  if (!userArtist) {
    await ctx.db.insert(userArtists).values({
      userId: scrobble.user.id,
      artistId: scrobble.artist.id,
      uri: scrobble.artist.uri,
      scrobbles: 1,
    });
    userArtist = await findUserArtist();
  }

  let userAlbum = existingUserAlbum;
  if (!userAlbum) {
    await ctx.db.insert(userAlbums).values({
      userId: scrobble.user.id,
      albumId: scrobble.album.id,
      uri: scrobble.album.uri,
      scrobbles: 1,
    });
    userAlbum = await findUserAlbum();
  }

  let userTrack = existingUserTrack;
  if (!userTrack) {
    await ctx.db.insert(userTracks).values({
      userId: scrobble.user.id,
      trackId: scrobble.track.id,
      uri: scrobble.track.uri,
      scrobbles: 1,
    });
    userTrack = await findUserTrack();
  }

  if (!userAlbum || !userArtist || !userTrack) {
    throw new Error(`Unable to load user library rows for scrobble ${id}`);
  }
  if (!albumTrack || !artistTrack || !artistAlbum) {
    throw new Error(
      `Missing album/artist/track link rows for scrobble ${id}`,
    );
  }

  const message = JSON.stringify(
    deepSnakeCaseKeys({
      scrobble: {
        ...scrobble.scrobble,
        album_id: { ...scrobble.album, ...xataFields(scrobble.album) },
        artist_id: { ...scrobble.artist, ...xataFields(scrobble.artist) },
        track_id: { ...scrobble.track, ...xataFields(scrobble.track) },
        user_id: { ...scrobble.user, ...xataFields(scrobble.user) },
        ...xataFields(scrobble.scrobble),
        // Fall back to the row's creation time when no play time was stored.
        timestamp: scrobble.scrobble.timestamp
          ? scrobble.scrobble.timestamp.toISOString()
          : scrobble.scrobble.createdAt.toISOString(),
      },
      user_album: {
        ...userAlbum,
        album_id: { xata_id: scrobble.album.id },
        user_id: { xata_id: scrobble.user.id },
        ...xataFields(userAlbum),
      },
      user_artist: {
        ...userArtist,
        artist_id: { xata_id: scrobble.artist.id },
        user_id: { xata_id: scrobble.user.id },
        ...xataFields(userArtist),
      },
      user_track: {
        ...userTrack,
        track_id: { xata_id: scrobble.track.id },
        user_id: { xata_id: scrobble.user.id },
        ...xataFields(userTrack),
      },
      album_track: {
        ...albumTrack,
        album_id: { xata_id: scrobble.album.id },
        track_id: { xata_id: scrobble.track.id },
        ...xataFields(albumTrack),
      },
      artist_track: {
        ...artistTrack,
        artist_id: { xata_id: scrobble.artist.id },
        track_id: { xata_id: scrobble.track.id },
        ...xataFields(artistTrack),
      },
      artist_album: {
        ...artistAlbum,
        artist_id: { xata_id: scrobble.artist.id },
        album_id: { xata_id: scrobble.album.id },
        ...xataFields(artistAlbum),
      },
    }),
    null,
    2,
  );

  // deepSnakeCaseKeys turns `sha256` into `sha_256`; restore the original key.
  ctx.nc.publish(
    "rocksky.scrobble",
    Buffer.from(message.replaceAll("sha_256", "sha256")),
  );

  const trackMessage = JSON.stringify(
    deepSnakeCaseKeys({
      track: { ...scrobble.track, ...xataFields(scrobble.track) },
      album_track: {
        ...albumTrack,
        album_id: { xata_id: albumTrack.albumId },
        track_id: { xata_id: albumTrack.trackId },
        ...xataFields(albumTrack),
      },
      artist_track: {
        ...artistTrack,
        artist_id: { xata_id: artistTrack.artistId },
        track_id: { xata_id: artistTrack.trackId },
        ...xataFields(artistTrack),
      },
      artist_album: {
        ...artistAlbum,
        artist_id: { xata_id: artistAlbum.artistId },
        album_id: { xata_id: artistAlbum.albumId },
        ...xataFields(artistAlbum),
      },
    }),
  );

  ctx.nc.publish(
    "rocksky.track",
    Buffer.from(trackMessage.replaceAll("sha_256", "sha256")),
  );
}

/**
 * Records a play of `track` for the user identified by `userDid`.
 *
 * Steps, in order:
 * 1. Returns early when the same user already has a scrobble of the same
 *    title/artist within 60 seconds of the play time.
 * 2. Backfills missing album/artist uris on an already known track row.
 * 3. Enriches `track` (mutating `mbId` and `artists`) from the MusicBrainz
 *    hydrate endpoint; failures are logged and ignored.
 * 4. Creates song / artist / album records in the user's repository when the
 *    database has no uri for them or the user does not own one yet.
 * 5. Polls the database (up to 30 attempts, one second apart) for the track
 *    row and its artist/album uris.
 * 6. Creates the scrobble record, polls for the corresponding database row
 *    and publishes it through `publishScrobble` once it is complete.
 * 7. Publishes `userDid` on `rocksky.user.scrobble.sync`.
 *
 * @throws Error when one of the records fails lexicon validation, or when a
 *   database / publish call rejects.
 */
export async function scrobbleTrack(
  ctx: Context,
  track: Track,
  agent: Agent,
  userDid: string,
): Promise<void> {
  // check if scrobble already exists (user did + title/artist + timestamp window)
  const scrobbleTime = dayjs.unix(track.timestamp || dayjs().unix());
  const existingScrobble = await ctx.db
    .select({
      scrobble: scrobbles,
      user: users,
      track: tracks,
    })
    .from(scrobbles)
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .where(
      and(
        eq(users.did, userDid),
        eq(tracks.title, track.title),
        eq(tracks.artist, track.artist),
        gte(scrobbles.timestamp, scrobbleTime.subtract(60, "seconds").toDate()),
        lte(scrobbles.timestamp, scrobbleTime.add(60, "seconds").toDate()),
      ),
    )
    .limit(1)
    .then((rows) => rows[0]);

  if (existingScrobble) {
    consola.info(
      `Scrobble already exists for ${chalk.cyan(track.title)} at ${chalk.cyan(
        scrobbleTime.format("YYYY-MM-DD HH:mm:ss"),
      )}`,
    );
    return;
  }

  // title/artist/album are not modified below, so the hash can be computed once.
  const trackHash = trackSha256(track);

  let existingTrack = await findTrackBySha256(ctx, trackHash);

  if (existingTrack && !existingTrack.albumUri) {
    await backfillTrackAlbumUri(ctx, existingTrack.id, track);
  }

  if (existingTrack && !existingTrack.artistUri) {
    await backfillTrackArtistUri(ctx, existingTrack.id, track);
  }

  const userTrack = await ctx.db
    .select({
      userTrack: userTracks,
      track: tracks,
      user: users,
    })
    .from(userTracks)
    .innerJoin(tracks, eq(userTracks.trackId, tracks.id))
    .innerJoin(users, eq(userTracks.userId, users.id))
    .where(and(eq(tracks.id, existingTrack?.id || ""), eq(users.did, userDid)))
    .limit(1)
    .then((rows) => rows[0]);

  let mbTrack: MusicbrainzTrack | undefined;
  try {
    // Artists may be separated by ";" or ","; both requests use the same list.
    const artistNames = track.artist
      .replaceAll(";", ",")
      .split(",")
      .map((a) => ({ name: a.trim() }));

    const { data } = await ctx.musicbrainz.post<MusicbrainzTrack>("/hydrate", {
      artist: artistNames,
      name: track.title,
      album: track.album,
    });
    mbTrack = data;

    if (!mbTrack?.trackMBID) {
      // Retry without the album when the first lookup found no track MBID.
      const response = await ctx.musicbrainz.post<MusicbrainzTrack>(
        "/hydrate",
        {
          artist: artistNames,
          name: track.title,
        },
      );
      mbTrack = response.data;
    }

    track.mbId = mbTrack?.trackMBID;
    track.artists = mbTrack?.artist?.map((artist) => ({
      mbid: artist.mbid,
      name: artist.name,
    }));
  } catch (error) {
    // Best effort: the scrobble proceeds without MusicBrainz data.
    consola.error("Error fetching MusicBrainz data", error);
  }

  if (!existingTrack?.uri || !userTrack?.userTrack.uri?.includes(userDid)) {
    await putSongRecord(track, agent);
  }

  const existingAlbum = await findAlbumBySha256(ctx, albumSha256(track));

  // Wait for the track row to show up in the database.
  let tries = 0;
  while (!existingTrack && tries < MAX_TRIES) {
    consola.info(`Song not found, trying again: ${chalk.magenta(tries + 1)}`);
    existingTrack = await findTrackBySha256(ctx, trackHash);
    tries += 1;
    if (existingTrack) {
      break;
    }
    await sleep(RETRY_DELAY_MS);
  }

  if (tries === MAX_TRIES && !existingTrack) {
    consola.info(`Song not found after ${chalk.magenta(`${MAX_TRIES} tries`)}`);
  }

  if (existingTrack) {
    consola.info(
      `Song found: ${chalk.cyan(existingTrack.id)} - ${track.title}, after ${chalk.magenta(tries)} tries`,
    );
  }

  // The artist may be stored under either the album artist or the track artist.
  const existingArtist = await ctx.db
    .select()
    .from(artists)
    .where(
      or(
        eq(artists.sha256, artistSha256(track.albumArtist)),
        eq(artists.sha256, artistSha256(track.artist)),
      ),
    )
    .limit(1)
    .then((rows) => rows[0]);

  const userArtist = await ctx.db
    .select({
      userArtist: userArtists,
      artist: artists,
      user: users,
    })
    .from(userArtists)
    .innerJoin(artists, eq(userArtists.artistId, artists.id))
    .innerJoin(users, eq(userArtists.userId, users.id))
    .where(
      and(eq(artists.id, existingArtist?.id || ""), eq(users.did, userDid)),
    )
    .limit(1)
    .then((rows) => rows[0]);

  if (!existingArtist?.uri || !userArtist?.userArtist.uri?.includes(userDid)) {
    await putArtistRecord(track, agent);
  }

  const userAlbum = await ctx.db
    .select({
      userAlbum: userAlbums,
      album: albums,
      user: users,
    })
    .from(userAlbums)
    .innerJoin(albums, eq(userAlbums.albumId, albums.id))
    .innerJoin(users, eq(userAlbums.userId, users.id))
    .where(and(eq(albums.id, existingAlbum?.id || ""), eq(users.did, userDid)))
    .limit(1)
    .then((rows) => rows[0]);

  if (!existingAlbum?.uri || !userAlbum?.userAlbum.uri?.includes(userDid)) {
    await putAlbumRecord(track, agent);
  }

  tries = 0;
  existingTrack = await findTrackBySha256(ctx, trackHash);

  // NOTE(review): this loop stops as soon as EITHER uri is set; if both are
  // required before publishing, the condition should probably use `||`.
  while (
    !existingTrack?.artistUri &&
    !existingTrack?.albumUri &&
    tries < MAX_TRIES
  ) {
    consola.info(
      `Artist uri not ready, trying again: ${chalk.magenta(tries + 1)}`,
    );
    existingTrack = await findTrackBySha256(ctx, trackHash);

    // update artist uri if it is not set
    if (existingTrack && !existingTrack.artistUri) {
      await backfillTrackArtistUri(ctx, existingTrack.id, track);
    }

    // update album uri if it is not set
    if (existingTrack && !existingTrack.albumUri) {
      const album = await backfillTrackAlbumUri(ctx, existingTrack.id, track);
      if (album && !album.artistUri && existingTrack.artistUri) {
        await ctx.db
          .update(albums)
          .set({ artistUri: existingTrack.artistUri })
          .where(eq(albums.id, album.id));
      }
    }

    await sleep(RETRY_DELAY_MS);
    tries += 1;
  }

  if (tries === MAX_TRIES && !existingTrack?.artistUri) {
    consola.info(
      `Artist uri not ready after ${chalk.magenta(`${MAX_TRIES} tries`)}`,
    );
  }

  if (existingTrack?.artistUri) {
    consola.info(
      `Artist uri ready: ${chalk.cyan(existingTrack.id)} - ${track.title}, after ${chalk.magenta(tries)} tries`,
    );
  }

  if (mbTrack?.trackMBID) {
    mbTrack.timestamp = track.timestamp
      ? dayjs.unix(track.timestamp).toISOString()
      : new Date().toISOString();
    // Deliberately not awaited; a rejection is logged instead of going unhandled.
    void Promise.resolve(
      tealfm.publishPlayingNow(
        agent,
        mbTrack,
        Math.floor(track.duration / 1000),
      ),
    ).catch((e: unknown) => {
      consola.error("Error publishing playing-now status", e);
    });
  }

  const scrobbleUri = await putScrobbleRecord(track, agent);

  if (!scrobbleUri) {
    // Without a uri there is no row to wait for; polling could never succeed.
    consola.error("Scrobble record was not created, skipping publish");
  } else {
    const byUri = eq(scrobbles.uri, scrobbleUri);

    // Loop while the scrobble row is missing: up to 30 tries, 1 second apart.
    // NOTE(review): the loop also ends when a row is found but its uris are
    // still incomplete, in which case nothing is published.
    tries = 0;
    let scrobble: Awaited<ReturnType<typeof findScrobbleWithRelations>> | null =
      null;
    while (!scrobble && tries < MAX_TRIES) {
      scrobble = await findScrobbleWithRelations(ctx, byUri);

      if (
        scrobble &&
        scrobble.album &&
        !scrobble.album.artistUri &&
        scrobble.artist.uri
      ) {
        await ctx.db
          .update(albums)
          .set({ artistUri: scrobble.artist.uri })
          .where(eq(albums.id, scrobble.album.id));
        // Re-read so the completeness check below sees the updated album.
        scrobble = await findScrobbleWithRelations(ctx, byUri);
      }

      if (
        scrobble &&
        scrobble.track &&
        scrobble.album &&
        scrobble.artist &&
        scrobble.album.artistUri &&
        scrobble.track.artistUri &&
        scrobble.track.albumUri
      ) {
        consola.info(
          "Scrobble found after ",
          chalk.magenta(tries + 1),
          " tries",
        );
        await publishScrobble(ctx, scrobble.scrobble.id);
        consola.info("Scrobble published");
        break;
      }
      tries += 1;
      consola.info("Scrobble not found, trying again: ", chalk.magenta(tries));
      await sleep(RETRY_DELAY_MS);
    }

    if (tries === MAX_TRIES && !scrobble) {
      consola.info(
        `Scrobble not found after ${chalk.magenta(`${MAX_TRIES} tries`)}`,
      );
    }
  }

  ctx.nc.publish("rocksky.user.scrobble.sync", Buffer.from(userDid));
}