import { indexHandlerContext } from "./index/types.ts";
import { assertRecord, validateRecord } from "./utils/records.ts";
import {
  buildBlobUrl,
  getSlingshotRecord,
  resolveIdentity,
  searchParamsToJson,
  withCors,
} from "./utils/server.ts";
import * as IndexServerTypes from "./utils/indexservertypes.ts";
import { Database } from "jsr:@db/sqlite@0.11";
import { setupUserDb } from "./utils/dbuser.ts";
// import { systemDB } from "./env.ts";
import { JetstreamManager, SpacedustManager } from "./utils/sharders.ts";
import { handleSpacedust, SpacedustLinkMessage } from "./index/spacedust.ts";
import { handleJetstream } from "./index/jetstream.ts";
import * as ATPAPI from "npm:@atproto/api";
import { AtUri } from "npm:@atproto/api";
import * as IndexServerAPI from "./indexclient/index.ts";
import * as IndexServerUtils from "./indexclient/util.ts";
import { isPostView } from "./indexclient/types/app/bsky/feed/defs.ts";

/** Filesystem locations of the databases the index server opens. */
export interface IndexServerConfig {
  baseDbPath: string;
  systemDbPath: string;
}

/** Columns shared by every per-collection table in a user database. */
interface BaseRow {
  uri: string;
  did: string;
  cid: string | null;
  rev: string | null;
  createdat: number | null;
  indexedat: number;
  json: string | null;
}

interface GeneratorRow extends BaseRow {
  displayname: string | null;
  description: string | null;
  avatarcid: string | null;
}

interface LikeRow extends BaseRow {
  subject: string;
}

interface RepostRow extends BaseRow {
  subject: string;
}

interface BacklinkRow {
  srcuri: string;
  srcdid: string;
}

/** Page size used by the author-feed and actor-likes queries. */
const FEED_LIMIT = 50;

/** Builds a JSON `Response` carrying the CORS and content-type headers used by every endpoint here. */
function respondJson(body: unknown, status = 200): Response {
  return new Response(JSON.stringify(body), {
    status,
    headers: withCors({ "Content-Type": "application/json" }),
  });
}

/**
 * Normalizes a repeated query parameter to a string array.
 * The getProfiles handler shows that a parameter supplied once can arrive as a bare string.
 */
function asParamList(value: unknown): string[] {
  if (Array.isArray(value)) {
    return value.filter((v): v is string => typeof v === "string");
  }
  return typeof value === "string" ? [value] : [];
}

/** Type guard that drops `null`/`undefined` from an already-awaited result list. */
function isPresent<T>(value: T | null | undefined): value is T {
  return value !== null && value !== undefined;
}

/** Returns the authority (DID) of an at-uri, or `undefined` when the string does not parse. */
function atUriHost(uri: string): string | undefined {
  try {
    return new AtUri(uri).host;
  } catch {
    return undefined;
  }
}

/** Converts a stored timestamp to ISO-8601; falsy or unparseable values yield `undefined` instead of throwing. */
function toIsoString(
  value: string | number | null | undefined
): string | undefined {
  if (!value) return undefined;
  const date = new Date(value);
  return Number.isNaN(date.getTime()) ? undefined : date.toISOString();
}

/** Formats an aspect ratio as `"width:height"`, or `null` when either side is missing. */
function formatAspect(
  ratio: { width?: number; height?: number } | null | undefined
): string | null {
  return ratio?.width && ratio?.height ? `${ratio.width}:${ratio.height}` : null;
}

/** Splits an `"<indexedat>::<cid>"` feed cursor; a malformed cursor is treated as absent. */
function parseFeedCursor(
  cursor: string | undefined
): { indexedat: number; cid: string } | undefined {
  if (!cursor) return undefined;
  const [rawIndexedAt, cid] = cursor.split("::");
  const indexedat = parseInt(rawIndexedAt, 10);
  if (!Number.isFinite(indexedat) || !cid) return undefined;
  return { indexedat, cid };
}

/**
 * Builds a generator view from a stored row, or `undefined` when the row has
 * no cid/json or its json cannot be parsed (the failure is logged, not thrown).
 */
function generatorViewFromRow(
  row: GeneratorRow,
  creator: ATPAPI.AppBskyActorDefs.ProfileView
): ATPAPI.AppBskyFeedDefs.GeneratorView | undefined {
  if (!row.json || !row.cid) return undefined;
  try {
    const record = JSON.parse(row.json) as ATPAPI.AppBskyFeedGenerator.Record;
    return {
      $type: "app.bsky.feed.defs#generatorView",
      uri: row.uri,
      cid: row.cid,
      did: row.did,
      creator,
      displayName: record.displayName,
      description: record.description,
      descriptionFacets: record.descriptionFacets,
      // NOTE(review): this is the record's blob ref, not a URL; the original cast it the same way.
      avatar: record.avatar as string | undefined,
      likeCount: 0, // TODO: this should be easy
      indexedAt: new Date(row.indexedat).toISOString(),
    };
  } catch (err) {
    console.warn(`skipping unparseable feed generator row ${row.uri}:`, err);
    return undefined;
  }
}

/**
 * HTTP front end and indexer for the per-user databases: serves a subset of
 * `app.bsky.*` / `party.whey.*` XRPC queries under `/xrpc/` and a
 * Constellation-style backlink API under `/links`.
 */
export class IndexServer {
  private config: IndexServerConfig;
  public userManager: IndexServerUserManager;
  public systemDB: Database;

  constructor(config: IndexServerConfig) {
    this.config = config;
    // We will initialize the system DB and user manager here
    this.systemDB = new Database(this.config.systemDbPath);
    // TODO: We need to setup the system DB schema if it's new
    this.userManager = new IndexServerUserManager(this); // Pass the server instance
  }

  public start() {
    // This is where we'll kick things off, like the cold start
    this.userManager.coldStart(this.systemDB);
    console.log("IndexServer started.");
  }

  /** Routes a request to the XRPC or backlink handler by path prefix; anything else is a 404. */
  public async handleRequest(req: Request): Promise<Response> {
    const url = new URL(req.url);
    // We will add routing logic here later to call our handlers
    if (url.pathname.startsWith("/xrpc/")) {
      return this.indexServerHandler(req);
    }
    if (url.pathname.startsWith("/links")) {
      return this.constellationAPIHandler(req);
    }
    return new Response("Not Found", { status: 404 });
  }

  public handlesDid(did: string): boolean {
    return this.userManager.handlesDid(did);
  }

  /** Lists every row of the system `users` table, hydrated with handle, display name and avatar URL. */
  async unspeccedGetRegisteredUsers(): Promise<
    | {
        did: string;
        role: string;
        registrationdate: string;
        onboardingstatus: string;
        pfp?: string;
        displayname: string;
        handle: string;
      }[]
    | undefined
  > {
    const stmt = this.systemDB.prepare(`
      SELECT *
      FROM users;
    `);
    const result = stmt.all() as {
      did: string;
      role: string;
      registrationdate: string;
      onboardingstatus: string;
    }[];
    const hydrated = await Promise.all(
      result.map(async (user) => {
        const identity = await resolveIdentity(user.did);
        const profile = (
          await getSlingshotRecord(
            identity.did,
            "app.bsky.actor.profile",
            "self"
          )
        ).value as ATPAPI.AppBskyActorProfile.Record;
        const avatarcid = uncid(profile.avatar?.ref);
        const avatar = avatarcid
          ? buildBlobUrl(identity.pds, identity.did, avatarcid)
          : undefined;
        return {
          ...user,
          handle: identity.handle,
          pfp: avatar,
          displayname: profile.displayName ?? identity.handle,
        };
      })
    );
    //const exists = result !== undefined;
    return hydrated;
  }

