A decentralized music tracking and discovery platform built on AT Protocol 🎵
at 8bf9e3af3ebedb70b5f72f0801778399d5df3bb6 168 lines 5.0 kB view raw
import chalk from "chalk";
import { ctx } from "context";
import { desc, eq, or } from "drizzle-orm";
import { createHash } from "node:crypto";
import { publishScrobble } from "nowplaying/nowplaying.service";
import albums from "../schema/albums";
import artists from "../schema/artists";
import scrobbles from "../schema/scrobbles";
import tracks from "../schema/tracks";
import users from "../schema/users";

const args = process.argv.slice(2);

/** Number of recent scrobbles processed when SYNC_SIZE is unset or unusable. */
const DEFAULT_SYNC_SIZE = 20;
/** Number of recent scrobbles re-published after each background sync event. */
const BACKGROUND_PUBLISH_SIZE = 5;
/** Delay before handling a sync event, so the scrobble is fully created first. */
const SCROBBLE_SETTLE_DELAY_MS = 15000;

/**
 * Reads the SYNC_SIZE environment variable as a base-10 integer.
 *
 * @returns The parsed value, or DEFAULT_SYNC_SIZE when the variable is unset,
 *   empty, not a number, or negative (so `.limit()` never receives NaN).
 */
function resolveSyncSize(): number {
  const raw = process.env.SYNC_SIZE;
  if (!raw) {
    return DEFAULT_SYNC_SIZE;
  }
  const parsed = Number.parseInt(raw, 10);
  return Number.isNaN(parsed) || parsed < 0 ? DEFAULT_SYNC_SIZE : parsed;
}

/** Returns the hex-encoded SHA-256 digest of `input`. */
function sha256Hex(input: string): string {
  return createHash("sha256").update(input).digest("hex");
}

/**
 * Looks up the artist row whose `sha256` equals the hash of the lower-cased name.
 *
 * @param name Artist name to hash; callers pass the track's album artist.
 * @returns The first matching row, or `undefined` when there is none.
 */
async function findArtistByName(name: string) {
  const rows = await ctx.db
    .select()
    .from(artists)
    .where(eq(artists.sha256, sha256Hex(name.toLowerCase())))
    .limit(1);
  return rows[0];
}

/**
 * Back-fills missing URIs for the tracks behind a user's most recent scrobbles.
 *
 * For every track row found via its title/artist/album hash:
 *  - sets `tracks.albumUri` from the matching album when the track has none,
 *  - sets `tracks.artistUri` from the matching artist when the track has none,
 *  - sets `albums.artistUri` from the matching artist when the album has none.
 *
 * @param did Value matched against either `users.did` or `users.handle`.
 */
async function updateUris(did: string) {
  // Only the joined track row is consumed below, so it is the only column set selected.
  const records = await ctx.db
    .select({ track: tracks })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, did), eq(users.handle, did)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(resolveSyncSize());

  for (const { track } of records) {
    const trackHash = sha256Hex(
      `${track.title} - ${track.artist} - ${track.album}`.toLowerCase(),
    );

    const existingTrack = (
      await ctx.db
        .select()
        .from(tracks)
        .where(eq(tracks.sha256, trackHash))
        .limit(1)
    )[0];

    // Every update below requires a matching track row; skip the remaining
    // lookups entirely when there is none.
    if (!existingTrack) {
      continue;
    }

    // The album row serves both the track's albumUri and the album's own
    // artistUri, so it is fetched once per track.
    const albumHash = sha256Hex(
      `${track.album} - ${track.albumArtist}`.toLowerCase(),
    );
    const album = (
      await ctx.db
        .select()
        .from(albums)
        .where(eq(albums.sha256, albumHash))
        .limit(1)
    )[0];

    // The artist row is fetched lazily and at most once per track: it is only
    // needed when the track or the album is missing its artistUri.
    let artistLookup: ReturnType<typeof findArtistByName> | undefined;
    const getArtist = () => {
      if (!artistLookup) {
        artistLookup = findArtistByName(track.albumArtist);
      }
      return artistLookup;
    };

    if (!existingTrack.albumUri) {
      console.log(`Updating album uri for ${chalk.cyan(track.id)} ...`);

      if (album) {
        await ctx.db
          .update(tracks)
          .set({ albumUri: album.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (!existingTrack.artistUri) {
      console.log(`Updating artist uri for ${chalk.cyan(track.id)} ...`);

      const artist = await getArtist();
      if (artist) {
        await ctx.db
          .update(tracks)
          .set({ artistUri: artist.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (album && !album.artistUri) {
      console.log(`Updating artist uri for ${chalk.cyan(album.id)} ...`);

      const artist = await getArtist();
      if (artist) {
        await ctx.db
          .update(albums)
          .set({ artistUri: artist.uri })
          .where(eq(albums.id, album.id));
      }
    }
  }
}

/**
 * Publishes the most recent scrobbles of a user, newest first, one at a time.
 *
 * @param identifier Value matched against either `users.did` or `users.handle`.
 * @param limit Maximum number of scrobbles to publish.
 * @returns The number of scrobbles that were published.
 */
async function publishLatestScrobbles(
  identifier: string,
  limit: number,
): Promise<number> {
  const records = await ctx.db
    .select({ scrobble: scrobbles })
    .from(scrobbles)
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, identifier), eq(users.handle, identifier)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(limit);

  for (const { scrobble } of records) {
    console.log(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`);
    await publishScrobble(ctx, scrobble.id);
  }

  return records.length;
}

// Background mode: handle sync requests arriving on the message subject until
// the subscription ends, then exit without running the CLI loop below.
if (args.includes("--background")) {
  console.log("Wait for new scrobbles to sync ...");
  const sub = ctx.nc.subscribe("rocksky.user.scrobble.sync");
  const decoder = new TextDecoder();
  for await (const m of sub) {
    const did = decoder.decode(m.data);
    // wait for 15 seconds to ensure the scrobble is fully created
    await new Promise((resolve) =>
      setTimeout(resolve, SCROBBLE_SETTLE_DELAY_MS),
    );
    console.log(`Syncing scrobbles ${chalk.magenta(did)} ...`);
    await updateUris(did);
    await publishLatestScrobbles(did, BACKGROUND_PUBLISH_SIZE);
  }
  process.exit(0);
}

// CLI mode: every argument is treated as a DID or handle to sync.
for (const arg of args) {
  console.log(`Syncing scrobbles ${chalk.magenta(arg)} ...`);
  await updateUris(arg);

  const synced = await publishLatestScrobbles(arg, resolveSyncSize());
  console.log(`Synced ${chalk.greenBright(synced)} scrobbles`);
}

process.exit(0);