A decentralized music tracking and discovery platform built on AT Protocol 馃幍 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 122 lines 3.2 kB view raw
1import chalk from "chalk"; 2import { consola } from "consola"; 3import { ctx } from "context"; 4import { eq, or } from "drizzle-orm"; 5import _ from "lodash"; 6import users, { type SelectUser } from "schema/users"; 7 8const args = process.argv.slice(2); 9const BATCH_SIZE = 100; // Process 100 users at a time 10 11async function processUser(user: SelectUser) { 12 if (!process.env.SKIP_AVATAR_UPDATE) { 13 const plc = await fetch(`https://plc.directory/${user.did}`).then((res) => 14 res.json(), 15 ); 16 17 const serviceEndpoint = _.get(plc, "service.0.serviceEndpoint"); 18 if (!serviceEndpoint) { 19 consola.info(`Service endpoint not found for ${user.did}`); 20 return; 21 } 22 23 const profile = await fetch( 24 `${serviceEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${user.did}&collection=app.bsky.actor.profile&rkey=self`, 25 ).then((res) => res.json()); 26 const ref = _.get(profile, "value.avatar.ref.$link"); 27 const type = _.get(profile, "value.avatar.mimeType", "").split("/")[1]; 28 await ctx.db 29 .update(users) 30 .set({ 31 displayName: _.get(profile, "value.displayName"), 32 avatar: `https://cdn.bsky.app/img/avatar/plain/${user.did}/${ref}@${type}`, 33 }) 34 .where(eq(users.did, user.did)) 35 .execute(); 36 } else { 37 consola.info(`Skipping avatar update for ${user.did}`); 38 } 39 40 const [u] = await ctx.db 41 .select() 42 .from(users) 43 .where(eq(users.did, user.did)) 44 .limit(1) 45 .execute(); 46 47 const userPayload = { 48 xata_id: u.id, 49 did: u.did, 50 handle: u.handle, 51 display_name: u.displayName, 52 avatar: u.avatar, 53 xata_createdat: u.createdAt.toISOString(), 54 xata_updatedat: u.updatedAt.toISOString(), 55 xata_version: u.xataVersion, 56 }; 57 58 consola.info(userPayload); 59 ctx.nc.publish("rocksky.user", Buffer.from(JSON.stringify(userPayload))); 60} 61 62if (args.length > 0) { 63 for (const did of args) { 64 const [user] = await ctx.db 65 .select() 66 .from(users) 67 .where(or(eq(users.did, did), eq(users.handle, did))) 68 .limit(1) 69 .execute(); 70 if (!user) { 71 consola.info(`User ${did} not found`); 72 continue; 73 } 74 75 await processUser(user); 76 } 77} else { 78 let offset = 0; 79 let processedCount = 0; 80 81 consola.info("Processing all users..."); 82 83 while (true) { 84 const batch = await ctx.db 85 .select() 86 .from(users) 87 .limit(BATCH_SIZE) 88 .offset(offset) 89 .execute(); 90 91 if (batch.length === 0) { 92 break; // No more users to process 93 } 94 95 consola.info( 96 `Processing batch ${Math.floor(offset / BATCH_SIZE) + 1}, users ${offset + 1}-${offset + batch.length}`, 97 ); 98 99 for (const user of batch) { 100 try { 101 await processUser(user); 102 processedCount++; 103 } catch (error) { 104 consola.error(`Error processing user ${user.did}:`, error); 105 } 106 } 107 108 offset += BATCH_SIZE; 109 110 // Small delay between batches to avoid overwhelming the API 111 await new Promise((resolve) => setTimeout(resolve, 100)); 112 } 113 114 consola.info(`Processed ${chalk.greenBright(processedCount)} users total`); 115} 116 117// Ensure all messages are flushed before exiting 118await ctx.nc.flush(); 119 120consola.info("Done"); 121 122process.exit(0);