forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import chalk from "chalk";
2import { ctx } from "context";
3import { eq, or } from "drizzle-orm";
4import _ from "lodash";
5import users, { type SelectUser } from "schema/users";
6
// Command-line arguments after the runtime and script path. The script below
// treats each one as a DID or handle to process.
const [, , ...args] = process.argv;

// Page size used when walking the whole users table.
const BATCH_SIZE = 100;
9
/**
 * Refreshes one user's display name and avatar from their profile record,
 * then publishes the stored user row on the `rocksky.user` subject.
 *
 * Unless the `SKIP_AVATAR_UPDATE` environment variable is set, the user's DID
 * document is fetched from plc.directory, the first listed service endpoint
 * is queried for the `app.bsky.actor.profile` record, and the `users` row is
 * updated with whatever profile fields were actually present.
 *
 * If no service endpoint is found the function logs and returns without
 * publishing anything.
 *
 * @param user - Row from the `users` table; only `user.did` is read here.
 * @throws Propagates network, JSON-parsing and database errors to the caller.
 */
async function processUser(user: SelectUser) {
  if (!process.env.SKIP_AVATAR_UPDATE) {
    const plcResponse = await fetch(`https://plc.directory/${user.did}`);
    const plc: unknown = await plcResponse.json();

    const serviceEndpoint = _.get(plc, "service.0.serviceEndpoint");
    if (!serviceEndpoint) {
      console.log(`Service endpoint not found for ${user.did}`);
      return;
    }

    const profileResponse = await fetch(
      `${serviceEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${user.did}&collection=app.bsky.actor.profile&rkey=self`
    );
    const profile: unknown = await profileResponse.json();

    const displayName: unknown = _.get(profile, "value.displayName");
    const ref: unknown = _.get(profile, "value.avatar.ref.$link");
    const mimeType: unknown = _.get(profile, "value.avatar.mimeType");
    // The CDN URL suffix is the MIME subtype, e.g. "image/jpeg" -> "jpeg".
    const type =
      typeof mimeType === "string" ? mimeType.split("/")[1] : undefined;

    // Only write fields the profile actually provides. Building the avatar
    // URL unconditionally would store ".../undefined@undefined" for users
    // without an avatar (or when the record lookup returns an error body).
    const changes: { displayName?: string; avatar?: string } = {};
    if (typeof displayName === "string") {
      changes.displayName = displayName;
    }
    if (typeof ref === "string" && ref && type) {
      changes.avatar = `https://cdn.bsky.app/img/avatar/plain/${user.did}/${ref}@${type}`;
    }

    if (Object.keys(changes).length > 0) {
      await ctx.db
        .update(users)
        .set(changes)
        .where(eq(users.did, user.did))
        .execute();
    } else {
      console.log(`No profile fields to update for ${user.did}`);
    }
  } else {
    console.log(`Skipping avatar update for ${user.did}`);
  }

  // Re-read the row so the published payload reflects what is stored now.
  const [u] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, user.did))
    .limit(1)
    .execute();

  if (!u) {
    // The row disappeared between the caller's query and this one.
    console.log(`User ${user.did} not found`);
    return;
  }

  const userPayload = {
    xata_id: u.id,
    did: u.did,
    handle: u.handle,
    display_name: u.displayName,
    avatar: u.avatar,
    xata_createdat: u.createdAt.toISOString(),
    xata_updatedat: u.updatedAt.toISOString(),
    xata_version: u.xataVersion,
  };

  console.log(userPayload);
  await ctx.nc.publish(
    "rocksky.user",
    Buffer.from(JSON.stringify(userPayload))
  );
}
63
// Entry point: process the users named on the command line (by DID or
// handle), or every user in the table when no arguments are given.
if (args.length > 0) {
  for (const did of args) {
    // Guard each user individually, as the batch branch does, so one failure
    // does not abort the remaining arguments or skip the final flush.
    try {
      const [user] = await ctx.db
        .select()
        .from(users)
        .where(or(eq(users.did, did), eq(users.handle, did)))
        .limit(1)
        .execute();
      if (!user) {
        console.log(`User ${did} not found`);
        continue;
      }

      await processUser(user);
    } catch (error) {
      console.error(`Error processing user ${did}:`, error);
    }
  }
} else {
  let offset = 0;
  let processedCount = 0;

  console.log("Processing all users...");

  while (true) {
    // LIMIT/OFFSET paging needs a stable order: processUser updates rows
    // between pages, and without ORDER BY the database may return rows in a
    // different order per query, skipping or repeating users.
    // NOTE(review): assumes users.id is unique — confirm against the schema.
    const batch = await ctx.db
      .select()
      .from(users)
      .orderBy(users.id)
      .limit(BATCH_SIZE)
      .offset(offset)
      .execute();

    if (batch.length === 0) {
      break; // No more users to process
    }

    console.log(
      `Processing batch ${Math.floor(offset / BATCH_SIZE) + 1}, users ${offset + 1}-${offset + batch.length}`
    );

    for (const user of batch) {
      try {
        await processUser(user);
        processedCount++;
      } catch (error) {
        console.error(`Error processing user ${user.did}:`, error);
      }
    }

    offset += BATCH_SIZE;

    // Small delay between batches to avoid overwhelming the API
    await new Promise((resolve) => setTimeout(resolve, 100));
  }

  console.log(`Processed ${chalk.greenBright(processedCount)} users total`);
}

// Ensure all messages are flushed before exiting
await ctx.nc.flush();

console.log("Done");

process.exit(0);