forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import { JetStreamClient, JetStreamEvent } from "jetstream";
2import { logger } from "logger";
3import { ctx } from "context";
4import { Agent } from "@atproto/api";
5import { env } from "lib/env";
6import { createAgent } from "lib/agent";
7import chalk from "chalk";
8import * as Artist from "lexicon/types/app/rocksky/artist";
9import * as Album from "lexicon/types/app/rocksky/album";
10import * as Song from "lexicon/types/app/rocksky/song";
11import * as Scrobble from "lexicon/types/app/rocksky/scrobble";
12import { SelectUser } from "schema/users";
13import schema from "schema";
14import { createId } from "@paralleldrive/cuid2";
15import _ from "lodash";
16import { and, eq, or } from "drizzle-orm";
17import { indexBy } from "ramda";
18import fs from "node:fs";
19import os from "node:os";
20import path from "node:path";
21import { getDidAndHandle } from "lib/getDidAndHandle";
22import { cleanUpJetstreamLockOnExit } from "lib/cleanUpJetstreamLock";
23import { cleanUpSyncLockOnExit } from "lib/cleanUpSyncLock";
24import { CarReader } from "@ipld/car";
25import * as cbor from "@ipld/dag-cbor";
26
/** A record decoded from the user's repository, paired with its AT URI and CID. */
type RepoRecord<TValue> = { value: TValue; uri: string; cid: string };

/** Artist records queued for import. */
type Artists = RepoRecord<Artist.Record>[];
/** Album records queued for import. */
type Albums = RepoRecord<Album.Record>[];
/** Song records queued for import. */
type Songs = RepoRecord<Song.Record>[];
/** Scrobble records queued for import. */
type Scrobbles = RepoRecord<Scrobble.Record>[];
31
/**
 * Performs a full import of the signed-in user's Rocksky data.
 *
 * In order: resolves the DID/handle and creates an agent, upserts the local
 * user row, subscribes to JetStream, downloads the repository CAR file,
 * extracts artist/album/song/scrobble records from it and writes them to the
 * database. A lock file in the OS temp directory guards the database import;
 * when it already exists the process exits with code 1.
 */
export async function sync(): Promise<void> {
  const [did, handle] = await getDidAndHandle();
  const agent: Agent = await createAgent(did, handle);

  const user = await createUser(agent, did, handle);
  await subscribeToJetstream(user);

  logger.info` DID: ${did}`;
  logger.info` Handle: ${handle}`;

  const repoCar = await downloadCarFile(agent);

  // Each extractor walks the CAR blocks on its own; run the four concurrently.
  const [artists, albums, songs, scrobbles] = await Promise.all([
    getRockskyUserArtists(agent, repoCar),
    getRockskyUserAlbums(agent, repoCar),
    getRockskyUserSongs(agent, repoCar),
    getRockskyUserScrobbles(agent, repoCar),
  ]);

  logger.info` Artists: ${artists.length}`;
  logger.info` Albums: ${albums.length}`;
  logger.info` Songs: ${songs.length}`;
  logger.info` Scrobbles: ${scrobbles.length}`;

  const syncLockPath = path.join(os.tmpdir(), `rocksky-${did}.lock`);

  // stat() rejects when the file is missing; map that to a plain boolean.
  const lockAlreadyExists = await fs.promises.stat(syncLockPath).then(
    () => true,
    () => false,
  );

  if (lockAlreadyExists) {
    logger.error`Lock file already exists, if you want to force sync, delete the lock file ${syncLockPath}`;
    process.exit(1);
  }

  await fs.promises.writeFile(syncLockPath, "");
  cleanUpSyncLockOnExit(user.did);

  // Order matters: albums look up artists, songs look up albums and artists,
  // and scrobbles look up tracks, albums and artists.
  await createArtists(artists, user);
  await createAlbums(albums, user);
  await createSongs(songs, user);
  await createScrobbles(scrobbles, user);

  await fs.promises.unlink(syncLockPath);
}
73
/**
 * Resolves the JetStream WebSocket URL from `env.JETSTREAM_SERVER`.
 *
 * Surrounding whitespace and trailing slashes are removed, then `/subscribe`
 * is appended unless the value already ends with it. This keeps inputs such
 * as `wss://host/` or `wss://host/subscribe/` from producing `//subscribe`
 * or a duplicated path segment.
 *
 * @returns The endpoint URL, always ending in `/subscribe`.
 * @throws Error when `JETSTREAM_SERVER` is unset or empty, instead of
 *   returning the literal string `"undefined/subscribe"`.
 */
