A decentralized music tracking and discovery platform built on AT Protocol 馃幍 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 298 lines 7.8 kB view raw
1import type { Agent } from "@atproto/api"; 2import { consola } from "consola"; 3import type { Context } from "context"; 4import { and, eq } from "drizzle-orm"; 5import { deepSnakeCaseKeys } from "lib"; 6import { createHash } from "node:crypto"; 7import { 8 putAlbumRecord, 9 putArtistRecord, 10 putSongRecord, 11} from "nowplaying/nowplaying.service"; 12import tables from "schema"; 13import type { Track } from "types/track"; 14 15const { tracks, albums, artists, albumTracks, artistTracks, artistAlbums } = 16 tables; 17 18export async function saveTrack(ctx: Context, track: Track, agent: Agent) { 19 const trackHash = createHash("sha256") 20 .update(`${track.title} - ${track.artist} - ${track.album}`.toLowerCase()) 21 .digest("hex"); 22 23 const existingTrack = await ctx.db 24 .select() 25 .from(tracks) 26 .where(eq(tracks.sha256, trackHash)) 27 .limit(1) 28 .then((results) => results[0]); 29 30 let trackUri = existingTrack?.uri; 31 if (!existingTrack?.uri) { 32 trackUri = await putSongRecord(track, agent); 33 } 34 35 // start update existing track with album and artist uri 36 if (existingTrack && !existingTrack.albumUri) { 37 const albumHash = createHash("sha256") 38 .update(`${track.album} - ${track.albumArtist}`.toLowerCase()) 39 .digest("hex"); 40 41 const album = await ctx.db 42 .select() 43 .from(albums) 44 .where(eq(albums.sha256, albumHash)) 45 .limit(1) 46 .then((results) => results[0]); 47 48 if (album) { 49 await ctx.db 50 .update(tracks) 51 .set({ albumUri: album.uri }) 52 .where(eq(tracks.id, existingTrack.id)); 53 } 54 } 55 56 if (existingTrack && !existingTrack.artistUri) { 57 const artistHash = createHash("sha256") 58 .update(track.albumArtist.toLowerCase()) 59 .digest("hex"); 60 61 const artist = await ctx.db 62 .select() 63 .from(artists) 64 .where(eq(artists.sha256, artistHash)) 65 .limit(1) 66 .then((results) => results[0]); 67 68 if (artist) { 69 await ctx.db 70 .update(tracks) 71 .set({ artistUri: artist.uri }) 72 .where(eq(tracks.id, existingTrack.id)); 73 } 74 } 75 // end 76 77 const artistHash = createHash("sha256") 78 .update(track.albumArtist.toLowerCase()) 79 .digest("hex"); 80 81 const existingArtist = await ctx.db 82 .select() 83 .from(artists) 84 .where(eq(artists.sha256, artistHash)) 85 .limit(1) 86 .then((results) => results[0]); 87 88 let artistUri = existingArtist?.uri; 89 if (!existingArtist?.uri) { 90 artistUri = await putArtistRecord(track, agent); 91 } 92 93 const albumHash = createHash("sha256") 94 .update(`${track.album} - ${track.albumArtist}`.toLowerCase()) 95 .digest("hex"); 96 97 const existingAlbum = await ctx.db 98 .select() 99 .from(albums) 100 .where(eq(albums.sha256, albumHash)) 101 .limit(1) 102 .then((results) => results[0]); 103 104 let albumUri = existingAlbum?.uri; 105 if (!existingAlbum?.uri) { 106 albumUri = await putAlbumRecord(track, agent); 107 } 108 109 let tries = 0; 110 111 while (tries < 15) { 112 const track_id = await ctx.db 113 .select() 114 .from(tracks) 115 .where(eq(tracks.uri, trackUri)) 116 .limit(1) 117 .then((results) => results[0]); 118 119 const album_id = await ctx.db 120 .select() 121 .from(albums) 122 .where(eq(albums.uri, albumUri)) 123 .limit(1) 124 .then((results) => results[0]); 125 126 const artist_id = await ctx.db 127 .select() 128 .from(artists) 129 .where(eq(artists.uri, artistUri)) 130 .limit(1) 131 .then((results) => results[0]); 132 133 if (!track_id || !album_id || !artist_id) { 134 consola.info( 135 "Track not yet saved (uri not saved), retrying...", 136 tries + 1, 137 ); 138 await new Promise((resolve) => setTimeout(resolve, 1000)); 139 tries += 1; 140 continue; 141 } 142 143 const album_track = await ctx.db 144 .select() 145 .from(albumTracks) 146 .where( 147 and( 148 eq(albumTracks.albumId, album_id.id), 149 eq(albumTracks.trackId, track_id.id), 150 ), 151 ) 152 .limit(1) 153 .then((results) => results[0]); 154 155 const artist_track = await ctx.db 156 .select() 157 .from(artistTracks) 158 .where( 159 and( 160 eq(artistTracks.artistId, artist_id.id), 161 eq(artistTracks.trackId, track_id.id), 162 ), 163 ) 164 .limit(1) 165 .then((results) => results[0]); 166 167 const artist_album = await ctx.db 168 .select() 169 .from(artistAlbums) 170 .where( 171 and( 172 eq(artistAlbums.artistId, artist_id.id), 173 eq(artistAlbums.albumId, album_id.id), 174 ), 175 ) 176 .limit(1) 177 .then((results) => results[0]); 178 179 if (!album_track) { 180 await ctx.db.insert(albumTracks).values({ 181 albumId: album_id.id, 182 trackId: track_id.id, 183 }); 184 } 185 186 if (!artist_track) { 187 await ctx.db.insert(artistTracks).values({ 188 artistId: artist_id.id, 189 trackId: track_id.id, 190 }); 191 } 192 193 if (!artist_album) { 194 await ctx.db.insert(artistAlbums).values({ 195 artistId: artist_id.id, 196 albumId: album_id.id, 197 }); 198 } 199 200 if (track_id && !track_id.albumUri) { 201 await ctx.db 202 .update(tracks) 203 .set({ albumUri: album_id.uri }) 204 .where(eq(tracks.id, track_id.id)); 205 } 206 207 if (track_id && !track_id.artistUri) { 208 await ctx.db 209 .update(tracks) 210 .set({ artistUri: artist_id.uri }) 211 .where(eq(tracks.id, track_id.id)); 212 } 213 214 if ( 215 album_track && 216 artist_track && 217 artist_album && 218 track_id && 219 track_id.albumUri && 220 track_id.artistUri 221 ) { 222 consola.info("Track saved successfully after", tries + 1, "tries"); 223 224 const message = JSON.stringify( 225 deepSnakeCaseKeys({ 226 track: { 227 ...track_id, 228 xata_id: track_id.id, 229 xata_createdat: track_id.createdAt.toISOString(), 230 xata_updatedat: track_id.updatedAt.toISOString(), 231 }, 232 album_track: { 233 ...album_track, 234 album_id: { 235 xata_id: album_track.albumId, 236 }, 237 track_id: { 238 xata_id: album_track.trackId, 239 }, 240 xata_id: album_track.id, 241 xata_createdat: album_track.createdAt.toISOString(), 242 xata_updatedat: album_track.updatedAt.toISOString(), 243 }, 244 artist_track: { 245 ...artist_track, 246 artist_id: { 247 xata_id: artist_track.artistId, 248 }, 249 track_id: { 250 xata_id: artist_track.trackId, 251 }, 252 xata_id: artist_track.id, 253 xata_createdat: artist_track.createdAt.toISOString(), 254 xata_updatedat: artist_track.updatedAt.toISOString(), 255 }, 256 artist_album: { 257 ...artist_album, 258 artist_id: { 259 xata_id: artist_album.artistId, 260 }, 261 album_id: { 262 xata_id: artist_album.albumId, 263 }, 264 xata_id: artist_album.id, 265 xata_createdat: artist_album.createdAt.toISOString(), 266 xata_updatedat: artist_album.updatedAt.toISOString(), 267 }, 268 }), 269 ); 270 271 ctx.nc.publish( 272 "rocksky.track", 273 Buffer.from(message.replaceAll("sha_256", "sha256")), 274 ); 275 break; 276 } 277 278 tries += 1; 279 consola.info("Track not yet saved, retrying...", tries + 1); 280 if (tries === 15) { 281 consola.info(">>>"); 282 consola.info(album_track); 283 consola.info(artist_track); 284 consola.info(artist_album); 285 consola.info(artist_id); 286 consola.info(album_id); 287 consola.info(track_id); 288 consola.info(track_id.albumUri); 289 consola.info(track_id.artistUri); 290 consola.info("<<<"); 291 } 292 await new Promise((resolve) => setTimeout(resolve, 1000)); 293 } 294 295 if (tries === 15) { 296 consola.info("Failed to save track after 15 tries"); 297 } 298}