a tool for shared writing and social publishing

add bsky follows feed to generator

Squashed commit of the following:

commit 524473259155da8789c0d0c8105b48f92cc00580
Author: Jared Pereira <jared@awarm.space>
Date: Wed Oct 15 12:18:53 2025 -0400

add sub feed handler for bsky follows feed and inngest api for indexing follows

+194 -43
+5
app/api/inngest/client.ts
··· 3 3 import { EventSchemas } from "inngest"; 4 4 5 5 export type Events = { 6 + "feeds/index-follows": { 7 + data: { 8 + did: string; 9 + }; 10 + }; 6 11 "appview/profile-update": { 7 12 data: { 8 13 record: any;
+119
app/api/inngest/functions/index_follows.ts
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { AtpAgent, AtUri } from "@atproto/api"; 3 + import { createIdentity } from "actions/createIdentity"; 4 + import { drizzle } from "drizzle-orm/node-postgres"; 5 + import { inngest } from "../client"; 6 + import { Client } from "pg"; 7 + 8 + export const index_follows = inngest.createFunction( 9 + { 10 + id: "index_follows", 11 + throttle: { 12 + limit: 1, 13 + period: "5m", 14 + key: "event.data.did", 15 + }, 16 + }, 17 + { event: "feeds/index-follows" }, 18 + async ({ event, step }) => { 19 + let follows: string[] = []; 20 + let cursor: null | string = null; 21 + let hasMore = true; 22 + let pageNumber = 0; 23 + while (hasMore) { 24 + let page: { 25 + cursor?: string; 26 + follows: string[]; 27 + } = await step.run(`get-follows-${pageNumber}`, async () => { 28 + let agent = new AtpAgent({ service: "https://public.api.bsky.app" }); 29 + let follows = await agent.app.bsky.graph.getFollows({ 30 + actor: event.data.did, 31 + limit: 100, 32 + cursor: cursor || undefined, 33 + }); 34 + if (!follows.success) 35 + throw new Error( 36 + "error during querying follows for: " + event.data.did, 37 + ); 38 + return { 39 + cursor: follows.data.cursor, 40 + follows: follows.data.follows.map((f) => f.did), 41 + }; 42 + }); 43 + pageNumber++; 44 + follows.push(...page.follows); 45 + cursor = page.cursor || null; 46 + if (!cursor) hasMore = false; 47 + } 48 + await step.run("check-if-identity-exists", async () => { 49 + let { data: exists } = await supabaseServerClient 50 + .from("identities") 51 + .select() 52 + .eq("atp_did", event.data.did); 53 + if (!exists) { 54 + let client = new Client({ connectionString: process.env.DB_URL }); 55 + let db = drizzle(client); 56 + await createIdentity(db, { atp_did: event.data.did }); 57 + client.end(); 58 + } 59 + }); 60 + let existingFollows: string[] = []; 61 + const batchSize = 100; 62 + let batchNumber = 0; 63 + 64 + // Create all check batches in parallel 
65 + const checkBatches = []; 66 + for (let i = 0; i < follows.length; i += batchSize) { 67 + const batch = follows.slice(i, i + batchSize); 68 + checkBatches.push( 69 + step.run(`check-existing-follows-batch-${batchNumber}`, async () => { 70 + const { data: existingIdentities } = await supabaseServerClient 71 + .from("identities") 72 + .select("atp_did") 73 + .in("atp_did", batch); 74 + 75 + return existingIdentities?.map((identity) => identity.atp_did!) || []; 76 + }), 77 + ); 78 + batchNumber++; 79 + } 80 + 81 + // Wait for all check batches to complete 82 + const batchResults = await Promise.all(checkBatches); 83 + existingFollows = batchResults.flat(); 84 + 85 + // Filter follows to only include those that exist in identities table 86 + const insertBatchSize = 100; 87 + let insertBatchNumber = 0; 88 + 89 + await step.run("clear existing follows", () => { 90 + return supabaseServerClient 91 + .from("bsky_follows") 92 + .delete() 93 + .eq("identity", event.data.did); 94 + }); 95 + 96 + // Create all insert batches in parallel 97 + const insertBatches = []; 98 + for (let i = 0; i < existingFollows.length; i += insertBatchSize) { 99 + const batch = existingFollows.slice(i, i + insertBatchSize); 100 + insertBatches.push( 101 + step.run(`insert-follows-batch-${insertBatchNumber}`, async () => { 102 + const insertData = batch.map((f) => ({ 103 + identity: event.data.did, 104 + follows: f, 105 + })); 106 + 107 + await supabaseServerClient.from("bsky_follows").upsert(insertData); 108 + }), 109 + ); 110 + insertBatchNumber++; 111 + } 112 + 113 + // Wait for all insert batches to complete 114 + await Promise.all(insertBatches); 115 + return { 116 + done: true, 117 + }; 118 + }, 119 + );
+7 -2
app/api/inngest/route.tsx
··· 3 3 import { index_post_mention } from "./functions/index_post_mention"; 4 4 import { come_online } from "./functions/come_online"; 5 5 import { batched_update_profiles } from "./functions/batched_update_profiles"; 6 + import { index_follows } from "./functions/index_follows"; 6 7 7 - // Create an API that serves zero functions 8 8 export const { GET, POST, PUT } = serve({ 9 9 client: inngest, 10 - functions: [index_post_mention, come_online, batched_update_profiles], 10 + functions: [ 11 + index_post_mention, 12 + come_online, 13 + batched_update_profiles, 14 + index_follows, 15 + ], 11 16 });
+63 -41
feeds/index.ts
··· 4 4 import { parseReqNsid, verifyJwt } from "@atproto/xrpc-server"; 5 5 import { supabaseServerClient } from "supabase/serverClient"; 6 6 import { PubLeafletDocument } from "lexicons/api"; 7 + import { inngest } from "app/api/inngest/client"; 8 + import { AtUri } from "@atproto/api"; 7 9 8 10 const app = new Hono(); 9 11 ··· 27 29 28 30 app.get("/xrpc/app.bsky.feed.getFeedSkeleton", async (c) => { 29 31 let auth = await validateAuth(c.req, serviceDid); 30 - if (!auth) return c.json({ feed: [] }); 32 + let feed = c.req.query("feed"); 33 + if (!auth || !feed) return c.json({ feed: [] }); 31 34 let cursor = c.req.query("cursor"); 35 + let parsedCursor; 36 + if (cursor) { 37 + let date = cursor.split("::")[0]; 38 + let uri = cursor.split("::")[1]; 39 + parsedCursor = { date, uri }; 40 + } 32 41 let limit = parseInt(c.req.query("limit") || "10"); 33 - 34 - let { data: publications } = await supabaseServerClient 35 - .from("publication_subscriptions") 36 - .select(`publications(*, documents_in_publications(documents(*)))`) 37 - .eq("identity", auth); 38 - 39 - const allPosts = (publications || []) 40 - .flatMap((pub) => { 41 - let posts = pub.publications?.documents_in_publications || []; 42 - return posts; 43 - }) 44 - .sort((a, b) => { 45 - let aRecord = a.documents?.data! as PubLeafletDocument.Record; 46 - let bRecord = b.documents?.data! as PubLeafletDocument.Record; 47 - const aDate = aRecord.publishedAt 48 - ? new Date(aRecord.publishedAt) 49 - : new Date(0); 50 - const bDate = bRecord.publishedAt 51 - ? 
new Date(bRecord.publishedAt) 52 - : new Date(0); 53 - return bDate.getTime() - aDate.getTime(); // Sort by most recent first 54 - }); 42 + let feedAtURI = new AtUri(feed); 55 43 let posts; 56 - if (!cursor) { 57 - posts = allPosts.slice(0, 25); 44 + let query; 45 + if (feedAtURI.rkey === "bsky-follows-leaflets") { 46 + console.log(cursor); 47 + if (!cursor) { 48 + console.log("Sending event"); 49 + await inngest.send({ name: "feeds/index-follows", data: { did: auth } }); 50 + } 51 + query = supabaseServerClient 52 + .from("documents") 53 + .select( 54 + `*, 55 + documents_in_publications!inner( 56 + publications!inner(*, 57 + identities!publications_identity_did_fkey!inner( 58 + bsky_follows!bsky_follows_follows_fkey!inner(*) 59 + ) 60 + ) 61 + )`, 62 + ) 63 + .eq( 64 + "documents_in_publications.publications.identities.bsky_follows.identity", 65 + auth, 66 + ) 67 + .not("data -> postRef", "is", null) 68 + .order("indexed_at", { ascending: false }) 69 + .limit(25); 58 70 } else { 59 - let date = cursor.split("::")[0]; 60 - let uri = cursor.split("::")[1]; 61 - posts = allPosts 62 - .filter((p) => { 63 - if (!p.documents?.data) return false; 64 - let record = p.documents.data as PubLeafletDocument.Record; 65 - if (!record.publishedAt) return false; 66 - return record.publishedAt <= date && uri !== p.documents?.uri; 67 - }) 68 - .slice(0, 25); 71 + query = supabaseServerClient 72 + .from("documents") 73 + .select( 74 + `*, 75 + documents_in_publications!inner(publications!inner(*, publication_subscriptions!inner(*)))`, 76 + ) 77 + .eq( 78 + "documents_in_publications.publications.publication_subscriptions.identity", 79 + auth, 80 + ) 81 + .not("data -> postRef", "is", null) 82 + .order("indexed_at", { ascending: false }) 83 + .order("uri", { ascending: false }) 84 + .limit(25); 69 85 } 86 + if (parsedCursor) 87 + query.or( 88 + `indexed_at.lt.${parsedCursor.date},and(indexed_at.eq.${parsedCursor.date},uri.lt.${parsedCursor.uri})`, 89 + ); 90 + 91 + let { data } = 
await query; 92 + posts = data; 93 + 94 + posts = posts || []; 70 95 71 96 let lastPost = posts[posts.length - 1]; 72 - let lastRecord = lastPost?.documents?.data! as PubLeafletDocument.Record; 73 - let newCursor = lastRecord 74 - ? `${lastRecord.publishedAt}::${lastPost.documents?.uri}` 75 - : null; 97 + let newCursor = lastPost ? `${lastPost.indexed_at}::${lastPost.uri}` : null; 76 98 return c.json({ 77 99 cursor: newCursor || cursor, 78 100 feed: posts.flatMap((p) => { 79 - if (!p.documents?.data) return []; 80 - let record = p.documents.data as PubLeafletDocument.Record; 101 + if (!p.data) return []; 102 + let record = p.data as PubLeafletDocument.Record; 81 103 if (!record.postRef) return []; 82 104 return { post: record.postRef.uri }; 83 105 }),