A decentralized music tracking and discovery platform built on AT Protocol 🎵
at main 1083 lines 29 kB view raw
import { JetStreamClient, JetStreamEvent } from "jetstream";
import { logger } from "logger";
import { ctx } from "context";
import { Agent } from "@atproto/api";
import { env } from "lib/env";
import { createAgent } from "lib/agent";
import chalk from "chalk";
import * as Artist from "lexicon/types/app/rocksky/artist";
import * as Album from "lexicon/types/app/rocksky/album";
import * as Song from "lexicon/types/app/rocksky/song";
import * as Scrobble from "lexicon/types/app/rocksky/scrobble";
import { SelectUser } from "schema/users";
import schema from "schema";
import { createId } from "@paralleldrive/cuid2";
import _ from "lodash";
import { and, eq, or } from "drizzle-orm";
import { indexBy } from "ramda";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { getDidAndHandle } from "lib/getDidAndHandle";
import { cleanUpJetstreamLockOnExit } from "lib/cleanUpJetstreamLock";
import { cleanUpSyncLockOnExit } from "lib/cleanUpSyncLock";
import { CarReader } from "@ipld/car";
import * as cbor from "@ipld/dag-cbor";

type Artists = { value: Artist.Record; uri: string; cid: string }[];
type Albums = { value: Album.Record; uri: string; cid: string }[];
type Songs = { value: Song.Record; uri: string; cid: string }[];
type Scrobbles = { value: Scrobble.Record; uri: string; cid: string }[];

/** Maximum number of rows written per insert statement / transaction. */
const BATCH_SIZE = 1000;

/**
 * Imports the current user's Rocksky records into the local database.
 *
 * Steps: resolve DID/handle, upsert the user row, subscribe to JetStream,
 * download the repository CAR file, extract artist/album/song/scrobble
 * records from it and insert them in dependency order.
 *
 * A lock file in the OS temp directory guards the import phase. If it
 * already exists the process exits with code 1.
 */
export async function sync() {
  const [did, handle] = await getDidAndHandle();
  const agent: Agent = await createAgent(did, handle);

  const user = await createUser(agent, did, handle);
  await subscribeToJetstream(user);

  logger.info` DID: ${did}`;
  logger.info` Handle: ${handle}`;

  const carReader = await downloadCarFile(agent);

  const [artists, albums, songs, scrobbles] = await Promise.all([
    getRockskyUserArtists(agent, carReader),
    getRockskyUserAlbums(agent, carReader),
    getRockskyUserSongs(agent, carReader),
    getRockskyUserScrobbles(agent, carReader),
  ]);

  logger.info` Artists: ${artists.length}`;
  logger.info` Albums: ${albums.length}`;
  logger.info` Songs: ${songs.length}`;
  logger.info` Scrobbles: ${scrobbles.length}`;

  const lockFilePath = path.join(os.tmpdir(), `rocksky-${did}.lock`);

  // "wx" creates the file atomically and fails with EEXIST when it is already
  // there, so two concurrent syncs cannot both pass an "exists?" check.
  try {
    await fs.promises.writeFile(lockFilePath, "", { flag: "wx" });
  } catch (error) {
    if ((error as NodeJS.ErrnoException).code !== "EEXIST") throw error;
    logger.error`Lock file already exists, if you want to force sync, delete the lock file ${lockFilePath}`;
    process.exit(1);
  }

  cleanUpSyncLockOnExit(user.did);

  try {
    // Order matters: albums look up artists, songs look up albums and
    // artists, scrobbles look up all three.
    await createArtists(artists, user);
    await createAlbums(albums, user);
    await createSongs(songs, user);
    await createScrobbles(scrobbles, user);
  } finally {
    // Release the lock even when an import step throws. `force` tolerates the
    // file having been removed already.
    await fs.promises.rm(lockFilePath, { force: true });
  }
}