const getEndpoint = (): string => {
  const base = (env.JETSTREAM_SERVER ?? "").trim().replace(/\/+$/, "");

  if (!base) {
    throw new Error("JETSTREAM_SERVER is not configured");
  }

  return base.endsWith("/subscribe") ? base : `${base}/subscribe`;
};
83
/**
 * Upserts the local user row for `did` from the account's Bluesky profile.
 *
 * Reads the `app.bsky.actor.profile` record with rkey `self` from the agent's
 * repo, then inserts a user keyed by DID. When the DID already exists, the
 * handle, display name, avatar and `updatedAt` are refreshed.
 *
 * The avatar URL is always assembled from the DID and the profile's avatar
 * ref; when the profile has no avatar the ref segment is an empty string.
 *
 * @param agent  Authenticated agent used to read the profile record.
 * @param did    DID of the user.
 * @param handle Handle of the user.
 * @returns The inserted or updated user row.
 */
export const createUser = async (
  agent: Agent,
  did: string,
  handle: string,
): Promise<SelectUser> => {
  const profileResponse = await agent.com.atproto.repo.getRecord({
    repo: agent.assertDid,
    collection: "app.bsky.actor.profile",
    rkey: "self",
  });
  const profileRecord = profileResponse.data;

  const displayName = _.get(profileRecord, "value.displayName") as
    | string
    | undefined;
  const avatarRef = _.get(profileRecord, "value.avatar.ref", "").toString();
  const avatar = `https://cdn.bsky.app/img/avatar/plain/${did}/${avatarRef}@jpeg`;

  // The same three fields are written on insert and refreshed on conflict.
  const profileFields = { handle, displayName, avatar };

  const rows = await ctx.db
    .insert(schema.users)
    .values({ id: createId(), did, ...profileFields })
    .onConflictDoUpdate({
      target: schema.users.did,
      set: { ...profileFields, updatedAt: new Date() },
    })
    .returning()
    .execute();

  return rows[0];
};
123
/**
 * Imports artist records and links the newly inserted ones to `user`.
 *
 * 1. Every distinct, non-empty tag across all artists is inserted into
 *    `genres` (existing names are skipped).
 * 2. Artists are inserted in batches of `BATCH_SIZE`, one transaction per
 *    batch; rows whose CID already exists are skipped.
 * 3. For each artist actually inserted, `artistGenres` rows are created for
 *    its known tags and a `userArtists` row links it to `user`.
 *
 * Artists skipped by the CID conflict are not linked to the user.
 *
 * @param artists Artist records to import.
 * @param user    Owner the new artists are linked to.
 */
const createArtists = async (artists: Artists, user: SelectUser) => {
  if (artists.length === 0) return;

  const BATCH_SIZE = 1000;

  // Deduplicate before inserting: the same tag usually appears on many
  // artists, and each duplicate would otherwise become an extra insert row
  // that the name conflict then discards.
  const uniqueTags = Array.from(
    new Set(artists.flatMap((artist) => artist.value.tags || [])),
  ).filter((tag) => tag);

  const genreRows = uniqueTags.map((tag) => ({
    id: createId(),
    name: tag,
  }));

  // Batch genre inserts to keep each statement bounded.
  for (let i = 0; i < genreRows.length; i += BATCH_SIZE) {
    await ctx.db
      .insert(schema.genres)
      .values(genreRows.slice(i, i + BATCH_SIZE))
      .onConflictDoNothing({
        target: schema.genres.name,
      })
      .execute();
  }

  // Re-read all genres so pre-existing rows are resolvable by name too.
  const genres = await ctx.db.select().from(schema.genres).execute();
  const genreMap = indexBy((genre) => genre.name, genres);

  let totalArtistsImported = 0;

  for (let i = 0; i < artists.length; i += BATCH_SIZE) {
    const batch = artists.slice(i, i + BATCH_SIZE);

    ctx.db.transaction((tx) => {
      const newArtists = tx
        .insert(schema.artists)
        .values(
          batch.map((artist) => ({
            id: createId(),
            name: artist.value.name,
            cid: artist.cid,
            uri: artist.uri,
            biography: artist.value.bio,
            born: artist.value.born ? new Date(artist.value.born) : null,
            bornIn: artist.value.bornIn,
            died: artist.value.died ? new Date(artist.value.died) : null,
            picture: artist.value.pictureUrl,
            // Stored as a ", "-joined string; split again below.
            genres: artist.value.tags?.join(", "),
          })),
        )
        .onConflictDoNothing({
          target: schema.artists.cid,
        })
        .returning()
        .all();

      if (newArtists.length === 0) return;

      const artistGenres = newArtists
        .map(
          (artist) =>
            artist.genres
              ?.split(", ")
              .filter((tag) => !!tag && !!genreMap[tag])
              .map((tag) => ({
                id: createId(),
                artistId: artist.id,
                genreId: genreMap[tag].id,
              })) || [],
        )
        .flat();

      if (artistGenres.length > 0) {
        tx.insert(schema.artistGenres)
          .values(artistGenres)
          .onConflictDoNothing({
            target: [schema.artistGenres.artistId, schema.artistGenres.genreId],
          })
          .returning()
          .run();
      }

      tx.insert(schema.userArtists)
        .values(
          newArtists.map((artist) => ({
            id: createId(),
            userId: user.id,
            artistId: artist.id,
            uri: artist.uri,
          })),
        )
        .run();

      totalArtistsImported += newArtists.length;
    });
  }

  logger.info`👤 ${totalArtistsImported} Artists imported`;
};
226
/**
 * Imports album records and links the newly inserted ones to `user`.
 *
 * Each album is matched to an artist row by exact artist name; albums with no
 * matching artist are dropped. The rest are inserted in batches of
 * `BATCH_SIZE`, one transaction per batch, skipping rows whose CID already
 * exists. Every album actually inserted gets a `userAlbums` row.
 *
 * @param albums Album records to import.
 * @param user   Owner the new albums are linked to.
 */
