···11# main indexers
22-JETSTREAM_URL="wss://jetstream.whey.party"
22+JETSTREAM_URL="wss://jetstream1.us-east.bsky.network"
33SPACEDUST_URL="wss://spacedust.whey.party"
4455# for backfill (useless if you just started the instance right now)
+5-2
index/types.ts
···11+import { Database } from "jsr:@db/sqlite@0.11";
22+13export type indexHandlerContext = {
24 op: string;
35 doer: string; // the formal term for this is "repo" but whatever right
···57 cid?: string;
68 aturi: string;
79 indexsrc: string;
88- value: Record<string, unknown>
99- userdbname: string;
1010+ value: Record<string, unknown>;
1111+ //userdbname: string;
1212+ db: Database;
1013}
+299-3
indexserver.ts
···33import { validateRecord } from "./utils/records.ts";
44import { searchParamsToJson, withCors } from "./utils/server.ts";
55import * as IndexServerTypes from "./utils/indexservertypes.ts";
66+import { Database } from "jsr:@db/sqlite@0.11";
77+import { setupUserDb } from "./utils/dbuser.ts";
88+import { jetstreamurl } from "./main.ts";
99+import { JetstreamManager, SpacedustManager } from "./utils/sharders.ts";
1010+import { handleSpacedust, SpacedustLinkMessage } from "./index/spacedust.ts";
1111+import { handleJetstream } from "./index/jetstream.ts";
1212+import { AtUri } from "npm:@atproto/api";
1313+1414+export class IndexServerUserManager {
1515+ private users = new Map<string, UserIndexServer>();
1616+1717+ /*async*/ addUser(did: string) {
1818+ if (this.users.has(did)) return;
1919+ const instance = new UserIndexServer(did);
2020+ //await instance.initialize();
2121+ this.users.set(did, instance);
2222+ }
2323+2424+ // async handleRequest({
2525+ // did,
2626+ // route,
2727+ // req,
2828+ // }: {
2929+ // did: string;
3030+ // route: string;
3131+ // req: Request;
3232+ // }) {
3333+ // if (!this.users.has(did)) await this.addUser(did);
3434+ // const user = this.users.get(did)!;
3535+ // return await user.handleHttpRequest(route, req);
3636+ // }
3737+3838+ removeUser(did: string) {
3939+ const instance = this.users.get(did);
4040+ if (!instance) return;
4141+ /*await*/ instance.shutdown();
4242+ this.users.delete(did);
4343+ }
4444+4545+ getDbForDid(did: string): Database | null {
4646+ if (!this.users.has(did)) {
4747+ return null
4848+ }
4949+ return this.users.get(did)?.db ?? null;
5050+ }
5151+5252+ coldStart(db: Database) {
5353+ const rows = db.prepare("SELECT did FROM users").all();
5454+ for (const row of rows) {
5555+ this.addUser(row.did);
5656+ }
5757+}
5858+}
5959+6060+class UserIndexServer {
6161+ did: string;
6262+ db: Database;// | undefined;
6363+ jetstream: JetstreamManager;// | undefined;
6464+ spacedust: SpacedustManager;// | undefined;
6565+6666+ constructor(did: string) {
6767+ this.did = did;
6868+ this.db = openDbForDid(this.did);
6969+ // should probably put the params of exactly what were listening to here
7070+ this.jetstream = new JetstreamManager((msg) => {
7171+ console.log("Received Jetstream message: ", msg);
7272+7373+ const op = msg.commit.operation;
7474+ const doer = msg.did;
7575+ const rev = msg.commit.rev;
7676+ const aturi = `${msg.did}/${msg.commit.collection}/${msg.commit.rkey}`;
7777+ const value = msg.commit.record;
7878+7979+ if (!doer || !value) return;
8080+ indexServerIndexer({
8181+ op,
8282+ doer,
8383+ rev,
8484+ aturi,
8585+ value,
8686+ indexsrc: `jetstream-${op}`,
8787+ db: this.db,
8888+ });
8989+ });
9090+ this.jetstream.start({
9191+ // for realsies pls get from db or something instead of this shit
9292+ wantedDids: [
9393+ this.did
9494+ // "did:plc:mn45tewwnse5btfftvd3powc",
9595+ // "did:plc:yy6kbriyxtimkjqonqatv2rb",
9696+ // "did:plc:zzhzjga3ab5fcs2vnsv2ist3",
9797+ // "did:plc:jz4ibztn56hygfld6j6zjszg",
9898+ ],
9999+ wantedCollections: [
100100+ "app.bsky.actor.profile",
101101+ "app.bsky.feed.generator",
102102+ "app.bsky.feed.like",
103103+ "app.bsky.feed.post",
104104+ "app.bsky.feed.repost",
105105+ "app.bsky.feed.threadgate",
106106+ "app.bsky.graph.block",
107107+ "app.bsky.graph.follow",
108108+ "app.bsky.graph.list",
109109+ "app.bsky.graph.listblock",
110110+ "app.bsky.graph.listitem",
111111+ "app.bsky.notification.declaration",
112112+ ],
113113+ });
114114+ //await connectToJetstream(this.did, this.db);
115115+ this.spacedust = new SpacedustManager((msg: SpacedustLinkMessage) => {
116116+ console.log("Received Spacedust message: ", msg);
117117+ const sourceURI = new AtUri(msg.link.source_record);
118118+ const srcUri = msg.link.source_record
119119+ const srcDid = sourceURI.host
120120+ const srcField = msg.link.source
121121+ const srcCol = sourceURI.collection
122122+ const subjectURI = new AtUri(msg.link.subject)
123123+ const subUri = msg.link.subject
124124+ const subDid = subjectURI.host
125125+ const subCol = subjectURI.collection
126126+127127+ this.db.run(
128128+ `INSERT INTO backlink_skeleton (
129129+ srcuri,
130130+ srcdid,
131131+ srcfield,
132132+ srccol,
133133+ suburi,
134134+ subdid,
135135+ subcol
136136+ ) VALUES (?, ?, ?, ?, ?, ?, ?)`,
137137+ [
138138+ srcUri, // full AT URI of the source record
139139+ srcDid, // did: of the source
140140+ srcField, // e.g., "reply.parent.uri" or "facets.features.did"
141141+ srcCol, // e.g., "app.bsky.feed.post"
142142+ subUri, // full AT URI of the subject (linked record)
143143+ subDid, // did: of the subject
144144+ subCol, // subject collection (can be inferred or passed)
145145+ ]
146146+ );
147147+ });
148148+ this.spacedust.start({
149149+ wantedSources: [
150150+ "app.bsky.feed.like:subject.uri", // like
151151+ "app.bsky.feed.like:via.uri", // liked repost
152152+ "app.bsky.feed.repost:subject.uri", // repost
153153+ "app.bsky.feed.repost:via.uri", // reposted repost
154154+ "app.bsky.feed.post:reply.root.uri", // thread OP
155155+ "app.bsky.feed.post:reply.parent.uri", // direct parent
156156+ "app.bsky.feed.post:embed.media.record.record.uri", // quote with media
157157+ "app.bsky.feed.post:embed.record.uri", // quote without media
158158+ "app.bsky.feed.threadgate:post", // threadgate subject
159159+ "app.bsky.feed.threadgate:hiddenReplies", // threadgate items (array)
160160+ "app.bsky.feed.post:facets.features.did", // facet item (array): mention
161161+ "app.bsky.graph.block:subject", // blocks
162162+ "app.bsky.graph.follow:subject", // follow
163163+ "app.bsky.graph.listblock:subject", // list item (blocks)
164164+ "app.bsky.graph.listblock:list", // blocklist mention (might not exist)
165165+ "app.bsky.graph.listitem:subject", // list item (blocks)
166166+ "app.bsky.graph.listitem:list", // list mention
167167+ ],
168168+ // should be getting from DB but whatever right
169169+ wantedSubjects: [
170170+ // as noted i dont need to write down each post, just the user to listen to !
171171+ // hell yeah
172172+ // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvybv7b6ic2h",
173173+ // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvybws4avc2h",
174174+ // "at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3lvvkcxcscs2h",
175175+ // "at://did:plc:yy6kbriyxtimkjqonqatv2rb/app.bsky.feed.post/3l63ogxocq42f",
176176+ // "at://did:plc:yy6kbriyxtimkjqonqatv2rb/app.bsky.feed.post/3lw3wamvflu23",
177177+ ],
178178+ wantedSubjectDids: [
179179+ this.did,
180180+ //"did:plc:mn45tewwnse5btfftvd3powc",
181181+ //"did:plc:yy6kbriyxtimkjqonqatv2rb",
182182+ //"did:plc:zzhzjga3ab5fcs2vnsv2ist3",
183183+ //"did:plc:jz4ibztn56hygfld6j6zjszg",
184184+ ],
185185+ });
186186+ //await connectToConstellation(this.did, this.db);
187187+ }
188188+189189+ // initialize() {
190190+191191+ // }
192192+193193+ // async handleHttpRequest(route: string, req: Request): Promise<Response> {
194194+ // if (route === "posts") {
195195+ // const posts = await this.queryPosts();
196196+ // return new Response(JSON.stringify(posts), {
197197+ // headers: { "content-type": "application/json" },
198198+ // });
199199+ // }
200200+201201+ // return new Response("Unknown route", { status: 404 });
202202+ // }
203203+204204+ // private async queryPosts() {
205205+ // return this.db.run(
206206+ // "SELECT * FROM posts ORDER BY created_at DESC LIMIT 100"
207207+ // );
208208+ // }
209209+210210+ shutdown() {
211211+ this.jetstream.stop();
212212+ this.spacedust.stop();
213213+ this.db.close?.();
214214+ }
215215+}
216216+217217+function openDbForDid(did: string): Database {
218218+ const path = `./dbs/${did}.sqlite`;
219219+ const db = new Database(path);
220220+ setupUserDb(db);
221221+ //await db.exec(/* CREATE IF NOT EXISTS statements */);
222222+ return db;
223223+}
224224+225225+// async function connectToJetstream(did: string, db: Database): Promise<WebSocket> {
226226+// const url = `${jetstreamurl}/xrpc/com.atproto.sync.subscribeRepos?did=${did}`;
227227+// const ws = new WebSocket(url);
228228+// ws.onmessage = (msg) => {
229229+// //handleJetstreamMessage(evt.data, db)
230230+231231+// const op = msg.commit.operation;
232232+// const doer = msg.did;
233233+// const rev = msg.commit.rev;
234234+// const aturi = `${msg.did}/${msg.commit.collection}/${msg.commit.rkey}`;
235235+// const value = msg.commit.record;
236236+237237+// if (!doer || !value) return;
238238+// indexServerIndexer({
239239+// op,
240240+// doer,
241241+// rev,
242242+// aturi,
243243+// value,
244244+// indexsrc: "onboarding_backfill",
245245+// userdbname: did,
246246+// })
247247+// };
248248+249249+// return ws;
250250+// }
251251+252252+// async function connectToConstellation(did: string, db: D1Database): Promise<WebSocket> {
253253+// const url = `wss://bsky.social/xrpc/com.atproto.sync.subscribeLabels?did=${did}`;
254254+// const ws = new WebSocket(url);
255255+// ws.onmessage = (evt) => handleConstellationMessage(evt.data, db);
256256+// return ws;
257257+// }
62587259export async function indexServerHandler(req: Request): Promise<Response> {
8260 const url = new URL(req.url);
···221473 target: string;
222474};
223475224224-export async function constellationAPIHandler(req: Request): Promise<Response> {
476476+const SQL = {
477477+ links: `
478478+ SELECT srcuri, srcdid, srccol
479479+ FROM backlink_skeleton
480480+ WHERE suburi = ? AND subcol = ? AND srcfield = ?
481481+ `,
482482+ distinctDids: `
483483+ SELECT DISTINCT srcdid
484484+ FROM backlink_skeleton
485485+ WHERE suburi = ? AND subcol = ? AND srcfield = ?
486486+ `,
487487+ count: `
488488+ SELECT COUNT(*) as total
489489+ FROM backlink_skeleton
490490+ WHERE suburi = ? AND subcol = ? AND srcfield = ?
491491+ `,
492492+ countDistinctDids: `
493493+ SELECT COUNT(DISTINCT srcdid) as total
494494+ FROM backlink_skeleton
495495+ WHERE suburi = ? AND subcol = ? AND srcfield = ?
496496+ `,
497497+ all: `
498498+ SELECT suburi, srccol, COUNT(*) as records, COUNT(DISTINCT srcdid) as distinct_dids
499499+ FROM backlink_skeleton
500500+ WHERE suburi = ?
501501+ GROUP BY suburi, srccol
502502+ `,
503503+}
504504+505505+export async function constellationAPIHandler(req: Request, did: string): Promise<Response> {
225506 const url = new URL(req.url);
226507 const pathname = url.pathname;
227508 // const bskyUrl = `https://api.bsky.app${pathname}${url.search}`;
···231512 // : null;
232513 const searchParams = searchParamsToJson(url.searchParams);
233514 const jsonUntyped = searchParams;
515515+ const db = openDbForDid(did);
234516235517 switch (pathname) {
236518 case "/links": {
237519 const jsonTyped = jsonUntyped as linksQuery;
520520+ // probably need to do pagination or something
238521239239- const response: linksRecordsResponse = {};
522522+ const rows = db.prepare(SQL.links).all(jsonTyped.target, jsonTyped.collection, jsonTyped.path);
523523+524524+ const linking_records: linksRecord[] = rows.map((row: any) => {
525525+ const rkey = row.srcuri.split('/').pop()!;
526526+ return {
527527+ did: row.srcdid,
528528+ collection: row.srccol,
529529+ rkey,
530530+ };
531531+ });
240532533533+ const response: linksRecordsResponse = {
534534+ total: linking_records.length.toString(),
535535+ linking_records,
536536+ };
241537 return new Response(JSON.stringify(response), {
242538 headers: withCors({ "Content-Type": "application/json" }),
243539 });
···283579 JSON.stringify({
284580 error: "idk NotSupported",
285581 message:
286286- "HEY hello there my name is whey dot party and you have used my custom appview that is very cool but have you considered that idk Not Supported",
582582+ "HEY hello there my name is whey dot party and you have used my custom constellation implementation that is very cool but have you considered that idk Not Supported",
287583 }),
288584 {
289585 status: 404,
+5-1
main.ts
···1414import * as ATPAPI from "npm:@atproto/api";
1515import { didDocument } from "./utils/diddoc.ts";
1616import { cachedFetch, searchParamsToJson, withCors } from "./utils/server.ts";
1717-import { constellationAPIHandler, indexServerHandler } from "./indexserver.ts";
1717+import { constellationAPIHandler, indexServerHandler, IndexServerUserManager } from "./indexserver.ts";
1818import { viewServerHandler } from "./viewserver.ts";
19192020+export const jetstreamurl = Deno.env.get("JETSTREAM_URL");
2021export const slingshoturl = Deno.env.get("SLINGSHOT_URL");
2122export const constellationurl = Deno.env.get("CONSTELLATION_URL");
2223export const spacedusturl = Deno.env.get("SPACEDUST_URL");
···27282829export const systemDB = new Database("system.db");
2930setupSystemDb(systemDB);
3131+3232+const userManager = new IndexServerUserManager();
3333+userManager.coldStart(systemDB)
30343135// should do both of these per user actually, since now each user has their own db
3236// also the set of records and backlinks to listen should be seperate between index and view servers