Testing implementation for private data in ATProto with ATPKeyserver and ATCute tools

move cache methods to inbox method

+624 -292
+1
bun.lock
··· 71 71 "name": "@watproto/server", 72 72 "version": "0.0.1", 73 73 "dependencies": { 74 + "@atcute/client": "^4.2.1", 74 75 "@atcute/crypto": "^2.2.6", 75 76 "@atcute/identity": "^1.1.3", 76 77 "@atcute/identity-resolver": "^1.1.4",
-36
lexicons/app/wafrn/content/cachePost.json
··· 1 - { 2 - "lexicon": 1, 3 - "id": "app.wafrn.content.cachePost", 4 - "defs": { 5 - "main": { 6 - "type": "procedure", 7 - "description": "Cache a post supporting different privacy options", 8 - "input": { 9 - "encoding": "application/json", 10 - "schema": { 11 - "type": "object", 12 - "required": ["post"], 13 - "properties": { 14 - "post": { 15 - "type": "union", 16 - "refs": [ 17 - "app.wafrn.content.defs#publicPostView", 18 - "app.wafrn.content.defs#privatePostView" 19 - ] 20 - } 21 - } 22 - } 23 - }, 24 - "output": { 25 - "encoding": "application/json", 26 - "schema": { 27 - "type": "object", 28 - "required": ["indexedAt"], 29 - "properties": { 30 - "indexedAt": { "type": "string", "format": "datetime" } 31 - } 32 - } 33 - } 34 - } 35 - } 36 - }
+104
lexicons/app/wafrn/content/inbox.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "app.wafrn.content.inbox", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Process records from user repositories. Accepts AT-URIs and fetches records from PDS for indexing.", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["items"], 13 + "properties": { 14 + "items": { 15 + "type": "array", 16 + "description": "Array of items to process", 17 + "items": { 18 + "type": "ref", 19 + "ref": "#inboxItem" 20 + }, 21 + "maxLength": 100 22 + } 23 + } 24 + } 25 + }, 26 + "output": { 27 + "encoding": "application/json", 28 + "schema": { 29 + "type": "object", 30 + "required": ["results"], 31 + "properties": { 32 + "results": { 33 + "type": "array", 34 + "description": "Results for each processed item", 35 + "items": { 36 + "type": "ref", 37 + "ref": "#inboxResult" 38 + } 39 + } 40 + } 41 + } 42 + }, 43 + "errors": [ 44 + { 45 + "name": "InvalidUri", 46 + "description": "AT-URI is malformed or invalid" 47 + }, 48 + { 49 + "name": "RecordNotFound", 50 + "description": "Record does not exist at the specified URI" 51 + }, 52 + { 53 + "name": "UnsupportedCollection", 54 + "description": "Record collection type is not supported" 55 + }, 56 + { 57 + "name": "Unauthorized", 58 + "description": "User is not authorized to index this record" 59 + } 60 + ] 61 + }, 62 + "inboxItem": { 63 + "type": "object", 64 + "required": ["uri", "action"], 65 + "properties": { 66 + "uri": { 67 + "type": "string", 68 + "format": "at-uri", 69 + "description": "AT-URI of the record to process" 70 + }, 71 + "action": { 72 + "type": "string", 73 + "description": "Action to perform on the record", 74 + "knownValues": ["create", "update", "delete"] 75 + } 76 + } 77 + }, 78 + "inboxResult": { 79 + "type": "object", 80 + "required": ["uri", "status"], 81 + "properties": { 82 + "uri": { 83 + "type": "string", 84 + "format": "at-uri", 85 + "description": "AT-URI that was processed" 86 + }, 87 + "status": { 88 + "type": "string", 89 + "description": "Processing status", 90 + "knownValues": ["success", "error"] 91 + }, 92 + "error": { 93 + "type": "string", 94 + "description": "Error message if status is error" 95 + }, 96 + "indexedAt": { 97 + "type": "string", 98 + "format": "datetime", 99 + "description": "Timestamp when the record was indexed (only for success)" 100 + } 101 + } 102 + } 103 + } 104 + }
-50
lexicons/app/wafrn/graph/cacheFollow.json
··· 1 - { 2 - "lexicon": 1, 3 - "id": "app.wafrn.graph.cacheFollow", 4 - "defs": { 5 - "main": { 6 - "type": "procedure", 7 - "description": "Cache a follow record in the AppView", 8 - "input": { 9 - "encoding": "application/json", 10 - "schema": { 11 - "type": "object", 12 - "required": ["follower", "followee", "uri", "createdAt"], 13 - "properties": { 14 - "follower": { 15 - "type": "string", 16 - "format": "did", 17 - "description": "DID of the follower" 18 - }, 19 - "followee": { 20 - "type": "string", 21 - "format": "did", 22 - "description": "DID of the account being followed" 23 - }, 24 - "uri": { 25 - "type": "string", 26 - "format": "at-uri", 27 - "description": "URI of the follow record" 28 - }, 29 - "createdAt": { 30 - "type": "string", 31 - "format": "datetime" 32 - } 33 - } 34 - } 35 - }, 36 - "output": { 37 - "encoding": "application/json", 38 - "schema": { 39 - "type": "object", 40 - "required": ["indexed"], 41 - "properties": { 42 - "indexed": { 43 - "type": "boolean" 44 - } 45 - } 46 - } 47 - } 48 - } 49 - } 50 - }
+8 -6
packages/client/app/lib/follow.server.ts
··· 57 57 }) 58 58 ) 59 59 60 - // Cache follow on server 60 + // Index follow in AppView using inbox 61 61 await ok( 62 - serverClient.post('app.wafrn.graph.cacheFollow', { 62 + serverClient.post('app.wafrn.content.inbox', { 63 63 input: { 64 - follower: session.did, 65 - followee: followeeDid, 66 - uri, 67 - createdAt: record.createdAt 64 + items: [ 65 + { 66 + uri, 67 + action: 'create' 68 + } 69 + ] 68 70 } 69 71 }) 70 72 )
+18 -16
packages/client/app/lib/post.server.ts
··· 73 73 } 74 74 }) 75 75 ) 76 + 77 + // Index in AppView using inbox 76 78 ok( 77 - await serverClient.post('app.wafrn.content.cachePost', { 79 + await serverClient.post('app.wafrn.content.inbox', { 78 80 input: { 79 - post: { 80 - $type: 'app.wafrn.content.defs#publicPostView', 81 - createdAt: validRecord.createdAt, 82 - content: validRecord.content, 83 - uri: recordMeta.uri 84 - } 81 + items: [ 82 + { 83 + uri: recordMeta.uri, 84 + action: 'create' 85 + } 86 + ] 85 87 } 86 88 }) 87 89 ) ··· 130 132 } 131 133 }) 132 134 ) 135 + 136 + // Index in AppView using inbox 133 137 await ok( 134 - serverClient.post('app.wafrn.content.cachePost', { 138 + serverClient.post('app.wafrn.content.inbox', { 135 139 input: { 136 - post: { 137 - $type: 'app.wafrn.content.defs#privatePostView', 138 - createdAt: validRecord.createdAt, 139 - encryptedContent: validRecord.encryptedContent, 140 - keyVersion: validRecord.keyVersion, 141 - visibility: validRecord.visibility, 142 - uri: recordMeta.uri 143 - } 140 + items: [ 141 + { 142 + uri: recordMeta.uri, 143 + action: 'create' 144 + } 145 + ] 144 146 } 145 147 }) 146 148 )
-1
packages/client/app/lib/profile.server.ts
··· 1 1 import type { Did, Handle } from '@atcute/lexicons' 2 2 import { 3 - getPublicAgent, 4 3 getPublicServiceAgent, 5 4 getServiceAgent, 6 5 getSessionAgent,
+1 -2
packages/lexicon/index.ts
··· 2 2 export * as AppWafrnActorDefs from "./types/app/wafrn/actor/defs.js"; 3 3 export * as AppWafrnActorGetProfiles from "./types/app/wafrn/actor/getProfiles.js"; 4 4 export * as AppWafrnActorProfile from "./types/app/wafrn/actor/profile.js"; 5 - export * as AppWafrnContentCachePost from "./types/app/wafrn/content/cachePost.js"; 6 5 export * as AppWafrnContentDefs from "./types/app/wafrn/content/defs.js"; 7 6 export * as AppWafrnContentDeletePost from "./types/app/wafrn/content/deletePost.js"; 8 7 export * as AppWafrnContentGetFeed from "./types/app/wafrn/content/getFeed.js"; 8 + export * as AppWafrnContentInbox from "./types/app/wafrn/content/inbox.js"; 9 9 export * as AppWafrnContentPrivatePost from "./types/app/wafrn/content/privatePost.js"; 10 10 export * as AppWafrnContentPublicPost from "./types/app/wafrn/content/publicPost.js"; 11 - export * as AppWafrnGraphCacheFollow from "./types/app/wafrn/graph/cacheFollow.js"; 12 11 export * as AppWafrnGraphDeleteFollow from "./types/app/wafrn/graph/deleteFollow.js"; 13 12 export * as AppWafrnGraphFollow from "./types/app/wafrn/graph/follow.js"; 14 13 export * as AppWafrnGraphGetFollowers from "./types/app/wafrn/graph/getFollowers.js";
-41
packages/lexicon/types/app/wafrn/content/cachePost.ts
··· 1 - import type {} from "@atcute/lexicons"; 2 - import * as v from "@atcute/lexicons/validations"; 3 - import type {} from "@atcute/lexicons/ambient"; 4 - import * as AppWafrnContentDefs from "./defs.js"; 5 - 6 - const _mainSchema = /*#__PURE__*/ v.procedure("app.wafrn.content.cachePost", { 7 - params: null, 8 - input: { 9 - type: "lex", 10 - schema: /*#__PURE__*/ v.object({ 11 - get post() { 12 - return /*#__PURE__*/ v.variant([ 13 - AppWafrnContentDefs.privatePostViewSchema, 14 - AppWafrnContentDefs.publicPostViewSchema, 15 - ]); 16 - }, 17 - }), 18 - }, 19 - output: { 20 - type: "lex", 21 - schema: /*#__PURE__*/ v.object({ 22 - indexedAt: /*#__PURE__*/ v.datetimeString(), 23 - }), 24 - }, 25 - }); 26 - 27 - type main$schematype = typeof _mainSchema; 28 - 29 - export interface mainSchema extends main$schematype {} 30 - 31 - export const mainSchema = _mainSchema as mainSchema; 32 - 33 - export interface $params {} 34 - export interface $input extends v.InferXRPCBodyInput<mainSchema["input"]> {} 35 - export interface $output extends v.InferXRPCBodyInput<mainSchema["output"]> {} 36 - 37 - declare module "@atcute/lexicons/ambient" { 38 - interface XRPCProcedures { 39 - "app.wafrn.content.cachePost": mainSchema; 40 - } 41 - }
+94
packages/lexicon/types/app/wafrn/content/inbox.ts
··· 1 + import type {} from "@atcute/lexicons"; 2 + import * as v from "@atcute/lexicons/validations"; 3 + import type {} from "@atcute/lexicons/ambient"; 4 + 5 + const _inboxItemSchema = /*#__PURE__*/ v.object({ 6 + $type: /*#__PURE__*/ v.optional( 7 + /*#__PURE__*/ v.literal("app.wafrn.content.inbox#inboxItem"), 8 + ), 9 + /** 10 + * Action to perform on the record 11 + */ 12 + action: /*#__PURE__*/ v.string< 13 + "create" | "delete" | "update" | (string & {}) 14 + >(), 15 + /** 16 + * AT-URI of the record to process 17 + */ 18 + uri: /*#__PURE__*/ v.resourceUriString(), 19 + }); 20 + const _inboxResultSchema = /*#__PURE__*/ v.object({ 21 + $type: /*#__PURE__*/ v.optional( 22 + /*#__PURE__*/ v.literal("app.wafrn.content.inbox#inboxResult"), 23 + ), 24 + /** 25 + * Error message if status is error 26 + */ 27 + error: /*#__PURE__*/ v.optional(/*#__PURE__*/ v.string()), 28 + /** 29 + * Timestamp when the record was indexed (only for success) 30 + */ 31 + indexedAt: /*#__PURE__*/ v.optional(/*#__PURE__*/ v.datetimeString()), 32 + /** 33 + * Processing status 34 + */ 35 + status: /*#__PURE__*/ v.string<"error" | "success" | (string & {})>(), 36 + /** 37 + * AT-URI that was processed 38 + */ 39 + uri: /*#__PURE__*/ v.resourceUriString(), 40 + }); 41 + const _mainSchema = /*#__PURE__*/ v.procedure("app.wafrn.content.inbox", { 42 + params: null, 43 + input: { 44 + type: "lex", 45 + schema: /*#__PURE__*/ v.object({ 46 + /** 47 + * Array of items to process 48 + * @maxLength 100 49 + */ 50 + get items() { 51 + return /*#__PURE__*/ v.constrain( 52 + /*#__PURE__*/ v.array(inboxItemSchema), 53 + [/*#__PURE__*/ v.arrayLength(0, 100)], 54 + ); 55 + }, 56 + }), 57 + }, 58 + output: { 59 + type: "lex", 60 + schema: /*#__PURE__*/ v.object({ 61 + /** 62 + * Results for each processed item 63 + */ 64 + get results() { 65 + return /*#__PURE__*/ v.array(inboxResultSchema); 66 + }, 67 + }), 68 + }, 69 + }); 70 + 71 + type inboxItem$schematype = typeof _inboxItemSchema; 72 + type inboxResult$schematype = typeof _inboxResultSchema; 73 + type main$schematype = typeof _mainSchema; 74 + 75 + export interface inboxItemSchema extends inboxItem$schematype {} 76 + export interface inboxResultSchema extends inboxResult$schematype {} 77 + export interface mainSchema extends main$schematype {} 78 + 79 + export const inboxItemSchema = _inboxItemSchema as inboxItemSchema; 80 + export const inboxResultSchema = _inboxResultSchema as inboxResultSchema; 81 + export const mainSchema = _mainSchema as mainSchema; 82 + 83 + export interface InboxItem extends v.InferInput<typeof inboxItemSchema> {} 84 + export interface InboxResult extends v.InferInput<typeof inboxResultSchema> {} 85 + 86 + export interface $params {} 87 + export interface $input extends v.InferXRPCBodyInput<mainSchema["input"]> {} 88 + export interface $output extends v.InferXRPCBodyInput<mainSchema["output"]> {} 89 + 90 + declare module "@atcute/lexicons/ambient" { 91 + interface XRPCProcedures { 92 + "app.wafrn.content.inbox": mainSchema; 93 + } 94 + }
-47
packages/lexicon/types/app/wafrn/graph/cacheFollow.ts
··· 1 - import type {} from "@atcute/lexicons"; 2 - import * as v from "@atcute/lexicons/validations"; 3 - import type {} from "@atcute/lexicons/ambient"; 4 - 5 - const _mainSchema = /*#__PURE__*/ v.procedure("app.wafrn.graph.cacheFollow", { 6 - params: null, 7 - input: { 8 - type: "lex", 9 - schema: /*#__PURE__*/ v.object({ 10 - createdAt: /*#__PURE__*/ v.datetimeString(), 11 - /** 12 - * DID of the account being followed 13 - */ 14 - followee: /*#__PURE__*/ v.didString(), 15 - /** 16 - * DID of the follower 17 - */ 18 - follower: /*#__PURE__*/ v.didString(), 19 - /** 20 - * URI of the follow record 21 - */ 22 - uri: /*#__PURE__*/ v.resourceUriString(), 23 - }), 24 - }, 25 - output: { 26 - type: "lex", 27 - schema: /*#__PURE__*/ v.object({ 28 - indexed: /*#__PURE__*/ v.boolean(), 29 - }), 30 - }, 31 - }); 32 - 33 - type main$schematype = typeof _mainSchema; 34 - 35 - export interface mainSchema extends main$schematype {} 36 - 37 - export const mainSchema = _mainSchema as mainSchema; 38 - 39 - export interface $params {} 40 - export interface $input extends v.InferXRPCBodyInput<mainSchema["input"]> {} 41 - export interface $output extends v.InferXRPCBodyInput<mainSchema["output"]> {} 42 - 43 - declare module "@atcute/lexicons/ambient" { 44 - interface XRPCProcedures { 45 - "app.wafrn.graph.cacheFollow": mainSchema; 46 - } 47 - }
+1
packages/server/package.json
··· 12 12 "typecheck": "bunx --bun tsc --noEmit" 13 13 }, 14 14 "dependencies": { 15 + "@atcute/client": "^4.2.1", 15 16 "@atcute/crypto": "^2.2.6", 16 17 "@atcute/identity": "^1.1.3", 17 18 "@atcute/identity-resolver": "^1.1.4",
+6
packages/server/src/lib/idResolver.ts
··· 2 2 CompositeDidDocumentResolver, 3 3 CompositeHandleResolver, 4 4 DohJsonHandleResolver, 5 + LocalActorResolver, 5 6 PlcDidDocumentResolver, 6 7 WebDidDocumentResolver, 7 8 WellKnownHandleResolver ··· 25 26 plc: new PlcDidDocumentResolver(), 26 27 web: new WebDidDocumentResolver() 27 28 } 29 + }) 30 + 31 + export const actorResolver = new LocalActorResolver({ 32 + handleResolver, 33 + didDocumentResolver: didDocResolver 28 34 }) 29 35 30 36 // Types for keyserver resolution
+67
packages/server/src/lib/inbox-operations/cacheFollow.ts
··· 1 + import { db } from '@api/db/db' 2 + import type { Did } from '@atcute/lexicons' 3 + import type { AppWafrnGraphFollow } from '@watproto/lexicon' 4 + import { sql } from 'kysely' 5 + 6 + /** 7 + * Cache a follow relationship in the database 8 + * The follower is the repo (from URI), the followee is the subject 9 + */ 10 + export default async function cacheFollow( 11 + uri: string, 12 + followerDid: Did, 13 + record: AppWafrnGraphFollow.Main 14 + ) { 15 + const createdAt = new Date(record.createdAt).getTime() 16 + const followeeDid = record.subject 17 + 18 + await db.transaction().execute(async (trx) => { 19 + // Step 1: Insert follow record (idempotent with onConflict) 20 + await trx 21 + .insertInto('follows') 22 + .values({ 23 + follower_did: followerDid, 24 + followee_did: followeeDid, 25 + uri, 26 + created_at: createdAt 27 + }) 28 + .onConflict((oc) => 29 + oc.columns(['follower_did', 'followee_did']).doNothing() 30 + ) 31 + .execute() 32 + 33 + // Step 2: Update following_count for follower 34 + await trx 35 + .insertInto('follow_counts') 36 + .values({ 37 + did: followerDid, 38 + following_count: 1, 39 + follower_count: 0 40 + }) 41 + .onConflict((oc) => 42 + oc.column('did').doUpdateSet({ 43 + following_count: sql`following_count + 1`, 44 + updated_at: sql`(unixepoch() * 1000)` 45 + }) 46 + ) 47 + .execute() 48 + 49 + // Step 3: Update follower_count for followee 50 + await trx 51 + .insertInto('follow_counts') 52 + .values({ 53 + did: followeeDid, 54 + follower_count: 1, 55 + following_count: 0 56 + }) 57 + .onConflict((oc) => 58 + oc.column('did').doUpdateSet({ 59 + follower_count: sql`follower_count + 1`, 60 + updated_at: sql`(unixepoch() * 1000)` 61 + }) 62 + ) 63 + .execute() 64 + }) 65 + 66 + return new Date().toISOString() 67 + }
+27
packages/server/src/lib/inbox-operations/cachePrivatePost.ts
··· 1 + import { db } from '@api/db/db' 2 + import type { Did } from '@atcute/lexicons' 3 + import type { AppWafrnContentPrivatePost } from '@watproto/lexicon' 4 + 5 + /** 6 + * Cache a private post in the database 7 + */ 8 + export default async function cachePrivatePost( 9 + uri: string, 10 + authorDid: Did, 11 + record: AppWafrnContentPrivatePost.Main 12 + ) { 13 + const row = await db 14 + .insertInto('private_posts') 15 + .values({ 16 + author_did: authorDid, 17 + uri, 18 + encrypted_content: record.encryptedContent, 19 + key_version: record.keyVersion, 20 + created_at: new Date(record.createdAt).getTime(), 21 + visibility: record.visibility 22 + }) 23 + .returning(['indexed_at']) 24 + .executeTakeFirstOrThrow() 25 + 26 + return new Date(row.indexed_at).toISOString() 27 + }
+49
packages/server/src/lib/inbox-operations/cachePublicPost.ts
··· 1 + import { db } from '@api/db/db' 2 + import type { Did } from '@atcute/lexicons' 3 + import type { AppWafrnContentPublicPost } from '@watproto/lexicon' 4 + 5 + /** 6 + * Cache a public post in the database 7 + */ 8 + export default async function cachePublicPost( 9 + uri: string, 10 + authorDid: Did, 11 + record: AppWafrnContentPublicPost.Main 12 + ) { 13 + const row = await db 14 + .insertInto('public_posts') 15 + .values({ 16 + author_did: authorDid, 17 + uri, 18 + content_html: record.content.contentHTML ?? '', 19 + content_markdown: record.content.contentMarkdown ?? '', 20 + content_warning: record.content.contentWarning, 21 + created_at: new Date(record.createdAt).getTime() 22 + }) 23 + .returning(['indexed_at']) 24 + .executeTakeFirstOrThrow() 25 + 26 + // Insert tags if provided 27 + if (record.content.tags && record.content.tags.length > 0) { 28 + // Batch upsert all tags 29 + const tagRecords = record.content.tags.map((tagName) => ({ 30 + name: tagName 31 + })) 32 + 33 + await db 34 + .insertInto('tags') 35 + .values(tagRecords) 36 + .onConflict((oc) => oc.column('name').doNothing()) 37 + .execute() 38 + 39 + // Insert into junction table 40 + const tagAssociations = record.content.tags.map((tagName) => ({ 41 + post_uri: uri, 42 + tag_name: tagName 43 + })) 44 + 45 + await db.insertInto('public_post_tags').values(tagAssociations).execute() 46 + } 47 + 48 + return new Date(row.indexed_at).toISOString() 49 + }
+43
packages/server/src/lib/inbox-operations/uncacheFollow.ts
··· 1 + import { db } from '@api/db/db' 2 + import { sql } from 'kysely' 3 + 4 + /** 5 + * Delete a follow relationship from the database 6 + */ 7 + export default async function uncacheFollow(uri: string): Promise<void> { 8 + // Find the follow record by URI 9 + const followRecord = await db 10 + .selectFrom('follows') 11 + .select(['follower_did', 'followee_did']) 12 + .where('uri', '=', uri) 13 + .executeTakeFirst() 14 + 15 + if (!followRecord) { 16 + throw new Error(`Follow record not found: ${uri}`) 17 + } 18 + 19 + await db.transaction().execute(async (trx) => { 20 + // Delete follow record 21 + await trx.deleteFrom('follows').where('uri', '=', uri).execute() 22 + 23 + // Decrement following_count for follower 24 + await trx 25 + .updateTable('follow_counts') 26 + .set({ 27 + following_count: sql`MAX(following_count - 1, 0)`, 28 + updated_at: sql`(unixepoch() * 1000)` 29 + }) 30 + .where('did', '=', followRecord.follower_did) 31 + .execute() 32 + 33 + // Decrement follower_count for followee 34 + await trx 35 + .updateTable('follow_counts') 36 + .set({ 37 + follower_count: sql`MAX(follower_count - 1, 0)`, 38 + updated_at: sql`(unixepoch() * 1000)` 39 + }) 40 + .where('did', '=', followRecord.followee_did) 41 + .execute() 42 + }) 43 + }
+35
packages/server/src/lib/inbox-operations/uncachePost.ts
··· 1 + import { db } from '@api/db/db' 2 + import { parseResourceUri, type Did } from '@atcute/lexicons' 3 + 4 + /** 5 + * Delete a cached post (public or private) from the database 6 + */ 7 + export default async function uncachePost(uri: string, authorDid: Did) { 8 + const parsedUri = parseResourceUri(uri) 9 + if (!parsedUri.ok) { 10 + throw new Error(`Invalid URI: ${uri}`) 11 + } 12 + 13 + await db.transaction().execute(async (trx) => { 14 + if (parsedUri.value.collection === 'app.wafrn.content.publicPost') { 15 + const publicResult = await trx 16 + .deleteFrom('public_posts') 17 + .where('uri', '=', uri) 18 + .where('author_did', '=', authorDid) 19 + .executeTakeFirst() 20 + 21 + if (publicResult.numDeletedRows > 0) { 22 + await trx 23 + .deleteFrom('public_post_tags') 24 + .where('post_uri', '=', uri) 25 + .execute() 26 + } 27 + } else if (parsedUri.value.collection === 'app.wafrn.content.privatePost') { 28 + await trx 29 + .deleteFrom('private_posts') 30 + .where('uri', '=', uri) 31 + .where('author_did', '=', authorDid) 32 + .execute() 33 + } 34 + }) 35 + }
+159
packages/server/src/lib/inbox.ts
··· 1 + import { Client, simpleFetchHandler } from '@atcute/client' 2 + import { 3 + parseCanonicalResourceUri, 4 + type Did, 5 + type ResourceUri 6 + } from '@atcute/lexicons' 7 + import { AppWafrnContentInbox } from '@watproto/lexicon' 8 + import { actorResolver } from './idResolver' 9 + 10 + import type {} from '@atcute/atproto' 11 + import uncachePost from './inbox-operations/uncachePost' 12 + import uncacheFollow from './inbox-operations/uncacheFollow' 13 + import cachePublicPost from './inbox-operations/cachePublicPost' 14 + import cachePrivatePost from './inbox-operations/cachePrivatePost' 15 + import cacheFollow from './inbox-operations/cacheFollow' 16 + 17 + // Use the generated types from lexicon 18 + type InboxItem = AppWafrnContentInbox.InboxItem 19 + type InboxResult = AppWafrnContentInbox.InboxResult 20 + 21 + const DELETE_HANDLERS = { 22 + 'app.wafrn.content.publicPost': uncachePost, 23 + 'app.wafrn.content.privatePost': uncachePost, 24 + 'app.wafrn.graph.follow': uncacheFollow 25 + } 26 + const UPSERT_HANDLERS = { 27 + 'app.wafrn.content.publicPost': cachePublicPost, 28 + 'app.wafrn.content.privatePost': cachePrivatePost, 29 + 'app.wafrn.graph.follow': cacheFollow 30 + } 31 + 32 + export type HTTPURL = `http://${string}` | `https://${string}` 33 + 34 + /** Get PDS Client from URL */ 35 + export function getPublicServiceAgent(serviceUrl: HTTPURL) { 36 + return new Client({ 37 + handler: simpleFetchHandler({ service: serviceUrl }) 38 + }) 39 + } 40 + 41 + /** 42 + * Fetch a record from a user's PDS given its AT-URI 43 + */ 44 + async function fetchRecordFromPDS(uri: string) { 45 + const parsedUri = parseCanonicalResourceUri(uri) 46 + if (!parsedUri.ok) { 47 + throw new Error(`Error parsing AT-URI ${uri}: ${parsedUri.error}`) 48 + } 49 + 50 + const { repo, collection, rkey } = parsedUri.value 51 + 52 + // Get PDS URL from DID document 53 + const { pds } = await actorResolver.resolve(repo) 54 + 55 + // Create XRPC client for the PDS 56 + const pdsClient = getPublicServiceAgent(pds as HTTPURL) 57 + 58 + // Fetch the record 59 + const response = await pdsClient.get('com.atproto.repo.getRecord', { 60 + params: { 61 + repo, 62 + collection, 63 + rkey 64 + } 65 + }) 66 + 67 + if (!response.ok) { 68 + throw new Error( 69 + `Failed to fetch record: HTTP ${response.status} \n${response.data}` 70 + ) 71 + } 72 + 73 + return response.data 74 + } 75 + 76 + function inboxOk(uri: ResourceUri, indexedAt: string) { 77 + return { 78 + uri, 79 + status: 'success' as const, 80 + indexedAt 81 + } satisfies InboxResult 82 + } 83 + function inboxError(uri: ResourceUri, error: string) { 84 + return { 85 + uri, 86 + status: 'error' as const, 87 + error 88 + } satisfies InboxResult 89 + } 90 + 91 + /** 92 + * Process a single inbox item 93 + */ 94 + async function processInboxItem(item: InboxItem, authDid: Did) { 95 + try { 96 + const parsedUri = parseCanonicalResourceUri(item.uri) 97 + if (!parsedUri.ok) { 98 + return inboxError(item.uri, `Error parsing AT-URI: ${parsedUri.error}`) 99 + } 100 + 101 + const { repo, collection } = parsedUri.value 102 + 103 + // Authorization check: user can only index their own records 104 + if (repo !== authDid) { 105 + return inboxError( 106 + item.uri, 107 + 'Unauthorized: you can only index your own records' 108 + ) 109 + } 110 + 111 + // Handle delete action 112 + if (item.action === 'delete') { 113 + // NOTE: handler can be undefined but no way to find a type to reflect that here 114 + const handler = 115 + DELETE_HANDLERS[collection as keyof typeof DELETE_HANDLERS] 116 + 117 + if (!handler) { 118 + return inboxError(item.uri, `Unsupported collection: ${collection}`) 119 + } 120 + 121 + await handler(item.uri, authDid) 122 + return inboxOk(item.uri, new Date().toISOString()) 123 + } else { 124 + // handle create-update action 125 + 126 + // Fetch record from PDS for create/update actions 127 + const { value: record } = await fetchRecordFromPDS(item.uri) 128 + 129 + // NOTE: handler can be undefined but no way to find a type to reflect that here 130 + const handler = 131 + UPSERT_HANDLERS[collection as keyof typeof UPSERT_HANDLERS] 132 + 133 + if (!handler) { 134 + return inboxError(item.uri, `Unsupported collection: ${collection}`) 135 + } 136 + 137 + const indexedAt = await handler(item.uri, authDid, record as any) 138 + return inboxOk(item.uri, indexedAt) 139 + } 140 + } catch (error) { 141 + return inboxError( 142 + item.uri, 143 + error instanceof Error ? error.message : 'Unknown error' 144 + ) 145 + } 146 + } 147 + 148 + /** 149 + * Process multiple inbox items with partial success support 150 + * TODO: use a queue 151 + */ 152 + export async function processInboxItems(authDid: Did, items: InboxItem[]) { 153 + // Process all items in parallel 154 + const results = await Promise.all( 155 + items.map((item) => processInboxItem(item, authDid)) 156 + ) 157 + 158 + return results 159 + }
+11 -93
packages/server/src/lib/xrpcServer.ts
··· 2 2 import { ServiceJwtVerifier } from '@atcute/xrpc-server/auth' 3 3 import { cors } from '@atcute/xrpc-server/middlewares/cors' 4 4 import { 5 - AppWafrnContentCachePost, 5 + AppWafrnContentInbox, 6 6 AppWafrnContentDefs, 7 7 AppWafrnContentGetFeed, 8 8 AppWafrnContentDeletePost, 9 - AppWafrnGraphCacheFollow, 10 9 AppWafrnGraphDeleteFollow, 11 10 AppWafrnGraphGetFollowers, 12 11 AppWafrnGraphGetFollows, ··· 16 15 import { didDocResolver } from './idResolver' 17 16 import env from './env' 18 17 import { db } from '@api/db/db' 19 - import { is, type ResourceUri, type Did } from '@atcute/lexicons' 18 + import { type ResourceUri, type Did } from '@atcute/lexicons' 20 19 import { 21 - createFollow, 22 20 deleteFollow, 23 21 getFollowCountsBatch, 24 22 getFollowers, ··· 27 25 } from '@api/lib/follow' 28 26 import { getWafrnProfiles } from '@api/lib/profile' 29 27 import { deletePost } from '@api/lib/post' 28 + import { processInboxItems } from '@api/lib/inbox' 30 29 31 30 const jwtVerifier = new ServiceJwtVerifier({ 32 31 resolver: didDocResolver, ··· 98 97 } 99 98 } 100 99 101 - xrpcServer.add(AppWafrnContentCachePost.mainSchema, { 100 + // ==================================== 101 + // INBOX ENDPOINT 102 + // ==================================== 103 + 104 + xrpcServer.add(AppWafrnContentInbox.mainSchema, { 102 105 async handler({ input, request }) { 103 106 const { did } = await authenticateJwt(request) 104 - if (is(AppWafrnContentDefs.publicPostViewSchema, input.post)) { 105 - const row = await db 106 - .insertInto('public_posts') 107 - .values({ 108 - author_did: did, 109 - uri: input.post.uri, 110 - content_html: input.post.content.contentHTML ?? '', 111 - content_markdown: input.post.content.contentMarkdown ?? '', 112 - content_warning: input.post.content.contentWarning, 113 - created_at: new Date(input.post.createdAt).getTime() 114 - }) 115 - .returning(['indexed_at']) 116 - .executeTakeFirstOrThrow() 117 107 118 - // Insert tags if provided 119 - if (input.post.content.tags && input.post.content.tags.length > 0) { 120 - // Step 1: Batch upsert all tags 121 - const tagRecords = input.post.content.tags.map((tagName) => ({ 122 - name: tagName 123 - })) 124 - 125 - await db 126 - .insertInto('tags') 127 - .values(tagRecords) 128 - .onConflict((oc) => oc.column('name').doNothing()) 129 - .execute() 130 - 131 - // Step 2: Insert into junction table 132 - const tagAssociations = input.post.content.tags.map((tagName) => ({ 133 - post_uri: input.post.uri, 134 - tag_name: tagName 135 - })) 136 - 137 - await db 138 - .insertInto('public_post_tags') 139 - .values(tagAssociations) 140 - .execute() 141 - } 142 - 143 - return json({ 144 - indexedAt: new Date(row.indexed_at).toISOString() 145 - }) 146 - } else if (is(AppWafrnContentDefs.privatePostViewSchema, input.post)) { 147 - const row = await db 148 - .insertInto('private_posts') 149 - .values({ 150 - author_did: did, 151 - uri: input.post.uri, 152 - encrypted_content: input.post.encryptedContent, 153 - key_version: input.post.keyVersion, 154 - created_at: new Date(input.post.createdAt).getTime(), 155 - visibility: input.post.visibility 156 - }) 157 - .returning(['indexed_at']) 158 - .executeTakeFirstOrThrow() 108 + // Process all inbox items 109 + const results = await processInboxItems(did, input.items) 159 110 160 - return json({ 161 - indexedAt: new Date(row.indexed_at).toISOString() 162 - }) 163 - } else { 164 - throw new XRPCError({ 165 - status: 400, 166 - error: 167 - 'Invalid post input. Must follow privatePost or publicPost lexicon' 168 - }) 169 - } 111 + return json({ results }) 170 112 } 171 113 }) 172 114 ··· 269 211 // ==================================== 270 212 // FOLLOW ENDPOINTS 271 213 // ==================================== 272 - 273 - xrpcServer.add(AppWafrnGraphCacheFollow.mainSchema, { 274 - async handler({ input, request }) { 275 - const { did } = await authenticateJwt(request) 276 - 277 - // Verify the follower matches the authenticated user 278 - if (did !== input.follower) { 279 - throw new XRPCError({ 280 - status: 403, 281 - error: 'Forbidden', 282 - description: 'You can only cache follows for your own account' 283 - }) 284 - } 285 - 286 - await createFollow( 287 - input.follower, 288 - input.followee, 289 - input.uri, 290 - new Date(input.createdAt).getTime() 291 - ) 292 - 293 - return json({ indexed: true }) 294 - } 295 - }) 296 214 297 215 xrpcServer.add(AppWafrnGraphDeleteFollow.mainSchema, { 298 216 async handler({ input, request }) {