const createAlbums = async (albums: Albums, user: SelectUser) => {
  if (albums.length === 0) return;

  const BATCH_SIZE = 1000;

  // One lookup per album, issued concurrently; results stay index-aligned
  // with `albums`.
  const matchedArtists = await Promise.all(
    albums.map(async (album) => {
      const rows = await ctx.db
        .select()
        .from(schema.artists)
        .where(eq(schema.artists.name, album.value.artist))
        .execute();
      return rows[0];
    }),
  );

  const importable = albums
    .map((album, index) => ({ album, artist: matchedArtists[index] }))
    .filter((entry) => entry.artist);

  let totalAlbumsImported = 0;

  for (const batch of _.chunk(importable, BATCH_SIZE)) {
    ctx.db.transaction((tx) => {
      const insertedAlbums = tx
        .insert(schema.albums)
        .values(
          batch.map(({ album, artist }) => {
            const { value } = album;
            return {
              id: createId(),
              cid: album.cid,
              uri: album.uri,
              title: value.title,
              artist: value.artist,
              releaseDate: value.releaseDate,
              year: value.year,
              albumArt: value.albumArtUrl,
              artistUri: artist.uri,
              appleMusicLink: value.appleMusicLink,
              spotifyLink: value.spotifyLink,
              tidalLink: value.tidalLink,
              youtubeLink: value.youtubeLink,
            };
          }),
        )
        .onConflictDoNothing({
          target: schema.albums.cid,
        })
        .returning()
        .all();

      // Nothing new in this batch: no user links to create.
      if (insertedAlbums.length > 0) {
        tx.insert(schema.userAlbums)
          .values(
            insertedAlbums.map((inserted) => ({
              id: createId(),
              userId: user.id,
              albumId: inserted.id,
              uri: inserted.uri,
            })),
          )
          .run();

        totalAlbumsImported += insertedAlbums.length;
      }
    });
  }

  logger.info`💿 ${totalAlbumsImported} Albums imported`;
};
297
/**
 * Imports song records as tracks and links the newly inserted ones to their
 * album and to `user`.
 *
 * Each song is matched to an album row (by album artist + album title) and an
 * artist row (by album artist name); songs missing either match are dropped.
 * The rest are inserted in batches of `BATCH_SIZE`, one transaction per
 * batch, skipping rows that hit any uniqueness conflict. Every track actually
 * inserted gets an `albumTracks` row and a `userTracks` row.
 *
 * @param songs Song records to import.
 * @param user  Owner the new tracks are linked to.
 */