/**
 * Returns the JetStream endpoint from `env.JETSTREAM_SERVER`, guaranteed to
 * end in `/subscribe`. Trailing slashes on the configured value are dropped so
 * the result never contains `//subscribe`.
 */
const getEndpoint = () => {
  const endpoint = env.JETSTREAM_SERVER;

  if (endpoint?.endsWith("/subscribe")) {
    return endpoint;
  }

  return `${endpoint?.replace(/\/+$/, "")}/subscribe`;
};

/**
 * Upserts the user row for `did`, refreshing handle, display name and avatar
 * from the user's `app.bsky.actor.profile` record.
 *
 * @param agent - Authenticated agent used to read the profile record.
 * @param did - DID of the user.
 * @param handle - Handle of the user.
 * @returns The inserted or updated user row.
 */
export const createUser = async (
  agent: Agent,
  did: string,
  handle: string,
): Promise<SelectUser> => {
  const { data: profileRecord } = await agent.com.atproto.repo.getRecord({
    repo: agent.assertDid,
    collection: "app.bsky.actor.profile",
    rkey: "self",
  });

  const displayName = _.get(profileRecord, "value.displayName") as
    | string
    | undefined;
  // NOTE(review): when the profile has no avatar the ref is "" and this yields
  // a URL ending in "/@jpeg". Left as-is because the nullability of the avatar
  // column is not visible from here.
  const avatar = `https://cdn.bsky.app/img/avatar/plain/${did}/${_.get(profileRecord, "value.avatar.ref", "").toString()}@jpeg`;

  const [user] = await ctx.db
    .insert(schema.users)
    .values({
      id: createId(),
      did,
      handle,
      displayName,
      avatar,
    })
    .onConflictDoUpdate({
      target: schema.users.did,
      set: {
        handle,
        displayName,
        avatar,
        updatedAt: new Date(),
      },
    })
    .returning()
    .execute();

  return user;
};

/**
 * Inserts artists, their genres and the user-to-artist links.
 *
 * Artists whose `cid` already exists are skipped; genre and user links are
 * only written for the rows that were actually inserted.
 */
const createArtists = async (artists: Artists, user: SelectUser) => {
  if (artists.length === 0) return;

  // De-duplicate tag names up front so each genre is sent to the database once.
  const uniqueTags = Array.from(
    new Set(artists.flatMap((artist) => artist.value.tags || [])),
  )
    .filter((tag) => tag)
    .map((tag) => ({
      id: createId(),
      name: tag,
    }));

  // Batch genre inserts to avoid stack overflow
  for (let i = 0; i < uniqueTags.length; i += BATCH_SIZE) {
    const batch = uniqueTags.slice(i, i + BATCH_SIZE);
    await ctx.db
      .insert(schema.genres)
      .values(batch)
      .onConflictDoNothing({
        target: schema.genres.name,
      })
      .execute();
  }

  const genres = await ctx.db.select().from(schema.genres).execute();

  const genreMap = indexBy((genre) => genre.name, genres);

  // Process artists in batches
  let totalArtistsImported = 0;

  for (let i = 0; i < artists.length; i += BATCH_SIZE) {
    const batch = artists.slice(i, i + BATCH_SIZE);

    ctx.db.transaction((tx) => {
      const newArtists = tx
        .insert(schema.artists)
        .values(
          batch.map((artist) => ({
            id: createId(),
            name: artist.value.name,
            cid: artist.cid,
            uri: artist.uri,
            biography: artist.value.bio,
            born: artist.value.born ? new Date(artist.value.born) : null,
            bornIn: artist.value.bornIn,
            died: artist.value.died ? new Date(artist.value.died) : null,
            picture: artist.value.pictureUrl,
            genres: artist.value.tags?.join(", "),
          })),
        )
        .onConflictDoNothing({
          target: schema.artists.cid,
        })
        .returning()
        .all();

      if (newArtists.length === 0) return;

      // The genres column stores the tags joined with ", "; split it back to
      // resolve each tag to its genre row.
      const artistGenres = newArtists.flatMap(
        (artist) =>
          artist.genres
            ?.split(", ")
            .filter((tag) => !!tag && !!genreMap[tag])
            .map((tag) => ({
              id: createId(),
              artistId: artist.id,
              genreId: genreMap[tag].id,
            })) || [],
      );

      if (artistGenres.length > 0) {
        tx.insert(schema.artistGenres)
          .values(artistGenres)
          .onConflictDoNothing({
            target: [schema.artistGenres.artistId, schema.artistGenres.genreId],
          })
          .run();
      }

      tx.insert(schema.userArtists)
        .values(
          newArtists.map((artist) => ({
            id: createId(),
            userId: user.id,
            artistId: artist.id,
            uri: artist.uri,
          })),
        )
        .run();

      totalArtistsImported += newArtists.length;
    });
  }

  logger.info`👤 ${totalArtistsImported} Artists imported`;
};

