A couple of Bluesky feeds focused around PDSes

Add endpoint to ingest/backfill specific user

+29 -9
+29 -9
ingest.ts
··· 4 4 PlcDidDocumentResolver, 5 5 WebDidDocumentResolver, 6 6 } from "@atcute/identity-resolver"; 7 - import type { ResourceUri } from "@atcute/lexicons/syntax"; 7 + import type { ActorIdentifier, ResourceUri } from "@atcute/lexicons/syntax"; 8 8 import { Jetstream, CommitType } from "@skyware/jetstream"; 9 9 10 10 import type {} from "@atcute/atproto"; ··· 127 127 } 128 128 }); 129 129 130 - jetstream.on("identity", async (e) => { 131 - const cached = getAuthor.get<Author>(e.did); 132 - const pds = await getPDS(e.did as DID, true); 133 - if (!pds || cached?.pds === pds) return; 130 + async function backfillUser(did: ActorIdentifier) { 131 + const cached = getAuthor.get<Author>(did); 132 + const pds = await getPDS(did as DID, true); 133 + if (!pds || cached?.pds === pds) return false; 134 134 const handler = simpleFetchHandler({ service: pds }); 135 135 const rpc = new Client({ handler }); 136 136 try { 137 137 const { records } = await ok( 138 138 rpc.get("com.atproto.repo.listRecords", { 139 139 params: { 140 - repo: e.did, 140 + repo: did, 141 141 collection: "app.bsky.feed.post", 142 142 }, 143 - }) 143 + }), 144 144 ); 145 145 worker.postMessage({ 146 146 op: 2, 147 147 records, 148 - did: e.did, 148 + did: did, 149 149 pds, 150 150 }); 151 151 } catch (e) { 152 152 console.error(`Failed to backfill posts: ${e}`); 153 153 } 154 + return true; 155 + } 156 + 157 + jetstream.on("identity", async (e) => { 158 + await backfillUser(e.did); 154 159 }); 155 160 156 161 jetstream.start(); 157 162 158 163 export default { 159 - fetch() { 164 + async fetch(request) { 165 + const url = new URL(request.url); 166 + if (url.pathname === "/refresh") { 167 + const did = url.searchParams.get("id"); 168 + if (!did) { 169 + return new Response("No DID/handle provided", { 170 + status: 400, 171 + }); 172 + } 173 + if (!(await backfillUser(did as ActorIdentifier))) { 174 + return new Response(`Failed to refresh ${did}`, { 175 + status: 500, 176 + }); 177 + } 178 + return new Response(`Refreshed ${did}`); 179 + } 160 180 return new Response("Pong!"); 161 181 }, 162 182 } satisfies Deno.ServeDefaultExport;