const createSongs = async (songs: Songs, user: SelectUser) => {
  if (songs.length === 0) return;

  const albums = await Promise.all(
    songs.map((song) =>
      ctx.db
        .select()
        .from(schema.albums)
        .where(
          and(
            eq(schema.albums.artist, song.value.albumArtist),
            eq(schema.albums.title, song.value.album),
          ),
        )
        .execute()
        .then((result) => result[0]),
    ),
  );

  const artists = await Promise.all(
    songs.map((song) =>
      ctx.db
        .select()
        .from(schema.artists)
        .where(eq(schema.artists.name, song.value.albumArtist))
        .execute()
        .then((result) => result[0]),
    ),
  );

  const validSongData = songs
    .map((song, index) => ({
      song,
      artist: artists[index],
      album: albums[index],
    }))
    .filter(({ artist, album }) => artist && album);

  // Process in batches to keep each statement bounded with large datasets.
  const BATCH_SIZE = 1000;
  let totalTracksImported = 0;

  for (let i = 0; i < validSongData.length; i += BATCH_SIZE) {
    const batch = validSongData.slice(i, i + BATCH_SIZE);
    const batchNumber = Math.floor(i / BATCH_SIZE) + 1;
    const totalBatches = Math.ceil(validSongData.length / BATCH_SIZE);

    logger.info`▶️ Processing tracks batch ${batchNumber}/${totalBatches} (${Math.min(i + BATCH_SIZE, validSongData.length)}/${validSongData.length})`;

    // Rows skipped by the conflict clause are absent from the RETURNING
    // result, so the returned tracks are NOT index-aligned with `batch`.
    // Resolve each track's album through its CID instead of its position.
    const albumIdBySongCid = new Map(
      batch.map(({ song, album }) => [song.cid, album.id] as const),
    );

    ctx.db.transaction((tx) => {
      const tracks = tx
        .insert(schema.tracks)
        .values(
          batch.map(({ song, artist, album }) => ({
            id: createId(),
            cid: song.cid,
            uri: song.uri,
            title: song.value.title,
            artist: song.value.artist,
            albumArtist: song.value.albumArtist,
            albumArt: song.value.albumArtUrl,
            album: song.value.album,
            trackNumber: song.value.trackNumber,
            duration: song.value.duration,
            mbId: song.value.mbid,
            youtubeLink: song.value.youtubeLink,
            spotifyLink: song.value.spotifyLink,
            appleMusicLink: song.value.appleMusicLink,
            tidalLink: song.value.tidalLink,
            discNumber: song.value.discNumber,
            lyrics: song.value.lyrics,
            composer: song.value.composer,
            genre: song.value.genre,
            label: song.value.label,
            copyrightMessage: song.value.copyrightMessage,
            albumUri: album.uri,
            artistUri: artist.uri,
          })),
        )
        .onConflictDoNothing()
        .returning()
        .all();

      if (tracks.length === 0) return;

      const albumTrackRows = tracks.flatMap((track) => {
        const albumId = albumIdBySongCid.get(String(track.cid));
        return albumId === undefined
          ? []
          : [{ id: createId(), albumId, trackId: track.id }];
      });

      if (albumTrackRows.length > 0) {
        tx.insert(schema.albumTracks)
          .values(albumTrackRows)
          .onConflictDoNothing({
            target: [schema.albumTracks.albumId, schema.albumTracks.trackId],
          })
          .run();
      }

      tx.insert(schema.userTracks)
        .values(
          tracks.map((track) => ({
            id: createId(),
            userId: user.id,
            trackId: track.id,
            uri: track.uri,
          })),
        )
        .onConflictDoNothing({
          target: [schema.userTracks.userId, schema.userTracks.trackId],
        })
        .run();

      totalTracksImported += tracks.length;
    });
  }

  logger.info`▶️ ${totalTracksImported} Tracks imported`;
};
416
/**
 * Imports scrobble records for `user`.
 *
 * Each scrobble is resolved to a track (title + artist + album + album
 * artist), an album (title + album artist) and an artist (name equal to the
 * scrobble's artist or album artist). Scrobbles missing any of the three are
 * dropped. The rest are inserted in batches of `BATCH_SIZE`; rows whose CID
 * already exists are skipped.
 *
 * @param scrobbles Scrobble records to import.
 * @param user      Owner of the scrobbles.
 */
const createScrobbles = async (scrobbles: Scrobbles, user: SelectUser) => {
  if (!scrobbles.length) return;

  const BATCH_SIZE = 1000;

  logger.info`Loading Scrobble Tracks ...`;

  const tracks = await Promise.all(
    scrobbles.map(async ({ value }) => {
      const rows = await ctx.db
        .select()
        .from(schema.tracks)
        .where(
          and(
            eq(schema.tracks.title, value.title),
            eq(schema.tracks.artist, value.artist),
            eq(schema.tracks.album, value.album),
            eq(schema.tracks.albumArtist, value.albumArtist),
          ),
        )
        .execute();
      return rows[0];
    }),
  );

  logger.info`Loading Scrobble Albums ...`;

  const albums = await Promise.all(
    scrobbles.map(async ({ value }) => {
      const rows = await ctx.db
        .select()
        .from(schema.albums)
        .where(
          and(
            eq(schema.albums.title, value.album),
            eq(schema.albums.artist, value.albumArtist),
          ),
        )
        .execute();
      return rows[0];
    }),
  );

  logger.info`Loading Scrobble Artists ...`;

  const artists = await Promise.all(
    scrobbles.map(async ({ value }) => {
      const rows = await ctx.db
        .select()
        .from(schema.artists)
        .where(
          or(
            eq(schema.artists.name, value.artist),
            eq(schema.artists.name, value.albumArtist),
          ),
        )
        .execute();
      return rows[0];
    }),
  );

  // The three lookup arrays are index-aligned with `scrobbles`.
  const validScrobbleData = scrobbles
    .map((scrobble, index) => ({
      scrobble,
      track: tracks[index],
      album: albums[index],
      artist: artists[index],
    }))
    .filter((entry) => entry.track && entry.album && entry.artist);

  const batches = _.chunk(validScrobbleData, BATCH_SIZE);
  let totalScrobblesImported = 0;

  for (const [batchIndex, batch] of batches.entries()) {
    const processedSoFar = Math.min(
      (batchIndex + 1) * BATCH_SIZE,
      validScrobbleData.length,
    );

    logger.info`🕒 Processing scrobbles batch ${batchIndex + 1}/${batches.length} (${processedSoFar}/${validScrobbleData.length})`;

    const inserted = await ctx.db
      .insert(schema.scrobbles)
      .values(
        batch.map(({ scrobble, track, album, artist }) => ({
          id: createId(),
          userId: user.id,
          trackId: track.id,
          albumId: album.id,
          artistId: artist.id,
          uri: scrobble.uri,
          cid: scrobble.cid,
          timestamp: new Date(scrobble.value.createdAt),
        })),
      )
      .onConflictDoNothing({
        target: schema.scrobbles.cid,
      })
      .returning()
      .execute();

    totalScrobblesImported += inserted.length;
  }

  logger.info`🕒 ${totalScrobblesImported} scrobbles imported`;
};
521
/**
 * Opens a JetStream subscription for the user's Rocksky collections and
 * imports every record created from then on.
 *
 * A per-DID lock file in the OS temp directory marks an active subscription;
 * when it already exists the call logs a warning and resolves immediately
 * without subscribing.
 *
 * @param user User whose DID is subscribed to.
 * @returns A promise that resolves on the first successful connection and
 *   rejects if an error is emitted before that.
 */