/**
 * Inserts albums and the user-to-album links.
 *
 * An album is imported only when an artist row with the same name as the
 * album's `artist` exists. Albums whose `cid` already exists are skipped.
 */
const createAlbums = async (albums: Albums, user: SelectUser) => {
  if (albums.length === 0) return;

  const artists = await Promise.all(
    albums.map(async (album) =>
      ctx.db
        .select()
        .from(schema.artists)
        .where(eq(schema.artists.name, album.value.artist))
        .execute()
        .then(([artist]) => artist),
    ),
  );

  const validAlbumData = albums
    .map((album, index) => ({ album, artist: artists[index] }))
    .filter(({ artist }) => artist);

  // Process albums in batches
  let totalAlbumsImported = 0;

  for (let i = 0; i < validAlbumData.length; i += BATCH_SIZE) {
    const batch = validAlbumData.slice(i, i + BATCH_SIZE);

    ctx.db.transaction((tx) => {
      const newAlbums = tx
        .insert(schema.albums)
        .values(
          batch.map(({ album, artist }) => ({
            id: createId(),
            cid: album.cid,
            uri: album.uri,
            title: album.value.title,
            artist: album.value.artist,
            releaseDate: album.value.releaseDate,
            year: album.value.year,
            albumArt: album.value.albumArtUrl,
            artistUri: artist.uri,
            appleMusicLink: album.value.appleMusicLink,
            spotifyLink: album.value.spotifyLink,
            tidalLink: album.value.tidalLink,
            youtubeLink: album.value.youtubeLink,
          })),
        )
        .onConflictDoNothing({
          target: schema.albums.cid,
        })
        .returning()
        .all();

      if (newAlbums.length === 0) return;

      tx.insert(schema.userAlbums)
        .values(
          newAlbums.map((album) => ({
            id: createId(),
            userId: user.id,
            albumId: album.id,
            uri: album.uri,
          })),
        )
        .run();

      totalAlbumsImported += newAlbums.length;
    });
  }

  logger.info`💿 ${totalAlbumsImported} Albums imported`;
};

/**
 * Inserts tracks plus their album-to-track and user-to-track links.
 *
 * A song is imported only when both its album (matched on album artist and
 * title) and its album artist already exist. Conflicting tracks are skipped.
 */
