A decentralized music tracking and discovery platform built on AT Protocol 🎵
at 2b044aaed2c433aca9688cbd767408dc80f1fc4f 124 lines 3.2 kB view raw
import chalk from "chalk";
import { ctx } from "context";
import { eq, or } from "drizzle-orm";
import _ from "lodash";
import users, { type SelectUser } from "schema/users";

/**
 * Maintenance script: refreshes each user's display name and avatar from
 * their AT Protocol profile record, then publishes the stored user row on the
 * `rocksky.user` subject.
 *
 * Usage:
 *   - With arguments: each argument is matched against `users.did` or
 *     `users.handle` and only those users are processed.
 *   - Without arguments: every user is processed in batches of BATCH_SIZE.
 *
 * Setting the SKIP_AVATAR_UPDATE environment variable to any non-empty value
 * skips the profile refresh and only re-publishes the stored rows.
 */

const args = process.argv.slice(2);
const BATCH_SIZE = 100; // Process 100 users at a time

/**
 * GETs `url` and parses the body as JSON.
 *
 * @returns the parsed body, or `undefined` when the server answers with a
 *   non-2xx status (the failure is logged). Callers treat `undefined` the same
 *   way as a document that lacks the fields they are looking for.
 */
async function fetchJson(url: string): Promise<unknown> {
  const res = await fetch(url);
  if (!res.ok) {
    console.log(`Request to ${url} failed with status ${res.status}`);
    return undefined;
  }
  return res.json();
}

/**
 * Refreshes one user's profile fields (unless SKIP_AVATAR_UPDATE is set) and
 * publishes the resulting database row to `rocksky.user`.
 *
 * If the DID document has no service endpoint the user is skipped entirely:
 * nothing is updated and nothing is published.
 *
 * @param user the user row to process; only `user.did` is read here, the
 *   published payload is built from a fresh read of the row.
 * @throws Error when the row can no longer be found on re-read, and whatever
 *   the network, database or message-bus calls reject with.
 */
async function processUser(user: SelectUser): Promise<void> {
  if (!process.env.SKIP_AVATAR_UPDATE) {
    const plc = await fetchJson(`https://plc.directory/${user.did}`);

    const serviceEndpoint: unknown = _.get(plc, "service.0.serviceEndpoint");
    if (typeof serviceEndpoint !== "string" || !serviceEndpoint) {
      console.log(`Service endpoint not found for ${user.did}`);
      return;
    }

    const profile = await fetchJson(
      `${serviceEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${user.did}&collection=app.bsky.actor.profile&rkey=self`
    );

    const updates: { displayName?: string; avatar?: string } = {};

    const displayName: unknown = _.get(profile, "value.displayName");
    if (typeof displayName === "string") {
      updates.displayName = displayName;
    }

    // Only build an avatar URL when the profile really carries an avatar blob.
    // Otherwise the stored value is left untouched instead of being replaced
    // with a broken ".../undefined@undefined" URL.
    const ref: unknown = _.get(profile, "value.avatar.ref.$link");
    const mimeType: unknown = _.get(profile, "value.avatar.mimeType");
    const type =
      typeof mimeType === "string" ? mimeType.split("/")[1] : undefined;
    if (typeof ref === "string" && ref && type) {
      updates.avatar = `https://cdn.bsky.app/img/avatar/plain/${user.did}/${ref}@${type}`;
    }

    // Skip the UPDATE when there is nothing to write (an empty SET clause is
    // not a valid statement).
    if (Object.keys(updates).length > 0) {
      await ctx.db
        .update(users)
        .set(updates)
        .where(eq(users.did, user.did))
        .execute();
    }
  } else {
    console.log(`Skipping avatar update for ${user.did}`);
  }

  // Re-read the row so the published payload reflects what is stored now.
  const [u] = await ctx.db
    .select()
    .from(users)
    .where(eq(users.did, user.did))
    .limit(1)
    .execute();

  if (!u) {
    throw new Error(`User ${user.did} disappeared before it could be published`);
  }

  const userPayload = {
    xata_id: u.id,
    did: u.did,
    handle: u.handle,
    display_name: u.displayName,
    avatar: u.avatar,
    xata_createdat: u.createdAt.toISOString(),
    xata_updatedat: u.updatedAt.toISOString(),
    xata_version: u.xataVersion,
  };

  console.log(userPayload);
  await ctx.nc.publish(
    "rocksky.user",
    Buffer.from(JSON.stringify(userPayload))
  );
}

// Set when a user named explicitly on the command line fails, so the process
// still exits non-zero in that mode. Batch mode stays best-effort (exit 0).
let explicitUserFailed = false;

if (args.length > 0) {
  for (const did of args) {
    const [user] = await ctx.db
      .select()
      .from(users)
      .where(or(eq(users.did, did), eq(users.handle, did)))
      .limit(1)
      .execute();
    if (!user) {
      console.log(`User ${did} not found`);
      continue;
    }

    // Same per-user isolation as batch mode: one failure must not prevent the
    // remaining arguments from being processed or the final flush from running.
    try {
      await processUser(user);
    } catch (error) {
      explicitUserFailed = true;
      console.error(`Error processing user ${user.did}:`, error);
    }
  }
} else {
  let offset = 0;
  let processedCount = 0;

  console.log("Processing all users...");

  while (true) {
    // A stable ORDER BY is required for limit/offset paging: without it the
    // row order is unspecified, and because rows are updated while we iterate,
    // users could be skipped or visited twice.
    // NOTE(review): assumes `users.id` is unique (primary key) — confirm.
    const batch = await ctx.db
      .select()
      .from(users)
      .orderBy(users.id)
      .limit(BATCH_SIZE)
      .offset(offset)
      .execute();

    if (batch.length === 0) {
      break; // No more users to process
    }

    console.log(
      `Processing batch ${Math.floor(offset / BATCH_SIZE) + 1}, users ${offset + 1}-${offset + batch.length}`
    );

    for (const user of batch) {
      try {
        await processUser(user);
        processedCount++;
      } catch (error) {
        console.error(`Error processing user ${user.did}:`, error);
      }
    }

    offset += BATCH_SIZE;

    // Small delay between batches to avoid overwhelming the API
    await new Promise((resolve) => setTimeout(resolve, 100));
  }

  console.log(`Processed ${chalk.greenBright(processedCount)} users total`);
}

// Ensure all messages are flushed before exiting
await ctx.nc.flush();

console.log("Done");

process.exit(explicitUserFailed ? 1 : 0);