  // We will move all the global functions into this class as methods...
  /**
   * Dispatches `/xrpc/<method>` queries. Unsupported methods get a 404 with
   * `error: "XRPCNotSupported"`; lookups that find nothing get a 404 JSON error.
   */
  async indexServerHandler(req: Request): Promise<Response> {
    const url = new URL(req.url);
    const pathname = url.pathname;
    //const bskyUrl = `https://api.bsky.app${pathname}${url.search}`;
    //const hasAuth = req.headers.has("authorization");
    const xrpcMethod = pathname.startsWith("/xrpc/")
      ? pathname.slice("/xrpc/".length)
      : null;
    const searchParams = searchParamsToJson(url.searchParams);
    console.log(JSON.stringify(searchParams, null, 2));
    const jsonUntyped = searchParams;
    const notFound = (error: string): Response => respondJson({ error }, 404);

    switch (xrpcMethod) {
      case "app.bsky.actor.getProfile": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.AppBskyActorGetProfile.QueryParams;
        const res = await this.queryProfileView(jsonTyped.actor, "Detailed");
        if (!res) return notFound("User not found");
        const response: IndexServerTypes.AppBskyActorGetProfile.OutputSchema =
          res;
        return respondJson(response);
      }
      case "app.bsky.actor.getProfiles": {
        const singleActor = typeof jsonUntyped?.actors === "string";
        const actors = asParamList(jsonUntyped?.actors);
        // Await first, then filter: filtering the promises themselves never removes anything.
        const profiles = (
          await Promise.all(
            actors.map((actor) => this.queryProfileView(actor, "Detailed"))
          )
        ).filter(isPresent);
        // A lone unknown actor keeps its original 404; a list simply omits misses.
        if (singleActor && profiles.length === 0) {
          return notFound("User not found");
        }
        const response: IndexServerTypes.AppBskyActorGetProfiles.OutputSchema =
          { profiles };
        return respondJson(response);
      }
      case "app.bsky.feed.getActorFeeds": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.AppBskyFeedGetActorFeeds.QueryParams;
        const qresult = await this.queryActorFeeds(jsonTyped.actor);
        const response: IndexServerTypes.AppBskyFeedGetActorFeeds.OutputSchema =
          { feeds: qresult };
        return respondJson(response);
      }
      case "app.bsky.feed.getFeedGenerator": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.AppBskyFeedGetFeedGenerator.QueryParams;
        const qresult = await this.queryFeedGenerator(jsonTyped.feed);
        if (!qresult) return notFound("Feed not found");
        const response: IndexServerTypes.AppBskyFeedGetFeedGenerator.OutputSchema =
          {
            view: qresult,
            isOnline: true, // lmao
            isValid: true, // lmao
          };
        return respondJson(response);
      }
      case "app.bsky.feed.getFeedGenerators": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.AppBskyFeedGetFeedGenerators.QueryParams;
        const qresult = await this.queryFeedGenerators(
          asParamList(jsonTyped.feeds)
        );
        const response: IndexServerTypes.AppBskyFeedGetFeedGenerators.OutputSchema =
          { feeds: qresult };
        return respondJson(response);
      }
      case "app.bsky.feed.getPosts": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.AppBskyFeedGetPosts.QueryParams;
        const posts: IndexServerTypes.AppBskyFeedGetPosts.OutputSchema["posts"] =
          (
            await Promise.all(
              asParamList(jsonTyped.uris).map((uri) => this.queryPostView(uri))
            )
          ).filter(isPresent);
        const response: IndexServerTypes.AppBskyFeedGetPosts.OutputSchema = {
          posts,
        };
        return respondJson(response);
      }
      case "party.whey.app.bsky.feed.getActorLikesPartial": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetActorLikesPartial.QueryParams;
        // TODO: not partial yet, currently skips refs
        const qresult = await this.queryActorLikesPartial(
          jsonTyped.actor,
          jsonTyped.cursor
        );
        if (!qresult) return notFound("Feed not found");
        const response: IndexServerTypes.PartyWheyAppBskyFeedGetActorLikesPartial.OutputSchema =
          {
            feed: qresult.items as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.FeedViewPost>[],
            cursor: qresult.cursor,
          };
        return respondJson(response);
      }
      case "party.whey.app.bsky.feed.getAuthorFeedPartial": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetAuthorFeedPartial.QueryParams;
        // TODO: not partial yet, currently skips refs
        const qresult = await this.queryAuthorFeedPartial(
          jsonTyped.actor,
          jsonTyped.cursor
        );
        if (!qresult) return notFound("Feed not found");
        const response: IndexServerTypes.PartyWheyAppBskyFeedGetAuthorFeedPartial.OutputSchema =
          {
            feed: qresult.items as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.FeedViewPost>[],
            cursor: qresult.cursor,
          };
        return respondJson(response);
      }
      case "party.whey.app.bsky.feed.getLikesPartial": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetLikesPartial.QueryParams;
        // TODO: not partial yet, currently skips refs
        // queryLikes is async: without the await the 404 branch was dead and `likes` serialized as {}.
        const qresult = await this.queryLikes(jsonTyped.uri);
        if (!qresult) return notFound("Feed not found");
        const response: IndexServerTypes.PartyWheyAppBskyFeedGetLikesPartial.OutputSchema =
          {
            // @ts-ignore whatever i dont care TODO: fix ts ignores
            likes: qresult,
          };
        return respondJson(response);
      }
      case "party.whey.app.bsky.feed.getPostThreadPartial": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetPostThreadPartial.QueryParams;
        // TODO: not partial yet, currently skips refs
        const qresult = await this.queryPostThreadPartial(jsonTyped.uri);
        if (!qresult) return notFound("Feed not found");
        const response: IndexServerTypes.PartyWheyAppBskyFeedGetPostThreadPartial.OutputSchema =
          qresult;
        return respondJson(response);
      }
      case "party.whey.app.bsky.feed.getQuotesPartial": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetQuotesPartial.QueryParams;
        // TODO: not partial yet, currently skips refs
        const qresult = await this.queryQuotes(jsonTyped.uri);
        if (!qresult) return notFound("Feed not found");
        const response: IndexServerTypes.PartyWheyAppBskyFeedGetQuotesPartial.OutputSchema =
          {
            uri: jsonTyped.uri,
            posts: qresult.map((feedviewpost) => {
              return feedviewpost.post as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.PostView>;
            }),
          };
        return respondJson(response);
      }
      case "party.whey.app.bsky.feed.getRepostedByPartial": {
        const jsonTyped =
          jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetRepostedByPartial.QueryParams;
        // TODO: not partial yet, currently skips refs
        const qresult = await this.queryReposts(jsonTyped.uri);
        if (!qresult) return notFound("Feed not found");
        const response: IndexServerTypes.PartyWheyAppBskyFeedGetRepostedByPartial.OutputSchema =
          {
            uri: jsonTyped.uri,
            repostedBy: qresult as ATPAPI.$Typed<ATPAPI.AppBskyActorDefs.ProfileView>[],
          };
        return respondJson(response);
      }
      // TODO: too hard for now
      // case "party.whey.app.bsky.feed.getListFeedPartial": {
      //   const jsonTyped =
      //     jsonUntyped as IndexServerTypes.PartyWheyAppBskyFeedGetListFeedPartial.QueryParams;
      //   const response: IndexServerTypes.PartyWheyAppBskyFeedGetListFeedPartial.OutputSchema =
      //     {};
      //   return new Response(JSON.stringify(response), {
      //     headers: withCors({ "Content-Type": "application/json" }),
      //   });
      // }
      /*
      three more coming soon
      app.bsky.graph.getLists
      app.bsky.graph.getList
      app.bsky.graph.getActorStarterPacks
      */
      default: {
        return respondJson(
          {
            error: "XRPCNotSupported",
            message:
              "HEY hello there my name is whey dot party and you have used my custom appview that is very cool but have you considered that XRPC Not Supported",
          },
          404
        );
      }
    }
    // return new Response("Not Found", { status: 404 });
  }

  /**
   * Serves the backlink endpoints (`/links`, `/links/distinct-dids`,
   * `/links/count`, `/links/count/distinct-dids`, `/links/all`) from the
   * target's user database. `target` may be a DID or an at-uri.
   */
  constellationAPIHandler(req: Request): Response {
    const url = new URL(req.url);
    const pathname = url.pathname;
    const searchParams = searchParamsToJson(url.searchParams) as linksQuery;
    const jsonUntyped = searchParams;

    if (!jsonUntyped.target) {
      return respondJson({ error: "Missing required parameter: target" }, 400);
    }

    // A target that is neither a DID nor a parseable at-uri used to throw out of the handler.
    const did = isDid(searchParams.target)
      ? searchParams.target
      : atUriHost(searchParams.target);
    if (!did) {
      return respondJson({ error: "Invalid parameter: target" }, 400);
    }
    const db = this.userManager.getDbForDid(did);
    if (!db) {
      return respondJson({ error: "User not found" }, 404);
    }

    const limit = 16; //Math.min(parseInt(searchParams.limit || "50", 10), 100);
    // A non-numeric or negative cursor falls back to the first page instead of binding NaN.
    const parsedOffset = parseInt(searchParams.cursor || "0", 10);
    const offset =
      Number.isFinite(parsedOffset) && parsedOffset > 0 ? parsedOffset : 0;

    // Resolves the `collection` / `collection:path` pair the link queries bind; undefined when either is missing.
    const linkSource = (
      query: linksQuery
    ): { collection: string; field: string } | undefined => {
      if (!query.collection || !query.path) return undefined;
      return {
        collection: query.collection,
        field: `${query.collection}:${query.path.replace(/^\./, "")}`,
      };
    };
    const missingSource = (): Response =>
      respondJson(
        { error: "Missing required parameters: collection, path" },
        400
      );

    switch (pathname) {
      case "/links": {
        const jsonTyped = jsonUntyped as linksQuery;
        const source = linkSource(jsonTyped);
        if (!source) return missingSource();

        const paginatedSql = `${SQL.links} LIMIT ? OFFSET ?`;
        const rows = db
          .prepare(paginatedSql)
          .all(
            jsonTyped.target,
            source.collection,
            source.field,
            limit,
            offset
          ) as unknown as { srcuri: string; srcdid: string; srccol: string }[];
        const countResult = db
          .prepare(SQL.count)
          .get(jsonTyped.target, source.collection, source.field);
        const total = countResult ? Number(countResult.total) : 0;

        const linking_records: linksRecord[] = rows.map((row) => {
          const rkey = row.srcuri.split("/").pop()!;
          return { did: row.srcdid, collection: row.srccol, rkey };
        });
        const response: linksRecordsResponse = {
          total: total.toString(),
          linking_records,
        };
        const nextCursor = offset + linking_records.length;
        if (nextCursor < total) {
          response.cursor = nextCursor.toString();
        }
        return respondJson(response);
      }
      case "/links/distinct-dids": {
        const jsonTyped = jsonUntyped as linksQuery;
        const source = linkSource(jsonTyped);
        if (!source) return missingSource();

        const paginatedSql = `${SQL.distinctDids} LIMIT ? OFFSET ?`;
        const rows = db
          .prepare(paginatedSql)
          .all(
            jsonTyped.target,
            source.collection,
            source.field,
            limit,
            offset
          ) as unknown as { srcdid: string }[];
        const countResult = db
          .prepare(SQL.countDistinctDids)
          .get(jsonTyped.target, source.collection, source.field);
        const total = countResult ? Number(countResult.total) : 0;

        const linking_dids: string[] = rows.map((row) => row.srcdid);
        const response: linksDidsResponse = {
          total: total.toString(),
          linking_dids,
        };
        const nextCursor = offset + linking_dids.length;
        if (nextCursor < total) {
          response.cursor = nextCursor.toString();
        }
        return respondJson(response);
      }
      case "/links/count": {
        const jsonTyped = jsonUntyped as linksQuery;
        const source = linkSource(jsonTyped);
        if (!source) return missingSource();

        const result = db
          .prepare(SQL.count)
          .get(jsonTyped.target, source.collection, source.field);
        const response: linksCountResponse = {
          total: result && result.total ? result.total.toString() : "0",
        };
        return respondJson(response);
      }
      case "/links/count/distinct-dids": {
        const jsonTyped = jsonUntyped as linksQuery;
        const source = linkSource(jsonTyped);
        if (!source) return missingSource();

        const result = db
          .prepare(SQL.countDistinctDids)
          .get(jsonTyped.target, source.collection, source.field);
        const response: linksCountResponse = {
          total: result && result.total ? result.total.toString() : "0",
        };
        return respondJson(response);
      }
      case "/links/all": {
        const jsonTyped = jsonUntyped as linksAllQuery;
        const rows = db.prepare(SQL.all).all(jsonTyped.target) as any[];

        // Group the per-(subject, collection) aggregates by subject uri.
        const links: linksAllResponse["links"] = {};
        for (const row of rows) {
          if (!links[row.suburi]) {
            links[row.suburi] = {};
          }
          links[row.suburi][row.srccol] = {
            records: row.records,
            distinct_dids: row.distinct_dids,
          };
        }
        const response: linksAllResponse = { links };
        return respondJson(response);
      }
      default: {
        return respondJson(
          {
            error: "NotSupported",
            message:
              "The requested endpoint is not supported by this Constellation implementation.",
          },
          404
        );
      }
    }
  }

  /**
   * Writes one incoming record into the acting user's database. Only profiles
   * and posts are stored; likes and every other type are ignored. Database
   * errors are logged and swallowed so one bad record cannot stop indexing.
   */
  indexServerIndexer(ctx: indexHandlerContext) {
    const record = assertRecord(ctx.value);
    //const record = validateRecord(ctx.value);
    const db = this.userManager.getDbForDid(ctx.doer);
    if (!db) return;
    console.log("indexering");
    switch (record?.$type) {
      case "app.bsky.feed.like": {
        return;
      }
      case "app.bsky.actor.profile": {
        console.log("bsky profuile");
        try {
          const stmt = db.prepare(`
            INSERT OR IGNORE INTO app_bsky_actor_profile (
              uri, did, cid, rev, createdat, indexedat, json,
              displayname,
              description,
              avatarcid,
              avatarmime,
              bannercid,
              bannermime
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
          `);
          const avatarCid = uncid(record.avatar?.ref);
          const bannerCid = uncid(record.banner?.ref);
          console.log({
            uri: ctx.aturi,
            did: ctx.doer,
            cid: ctx.cid,
            rev: ctx.rev,
            createdat: record.createdAt,
            indexedat: Date.now(),
            json: JSON.stringify(record),
            displayname: record.displayName,
            description: record.description,
            avatarcid: avatarCid,
            avatarmime: record.avatar?.mimeType,
            bannercid: bannerCid,
            bannermime: record.banner?.mimeType,
          });
          stmt.run(
            ctx.aturi ?? null,
            ctx.doer ?? null,
            ctx.cid ?? null,
            ctx.rev ?? null,
            record.createdAt ?? null,
            Date.now(),
            JSON.stringify(record),
            record.displayName ?? null,
            record.description ?? null,
            avatarCid ?? null,
            record.avatar?.mimeType ?? null,
            bannerCid ?? null,
            record.banner?.mimeType ?? null
            // TODO please add pinned posts
          );
        } catch (err) {
          console.error("stmt.run failed:", err);
        }
        return;
      }
      case "app.bsky.feed.post": {
        console.log("bsky post");
        try {
          const stmt = db.prepare(`
            INSERT OR IGNORE INTO app_bsky_feed_post (
              uri, did, cid, rev, createdat, indexedat, json,
              text, replyroot, replyparent, quote,
              imagecount, image1cid, image1mime, image1aspect,
              image2cid, image2mime, image2aspect,
              image3cid, image3mime, image3aspect,
              image4cid, image4mime, image4aspect,
              videocount, videocid, videomime, videoaspect
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
          `);
          const embed = record.embed;
          const images = extractImages(embed);
          const video = extractVideo(embed);
          const quoteUri = extractQuoteUri(embed);
          const videoCid = uncid(video?.video);

          // Four (cid, mime, aspect) triples; slots past images.length bind as NULL.
          const imageColumns = [0, 1, 2, 3].flatMap((slot) => {
            const image = images[slot];
            return [
              uncid(image?.image?.ref) ?? null,
              image?.image?.mimeType ?? null,
              formatAspect(image?.aspectRatio),
            ];
          });

          stmt.run(
            ctx.aturi ?? null,
            ctx.doer ?? null,
            ctx.cid ?? null,
            ctx.rev ?? null,
            record.createdAt ?? null,
            Date.now(),
            JSON.stringify(record),
            record.text ?? null,
            record.reply?.root?.uri ?? null,
            record.reply?.parent?.uri ?? null,
            quoteUri,
            images.length,
            ...imageColumns,
            videoCid ? 1 : 0,
            videoCid ?? null,
            videoCid ? "video/mp4" : null,
            formatAspect(video?.aspectRatio)
          );
        } catch (err) {
          console.error("stmt.run failed:", err);
        }
        return;
      }
      default: {
        // what the hell
        return;
      }
    }
  }

  // user data
  /**
   * Builds a profile view of the requested flavour for a registered user.
   * Returns `undefined` when the DID is not registered or has no database.
   * A registered user without an indexed profile record gets a minimal view
   * (did + handle, plus counts for "Detailed") instead of a thrown TypeError.
   */
  async queryProfileView(
    did: string,
    type: ""
  ): Promise<ATPAPI.AppBskyActorDefs.ProfileView | undefined>;
  async queryProfileView(
    did: string,
    type: "Basic"
  ): Promise<ATPAPI.AppBskyActorDefs.ProfileViewBasic | undefined>;
  async queryProfileView(
    did: string,
    type: "Detailed"
  ): Promise<ATPAPI.AppBskyActorDefs.ProfileViewDetailed | undefined>;
  async queryProfileView(
    did: string,
    type: "" | "Basic" | "Detailed"
  ): Promise<
    | ATPAPI.AppBskyActorDefs.ProfileView
    | ATPAPI.AppBskyActorDefs.ProfileViewBasic
    | ATPAPI.AppBskyActorDefs.ProfileViewDetailed
    | undefined
  > {
    if (!this.isRegisteredIndexUser(did)) return;
    const db = this.userManager.getDbForDid(did);
    if (!db) return;

    const stmt = db.prepare(`
      SELECT *
      FROM app_bsky_actor_profile
      WHERE did = ?
      LIMIT 1;
    `);
    const row = stmt.get(did) as ProfileRow | undefined;
    const identity = await resolveIdentity(did);

    const avatar = row?.avatarcid
      ? buildBlobUrl(identity.pds, identity.did, row.avatarcid)
      : undefined;
    const banner = row?.bannercid
      ? buildBlobUrl(identity.pds, identity.did, row.bannercid)
      : undefined;
    // createdat comes from the user's record, so it may not be a valid date.
    const createdAt = toIsoString(row?.createdat);
    // indexedAt now reports the indexing time; it previously echoed createdat.
    const indexedAt = toIsoString(row?.indexedat);
    const countWhere = (sql: string): number =>
      (db.prepare(sql).get(did) as { count: number } | undefined)?.count ?? 0;

    // simulate different types returned
    switch (type) {
      case "": {
        const result: ATPAPI.AppBskyActorDefs.ProfileView = {
          $type: "app.bsky.actor.defs#profileView",
          did: did,
          handle: identity.handle,
          displayName: row?.displayname ?? undefined,
          description: row?.description ?? undefined,
          avatar: avatar,
          //associated?: ProfileAssociated,
          indexedAt: indexedAt,
          createdAt: createdAt,
          //viewer?: ViewerState,
          //labels?: ComAtprotoLabelDefs.Label[],
          //verification?: VerificationState,
          //status?: StatusView,
        };
        return result;
      }
      case "Basic": {
        const result: ATPAPI.AppBskyActorDefs.ProfileViewBasic = {
          $type: "app.bsky.actor.defs#profileViewBasic",
          did: did,
          handle: identity.handle,
          displayName: row?.displayname ?? undefined,
          avatar: avatar,
          //associated?: ProfileAssociated,
          createdAt: createdAt,
          //viewer?: ViewerState,
          //labels?: ComAtprotoLabelDefs.Label[],
          //verification?: VerificationState,
          //status?: StatusView,
        };
        return result;
      }
      case "Detailed": {
        // Follower count comes from backlinks; following and post counts from the user's own tables.
        const followersCount = countWhere(`
          SELECT COUNT(*) as count
          FROM backlink_skeleton
          WHERE subdid = ? AND srccol = 'app.bsky.graph.follow'
        `);
        const followsCount = countWhere(`
          SELECT COUNT(*) as count
          FROM app_bsky_graph_follow
          WHERE did = ?
        `);
        const postsCount = countWhere(`
          SELECT COUNT(*) as count
          FROM app_bsky_feed_post
          WHERE did = ?
        `);

        const result: ATPAPI.AppBskyActorDefs.ProfileViewDetailed = {
          $type: "app.bsky.actor.defs#profileViewDetailed",
          did: did,
          handle: identity.handle,
          displayName: row?.displayname ?? undefined,
          description: row?.description ?? undefined,
          avatar: avatar,
          banner: banner,
          followersCount: followersCount,
          followsCount: followsCount,
          postsCount: postsCount,
          //associated?: ProfileAssociated,
          //joinedViaStarterPack?: // AppBskyGraphDefs.StarterPackViewBasic;
          indexedAt: indexedAt,
          createdAt: createdAt,
          //viewer?: ViewerState,
          //labels?: ComAtprotoLabelDefs.Label[],
          pinnedPost: undefined, //row.; // TODO: i forgot to put pinnedp posts in db schema oops
          //verification?: VerificationState,
          //status?: StatusView,
        };
        return result;
      }
      default:
        throw new Error("Invalid type");
    }
  }

  // post hydration
  /**
   * Loads a stored post and wraps it in a `PostView`. Returns `undefined` for
   * an unparseable uri, an unregistered author, or a row missing its cid/json.
   */
  async queryPostView(
    uri: string
  ): Promise<ATPAPI.AppBskyFeedDefs.PostView | undefined> {
    const did = atUriHost(uri);
    if (!did) return;
    if (!this.isRegisteredIndexUser(did)) return;
    const db = this.userManager.getDbForDid(did);
    if (!db) return;

    const stmt = db.prepare(`
      SELECT *
      FROM app_bsky_feed_post
      WHERE uri = ?
      LIMIT 1;
    `);
    const row = stmt.get(uri) as PostRow | undefined;
    // Check the row before resolving the author so misses skip the identity lookup.
    if (!row || !row.cid || !row.json) return;
    const profileView = await this.queryProfileView(did, "Basic");
    if (!profileView) return;

    const value = JSON.parse(row.json) as ATPAPI.AppBskyFeedPost.Record;
    const post: ATPAPI.AppBskyFeedDefs.PostView = {
      uri: row.uri,
      cid: row.cid,
      author: profileView,
      record: value,
      indexedAt: new Date(row.indexedat).toISOString(),
      embed: value.embed,
    };
    return post;
  }

  /** Placeholder reference for a post this server does not index. */
  constructPostViewRef(
    uri: string
  ): IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef {
    const post: IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef = {
      uri: uri,
      cid: "cid.invalid", // oh shit we dont know the cid TODO: major design flaw
    };
    return post;
  }

  async queryFeedViewPost(
    uri: string
  ): Promise<ATPAPI.AppBskyFeedDefs.FeedViewPost | undefined> {
    const post = await this.queryPostView(uri);
    if (!post) return;

    const feedviewpost: ATPAPI.AppBskyFeedDefs.FeedViewPost = {
      $type: "app.bsky.feed.defs#feedViewPost",
      post: post,
      //reply: ReplyRef,
      //reason: ,
    };
    return feedviewpost;
  }

  constructFeedViewPostRef(
    uri: string
  ): IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef {
    const post = this.constructPostViewRef(uri);

    const feedviewpostref: IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef =
      {
        $type: "party.whey.app.bsky.feed.defs#feedViewPostRef",
        post: post as IndexServerUtils.$Typed<IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef>,
      };
    return feedviewpostref;
  }

  // user feedgens
  /** Lists the feed generators a registered user has published, newest first. */
  async queryActorFeeds(
    did: string
  ): Promise<ATPAPI.AppBskyFeedDefs.GeneratorView[]> {
    if (!this.isRegisteredIndexUser(did)) return [];
    const db = this.userManager.getDbForDid(did);
    if (!db) return [];

    const stmt = db.prepare(`
      SELECT uri, cid, did, json, indexedat
      FROM app_bsky_feed_generator
      WHERE did = ?
      ORDER BY createdat DESC;
    `);
    const rows = stmt.all(did) as unknown as GeneratorRow[];
    // Uses the plain ProfileView like queryFeedGenerators (this method used to pass a "Basic" view).
    const creatorView = await this.queryProfileView(did, "");
    if (!creatorView) return [];

    return rows
      .map((row) => generatorViewFromRow(row, creatorView))
      .filter(isPresent);
  }

  async queryFeedGenerator(
    uri: string
  ): Promise<ATPAPI.AppBskyFeedDefs.GeneratorView | undefined> {
    const gens = await this.queryFeedGenerators([uri]); // gens: GeneratorView[]
    return gens[0];
  }

  /** Resolves generator views for the given uris, querying each owning user's database once. */
  async queryFeedGenerators(
    uris: string[]
  ): Promise<ATPAPI.AppBskyFeedDefs.GeneratorView[]> {
    const generators: ATPAPI.AppBskyFeedDefs.GeneratorView[] = [];
    const urisByDid = new Map<string, string[]>();

    for (const uri of uris) {
      const did = atUriHost(uri);
      if (!did) continue; // unparseable uris are skipped, as before
      const bucket = urisByDid.get(did);
      if (bucket) {
        bucket.push(uri);
      } else {
        urisByDid.set(did, [uri]);
      }
    }

    for (const [did, didUris] of urisByDid.entries()) {
      if (!this.isRegisteredIndexUser(did)) continue;
      const db = this.userManager.getDbForDid(did);
      if (!db) continue;

      // Only "?" placeholders are interpolated; the uris themselves are bound.
      const placeholders = didUris.map(() => "?").join(",");
      const stmt = db.prepare(`
        SELECT uri, cid, did, json, indexedat
        FROM app_bsky_feed_generator
        WHERE uri IN (${placeholders});
      `);
      const rows = stmt.all(...didUris) as unknown as GeneratorRow[];
      if (rows.length === 0) continue;

      const creatorView = await this.queryProfileView(did, "");
      if (!creatorView) continue;

      for (const row of rows) {
        const view = generatorViewFromRow(row, creatorView);
        if (view) generators.push(view);
      }
    }
    return generators;
  }

  // user feeds
  /**
   * One page of a user's posts and reposts, newest first. Posts by users this
   * server does not index are returned as refs. The cursor is
   * `"<indexedat>::<cid>"` of the last row on the previous page.
   */
  async queryAuthorFeedPartial(
    did: string,
    cursor?: string
  ): Promise<
    | {
        items: (
          | ATPAPI.AppBskyFeedDefs.FeedViewPost
          | IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef
        )[];
        cursor: string | undefined;
      }
    | undefined
  > {
    if (!this.isRegisteredIndexUser(did)) return;
    const db = this.userManager.getDbForDid(did);
    if (!db) return;

    const subquery = `
      SELECT uri, cid, indexedat, 'post' as type, null as subject FROM app_bsky_feed_post WHERE did = ?
      UNION ALL
      SELECT uri, cid, indexedat, 'repost' as type, subject FROM app_bsky_feed_repost WHERE did = ?
    `;
    let query = `SELECT * FROM (${subquery}) as feed_items`;
    const params: (string | number)[] = [did, did];

    const page = parseFeedCursor(cursor);
    if (page) {
      query += ` WHERE (indexedat < ? OR (indexedat = ? AND cid < ?))`;
      params.push(page.indexedat, page.indexedat, page.cid);
    }
    query += ` ORDER BY indexedat DESC, cid DESC LIMIT ${FEED_LIMIT}`;

    const stmt = db.prepare(query);
    const rows = stmt.all(...params) as {
      uri: string;
      indexedat: number;
      cid: string;
      type: "post" | "repost";
      subject: string | null;
    }[];

    const authorProfile = await this.queryProfileView(did, "Basic");

    const resolved = await Promise.all(
      rows.map(async (row) => {
        if (row.type === "repost" && row.subject) {
          if (!authorProfile) return undefined;
          const subjectDid = atUriHost(row.subject);
          if (!subjectDid) return undefined;
          // The original embedded an un-awaited promise (or a whole ref) as `post`.
          // NOTE(review): reposts are now a flat {post, reason} item — confirm clients expect this shape.
          const original = this.handlesDid(subjectDid)
            ? await this.queryFeedViewPost(row.subject)
            : this.constructFeedViewPostRef(row.subject);
          if (!original) return undefined;
          return {
            ...original,
            reason: {
              $type: "app.bsky.feed.defs#reasonRepost",
              by: authorProfile,
              indexedAt: new Date(row.indexedat).toISOString(),
            },
          } as
            | ATPAPI.AppBskyFeedDefs.FeedViewPost
            | IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef;
        }
        return await this.queryFeedViewPost(row.uri);
      })
    );
    const items = resolved.filter(isPresent);

    // The cursor follows the last fetched row, even if that row was dropped from items.
    const lastItem = rows[rows.length - 1];
    const nextCursor = lastItem
      ? `${lastItem.indexedat}::${lastItem.cid}`
      : undefined;

    return { items, cursor: nextCursor };
  }

  queryListFeed(
    uri: string,
    cursor?: string
  ):
    | {
        items: ATPAPI.AppBskyFeedDefs.FeedViewPost[];
        cursor: string | undefined;
      }
    | undefined {
    return { items: [], cursor: undefined };
  }

  /**
   * One page of the posts a registered user has liked, newest like first.
   * Same cursor format as `queryAuthorFeedPartial`.
   */
  async queryActorLikesPartial(
    did: string,
    cursor?: string
  ): Promise<
    | {
        items: (
          | ATPAPI.AppBskyFeedDefs.FeedViewPost
          | IndexServerAPI.PartyWheyAppBskyFeedDefs.FeedViewPostRef
        )[];
        cursor: string | undefined;
      }
    | undefined
  > {
    // early return only if the actor did is not registered
    if (!this.isRegisteredIndexUser(did)) return;
    const db = this.userManager.getDbForDid(did);
    if (!db) return;

    let query = `
      SELECT subject, indexedat, cid
      FROM app_bsky_feed_like
      WHERE did = ?
    `;
    const params: (string | number)[] = [did];

    const page = parseFeedCursor(cursor);
    if (page) {
      query += ` AND (indexedat < ? OR (indexedat = ? AND cid < ?))`;
      params.push(page.indexedat, page.indexedat, page.cid);
    }
    query += ` ORDER BY indexedat DESC, cid DESC LIMIT ${FEED_LIMIT}`;

    const stmt = db.prepare(query);
    const rows = stmt.all(...params) as {
      subject: string;
      indexedat: number;
      cid: string;
    }[];

    const resolved = await Promise.all(
      rows.map(async (row) => {
        const subjectDid = atUriHost(row.subject);
        if (!subjectDid) return undefined; // a malformed subject no longer rejects the whole page
        return this.handlesDid(subjectDid)
          ? await this.queryFeedViewPost(row.subject)
          : this.constructFeedViewPostRef(row.subject);
      })
    );
    const items = resolved.filter(isPresent);

    const lastItem = rows[rows.length - 1];
    const nextCursor = lastItem
      ? `${lastItem.indexedat}::${lastItem.cid}`
      : undefined;

    return { items, cursor: nextCursor };
  }

  // post metadata
  /** Lists likes of a post from the author's backlink table; likers this server cannot resolve are skipped. */
  async queryLikes(
    uri: string
  ): Promise<ATPAPI.AppBskyFeedGetLikes.Like[] | undefined> {
    const postAuthorDid = atUriHost(uri);
    if (!postAuthorDid) return;
    if (!this.isRegisteredIndexUser(postAuthorDid)) return;
    const db = this.userManager.getDbForDid(postAuthorDid);
    if (!db) return;

    // NOTE(review): srccol is compared with 'app_bsky_feed_like' here, but the follower count
    // in queryProfileView uses the dotted form 'app.bsky.graph.follow' — confirm which one is stored.
    const stmt = db.prepare(`
      SELECT b.srcdid, b.srcuri
      FROM backlink_skeleton AS b
      WHERE b.suburi = ? AND b.srccol = 'app_bsky_feed_like'
      ORDER BY b.id DESC;
    `);
    const rows = stmt.all(uri) as unknown as BacklinkRow[];

    const likes = await Promise.all(
      rows.map(async (row) => {
        const actor = await this.queryProfileView(row.srcdid, "");
        if (!actor) return undefined;
        // TODO write indexedAt for spacedust indexes
        const now = new Date(Date.now()).toISOString();
        return { createdAt: now, indexedAt: now, actor: actor };
      })
    );
    return likes.filter(isPresent);
  }

  /** Lists the profiles that reposted a post; unresolvable reposters are skipped. */
  async queryReposts(
    uri: string
  ): Promise<ATPAPI.AppBskyActorDefs.ProfileView[]> {
    const postAuthorDid = atUriHost(uri);
    if (!postAuthorDid) return [];
    if (!this.isRegisteredIndexUser(postAuthorDid)) return [];
    const db = this.userManager.getDbForDid(postAuthorDid);
    if (!db) return [];

    // NOTE(review): same underscore-vs-dotted srccol question as in queryLikes.
    const stmt = db.prepare(`
      SELECT srcdid
      FROM backlink_skeleton
      WHERE suburi = ? AND srccol = 'app_bsky_feed_repost'
      ORDER BY id DESC;
    `);
    const rows = stmt.all(uri) as { srcdid: string }[];

    const profiles = await Promise.all(
      rows.map((row) => this.queryProfileView(row.srcdid, ""))
    );
    return profiles.filter(isPresent);
  }

  /** Lists the posts that quote a post; quotes this server cannot hydrate are skipped. */
  async queryQuotes(
    uri: string
  ): Promise<ATPAPI.AppBskyFeedDefs.FeedViewPost[]> {
    const postAuthorDid = atUriHost(uri);
    if (!postAuthorDid) return [];
    if (!this.isRegisteredIndexUser(postAuthorDid)) return [];
    const db = this.userManager.getDbForDid(postAuthorDid);
    if (!db) return [];

    // NOTE(review): same underscore-vs-dotted srccol question as in queryLikes.
    const stmt = db.prepare(`
      SELECT srcuri
      FROM backlink_skeleton
      WHERE suburi = ? AND srccol = 'app_bsky_feed_post' AND srcfield = 'quote'
      ORDER BY id DESC;
    `);
    const rows = stmt.all(uri) as { srcuri: string }[];

    const quotes = await Promise.all(
      rows.map((row) => this.queryFeedViewPost(row.srcuri))
    );
    return quotes.filter(isPresent);
  }

  /** Full view for posts by users this server handles, a ref otherwise; `undefined` on any failure. */
  async _getPostViewUnion(
    uri: string
  ): Promise<
    | ATPAPI.AppBskyFeedDefs.PostView
    | IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef
    | undefined
  > {
    try {
      const postDid = new AtUri(uri).hostname;
      if (this.handlesDid(postDid)) {
        return await this.queryPostView(uri);
      } else {
        return this.constructPostViewRef(uri);
      }
    } catch (_e) {
      return undefined;
    }
  }

  async queryPostThreadPartial(
    uri: string
  ): Promise<
    | IndexServerTypes.PartyWheyAppBskyFeedGetPostThreadPartial.OutputSchema
    | undefined
  > {
    const post = await this._getPostViewUnion(uri);

    if (!post) {
      return {
        thread: {
          $type: "app.bsky.feed.defs#notFoundPost",
          uri: uri,
          notFound: true,
        } as ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.NotFoundPost>,
      };
    }

    const thread: IndexServerAPI.PartyWheyAppBskyFeedDefs.ThreadViewPostRef = {
      $type: "party.whey.app.bsky.feed.defs#threadViewPostRef",
      post: post as
        | ATPAPI.$Typed<ATPAPI.AppBskyFeedDefs.PostView>
        | IndexServerUtils.$Typed<IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef>,
      replies: [],
    };

    let current = thread;

    // we can only climb the parent tree if we have the full post record.
// which is not implemented yet (sad i know) if ( isPostView(current.post) && isFeedPostRecord(current.post.record) && current.post.record?.reply?.parent?.uri ) { let parentUri: string | undefined = current.post.record.reply.parent.uri; // keep climbing as long as we find a valid parent post. while (parentUri) { const parentPost = await this._getPostViewUnion(parentUri); if (!parentPost) break; // stop if a parent in the chain is not found. const parentThread: IndexServerAPI.PartyWheyAppBskyFeedDefs.ThreadViewPostRef = { $type: "party.whey.app.bsky.feed.defs#threadViewPostRef", post: parentPost as ATPAPI.$Typed, replies: [ current as IndexServerUtils.$Typed, ], }; current.parent = parentThread as IndexServerUtils.$Typed; current = parentThread; // check if the new current post has a parent to continue the loop parentUri = isPostView(current.post) && isFeedPostRecord(current.post.record) ? current.post.record?.reply?.parent?.uri : undefined; } } const seenUris = new Set(); const fetchReplies = async ( parentThread: IndexServerAPI.PartyWheyAppBskyFeedDefs.ThreadViewPostRef ) => { if (!parentThread.post || !("uri" in parentThread.post)) { return; } if (seenUris.has(parentThread.post.uri)) return; seenUris.add(parentThread.post.uri); const parentUri = new AtUri(parentThread.post.uri); const parentAuthorDid = parentUri.hostname; // replies can only be discovered for local posts where we have the backlink data if (!this.handlesDid(parentAuthorDid)) return; const db = this.userManager.getDbForDid(parentAuthorDid); if (!db) return; const stmt = db.prepare(` SELECT srcuri FROM backlink_skeleton WHERE suburi = ? 
AND srccol = 'app_bsky_feed_post' AND srcfield = 'replyparent' `); const replyRows = stmt.all(parentThread.post.uri) as { srcuri: string }[]; const replies = await Promise.all( replyRows .map(async (row) => await this._getPostViewUnion(row.srcuri)) .filter( ( p ): p is Promise< | ATPAPI.AppBskyFeedDefs.PostView | IndexServerAPI.PartyWheyAppBskyFeedDefs.PostViewRef > => !!p ) ); for (const replyPost of replies) { const replyThread: IndexServerAPI.PartyWheyAppBskyFeedDefs.ThreadViewPostRef = { $type: "party.whey.app.bsky.feed.defs#threadViewPostRef", post: replyPost as | ATPAPI.$Typed | IndexServerUtils.$Typed, parent: parentThread as IndexServerUtils.$Typed, replies: [], }; parentThread.replies?.push( replyThread as IndexServerUtils.$Typed ); fetchReplies(replyThread); // recurse } }; fetchReplies(thread); const returned = current as unknown as IndexServerAPI.PartyWheyAppBskyFeedDefs.ThreadViewPostRef; return { thread: returned as IndexServerUtils.$Typed, }; } /** * please do not use this, use openDbForDid() instead * @param did * @returns */ internalCreateDbForDid(did: string): Database { const path = `${this.config.baseDbPath}/${did}.sqlite`; const db = new Database(path); setupUserDb(db); //await db.exec(/* CREATE IF NOT EXISTS statements */); return db; } /** * @deprecated use handlesDid() instead * @param did * @returns */ isRegisteredIndexUser(did: string): boolean { const stmt = this.systemDB.prepare(` SELECT 1 FROM users WHERE did = ? 
AND onboardingstatus != 'onboarding-backfill' LIMIT 1; `); const result = stmt.value<[number]>(did); const exists = result !== undefined; return exists; } } export class IndexServerUserManager { public indexServer: IndexServer; constructor(indexServer: IndexServer) { this.indexServer = indexServer; } public users = new Map(); public handlesDid(did: string): boolean { return this.users.has(did); } /*async*/ addUser(did: string) { if (this.users.has(did)) return; const instance = new UserIndexServer(this, did); //await instance.initialize(); this.users.set(did, instance); } // async handleRequest({ // did, // route, // req, // }: { // did: string; // route: string; // req: Request; // }) { // if (!this.users.has(did)) await this.addUser(did); // const user = this.users.get(did)!; // return await user.handleHttpRequest(route, req); // } removeUser(did: string) { const instance = this.users.get(did); if (!instance) return; /*await*/ instance.shutdown(); this.users.delete(did); } getDbForDid(did: string): Database | null { if (!this.users.has(did)) { return null; } return this.users.get(did)?.db ?? 
null; } coldStart(db: Database) { const rows = db.prepare("SELECT did FROM users").all(); for (const row of rows) { this.addUser(row.did); } } } class UserIndexServer { public indexServerUserManager: IndexServerUserManager; did: string; db: Database; // | undefined; jetstream: JetstreamManager; // | undefined; spacedust: SpacedustManager; // | undefined; constructor(indexServerUserManager: IndexServerUserManager, did: string) { this.did = did; this.indexServerUserManager = indexServerUserManager; this.db = this.indexServerUserManager.indexServer.internalCreateDbForDid( this.did ); // should probably put the params of exactly what were listening to here this.jetstream = new JetstreamManager((msg) => { console.log("Received Jetstream message: ", msg); const op = msg.commit.operation; const doer = msg.did; const rev = msg.commit.rev; const aturi = `${msg.did}/${msg.commit.collection}/${msg.commit.rkey}`; const value = msg.commit.record; if (!doer || !value) return; this.indexServerUserManager.indexServer.indexServerIndexer({ op, doer, cid: msg.commit.cid, rev, aturi, value, indexsrc: `jetstream-${op}`, db: this.db, }); }); this.jetstream.start({ // for realsies pls get from db or something instead of this shit wantedDids: [ this.did, // "did:plc:mn45tewwnse5btfftvd3powc", // "did:plc:yy6kbriyxtimkjqonqatv2rb", // "did:plc:zzhzjga3ab5fcs2vnsv2ist3", // "did:plc:jz4ibztn56hygfld6j6zjszg", ], wantedCollections: [ "app.bsky.actor.profile", "app.bsky.feed.generator", "app.bsky.feed.like", "app.bsky.feed.post", "app.bsky.feed.repost", "app.bsky.feed.threadgate", "app.bsky.graph.block", "app.bsky.graph.follow", "app.bsky.graph.list", "app.bsky.graph.listblock", "app.bsky.graph.listitem", "app.bsky.notification.declaration", ], }); //await connectToJetstream(this.did, this.db); this.spacedust = new SpacedustManager((msg: SpacedustLinkMessage) => { console.log("Received Spacedust message: ", msg); const operation = msg.link.operation; const sourceURI = new 
AtUri(msg.link.source_record); const srcUri = msg.link.source_record; const srcDid = sourceURI.host; const srcField = msg.link.source; const srcCol = sourceURI.collection; const subjectURI = new AtUri(msg.link.subject); const subUri = msg.link.subject; const subDid = subjectURI.host; const subCol = subjectURI.collection; if (operation === "delete") { this.db.run( `DELETE FROM backlink_skeleton WHERE srcuri = ? AND srcfield = ? AND suburi = ?`, [srcUri, srcField, subUri] ); } else if (operation === "create") { this.db.run( `INSERT OR REPLACE INTO backlink_skeleton ( srcuri, srcdid, srcfield, srccol, suburi, subdid, subcol, indexedAt ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, [ srcUri, // full AT URI of the source record srcDid, // did: of the source srcField, // e.g., "reply.parent.uri" or "facets.features.did" srcCol, // e.g., "app.bsky.feed.post" subUri, // full AT URI of the subject (linked record) subDid, // did: of the subject subCol, // subject collection (can be inferred or passed) Date.now() ] ); } }); this.spacedust.start({ wantedSources: [ "app.bsky.feed.like:subject.uri", // like "app.bsky.feed.like:via.uri", // liked repost "app.bsky.feed.repost:subject.uri", // repost "app.bsky.feed.repost:via.uri", // reposted repost "app.bsky.feed.post:reply.root.uri", // thread OP "app.bsky.feed.post:reply.parent.uri", // direct parent "app.bsky.feed.post:embed.media.record.record.uri", // quote with media "app.bsky.feed.post:embed.record.uri", // quote without media "app.bsky.feed.threadgate:post", // threadgate subject "app.bsky.feed.threadgate:hiddenReplies", // threadgate items (array) "app.bsky.feed.post:facets.features.did", // facet item (array): mention "app.bsky.graph.block:subject", // blocks "app.bsky.graph.follow:subject", // follow "app.bsky.graph.listblock:subject", // list item (blocks) "app.bsky.graph.listblock:list", // blocklist mention (might not exist) "app.bsky.graph.listitem:subject", // list item (blocks) "app.bsky.graph.listitem:list", // list 
mention ], // should be getting from DB but whatever right wantedSubjects: [ // as noted i dont need to write down each post, just the user to listen to ! // hell yeah // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvybv7b6ic2h", // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvybws4avc2h", // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvvkcxcscs2h", // "at://did:plc:yy6kbriyxtimkjqonqatv2rb/app.bsky.feed.post/3l63ogxocq42f", // "at://did:plc:yy6kbriyxtimkjqonqatv2rb/app.bsky.feed.post/3lw3wamvflu23", ], wantedSubjectDids: [ this.did, //"did:plc:mn45tewwnse5btfftvd3powc", //"did:plc:yy6kbriyxtimkjqonqatv2rb", //"did:plc:zzhzjga3ab5fcs2vnsv2ist3", //"did:plc:jz4ibztn56hygfld6j6zjszg", ], instant: ["true"] }); //await connectToConstellation(this.did, this.db); } // initialize() { // } // async handleHttpRequest(route: string, req: Request): Promise { // if (route === "posts") { // const posts = await this.queryPosts(); // return new Response(JSON.stringify(posts), { // headers: { "content-type": "application/json" }, // }); // } // return new Response("Unknown route", { status: 404 }); // } // private async queryPosts() { // return this.db.run( // "SELECT * FROM posts ORDER BY created_at DESC LIMIT 100" // ); // } shutdown() { this.jetstream.stop(); this.spacedust.stop(); this.db.close?.(); } } // /** // * please do not use this, use openDbForDid() instead // * @param did // * @returns // */ // function internalCreateDbForDid(did: string): Database { // const path = `./dbs/${did}.sqlite`; // const db = new Database(path); // setupUserDb(db); // //await db.exec(/* CREATE IF NOT EXISTS statements */); // return db; // } // function getDbForDid(did: string): Database | undefined { // const db = indexerUserManager.getDbForDid(did); // if (!db) return; // return db; // } // async function connectToJetstream(did: string, db: Database): Promise { // const url = `${jetstreamurl}/xrpc/com.atproto.sync.subscribeRepos?did=${did}`; 
//   const ws = new WebSocket(url);
//   ws.onmessage = (msg) => {
//     //handleJetstreamMessage(evt.data, db)
//     const op = msg.commit.operation;
//     const doer = msg.did;
//     const rev = msg.commit.rev;
//     const aturi = `${msg.did}/${msg.commit.collection}/${msg.commit.rkey}`;
//     const value = msg.commit.record;

//     if (!doer || !value) return;
//     indexServerIndexer({
//       op,
//       doer,
//       rev,
//       aturi,
//       value,
//       indexsrc: "onboarding_backfill",
//       userdbname: did,
//     })
//   };

//   return ws;
// }

// async function connectToConstellation(did: string, db: D1Database): Promise<WebSocket> {
//   const url = `wss://bsky.social/xrpc/com.atproto.sync.subscribeLabels?did=${did}`;
//   const ws = new WebSocket(url);
//   ws.onmessage = (evt) => handleConstellationMessage(evt.data, db);
//   return ws;
// }

/** Row shape of the per-user post table. */
type PostRow = {
  uri: string;
  did: string;
  cid: string | null;
  rev: string | null;
  createdat: number | null;
  indexedat: number;
  json: string | null;

  text: string | null;
  replyroot: string | null;
  replyparent: string | null;
  quote: string | null;

  imagecount: number | null;
  image1cid: string | null;
  image1mime: string | null;
  image1aspect: string | null;
  image2cid: string | null;
  image2mime: string | null;
  image2aspect: string | null;
  image3cid: string | null;
  image3mime: string | null;
  image3aspect: string | null;
  image4cid: string | null;
  image4mime: string | null;
  image4aspect: string | null;

  videocount: number | null;
  videocid: string | null;
  videomime: string | null;
  videoaspect: string | null;
};

/** Row shape of the per-user profile table. */
type ProfileRow = {
  uri: string;
  cid: string | null;
  rev: string | null;
  createdat: number | null;
  indexedat: number;
  json: string | null;
  displayname: string | null;
  description: string | null;
  avatarcid: string | null;
  avatarmime: string | null;
  bannercid: string | null;
  bannermime: string | null;
};

/** Query parameters accepted by the backlink (`/links`) lookups. */
type linksQuery = {
  target: string;
  collection: string;
  path: string;
  cursor?: string;
};
type linksRecord = {
  did: string;
  collection: string;
  rkey: string;
};
type linksRecordsResponse = {
  total: string;
  linking_records: linksRecord[];
  cursor?: string;
};
type linksDidsResponse = {
  total: string;
  linking_dids: string[];
  cursor?: string;
};
type linksCountResponse = {
  total: string;
};
type linksAllResponse = {
  links: Record<
    string,
    Record<
      string,
      {
        records: number;
        distinct_dids: number;
      }
    >
  >;
};

type linksAllQuery = {
  target: string;
};

/**
 * Backlink queries over `backlink_skeleton`. Every statement except `all`
 * binds (suburi, srccol, srcfield) in that order; `all` binds only suburi.
 */
const SQL = {
  links: `
    SELECT srcuri, srcdid, srccol
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  distinctDids: `
    SELECT DISTINCT srcdid
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  count: `
    SELECT COUNT(*) as total
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  countDistinctDids: `
    SELECT COUNT(DISTINCT srcdid) as total
    FROM backlink_skeleton
    WHERE suburi = ? AND srccol = ? AND srcfield = ?
  `,
  all: `
    SELECT suburi, srccol, COUNT(*) as records, COUNT(DISTINCT srcdid) as distinct_dids
    FROM backlink_skeleton
    WHERE suburi = ?
    GROUP BY suburi, srccol
  `,
};

/** True when `str` is a string beginning with `did:`. */
export function isDid(str: string): boolean {
  return typeof str === "string" && str.startsWith("did:");
}

/** Shared check behind the `$type` guards: non-null object tagged with `type`. */
function hasLexiconType(value: unknown, type: string): boolean {
  return (
    typeof value === "object" &&
    value !== null &&
    "$type" in value &&
    (value as { $type?: unknown }).$type === type
  );
}

/** Guard: value is tagged `app.bsky.feed.post`. Only the tag is checked. */
function isFeedPostRecord(
  post: unknown
): post is ATPAPI.AppBskyFeedPost.Record {
  return hasLexiconType(post, "app.bsky.feed.post");
}

/** Guard: value is tagged `app.bsky.embed.images`. Only the tag is checked. */
function isImageEmbed(embed: unknown): embed is ATPAPI.AppBskyEmbedImages.Main {
  return hasLexiconType(embed, "app.bsky.embed.images");
}

/** Guard: value is tagged `app.bsky.embed.video`. Only the tag is checked. */
function isVideoEmbed(embed: unknown): embed is ATPAPI.AppBskyEmbedVideo.Main {
  return hasLexiconType(embed, "app.bsky.embed.video");
}

/** Guard: value is tagged `app.bsky.embed.record`. Only the tag is checked. */
function isRecordEmbed(
  embed: unknown
): embed is ATPAPI.AppBskyEmbedRecord.Main {
  return hasLexiconType(embed, "app.bsky.embed.record");
}

/** Guard: value is tagged `app.bsky.embed.recordWithMedia`. Only the tag is checked. */
function isRecordWithMediaEmbed(
  embed: unknown
): embed is ATPAPI.AppBskyEmbedRecordWithMedia.Main {
  return hasLexiconType(embed, "app.bsky.embed.recordWithMedia");
}

/**
 * Reads the `$link` string out of a `{ "$link": "<cid>" }` object.
 *
 * @param anything any value; non-objects are tolerated
 * @returns the link when it is a non-empty string, otherwise `null`
 */
export function uncid(anything: unknown): string | null {
  if (typeof anything !== "object" || anything === null) return null;
  const link = (anything as Record<string, unknown>)["$link"];
  // Only a real, non-empty string is returned; other truthy values (numbers,
  // nested objects) would not match the declared return type.
  return typeof link === "string" && link !== "" ? link : null;
}

/** Images of an image embed (direct or as record-with-media); `[]` otherwise. */
function extractImages(embed: unknown) {
  // The guards only check `$type`, so `images` may still be absent.
  if (isImageEmbed(embed)) return embed.images ?? [];
  if (isRecordWithMediaEmbed(embed) && isImageEmbed(embed.media))
    return embed.media.images ?? [];
  return [];
}

/** The video embed (direct or as record-with-media); `null` otherwise. */
function extractVideo(embed: unknown) {
  if (isVideoEmbed(embed)) return embed;
  if (isRecordWithMediaEmbed(embed) && isVideoEmbed(embed.media))
    return embed.media;
  return null;
}

/** URI of the quoted record for record / record-with-media embeds; `null` otherwise. */
function extractQuoteUri(embed: unknown): string | null {
  // Optional chaining: the guards only check `$type`, so a malformed embed
  // must yield null rather than throw.
  if (isRecordEmbed(embed)) return embed.record?.uri ?? null;
  if (isRecordWithMediaEmbed(embed)) return embed.record?.record?.uri ?? null;
  return null;
}