A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 410 lines 12 kB view raw
import { MatchTrackResult } from "lib/matchTrack";
import { logger } from "logger";
import dayjs from "dayjs";
import { createAgent } from "lib/agent";
import { getDidAndHandle } from "lib/getDidAndHandle";
import { ctx } from "context";
import schema from "schema";
import { and, eq, gte, lte, or, sql } from "drizzle-orm";
import os from "node:os";
import path from "node:path";
import fs from "node:fs";
import chalk from "chalk";
import * as Album from "lexicon/types/app/rocksky/album";
import * as Artist from "lexicon/types/app/rocksky/artist";
import * as Scrobble from "lexicon/types/app/rocksky/scrobble";
import * as Song from "lexicon/types/app/rocksky/song";
import { TID } from "@atproto/common";
import { Agent } from "@atproto/api";
import { createUser, subscribeToJetstream, sync } from "cmd/sync";
import _ from "lodash";

/**
 * Formats the scrobble time used in log messages.
 *
 * @param timestamp Unix time in seconds; when omitted (or 0) the current time is used.
 * @returns The time rendered as `YYYY-MM-DD HH:mm:ss`.
 */
function formatScrobbleTime(timestamp?: number): string {
  const time = timestamp ? dayjs.unix(timestamp) : dayjs();
  return time.format("YYYY-MM-DD HH:mm:ss");
}

/**
 * Polls the local `scrobbles` table until a row with the given URI exists or
 * the attempt budget is exhausted, logging an error only on a real timeout.
 *
 * NOTE(review): the row is presumably inserted by the Jetstream subscription
 * started in `publishScrobble` — confirm against `cmd/sync`.
 *
 * @param scrobbleUri URI returned by the repo when the scrobble record was created.
 */
async function waitForIndexedScrobble(scrobbleUri: string): Promise<void> {
  const MAX_ATTEMPTS = 40;
  const POLL_INTERVAL_MS = 600;

  // One initial query plus MAX_ATTEMPTS retries, sleeping between queries.
  for (let attempts = 0; attempts <= MAX_ATTEMPTS; attempts += 1) {
    const rows = await ctx.db
      .select({
        count: sql`COUNT(*)`,
      })
      .from(schema.scrobbles)
      .where(eq(schema.scrobbles.uri, scrobbleUri))
      .execute();

    // Some drivers return COUNT(*) as a string; coerce before comparing.
    const count = Number(_.get(rows, "[0].count", 0));
    if (count > 0) {
      return;
    }

    if (attempts < MAX_ATTEMPTS) {
      await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
    }
  }

  logger.error`Failed to detect published scrobble after ${MAX_ATTEMPTS} attempts`;
}

/**
 * Publishes a scrobble for `track` to the current user's repo.
 *
 * Steps, in order: resolve the DID/handle and agent, look for a matching
 * scrobble within ±60 seconds, create the user and subscribe to Jetstream,
 * refuse to run while the per-DID lock file exists, sync local scrobbles if
 * they look stale, create any missing song/artist/album records, create the
 * scrobble record, then wait for it to appear in the local database.
 *
 * @param track Matched track metadata to scrobble.
 * @param timestamp Unix time in seconds of the listen; defaults to now.
 * @param dryRun When true, everything up to (and including) the sync runs but
 *   no records are written.
 * @returns `true` when the scrobble was published, skipped as a duplicate, or
 *   skipped because of `dryRun`; `false` when the lock file exists or the
 *   scrobble record could not be created.
 * @throws Error when one of the records fails lexicon validation.
 */
