a tool for shared writing and social publishing
at feature/fonts 279 lines 9.6 kB view raw
1import { PgTransaction } from "drizzle-orm/pg-core"; 2import { Fact, PermissionToken } from "."; 3import { MutationContext } from "./mutations"; 4import { supabaseServerClient } from "supabase/serverClient"; 5import { entities, facts } from "drizzle/schema"; 6import * as driz from "drizzle-orm"; 7import { Attribute, Attributes, FilterAttributes } from "./attributes"; 8import { v7 } from "uuid"; 9import { DeepReadonly } from "replicache"; 10 11type WriteCacheEntry = 12 | { type: "put"; fact: Fact<any> } 13 | { type: "del"; fact: { id: string } }; 14 15export function cachedServerMutationContext( 16 tx: PgTransaction<any, any, any>, 17 permission_token_id: string, 18 token_rights: PermissionToken["permission_token_rights"], 19) { 20 let writeCache: WriteCacheEntry[] = []; 21 let eavCache = new Map<string, DeepReadonly<Fact<Attribute>>[]>(); 22 let permissionsCache: { [key: string]: boolean } = {}; 23 let entitiesCache: { set: string; id: string }[] = []; 24 let deleteEntitiesCache: string[] = []; 25 let textAttributeWriteCache = {} as { 26 [entityAttribute: string]: { [clientID: string]: string }; 27 }; 28 29 const scanIndex = { 30 async eav<A extends Attribute>(entity: string, attribute: A) { 31 let cached = eavCache.get(`${entity}-${attribute}`) as DeepReadonly< 32 Fact<A> 33 >[]; 34 let baseFacts: DeepReadonly<Fact<A>>[]; 35 if (deleteEntitiesCache.includes(entity)) return []; 36 if (cached) baseFacts = cached; 37 else { 38 cached = (await tx 39 .select({ 40 id: facts.id, 41 data: facts.data, 42 entity: facts.entity, 43 attribute: facts.attribute, 44 }) 45 .from(facts) 46 .where( 47 driz.and( 48 driz.eq(facts.attribute, attribute), 49 driz.eq(facts.entity, entity), 50 ), 51 )) as DeepReadonly<Fact<A>>[]; 52 } 53 cached = cached.filter( 54 (f) => 55 !writeCache.find((wc) => wc.type === "del" && wc.fact.id === f.id), 56 ); 57 let newlyWrittenFacts = writeCache.filter( 58 (f) => 59 f.type === "put" && 60 f.fact.attribute === attribute && 61 f.fact.entity === entity, 62 ); 63 return [ 64 ...cached, 65 ...newlyWrittenFacts.map((f) => f.fact as Fact<A>), 66 ].filter( 67 (f) => 68 !( 69 (f.data.type === "reference" || 70 f.data.type === "ordered-reference" || 71 f.data.type === "spatial-reference") && 72 deleteEntitiesCache.includes(f.data.value) 73 ), 74 ) as DeepReadonly<Fact<A>>[]; 75 }, 76 }; 77 let getContext = (clientID: string, mutationID: number) => { 78 let ctx: MutationContext & { 79 checkPermission: (entity: string) => Promise<boolean>; 80 } = { 81 scanIndex, 82 permission_token_id, 83 async runOnServer(cb) { 84 return cb({ supabase: supabaseServerClient }); 85 }, 86 async checkPermission(entity: string) { 87 if (deleteEntitiesCache.includes(entity)) return false; 88 let cachedEntity = entitiesCache.find((e) => e.id === entity); 89 if (cachedEntity) { 90 return !!token_rights.find( 91 (r) => r.entity_set === cachedEntity?.set && r.write === true, 92 ); 93 } 94 if (permissionsCache[entity] !== undefined) 95 return permissionsCache[entity]; 96 let [permission_set] = await tx 97 .select({ entity_set: entities.set }) 98 .from(entities) 99 .where(driz.eq(entities.id, entity)); 100 let hasPermission = 101 !!permission_set && 102 !!token_rights.find( 103 (r) => 104 r.entity_set === permission_set.entity_set && r.write == true, 105 ); 106 permissionsCache[entity] = hasPermission; 107 return hasPermission; 108 }, 109 async runOnClient(_cb) {}, 110 async createEntity({ entityID, permission_set }) { 111 if ( 112 !token_rights.find( 113 (r) => r.entity_set === permission_set && r.write === true, 114 ) 115 ) { 116 return false; 117 } 118 if (!entitiesCache.find((e) => e.id === entityID)) 119 entitiesCache.push({ set: permission_set, id: entityID }); 120 deleteEntitiesCache = deleteEntitiesCache.filter((e) => e !== entityID); 121 return true; 122 }, 123 async deleteEntity(entity) { 124 if (!(await this.checkPermission(entity))) return; 125 deleteEntitiesCache.push(entity); 126 entitiesCache = entitiesCache.filter((e) => e.id !== entity); 127 writeCache = writeCache.filter( 128 (f) => 129 f.type !== "put" || 130 (f.fact.entity !== entity && f.fact.data.value !== entity), 131 ); 132 }, 133 async assertFact(f) { 134 if (!f.entity) return; 135 let attribute = Attributes[f.attribute as Attribute]; 136 if (!attribute) return; 137 let id = f.id || v7(); 138 let data = { ...f.data }; 139 if (!(await this.checkPermission(f.entity))) return; 140 if (attribute.cardinality === "one") { 141 let existingFact = await scanIndex.eav(f.entity, f.attribute); 142 if (existingFact[0]) { 143 id = existingFact[0].id; 144 } 145 } 146 writeCache = writeCache.filter((f) => f.fact.id !== id); 147 writeCache.push({ 148 type: "put", 149 fact: { 150 id: id, 151 entity: f.entity, 152 data: data, 153 attribute: f.attribute, 154 }, 155 }); 156 }, 157 async retractFact(factID) { 158 let cachedFact = writeCache.find( 159 (f) => f.type === "put" && f.fact.id === factID, 160 ); 161 let entity: string | undefined; 162 if (cachedFact && cachedFact.type === "put") { 163 entity = cachedFact.fact.entity; 164 } else { 165 let [row] = await tx 166 .select({ entity: facts.entity }) 167 .from(facts) 168 .where(driz.eq(facts.id, factID)); 169 entity = row?.entity; 170 } 171 if (!entity || !(await this.checkPermission(entity))) return; 172 writeCache = writeCache.filter((f) => f.fact.id !== factID); 173 writeCache.push({ type: "del", fact: { id: factID } }); 174 }, 175 }; 176 return ctx; 177 }; 178 let flush = async () => { 179 let flushStart = performance.now(); 180 let timeInsertingEntities = 0; 181 let timeProcessingFactWrites = 0; 182 let timeDeletingEntities = 0; 183 let timeDeletingFacts = 0; 184 let timeCacheCleanup = 0; 185 186 // Insert entities 187 let entityInsertStart = performance.now(); 188 if (entitiesCache.length > 0) 189 await tx 190 .insert(entities) 191 .values(entitiesCache.map((e) => ({ set: e.set, id: e.id }))) 192 .onConflictDoNothing({ target: entities.id }); 193 timeInsertingEntities = performance.now() - entityInsertStart; 194 195 // Process fact writes 196 let factWritesStart = performance.now(); 197 let factWrites = writeCache.flatMap((f) => 198 f.type === "del" ? [] : [f.fact], 199 ); 200 if (factWrites.length > 0) { 201 await tx 202 .insert(facts) 203 .values( 204 factWrites.map((f) => ({ 205 id: f.id, 206 entity: f.entity, 207 data: driz.sql`${f.data}::jsonb`, 208 attribute: f.attribute, 209 })), 210 ) 211 .onConflictDoUpdate({ 212 target: facts.id, 213 set: { 214 data: driz.sql`excluded.data`, 215 entity: driz.sql`excluded.entity`, 216 }, 217 }); 218 } 219 timeProcessingFactWrites = performance.now() - factWritesStart; 220 221 // Delete entities 222 let entityDeleteStart = performance.now(); 223 if (deleteEntitiesCache.length > 0) 224 await tx 225 .delete(entities) 226 .where(driz.inArray(entities.id, deleteEntitiesCache)); 227 timeDeletingEntities = performance.now() - entityDeleteStart; 228 229 // Delete facts 230 let factDeleteStart = performance.now(); 231 let factDeletes = writeCache.flatMap((f) => 232 f.type === "put" ? [] : [f.fact.id], 233 ); 234 if (factDeletes.length > 0 || deleteEntitiesCache.length > 0) { 235 const conditions = []; 236 if (factDeletes.length > 0) { 237 conditions.push(driz.inArray(facts.id, factDeletes)); 238 } 239 if (deleteEntitiesCache.length > 0) { 240 conditions.push( 241 driz.and( 242 driz.sql`(data->>'type' = 'ordered-reference' or data->>'type' = 'reference' or data->>'type' = 'spatial-reference')`, 243 driz.inArray(driz.sql`data->>'value'`, deleteEntitiesCache), 244 ), 245 ); 246 } 247 if (conditions.length > 0) { 248 await tx.delete(facts).where(driz.or(...conditions)); 249 } 250 } 251 timeDeletingFacts = performance.now() - factDeleteStart; 252 253 // Cache cleanup 254 let cacheCleanupStart = performance.now(); 255 writeCache = []; 256 eavCache.clear(); 257 permissionsCache = {}; 258 entitiesCache = []; 259 permissionsCache = {}; 260 deleteEntitiesCache = []; 261 timeCacheCleanup = performance.now() - cacheCleanupStart; 262 263 let totalFlushTime = performance.now() - flushStart; 264 console.log(` 265Flush Performance Breakdown (${totalFlushTime.toFixed(2)}ms): 266========================================== 267Entity Insertions (${entitiesCache.length} entities): ${timeInsertingEntities.toFixed(2)}ms 268Fact Processing (${factWrites.length} facts): ${timeProcessingFactWrites.toFixed(2)}ms 269Entity Deletions (${deleteEntitiesCache.length} entities): ${timeDeletingEntities.toFixed(2)}ms 270Fact Deletions: ${timeDeletingFacts.toFixed(2)}ms 271Cache Cleanup: ${timeCacheCleanup.toFixed(2)}ms 272 `); 273 }; 274 275 return { 276 getContext, 277 flush, 278 }; 279}