The recipes.blue monorepo recipes.blue
recipes appview atproto

feat: cache handles in redis and continue improving robustness of the codebase

hayden.moe 9765ae9a f141c110

verified
+78 -65
+5 -2
apps/api/src/index.ts
··· 4 4 import { registerGetRecipe } from './xrpc/blue.recipes.feed.getRecipe.js'; 5 5 import { logMiddleware } from './logger.js'; 6 6 import pino from 'pino'; 7 + import { RedisClient } from 'bun'; 7 8 8 9 const logger = pino(); 10 + const redis = new RedisClient(Bun.env.REDIS_URL ?? "redis://127.0.0.1:6379/0"); 9 11 10 12 const router = new XRPCRouter({ 11 13 handleException: (err, _req) => { ··· 14 16 } else if (err instanceof Response) { 15 17 return err; 16 18 } else { 19 + logger.error({ err }, 'Exception thrown during request'); 17 20 return Response.json( 18 21 { error: 'InternalServerError', message: 'an exception happened whilst processing this request' }, 19 22 { status: 500 }, ··· 29 32 ], 30 33 }); 31 34 32 - registerGetRecipes(router, logger); 33 - registerGetRecipe(router, logger); 35 + registerGetRecipes(router, logger, redis); 36 + registerGetRecipe(router, logger, redis); 34 37 35 38 const server = Bun.serve({ 36 39 port: process.env.PORT || 3000,
+14 -40
apps/api/src/util/api.ts
··· 1 - import { Client } from '@atcute/client'; 2 - import { AppBskyActorProfile } from '@atcute/bluesky'; 3 - import type { BlueRecipesFeedDefs } from '@cookware/lexicons'; 4 - 5 1 import type {} from '@atcute/atproto'; 6 - import { isBlob, isLegacyBlob } from '@atcute/lexicons/interfaces'; 7 2 import { CompositeDidDocumentResolver, PlcDidDocumentResolver, WebDidDocumentResolver } from '@atcute/identity-resolver'; 8 3 import { CompositeHandleResolver, DohJsonHandleResolver, WellKnownHandleResolver } from '@atcute/identity-resolver'; 9 - import { ActorIdentifier, AtprotoDid, isHandle } from '@atcute/lexicons/syntax'; 4 + import { ActorIdentifier, AtprotoDid, Handle, isHandle } from '@atcute/lexicons/syntax'; 10 5 import { isAtprotoDid } from '@atcute/identity'; 6 + import { RedisClient } from 'bun'; 11 7 12 8 const handleResolver = new CompositeHandleResolver({ 13 9 strategy: 'race', ··· 24 20 } 25 21 }); 26 22 23 + const HANDLE_CACHE_TTL = 5 * 60; // 5 minutes 24 + 27 25 export const parseDid = async (id: ActorIdentifier): Promise<AtprotoDid> => { 28 26 if (isAtprotoDid(id)) return id; 29 27 if (isHandle(id)) { ··· 32 30 throw Error("Invalid DID or Handle!"); 33 31 } 34 32 35 - export const getAuthorInfo = async ( 36 - did: AtprotoDid, 37 - rpc: Client, 38 - ): Promise<BlueRecipesFeedDefs.AuthorInfo> => { 39 - const author = await didResolver.resolve(did); 40 - if (author.alsoKnownAs === undefined || author.alsoKnownAs.length < 1) { 41 - throw new Error('DID Document contained no `alsoKnownAs`!'); 42 - } 43 - 44 - const { ok, data } = await rpc.get('com.atproto.repo.getRecord', { 45 - params: { 46 - repo: did, 47 - collection: 'app.bsky.actor.profile', 48 - rkey: 'self', 49 - }, 50 - }); 51 - 52 - if (!ok) throw new Error(`Failed to query Bluesky profile: ${data.error}`); 53 - 54 - const profile = data.value as AppBskyActorProfile.Main; 55 - 56 - let info: BlueRecipesFeedDefs.AuthorInfo = { 57 - did: did, 58 - handle: author.alsoKnownAs[0]!.substring(5) as string, 59 - displayName: profile.displayName, 60 - }; 61 - 62 - if (profile.avatar) { 63 - if (isBlob(profile.avatar)) { 64 - info['avatarUrl'] = `https://cdn.bsky.app/img/avatar_thumbnail/plain/${did}/${profile.avatar.ref.$link}@jpeg`; 65 - } else if (isLegacyBlob(profile.avatar)) { 66 - info.avatarUrl = `https://cdn.bsky.app/img/avatar_thumbnail/plain/${did}/${profile.avatar.cid}@jpeg` 33 + export const getHandle = async (did: AtprotoDid, redis: RedisClient): Promise<Handle> => { 34 + let handle = await redis.get(`handle:${did}`) as Handle | null; 35 + if (!handle) { 36 + const didDoc = await didResolver.resolve(did); 37 + if (didDoc.alsoKnownAs == null || didDoc.alsoKnownAs.length < 1) { 38 + throw new Error(`User ${did} had no resolvable DID document.`); 67 39 } 40 + handle = didDoc.alsoKnownAs[0]!.substring(5) as Handle; 41 + redis.setex(`handle:${did}`, HANDLE_CACHE_TTL, handle); 68 42 } 69 43 70 - return info; 71 - }; 44 + return handle; 45 + }
+2 -1
apps/api/src/xrpc/blue.recipes.feed.getRecipe.ts
··· 6 6 import { parseResourceUri, ResourceUri } from '@atcute/lexicons'; 7 7 import { recipeTable } from '@cookware/database/schema'; 8 8 import { isLegacyBlob } from '@atcute/lexicons/interfaces'; 9 + import { RedisClient } from 'bun'; 9 10 10 11 const invalidUriError = (uri: string) => new XRPCError({ 11 12 status: 400, ··· 13 14 description: `The provided URI is invalid: ${uri}`, 14 15 }); 15 16 16 - export const registerGetRecipe = (router: XRPCRouter, _logger: Logger) => { 17 + export const registerGetRecipe = (router: XRPCRouter, _logger: Logger, _redis: RedisClient) => { 17 18 router.addQuery(BlueRecipesFeedGetRecipe.mainSchema, { 18 19 async handler({ params: { uris } }) { 19 20 const whereClauses = [];
+7 -8
apps/api/src/xrpc/blue.recipes.feed.getRecipes.ts
··· 2 2 import { recipeTable } from '@cookware/database/schema'; 3 3 import { BlueRecipesFeedGetRecipes, BlueRecipesFeedRecipe } from '@cookware/lexicons'; 4 4 import { json, XRPCRouter } from '@atcute/xrpc-server'; 5 - import { parseDid } from '../util/api.js'; 6 - import { Handle, ResourceUri } from '@atcute/lexicons/syntax'; 5 + import { getHandle, parseDid } from '../util/api.js'; 6 + import { ResourceUri } from '@atcute/lexicons/syntax'; 7 7 import { Logger } from 'pino'; 8 8 import { isLegacyBlob } from '@atcute/lexicons/interfaces'; 9 + import { RedisClient } from 'bun'; 9 10 10 - export const registerGetRecipes = (router: XRPCRouter, _logger: Logger) => { 11 + export const registerGetRecipes = (router: XRPCRouter, _logger: Logger, redis: RedisClient) => { 11 12 router.addQuery(BlueRecipesFeedGetRecipes.mainSchema, { 12 13 async handler({ params: { author, limit, cursor } }) { 13 14 const whereClauses = []; ··· 30 31 }, 31 32 }); 32 33 33 - console.log(recipes); 34 - 35 34 let nextCursor = ''; 36 35 if (recipes.length == limit) { 37 36 const { createdAt } = recipes[limit - 1]!; ··· 40 39 41 40 return json({ 42 41 nextCursor, 43 - recipes: recipes.map((recipe) => ({ 42 + recipes: await Promise.all(recipes.map(async (recipe) => ({ 44 43 author: { 45 44 did: recipe.author.did, 46 - handle: 'hayden.moe', 45 + handle: await getHandle(recipe.author.did, redis), 47 46 displayName: recipe.author.displayName, 48 47 avatar: isLegacyBlob(recipe.author.avatarRef) ? undefined : recipe.author.avatarRef ?? undefined, 49 48 pronouns: recipe.author.pronouns ?? undefined, ··· 63 62 createdAt: recipe.createdAt.toISOString(), 64 63 }, 65 64 uri: recipe.uri as ResourceUri, 66 - })), 65 + }))), 67 66 }); 68 67 } 69 68 });
+1
apps/ingester/package.json
··· 16 16 "dependencies": { 17 17 "@atcute/client": "catalog:", 18 18 "@atcute/identity": "^1.1.3", 19 + "@atcute/identity-resolver": "^1.1.4", 19 20 "@atcute/jetstream": "^1.1.2", 20 21 "@atcute/lexicons": "catalog:", 21 22 "@badrap/valita": "^0.4.6",
+3 -1
apps/ingester/src/config.ts
··· 4 4 TURSO_CONNECTION_URL: v.string().optional(() => 'https://turso.dev.hayden.moe'), 5 5 TURSO_AUTH_TOKEN: v.string().optional(), 6 6 7 + REDIS_URL: v.string().optional(() => 'redis://localhost:6379/0'), 8 + 7 9 JETSTREAM_ENDPOINT: v.string() 8 - .optional(() => 'wss://jetstream2.us-east.bsky.network'), 10 + .optional(() => 'wss://jetstream1.us-east.bsky.network'), 9 11 PLC_DIRECTORY_URL: v.string().optional(() => 'https://plc.directory'), 10 12 11 13 ENV: v
+10 -6
apps/ingester/src/index.ts
··· 14 14 15 15 const subscription = new JetstreamSubscription({ 16 16 url: env.JETSTREAM_ENDPOINT, 17 - wantedCollections: ['blue.recipes.*'], 18 - cursor: 0, 17 + wantedCollections: [ 18 + 'blue.recipes.feed.recipe', 19 + 'blue.recipes.actor.profile', 20 + ], 19 21 onConnectionOpen: () => logger.info('Connected to Jetstream'), 20 22 onConnectionError: err => { 21 23 logger.error(err, 'Failed to connect to Jetstream'); ··· 27 29 return { 28 30 subscription, 29 31 start: async () => { 32 + const redis = new Bun.RedisClient(env.REDIS_URL); 33 + 30 34 for await (const event of subscription) { 31 35 const authorDid = event.did; 32 36 if (!isAtprotoDid(authorDid)) { ··· 38 42 const commit = event.commit; 39 43 switch (commit.collection) { 40 44 case BlueRecipesFeedRecipe.mainSchema.object.shape.$type.expected: 41 - ingestRecipe(authorDid, commit, logger); 45 + await ingestRecipe(authorDid, commit, logger); 42 46 break; 43 47 case BlueRecipesActorProfile.mainSchema.object.shape.$type.expected: 44 - ingestProfile(authorDid, commit, logger); 48 + await ingestProfile(authorDid, commit, logger, redis); 45 49 break; 46 50 default: 47 - logger.trace({ collection: commit.collection }, "skipping unknown collection"); 51 + logger.debug({ collection: commit.collection }, "skipping unknown collection"); 48 52 break; 49 53 } 50 54 } else { ··· 56 60 } 57 61 }; 58 62 59 - newIngester().start(); 63 + await newIngester().start();
+27 -3
apps/ingester/src/ingesters/profile.ts
··· 1 - import { db, and, eq } from "@cookware/database"; 1 + import { db, eq } from "@cookware/database"; 2 2 import { profilesTable } from "@cookware/database/schema"; 3 3 import { is } from '@atcute/lexicons'; 4 4 import { BlueRecipesActorProfile } from "@cookware/lexicons"; 5 5 import { CommitOperation } from "@atcute/jetstream"; 6 6 import { Logger } from "pino"; 7 7 import { AtprotoDid } from "@atcute/lexicons/syntax"; 8 + import { RedisClient } from "bun"; 9 + import { CompositeDidDocumentResolver, PlcDidDocumentResolver, WebDidDocumentResolver } from "@atcute/identity-resolver"; 10 + import env from "../config.js"; 8 11 9 - export const ingestProfile = (did: AtprotoDid, commit: CommitOperation, logger: Logger) => { 12 + const didResolver = new CompositeDidDocumentResolver({ 13 + methods: { 14 + plc: new PlcDidDocumentResolver({ apiUrl: env.PLC_DIRECTORY_URL }), 15 + web: new WebDidDocumentResolver(), 16 + } 17 + }); 18 + 19 + const HANDLE_CACHE_TTL = 5 * 60; // 5 minutes 20 + 21 + export const ingestProfile = async (did: AtprotoDid, commit: CommitOperation, logger: Logger, redis: RedisClient) => { 10 22 if (commit.operation == 'create' || commit.operation == 'update') { 11 23 const { rkey, record, cid } = commit; 12 24 ··· 19 31 return; 20 32 } 21 33 22 - db 34 + // Preemptively cache the user's handle for the API 35 + let handle = await redis.get(`handle:${did}`); 36 + if (!handle) { 37 + const didDoc = await didResolver.resolve(did); 38 + if (didDoc.alsoKnownAs == null || didDoc.alsoKnownAs.length < 1) { 39 + logger.warn(`User ${did} had no resolvable DID document.`); 40 + return; 41 + } 42 + handle = didDoc.alsoKnownAs[0]!.substring(5); 43 + redis.setex(`handle:${did}`, HANDLE_CACHE_TTL, handle); 44 + } 45 + 46 + await db 23 47 .insert(profilesTable) 24 48 .values({ 25 49 cid,
+2 -2
apps/ingester/src/ingesters/recipe.ts
··· 6 6 import { Logger } from "pino"; 7 7 import { AtprotoDid } from "@atcute/lexicons/syntax"; 8 8 9 - export const ingestRecipe = (did: AtprotoDid, commit: CommitOperation, logger: Logger) => { 9 + export const ingestRecipe = async (did: AtprotoDid, commit: CommitOperation, logger: Logger) => { 10 10 if (commit.operation == 'create' || commit.operation == 'update') { 11 11 const { rkey, record, cid } = commit; 12 12 ··· 15 15 return; 16 16 } 17 17 18 - db 18 + await db 19 19 .insert(recipeTable) 20 20 .values({ 21 21 cid, rkey, did,
+1
bun.lock
··· 37 37 "dependencies": { 38 38 "@atcute/client": "catalog:", 39 39 "@atcute/identity": "^1.1.3", 40 + "@atcute/identity-resolver": "^1.1.4", 40 41 "@atcute/jetstream": "^1.1.2", 41 42 "@atcute/lexicons": "catalog:", 42 43 "@badrap/valita": "^0.4.6",
+4
docker-compose.yaml
··· 7 7 recipesblue: 8 8 9 9 services: 10 + redis: 11 + image: redis:8 12 + ports: [6379:6379] 13 + 10 14 api: 11 15 build: 12 16 context: .
+2 -2
libs/database/lib/schema.ts
··· 38 38 } as LegacyBlob; 39 39 } else if (value.startsWith('b:')) { 40 40 if (parts.length !== 4) throw new Error('Invalid blob ref format'); 41 - if (!isCidLink(parts[0])) throw new Error('Invalid CID link in blob ref'); 42 - if (isNaN(parseInt(parts[2]!, 10))) throw new Error('Invalid size in blob ref'); 41 + if (!isCidLink({ $link: parts[1] })) throw new Error('Invalid CID link in blob ref'); 42 + if (isNaN(parseInt(parts[3]!, 10))) throw new Error('Invalid size in blob ref'); 43 43 44 44 return { 45 45 $type: 'blob',