this repo has no description
at main 359 lines 10 kB view raw
1import { simpleFetchHandler, XRPC } from "@atcute/client"; 2import "@atcute/bluesky/lexicons"; 3import type { 4 AppBskyActorDefs, 5 AppBskyActorProfile, 6 AppBskyFeedPost, 7 At, 8 ComAtprotoRepoListRecords, 9} from "@atcute/client/lexicons"; 10import { 11 CompositeDidDocumentResolver, 12 PlcDidDocumentResolver, 13 WebDidDocumentResolver, 14} from "@atcute/identity-resolver"; 15import { Config } from "../../config"; 16import { Mutex } from "mutex-ts" 17// import { ComAtprotoRepoListRecords.Record } from "@atcute/client/lexicons"; 18// import { AppBskyFeedPost } from "@atcute/client/lexicons"; 19// import { AppBskyActorDefs } from "@atcute/client/lexicons"; 20 21interface AccountMetadata { 22 did: At.Did; 23 displayName: string; 24 handle: string; 25 avatarCid: string | null; 26 currentCursor?: string; 27} 28 29let accountsMetadata: AccountMetadata[] = []; 30 31interface atUriObject { 32 repo: string; 33 collection: string; 34 rkey: string; 35} 36class Post { 37 authorDid: string; 38 authorAvatarCid: string | null; 39 postCid: string; 40 recordName: string; 41 authorHandle: string; 42 displayName: string; 43 text: string; 44 timestamp: number; 45 timenotstamp: string; 46 quotingUri: atUriObject | null; 47 replyingUri: atUriObject | null; 48 imagesCid: string[] | null; 49 videosLinkCid: string | null; 50 gifLink: string | null; 51 52 constructor( 53 record: ComAtprotoRepoListRecords.Record, 54 account: AccountMetadata, 55 ) { 56 this.postCid = record.cid; 57 this.recordName = processAtUri(record.uri).rkey; 58 this.authorDid = account.did; 59 this.authorAvatarCid = account.avatarCid; 60 this.authorHandle = account.handle; 61 this.displayName = account.displayName; 62 const post = record.value as AppBskyFeedPost.Record; 63 this.timenotstamp = post.createdAt; 64 this.text = post.text; 65 this.timestamp = Date.parse(post.createdAt); 66 if (post.reply) { 67 this.replyingUri = processAtUri(post.reply.parent.uri); 68 } else { 69 this.replyingUri = null; 70 } 71 this.quotingUri = null; 72 this.imagesCid = null; 73 this.videosLinkCid = null; 74 this.gifLink = null; 75 switch (post.embed?.$type) { 76 case "app.bsky.embed.images": 77 this.imagesCid = post.embed.images.map( 78 (imageRecord: any) => imageRecord.image.ref.$link, 79 ); 80 break; 81 case "app.bsky.embed.video": 82 this.videosLinkCid = post.embed.video.ref.$link; 83 break; 84 case "app.bsky.embed.record": 85 this.quotingUri = processAtUri(post.embed.record.uri); 86 break; 87 case "app.bsky.embed.recordWithMedia": 88 this.quotingUri = processAtUri(post.embed.record.record.uri); 89 switch (post.embed.media.$type) { 90 case "app.bsky.embed.images": 91 this.imagesCid = post.embed.media.images.map( 92 (imageRecord) => imageRecord.image.ref.$link, 93 ); 94 95 break; 96 case "app.bsky.embed.video": 97 this.videosLinkCid = post.embed.media.video.ref.$link; 98 99 break; 100 } 101 break; 102 case "app.bsky.embed.external": // assuming that external embeds are gifs for now 103 if (post.embed.external.uri.includes(".gif")) { 104 this.gifLink = post.embed.external.uri; 105 } 106 break; 107 } 108 } 109} 110 111const processAtUri = (aturi: string): atUriObject => { 112 const parts = aturi.split("/"); 113 return { 114 repo: parts[2], 115 collection: parts[3], 116 rkey: parts[4], 117 }; 118}; 119 120const rpc = new XRPC({ 121 handler: simpleFetchHandler({ 122 service: Config.PDS_URL, 123 }), 124}); 125 126const getDidsFromPDS = async (): Promise<At.Did[]> => { 127 const { data } = await rpc.get("com.atproto.sync.listRepos", { 128 params: {}, 129 }); 130 return data.repos.map((repo: any) => repo.did) as At.Did[]; 131}; 132const getAccountMetadata = async ( 133 did: `did:${string}:${string}`, 134) => { 135 const account: AccountMetadata = { 136 did: did, 137 handle: "", // Guaranteed to be filled out later 138 displayName: "", 139 avatarCid: null, 140 }; 141 142 try { 143 const { data } = await rpc.get("com.atproto.repo.getRecord", { 144 params: { 145 repo: did, 146 collection: "app.bsky.actor.profile", 147 rkey: "self", 148 }, 149 }); 150 const value = data.value as AppBskyActorProfile.Record; 151 account.displayName = value.displayName || ""; 152 if (value.avatar) { 153 account.avatarCid = value.avatar.ref["$link"]; 154 } 155 } catch (e) { 156 console.warn(`Error fetching profile for ${did}:`, e); 157 } 158 159 try { 160 account.handle = await blueskyHandleFromDid(did); 161 } catch (e) { 162 console.error(`Error fetching handle for ${did}:`, e); 163 return null; 164 } 165 166 return account; 167}; 168 169const getAllMetadataFromPds = async (): Promise<AccountMetadata[]> => { 170 const dids = await getDidsFromPDS(); 171 const metadata = await Promise.all( 172 dids.map(async (repo: `did:${string}:${string}`) => { 173 return await getAccountMetadata(repo); 174 }), 175 ); 176 return metadata.filter((account) => account !== null) as AccountMetadata[]; 177}; 178 179const identityResolve = async (did: At.Did) => { 180 const resolver = new CompositeDidDocumentResolver({ 181 methods: { 182 plc: new PlcDidDocumentResolver(), 183 web: new WebDidDocumentResolver(), 184 }, 185 }); 186 187 if (did.startsWith("did:plc:") || did.startsWith("did:web:")) { 188 const doc = await resolver.resolve( 189 did as `did:plc:${string}` | `did:web:${string}`, 190 ); 191 return doc; 192 } else { 193 throw new Error(`Unsupported DID type: ${did}`); 194 } 195}; 196 197const blueskyHandleFromDid = async (did: At.Did) => { 198 const doc = await identityResolve(did); 199 if (doc.alsoKnownAs) { 200 const handleAtUri = doc.alsoKnownAs.find((url) => url.startsWith("at://")); 201 const handle = handleAtUri?.split("/")[2]; 202 if (!handle) { 203 return "Handle not found"; 204 } else { 205 return handle; 206 } 207 } else { 208 return "Handle not found"; 209 } 210}; 211 212interface PostsAcc { 213 posts: ComAtprotoRepoListRecords.Record[]; 214 account: AccountMetadata; 215} 216const getCutoffDate = (postAccounts: PostsAcc[]) => { 217 const now = Date.now(); 218 let cutoffDate: Date | null = null; 219 postAccounts.forEach((postAcc) => { 220 const latestPost = new Date( 221 (postAcc.posts[postAcc.posts.length - 1].value as AppBskyFeedPost.Record) 222 .createdAt, 223 ); 224 if (!cutoffDate) { 225 cutoffDate = latestPost; 226 } else { 227 if (latestPost > cutoffDate) { 228 cutoffDate = latestPost; 229 } 230 } 231 }); 232 if (cutoffDate) { 233 return cutoffDate; 234 } else { 235 return new Date(now); 236 } 237}; 238 239const filterPostsByDate = (posts: PostsAcc[], cutoffDate: Date) => { 240 // filter posts for each account that are older than the cutoff date and save the cursor of the last post included 241 const filteredPosts: PostsAcc[] = posts.map((postAcc) => { 242 const filtered = postAcc.posts.filter((post) => { 243 const postDate = new Date( 244 (post.value as AppBskyFeedPost.Record).createdAt, 245 ); 246 return postDate >= cutoffDate; 247 }); 248 if (filtered.length > 0) { 249 postAcc.account.currentCursor = processAtUri(filtered[filtered.length - 1].uri).rkey; 250 } 251 return { 252 posts: filtered, 253 account: postAcc.account, 254 }; 255 }); 256 return filteredPosts; 257}; 258 259const postsMutex = new Mutex(); 260// nightmare function. However it works so I am not touching it 261const getNextPosts = async () => { 262 const release = await postsMutex.obtain(); 263 if (!accountsMetadata.length) { 264 accountsMetadata = await getAllMetadataFromPds(); 265 } 266 267 const postsAcc: PostsAcc[] = await Promise.all( 268 accountsMetadata.map(async (account) => { 269 const posts = await fetchPostsForUser( 270 account.did, 271 account.currentCursor || null, 272 ); 273 if (posts) { 274 return { 275 posts: posts, 276 account: account, 277 }; 278 } else { 279 return { 280 posts: [], 281 account: account, 282 }; 283 } 284 }), 285 ); 286 const recordsFiltered = postsAcc.filter((postAcc) => 287 postAcc.posts.length > 0 288 ); 289 const cutoffDate = getCutoffDate(recordsFiltered); 290 const recordsCutoff = filterPostsByDate(recordsFiltered, cutoffDate); 291 // update the accountMetadata with the new cursor 292 accountsMetadata = accountsMetadata.map((account) => { 293 const postAcc = recordsCutoff.find( 294 (postAcc) => postAcc.account.did == account.did, 295 ); 296 if (postAcc) { 297 account.currentCursor = postAcc.account.currentCursor; 298 } 299 return account; 300 } 301 ); 302 // throw the records in a big single array 303 let records = recordsCutoff.flatMap((postAcc) => postAcc.posts); 304 // sort the records by timestamp 305 records = records.sort((a, b) => { 306 const aDate = new Date( 307 (a.value as AppBskyFeedPost.Record).createdAt, 308 ).getTime(); 309 const bDate = new Date( 310 (b.value as AppBskyFeedPost.Record).createdAt, 311 ).getTime(); 312 return bDate - aDate; 313 }); 314 // filter out posts that are in the future 315 if (!Config.SHOW_FUTURE_POSTS) { 316 const now = Date.now(); 317 records = records.filter((post) => { 318 const postDate = new Date( 319 (post.value as AppBskyFeedPost.Record).createdAt, 320 ).getTime(); 321 return postDate <= now; 322 }); 323 } 324 325 const newPosts = records.map((record) => { 326 const account = accountsMetadata.find( 327 (account) => account.did == processAtUri(record.uri).repo, 328 ); 329 if (!account) { 330 throw new Error( 331 `Account with DID ${processAtUri(record.uri).repo} not found`, 332 ); 333 } 334 return new Post(record, account); 335 }); 336 // release the mutex 337 release(); 338 return newPosts; 339}; 340 341const fetchPostsForUser = async (did: At.Did, cursor: string | null) => { 342 try { 343 const { data } = await rpc.get("com.atproto.repo.listRecords", { 344 params: { 345 repo: did as At.Identifier, 346 collection: "app.bsky.feed.post", 347 limit: Config.MAX_POSTS, 348 cursor: cursor || undefined, 349 }, 350 }); 351 return data.records as ComAtprotoRepoListRecords.Record[]; 352 } catch (e) { 353 console.error(`Error fetching posts for ${did}:`, e); 354 return null; 355 } 356}; 357 358export { getAllMetadataFromPds, getNextPosts, Post }; 359export type { AccountMetadata };