export const subscribeToJetstream = (user: SelectUser): Promise<void> => {
  const lockFile = path.join(os.tmpdir(), `rocksky-jetstream-${user.did}.lock`);
  if (fs.existsSync(lockFile)) {
    logger.warn`JetStream subscription already in progress for user ${user.did}`;
    logger.warn`Skipping subscription`;
    logger.warn`Lock file exists at ${lockFile}`;
    return Promise.resolve();
  }

  fs.writeFileSync(lockFile, "");

  const client = new JetStreamClient({
    wantedCollections: [
      "app.rocksky.scrobble",
      "app.rocksky.artist",
      "app.rocksky.album",
      "app.rocksky.song",
    ],
    endpoint: getEndpoint(),
    wantedDids: [user.did],

    // Reconnection settings
    maxReconnectAttempts: 10,
    reconnectDelay: 1000,
    maxReconnectDelay: 30000,
    backoffMultiplier: 1.5,

    // Enable debug logging
    debug: true,
  });

  // "open" and "error" can each fire many times over the life of the client
  // (reconnects); make sure the lock cleanup is set up only once.
  let lockCleanupRegistered = false;
  const registerLockCleanup = () => {
    if (lockCleanupRegistered) return;
    lockCleanupRegistered = true;
    cleanUpJetstreamLockOnExit(user.did);
  };

  return new Promise((resolve, reject) => {
    client.on("open", () => {
      logger.info`✅ Connected to JetStream!`;
      registerLockCleanup();
      resolve();
    });

    client.on("message", async (data) => {
      // The emitter does not await this handler, so a failure here would
      // surface as an unhandled promise rejection. Log it and keep the
      // subscription alive instead.
      try {
        const event = data as JetStreamEvent;

        if (event.kind === "commit" && event.commit) {
          const { operation, collection, record, rkey, cid } = event.commit;
          const uri = `at://${event.did}/${collection}/${rkey}`;

          logger.info`\n📡 New event:`;
          logger.info` Operation: ${operation}`;
          logger.info` Collection: ${collection}`;
          logger.info` DID: ${event.did}`;
          logger.info` Uri: ${uri}`;

          if (operation === "create" && record) {
            console.log(JSON.stringify(record, null, 2));
            await onNewCollection(record, cid, uri, user);
          }

          logger.info` Cursor: ${event.time_us}`;
        }
      } catch (error) {
        logger.error`❌ Failed to process JetStream event: ${error}`;
      }
    });

    client.on("error", (error) => {
      logger.error`❌ Error: ${error}`;
      registerLockCleanup();
      // No-op once the promise has already settled.
      reject(error);
    });

    client.on("reconnect", (data) => {
      const { attempt } = data as { attempt: number };
      logger.info`🔄 Reconnecting... (attempt ${attempt})`;
    });

    client.connect();
  });
};
596
/**
 * Routes a newly created record to the importer for its `$type`.
 *
 * Records whose `$type` is not one of the four Rocksky collections are
 * logged as a warning and otherwise ignored.
 *
 * @param record Record payload from the JetStream commit.
 * @param cid    CID of the record.
 * @param uri    AT URI of the record.
 * @param user   User the record belongs to.
 */
const onNewCollection = async (
  record: any,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  type RecordHandler = (
    record: any,
    cid: string,
    uri: string,
    user: SelectUser,
  ) => Promise<void>;

  // A Map (rather than an object literal) so that only these exact keys match.
  const handlers = new Map<string, RecordHandler>([
    ["app.rocksky.song", onNewSong],
    ["app.rocksky.album", onNewAlbum],
    ["app.rocksky.artist", onNewArtist],
    ["app.rocksky.scrobble", onNewScrobble],
  ]);

  const handler = handlers.get(record.$type);
  if (!handler) {
    logger.warn`Unknown collection type: ${record.$type}`;
    return;
  }

  await handler(record, cid, uri, user);
};
620
/**
 * Handles a newly created song record: logs it and imports it through
 * `createSongs` as a single-element list.
 *
 * @param record Song record payload.
 * @param cid    CID of the record.
 * @param uri    AT URI of the record.
 * @param user   User the song belongs to.
 */
