forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import chalk from "chalk";
2import { ctx } from "context";
3import { desc, eq, or } from "drizzle-orm";
4import { createHash } from "node:crypto";
5import { publishScrobble } from "nowplaying/nowplaying.service";
6import albums from "../schema/albums";
7import artists from "../schema/artists";
8import scrobbles from "../schema/scrobbles";
9import tracks from "../schema/tracks";
10import users from "../schema/users";
11
// Command-line arguments after the runtime executable and script path
// (the first two entries of process.argv are skipped).
const [, , ...args] = process.argv;
13
/** Hex-encoded SHA-256 digest of `value` after lower-casing it. */
function sha256Lower(value: string): string {
  return createHash("sha256").update(value.toLowerCase()).digest("hex");
}

/**
 * Number of recent scrobbles to inspect, read from the `SYNC_SIZE`
 * environment variable. Falls back to `fallback` when the variable is unset,
 * empty, not an integer, or negative, so an invalid value never reaches
 * `.limit()`.
 */
function resolveSyncSize(fallback = 20): number {
  const parsed = Number.parseInt(process.env.SYNC_SIZE ?? "", 10);
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback;
}

/** First row of `tracks` whose `sha256` equals `hash`, or `undefined`. */
async function findTrackBySha256(hash: string) {
  const [row] = await ctx.db
    .select()
    .from(tracks)
    .where(eq(tracks.sha256, hash))
    .limit(1);
  return row;
}

/** First row of `albums` whose `sha256` equals `hash`, or `undefined`. */
async function findAlbumBySha256(hash: string) {
  const [row] = await ctx.db
    .select()
    .from(albums)
    .where(eq(albums.sha256, hash))
    .limit(1);
  return row;
}

/** First row of `artists` whose `sha256` equals `hash`, or `undefined`. */
async function findArtistBySha256(hash: string) {
  const [row] = await ctx.db
    .select()
    .from(artists)
    .where(eq(artists.sha256, hash))
    .limit(1);
  return row;
}

/**
 * Back-fills missing URI columns for the tracks behind a user's most recent
 * scrobbles.
 *
 * For each of the newest scrobbles (count taken from `SYNC_SIZE`, default 20)
 * the track is re-resolved by the SHA-256 of "title - artist - album". When a
 * matching track row exists:
 *   - `tracks.albumUri` is set from the album matching "album - albumArtist",
 *     if the track has none;
 *   - `tracks.artistUri` is set from the artist matching `albumArtist`,
 *     if the track has none;
 *   - `albums.artistUri` is set from that same artist, if the album has none.
 *
 * The album and artist rows are each looked up at most once per track.
 *
 * @param did - Value matched against either `users.did` or `users.handle`.
 */
async function updateUris(did: string) {
  // Get scrobbles with track and user data
  const records = await ctx.db
    .select({
      track: tracks,
      user: users,
    })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, did), eq(users.handle, did)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(resolveSyncSize());

  for (const { track } of records) {
    const existingTrack = await findTrackBySha256(
      sha256Lower(`${track.title} - ${track.artist} - ${track.album}`),
    );

    // Every update below requires a track row matching the hash.
    if (!existingTrack) {
      continue;
    }

    const album = await findAlbumBySha256(
      sha256Lower(`${track.album} - ${track.albumArtist}`),
    );

    // The artist is only needed when the track or the album lacks its URI;
    // both cases use the same hash, so one lookup serves both.
    const trackNeedsArtist = !existingTrack.artistUri;
    const albumNeedsArtist = Boolean(album) && !album.artistUri;
    const artist =
      trackNeedsArtist || albumNeedsArtist
        ? await findArtistBySha256(sha256Lower(track.albumArtist))
        : undefined;

    if (!existingTrack.albumUri) {
      console.log(`Updating album uri for ${chalk.cyan(track.id)} ...`);

      if (album) {
        await ctx.db
          .update(tracks)
          .set({ albumUri: album.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (trackNeedsArtist) {
      console.log(`Updating artist uri for ${chalk.cyan(track.id)} ...`);

      if (artist) {
        await ctx.db
          .update(tracks)
          .set({ artistUri: artist.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (album && albumNeedsArtist) {
      console.log(`Updating artist uri for ${chalk.cyan(album.id)} ...`);

      if (artist) {
        await ctx.db
          .update(albums)
          .set({ artistUri: artist.uri })
          .where(eq(albums.id, album.id));
      }
    }
  }
}
118
// Delay before handling a sync message, giving the scrobble time to be fully
// created.
const SCROBBLE_SETTLE_DELAY_MS = 15000;
// Number of most recent scrobbles re-published per background sync message.
const BACKGROUND_SYNC_LIMIT = 5;
// Number of most recent scrobbles re-published per CLI argument when
// SYNC_SIZE is unset or invalid.
const DEFAULT_CLI_SYNC_LIMIT = 20;

/**
 * Scrobble count for CLI mode, read from the `SYNC_SIZE` environment variable.
 * Falls back to the default when the variable is unset, empty, not an integer,
 * or negative, so an invalid value never reaches `.limit()`.
 */
function cliSyncLimit(): number {
  const parsed = Number.parseInt(process.env.SYNC_SIZE ?? "", 10);
  return Number.isInteger(parsed) && parsed >= 0
    ? parsed
    : DEFAULT_CLI_SYNC_LIMIT;
}

/**
 * Publishes the `limit` most recent scrobbles of one user.
 *
 * A failure to publish a single scrobble is logged and does not stop the
 * remaining ones.
 *
 * @param identifier - Value matched against either `users.did` or `users.handle`.
 * @param limit - Maximum number of scrobbles to publish, newest first.
 * @returns The number of scrobbles selected (including any that failed).
 */
async function publishRecentScrobbles(
  identifier: string,
  limit: number,
): Promise<number> {
  const records = await ctx.db
    .select({
      scrobble: scrobbles,
    })
    .from(scrobbles)
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, identifier), eq(users.handle, identifier)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(limit);

  for (const { scrobble } of records) {
    console.log(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`);
    try {
      await publishScrobble(ctx, scrobble.id);
    } catch (err) {
      console.error(`Failed to sync scrobble ${chalk.cyan(scrobble.id)}:`, err);
    }
  }

  return records.length;
}

// Background mode: handle sync requests arriving on the message subject until
// the subscription ends, then exit.
if (args.includes("--background")) {
  console.log("Wait for new scrobbles to sync ...");
  const sub = ctx.nc.subscribe("rocksky.user.scrobble.sync");
  for await (const m of sub) {
    const did = new TextDecoder().decode(m.data);
    // wait for 15 seconds to ensure the scrobble is fully created
    await new Promise((resolve) =>
      setTimeout(resolve, SCROBBLE_SETTLE_DELAY_MS),
    );
    console.log(`Syncing scrobbles ${chalk.magenta(did)} ...`);
    try {
      await updateUris(did);
      await publishRecentScrobbles(did, BACKGROUND_SYNC_LIMIT);
    } catch (err) {
      // One failing message must not terminate the long-running subscriber.
      console.error(`Failed to sync scrobbles ${chalk.magenta(did)}:`, err);
    }
  }
  process.exit(0);
}

// CLI mode: every argument is a DID or handle to sync once.
const syncLimit = cliSyncLimit();

for (const arg of args) {
  console.log(`Syncing scrobbles ${chalk.magenta(arg)} ...`);
  await updateUris(arg);

  const synced = await publishRecentScrobbles(arg, syncLimit);
  console.log(`Synced ${chalk.greenBright(synced)} scrobbles`);
}

process.exit(0);