A decentralized music tracking and discovery platform built on AT Protocol ๐ŸŽต

Await JetStream subscription and cleanup lock

Make subscribeToJetstream return Promise<void> and await it where used
(sync and scrobble). Move client event handlers inside the Promise and
resolve on "open". Return early Promise.resolve() if lock file exists.
Call client.connect() from within the Promise.

Remove the JetStream lock file after scrobble dry-run and after
successful publish. Also explicitly exit with code 0 in scrobble
command.

+45 -31
+2
apps/cli/src/cmd/scrobble.ts
··· 12 12 if (!success) { 13 13 process.exit(1); 14 14 } 15 + 16 + process.exit(0); 15 17 }
+33 -30
apps/cli/src/cmd/sync.ts
··· 34 34 const agent: Agent = await createAgent(did, handle); 35 35 36 36 const user = await createUser(agent, did, handle); 37 - subscribeToJetstream(user); 37 + await subscribeToJetstream(user); 38 38 39 39 cleanUpJetstreamLockOnExit(user.did); 40 40 ··· 508 508 logger.info`๐Ÿ•’ ${totalScrobblesImported} scrobbles imported`; 509 509 }; 510 510 511 - export const subscribeToJetstream = (user: SelectUser) => { 511 + export const subscribeToJetstream = (user: SelectUser): Promise<void> => { 512 512 const lockFile = path.join(os.tmpdir(), `rocksky-jetstream-${user.did}.lock`); 513 513 if (fs.existsSync(lockFile)) { 514 514 logger.warn`JetStream subscription already in progress for user ${user.did}`; 515 515 logger.warn`Skipping subscription`; 516 516 logger.warn`Lock file exists at ${lockFile}`; 517 - return; 517 + return Promise.resolve(); 518 518 } 519 519 520 520 fs.writeFileSync(lockFile, ""); ··· 539 539 debug: true, 540 540 }); 541 541 542 - client.on("open", () => { 543 - logger.info`โœ… Connected to JetStream!`; 544 - }); 542 + return new Promise((resolve) => { 543 + client.on("open", () => { 544 + logger.info`โœ… Connected to JetStream!`; 545 + resolve(); 546 + }); 545 547 546 - client.on("message", async (data) => { 547 - const event = data as JetStreamEvent; 548 + client.on("message", async (data) => { 549 + const event = data as JetStreamEvent; 550 + 551 + if (event.kind === "commit" && event.commit) { 552 + const { operation, collection, record, rkey, cid } = event.commit; 553 + const uri = `at://${event.did}/${collection}/${rkey}`; 548 554 549 - if (event.kind === "commit" && event.commit) { 550 - const { operation, collection, record, rkey, cid } = event.commit; 551 - const uri = `at://${event.did}/${collection}/${rkey}`; 555 + logger.info`\n๐Ÿ“ก New event:`; 556 + logger.info` Operation: ${operation}`; 557 + logger.info` Collection: ${collection}`; 558 + logger.info` DID: ${event.did}`; 559 + logger.info` Uri: ${uri}`; 552 560 553 - logger.info`\n๐Ÿ“ก New event:`; 554 - logger.info` Operation: ${operation}`; 555 - logger.info` Collection: ${collection}`; 556 - logger.info` DID: ${event.did}`; 557 - logger.info` Uri: ${uri}`; 561 + if (operation === "create" && record) { 562 + console.log(JSON.stringify(record, null, 2)); 563 + await onNewCollection(record, cid, uri, user); 564 + } 558 565 559 - if (operation === "create" && record) { 560 - console.log(JSON.stringify(record, null, 2)); 561 - await onNewCollection(record, cid, uri, user); 566 + logger.info` Cursor: ${event.time_us}`; 562 567 } 568 + }); 563 569 564 - logger.info` Cursor: ${event.time_us}`; 565 - } 566 - }); 570 + client.on("error", (error) => { 571 + logger.error`โŒ Error: ${error}`; 572 + }); 567 573 568 - client.on("error", (error) => { 569 - logger.error`โŒ Error: ${error}`; 570 - }); 574 + client.on("reconnect", (data) => { 575 + const { attempt } = data as { attempt: number }; 576 + logger.info`๐Ÿ”„ Reconnecting... (attempt ${attempt})`; 577 + }); 571 578 572 - client.on("reconnect", (data) => { 573 - const { attempt } = data as { attempt: number }; 574 - logger.info`๐Ÿ”„ Reconnecting... (attempt ${attempt})`; 579 + client.connect(); 575 580 }); 576 - 577 - client.connect(); 578 581 }; 579 582 580 583 const onNewCollection = async (
+10 -1
apps/cli/src/scrobble.ts
··· 27 27 const agent: Agent = await createAgent(did, handle); 28 28 const recentScrobble = await getRecentScrobble(did, track, timestamp); 29 29 const user = await createUser(agent, did, handle); 30 - subscribeToJetstream(user); 30 + await subscribeToJetstream(user); 31 31 32 32 const lockFilePath = path.join(os.tmpdir(), `rocksky-${did}.lock`); 33 33 ··· 52 52 53 53 if (dryRun) { 54 54 logger.info`${handle} Dry run: Skipping publishing scrobble for ${track.title} by ${track.artist} at ${timestamp ? dayjs.unix(timestamp).format("YYYY-MM-DD HH:mm:ss") : dayjs().format("YYYY-MM-DD HH:mm:ss")}`; 55 + 56 + await fs.promises.unlink( 57 + path.join(os.tmpdir(), `rocksky-jetstream-${did}.lock`), 58 + ); 59 + 55 60 return true; 56 61 } 57 62 ··· 117 122 } 118 123 119 124 await putScrobbleRecord(agent, track, timestamp); 125 + 126 + await fs.promises.unlink( 127 + path.join(os.tmpdir(), `rocksky-jetstream-${did}.lock`), 128 + ); 120 129 121 130 return true; 122 131 }