const createSongs = async (songs: Songs, user: SelectUser) => {
  if (songs.length === 0) return;

  const albums = await Promise.all(
    songs.map((song) =>
      ctx.db
        .select()
        .from(schema.albums)
        .where(
          and(
            eq(schema.albums.artist, song.value.albumArtist),
            eq(schema.albums.title, song.value.album),
          ),
        )
        .execute()
        .then((result) => result[0]),
    ),
  );

  const artists = await Promise.all(
    songs.map((song) =>
      ctx.db
        .select()
        .from(schema.artists)
        .where(eq(schema.artists.name, song.value.albumArtist))
        .execute()
        .then((result) => result[0]),
    ),
  );

  const validSongData = songs
    .map((song, index) => ({
      song,
      artist: artists[index],
      album: albums[index],
    }))
    .filter(({ artist, album }) => artist && album);

  // Process in batches to avoid stack overflow with large datasets
  let totalTracksImported = 0;

  for (let i = 0; i < validSongData.length; i += BATCH_SIZE) {
    const batch = validSongData.slice(i, i + BATCH_SIZE);
    const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
    const totalBatches = Math.ceil(validSongData.length / BATCH_SIZE);

    logger.info`▶️ Processing tracks batch ${batchNumber}/${totalBatches} (${Math.min(i + BATCH_SIZE, validSongData.length)}/${validSongData.length})`;

    // `returning()` yields only the rows that were inserted, so its indices do
    // not line up with `batch` once a conflict skips a row. Resolve each
    // inserted track's album through its cid instead of by position.
    const albumByTrackCid = new Map<
      unknown,
      (typeof batch)[number]["album"]
    >();
    for (const { song, album } of batch) {
      albumByTrackCid.set(song.cid, album);
    }

    ctx.db.transaction((tx) => {
      const tracks = tx
        .insert(schema.tracks)
        .values(
          batch.map(({ song, artist, album }) => ({
            id: createId(),
            cid: song.cid,
            uri: song.uri,
            title: song.value.title,
            artist: song.value.artist,
            albumArtist: song.value.albumArtist,
            albumArt: song.value.albumArtUrl,
            album: song.value.album,
            trackNumber: song.value.trackNumber,
            duration: song.value.duration,
            mbId: song.value.mbid,
            youtubeLink: song.value.youtubeLink,
            spotifyLink: song.value.spotifyLink,
            appleMusicLink: song.value.appleMusicLink,
            tidalLink: song.value.tidalLink,
            discNumber: song.value.discNumber,
            lyrics: song.value.lyrics,
            composer: song.value.composer,
            genre: song.value.genre,
            label: song.value.label,
            copyrightMessage: song.value.copyrightMessage,
            albumUri: album.uri,
            artistUri: artist.uri,
          })),
        )
        .onConflictDoNothing()
        .returning()
        .all();

      if (tracks.length === 0) return;

      const albumTrackRows = tracks.flatMap((track) => {
        const album = albumByTrackCid.get(track.cid);
        return album
          ? [{ id: createId(), albumId: album.id, trackId: track.id }]
          : [];
      });

      if (albumTrackRows.length > 0) {
        tx.insert(schema.albumTracks)
          .values(albumTrackRows)
          .onConflictDoNothing({
            target: [schema.albumTracks.albumId, schema.albumTracks.trackId],
          })
          .run();
      }

      tx.insert(schema.userTracks)
        .values(
          tracks.map((track) => ({
            id: createId(),
            userId: user.id,
            trackId: track.id,
            uri: track.uri,
          })),
        )
        .onConflictDoNothing({
          target: [schema.userTracks.userId, schema.userTracks.trackId],
        })
        .run();

      totalTracksImported += tracks.length;
    });
  }

  logger.info`▶️ ${totalTracksImported} Tracks imported`;
};

/**
 * Inserts scrobbles for `user`.
 *
 * A scrobble is imported only when a matching track, album and artist row
 * already exist. Scrobbles whose `cid` already exists are skipped.
 */
