tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
27
pulls
pipelines
add caching to flush operation
awarm.space
6 months ago
6fdcfd64
1d761288
+60
-11
1 changed file
expand all
collapse all
unified
split
src
replicache
cachedServerMutationContext.ts
+60
-11
src/replicache/cachedServerMutationContext.ts
···
168
return ctx;
169
};
170
let flush = async () => {
0
0
0
0
0
0
0
0
0
0
0
171
if (entitiesCache.length > 0)
172
await tx
173
.insert(entities)
174
.values(entitiesCache.map((e) => ({ set: e.set, id: e.id })));
0
0
0
0
175
let factWrites = writeCache.flatMap((f) =>
176
f.type === "del" ? [] : [f.fact],
177
);
···
179
for (let f of factWrites) {
180
let attribute = Attributes[f.attribute as Attribute];
181
let data = f.data;
0
0
0
182
if (attribute.type === "text" && attribute.cardinality === "one") {
183
let values = Object.values(
184
textAttributeWriteCache[`${f.entity}-${f.attribute}`] || {},
185
);
186
if (values.length > 0) {
187
let existingFact = await scanIndex.eav(f.entity, f.attribute);
188
-
if (existingFact[0]) values.push(existingFact[0].data.value);
189
-
let updateBytes = Y.mergeUpdatesV1(
190
-
values.map((v) => base64.toByteArray(v)),
191
-
);
192
-
data.value = base64.fromByteArray(updateBytes);
0
193
}
194
}
0
195
196
-
try {
197
-
await tx
0
0
198
.insert(facts)
199
.values({
200
id: f.id,
···
205
.onConflictDoUpdate({
206
target: facts.id,
207
set: { data: driz.sql`excluded.data` },
208
-
});
209
-
} catch (e) {
210
-
console.log(`error on inserting fact: `, JSON.stringify(e));
211
-
}
0
0
212
}
213
}
0
0
0
0
214
if (deleteEntitiesCache.length > 0)
215
await tx
216
.delete(entities)
217
.where(driz.inArray(entities.id, deleteEntitiesCache));
0
0
0
0
218
let factDeletes = writeCache.flatMap((f) =>
219
f.type === "put" ? [] : [f.fact.id],
220
);
···
235
await tx.delete(facts).where(driz.or(...conditions));
236
}
237
}
0
238
0
0
239
writeCache = [];
240
eavCache.clear();
241
permissionsCache = {};
242
entitiesCache = [];
243
permissionsCache = {};
244
deleteEntitiesCache = [];
0
0
0
0
0
0
0
0
0
0
0
0
0
0
245
};
246
247
return {
···
168
return ctx;
169
};
170
let flush = async () => {
171
+
let flushStart = performance.now();
172
+
let timeInsertingEntities = 0;
173
+
let timeProcessingFactWrites = 0;
174
+
let timeTextMerging = 0;
175
+
let timeFactInserts = 0;
176
+
let timeDeletingEntities = 0;
177
+
let timeDeletingFacts = 0;
178
+
let timeCacheCleanup = 0;
179
+
180
+
// Insert entities
181
+
let entityInsertStart = performance.now();
182
if (entitiesCache.length > 0)
183
await tx
184
.insert(entities)
185
.values(entitiesCache.map((e) => ({ set: e.set, id: e.id })));
186
+
timeInsertingEntities = performance.now() - entityInsertStart;
187
+
188
+
// Process fact writes
189
+
let factWritesStart = performance.now();
190
let factWrites = writeCache.flatMap((f) =>
191
f.type === "del" ? [] : [f.fact],
192
);
···
194
for (let f of factWrites) {
195
let attribute = Attributes[f.attribute as Attribute];
196
let data = f.data;
197
+
198
+
// Text merging timing
199
+
let textMergeStart = performance.now();
200
if (attribute.type === "text" && attribute.cardinality === "one") {
201
let values = Object.values(
202
textAttributeWriteCache[`${f.entity}-${f.attribute}`] || {},
203
);
204
if (values.length > 0) {
205
let existingFact = await scanIndex.eav(f.entity, f.attribute);
206
+
if (existingFact[0] && !values.includes(existingFact[0].data.value))
207
+
values.push(existingFact[0].data.value);
208
+
if (values.length > 1)
209
+
data.value = base64.fromByteArray(
210
+
Y.mergeUpdatesV1(values.map((v) => base64.toByteArray(v))),
211
+
);
212
}
213
}
214
+
timeTextMerging += performance.now() - textMergeStart;
215
216
+
// Fact insert timing
217
+
let factInsertStart = performance.now();
218
+
await tx.transaction((tx2) =>
219
+
tx2
220
.insert(facts)
221
.values({
222
id: f.id,
···
227
.onConflictDoUpdate({
228
target: facts.id,
229
set: { data: driz.sql`excluded.data` },
230
+
})
231
+
.catch((e) =>
232
+
console.log(`error on inserting fact: `, JSON.stringify(e)),
233
+
),
234
+
);
235
+
timeFactInserts += performance.now() - factInsertStart;
236
}
237
}
238
+
timeProcessingFactWrites = performance.now() - factWritesStart;
239
+
240
+
// Delete entities
241
+
let entityDeleteStart = performance.now();
242
if (deleteEntitiesCache.length > 0)
243
await tx
244
.delete(entities)
245
.where(driz.inArray(entities.id, deleteEntitiesCache));
246
+
timeDeletingEntities = performance.now() - entityDeleteStart;
247
+
248
+
// Delete facts
249
+
let factDeleteStart = performance.now();
250
let factDeletes = writeCache.flatMap((f) =>
251
f.type === "put" ? [] : [f.fact.id],
252
);
···
267
await tx.delete(facts).where(driz.or(...conditions));
268
}
269
}
270
+
timeDeletingFacts = performance.now() - factDeleteStart;
271
272
+
// Cache cleanup
273
+
let cacheCleanupStart = performance.now();
274
writeCache = [];
275
eavCache.clear();
276
permissionsCache = {};
277
entitiesCache = [];
278
permissionsCache = {};
279
deleteEntitiesCache = [];
280
+
timeCacheCleanup = performance.now() - cacheCleanupStart;
281
+
282
+
let totalFlushTime = performance.now() - flushStart;
283
+
console.log(`
284
+
Flush Performance Breakdown (${totalFlushTime.toFixed(2)}ms):
285
+
==========================================
286
+
Entity Insertions (${entitiesCache.length} entities): ${timeInsertingEntities.toFixed(2)}ms
287
+
Fact Processing (${factWrites.length} facts): ${timeProcessingFactWrites.toFixed(2)}ms
288
+
- Text Merging: ${timeTextMerging.toFixed(2)}ms
289
+
- Fact Inserts (nested transactions): ${timeFactInserts.toFixed(2)}ms
290
+
Entity Deletions (${deleteEntitiesCache.length} entities): ${timeDeletingEntities.toFixed(2)}ms
291
+
Fact Deletions: ${timeDeletingFacts.toFixed(2)}ms
292
+
Cache Cleanup: ${timeCacheCleanup.toFixed(2)}ms
293
+
`);
294
};
295
296
return {