const onNewSong = async (
  record: Song.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  logger.info` New song: ${record.title} by ${record.artist} from ${record.album}`;

  const songs: Songs = [{ value: record, uri, cid }];
  await createSongs(songs, user);
};
640
/**
 * Handles a newly created album record: logs it and imports it through
 * `createAlbums` as a single-element list.
 *
 * @param record Album record payload.
 * @param cid    CID of the record.
 * @param uri    AT URI of the record.
 * @param user   User the album belongs to.
 */
const onNewAlbum = async (
  record: Album.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  logger.info` New album: ${record.title} by ${record.artist}`;

  const albums: Albums = [{ value: record, uri, cid }];
  await createAlbums(albums, user);
};
660
/**
 * Handles a newly created artist record: logs it and imports it through
 * `createArtists` as a single-element list.
 *
 * @param record Artist record payload.
 * @param cid    CID of the record.
 * @param uri    AT URI of the record.
 * @param user   User the artist belongs to.
 */
const onNewArtist = async (
  record: Artist.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  logger.info` New artist: ${record.name}`;

  const artists: Artists = [{ value: record, uri, cid }];
  await createArtists(artists, user);
};
680
/**
 * Handles a newly created scrobble record.
 *
 * Before the scrobble is stored, the artist (looked up by album artist
 * name), the album and the track it refers to must exist. Any that is
 * missing is created from the scrobble's own metadata under a generated URI
 * and CID. If an entity still cannot be found after the create attempt, the
 * scrobble is skipped with an error log.
 *
 * @param record Scrobble record payload.
 * @param cid    CID of the scrobble record.
 * @param uri    AT URI of the scrobble record.
 * @param user   User the scrobble belongs to.
 */