const createScrobbles = async (scrobbles: Scrobbles, user: SelectUser) => {
  if (!scrobbles.length) return;

  logger.info`Loading Scrobble Tracks ...`;

  const tracks = await Promise.all(
    scrobbles.map((scrobble) =>
      ctx.db
        .select()
        .from(schema.tracks)
        .where(
          and(
            eq(schema.tracks.title, scrobble.value.title),
            eq(schema.tracks.artist, scrobble.value.artist),
            eq(schema.tracks.album, scrobble.value.album),
            eq(schema.tracks.albumArtist, scrobble.value.albumArtist),
          ),
        )
        .execute()
        .then(([track]) => track),
    ),
  );

  logger.info`Loading Scrobble Albums ...`;

  const albums = await Promise.all(
    scrobbles.map((scrobble) =>
      ctx.db
        .select()
        .from(schema.albums)
        .where(
          and(
            eq(schema.albums.title, scrobble.value.album),
            eq(schema.albums.artist, scrobble.value.albumArtist),
          ),
        )
        .execute()
        .then(([album]) => album),
    ),
  );

  logger.info`Loading Scrobble Artists ...`;

  // Either the track artist or the album artist is accepted as a match.
  const artists = await Promise.all(
    scrobbles.map((scrobble) =>
      ctx.db
        .select()
        .from(schema.artists)
        .where(
          or(
            eq(schema.artists.name, scrobble.value.artist),
            eq(schema.artists.name, scrobble.value.albumArtist),
          ),
        )
        .execute()
        .then(([artist]) => artist),
    ),
  );

  const validScrobbleData = scrobbles
    .map((scrobble, index) => ({
      scrobble,
      track: tracks[index],
      album: albums[index],
      artist: artists[index],
    }))
    .filter(({ track, album, artist }) => track && album && artist);

  // Process in batches to avoid stack overflow with large datasets
  let totalScrobblesImported = 0;

  for (let i = 0; i < validScrobbleData.length; i += BATCH_SIZE) {
    const batch = validScrobbleData.slice(i, i + BATCH_SIZE);
    const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
    const totalBatches = Math.ceil(validScrobbleData.length / BATCH_SIZE);

    logger.info`🕒 Processing scrobbles batch ${batchNumber}/${totalBatches} (${Math.min(i + BATCH_SIZE, validScrobbleData.length)}/${validScrobbleData.length})`;

    const result = await ctx.db
      .insert(schema.scrobbles)
      .values(
        batch.map(({ scrobble, track, album, artist }) => ({
          id: createId(),
          userId: user.id,
          trackId: track.id,
          albumId: album.id,
          artistId: artist.id,
          uri: scrobble.uri,
          cid: scrobble.cid,
          timestamp: new Date(scrobble.value.createdAt),
        })),
      )
      .onConflictDoNothing({
        target: schema.scrobbles.cid,
      })
      .returning()
      .execute();

    totalScrobblesImported += result.length;
  }

  logger.info`🕒 ${totalScrobblesImported} scrobbles imported`;
};

/**
 * Subscribes to JetStream for the user's Rocksky collections and imports every
 * newly created record.
 *
 * A lock file in the OS temp directory prevents a second subscription for the
 * same DID; when it exists the function resolves immediately without
 * connecting.
 *
 * @returns A promise that resolves once the connection opens and rejects if an
 * error event arrives first.
 */
