A decentralized music tracking and discovery platform built on AT Protocol 🎵
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1import chalk from "chalk";
2import { consola } from "consola";
3import { ctx } from "context";
4import { eq, or } from "drizzle-orm";
5import _ from "lodash";
6import users, { type SelectUser } from "schema/users";
7
// Command-line arguments that follow the runtime binary and the script path.
const [, , ...args] = process.argv;
// Number of user rows fetched per page when walking the whole table.
const BATCH_SIZE = 100;
10
/** Columns of `users` that can be refreshed from a profile record. */
type ProfileChanges = { displayName?: string; avatar?: string };

/**
 * Resolves the service endpoint for a user from their DID document on
 * plc.directory.
 *
 * Prefers the service entry whose id ends in `#atproto_pds` and falls back to
 * the first listed service when no such entry exists.
 *
 * @param user - The stored user whose DID is looked up.
 * @returns The endpoint URL, or `undefined` when the lookup fails or the
 * document carries no usable service entry.
 */
async function resolveServiceEndpoint(
  user: SelectUser,
): Promise<string | undefined> {
  const response = await fetch(`https://plc.directory/${user.did}`);
  if (!response.ok) {
    return undefined;
  }

  const services: unknown = _.get(await response.json(), "service");
  if (!Array.isArray(services)) {
    return undefined;
  }

  const pds =
    services.find((service) =>
      String(_.get(service, "id")).endsWith("#atproto_pds"),
    ) ?? services[0];
  const endpoint: unknown = _.get(pds, "serviceEndpoint");
  return typeof endpoint === "string" && endpoint !== "" ? endpoint : undefined;
}

/**
 * Reads the user's `app.bsky.actor.profile` record and derives the columns
 * that should be refreshed from it.
 *
 * @param serviceEndpoint - Base URL of the server hosting the user's repo.
 * @param user - The stored user whose profile is fetched.
 * @returns Only the fields that are actually present in the record; an empty
 * object when the request fails or the record has nothing usable.
 */
async function fetchProfileChanges(
  serviceEndpoint: string,
  user: SelectUser,
): Promise<ProfileChanges> {
  const changes: ProfileChanges = {};

  const response = await fetch(
    `${serviceEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${user.did}&collection=app.bsky.actor.profile&rkey=self`,
  );
  if (!response.ok) {
    // An error body is not a profile record; keep the stored values as they are.
    consola.warn(
      `Failed to fetch profile for ${user.did}: HTTP ${response.status}`,
    );
    return changes;
  }

  const profile: unknown = await response.json();

  const displayName: unknown = _.get(profile, "value.displayName");
  if (typeof displayName === "string") {
    changes.displayName = displayName;
  }

  const ref: unknown = _.get(profile, "value.avatar.ref.$link");
  const mimeType: unknown = _.get(profile, "value.avatar.mimeType");
  const type =
    typeof mimeType === "string" ? mimeType.split("/")[1] : undefined;
  // Only build the CDN URL when both parts exist; otherwise the stored avatar
  // would become ".../undefined@undefined".
  if (typeof ref === "string" && ref !== "" && type) {
    changes.avatar = `https://cdn.bsky.app/img/avatar/plain/${user.did}/${ref}@${type}`;
  }

  return changes;
}

/**
 * Refreshes a user's display name and avatar from their profile record, then
 * publishes the stored row on the `rocksky.user` subject.
 *
 * The refresh step is skipped entirely when the `SKIP_AVATAR_UPDATE`
 * environment variable is set. When no service endpoint can be resolved the
 * function returns without publishing. When the profile cannot be fetched or
 * has no usable fields, the stored values are left untouched and the current
 * row is still published.
 *
 * @param user - The user row to refresh and publish.
 */
async function processUser(user: SelectUser): Promise<void> {
  if (!process.env.SKIP_AVATAR_UPDATE) {
    const serviceEndpoint = await resolveServiceEndpoint(user);
    if (!serviceEndpoint) {
      consola.info(`Service endpoint not found for ${user.did}`);
      return;
    }

    const changes = await fetchProfileChanges(serviceEndpoint, user);
    // An update with no values is rejected by the query builder, so skip it.
    if (Object.keys(changes).length > 0) {
      await ctx.db
        .update(users)
        .set(changes)
        .where(eq(users.did, user.did))
        .execute();
    }
  } else {
    consola.info(`Skipping avatar update for ${user.did}`);
  }

  // Re-read the row so the published payload reflects what is stored now.
  const [u] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, user.did))
    .limit(1)
    .execute();

  if (!u) {
    consola.warn(`User ${user.did} no longer exists, nothing to publish`);
    return;
  }

  const userPayload = {
    xata_id: u.id,
    did: u.did,
    handle: u.handle,
    display_name: u.displayName,
    avatar: u.avatar,
    xata_createdat: u.createdAt.toISOString(),
    xata_updatedat: u.updatedAt.toISOString(),
    xata_version: u.xataVersion,
  };

  consola.info(userPayload);
  ctx.nc.publish("rocksky.user", Buffer.from(JSON.stringify(userPayload)));
}
61
// Entry point. With CLI arguments, each argument is matched against a user's
// DID or handle and only those users are processed; without arguments every
// user in the table is processed page by page.
if (args.length > 0) {
  for (const did of args) {
    // Isolate failures per user, as the batch path below does, so one bad
    // lookup does not abort the remaining arguments or skip the final flush.
    try {
      const [user] = await ctx.db
        .select()
        .from(users)
        .where(or(eq(users.did, did), eq(users.handle, did)))
        .limit(1)
        .execute();
      if (!user) {
        consola.info(`User ${did} not found`);
        continue;
      }

      await processUser(user);
    } catch (error) {
      consola.error(`Error processing user ${did}:`, error);
    }
  }
} else {
  let offset = 0;
  let processedCount = 0;

  consola.info("Processing all users...");

  while (true) {
    // A fixed ORDER BY keeps LIMIT/OFFSET pages stable. Without it the row
    // order is unspecified and may change as rows are updated between pages,
    // which can skip some users and repeat others.
    const batch = await ctx.db
      .select()
      .from(users)
      .orderBy(users.did)
      .limit(BATCH_SIZE)
      .offset(offset)
      .execute();

    if (batch.length === 0) {
      break; // No more users to process
    }

    consola.info(
      `Processing batch ${Math.floor(offset / BATCH_SIZE) + 1}, users ${offset + 1}-${offset + batch.length}`,
    );

    for (const user of batch) {
      try {
        await processUser(user);
        processedCount++;
      } catch (error) {
        consola.error(`Error processing user ${user.did}:`, error);
      }
    }

    offset += BATCH_SIZE;

    // Small delay between batches to avoid overwhelming the API
    await new Promise((resolve) => setTimeout(resolve, 100));
  }

  consola.info(`Processed ${chalk.greenBright(processedCount)} users total`);
}

// Ensure all published messages are flushed before exiting
await ctx.nc.flush();

consola.info("Done");

// Exit explicitly rather than waiting for the process to end on its own.
process.exit(0);