export async function publishScrobble(
  track: MatchTrackResult,
  timestamp?: number,
  dryRun?: boolean,
): Promise<boolean> {
  const [did, handle] = await getDidAndHandle();
  const agent: Agent = await createAgent(did, handle);
  const recentScrobble = await getRecentScrobble(did, track, timestamp);
  const user = await createUser(agent, did, handle);
  await subscribeToJetstream(user);

  const lockFilePath = path.join(os.tmpdir(), `rocksky-${did}.lock`);

  if (fs.existsSync(lockFilePath)) {
    logger.error(
      `${chalk.greenBright(handle)} Scrobble publishing failed: lock file exists, maybe rocksky-cli is still syncing?\nPlease wait for rocksky to finish syncing before publishing scrobbles or delete the lock file manually ${chalk.greenBright(lockFilePath)}`,
    );
    return false;
  }

  if (recentScrobble) {
    logger.info`${handle} Skipping scrobble for ${track.title} by ${track.artist} at ${formatScrobbleTime(timestamp)} (already scrobbled)`;
    return true;
  }

  const totalScrobbles = await countScrobbles(did);
  if (totalScrobbles === 0) {
    logger.warn`${handle} No scrobbles found for this user. Are you sure you have successfully synced your scrobbles locally?\nIf not, please run ${"rocksky sync"} to sync your scrobbles before publishing scrobbles.`;
  }

  logger.info`${handle} Publishing scrobble for ${track.title} by ${track.artist} at ${formatScrobbleTime(timestamp)}`;

  if (await shouldSync(agent)) {
    logger.info`${handle} Syncing scrobbles before publishing`;
    await sync();
  } else {
    logger.info`${handle} Local scrobbles are up-to-date, skipping sync`;
  }

  if (dryRun) {
    logger.info`${handle} Dry run: Skipping publishing scrobble for ${track.title} by ${track.artist} at ${formatScrobbleTime(timestamp)}`;
    return true;
  }

  // A track counts as known when the title matches (case-insensitively) and
  // either the artist or the album artist matches.
  const [existingTrack] = await ctx.db
    .select()
    .from(schema.tracks)
    .where(
      or(
        and(
          sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`,
          sql`LOWER(${schema.tracks.artist}) = LOWER(${track.artist})`,
        ),
        and(
          sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`,
          sql`LOWER(${schema.tracks.albumArtist}) = LOWER(${track.artist})`,
        ),
        and(
          sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`,
          sql`LOWER(${schema.tracks.albumArtist}) = LOWER(${track.albumArtist})`,
        ),
      ),
    )
    .limit(1)
    .execute();

  if (!existingTrack) {
    await putSongRecord(agent, track);
  }

  const [existingArtist] = await ctx.db
    .select()
    .from(schema.artists)
    .where(
      or(
        sql`LOWER(${schema.artists.name}) = LOWER(${track.artist})`,
        sql`LOWER(${schema.artists.name}) = LOWER(${track.albumArtist})`,
      ),
    )
    .limit(1)
    .execute();

  if (!existingArtist) {
    await putArtistRecord(agent, track);
  }

  const [existingAlbum] = await ctx.db
    .select()
    .from(schema.albums)
    .where(
      and(
        sql`LOWER(${schema.albums.title}) = LOWER(${track.album})`,
        sql`LOWER(${schema.albums.artist}) = LOWER(${track.albumArtist})`,
      ),
    )
    .limit(1)
    .execute();

  if (!existingAlbum) {
    await putAlbumRecord(agent, track);
  }

  const scrobbleUri = await putScrobbleRecord(agent, track, timestamp);

  // putScrobbleRecord already logged the error; report the failure to the
  // caller instead of claiming success.
  if (!scrobbleUri) {
    return false;
  }

  // wait for the scrobble to be published
  await waitForIndexedScrobble(scrobbleUri);

  return true;
}

/**
 * Finds a scrobble by this user for the same title and artist
 * (case-insensitive) within 60 seconds either side of the scrobble time.
 *
 * @param did DID of the user whose scrobbles are searched.
 * @param track Track being scrobbled.
 * @param timestamp Unix time in seconds; defaults to now.
 * @returns The first matching joined row, or `undefined` when there is none.
 */
async function getRecentScrobble(
  did: string,
  track: MatchTrackResult,
  timestamp?: number,
) {
  const scrobbleTime = dayjs.unix(timestamp || dayjs().unix());
  const windowStart = scrobbleTime.subtract(60, "seconds").toDate();
  const windowEnd = scrobbleTime.add(60, "seconds").toDate();

  const rows = await ctx.db
    .select({
      scrobble: schema.scrobbles,
      user: schema.users,
      track: schema.tracks,
    })
    .from(schema.scrobbles)
    .innerJoin(schema.users, eq(schema.scrobbles.userId, schema.users.id))
    .innerJoin(schema.tracks, eq(schema.scrobbles.trackId, schema.tracks.id))
    .where(
      and(
        eq(schema.users.did, did),
        sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`,
        sql`LOWER(${schema.tracks.artist}) = LOWER(${track.artist})`,
        gte(schema.scrobbles.timestamp, windowStart),
        lte(schema.scrobbles.timestamp, windowEnd),
      ),
    )
    .limit(1);

  return rows[0];
}

/**
 * Counts the locally stored scrobbles that belong to the given user.
 *
 * @param did DID of the user.
 * @returns The number of scrobble rows joined to that user.
 */
async function countScrobbles(did: string): Promise<number> {
  const rows = await ctx.db
    .select({ count: sql<number>`count(*)` })
    .from(schema.scrobbles)
    .innerJoin(schema.users, eq(schema.scrobbles.userId, schema.users.id))
    .where(eq(schema.users.did, did));

  // `sql<number>` is only a compile-time hint; coerce in case the driver
  // hands the count back as a string.
  return Number(rows[0].count);
}

/**
 * Creates an `app.rocksky.song` record for the track in the agent's repo.
 *
 * @param agent Authenticated agent whose repo receives the record.
 * @param track Track metadata to store.
 * @returns The URI of the created record, or `null` when the write failed.
 * @throws Error when the record fails `Song.validateRecord`.
 */
async function putSongRecord(agent: Agent, track: MatchTrackResult) {
  const rkey = TID.nextStr();

  const record: Song.Record = {
    $type: "app.rocksky.song",
    title: track.title,
    artist: track.artist,
    artists: track.mbArtists === null ? undefined : track.mbArtists,
    album: track.album,
    albumArtist: track.albumArtist,
    duration: track.duration,
    releaseDate: track.releaseDate
      ? new Date(track.releaseDate).toISOString()
      : undefined,
    year: track.year === null ? undefined : track.year,
    albumArtUrl: track.albumArt,
    composer: track.composer ? track.composer : undefined,
    lyrics: track.lyrics ? track.lyrics : undefined,
    trackNumber: track.trackNumber,
    // A disc number of 0 is stored as 1.
    discNumber: track.discNumber === 0 ? 1 : track.discNumber,
    copyrightMessage: track.copyrightMessage
      ? track.copyrightMessage
      : undefined,
    createdAt: new Date().toISOString(),
    spotifyLink: track.spotifyLink ? track.spotifyLink : undefined,
    tags: track.genres || [],
    mbid: track.mbId,
  };

  const validation = Song.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Song record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.song",
      rkey,
      record,
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Song record created at ${uri}`;
    return uri;
  } catch (e) {
    logger.error`Error creating song record: ${e}`;
    return null;
  }
}

/**
 * Creates an `app.rocksky.artist` record, named after the track's album
 * artist, in the agent's repo.
 *
 * @param agent Authenticated agent whose repo receives the record.
 * @param track Track metadata the artist fields are taken from.
 * @returns The URI of the created record, or `null` when the write failed.
 * @throws Error when the record fails `Artist.validateRecord`.
 */
async function putArtistRecord(agent: Agent, track: MatchTrackResult) {
  const rkey = TID.nextStr();
  const record: Artist.Record = {
    $type: "app.rocksky.artist",
    name: track.albumArtist,
    createdAt: new Date().toISOString(),
    pictureUrl: track.artistPicture || undefined,
    tags: track.genres || [],
  };

  const validation = Artist.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Artist record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.artist",
      rkey,
      record,
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Artist record created at ${uri}`;
    return uri;
  } catch (e) {
    logger.error`Error creating artist record: ${e}`;
    return null;
  }
}

