A couple of Bluesky feeds focused around PDSes
at main 178 lines 4.5 kB view raw
import { Client, ok, simpleFetchHandler } from "@atcute/client";
import {
  CompositeDidDocumentResolver,
  PlcDidDocumentResolver,
  WebDidDocumentResolver,
} from "@atcute/identity-resolver";
import type { ActorIdentifier, ResourceUri } from "@atcute/lexicons/syntax";
import { JetstreamSubscription } from "@atcute/jetstream";

import type {} from "@atcute/atproto";

import { db } from "./common/db.ts";
import type { Author, DID } from "./common/types.ts";

/**
 * `op` codes for messages posted to the ingest worker (./ingest/worker.ts).
 * Names describe the payload this file sends with each code; the numeric
 * values are the worker's protocol and must not change.
 */
const WorkerOp = {
  /** A post was created: `{ atUri, cid, did, pds }`. */
  PostCreated: 0,
  /** A post was deleted: `{ atUri, pds }`. */
  PostDeleted: 1,
  /** Records fetched via `com.atproto.repo.listRecords`: `{ records, did, pds }`. */
  BackfillRecords: 2,
  /** Current Jetstream cursor (`time_us`, microseconds): `{ cursor }`. */
  Cursor: 3,
  /** Freshly resolved PDS for an author: `{ did, pds, pds_base }`. */
  AuthorPds: 4,
} as const;

const didResolver = new CompositeDidDocumentResolver({
  methods: {
    plc: new PlcDidDocumentResolver(),
    web: new WebDidDocumentResolver(),
  },
});

const worker = new Worker(new URL("./ingest/worker.ts", import.meta.url).href, {
  type: "module",
});

const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?");

/**
 * Look up the PDS endpoint for `did`.
 *
 * Uses the cached `authors.pds` value unless `ignoreCache` is set or no cached
 * value exists. Otherwise it resolves the DID document (did:plc / did:web),
 * takes the first `AtprotoPersonalDataServer` service with a string endpoint,
 * and posts a `WorkerOp.AuthorPds` message for it.
 *
 * @param did - The author's DID.
 * @param ignoreCache - Skip the `authors` table and always resolve the DID.
 * @returns The PDS URL, or `undefined` if the DID document lists none.
 * @throws Errors from DID resolution, or from `getPDSBase` if the endpoint
 *   is not a valid URL.
 */
async function getPDS(
  did: DID,
  ignoreCache = false,
): Promise<string | undefined> {
  if (!ignoreCache) {
    const cachedPds = getAuthor.get<Author>(did)?.pds;
    if (cachedPds) return cachedPds;
  }

  const resolved = await didResolver.resolve(did);
  let pds: string | undefined;
  for (const service of resolved.service ?? []) {
    if (
      service.type === "AtprotoPersonalDataServer" &&
      typeof service.serviceEndpoint === "string"
    ) {
      // Use the first PDS entry only, so the worker receives a single update.
      pds = service.serviceEndpoint;
      break;
    }
  }
  if (!pds) return undefined;

  worker.postMessage({
    op: WorkerOp.AuthorPds,
    did,
    pds,
    pds_base: getPDSBase(pds),
  });
  return pds;
}

/**
 * Reduce a PDS URL to the last two labels of its hostname, e.g.
 * `https://morel.us-east.host.bsky.network` -> `bsky.network`.
 *
 * Single-label hosts (e.g. `localhost`) are returned unchanged. This is a plain
 * label split, not a public-suffix lookup, so `pds.example.co.uk` -> `co.uk`.
 *
 * @throws TypeError if `pds` is not a valid URL.
 */
function getPDSBase(pds: string): string {
  return new URL(pds).hostname.split(".").slice(-2).join(".");
}

const getCursor = db.prepare("SELECT cursor FROM state WHERE id = 1");

const savedCursor = Number(getCursor.get<{ cursor?: string }>()?.cursor);
// Replay from 10 s (cursors are time_us, i.e. microseconds) before the saved
// position for seamless playback across restarts. With no usable saved cursor
// (no row, null/undefined, non-numeric), leave it unset instead of sending
// Jetstream NaN or a negative timestamp.
const startCursor =
  Number.isFinite(savedCursor) && savedCursor > 0
    ? savedCursor - 10_000_000
    : undefined;

