tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
29
pulls
pipelines
put all of push/pull in a transaction
awarm.space
1 year ago
c500383e
ec999e66
+54
-39
2 changed files
expand all
collapse all
unified
split
app
api
rpc
[command]
pull.ts
push.ts
+37
-23
app/api/rpc/[command]/pull.ts
···
11
11
import { FactWithIndexes, getClientGroup } from "src/replicache/utils";
12
12
import { Attributes } from "src/replicache/attributes";
13
13
import { permission_tokens } from "drizzle/schema";
14
14
-
import { eq } from "drizzle-orm";
14
14
+
import { eq, sql } from "drizzle-orm";
15
15
import { makeRoute } from "../lib";
16
16
import { Env } from "./route";
17
17
···
54
54
handler: async ({ pullRequest, token_id }, { db, supabase }: Env) => {
55
55
let body = pullRequest;
56
56
if (body.pullVersion === 0) return versionNotSupported;
57
57
-
let [token] = await db
58
58
-
.select({ root_entity: permission_tokens.root_entity })
59
59
-
.from(permission_tokens)
60
60
-
.where(eq(permission_tokens.id, token_id));
61
61
-
let facts: {
62
62
-
attribute: string;
63
63
-
created_at: string;
64
64
-
data: any;
65
65
-
entity: string;
66
66
-
id: string;
67
67
-
updated_at: string | null;
68
68
-
version: number;
69
69
-
}[] = [];
70
70
-
let clientGroup = {};
71
71
-
if (token) {
72
72
-
let { data } = await supabase.rpc("get_facts", {
73
73
-
root: token.root_entity,
74
74
-
});
57
57
+
let [facts, clientGroup] = await db.transaction(async (tx) => {
58
58
+
let [token] = await tx
59
59
+
.select({ root_entity: permission_tokens.root_entity })
60
60
+
.from(permission_tokens)
61
61
+
.where(eq(permission_tokens.id, token_id));
75
62
76
76
-
clientGroup = await getClientGroup(db, body.clientGroupID);
77
77
-
facts = data || [];
78
78
-
}
63
63
+
let facts: {
64
64
+
attribute: string;
65
65
+
created_at: string;
66
66
+
data: any;
67
67
+
entity: string;
68
68
+
id: string;
69
69
+
updated_at: string | null;
70
70
+
version: number;
71
71
+
}[] = [];
72
72
+
let clientGroup = {};
73
73
+
74
74
+
if (token) {
75
75
+
let data = (await tx.execute(
76
76
+
sql`select * from get_facts(${token.root_entity}) as get_facts`,
77
77
+
)) as {
78
78
+
attribute: string;
79
79
+
created_at: string;
80
80
+
data: any;
81
81
+
entity: string;
82
82
+
id: string;
83
83
+
updated_at: string | null;
84
84
+
version: number;
85
85
+
}[];
86
86
+
87
87
+
clientGroup = await getClientGroup(tx, body.clientGroupID);
88
88
+
facts = data || [];
89
89
+
return [facts, clientGroup];
90
90
+
}
91
91
+
return [];
92
92
+
});
79
93
80
94
return {
81
95
cookie: Date.now(),
···
83
97
patch: [
84
98
{ op: "clear" },
85
99
{ op: "put", key: "initialized", value: true },
86
86
-
...facts.map((f) => {
100
100
+
...(facts || []).map((f) => {
87
101
return {
88
102
op: "put",
89
103
key: f.id,
+17
-16
app/api/rpc/[command]/push.ts
···
59
59
result: { error: "VersionNotSupported", versionType: "push" } as const,
60
60
};
61
61
}
62
62
-
let clientGroup = await getClientGroup(db, pushRequest.clientGroupID);
63
63
-
let token_rights = await db
64
64
-
.select()
65
65
-
.from(permission_token_rights)
66
66
-
.where(eq(permission_token_rights.token, token.id));
67
67
-
for (let mutation of pushRequest.mutations) {
68
68
-
let lastMutationID = clientGroup[mutation.clientID] || 0;
69
69
-
if (mutation.id <= lastMutationID) continue;
70
70
-
clientGroup[mutation.clientID] = mutation.id;
71
71
-
let name = mutation.name as keyof typeof mutations;
72
72
-
if (!mutations[name]) {
73
73
-
continue;
74
74
-
}
75
75
-
await db.transaction(async (tx) => {
62
62
+
63
63
+
await db.transaction(async (tx) => {
64
64
+
let clientGroup = await getClientGroup(tx, pushRequest.clientGroupID);
65
65
+
let token_rights = await tx
66
66
+
.select()
67
67
+
.from(permission_token_rights)
68
68
+
.where(eq(permission_token_rights.token, token.id));
69
69
+
for (let mutation of pushRequest.mutations) {
70
70
+
let lastMutationID = clientGroup[mutation.clientID] || 0;
71
71
+
if (mutation.id <= lastMutationID) continue;
72
72
+
clientGroup[mutation.clientID] = mutation.id;
73
73
+
let name = mutation.name as keyof typeof mutations;
74
74
+
if (!mutations[name]) {
75
75
+
continue;
76
76
+
}
76
77
try {
77
78
await mutations[name](
78
79
mutation.args as any,
···
96
97
target: replicache_clients.client_id,
97
98
set: { last_mutation: mutation.id },
98
99
});
99
99
-
});
100
100
-
}
100
100
+
}
101
101
+
});
101
102
102
103
let channel = supabase.channel(`rootEntity:${rootEntity}`);
103
104
await channel.send({