const onNewScrobble = async (
  record: Scrobble.Record,
  cid: string,
  uri: string,
  user: SelectUser,
) => {
  const { title, createdAt, artist, album, albumArtist } = record;
  logger.info` New scrobble: ${title} at ${createdAt}`;

  // Each lookup runs once before and once after the create attempt.
  const findArtist = async () => {
    const [row] = await ctx.db
      .select()
      .from(schema.artists)
      .where(eq(schema.artists.name, record.albumArtist))
      .execute();
    return row;
  };

  const findAlbum = async () => {
    const [row] = await ctx.db
      .select()
      .from(schema.albums)
      .where(
        and(
          eq(schema.albums.title, record.album),
          eq(schema.albums.artist, record.albumArtist),
        ),
      )
      .execute();
    return row;
  };

  const findTrack = async () => {
    const [row] = await ctx.db
      .select()
      .from(schema.tracks)
      .where(
        and(
          eq(schema.tracks.title, record.title),
          eq(schema.tracks.artist, record.artist),
          eq(schema.tracks.album, record.album),
          eq(schema.tracks.albumArtist, record.albumArtist),
        ),
      )
      .execute();
    return row;
  };

  // Check if the artist exists, create if not
  let artistRecord = await findArtist();

  if (!artistRecord) {
    logger.info` ⚙️ Artist not found, creating: "${albumArtist}"`;

    // Create a synthetic artist record from scrobble data
    const artistUri = `at://${user.did}/app.rocksky.artist/${createId()}`;
    const artistCid = createId();

    await createArtists(
      [
        {
          cid: artistCid,
          uri: artistUri,
          value: {
            $type: "app.rocksky.artist",
            name: record.albumArtist,
            createdAt: new Date().toISOString(),
            tags: record.tags || [],
          } as Artist.Record,
        },
      ],
      user,
    );

    artistRecord = await findArtist();

    if (!artistRecord) {
      logger.error` ❌ Failed to create artist. Skipping scrobble.`;
      return;
    }
  }

  // Check if the album exists, create if not
  let albumRecord = await findAlbum();

  if (!albumRecord) {
    logger.info` ⚙️ Album not found, creating: "${album}" by ${albumArtist}`;

    // Create a synthetic album record from scrobble data
    const albumUri = `at://${user.did}/app.rocksky.album/${createId()}`;
    const albumCid = createId();

    await createAlbums(
      [
        {
          cid: albumCid,
          uri: albumUri,
          value: {
            $type: "app.rocksky.album",
            title: record.album,
            artist: record.albumArtist,
            createdAt: new Date().toISOString(),
            releaseDate: record.releaseDate,
            year: record.year,
            albumArt: record.albumArt,
            // createAlbums reads the artwork from `albumArtUrl`; without
            // this key the synthetic album is stored with no cover art.
            albumArtUrl: record.albumArtUrl,
            artistUri: artistRecord.uri,
            spotifyLink: record.spotifyLink,
            appleMusicLink: record.appleMusicLink,
            tidalLink: record.tidalLink,
            youtubeLink: record.youtubeLink,
          } as Album.Record,
        },
      ],
      user,
    );

    // Fetch the newly created album
    albumRecord = await findAlbum();

    if (!albumRecord) {
      logger.error` ❌ Failed to create album. Skipping scrobble.`;
      return;
    }
  }

  // Check if the track exists, create if not
  let track = await findTrack();

  if (!track) {
    logger.info` ⚙️ Track not found, creating: "${title}" by ${artist} from ${album}`;

    // Create a synthetic track record from scrobble data
    const trackUri = `at://${user.did}/app.rocksky.song/${createId()}`;
    const trackCid = createId();

    await createSongs(
      [
        {
          cid: trackCid,
          uri: trackUri,
          value: {
            $type: "app.rocksky.song",
            title: record.title,
            artist: record.artist,
            albumArtist: record.albumArtist,
            album: record.album,
            duration: record.duration,
            trackNumber: record.trackNumber,
            discNumber: record.discNumber,
            releaseDate: record.releaseDate,
            year: record.year,
            genre: record.genre,
            tags: record.tags,
            composer: record.composer,
            lyrics: record.lyrics,
            copyrightMessage: record.copyrightMessage,
            albumArt: record.albumArt,
            // createSongs reads `albumArtUrl` and `mbid`; these keys must
            // match or the values are silently dropped on insert.
            albumArtUrl: record.albumArtUrl,
            youtubeLink: record.youtubeLink,
            spotifyLink: record.spotifyLink,
            tidalLink: record.tidalLink,
            appleMusicLink: record.appleMusicLink,
            createdAt: new Date().toISOString(),
            mbid: record.mbid,
            label: record.label,
            albumUri: albumRecord.uri,
            artistUri: artistRecord.uri,
          } as Song.Record,
        },
      ],
      user,
    );

    // Fetch the newly created track
    track = await findTrack();

    if (!track) {
      logger.error` ❌ Failed to create track. Skipping scrobble.`;
      return;
    }
  }

  logger.info` ✓ All required entities ready. Creating scrobble...`;

  await createScrobbles(
    [
      {
        cid,
        uri,
        value: record,
      },
    ],
    user,
  );
};
884
/**
 * Downloads the agent's own repository via `com.atproto.sync.getRepo` and
 * parses the response bytes into a `CarReader`.
 *
 * @param agent Authenticated agent; its DID selects the repository.
 * @returns A reader over the blocks of the downloaded CAR file.
 */
const downloadCarFile = async (agent: Agent) => {
  logger.info(`Fetching repository CAR file ...`);

  const { data: repoBytes } = await agent.com.atproto.sync.getRepo({
    did: agent.assertDid,
  });

  return CarReader.fromBytes(new Uint8Array(repoBytes));
};
894
/**
 * Collects every `app.rocksky.song` record found in the CAR file.
 *
 * Each block is CBOR-decoded; blocks that fail to decode are skipped with a
 * warning. The URI of each result is built from the agent's DID, the
 * collection and the block's CID (the CID stands in for the record key).
 *
 * @param agent     Agent whose DID is used in the generated URIs.
 * @param carReader Reader over the repository CAR file.
 * @returns The decoded song records with their URI and CID.
 * @throws Re-throws any error raised while iterating the CAR blocks.
 */
const getRockskyUserSongs = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Songs> => {
  const collection = "app.rocksky.song";
  const songs: Songs = [];

  try {
    logger.info`Extracting ${collection} records from CAR file ...`;

    for await (const block of carReader.blocks()) {
      try {
        const decoded = cbor.decode(block.bytes);

        // Only objects tagged with this collection's $type are records of interest.
        if (
          decoded &&
          typeof decoded === "object" &&
          "$type" in decoded &&
          decoded.$type === collection
        ) {
          const blockCid = block.cid.toString();
          songs.push({
            value: decoded as unknown as Song.Record,
            uri: `at://${agent.assertDid}/${collection}/${blockCid}`,
            cid: blockCid,
          });
        }
      } catch (e) {
        logger.warn` Skipping block with CID ${block.cid.toString()} due to decode error: ${e}`;
      }
    }

    logger.info(
      `${chalk.cyanBright(agent.assertDid)} ${chalk.greenBright(songs.length)} songs`,
    );
  } catch (error) {
    logger.error(`Error fetching songs from CAR: ${error}`);
    throw error;
  }

  return songs;
};
943
/**
 * Collects every `app.rocksky.album` record found in the CAR file.
 *
 * Each block is CBOR-decoded; blocks that fail to decode are skipped with a
 * warning. The URI of each result is built from the agent's DID, the
 * collection and the block's CID.
 *
 * @param agent     Agent whose DID is used in the generated URIs.
 * @param carReader Reader over the repository CAR file.
 * @returns The decoded album records with their URI and CID.
 * @throws Re-throws any error raised while iterating the CAR blocks.
 */