const jetstream = new JetstreamSubscription({
  wantedCollections: ["app.bsky.feed.post"],
  cursor: startCursor,
  // JETSTREAM may hold a comma-separated list of endpoints.
  url:
    Deno.env.get("JETSTREAM")?.split(",") ??
    "wss://jetstream1.us-east.bsky.network/subscribe",
  onConnectionOpen() {
    console.log("Listening to the jetstream...");
  },
  onConnectionError(event) {
    console.error(event.error);
    // Hand the latest cursor to the worker (presumably so it is persisted).
    worker.postMessage({
      op: WorkerOp.Cursor,
      cursor: jetstream.cursor,
    });
  },
});

/**
 * HTTP endpoint:
 * - `GET /refresh?id=<did>` re-resolves the user's PDS and backfills posts if
 *   it changed. Responds 400 without `id`, 500 when `backfillUser` returns
 *   false. NOTE(review): that includes "PDS unchanged".
 * - Any other path responds "Pong!".
 */
Deno.serve({ port: Number(Deno.env.get("PORT")) || 4001 }, async (request) => {
  const url = new URL(request.url);
  if (url.pathname !== "/refresh") return new Response("Pong!");

  const did = url.searchParams.get("id");
  if (!did) {
    return new Response("No DID/handle provided", {
      status: 400,
    });
  }
  if (!(await backfillUser(did as ActorIdentifier))) {
    return new Response(`Failed to refresh ${did}`, {
      status: 500,
    });
  }
  return new Response(`Refreshed ${did}`);
});

/** Commits seen since the cursor was last handed to the worker. */
let commitsSinceCursor = 0;

for await (const event of jetstream) {
  if (event.kind === "commit") {
    // Every 1024 commits, hand the current position to the worker.
    commitsSinceCursor++;
    if (commitsSinceCursor >= 1024) {
      commitsSinceCursor = 0;
      worker.postMessage({
        op: WorkerOp.Cursor,
        cursor: event.time_us,
      });
    }

    let pds: string | undefined;
    try {
      pds = await getPDS(event.did as DID);
    } catch (e) {
      console.error(e);
      continue;
    }
    if (!pds) {
      console.error(`PDS not found for ${event.did}`);
      continue;
    }

    // Only app.bsky.feed.post is subscribed to (wantedCollections above).
    const atUri: ResourceUri = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`;
    if (event.commit.operation === "create") {
      worker.postMessage({
        op: WorkerOp.PostCreated,
        atUri,
        cid: event.commit.cid,
        did: event.did,
        pds,
      });
    } else if (event.commit.operation === "delete") {
      worker.postMessage({
        op: WorkerOp.PostDeleted,
        atUri,
        pds,
      });
    }
    // Other operations (e.g. "update") are ignored.
  } else if (event.kind === "identity") {
    // backfillUser catches and logs resolution/fetch errors, so a single bad
    // identity event cannot terminate this loop.
    await backfillUser(event.did);
  }
}

/**
 * Re-resolve `did`'s PDS, bypassing the cache. If it differs from the cached
 * `authors.pds`, fetch the user's posts from the new PDS and post them to the
 * worker as `WorkerOp.BackfillRecords`.
 *
 * Only the first page of `com.atproto.repo.listRecords` is fetched (no `limit`
 * or `cursor` is passed).
 *
 * DID-resolution and fetch errors are logged rather than thrown.
 *
 * @returns `true` if the PDS changed and its posts were fetched. `false` if the
 *   PDS could not be resolved, is unchanged, or the fetch failed.
 */
async function backfillUser(did: ActorIdentifier): Promise<boolean> {
  // Read the cached value first: getPDS below posts the new PDS to the worker.
  const cached = getAuthor.get<Author>(did);

  let pds: string | undefined;
  try {
    pds = await getPDS(did as DID, true);
  } catch (e) {
    console.error(`Failed to resolve PDS for ${did}: ${e}`);
    return false;
  }
  if (!pds || cached?.pds === pds) return false;

  const rpc = new Client({ handler: simpleFetchHandler({ service: pds }) });
  try {
    const { records } = await ok(
      rpc.get("com.atproto.repo.listRecords", {
        params: {
          repo: did,
          collection: "app.bsky.feed.post",
        },
      }),
    );
    worker.postMessage({
      op: WorkerOp.BackfillRecords,
      records,
      did,
      pds,
    });
  } catch (e) {
    console.error(`Failed to backfill posts: ${e}`);
    return false;
  }
  return true;
}