tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
28
pulls
pipelines
use api route for push/pull instead of server action
awarm.space
1 year ago
ec999e66
c3ff21e2
+364
-151
8 changed files
expand all
collapse all
unified
split
app
api
rpc
[command]
pull.ts
push.ts
route.ts
client.ts
lib.ts
src
replicache
index.tsx
pull.ts
push.ts
+103
app/api/rpc/[command]/pull.ts
···
1
1
+
import { z } from "zod";
2
2
+
import {
3
3
+
PullRequest,
4
4
+
PullResponseV1,
5
5
+
VersionNotSupportedResponse,
6
6
+
} from "replicache";
7
7
+
import { Database } from "supabase/database.types";
8
8
+
import { Fact } from "src/replicache";
9
9
+
import postgres from "postgres";
10
10
+
import { drizzle } from "drizzle-orm/postgres-js";
11
11
+
import { FactWithIndexes, getClientGroup } from "src/replicache/utils";
12
12
+
import { Attributes } from "src/replicache/attributes";
13
13
+
import { permission_tokens } from "drizzle/schema";
14
14
+
import { eq } from "drizzle-orm";
15
15
+
import { makeRoute } from "../lib";
16
16
+
import { Env } from "./route";
17
17
+
18
18
+
// First define the sub-types for V0 and V1 requests
19
19
+
const pullRequestV0 = z.object({
20
20
+
pullVersion: z.literal(0),
21
21
+
schemaVersion: z.string(),
22
22
+
profileID: z.string(),
23
23
+
cookie: z.any(), // ReadonlyJSONValue
24
24
+
clientID: z.string(),
25
25
+
lastMutationID: z.number(),
26
26
+
});
27
27
+
28
28
+
// For the Cookie type used in V1
29
29
+
const cookieType = z.union([
30
30
+
z.null(),
31
31
+
z.string(),
32
32
+
z.number(),
33
33
+
z
34
34
+
.object({
35
35
+
order: z.union([z.string(), z.number()]),
36
36
+
})
37
37
+
.and(z.record(z.string(), z.any())), // ReadonlyJSONValue with order property
38
38
+
]);
39
39
+
40
40
+
const pullRequestV1 = z.object({
41
41
+
pullVersion: z.literal(1),
42
42
+
schemaVersion: z.string(),
43
43
+
profileID: z.string(),
44
44
+
cookie: cookieType,
45
45
+
clientGroupID: z.string(),
46
46
+
});
47
47
+
48
48
+
// Combined PullRequest type
49
49
+
const PullRequestSchema = z.union([pullRequestV0, pullRequestV1]);
50
50
+
51
51
+
export const pull = makeRoute({
52
52
+
route: "pull",
53
53
+
input: z.object({ pullRequest: PullRequestSchema, token_id: z.string() }),
54
54
+
handler: async ({ pullRequest, token_id }, { db, supabase }: Env) => {
55
55
+
let body = pullRequest;
56
56
+
if (body.pullVersion === 0) return versionNotSupported;
57
57
+
let [token] = await db
58
58
+
.select({ root_entity: permission_tokens.root_entity })
59
59
+
.from(permission_tokens)
60
60
+
.where(eq(permission_tokens.id, token_id));
61
61
+
let facts: {
62
62
+
attribute: string;
63
63
+
created_at: string;
64
64
+
data: any;
65
65
+
entity: string;
66
66
+
id: string;
67
67
+
updated_at: string | null;
68
68
+
version: number;
69
69
+
}[] = [];
70
70
+
let clientGroup = {};
71
71
+
if (token) {
72
72
+
let { data } = await supabase.rpc("get_facts", {
73
73
+
root: token.root_entity,
74
74
+
});
75
75
+
76
76
+
clientGroup = await getClientGroup(db, body.clientGroupID);
77
77
+
facts = data || [];
78
78
+
}
79
79
+
80
80
+
return {
81
81
+
cookie: Date.now(),
82
82
+
lastMutationIDChanges: clientGroup,
83
83
+
patch: [
84
84
+
{ op: "clear" },
85
85
+
{ op: "put", key: "initialized", value: true },
86
86
+
...facts.map((f) => {
87
87
+
return {
88
88
+
op: "put",
89
89
+
key: f.id,
90
90
+
value: FactWithIndexes(
91
91
+
f as unknown as Fact<keyof typeof Attributes>,
92
92
+
),
93
93
+
} as const;
94
94
+
}),
95
95
+
],
96
96
+
} as PullResponseV1;
97
97
+
},
98
98
+
});
99
99
+
100
100
+
const versionNotSupported: VersionNotSupportedResponse = {
101
101
+
error: "VersionNotSupported",
102
102
+
versionType: "pull",
103
103
+
};
+111
app/api/rpc/[command]/push.ts
···
1
1
+
import { PushResponse } from "replicache";
2
2
+
import { serverMutationContext } from "src/replicache/serverMutationContext";
3
3
+
import { mutations } from "src/replicache/mutations";
4
4
+
import { eq } from "drizzle-orm";
5
5
+
import { permission_token_rights, replicache_clients } from "drizzle/schema";
6
6
+
import { getClientGroup } from "src/replicache/utils";
7
7
+
import { makeRoute } from "../lib";
8
8
+
import { z } from "zod";
9
9
+
import { Env } from "./route";
10
10
+
11
11
+
const mutationV0Schema = z.object({
12
12
+
id: z.number(),
13
13
+
name: z.string(),
14
14
+
args: z.unknown(),
15
15
+
timestamp: z.number(),
16
16
+
});
17
17
+
18
18
+
const mutationV1Schema = mutationV0Schema.extend({
19
19
+
clientID: z.string(),
20
20
+
});
21
21
+
22
22
+
const pushRequestV0Schema = z.object({
23
23
+
pushVersion: z.literal(0),
24
24
+
schemaVersion: z.string(),
25
25
+
profileID: z.string(),
26
26
+
clientID: z.string(),
27
27
+
mutations: z.array(mutationV0Schema),
28
28
+
});
29
29
+
30
30
+
const pushRequestV1Schema = z.object({
31
31
+
pushVersion: z.literal(1),
32
32
+
schemaVersion: z.string(),
33
33
+
profileID: z.string(),
34
34
+
clientGroupID: z.string(),
35
35
+
mutations: z.array(mutationV1Schema),
36
36
+
});
37
37
+
38
38
+
// Combine both versions into final PushRequest schema
39
39
+
const pushRequestSchema = z.discriminatedUnion("pushVersion", [
40
40
+
pushRequestV0Schema,
41
41
+
pushRequestV1Schema,
42
42
+
]);
43
43
+
44
44
+
type PushRequestZ = z.infer<typeof pushRequestSchema>;
45
45
+
46
46
+
export const push = makeRoute({
47
47
+
route: "push",
48
48
+
input: z.object({
49
49
+
pushRequest: pushRequestSchema,
50
50
+
rootEntity: z.string(),
51
51
+
token: z.object({ id: z.string() }),
52
52
+
}),
53
53
+
handler: async (
54
54
+
{ pushRequest, rootEntity, token },
55
55
+
{ db, supabase }: Env,
56
56
+
) => {
57
57
+
if (pushRequest.pushVersion !== 1) {
58
58
+
return {
59
59
+
result: { error: "VersionNotSupported", versionType: "push" } as const,
60
60
+
};
61
61
+
}
62
62
+
let clientGroup = await getClientGroup(db, pushRequest.clientGroupID);
63
63
+
let token_rights = await db
64
64
+
.select()
65
65
+
.from(permission_token_rights)
66
66
+
.where(eq(permission_token_rights.token, token.id));
67
67
+
for (let mutation of pushRequest.mutations) {
68
68
+
let lastMutationID = clientGroup[mutation.clientID] || 0;
69
69
+
if (mutation.id <= lastMutationID) continue;
70
70
+
clientGroup[mutation.clientID] = mutation.id;
71
71
+
let name = mutation.name as keyof typeof mutations;
72
72
+
if (!mutations[name]) {
73
73
+
continue;
74
74
+
}
75
75
+
await db.transaction(async (tx) => {
76
76
+
try {
77
77
+
await mutations[name](
78
78
+
mutation.args as any,
79
79
+
serverMutationContext(tx, token_rights),
80
80
+
);
81
81
+
} catch (e) {
82
82
+
console.log(
83
83
+
`Error occured while running mutation: ${name}`,
84
84
+
JSON.stringify(e),
85
85
+
JSON.stringify(mutation, null, 2),
86
86
+
);
87
87
+
}
88
88
+
await tx
89
89
+
.insert(replicache_clients)
90
90
+
.values({
91
91
+
client_group: pushRequest.clientGroupID,
92
92
+
client_id: mutation.clientID,
93
93
+
last_mutation: mutation.id,
94
94
+
})
95
95
+
.onConflictDoUpdate({
96
96
+
target: replicache_clients.client_id,
97
97
+
set: { last_mutation: mutation.id },
98
98
+
});
99
99
+
});
100
100
+
}
101
101
+
102
102
+
let channel = supabase.channel(`rootEntity:${rootEntity}`);
103
103
+
await channel.send({
104
104
+
type: "broadcast",
105
105
+
event: "poke",
106
106
+
payload: { message: "poke" },
107
107
+
});
108
108
+
supabase.removeChannel(channel);
109
109
+
return { result: undefined } as const;
110
110
+
},
111
111
+
});
+29
app/api/rpc/[command]/route.ts
···
1
1
+
import { drizzle } from "drizzle-orm/postgres-js";
2
2
+
import { makeRouter } from "../lib";
3
3
+
import { push } from "./push";
4
4
+
import postgres from "postgres";
5
5
+
import { createClient } from "@supabase/supabase-js";
6
6
+
import { Database } from "supabase/database.types";
7
7
+
import { pull } from "./pull";
8
8
+
9
9
+
const client = postgres(process.env.DB_URL as string, { idle_timeout: 5 });
10
10
+
let supabase = createClient<Database>(
11
11
+
process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
12
12
+
process.env.SUPABASE_SERVICE_ROLE_KEY as string,
13
13
+
);
14
14
+
const db = drizzle(client);
15
15
+
16
16
+
const Env = {
17
17
+
supabase,
18
18
+
db,
19
19
+
};
20
20
+
export type Env = typeof Env;
21
21
+
export type Routes = typeof Routes;
22
22
+
let Routes = [push, pull];
23
23
+
export async function POST(
24
24
+
req: Request,
25
25
+
{ params }: { params: { command: string } },
26
26
+
) {
27
27
+
let router = makeRouter(Routes);
28
28
+
return router(params.command, req, Env);
29
29
+
}
+4
app/api/rpc/client.ts
···
1
1
+
import { makeAPIClient } from "./lib";
2
2
+
import type { Routes } from "./[command]/route";
3
3
+
4
4
+
export const callRPC = makeAPIClient<Routes>("/api/rpc");
+104
app/api/rpc/lib.ts
···
1
1
+
import { ZodObject, ZodRawShape, ZodUnion, z } from "zod";
2
2
+
3
3
+
type Route<
4
4
+
Cmd extends string,
5
5
+
Input extends ZodObject<ZodRawShape> | ZodUnion<any>,
6
6
+
Result extends object,
7
7
+
Env extends {},
8
8
+
> = {
9
9
+
route: Cmd;
10
10
+
input: Input;
11
11
+
handler: (msg: z.infer<Input>, env: Env, request: Request) => Promise<Result>;
12
12
+
};
13
13
+
14
14
+
type Routes<Env extends {}> = Route<string, any, any, Env>[];
15
15
+
16
16
+
export function makeAPIClient<R extends Routes<any>>(basePath: string) {
17
17
+
return async <T extends R[number]["route"]>(
18
18
+
route: T,
19
19
+
data: z.infer<Extract<R[number], { route: T }>["input"]>,
20
20
+
) => {
21
21
+
let result = await fetch(`${basePath}/${route}`, {
22
22
+
body: JSON.stringify(data),
23
23
+
method: "POST",
24
24
+
headers: { "Content-type": "application/json" },
25
25
+
});
26
26
+
return result.json() as Promise<
27
27
+
Awaited<ReturnType<Extract<R[number], { route: T }>["handler"]>>
28
28
+
>;
29
29
+
};
30
30
+
}
31
31
+
32
32
+
export const makeRouter = <Env extends {}>(routes: Routes<Env>) => {
33
33
+
return async (route: string, request: Request, env: Env) => {
34
34
+
let status = 200;
35
35
+
let result;
36
36
+
switch (request.method) {
37
37
+
case "POST": {
38
38
+
let handler = routes.find((f) => f.route === route);
39
39
+
if (!handler) {
40
40
+
status = 404;
41
41
+
result = { error: `route ${route} not Found` };
42
42
+
break;
43
43
+
}
44
44
+
45
45
+
let body;
46
46
+
if (handler.input)
47
47
+
try {
48
48
+
body = await request.json();
49
49
+
} catch (e) {
50
50
+
result = { error: "Request body must be valid JSON" };
51
51
+
status = 400;
52
52
+
break;
53
53
+
}
54
54
+
55
55
+
let msg = handler.input.safeParse(body);
56
56
+
if (!msg.success) {
57
57
+
status = 400;
58
58
+
result = msg.error;
59
59
+
break;
60
60
+
}
61
61
+
try {
62
62
+
result = (await handler.handler(
63
63
+
msg.data as any,
64
64
+
env,
65
65
+
request,
66
66
+
)) as object;
67
67
+
break;
68
68
+
} catch (e) {
69
69
+
console.log(e);
70
70
+
status = 500;
71
71
+
result = {
72
72
+
error: "An error occured while handling this request",
73
73
+
errorText: (e as Error).toString(),
74
74
+
};
75
75
+
break;
76
76
+
}
77
77
+
}
78
78
+
default:
79
79
+
status = 404;
80
80
+
result = { error: "Only POST Supported" };
81
81
+
}
82
82
+
83
83
+
let res = new Response(JSON.stringify(result), {
84
84
+
status,
85
85
+
headers: {
86
86
+
"Access-Control-Allow-Credentials": "true",
87
87
+
"Content-type": "application/json;charset=UTF-8",
88
88
+
"Access-Control-Allow-Origin": "*",
89
89
+
"Access-Control-Allow-Methods": "GET,HEAD,POST,OPTIONS",
90
90
+
},
91
91
+
});
92
92
+
//result.headers?.forEach((h) => res.headers.append(h[0], h[1]));
93
93
+
return res;
94
94
+
};
95
95
+
};
96
96
+
97
97
+
export function makeRoute<
98
98
+
Cmd extends string,
99
99
+
Input extends ZodObject<ZodRawShape> | ZodUnion<any>,
100
100
+
Result extends object,
101
101
+
Env extends {},
102
102
+
>(d: Route<Cmd, Input, Result, Env>) {
103
103
+
return d;
104
104
+
}
+13
-4
src/replicache/index.tsx
···
8
8
Replicache,
9
9
WriteTransaction,
10
10
} from "replicache";
11
11
-
import { Pull } from "./pull";
12
11
import { mutations } from "./mutations";
13
12
import { Attributes, Data, FilterAttributes } from "./attributes";
14
14
-
import { Push } from "./push";
15
13
import { clientMutationContext } from "./clientMutationContext";
16
14
import { supabaseBrowserClient } from "supabase/browserClient";
15
15
+
import { callRPC } from "app/api/rpc/client";
17
16
18
17
export type Fact<A extends keyof typeof Attributes> = {
19
18
id: string;
···
82
81
mutations: pushRequest.mutations.slice(0, 250),
83
82
} as PushRequest;
84
83
return {
85
85
-
response: await Push(smolpushRequest, props.name, props.token),
84
84
+
response: (
85
85
+
await callRPC("push", {
86
86
+
pushRequest: smolpushRequest,
87
87
+
token: props.token,
88
88
+
rootEntity: props.name,
89
89
+
})
90
90
+
).result,
86
91
httpRequestInfo: { errorMessage: "", httpStatusCode: 200 },
87
92
};
88
93
},
89
94
puller: async (pullRequest) => {
95
95
+
let res = await callRPC("pull", {
96
96
+
pullRequest,
97
97
+
token_id: props.token.id,
98
98
+
});
90
99
return {
91
91
-
response: await Pull(pullRequest, props.token.id),
100
100
+
response: res,
92
101
httpRequestInfo: { errorMessage: "", httpStatusCode: 200 },
93
102
};
94
103
},
-71
src/replicache/pull.ts
···
1
1
-
"use server";
2
2
-
3
3
-
import { createClient } from "@supabase/supabase-js";
4
4
-
import {
5
5
-
PullRequest,
6
6
-
PullResponseV1,
7
7
-
VersionNotSupportedResponse,
8
8
-
} from "replicache";
9
9
-
import { Database } from "supabase/database.types";
10
10
-
import { Fact } from ".";
11
11
-
import postgres from "postgres";
12
12
-
import { drizzle } from "drizzle-orm/postgres-js";
13
13
-
import { FactWithIndexes, getClientGroup } from "./utils";
14
14
-
import { Attributes } from "./attributes";
15
15
-
import { permission_tokens } from "drizzle/schema";
16
16
-
import { eq } from "drizzle-orm";
17
17
-
let supabase = createClient<Database>(
18
18
-
process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
19
19
-
process.env.SUPABASE_SERVICE_ROLE_KEY as string,
20
20
-
);
21
21
-
22
22
-
const client = postgres(process.env.DB_URL as string, { idle_timeout: 5 });
23
23
-
const db = drizzle(client);
24
24
-
export async function Pull(
25
25
-
body: PullRequest,
26
26
-
token_id: string,
27
27
-
): Promise<PullResponseV1> {
28
28
-
console.log("Pull");
29
29
-
if (body.pullVersion === 0) return versionNotSupported;
30
30
-
let [token] = await db
31
31
-
.select({ root_entity: permission_tokens.root_entity })
32
32
-
.from(permission_tokens)
33
33
-
.where(eq(permission_tokens.id, token_id));
34
34
-
let facts: {
35
35
-
attribute: string;
36
36
-
created_at: string;
37
37
-
data: any;
38
38
-
entity: string;
39
39
-
id: string;
40
40
-
updated_at: string | null;
41
41
-
version: number;
42
42
-
}[] = [];
43
43
-
let clientGroup = {};
44
44
-
if (token) {
45
45
-
let { data } = await supabase.rpc("get_facts", { root: token.root_entity });
46
46
-
47
47
-
clientGroup = await getClientGroup(db, body.clientGroupID);
48
48
-
facts = data || [];
49
49
-
}
50
50
-
51
51
-
return {
52
52
-
cookie: Date.now(),
53
53
-
lastMutationIDChanges: clientGroup,
54
54
-
patch: [
55
55
-
{ op: "clear" },
56
56
-
{ op: "put", key: "initialized", value: true },
57
57
-
...facts.map((f) => {
58
58
-
return {
59
59
-
op: "put",
60
60
-
key: f.id,
61
61
-
value: FactWithIndexes(f as unknown as Fact<keyof typeof Attributes>),
62
62
-
} as const;
63
63
-
}),
64
64
-
],
65
65
-
};
66
66
-
}
67
67
-
68
68
-
const versionNotSupported: VersionNotSupportedResponse = {
69
69
-
error: "VersionNotSupported",
70
70
-
versionType: "pull",
71
71
-
};
-76
src/replicache/push.ts
···
1
1
-
"use server";
2
2
-
import { PushRequest, PushResponse } from "replicache";
3
3
-
import { serverMutationContext } from "./serverMutationContext";
4
4
-
import { mutations } from "./mutations";
5
5
-
import { drizzle } from "drizzle-orm/postgres-js";
6
6
-
import { eq } from "drizzle-orm";
7
7
-
import postgres from "postgres";
8
8
-
import { permission_token_rights, replicache_clients } from "drizzle/schema";
9
9
-
import { getClientGroup } from "./utils";
10
10
-
import { createClient } from "@supabase/supabase-js";
11
11
-
import { Database } from "supabase/database.types";
12
12
-
13
13
-
const client = postgres(process.env.DB_URL as string, { idle_timeout: 5 });
14
14
-
let supabase = createClient<Database>(
15
15
-
process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
16
16
-
process.env.SUPABASE_SERVICE_ROLE_KEY as string,
17
17
-
);
18
18
-
const db = drizzle(client);
19
19
-
export async function Push(
20
20
-
pushRequest: PushRequest,
21
21
-
rootEntity: string,
22
22
-
token: { id: string },
23
23
-
): Promise<PushResponse | undefined> {
24
24
-
console.log("Push");
25
25
-
if (pushRequest.pushVersion !== 1) {
26
26
-
return { error: "VersionNotSupported", versionType: "push" };
27
27
-
}
28
28
-
let clientGroup = await getClientGroup(db, pushRequest.clientGroupID);
29
29
-
let token_rights = await db
30
30
-
.select()
31
31
-
.from(permission_token_rights)
32
32
-
.where(eq(permission_token_rights.token, token.id));
33
33
-
for (let mutation of pushRequest.mutations) {
34
34
-
let lastMutationID = clientGroup[mutation.clientID] || 0;
35
35
-
if (mutation.id <= lastMutationID) continue;
36
36
-
clientGroup[mutation.clientID] = mutation.id;
37
37
-
let name = mutation.name as keyof typeof mutations;
38
38
-
if (!mutations[name]) {
39
39
-
continue;
40
40
-
}
41
41
-
await db.transaction(async (tx) => {
42
42
-
try {
43
43
-
await mutations[name](
44
44
-
mutation.args as any,
45
45
-
serverMutationContext(tx, token_rights),
46
46
-
);
47
47
-
} catch (e) {
48
48
-
console.log(
49
49
-
`Error occured while running mutation: ${name}`,
50
50
-
JSON.stringify(e),
51
51
-
JSON.stringify(mutation, null, 2),
52
52
-
);
53
53
-
}
54
54
-
await tx
55
55
-
.insert(replicache_clients)
56
56
-
.values({
57
57
-
client_group: pushRequest.clientGroupID,
58
58
-
client_id: mutation.clientID,
59
59
-
last_mutation: mutation.id,
60
60
-
})
61
61
-
.onConflictDoUpdate({
62
62
-
target: replicache_clients.client_id,
63
63
-
set: { last_mutation: mutation.id },
64
64
-
});
65
65
-
});
66
66
-
}
67
67
-
68
68
-
let channel = supabase.channel(`rootEntity:${rootEntity}`);
69
69
-
await channel.send({
70
70
-
type: "broadcast",
71
71
-
event: "poke",
72
72
-
payload: { message: "poke" },
73
73
-
});
74
74
-
supabase.removeChannel(channel);
75
75
-
return undefined;
76
76
-
}