export const subscribeToJetstream = (user: SelectUser): Promise<void> => {
  const lockFile = path.join(os.tmpdir(), `rocksky-jetstream-${user.did}.lock`);
  if (fs.existsSync(lockFile)) {
    logger.warn`JetStream subscription already in progress for user ${user.did}`;
    logger.warn`Skipping subscription`;
    logger.warn`Lock file exists at ${lockFile}`;
    return Promise.resolve();
  }

  fs.writeFileSync(lockFile, "");

  const client = new JetStreamClient({
    wantedCollections: [
      "app.rocksky.scrobble",
      "app.rocksky.artist",
      "app.rocksky.album",
      "app.rocksky.song",
    ],
    endpoint: getEndpoint(),
    wantedDids: [user.did],

    // Reconnection settings
    maxReconnectAttempts: 10,
    reconnectDelay: 1000,
    maxReconnectDelay: 30000,
    backoffMultiplier: 1.5,

    // Enable debug logging
    debug: true,
  });

  // Call the lock clean-up helper at most once, however many "open" or
  // "error" events fire.
  // NOTE(review): presumably the helper installs process-exit hooks, so
  // repeated calls would stack listeners. Confirm against its implementation.
  let lockCleanupRegistered = false;
  const registerLockCleanup = () => {
    if (lockCleanupRegistered) return;
    lockCleanupRegistered = true;
    cleanUpJetstreamLockOnExit(user.did);
  };

  return new Promise((resolve, reject) => {
    client.on("open", () => {
      logger.info`✅ Connected to JetStream!`;
      registerLockCleanup();
      resolve();
    });

    client.on("message", async (data) => {
      const event = data as JetStreamEvent;

      if (event.kind !== "commit" || !event.commit) return;

      const { operation, collection, record, rkey, cid } = event.commit;
      const uri = `at://${event.did}/${collection}/${rkey}`;

      logger.info`\n📡 New event:`;
      logger.info` Operation: ${operation}`;
      logger.info` Collection: ${collection}`;
      logger.info` DID: ${event.did}`;
      logger.info` Uri: ${uri}`;

      if (operation === "create" && record) {
        console.log(JSON.stringify(record, null, 2));
        try {
          await onNewCollection(record, cid, uri, user);
        } catch (error) {
          // Nothing awaits this handler, so an uncaught failure here would
          // surface as an unhandled promise rejection. Log it and carry on.
          logger.error`❌ Failed to process ${collection} record ${uri}: ${error}`;
        }
      }

      logger.info` Cursor: ${event.time_us}`;
    });

    client.on("error", (error) => {
      logger.error`❌ Error: ${error}`;
      registerLockCleanup();
      reject(error);
    });

    client.on("reconnect", (data) => {
      const { attempt } = data as { attempt: number };
      logger.info`🔄 Reconnecting... (attempt ${attempt})`;
    });

    client.connect();
  });
};

/** Dispatches a newly created record to the handler for its `$type`. */
const onNewCollection = async (
  record: any,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  switch (record.$type) {
    case "app.rocksky.song":
      await onNewSong(record, cid, uri, user);
      break;
    case "app.rocksky.album":
      await onNewAlbum(record, cid, uri, user);
      break;
    case "app.rocksky.artist":
      await onNewArtist(record, cid, uri, user);
      break;
    case "app.rocksky.scrobble":
      await onNewScrobble(record, cid, uri, user);
      break;
    default:
      logger.warn`Unknown collection type: ${record.$type}`;
  }
};

/** Imports a single song record received from JetStream. */
const onNewSong = async (
  record: Song.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  const { title, artist, album } = record;
  logger.info` New song: ${title} by ${artist} from ${album}`;
  await createSongs([{ cid, uri, value: record }], user);
};

/** Imports a single album record received from JetStream. */
const onNewAlbum = async (
  record: Album.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  const { title, artist } = record;
  logger.info` New album: ${title} by ${artist}`;
  await createAlbums([{ cid, uri, value: record }], user);
};

/** Imports a single artist record received from JetStream. */
const onNewArtist = async (
  record: Artist.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  const { name } = record;
  logger.info` New artist: ${name}`;
  await createArtists([{ cid, uri, value: record }], user);
};

/**
 * Imports a single scrobble received from JetStream.
 *
 * Any artist, album or track the scrobble needs but the database lacks is
 * first created from the scrobble's own fields, using generated ids for the
 * URI record key and the cid. If a required row is still missing afterwards
 * the scrobble is skipped.
 */
