A fork of pds-dash-fork for arabica.systems
at main 479 lines 13 kB view raw
1import { simpleFetchHandler, XRPC } from "@atcute/client"; 2import "@atcute/bluesky/lexicons"; 3import type { 4 AppBskyActorProfile, 5 AppBskyEmbedImages, 6 AppBskyFeedPost, 7 At, 8 ComAtprotoRepoListRecords, 9} from "@atcute/client/lexicons"; 10import { 11 CompositeDidDocumentResolver, 12 PlcDidDocumentResolver, 13 WebDidDocumentResolver, 14} from "@atcute/identity-resolver"; 15import { Config } from "../../config.ts"; 16import { Mutex } from "mutex-ts"; 17import moment from "moment"; 18import { RichText } from "@atproto/api"; 19 20// import { ComAtprotoRepoListRecords.Record } from "@atcute/client/lexicons"; 21// import { AppBskyFeedPost } from "@atcute/client/lexicons"; 22// import { AppBskyActorDefs } from "@atcute/client/lexicons"; 23 24interface AccountMetadata { 25 did: At.Did; 26 displayName: string; 27 handle: string; 28 avatarCid: string | null; 29 currentCursor?: string; 30} 31 32let accountsMetadata: AccountMetadata[] = []; 33 34interface atUriObject { 35 repo: string; 36 collection: string; 37 rkey: string; 38} 39class Post { 40 authorDid: string; 41 authorAvatarCid: string | null; 42 postCid: string; 43 recordName: string; 44 authorHandle: string; 45 displayName: string; 46 text: string; 47 timestamp: number; 48 timenotstamp: string; 49 quotingUri: atUriObject | null; 50 replyingUri: atUriObject | null; 51 imagesCid: string[] | null; 52 videosLinkCid: string | null; 53 gifLink: string | null; 54 richText: RichText; 55 56 constructor( 57 record: ComAtprotoRepoListRecords.Record, 58 account: AccountMetadata, 59 richText: RichText, 60 ) { 61 this.richText = richText; 62 this.postCid = record.cid; 63 this.recordName = processAtUri(record.uri).rkey; 64 this.authorDid = account.did; 65 this.authorAvatarCid = account.avatarCid; 66 this.authorHandle = account.handle; 67 this.displayName = account.displayName; 68 const post = record.value as AppBskyFeedPost.Record; 69 this.timenotstamp = post.createdAt; 70 this.text = post.text; 71 this.timestamp = Date.parse(post.createdAt); 72 if (post.reply) { 73 this.replyingUri = processAtUri(post.reply.parent.uri); 74 } else { 75 this.replyingUri = null; 76 } 77 this.quotingUri = null; 78 this.imagesCid = null; 79 this.videosLinkCid = null; 80 this.gifLink = null; 81 switch (post.embed?.$type) { 82 case "app.bsky.embed.images": 83 this.imagesCid = post.embed.images.map( 84 (imageRecord: AppBskyEmbedImages.Image) => 85 imageRecord.image.ref.$link, 86 ); 87 break; 88 case "app.bsky.embed.video": 89 this.videosLinkCid = post.embed.video.ref.$link; 90 break; 91 case "app.bsky.embed.record": 92 this.quotingUri = processAtUri(post.embed.record.uri); 93 break; 94 case "app.bsky.embed.recordWithMedia": 95 this.quotingUri = processAtUri(post.embed.record.record.uri); 96 switch (post.embed.media.$type) { 97 case "app.bsky.embed.images": 98 this.imagesCid = post.embed.media.images.map( 99 (imageRecord) => imageRecord.image.ref.$link, 100 ); 101 102 break; 103 case "app.bsky.embed.video": 104 this.videosLinkCid = post.embed.media.video.ref.$link; 105 106 break; 107 } 108 break; 109 case "app.bsky.embed.external": // assuming that external embeds are gifs for now 110 if (post.embed.external.uri.includes(".gif")) { 111 this.gifLink = post.embed.external.uri; 112 } 113 break; 114 } 115 } 116} 117 118const processAtUri = (aturi: string): atUriObject => { 119 const parts = aturi.split("/"); 120 return { 121 repo: parts[2], 122 collection: parts[3], 123 rkey: parts[4], 124 }; 125}; 126 127const rpc = new XRPC({ 128 handler: simpleFetchHandler({ 129 service: Config.PDS_URL, 130 }), 131}); 132 133const getDidsFromPDS = async (): Promise<At.Did[]> => { 134 const { data } = await rpc.get("com.atproto.sync.listRepos", { 135 params: { 136 limit: 1000, 137 }, 138 }); 139 return data.repos.filter((x) => x.active).map((repo: Repo) => repo.did) 140 .reverse() as At.Did[]; 141}; 142const getAccountMetadata = async ( 143 did: `did:${string}:${string}`, 144) => { 145 const account: AccountMetadata = { 146 did: did, 147 handle: "", // Guaranteed to be filled out later 148 displayName: "", 149 avatarCid: null, 150 }; 151 const localStorageKey = `did-metadata:${did}`; 152 const cachedResult = cacheGet<AccountMetadata>(localStorageKey); 153 if (cachedResult) { 154 return cachedResult; 155 } 156 try { 157 const { data } = await rpc.get("com.atproto.repo.getRecord", { 158 params: { 159 repo: did, 160 collection: "app.bsky.actor.profile", 161 rkey: "self", 162 }, 163 }); 164 const value = data.value as AppBskyActorProfile.Record; 165 account.displayName = value.displayName || ""; 166 if (value.avatar) { 167 account.avatarCid = value.avatar.ref["$link"]; 168 } 169 } catch (e) { 170 console.warn(`Error fetching profile for ${did}:`, e); 171 } 172 173 try { 174 account.handle = await blueskyHandleFromDid(did); 175 } catch (e) { 176 console.error(`Error fetching handle for ${did}:`, e); 177 return null; 178 } 179 cacheSet<AccountMetadata>(localStorageKey, account); 180 return account; 181}; 182 183const getAllMetadataFromPds = async (): Promise<AccountMetadata[]> => { 184 const dids = await getDidsFromPDS(); 185 const metadata = await Promise.all( 186 dids.map(async (repo: `did:${string}:${string}`) => { 187 return await getAccountMetadata(repo); 188 }), 189 ); 190 return metadata.filter((account) => account !== null) as AccountMetadata[]; 191}; 192 193const identityResolve = async (did: At.Did) => { 194 const resolver = new CompositeDidDocumentResolver({ 195 methods: { 196 plc: new PlcDidDocumentResolver(), 197 web: new WebDidDocumentResolver(), 198 }, 199 }); 200 201 if (did.startsWith("did:plc:") || did.startsWith("did:web:")) { 202 const doc = await resolver.resolve( 203 did as `did:plc:${string}` | `did:web:${string}`, 204 ); 205 return doc; 206 } else { 207 throw new Error(`Unsupported DID type: ${did}`); 208 } 209}; 210 211const blueskyHandleFromDid = async (did: At.Did) => { 212 const localStorageKey = `did-handle:${did}`; 213 const cachedResult = cacheGet<string>(localStorageKey); 214 if (cachedResult) { 215 return cachedResult; 216 } 217 const doc = await identityResolve(did); 218 if (doc.alsoKnownAs) { 219 const handleAtUri = doc.alsoKnownAs.find((url) => url.startsWith("at://")); 220 const handle = handleAtUri?.split("/")[2]; 221 if (!handle) { 222 return "Handle not found"; 223 } else { 224 cacheSet<string>(localStorageKey, handle); 225 return handle; 226 } 227 } else { 228 return "Handle not found"; 229 } 230}; 231 232interface PostsAcc { 233 posts: ComAtprotoRepoListRecords.Record[]; 234 account: AccountMetadata; 235} 236const getCutoffDate = (postAccounts: PostsAcc[]) => { 237 const now = Date.now(); 238 let cutoffDate: Date | null = null; 239 postAccounts.forEach((postAcc) => { 240 const latestPost = new Date( 241 (postAcc.posts[postAcc.posts.length - 1].value as AppBskyFeedPost.Record) 242 .createdAt, 243 ); 244 if (!cutoffDate) { 245 cutoffDate = latestPost; 246 } else { 247 if (latestPost > cutoffDate) { 248 cutoffDate = latestPost; 249 } 250 } 251 }); 252 if (cutoffDate) { 253 return cutoffDate; 254 } else { 255 return new Date(now); 256 } 257}; 258 259const filterPostsByDate = (posts: PostsAcc[], cutoffDate: Date) => { 260 // filter posts for each account that are older than the cutoff date and save the cursor of the last post included 261 const filteredPosts: PostsAcc[] = posts.map((postAcc) => { 262 const filtered = postAcc.posts.filter((post) => { 263 const postDate = new Date( 264 (post.value as AppBskyFeedPost.Record).createdAt, 265 ); 266 return postDate >= cutoffDate; 267 }); 268 if (filtered.length > 0) { 269 postAcc.account.currentCursor = 270 processAtUri(filtered[filtered.length - 1].uri).rkey; 271 } 272 return { 273 posts: filtered, 274 account: postAcc.account, 275 }; 276 }); 277 return filteredPosts; 278}; 279 280const postsMutex = new Mutex(); 281// nightmare function. However it works so I am not touching it 282const getNextPosts = async () => { 283 const release = await postsMutex.obtain(); 284 if (!accountsMetadata.length) { 285 accountsMetadata = await getAllMetadataFromPds(); 286 } 287 288 const postsAcc: PostsAcc[] = await Promise.all( 289 accountsMetadata.map(async (account) => { 290 const posts = await fetchPostsForUser( 291 account.did, 292 account.currentCursor || null, 293 ); 294 if (posts) { 295 return { 296 posts: posts, 297 account: account, 298 }; 299 } else { 300 return { 301 posts: [], 302 account: account, 303 }; 304 } 305 }), 306 ); 307 const recordsFiltered = postsAcc.filter((postAcc) => 308 postAcc.posts.length > 0 309 ); 310 const cutoffDate = getCutoffDate(recordsFiltered); 311 const recordsCutoff = filterPostsByDate(recordsFiltered, cutoffDate); 312 // update the accountMetadata with the new cursor 313 accountsMetadata = accountsMetadata.map((account) => { 314 const postAcc = recordsCutoff.find( 315 (postAcc) => postAcc.account.did == account.did, 316 ); 317 if (postAcc) { 318 account.currentCursor = postAcc.account.currentCursor; 319 } 320 return account; 321 }); 322 // throw the records in a big single array 323 let records = recordsCutoff.flatMap((postAcc) => postAcc.posts); 324 // sort the records by timestamp 325 records = records.sort((a, b) => { 326 const aDate = new Date( 327 (a.value as AppBskyFeedPost.Record).createdAt, 328 ).getTime(); 329 const bDate = new Date( 330 (b.value as AppBskyFeedPost.Record).createdAt, 331 ).getTime(); 332 return bDate - aDate; 333 }); 334 // filter out posts that are in the future 335 if (!Config.SHOW_FUTURE_POSTS) { 336 const now = Date.now(); 337 records = records.filter((post) => { 338 const postDate = new Date( 339 (post.value as AppBskyFeedPost.Record).createdAt, 340 ).getTime(); 341 return postDate <= now; 342 }); 343 } 344 345 const newPosts = records.map((record) => { 346 const account = accountsMetadata.find( 347 (account) => account.did == processAtUri(record.uri).repo, 348 ); 349 if (!account) { 350 throw new Error( 351 `Account with DID ${processAtUri(record.uri).repo} not found`, 352 ); 353 } 354 const post = record.value as AppBskyFeedPost.Record; 355 const richText = new RichText({ text: post.text, facets: post.facets }); 356 357 return new Post(record, account, richText); 358 }); 359 // release the mutex 360 release(); 361 return newPosts; 362}; 363 364const fetchPostsForUser = async (did: At.Did, cursor: string | null) => { 365 try { 366 const { data } = await rpc.get("com.atproto.repo.listRecords", { 367 params: { 368 repo: did as At.Identifier, 369 collection: "app.bsky.feed.post", 370 limit: Config.MAX_POSTS, 371 cursor: cursor || undefined, 372 }, 373 }); 374 return data.records as ComAtprotoRepoListRecords.Record[]; 375 } catch (e) { 376 console.error(`Error fetching posts for ${did}:`, e); 377 return null; 378 } 379}; 380 381type artists = { 382 artistName: string; 383}; 384 385type dietTeal = { 386 artists: artists[]; 387 trackName: string; 388 playedTime: number; 389}; 390 391const getTealNowListeningTo = async (did: At.Did) => { 392 const { data } = await rpc.get("com.atproto.repo.listRecords", { 393 params: { 394 repo: did as At.Identifier, 395 collection: "fm.teal.alpha.feed.play", 396 limit: 1, 397 }, 398 }); 399 if (data.records.length > 0) { 400 const record = data.records[0] as ComAtprotoRepoListRecords.Record; 401 const value = record.value as dietTeal; 402 const artists = value.artists.map((artist) => artist.artistName).join(", "); 403 const timeStamp = 404 moment(value.playedTime).isBefore(moment().subtract(1, "month")) 405 ? moment(value.playedTime).format("MMM D, YYYY") 406 : moment(value.playedTime).fromNow(); 407 return `Listening to ${value.trackName} by ${artists} ${timeStamp}`; 408 } 409 console.log(data); 410 return null; 411}; 412 413type statusSphere = { 414 status: string; 415}; 416 417const getStatusSphere = async (did: At.Did) => { 418 const { data } = await rpc.get("com.atproto.repo.listRecords", { 419 params: { 420 repo: did as At.Identifier, 421 collection: "xyz.statusphere.status", 422 limit: 1, 423 }, 424 }); 425 if (data.records.length > 0) { 426 const record = data.records[0].value as statusSphere; 427 return record.status; 428 } 429 return null; 430}; 431 432type CacheEntry<T> = { 433 data: T; 434 expire_timestamp: number; 435}; 436 437const cacheSet = <T>(key: string, value: T) => { 438 try { 439 const day = 60 * 60 * 24 * 1000; 440 const cacheData: CacheEntry<T> = { 441 data: value, 442 expire_timestamp: Date.now() + day, 443 }; 444 localStorage.setItem(key, JSON.stringify(cacheData)); 445 } catch (e) { 446 console.error("Error caching data:", e); 447 //Going just clear the cache and assume it's full. 448 localStorage.clear(); 449 } 450}; 451 452const cacheGet = <T>(key: string): T | null => { 453 try { 454 const cachedData = localStorage.getItem(key); 455 if (cachedData) { 456 const parsedData = JSON.parse(cachedData) as CacheEntry<T>; 457 if (parsedData.expire_timestamp > Date.now()) { 458 return parsedData.data; 459 } else { 460 localStorage.removeItem(key); 461 } 462 } 463 //Return null if empty or expired 464 return null; 465 } catch (e) { 466 console.error("Error fetching data from cache:", e); 467 return null; 468 } 469}; 470 471export { 472 blueskyHandleFromDid, 473 getAllMetadataFromPds, 474 getNextPosts, 475 getStatusSphere, 476 getTealNowListeningTo, 477 Post, 478}; 479export type { AccountMetadata };