a tool for shared writing and social publishing
1import { PgTransaction } from "drizzle-orm/pg-core";
2import { Fact, PermissionToken } from ".";
3import { MutationContext } from "./mutations";
4import { supabaseServerClient } from "supabase/serverClient";
5import { entities, facts } from "drizzle/schema";
6import * as driz from "drizzle-orm";
7import { Attribute, Attributes, FilterAttributes } from "./attributes";
8import { v7 } from "uuid";
9import { DeepReadonly } from "replicache";
10
11type WriteCacheEntry =
12 | { type: "put"; fact: Fact<any> }
13 | { type: "del"; fact: { id: string } };
14
15export function cachedServerMutationContext(
16 tx: PgTransaction<any, any, any>,
17 permission_token_id: string,
18 token_rights: PermissionToken["permission_token_rights"],
19) {
20 let writeCache: WriteCacheEntry[] = [];
21 let eavCache = new Map<string, DeepReadonly<Fact<Attribute>>[]>();
22 let permissionsCache: { [key: string]: boolean } = {};
23 let entitiesCache: { set: string; id: string }[] = [];
24 let deleteEntitiesCache: string[] = [];
25 let textAttributeWriteCache = {} as {
26 [entityAttribute: string]: { [clientID: string]: string };
27 };
28
29 const scanIndex = {
30 async eav<A extends Attribute>(entity: string, attribute: A) {
31 let cached = eavCache.get(`${entity}-${attribute}`) as DeepReadonly<
32 Fact<A>
33 >[];
34 let baseFacts: DeepReadonly<Fact<A>>[];
35 if (deleteEntitiesCache.includes(entity)) return [];
36 if (cached) baseFacts = cached;
37 else {
38 cached = (await tx
39 .select({
40 id: facts.id,
41 data: facts.data,
42 entity: facts.entity,
43 attribute: facts.attribute,
44 })
45 .from(facts)
46 .where(
47 driz.and(
48 driz.eq(facts.attribute, attribute),
49 driz.eq(facts.entity, entity),
50 ),
51 )) as DeepReadonly<Fact<A>>[];
52 }
53 cached = cached.filter(
54 (f) =>
55 !writeCache.find((wc) => wc.type === "del" && wc.fact.id === f.id),
56 );
57 let newlyWrittenFacts = writeCache.filter(
58 (f) =>
59 f.type === "put" &&
60 f.fact.attribute === attribute &&
61 f.fact.entity === entity,
62 );
63 return [
64 ...cached,
65 ...newlyWrittenFacts.map((f) => f.fact as Fact<A>),
66 ].filter(
67 (f) =>
68 !(
69 (f.data.type === "reference" ||
70 f.data.type === "ordered-reference" ||
71 f.data.type === "spatial-reference") &&
72 deleteEntitiesCache.includes(f.data.value)
73 ),
74 ) as DeepReadonly<Fact<A>>[];
75 },
76 };
77 let getContext = (clientID: string, mutationID: number) => {
78 let ctx: MutationContext & {
79 checkPermission: (entity: string) => Promise<boolean>;
80 } = {
81 scanIndex,
82 permission_token_id,
83 async runOnServer(cb) {
84 return cb({ supabase: supabaseServerClient });
85 },
86 async checkPermission(entity: string) {
87 if (deleteEntitiesCache.includes(entity)) return false;
88 let cachedEntity = entitiesCache.find((e) => e.id === entity);
89 if (cachedEntity) {
90 return !!token_rights.find(
91 (r) => r.entity_set === cachedEntity?.set && r.write === true,
92 );
93 }
94 if (permissionsCache[entity] !== undefined)
95 return permissionsCache[entity];
96 let [permission_set] = await tx
97 .select({ entity_set: entities.set })
98 .from(entities)
99 .where(driz.eq(entities.id, entity));
100 let hasPermission =
101 !!permission_set &&
102 !!token_rights.find(
103 (r) =>
104 r.entity_set === permission_set.entity_set && r.write == true,
105 );
106 permissionsCache[entity] = hasPermission;
107 return hasPermission;
108 },
109 async runOnClient(_cb) {},
110 async createEntity({ entityID, permission_set }) {
111 if (
112 !token_rights.find(
113 (r) => r.entity_set === permission_set && r.write === true,
114 )
115 ) {
116 return false;
117 }
118 if (!entitiesCache.find((e) => e.id === entityID))
119 entitiesCache.push({ set: permission_set, id: entityID });
120 deleteEntitiesCache = deleteEntitiesCache.filter((e) => e !== entityID);
121 return true;
122 },
123 async deleteEntity(entity) {
124 if (!(await this.checkPermission(entity))) return;
125 deleteEntitiesCache.push(entity);
126 entitiesCache = entitiesCache.filter((e) => e.id !== entity);
127 writeCache = writeCache.filter(
128 (f) =>
129 f.type !== "put" ||
130 (f.fact.entity !== entity && f.fact.data.value !== entity),
131 );
132 },
133 async assertFact(f) {
134 if (!f.entity) return;
135 let attribute = Attributes[f.attribute as Attribute];
136 if (!attribute) return;
137 let id = f.id || v7();
138 let data = { ...f.data };
139 if (!(await this.checkPermission(f.entity))) return;
140 if (attribute.cardinality === "one") {
141 let existingFact = await scanIndex.eav(f.entity, f.attribute);
142 if (existingFact[0]) {
143 id = existingFact[0].id;
144 }
145 }
146 writeCache = writeCache.filter((f) => f.fact.id !== id);
147 writeCache.push({
148 type: "put",
149 fact: {
150 id: id,
151 entity: f.entity,
152 data: data,
153 attribute: f.attribute,
154 },
155 });
156 },
157 async retractFact(factID) {
158 let cachedFact = writeCache.find(
159 (f) => f.type === "put" && f.fact.id === factID,
160 );
161 let entity: string | undefined;
162 if (cachedFact && cachedFact.type === "put") {
163 entity = cachedFact.fact.entity;
164 } else {
165 let [row] = await tx
166 .select({ entity: facts.entity })
167 .from(facts)
168 .where(driz.eq(facts.id, factID));
169 entity = row?.entity;
170 }
171 if (!entity || !(await this.checkPermission(entity))) return;
172 writeCache = writeCache.filter((f) => f.fact.id !== factID);
173 writeCache.push({ type: "del", fact: { id: factID } });
174 },
175 };
176 return ctx;
177 };
178 let flush = async () => {
179 let flushStart = performance.now();
180 let timeInsertingEntities = 0;
181 let timeProcessingFactWrites = 0;
182 let timeDeletingEntities = 0;
183 let timeDeletingFacts = 0;
184 let timeCacheCleanup = 0;
185
186 // Insert entities
187 let entityInsertStart = performance.now();
188 if (entitiesCache.length > 0)
189 await tx
190 .insert(entities)
191 .values(entitiesCache.map((e) => ({ set: e.set, id: e.id })))
192 .onConflictDoNothing({ target: entities.id });
193 timeInsertingEntities = performance.now() - entityInsertStart;
194
195 // Process fact writes
196 let factWritesStart = performance.now();
197 let factWrites = writeCache.flatMap((f) =>
198 f.type === "del" ? [] : [f.fact],
199 );
200 if (factWrites.length > 0) {
201 await tx
202 .insert(facts)
203 .values(
204 factWrites.map((f) => ({
205 id: f.id,
206 entity: f.entity,
207 data: driz.sql`${f.data}::jsonb`,
208 attribute: f.attribute,
209 })),
210 )
211 .onConflictDoUpdate({
212 target: facts.id,
213 set: {
214 data: driz.sql`excluded.data`,
215 entity: driz.sql`excluded.entity`,
216 },
217 });
218 }
219 timeProcessingFactWrites = performance.now() - factWritesStart;
220
221 // Delete entities
222 let entityDeleteStart = performance.now();
223 if (deleteEntitiesCache.length > 0)
224 await tx
225 .delete(entities)
226 .where(driz.inArray(entities.id, deleteEntitiesCache));
227 timeDeletingEntities = performance.now() - entityDeleteStart;
228
229 // Delete facts
230 let factDeleteStart = performance.now();
231 let factDeletes = writeCache.flatMap((f) =>
232 f.type === "put" ? [] : [f.fact.id],
233 );
234 if (factDeletes.length > 0 || deleteEntitiesCache.length > 0) {
235 const conditions = [];
236 if (factDeletes.length > 0) {
237 conditions.push(driz.inArray(facts.id, factDeletes));
238 }
239 if (deleteEntitiesCache.length > 0) {
240 conditions.push(
241 driz.and(
242 driz.sql`(data->>'type' = 'ordered-reference' or data->>'type' = 'reference' or data->>'type' = 'spatial-reference')`,
243 driz.inArray(driz.sql`data->>'value'`, deleteEntitiesCache),
244 ),
245 );
246 }
247 if (conditions.length > 0) {
248 await tx.delete(facts).where(driz.or(...conditions));
249 }
250 }
251 timeDeletingFacts = performance.now() - factDeleteStart;
252
253 // Cache cleanup
254 let cacheCleanupStart = performance.now();
255 writeCache = [];
256 eavCache.clear();
257 permissionsCache = {};
258 entitiesCache = [];
259 permissionsCache = {};
260 deleteEntitiesCache = [];
261 timeCacheCleanup = performance.now() - cacheCleanupStart;
262
263 let totalFlushTime = performance.now() - flushStart;
264 console.log(`
265Flush Performance Breakdown (${totalFlushTime.toFixed(2)}ms):
266==========================================
267Entity Insertions (${entitiesCache.length} entities): ${timeInsertingEntities.toFixed(2)}ms
268Fact Processing (${factWrites.length} facts): ${timeProcessingFactWrites.toFixed(2)}ms
269Entity Deletions (${deleteEntitiesCache.length} entities): ${timeDeletingEntities.toFixed(2)}ms
270Fact Deletions: ${timeDeletingFacts.toFixed(2)}ms
271Cache Cleanup: ${timeCacheCleanup.toFixed(2)}ms
272 `);
273 };
274
275 return {
276 getContext,
277 flush,
278 };
279}