const onNewScrobble = async (
  record: Scrobble.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  const { title, createdAt, artist, album, albumArtist } = record;
  logger.info` New scrobble: ${title} at ${createdAt}`;

  const findArtist = () =>
    ctx.db
      .select()
      .from(schema.artists)
      .where(eq(schema.artists.name, record.albumArtist))
      .execute();

  const findAlbum = () =>
    ctx.db
      .select()
      .from(schema.albums)
      .where(
        and(
          eq(schema.albums.title, record.album),
          eq(schema.albums.artist, record.albumArtist),
        ),
      )
      .execute();

  const findTrack = () =>
    ctx.db
      .select()
      .from(schema.tracks)
      .where(
        and(
          eq(schema.tracks.title, record.title),
          eq(schema.tracks.artist, record.artist),
          eq(schema.tracks.album, record.album),
          eq(schema.tracks.albumArtist, record.albumArtist),
        ),
      )
      .execute();

  // Check if the artist exists, create if not
  let [artistRecord] = await findArtist();

  if (!artistRecord) {
    logger.info` ⚙️ Artist not found, creating: "${albumArtist}"`;

    // Create a synthetic artist record from scrobble data
    const artistUri = `at://${user.did}/app.rocksky.artist/${createId()}`;
    const artistCid = createId();

    await createArtists(
      [
        {
          cid: artistCid,
          uri: artistUri,
          value: {
            $type: "app.rocksky.artist",
            name: record.albumArtist,
            createdAt: new Date().toISOString(),
            tags: record.tags || [],
          } as Artist.Record,
        },
      ],
      user,
    );

    [artistRecord] = await findArtist();

    if (!artistRecord) {
      logger.error` ❌ Failed to create artist. Skipping scrobble.`;
      return;
    }
  }

  // Check if the album exists, create if not
  let [albumRecord] = await findAlbum();

  if (!albumRecord) {
    logger.info` ⚙️ Album not found, creating: "${album}" by ${albumArtist}`;

    // Create a synthetic album record from scrobble data
    const albumUri = `at://${user.did}/app.rocksky.album/${createId()}`;
    const albumCid = createId();

    await createAlbums(
      [
        {
          cid: albumCid,
          uri: albumUri,
          value: {
            $type: "app.rocksky.album",
            title: record.album,
            artist: record.albumArtist,
            createdAt: new Date().toISOString(),
            releaseDate: record.releaseDate,
            year: record.year,
            // NOTE(review): createAlbums reads `albumArtUrl`, not `albumArt`,
            // so this value is not persisted. Confirm which scrobble field
            // carries the art URL.
            albumArt: record.albumArt,
            artistUri: artistRecord.uri,
            spotifyLink: record.spotifyLink,
            appleMusicLink: record.appleMusicLink,
            tidalLink: record.tidalLink,
            youtubeLink: record.youtubeLink,
          } as Album.Record,
        },
      ],
      user,
    );

    // Fetch the newly created album
    [albumRecord] = await findAlbum();

    if (!albumRecord) {
      logger.error` ❌ Failed to create album. Skipping scrobble.`;
      return;
    }
  }

  // Check if the track exists, create if not
  let [track] = await findTrack();

  if (!track) {
    logger.info` ⚙️ Track not found, creating: "${title}" by ${artist} from ${album}`;

    // Create a synthetic track record from scrobble data
    const trackUri = `at://${user.did}/app.rocksky.song/${createId()}`;
    const trackCid = createId();

    await createSongs(
      [
        {
          cid: trackCid,
          uri: trackUri,
          value: {
            $type: "app.rocksky.song",
            title: record.title,
            artist: record.artist,
            albumArtist: record.albumArtist,
            album: record.album,
            duration: record.duration,
            trackNumber: record.trackNumber,
            discNumber: record.discNumber,
            releaseDate: record.releaseDate,
            year: record.year,
            genre: record.genre,
            tags: record.tags,
            composer: record.composer,
            lyrics: record.lyrics,
            copyrightMessage: record.copyrightMessage,
            // NOTE(review): createSongs reads `albumArtUrl`, not `albumArt`,
            // so this value is not persisted. Confirm which scrobble field
            // carries the art URL.
            albumArt: record.albumArt,
            youtubeLink: record.youtubeLink,
            spotifyLink: record.spotifyLink,
            tidalLink: record.tidalLink,
            appleMusicLink: record.appleMusicLink,
            createdAt: new Date().toISOString(),
            // createSongs reads `value.mbid`, so the key must be spelled
            // `mbid` for the MusicBrainz id to be stored.
            mbid: record.mbid,
            label: record.label,
            albumUri: albumRecord.uri,
            artistUri: artistRecord.uri,
          } as Song.Record,
        },
      ],
      user,
    );

    // Fetch the newly created track
    [track] = await findTrack();

    if (!track) {
      logger.error` ❌ Failed to create track. Skipping scrobble.`;
      return;
    }
  }

  logger.info` ✓ All required entities ready. Creating scrobble...`;

  await createScrobbles([{ cid, uri, value: record }], user);
};

