import { simpleFetchHandler, XRPC } from "@atcute/client"; import "@atcute/bluesky/lexicons"; import type { AppBskyActorProfile, AppBskyEmbedImages, AppBskyFeedPost, At, ComAtprotoRepoListRecords, } from "@atcute/client/lexicons"; import { CompositeDidDocumentResolver, PlcDidDocumentResolver, WebDidDocumentResolver, } from "@atcute/identity-resolver"; import { Config } from "../../config.ts"; import { Mutex } from "mutex-ts"; import moment from "moment"; import { RichText } from "@atproto/api"; // import { ComAtprotoRepoListRecords.Record } from "@atcute/client/lexicons"; // import { AppBskyFeedPost } from "@atcute/client/lexicons"; // import { AppBskyActorDefs } from "@atcute/client/lexicons"; interface AccountMetadata { did: At.Did; displayName: string; handle: string; avatarCid: string | null; currentCursor?: string; } let accountsMetadata: AccountMetadata[] = []; interface atUriObject { repo: string; collection: string; rkey: string; } class Post { authorDid: string; authorAvatarCid: string | null; postCid: string; recordName: string; authorHandle: string; displayName: string; text: string; timestamp: number; timenotstamp: string; quotingUri: atUriObject | null; replyingUri: atUriObject | null; imagesCid: string[] | null; videosLinkCid: string | null; gifLink: string | null; richText: RichText; constructor( record: ComAtprotoRepoListRecords.Record, account: AccountMetadata, richText: RichText, ) { this.richText = richText; this.postCid = record.cid; this.recordName = processAtUri(record.uri).rkey; this.authorDid = account.did; this.authorAvatarCid = account.avatarCid; this.authorHandle = account.handle; this.displayName = account.displayName; const post = record.value as AppBskyFeedPost.Record; this.timenotstamp = post.createdAt; this.text = post.text; this.timestamp = Date.parse(post.createdAt); if (post.reply) { this.replyingUri = processAtUri(post.reply.parent.uri); } else { this.replyingUri = null; } this.quotingUri = null; this.imagesCid = null; this.videosLinkCid = null; this.gifLink = null; switch (post.embed?.$type) { case "app.bsky.embed.images": this.imagesCid = post.embed.images.map( (imageRecord: AppBskyEmbedImages.Image) => imageRecord.image.ref.$link, ); break; case "app.bsky.embed.video": this.videosLinkCid = post.embed.video.ref.$link; break; case "app.bsky.embed.record": this.quotingUri = processAtUri(post.embed.record.uri); break; case "app.bsky.embed.recordWithMedia": this.quotingUri = processAtUri(post.embed.record.record.uri); switch (post.embed.media.$type) { case "app.bsky.embed.images": this.imagesCid = post.embed.media.images.map( (imageRecord) => imageRecord.image.ref.$link, ); break; case "app.bsky.embed.video": this.videosLinkCid = post.embed.media.video.ref.$link; break; } break; case "app.bsky.embed.external": // assuming that external embeds are gifs for now if (post.embed.external.uri.includes(".gif")) { this.gifLink = post.embed.external.uri; } break; } } } const processAtUri = (aturi: string): atUriObject => { const parts = aturi.split("/"); return { repo: parts[2], collection: parts[3], rkey: parts[4], }; }; const rpc = new XRPC({ handler: simpleFetchHandler({ service: Config.PDS_URL, }), }); const getDidsFromPDS = async (): Promise => { const { data } = await rpc.get("com.atproto.sync.listRepos", { params: { limit: 1000, }, }); return data.repos.filter((x) => x.active).map((repo: Repo) => repo.did) .reverse() as At.Did[]; }; const getAccountMetadata = async ( did: `did:${string}:${string}`, ) => { const account: AccountMetadata = { did: did, handle: "", // Guaranteed to be filled out later displayName: "", avatarCid: null, }; const localStorageKey = `did-metadata:${did}`; const cachedResult = cacheGet(localStorageKey); if (cachedResult) { return cachedResult; } try { const { data } = await rpc.get("com.atproto.repo.getRecord", { params: { repo: did, collection: "app.bsky.actor.profile", rkey: "self", }, }); const value = data.value as AppBskyActorProfile.Record; account.displayName = value.displayName || ""; if (value.avatar) { account.avatarCid = value.avatar.ref["$link"]; } } catch (e) { console.warn(`Error fetching profile for ${did}:`, e); } try { account.handle = await blueskyHandleFromDid(did); } catch (e) { console.error(`Error fetching handle for ${did}:`, e); return null; } cacheSet(localStorageKey, account); return account; }; const getAllMetadataFromPds = async (): Promise => { const dids = await getDidsFromPDS(); const metadata = await Promise.all( dids.map(async (repo: `did:${string}:${string}`) => { return await getAccountMetadata(repo); }), ); return metadata.filter((account) => account !== null) as AccountMetadata[]; }; const identityResolve = async (did: At.Did) => { const resolver = new CompositeDidDocumentResolver({ methods: { plc: new PlcDidDocumentResolver(), web: new WebDidDocumentResolver(), }, }); if (did.startsWith("did:plc:") || did.startsWith("did:web:")) { const doc = await resolver.resolve( did as `did:plc:${string}` | `did:web:${string}`, ); return doc; } else { throw new Error(`Unsupported DID type: ${did}`); } }; const blueskyHandleFromDid = async (did: At.Did) => { const localStorageKey = `did-handle:${did}`; const cachedResult = cacheGet(localStorageKey); if (cachedResult) { return cachedResult; } const doc = await identityResolve(did); if (doc.alsoKnownAs) { const handleAtUri = doc.alsoKnownAs.find((url) => url.startsWith("at://")); const handle = handleAtUri?.split("/")[2]; if (!handle) { return "Handle not found"; } else { cacheSet(localStorageKey, handle); return handle; } } else { return "Handle not found"; } }; interface PostsAcc { posts: ComAtprotoRepoListRecords.Record[]; account: AccountMetadata; } const getCutoffDate = (postAccounts: PostsAcc[]) => { const now = Date.now(); let cutoffDate: Date | null = null; postAccounts.forEach((postAcc) => { const latestPost = new Date( (postAcc.posts[postAcc.posts.length - 1].value as AppBskyFeedPost.Record) .createdAt, ); if (!cutoffDate) { cutoffDate = latestPost; } else { if (latestPost > cutoffDate) { cutoffDate = latestPost; } } }); if (cutoffDate) { return cutoffDate; } else { return new Date(now); } }; const filterPostsByDate = (posts: PostsAcc[], cutoffDate: Date) => { // filter posts for each account that are older than the cutoff date and save the cursor of the last post included const filteredPosts: PostsAcc[] = posts.map((postAcc) => { const filtered = postAcc.posts.filter((post) => { const postDate = new Date( (post.value as AppBskyFeedPost.Record).createdAt, ); return postDate >= cutoffDate; }); if (filtered.length > 0) { postAcc.account.currentCursor = processAtUri(filtered[filtered.length - 1].uri).rkey; } return { posts: filtered, account: postAcc.account, }; }); return filteredPosts; }; const postsMutex = new Mutex(); // nightmare function. However it works so I am not touching it const getNextPosts = async () => { const release = await postsMutex.obtain(); if (!accountsMetadata.length) { accountsMetadata = await getAllMetadataFromPds(); } const postsAcc: PostsAcc[] = await Promise.all( accountsMetadata.map(async (account) => { const posts = await fetchPostsForUser( account.did, account.currentCursor || null, ); if (posts) { return { posts: posts, account: account, }; } else { return { posts: [], account: account, }; } }), ); const recordsFiltered = postsAcc.filter((postAcc) => postAcc.posts.length > 0 ); const cutoffDate = getCutoffDate(recordsFiltered); const recordsCutoff = filterPostsByDate(recordsFiltered, cutoffDate); // update the accountMetadata with the new cursor accountsMetadata = accountsMetadata.map((account) => { const postAcc = recordsCutoff.find( (postAcc) => postAcc.account.did == account.did, ); if (postAcc) { account.currentCursor = postAcc.account.currentCursor; } return account; }); // throw the records in a big single array let records = recordsCutoff.flatMap((postAcc) => postAcc.posts); // sort the records by timestamp records = records.sort((a, b) => { const aDate = new Date( (a.value as AppBskyFeedPost.Record).createdAt, ).getTime(); const bDate = new Date( (b.value as AppBskyFeedPost.Record).createdAt, ).getTime(); return bDate - aDate; }); // filter out posts that are in the future if (!Config.SHOW_FUTURE_POSTS) { const now = Date.now(); records = records.filter((post) => { const postDate = new Date( (post.value as AppBskyFeedPost.Record).createdAt, ).getTime(); return postDate <= now; }); } const newPosts = records.map((record) => { const account = accountsMetadata.find( (account) => account.did == processAtUri(record.uri).repo, ); if (!account) { throw new Error( `Account with DID ${processAtUri(record.uri).repo} not found`, ); } const post = record.value as AppBskyFeedPost.Record; const richText = new RichText({ text: post.text, facets: post.facets }); return new Post(record, account, richText); }); // release the mutex release(); return newPosts; }; const fetchPostsForUser = async (did: At.Did, cursor: string | null) => { try { const { data } = await rpc.get("com.atproto.repo.listRecords", { params: { repo: did as At.Identifier, collection: "app.bsky.feed.post", limit: Config.MAX_POSTS, cursor: cursor || undefined, }, }); return data.records as ComAtprotoRepoListRecords.Record[]; } catch (e) { console.error(`Error fetching posts for ${did}:`, e); return null; } }; type artists = { artistName: string; }; type dietTeal = { artists: artists[]; trackName: string; playedTime: number; }; const getTealNowListeningTo = async (did: At.Did) => { const { data } = await rpc.get("com.atproto.repo.listRecords", { params: { repo: did as At.Identifier, collection: "fm.teal.alpha.feed.play", limit: 1, }, }); if (data.records.length > 0) { const record = data.records[0] as ComAtprotoRepoListRecords.Record; const value = record.value as dietTeal; const artists = value.artists.map((artist) => artist.artistName).join(", "); const timeStamp = moment(value.playedTime).isBefore(moment().subtract(1, "month")) ? moment(value.playedTime).format("MMM D, YYYY") : moment(value.playedTime).fromNow(); return `Listening to ${value.trackName} by ${artists} ${timeStamp}`; } console.log(data); return null; }; type statusSphere = { status: string; }; const getStatusSphere = async (did: At.Did) => { const { data } = await rpc.get("com.atproto.repo.listRecords", { params: { repo: did as At.Identifier, collection: "xyz.statusphere.status", limit: 1, }, }); if (data.records.length > 0) { const record = data.records[0].value as statusSphere; return record.status; } return null; }; type CacheEntry = { data: T; expire_timestamp: number; }; const cacheSet = (key: string, value: T) => { try { const day = 60 * 60 * 24 * 1000; const cacheData: CacheEntry = { data: value, expire_timestamp: Date.now() + day, }; localStorage.setItem(key, JSON.stringify(cacheData)); } catch (e) { console.error("Error caching data:", e); //Going just clear the cache and assume it's full. localStorage.clear(); } }; const cacheGet = (key: string): T | null => { try { const cachedData = localStorage.getItem(key); if (cachedData) { const parsedData = JSON.parse(cachedData) as CacheEntry; if (parsedData.expire_timestamp > Date.now()) { return parsedData.data; } else { localStorage.removeItem(key); } } //Return null if empty or expired return null; } catch (e) { console.error("Error fetching data from cache:", e); return null; } }; export { blueskyHandleFromDid, getAllMetadataFromPds, getNextPosts, getStatusSphere, getTealNowListeningTo, Post, }; export type { AccountMetadata };