A couple of Bluesky feeds focused around PDSes
1import { Client, ok, simpleFetchHandler } from "@atcute/client";
2import {
3 CompositeDidDocumentResolver,
4 PlcDidDocumentResolver,
5 WebDidDocumentResolver,
6} from "@atcute/identity-resolver";
7import type { ActorIdentifier, ResourceUri } from "@atcute/lexicons/syntax";
8import { JetstreamSubscription } from "@atcute/jetstream";
9
10import type {} from "@atcute/atproto";
11
12import { db } from "./common/db.ts";
13import type { Author, DID } from "./common/types.ts";
14
// One resolver per supported DID method, combined behind a single
// composite resolver keyed by method name ("plc" and "web").
const plcResolver = new PlcDidDocumentResolver();
const webResolver = new WebDidDocumentResolver();
const didResolver = new CompositeDidDocumentResolver({
  methods: { plc: plcResolver, web: webResolver },
});

// Module worker loaded from ./ingest/worker.ts (resolved relative to this
// module). The rest of this file talks to it through worker.postMessage().
const worker = new Worker(
  new URL("./ingest/worker.ts", import.meta.url).href,
  { type: "module" },
);

// Prepared statement: fetch the stored `pds` column for one DID from `authors`.
const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?");
27
/**
 * Finds the PDS endpoint for a DID.
 *
 * Unless `ignoreCache` is set, the `authors` table is checked first and a
 * non-empty stored endpoint is returned as-is. Otherwise the DID document is
 * resolved and the FIRST service entry whose type is
 * `AtprotoPersonalDataServer` and whose endpoint is a string is used. That
 * endpoint is also posted to the worker as an op 4 message together with its
 * base domain (presumably so the worker can store it; confirm in
 * ingest/worker.ts).
 *
 * Errors from DID resolution or from `getPDSBase` (invalid endpoint URL) are
 * not caught here and propagate to the caller.
 *
 * @param did DID whose PDS should be looked up.
 * @param ignoreCache When true, skip the database lookup and always resolve.
 * @returns The PDS endpoint, or `undefined` when the document lists none.
 */
async function getPDS(
  did: DID,
  ignoreCache = false,
): Promise<string | undefined> {
  if (!ignoreCache) {
    const cachedPds = getAuthor.get<Author>(did)?.pds;
    if (cachedPds) return cachedPds;
  }

  const resolved = await didResolver.resolve(did);
  for (const service of resolved.service ?? []) {
    if (
      // NOTE(review): loose equality is kept from the original; it also
      // matches a single-element array type via coercion. Switch to `===`
      // if only a plain string type should be accepted.
      service.type == "AtprotoPersonalDataServer" &&
      typeof service.serviceEndpoint === "string"
    ) {
      const pds = service.serviceEndpoint;
      worker.postMessage({
        op: 4,
        did,
        pds,
        pds_base: getPDSBase(pds),
      });
      // Stop at the first match so duplicate entries cannot override it or
      // trigger additional worker messages.
      return pds;
    }
  }

  return undefined;
}
56
/**
 * Derives the "base" host of a PDS endpoint: the last two dot-separated
 * labels of its hostname (e.g. `https://a.b.example.com` -> `example.com`).
 *
 * Hosts that do not have two labels to take are returned whole instead of
 * producing an `undefined.` prefix, and IPv4 literals are returned unchanged
 * because their trailing octets are not a domain.
 *
 * @param pds Absolute URL of the PDS endpoint.
 * @returns The base host for the endpoint.
 * @throws TypeError if `pds` is not a valid absolute URL.
 */
function getPDSBase(pds: string): string {
  const { hostname } = new URL(pds);

  // An IPv4 address has no registrable domain; keep it intact.
  if (/^\d{1,3}(\.\d{1,3}){3}$/.test(hostname)) return hostname;

  // Drop empty labels (e.g. from a trailing dot) before taking the last two.
  const labels = hostname.split(".").filter((label) => label.length > 0);
  return labels.slice(-2).join(".");
}
64
// Prepared statement: read the saved Jetstream cursor from the single state row.
const getCursor = db.prepare("SELECT cursor FROM state WHERE id = 1");

// Amount subtracted from the saved cursor so playback overlaps slightly with
// what was already processed (presumably microseconds, since the cursor that
// gets saved comes from `event.time_us`).
const CURSOR_REWIND = 10_000_000;

const dbCursor = getCursor.get<{ cursor?: string }>();
// A missing row, a missing column value, or a non-numeric value all fall back
// to 0 instead of letting NaN reach the subscription.
const savedCursor = Number(dbCursor?.cursor);
const cursor = Number.isFinite(savedCursor) ? savedCursor : 0;

