A decentralized music tracking and discovery platform built on AT Protocol 🎵
at b2c53fa246937f481eb20864a1aabad10f9512fb 179 lines 5.3 kB view raw
import chalk from "chalk";
import { ctx } from "context";
import { desc, eq, or } from "drizzle-orm";
import { createHash } from "node:crypto";
import { publishScrobble } from "nowplaying/nowplaying.service";
import albums from "../schema/albums";
import artists from "../schema/artists";
import scrobbles from "../schema/scrobbles";
import tracks from "../schema/tracks";
import users from "../schema/users";

const args = process.argv.slice(2);

// Number of most recent scrobbles processed when SYNC_SIZE is unset or invalid.
const DEFAULT_SYNC_SIZE = 20;
// Number of most recent scrobbles re-published per message in --background mode.
const BACKGROUND_SYNC_SIZE = 5;
// Delay before handling a background message, so the scrobble is fully created.
const BACKGROUND_SYNC_DELAY_MS = 15000;

/**
 * Reads the batch size from the SYNC_SIZE environment variable.
 *
 * Falls back to DEFAULT_SYNC_SIZE when the variable is unset, empty, not
 * parseable as a base-10 integer, or negative, so that `NaN` or a negative
 * number is never handed to the query builder's `.limit()`.
 */
function resolveSyncSize(): number {
  const raw = process.env.SYNC_SIZE;
  if (!raw) {
    return DEFAULT_SYNC_SIZE;
  }
  const parsed = parseInt(raw, 10);
  return Number.isNaN(parsed) || parsed < 0 ? DEFAULT_SYNC_SIZE : parsed;
}

/** Returns the hex SHA-256 digest of the lower-cased key. */
function hashKey(key: string): string {
  return createHash("sha256").update(key.toLowerCase()).digest("hex");
}

/** Returns the first album row whose `sha256` column equals the hash, if any. */
async function findAlbumBySha256(sha256: string) {
  const [album] = await ctx.db
    .select()
    .from(albums)
    .where(eq(albums.sha256, sha256))
    .limit(1);
  return album;
}

/** Returns the first artist row whose `sha256` column equals the hash, if any. */
async function findArtistBySha256(sha256: string) {
  const [artist] = await ctx.db
    .select()
    .from(artists)
    .where(eq(artists.sha256, sha256))
    .limit(1);
  return artist;
}

/**
 * Back-fills missing album/artist URIs for the tracks behind a user's most
 * recent scrobbles.
 *
 * For each of the latest scrobbles (batch size from SYNC_SIZE, default 20):
 *  - sets `tracks.albumUri` when it is empty and a matching album row exists,
 *  - sets `tracks.artistUri` when it is empty and a matching artist row exists,
 *  - sets `albums.artistUri` when it is empty and a matching artist row exists.
 *
 * Rows are matched through SHA-256 hashes of the lower-cased
 * "title - artist - album", "album - albumArtist" and "albumArtist" strings.
 *
 * @param did - value compared against both `users.did` and `users.handle`.
 */
async function updateUris(did: string) {
  // Get scrobbles with track and user data
  const records = await ctx.db
    .select({
      track: tracks,
      user: users,
    })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, did), eq(users.handle, did)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(resolveSyncSize());

  for (const { track } of records) {
    const trackHash = hashKey(
      `${track.title} - ${track.artist} - ${track.album}`,
    );

    const [existingTrack] = await ctx.db
      .select()
      .from(tracks)
      .where(eq(tracks.sha256, trackHash))
      .limit(1);

    // Every update below requires the track row; without it there is nothing
    // to do, so skip the album/artist lookups entirely.
    if (!existingTrack) {
      continue;
    }

    // The album row is needed both for the track's albumUri and for the
    // album's own artistUri, so it is looked up exactly once per track.
    const album = await findAlbumBySha256(
      hashKey(`${track.album} - ${track.albumArtist}`),
    );

    // The artist row is fetched lazily (only when some artistUri is missing)
    // and at most once per track; both artist updates share the result.
    let artistLookup: ReturnType<typeof findArtistBySha256> | undefined;
    const getArtist = () => {
      if (!artistLookup) {
        artistLookup = findArtistBySha256(hashKey(track.albumArtist));
      }
      return artistLookup;
    };

    if (!existingTrack.albumUri) {
      console.log(`Updating album uri for ${chalk.cyan(track.id)} ...`);

      if (album) {
        await ctx.db
          .update(tracks)
          .set({ albumUri: album.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (!existingTrack.artistUri) {
      console.log(`Updating artist uri for ${chalk.cyan(track.id)} ...`);

      const artist = await getArtist();
      if (artist) {
        await ctx.db
          .update(tracks)
          .set({ artistUri: artist.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (album && !album.artistUri) {
      console.log(`Updating artist uri for ${chalk.cyan(album.id)} ...`);

      const artist = await getArtist();
      if (artist) {
        await ctx.db
          .update(albums)
          .set({ artistUri: artist.uri })
          .where(eq(albums.id, album.id));
      }
    }
  }
}

/**
 * Publishes the most recent scrobbles of a user through `publishScrobble`.
 *
 * A failure to publish one scrobble is logged and does not stop the remaining
 * ones from being attempted.
 *
 * @param identifier - value compared against both `users.did` and `users.handle`.
 * @param limit - maximum number of scrobbles (newest first) to publish.
 * @returns the number of scrobbles that were selected for publishing.
 */
async function publishRecentScrobbles(
  identifier: string,
  limit: number,
): Promise<number> {
  const records = await ctx.db
    .select({
      scrobble: scrobbles,
    })
    .from(scrobbles)
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, identifier), eq(users.handle, identifier)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(limit);

  for (const { scrobble } of records) {
    console.log(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`);
    try {
      await publishScrobble(ctx, scrobble.id);
    } catch (err) {
      console.error(`Failed to sync scrobble ${chalk.cyan(scrobble.id)}:`, err);
    }
  }

  return records.length;
}

// Background mode: handle each message received on the sync subject, then
// exit once the subscription ends. Messages are processed one at a time.
if (args.includes("--background")) {
  console.log("Wait for new scrobbles to sync ...");
  const sub = ctx.nc.subscribe("rocksky.user.scrobble.sync");
  for await (const m of sub) {
    const did = new TextDecoder().decode(m.data);
    // wait for 15 seconds to ensure the scrobble is fully created
    await new Promise((resolve) =>
      setTimeout(resolve, BACKGROUND_SYNC_DELAY_MS),
    );
    console.log(`Syncing scrobbles ${chalk.magenta(did)} ...`);
    await updateUris(did);
    await publishRecentScrobbles(did, BACKGROUND_SYNC_SIZE);
  }
  process.exit(0);
}

// One-shot mode: every command-line argument is treated as a DID or handle.
for (const arg of args) {
  console.log(`Syncing scrobbles ${chalk.magenta(arg)} ...`);
  await updateUris(arg);

  const synced = await publishRecentScrobbles(arg, resolveSyncSize());
  console.log(`Synced ${chalk.greenBright(synced)} scrobbles`);
}

process.exit(0);