A decentralized music tracking and discovery platform built on AT Protocol 🎵

Add sync/jetstream locks and publish helpers

Add modules to create and clean up per-user temp lock files on exit
(cleanUpSyncLockOnExit, cleanUpJetstreamLockOnExit).
subscribeToJetstream now checks/creates a JetStream lock file and sync
writes a sync lock and registers cleanup handlers. Export createUser and
subscribeToJetstream for reuse.

Wire createUser/subscribeToJetstream into scrobble flow and add helper
functions to put song, artist and album records (with validation) and a
stub for scrobble records. Update matchTrack types: releaseDate is now a
string, add mbArtists and MusicBrainzArtist type. Remove some unused
imports.

+285 -5
+17 -3
apps/cli/src/cmd/sync.ts
··· 1 1 import { JetStreamClient, JetStreamEvent } from "jetstream"; 2 2 import { logger } from "logger"; 3 3 import { ctx } from "context"; 4 - import { isValidHandle } from "@atproto/syntax"; 5 4 import { Agent } from "@atproto/api"; 6 5 import { env } from "lib/env"; 7 6 import { createAgent } from "lib/agent"; ··· 20 19 import os from "node:os"; 21 20 import path from "node:path"; 22 21 import { getDidAndHandle } from "lib/getDidAndHandle"; 22 + import { cleanUpJetstreamLockOnExit } from "lib/cleanUpJetstreamLock"; 23 + import { cleanUpSyncLockOnExit } from "lib/cleanUpSyncLock"; 23 24 24 25 const PAGE_SIZE = 100; 25 26 ··· 35 36 const user = await createUser(agent, did, handle); 36 37 subscribeToJetstream(user); 37 38 39 + cleanUpJetstreamLockOnExit(user.did); 40 + 38 41 logger.info` DID: ${did}`; 39 42 logger.info` Handle: ${handle}`; 40 43 ··· 58 61 } 59 62 60 63 await fs.promises.writeFile(lockFilePath, ""); 64 + cleanUpSyncLockOnExit(user.did); 61 65 62 66 await createArtists(artists, user); 63 67 await createAlbums(albums, user); ··· 77 81 return `${endpoint}/subscribe`; 78 82 }; 79 83 80 - const createUser = async ( 84 + export const createUser = async ( 81 85 agent: Agent, 82 86 did: string, 83 87 handle: string, ··· 504 508 logger.info`🕒 ${totalScrobblesImported} scrobbles imported`; 505 509 }; 506 510 507 - const subscribeToJetstream = (user: SelectUser) => { 511 + export const subscribeToJetstream = (user: SelectUser) => { 512 + const lockFile = path.join(os.tmpdir(), `rocksky-jetstream-${user.did}.lock`); 513 + if (fs.existsSync(lockFile)) { 514 + logger.warn`JetStream subscription already in progress for user ${user.did}`; 515 + logger.warn`Skipping subscription`; 516 + logger.warn`Lock file exists at ${lockFile}`; 517 + return; 518 + } 519 + 520 + fs.writeFileSync(lockFile, ""); 521 + 508 522 const client = new JetStreamClient({ 509 523 wantedCollections: [ 510 524 "app.rocksky.scrobble",
+66
apps/cli/src/lib/cleanUpJetstreamLock.ts
··· 1 + import fs from "fs"; 2 + import path from "path"; 3 + import os from "os"; 4 + import { logger } from "logger"; 5 + 6 + export function cleanUpJetstreamLockOnExit(did: string) { 7 + process.on("exit", async () => { 8 + try { 9 + await fs.promises.unlink( 10 + path.join(os.tmpdir(), `rocksky-jetstream-${did}.lock`), 11 + ); 12 + process.exit(0); 13 + } catch (error) { 14 + logger.error`Error cleaning up Jetstream lock: ${error}`; 15 + process.exit(1); 16 + } 17 + }); 18 + 19 + process.on("SIGINT", async () => { 20 + try { 21 + await fs.promises.unlink( 22 + path.join(os.tmpdir(), `rocksky-jetstream-${did}.lock`), 23 + ); 24 + process.exit(0); 25 + } catch (error) { 26 + logger.error`Error cleaning up Jetstream lock: ${error}`; 27 + process.exit(1); 28 + } 29 + }); 30 + 31 + process.on("SIGTERM", async () => { 32 + try { 33 + await fs.promises.unlink( 34 + path.join(os.tmpdir(), `rocksky-jetstream-${did}.lock`), 35 + ); 36 + process.exit(0); 37 + } catch (error) { 38 + logger.error`Error cleaning up Jetstream lock: ${error}`; 39 + process.exit(1); 40 + } 41 + }); 42 + 43 + process.on("uncaughtException", async () => { 44 + try { 45 + await fs.promises.unlink( 46 + path.join(os.tmpdir(), `rocksky-jetstream-${did}.lock`), 47 + ); 48 + process.exit(1); 49 + } catch (error) { 50 + logger.error`Error cleaning up Jetstream lock: ${error}`; 51 + process.exit(1); 52 + } 53 + }); 54 + 55 + process.on("unhandledRejection", async () => { 56 + try { 57 + await fs.promises.unlink( 58 + path.join(os.tmpdir(), `rocksky-jetstream-${did}.lock`), 59 + ); 60 + process.exit(1); 61 + } catch (error) { 62 + logger.error`Error cleaning up Jetstream lock: ${error}`; 63 + process.exit(1); 64 + } 65 + }); 66 + }
+56
apps/cli/src/lib/cleanUpSyncLock.ts
··· 1 + import fs from "fs"; 2 + import path from "path"; 3 + import os from "os"; 4 + import { logger } from "logger"; 5 + 6 + export function cleanUpSyncLockOnExit(did: string) { 7 + process.on("exit", async () => { 8 + try { 9 + await fs.promises.unlink(path.join(os.tmpdir(), `rocksky-${did}.lock`)); 10 + process.exit(0); 11 + } catch (error) { 12 + logger.error`Error cleaning up Sync lock: ${error}`; 13 + process.exit(1); 14 + } 15 + }); 16 + 17 + process.on("SIGINT", async () => { 18 + try { 19 + await fs.promises.unlink(path.join(os.tmpdir(), `rocksky-${did}.lock`)); 20 + process.exit(0); 21 + } catch (error) { 22 + logger.error`Error cleaning up Sync lock: ${error}`; 23 + process.exit(1); 24 + } 25 + }); 26 + 27 + process.on("SIGTERM", async () => { 28 + try { 29 + await fs.promises.unlink(path.join(os.tmpdir(), `rocksky-${did}.lock`)); 30 + process.exit(0); 31 + } catch (error) { 32 + logger.error`Error cleaning up Sync lock: ${error}`; 33 + process.exit(1); 34 + } 35 + }); 36 + 37 + process.on("uncaughtException", async () => { 38 + try { 39 + await fs.promises.unlink(path.join(os.tmpdir(), `rocksky-${did}.lock`)); 40 + process.exit(1); 41 + } catch (error) { 42 + logger.error`Error cleaning up Sync lock: ${error}`; 43 + process.exit(1); 44 + } 45 + }); 46 + 47 + process.on("unhandledRejection", async () => { 48 + try { 49 + await fs.promises.unlink(path.join(os.tmpdir(), `rocksky-${did}.lock`)); 50 + process.exit(1); 51 + } catch (error) { 52 + logger.error`Error cleaning up Sync lock: ${error}`; 53 + process.exit(1); 54 + } 55 + }); 56 + }
+7 -1
apps/cli/src/lib/matchTrack.ts
··· 3 3 import { logger } from "logger"; 4 4 import { SelectTrack } from "schema/tracks"; 5 5 6 + export type MusicBrainzArtist = { 7 + mbid: string; 8 + name: string; 9 + }; 10 + 6 11 export type MatchTrackResult = SelectTrack & { 7 12 genres: string[] | null; 8 13 artistPicture: string | null; 9 - releaseDate: Date | null; 14 + releaseDate: string | null; 10 15 year: number | null; 16 + mbArtists: MusicBrainzArtist[] | null; 11 17 }; 12 18 13 19 export async function matchTrack(
+139 -1
apps/cli/src/scrobble.ts
··· 3 3 import dayjs from "dayjs"; 4 4 import { createAgent } from "lib/agent"; 5 5 import { getDidAndHandle } from "lib/getDidAndHandle"; 6 - import { Agent } from "node:http"; 7 6 import { ctx } from "context"; 8 7 import schema from "schema"; 9 8 import { and, eq, gte, lte, sql } from "drizzle-orm"; ··· 11 10 import path from "node:path"; 12 11 import fs from "node:fs"; 13 12 import chalk from "chalk"; 13 + import * as Album from "lexicon/types/app/rocksky/album"; 14 + import * as Artist from "lexicon/types/app/rocksky/artist"; 15 + import * as Scrobble from "lexicon/types/app/rocksky/scrobble"; 16 + import * as Song from "lexicon/types/app/rocksky/song"; 17 + import { TID } from "@atproto/common"; 18 + import { Agent } from "@atproto/api"; 19 + import { createUser, subscribeToJetstream } from "cmd/sync"; 14 20 15 21 export async function publishScrobble( 16 22 track: MatchTrackResult, ··· 19 25 const [did, handle] = await getDidAndHandle(); 20 26 const agent: Agent = await createAgent(did, handle); 21 27 const recentScrobble = await getRecentScrobble(did, track, timestamp); 28 + const user = await createUser(agent, did, handle); 29 + subscribeToJetstream(user); 22 30 23 31 const lockFilePath = path.join(os.tmpdir(), `rocksky-${did}.lock`); 24 32 ··· 40 48 } 41 49 42 50 logger.info`${handle} Publishing scrobble for ${track.title} by ${track.artist} at ${timestamp ? dayjs.unix(timestamp).format("YYYY-MM-DD HH:mm:ss") : dayjs().format("YYYY-MM-DD HH:mm:ss")}`; 51 + 52 + // putSongRecord 53 + // putArtistRecord 54 + // putAlbumRecord 55 + // putScrobbleRecord 43 56 44 57 return true; 45 58 } ··· 86 99 .where(eq(schema.users.did, did)) 87 100 .then((rows) => rows[0].count); 88 101 } 102 + 103 + async function putSongRecord(agent: Agent, track: MatchTrackResult) { 104 + const rkey = TID.nextStr(); 105 + 106 + const record: Song.Record = { 107 + $type: "app.rocksky.song", 108 + title: track.title, 109 + artist: track.artist, 110 + artists: track.mbArtists === null ? undefined : track.mbArtists, 111 + album: track.album, 112 + albumArtist: track.albumArtist, 113 + duration: track.duration, 114 + releaseDate: track.releaseDate 115 + ? new Date(track.releaseDate).toISOString() 116 + : undefined, 117 + year: track.year === null ? undefined : track.year, 118 + albumArtUrl: track.albumArt, 119 + composer: track.composer ? track.composer : undefined, 120 + lyrics: track.lyrics ? track.lyrics : undefined, 121 + trackNumber: track.trackNumber, 122 + discNumber: track.discNumber === 0 ? 1 : track.discNumber, 123 + copyrightMessage: track.copyrightMessage 124 + ? track.copyrightMessage 125 + : undefined, 126 + createdAt: new Date().toISOString(), 127 + spotifyLink: track.spotifyLink ? track.spotifyLink : undefined, 128 + tags: track.genres || [], 129 + mbid: track.mbId, 130 + }; 131 + 132 + if (!Song.validateRecord(record).success) { 133 + logger.info`${Song.validateRecord(record)}`; 134 + logger.info`${record}`; 135 + throw new Error("Invalid Song record"); 136 + } 137 + 138 + try { 139 + const res = await agent.com.atproto.repo.putRecord({ 140 + repo: agent.assertDid, 141 + collection: "app.rocksky.song", 142 + rkey, 143 + record, 144 + validate: false, 145 + }); 146 + const uri = res.data.uri; 147 + logger.info`Song record created at ${uri}`; 148 + return uri; 149 + } catch (e) { 150 + logger.error`Error creating song record: ${e}`; 151 + return null; 152 + } 153 + } 154 + 155 + async function putArtistRecord(agent: Agent, track: MatchTrackResult) { 156 + const rkey = TID.nextStr(); 157 + const record: Artist.Record = { 158 + $type: "app.rocksky.artist", 159 + name: track.albumArtist, 160 + createdAt: new Date().toISOString(), 161 + pictureUrl: track.artistPicture || undefined, 162 + tags: track.genres || [], 163 + }; 164 + 165 + if (!Artist.validateRecord(record).success) { 166 + logger.info`${Artist.validateRecord(record)}`; 167 + logger.info`${JSON.stringify(record, null, 2)}`; 168 + throw new Error("Invalid Artist record"); 169 + } 170 + 171 + try { 172 + const res = await agent.com.atproto.repo.putRecord({ 173 + repo: agent.assertDid, 174 + collection: "app.rocksky.artist", 175 + rkey, 176 + record, 177 + validate: false, 178 + }); 179 + const uri = res.data.uri; 180 + console.log(`Artist record created at ${uri}`); 181 + return uri; 182 + } catch (e) { 183 + console.error("Error creating artist record", e); 184 + return null; 185 + } 186 + } 187 + 188 + async function putAlbumRecord(agent: Agent, track: MatchTrackResult) { 189 + const rkey = TID.nextStr(); 190 + 191 + const record = { 192 + $type: "app.rocksky.album", 193 + title: track.album, 194 + artist: track.albumArtist, 195 + year: track.year === null ? undefined : track.year, 196 + releaseDate: track.releaseDate 197 + ? new Date(track.releaseDate).toISOString() 198 + : undefined, 199 + createdAt: new Date().toISOString(), 200 + albumArtUrl: track.albumArt, 201 + }; 202 + 203 + if (!Album.validateRecord(record).success) { 204 + logger.info`${Album.validateRecord(record)}`; 205 + logger.info`${record}`; 206 + throw new Error("Invalid Album record"); 207 + } 208 + 209 + try { 210 + const res = await agent.com.atproto.repo.putRecord({ 211 + repo: agent.assertDid, 212 + collection: "app.rocksky.album", 213 + rkey, 214 + record, 215 + validate: false, 216 + }); 217 + const uri = res.data.uri; 218 + logger.info`Album record created at ${uri}`; 219 + return uri; 220 + } catch (e) { 221 + logger.error`Error creating album record: ${e}`; 222 + return null; 223 + } 224 + } 225 + 226 + async function putScrobbleRecord(agent: Agent, track: MatchTrackResult) {}