/** Downloads the agent's repository and returns a reader over its CAR bytes. */
const downloadCarFile = async (agent: Agent) => {
  logger.info(`Fetching repository CAR file ...`);

  const repoRes = await agent.com.atproto.sync.getRepo({
    did: agent.assertDid,
  });

  return CarReader.fromBytes(new Uint8Array(repoRes.data));
};

/**
 * Scans every block of the CAR file and collects the records whose `$type`
 * equals `collection`.
 *
 * Blocks that fail to decode are logged and skipped.
 *
 * NOTE(review): the returned `uri` uses the block CID in the record-key
 * position, so it is probably not the record's real AT URI. Confirm whether
 * the true rkey is needed downstream.
 *
 * @param agent - Agent whose DID is used to build the URIs.
 * @param carReader - Reader over the downloaded repository.
 * @param collection - Lexicon `$type` to match.
 * @param label - Plural noun used in the log messages, e.g. "songs".
 */
async function extractCarRecords<T>(
  agent: Agent,
  carReader: CarReader,
  collection: string,
  label: string,
): Promise<{ value: T; uri: string; cid: string }[]> {
  const results: { value: T; uri: string; cid: string }[] = [];

  try {
    logger.info`Extracting ${collection} records from CAR file ...`;

    for await (const { cid, bytes } of carReader.blocks()) {
      try {
        const decoded = cbor.decode(bytes);

        // Check if this is a record with $type matching our collection
        if (
          decoded &&
          typeof decoded === "object" &&
          "$type" in decoded &&
          decoded.$type === collection
        ) {
          results.push({
            value: decoded as unknown as T,
            uri: `at://${agent.assertDid}/${collection}/${cid.toString()}`,
            cid: cid.toString(),
          });
        }
      } catch (e) {
        logger.warn` Skipping block with CID ${cid.toString()} due to decode error: ${e}`;
        continue;
      }
    }

    logger.info(
      `${chalk.cyanBright(agent.assertDid)} ${chalk.greenBright(results.length)} ${label}`,
    );
  } catch (error) {
    logger.error(`Error fetching ${label} from CAR: ${error}`);
    throw error;
  }

  return results;
}

/** Extracts all `app.rocksky.song` records from the CAR file. */
const getRockskyUserSongs = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Songs> =>
  extractCarRecords<Song.Record>(agent, carReader, "app.rocksky.song", "songs");

/** Extracts all `app.rocksky.album` records from the CAR file. */
const getRockskyUserAlbums = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Albums> =>
  extractCarRecords<Album.Record>(
    agent,
    carReader,
    "app.rocksky.album",
    "albums",
  );

/** Extracts all `app.rocksky.artist` records from the CAR file. */
const getRockskyUserArtists = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Artists> =>
  extractCarRecords<Artist.Record>(
    agent,
    carReader,
    "app.rocksky.artist",
    "artists",
  );

/** Extracts all `app.rocksky.scrobble` records from the CAR file. */
const getRockskyUserScrobbles = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Scrobbles> =>
  extractCarRecords<Scrobble.Record>(
    agent,
    carReader,
    "app.rocksky.scrobble",
    "scrobbles",
  );