/**
 * Creates an `app.rocksky.album` record for the track's album in the agent's
 * repo.
 *
 * @param agent Authenticated agent whose repo receives the record.
 * @param track Track metadata the album fields are taken from.
 * @returns The URI of the created record, or `null` when the write failed.
 * @throws Error when the record fails `Album.validateRecord`.
 */
async function putAlbumRecord(agent: Agent, track: MatchTrackResult) {
  const rkey = TID.nextStr();

  // Unlike the other put* helpers this literal carries no type annotation;
  // the runtime validation below is its only shape check.
  const record = {
    $type: "app.rocksky.album",
    title: track.album,
    artist: track.albumArtist,
    year: track.year === null ? undefined : track.year,
    releaseDate: track.releaseDate
      ? new Date(track.releaseDate).toISOString()
      : undefined,
    createdAt: new Date().toISOString(),
    albumArtUrl: track.albumArt,
  };

  const validation = Album.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Album record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.album",
      rkey,
      record,
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Album record created at ${uri}`;
    return uri;
  } catch (e) {
    logger.error`Error creating album record: ${e}`;
    return null;
  }
}

/**
 * Creates an `app.rocksky.scrobble` record for the track in the agent's repo.
 *
 * @param agent Authenticated agent whose repo receives the record.
 * @param track Track metadata to store.
 * @param timestamp Unix time in seconds used for `createdAt`; defaults to now.
 * @returns The URI of the created record, or `null` when the write failed.
 * @throws Error when the record fails `Scrobble.validateRecord`.
 */
async function putScrobbleRecord(
  agent: Agent,
  track: MatchTrackResult,
  timestamp?: number,
) {
  const rkey = TID.nextStr();

  const record: Scrobble.Record = {
    $type: "app.rocksky.scrobble",
    title: track.title,
    albumArtist: track.albumArtist,
    albumArtUrl: track.albumArt,
    artist: track.artist,
    artists: track.mbArtists === null ? undefined : track.mbArtists,
    album: track.album,
    duration: track.duration,
    trackNumber: track.trackNumber,
    // A disc number of 0 is stored as 1.
    discNumber: track.discNumber === 0 ? 1 : track.discNumber,
    releaseDate: track.releaseDate
      ? new Date(track.releaseDate).toISOString()
      : undefined,
    year: track.year === null ? undefined : track.year,
    composer: track.composer ? track.composer : undefined,
    lyrics: track.lyrics ? track.lyrics : undefined,
    copyrightMessage: track.copyrightMessage
      ? track.copyrightMessage
      : undefined,
    createdAt: timestamp
      ? dayjs.unix(timestamp).toISOString()
      : new Date().toISOString(),
    spotifyLink: track.spotifyLink ? track.spotifyLink : undefined,
    tags: track.genres || [],
    mbid: track.mbId,
  };

  const validation = Scrobble.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Scrobble record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.scrobble",
      rkey,
      record,
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Scrobble record created at ${uri}`;
    return uri;
  } catch (e) {
    logger.error`Error creating scrobble record: ${e}`;
    return null;
  }
}

/**
 * Decides whether local scrobbles must be synced before publishing.
 *
 * Fetches one `app.rocksky.scrobble` record from the agent's repo and checks
 * whether a local scrobble row with the same CID exists.
 *
 * @param agent Authenticated agent whose repo is inspected.
 * @returns `true` when the repo has no scrobble records or the fetched record
 *   is missing locally; `false` otherwise.
 */
async function shouldSync(agent: Agent): Promise<boolean> {
  const res = await agent.com.atproto.repo.listRecords({
    repo: agent.assertDid,
    collection: "app.rocksky.scrobble",
    limit: 1,
  });

  const records = res.data.records as Array<{
    uri: string;
    cid: string;
    value: Scrobble.Record;
  }>;

  if (!records.length) {
    logger.info`No scrobble records found`;
    return true;
  }

  const rows = await ctx.db
    .select({
      count: sql<number>`count(*)`,
    })
    .from(schema.scrobbles)
    .where(eq(schema.scrobbles.cid, records[0].cid))
    .execute();

  // Coerce: a string "0" from the driver would otherwise never equal 0.
  return Number(rows[0].count) === 0;
}