a tool for shared writing and social publishing

Perf/cached server mutation context (#181)

* use a cached server mutation context

* properly handle deleting facts when deleting entity

* add logging

* don't use entries.map

* shrink pushrequest size

* properly compose delete fact sql query

* add textAttributeWriteCache

* call pull if batch is too small

* remove debug

* handle case w/ no other updates

* acquire lock on token

* use local lock instead of redis

authored by awarm.space and committed by

GitHub a2faf8d7 cdc68782

+336 -25
+39 -16
app/api/rpc/[command]/push.ts
··· 1 - import { serverMutationContext } from "src/replicache/serverMutationContext"; 2 1 import { mutations } from "src/replicache/mutations"; 3 - import { eq } from "drizzle-orm"; 2 + import { eq, sql } from "drizzle-orm"; 4 3 import { permission_token_rights, replicache_clients } from "drizzle/schema"; 5 4 import { getClientGroup } from "src/replicache/utils"; 6 5 import { makeRoute } from "../lib"; 7 6 import { z } from "zod"; 8 7 import type { Env } from "./route"; 8 + import { cachedServerMutationContext } from "src/replicache/cachedServerMutationContext"; 9 9 import { drizzle } from "drizzle-orm/node-postgres"; 10 - 11 10 import { Pool } from "pg"; 12 - 13 11 import { attachDatabasePool } from "@vercel/functions"; 14 12 import { DbPool } from "@vercel/functions/db-connections"; 15 13 ··· 57 55 // Attach the pool to ensure idle connections close before suspension 58 56 attachDatabasePool(pool as DbPool); 59 57 58 + import { Lock } from "src/utils/lock"; 59 + 60 + let locks = new Map<string, Lock>(); 61 + 60 62 export const push = makeRoute({ 61 63 route: "push", 62 64 input: z.object({ ··· 73 75 74 76 let client = await pool.connect(); 75 77 const db = drizzle(client); 76 - 77 78 let channel = supabase.channel(`rootEntity:${rootEntity}`); 79 + let lock = locks.get(token.id); 80 + if (!lock) { 81 + lock = new Lock(); 82 + locks.set(token.id, lock); 83 + } 84 + let release = await lock.lock(); 78 85 try { 79 86 await db.transaction(async (tx) => { 80 87 let clientGroup = await getClientGroup(tx, pushRequest.clientGroupID); ··· 82 89 .select() 83 90 .from(permission_token_rights) 84 91 .where(eq(permission_token_rights.token, token.id)); 92 + let { getContext, flush } = cachedServerMutationContext( 93 + tx, 94 + token.id, 95 + token_rights, 96 + ); 97 + 98 + let lastMutations = new Map<string, number>(); 99 + console.log(pushRequest.mutations.map((m) => m.name)); 85 100 for (let mutation of pushRequest.mutations) { 86 101 let lastMutationID = 
clientGroup[mutation.clientID] || 0; 87 102 if (mutation.id <= lastMutationID) continue; ··· 91 106 continue; 92 107 } 93 108 try { 94 - await mutations[name]( 95 - mutation.args as any, 96 - serverMutationContext(tx, token.id, token_rights), 97 - ); 109 + let ctx = getContext(mutation.clientID, mutation.id); 110 + await mutations[name](mutation.args as any, ctx); 98 111 } catch (e) { 99 112 console.log( 100 113 `Error occured while running mutation: ${name}`, ··· 102 115 JSON.stringify(mutation, null, 2), 103 116 ); 104 117 } 118 + lastMutations.set(mutation.clientID, mutation.id); 119 + } 120 + 121 + let lastMutationIdsUpdate = Array.from(lastMutations.entries()).map( 122 + (entries) => ({ 123 + client_group: pushRequest.clientGroupID, 124 + client_id: entries[0], 125 + last_mutation: entries[1], 126 + }), 127 + ); 128 + if (lastMutationIdsUpdate.length > 0) 105 129 await tx 106 130 .insert(replicache_clients) 107 - .values({ 108 - client_group: pushRequest.clientGroupID, 109 - client_id: mutation.clientID, 110 - last_mutation: mutation.id, 111 - }) 131 + .values(lastMutationIdsUpdate) 112 132 .onConflictDoUpdate({ 113 133 target: replicache_clients.client_id, 114 - set: { last_mutation: mutation.id }, 134 + set: { last_mutation: sql`excluded.last_mutation` }, 115 135 }); 116 - } 136 + await flush(); 117 137 }); 118 138 119 139 await channel.send({ ··· 121 141 event: "poke", 122 142 payload: { message: "poke" }, 123 143 }); 144 + } catch (e) { 145 + console.log(e); 124 146 } finally { 125 147 client.release(); 148 + release(); 126 149 supabase.removeChannel(channel); 127 150 return { result: undefined } as const; 128 151 }
+252
src/replicache/cachedServerMutationContext.ts
··· 1 + import { PgTransaction } from "drizzle-orm/pg-core"; 2 + import { Fact, PermissionToken } from "."; 3 + import { MutationContext } from "./mutations"; 4 + import { supabaseServerClient } from "supabase/serverClient"; 5 + import { entities, facts } from "drizzle/schema"; 6 + import * as driz from "drizzle-orm"; 7 + import { Attribute, Attributes, FilterAttributes } from "./attributes"; 8 + import { v7 } from "uuid"; 9 + import * as base64 from "base64-js"; 10 + import * as Y from "yjs"; 11 + import { DeepReadonly } from "replicache"; 12 + 13 + type WriteCacheEntry = 14 + | { type: "put"; fact: Fact<any> } 15 + | { type: "del"; fact: { id: string } }; 16 + export function cachedServerMutationContext( 17 + tx: PgTransaction<any, any, any>, 18 + permission_token_id: string, 19 + token_rights: PermissionToken["permission_token_rights"], 20 + ) { 21 + let writeCache: WriteCacheEntry[] = []; 22 + let eavCache = new Map<string, DeepReadonly<Fact<Attribute>>[]>(); 23 + let permissionsCache: { [key: string]: boolean } = {}; 24 + let entitiesCache: { set: string; id: string }[] = []; 25 + let deleteEntitiesCache: string[] = []; 26 + let textAttributeWriteCache = {} as { 27 + [entityAttribute: string]: { [clientID: string]: string }; 28 + }; 29 + 30 + const scanIndex = { 31 + async eav<A extends Attribute>(entity: string, attribute: A) { 32 + let cached = eavCache.get(`${entity}-${attribute}`) as DeepReadonly< 33 + Fact<A> 34 + >[]; 35 + let baseFacts: DeepReadonly<Fact<A>>[]; 36 + if (deleteEntitiesCache.includes(entity)) return []; 37 + if (cached) baseFacts = cached; 38 + else { 39 + cached = (await tx 40 + .select({ 41 + id: facts.id, 42 + data: facts.data, 43 + entity: facts.entity, 44 + attribute: facts.attribute, 45 + }) 46 + .from(facts) 47 + .where( 48 + driz.and( 49 + driz.eq(facts.attribute, attribute), 50 + driz.eq(facts.entity, entity), 51 + ), 52 + )) as DeepReadonly<Fact<A>>[]; 53 + } 54 + cached = cached.filter( 55 + (f) => 56 + !writeCache.find((wc) => 
wc.type === "del" && wc.fact.id === f.id), 57 + ); 58 + let newlyWrittenFacts = writeCache.filter( 59 + (f) => 60 + f.type === "put" && 61 + f.fact.attribute === attribute && 62 + f.fact.entity === entity, 63 + ); 64 + return [ 65 + ...cached, 66 + ...newlyWrittenFacts.map((f) => f.fact as Fact<A>), 67 + ].filter( 68 + (f) => 69 + !( 70 + (f.data.type === "reference" || 71 + f.data.type === "ordered-reference" || 72 + f.data.type === "spatial-reference") && 73 + deleteEntitiesCache.includes(f.data.value) 74 + ), 75 + ) as DeepReadonly<Fact<A>>[]; 76 + }, 77 + }; 78 + let getContext = (clientID: string, mutationID: number) => { 79 + let ctx: MutationContext & { 80 + checkPermission: (entity: string) => Promise<boolean>; 81 + } = { 82 + scanIndex, 83 + permission_token_id, 84 + async runOnServer(cb) { 85 + return cb({ supabase: supabaseServerClient }); 86 + }, 87 + async checkPermission(entity: string) { 88 + if (deleteEntitiesCache.includes(entity)) return false; 89 + let cachedEntity = entitiesCache.find((e) => e.id === entity); 90 + if (cachedEntity) { 91 + return !!token_rights.find( 92 + (r) => r.entity_set === cachedEntity?.set && r.write === true, 93 + ); 94 + } 95 + if (permissionsCache[entity] !== undefined) 96 + return permissionsCache[entity]; 97 + let [permission_set] = await tx 98 + .select({ entity_set: entities.set }) 99 + .from(entities) 100 + .where(driz.eq(entities.id, entity)); 101 + let hasPermission = 102 + !!permission_set && 103 + !!token_rights.find( 104 + (r) => 105 + r.entity_set === permission_set.entity_set && r.write == true, 106 + ); 107 + permissionsCache[entity] = hasPermission; 108 + return hasPermission; 109 + }, 110 + async runOnClient(_cb) {}, 111 + async createEntity({ entityID, permission_set }) { 112 + if ( 113 + !token_rights.find( 114 + (r) => r.entity_set === permission_set && r.write === true, 115 + ) 116 + ) { 117 + return false; 118 + } 119 + if (!entitiesCache.find((e) => e.id === entityID)) 120 + entitiesCache.push({ 
set: permission_set, id: entityID }); 121 + deleteEntitiesCache = deleteEntitiesCache.filter((e) => e === entityID); 122 + return true; 123 + }, 124 + async deleteEntity(entity) { 125 + if (!(await this.checkPermission(entity))) return; 126 + deleteEntitiesCache.push(entity); 127 + entitiesCache = entitiesCache.filter((e) => e.id === entity); 128 + }, 129 + async assertFact(f) { 130 + if (!f.entity) return; 131 + let attribute = Attributes[f.attribute as Attribute]; 132 + if (!attribute) return; 133 + let id = f.id || v7(); 134 + let data = { ...f.data }; 135 + if (!(await this.checkPermission(f.entity))) return; 136 + if (attribute.cardinality === "one") { 137 + let existingFact = await scanIndex.eav(f.entity, f.attribute); 138 + if (existingFact[0]) { 139 + id = existingFact[0].id; 140 + if (attribute.type === "text") { 141 + let c = 142 + textAttributeWriteCache[`${f.entity}-${f.attribute}`] || {}; 143 + textAttributeWriteCache[`${f.entity}-${f.attribute}`] = { 144 + ...c, 145 + [clientID]: ( 146 + data as Fact<keyof FilterAttributes<{ type: "text" }>>["data"] 147 + ).value, 148 + }; 149 + } 150 + } 151 + } 152 + writeCache = writeCache.filter((f) => f.fact.id !== id); 153 + writeCache.push({ 154 + type: "put", 155 + fact: { 156 + id: id, 157 + entity: f.entity, 158 + data: data, 159 + attribute: f.attribute, 160 + }, 161 + }); 162 + }, 163 + async retractFact(factID) { 164 + writeCache = writeCache.filter((f) => f.fact.id !== factID); 165 + writeCache.push({ type: "del", fact: { id: factID } }); 166 + }, 167 + }; 168 + return ctx; 169 + }; 170 + let flush = async () => { 171 + if (entitiesCache.length > 0) 172 + await tx 173 + .insert(entities) 174 + .values(entitiesCache.map((e) => ({ set: e.set, id: e.id }))); 175 + let factWrites = writeCache.flatMap((f) => 176 + f.type === "del" ? 
[] : [f.fact], 177 + ); 178 + if (factWrites.length > 0) 179 + await tx 180 + .insert(facts) 181 + .values( 182 + await Promise.all( 183 + factWrites.map(async (f) => { 184 + let attribute = Attributes[f.attribute as Attribute]; 185 + let data = f.data; 186 + if ( 187 + attribute.type === "text" && 188 + attribute.cardinality === "one" 189 + ) { 190 + let values = Object.values( 191 + textAttributeWriteCache[`${f.entity}-${f.attribute}`] || {}, 192 + ); 193 + if (values.length > 0) { 194 + let existingFact = await scanIndex.eav(f.entity, f.attribute); 195 + if (existingFact[0]) values.push(existingFact[0].data.value); 196 + let updateBytes = Y.mergeUpdates( 197 + values.map((v) => base64.toByteArray(v)), 198 + ); 199 + data.value = base64.fromByteArray(updateBytes); 200 + } 201 + } 202 + 203 + return { 204 + id: f.id, 205 + entity: f.entity, 206 + data: driz.sql`${data}::jsonb`, 207 + attribute: f.attribute, 208 + }; 209 + }), 210 + ), 211 + ) 212 + .onConflictDoUpdate({ 213 + target: facts.id, 214 + set: { data: driz.sql`excluded.data` }, 215 + }); 216 + if (deleteEntitiesCache.length > 0) 217 + await tx 218 + .delete(entities) 219 + .where(driz.inArray(entities.id, deleteEntitiesCache)); 220 + let factDeletes = writeCache.flatMap((f) => 221 + f.type === "put" ? 
[] : [f.fact.id], 222 + ); 223 + if (factDeletes.length > 0 || deleteEntitiesCache.length > 0) { 224 + const conditions = []; 225 + if (factDeletes.length > 0) { 226 + conditions.push(driz.inArray(facts.id, factDeletes)); 227 + } 228 + if (deleteEntitiesCache.length > 0) { 229 + conditions.push( 230 + driz.and( 231 + driz.sql`(data->>'type' = 'ordered-reference' or data->>'type' = 'reference' or data->>'type' = 'spatial-reference')`, 232 + driz.inArray(driz.sql`data->>'value'`, deleteEntitiesCache), 233 + ), 234 + ); 235 + } 236 + if (conditions.length > 0) { 237 + await tx.delete(facts).where(driz.or(...conditions)); 238 + } 239 + } 240 + 241 + writeCache = []; 242 + eavCache.clear(); 243 + permissionsCache = {}; 244 + entitiesCache = []; 245 + deleteEntitiesCache = []; 246 + }; 247 + 248 + return { 249 + getContext, 250 + flush, 251 + }; 252 + }
+15 -9
src/replicache/index.tsx
··· 130 130 ) as ReplicacheMutators, 131 131 licenseKey: "l381074b8d5224dabaef869802421225a", 132 132 pusher: async (pushRequest) => { 133 + const batchSize = 250; 133 134 let smolpushRequest = { 134 135 ...pushRequest, 135 - mutations: pushRequest.mutations.slice(0, 250), 136 + mutations: pushRequest.mutations.slice(0, batchSize), 136 137 } as PushRequest; 138 + let response = ( 139 + await callRPC("push", { 140 + pushRequest: smolpushRequest, 141 + token: props.token, 142 + rootEntity: props.name, 143 + }) 144 + ).result; 145 + if (pushRequest.mutations.length > batchSize) 146 + setTimeout(() => { 147 + newRep.push(); 148 + }, 50); 137 149 return { 138 - response: ( 139 - await callRPC("push", { 140 - pushRequest: smolpushRequest, 141 - token: props.token, 142 - rootEntity: props.name, 143 - }) 144 - ).result, 150 + response, 145 151 httpRequestInfo: { errorMessage: "", httpStatusCode: 200 }, 146 152 }; 147 153 }, ··· 158 164 name: props.name, 159 165 indexes: { 160 166 eav: { jsonPointer: "/indexes/eav", allowEmpty: true }, 161 - aev: { jsonPointer: "/indexes/aev", allowEmpty: true }, 162 167 vae: { jsonPointer: "/indexes/vae", allowEmpty: true }, 163 168 }, 164 169 }); 170 + 165 171 setRep(newRep); 166 172 let channel: RealtimeChannel | null = null; 167 173 if (!props.disablePull) {
+30
src/utils/lock.ts
··· 1 + // Taken from https://github.com/rocicorp/lock/blob/main/src/lock.ts 2 + export class Lock { 3 + private _lockP: Promise<void> | null = null; 4 + 5 + async lock(): Promise<() => void> { 6 + const previous = this._lockP; 7 + const { promise, resolve } = resolver(); 8 + this._lockP = promise; 9 + await previous; 10 + return resolve; 11 + } 12 + async withLock<R>(f: () => Promise<R>) { 13 + let release = await this.lock(); 14 + try { 15 + return await f(); 16 + } finally { 17 + release(); 18 + } 19 + } 20 + } 21 + 22 + export function resolver() { 23 + let resolve!: (v: void) => void; 24 + let reject!: () => void; 25 + const promise = new Promise<void>((res, rej) => { 26 + resolve = res; 27 + reject = rej; 28 + }); 29 + return { promise, resolve, reject }; 30 + }