const jetstream = new JetstreamSubscription({
  wantedCollections: ["app.bsky.feed.post"],
  cursor: cursor - CURSOR_REWIND, // back up a bit for seamless playback
  // JETSTREAM may hold a comma-separated list of endpoints; otherwise a
  // single default endpoint is used.
  url:
    Deno.env.get("JETSTREAM")?.split(",") ??
    "wss://jetstream1.us-east.bsky.network/subscribe",
  onConnectionOpen() {
    console.log("Listening to the jetstream...");
  },
  onConnectionError(event) {
    console.error(event.error);
    // Hand the current cursor to the worker (op 3) when the connection fails.
    worker.postMessage({
      op: 3,
      cursor: jetstream.cursor,
    });
  },
});
86
// Commit events seen since the cursor was last handed to the worker.
let count = 0;

// Listen on PORT when it parses to a non-zero number, otherwise on 4001.
const listenPort = Number(Deno.env.get("PORT")) || 4001;

// Tiny HTTP endpoint: `/refresh?id=<did>` triggers a backfill for that
// identifier; every other path answers with a liveness "Pong!".
Deno.serve({ port: listenPort }, async (request) => {
  const { pathname, searchParams } = new URL(request.url);
  if (pathname !== "/refresh") {
    return new Response("Pong!");
  }

  const did = searchParams.get("id");
  if (!did) {
    return new Response("No DID/handle provided", { status: 400 });
  }

  const refreshed = await backfillUser(did as ActorIdentifier);
  return refreshed
    ? new Response(`Refreshed ${did}`)
    : new Response(`Failed to refresh ${did}`, { status: 500 });
});
107
// Main ingest loop: forwards post creates/deletes to the worker and refreshes
// an account whenever an identity event arrives for it.
for await (const event of jetstream) {
  if (event.kind === "identity") {
    // A failed refresh must not end the whole stream; log it and move on,
    // the same way PDS lookup failures are handled for commits below.
    try {
      await backfillUser(event.did);
    } catch (e) {
      console.error(`Failed to refresh identity ${event.did}:`, e);
    }
    continue;
  }

  if (event.kind !== "commit") continue;

  // Every 1024 commits, hand the current event time to the worker as the
  // cursor (op 3).
  count++;
  if (count >= 1024) {
    count = 0;
    worker.postMessage({
      op: 3,
      cursor: event.time_us,
    });
  }

  const atUri: ResourceUri = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`;

  let pds: string | undefined;
  try {
    pds = await getPDS(event.did as DID);
  } catch (e) {
    console.error(e);
    continue;
  }

  if (!pds) {
    console.error(`PDS not found for ${event.did}`);
    continue;
  }

  if (event.commit.operation === "create") {
    // op 0: new post.
    worker.postMessage({
      op: 0,
      atUri,
      cid: event.commit.cid,
      did: event.did,
      pds,
    });
  } else if (event.commit.operation === "delete") {
    // op 1: post removed.
    worker.postMessage({
      op: 1,
      atUri,
      pds,
    });
  }
  // Any other commit operation is ignored.
}
152
/**
 * Re-resolves an account's PDS and, if it differs from the stored one,
 * fetches the account's `app.bsky.feed.post` records from the new PDS and
 * hands them to the worker (op 2).
 *
 * A single `com.atproto.repo.listRecords` request is made with no limit or
 * cursor parameter, so only what that one response contains is forwarded.
 *
 * Errors thrown by `getPDS` are not caught here and propagate to the caller.
 *
 * @param did Identifier of the account to refresh.
 * @returns `true` when records were fetched and sent to the worker; `false`
 *   when no PDS was found, the PDS is unchanged, or the record request failed.
 */
async function backfillUser(did: ActorIdentifier): Promise<boolean> {
  // Read the stored PDS before resolving so the two values can be compared.
  const cached = getAuthor.get<Author>(did);
  const pds = await getPDS(did as DID, true);
  if (!pds || cached?.pds === pds) return false;

  const rpc = new Client({ handler: simpleFetchHandler({ service: pds }) });
  try {
    const { records } = await ok(
      rpc.get("com.atproto.repo.listRecords", {
        params: {
          repo: did,
          collection: "app.bsky.feed.post",
        },
      }),
    );
    worker.postMessage({
      op: 2,
      records,
      did: did,
      pds,
    });
    return true;
  } catch (e) {
    console.error(`Failed to backfill posts: ${e}`);
    // Report the failure instead of claiming the refresh succeeded.
    return false;
  }
}