const getRockskyUserAlbums = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Albums> => {
  const collection = "app.rocksky.album";
  const albums: Albums = [];

  try {
    logger.info`Extracting ${collection} records from CAR file ...`;

    for await (const { cid, bytes } of carReader.blocks()) {
      try {
        const decoded = cbor.decode(bytes);

        // Skip anything that is not an object carrying a $type tag.
        if (!decoded || typeof decoded !== "object" || !("$type" in decoded)) {
          continue;
        }
        if (decoded.$type !== collection) {
          continue;
        }

        const cidText = cid.toString();
        albums.push({
          value: decoded as unknown as Album.Record,
          uri: `at://${agent.assertDid}/${collection}/${cidText}`,
          cid: cidText,
        });
      } catch (e) {
        logger.warn` Skipping block with CID ${cid.toString()} due to decode error: ${e}`;
      }
    }

    logger.info(
      `${chalk.cyanBright(agent.assertDid)} ${chalk.greenBright(albums.length)} albums`,
    );
  } catch (error) {
    logger.error(`Error fetching albums from CAR: ${error}`);
    throw error;
  }

  return albums;
};
990
/**
 * Collects every `app.rocksky.artist` record found in the CAR file.
 *
 * Each block is CBOR-decoded; blocks that fail to decode are skipped with a
 * warning, matching the song, album and scrobble extractors. The URI of each
 * result is built from the agent's DID, the collection and the block's CID.
 *
 * @param agent     Agent whose DID is used in the generated URIs.
 * @param carReader Reader over the repository CAR file.
 * @returns The decoded artist records with their URI and CID.
 * @throws Re-throws any error raised while iterating the CAR blocks.
 */
const getRockskyUserArtists = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Artists> => {
  const results: Artists = [];

  try {
    const collection = "app.rocksky.artist";
    logger.info`Extracting ${collection} records from CAR file ...`;

    for await (const { cid, bytes } of carReader.blocks()) {
      try {
        const decoded = cbor.decode(bytes);

        if (decoded && typeof decoded === "object" && "$type" in decoded) {
          if (decoded.$type === collection) {
            const value = decoded as unknown as Artist.Record;
            const uri = `at://${agent.assertDid}/${collection}/${cid.toString()}`;

            results.push({
              value,
              uri,
              cid: cid.toString(),
            });
          }
        }
      } catch (e) {
        // Report the skipped block instead of dropping it silently.
        logger.warn` Skipping block with CID ${cid.toString()} due to decode error: ${e}`;
        continue;
      }
    }

    logger.info(
      `${chalk.cyanBright(agent.assertDid)} ${chalk.greenBright(results.length)} artists`,
    );
  } catch (error) {
    logger.error(`Error fetching artists from CAR: ${error}`);
    throw error;
  }

  return results;
};
1037
/**
 * Collects every `app.rocksky.scrobble` record found in the CAR file.
 *
 * Each block is CBOR-decoded; blocks that fail to decode are skipped with a
 * warning. The URI of each result is built from the agent's DID, the
 * collection and the block's CID.
 *
 * @param agent     Agent whose DID is used in the generated URIs.
 * @param carReader Reader over the repository CAR file.
 * @returns The decoded scrobble records with their URI and CID.
 * @throws Re-throws any error raised while iterating the CAR blocks.
 */
const getRockskyUserScrobbles = async (
  agent: Agent,
  carReader: CarReader,
): Promise<Scrobbles> => {
  const collection = "app.rocksky.scrobble";
  const scrobbles: Scrobbles = [];

  try {
    logger.info`Extracting ${collection} records from CAR file ...`;

    for await (const block of carReader.blocks()) {
      try {
        const decoded = cbor.decode(block.bytes);

        const hasTypeTag =
          decoded && typeof decoded === "object" && "$type" in decoded;

        // Keep only records tagged with this collection's $type.
        if (hasTypeTag && decoded.$type === collection) {
          const cidText = block.cid.toString();
          scrobbles.push({
            value: decoded as unknown as Scrobble.Record,
            uri: `at://${agent.assertDid}/${collection}/${cidText}`,
            cid: cidText,
          });
        }
      } catch (e) {
        logger.warn` Skipping block with CID ${block.cid.toString()} due to decode error: ${e}`;
      }
    }

    logger.info(
      `${chalk.cyanBright(agent.assertDid)} ${chalk.greenBright(scrobbles.length)} scrobbles`,
    );
  } catch (error) {
    logger.error(`Error fetching scrobbles from CAR: ${error}`);
    throw error;
  }

  return scrobbles;
};