tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
27
pulls
pipelines
don't index bridgy docs at all
awarm.space
6 days ago
43fa8794
b36ddf7f
+77
-73
4 changed files
expand all
collapse all
unified
split
app
api
inngest
client.ts
functions
index_document.ts
route.tsx
appview
index.ts
+5
-2
app/api/inngest/client.ts
···
1
import { Inngest } from "inngest";
2
-
3
import { EventSchemas } from "inngest";
0
4
5
export type Events = {
6
"feeds/index-follows": {
···
51
documentUris?: string[];
52
};
53
};
54
-
"appview/sync-document-metadata": {
55
data: {
56
document_uri: string;
0
57
bsky_post_uri?: string;
0
0
58
};
59
};
60
"user/write-records-to-pds": {
···
1
import { Inngest } from "inngest";
0
2
import { EventSchemas } from "inngest";
3
+
import { Json } from "supabase/database.types";
4
5
export type Events = {
6
"feeds/index-follows": {
···
51
documentUris?: string[];
52
};
53
};
54
+
"appview/index-document": {
55
data: {
56
document_uri: string;
57
+
document_data: Json;
58
bsky_post_uri?: string;
59
+
publication: string | null;
60
+
did: string;
61
};
62
};
63
"user/write-records-to-pds": {
+46
-14
app/api/inngest/functions/sync_document_metadata.ts
app/api/inngest/functions/index_document.ts
···
1
import { inngest } from "../client";
2
import { supabaseServerClient } from "supabase/serverClient";
3
-
import { AtpAgent, AtUri } from "@atproto/api";
4
import { idResolver } from "app/(home-pages)/reader/idResolver";
5
6
// 1m, 2m, 4m, 8m, 16m, 32m, 1h, 2h, 4h, 8h, 8h, 8h (~37h total)
7
const SLEEP_INTERVALS = [
8
-
"1m", "2m", "4m", "8m", "16m", "32m", "1h", "2h", "4h", "8h", "8h", "8h",
0
0
0
0
0
0
0
0
0
0
0
9
];
10
11
-
export const sync_document_metadata = inngest.createFunction(
12
{
13
-
id: "sync_document_metadata_v2",
14
debounce: {
15
key: "event.data.document_uri",
16
period: "60s",
···
18
},
19
concurrency: [{ key: "event.data.document_uri", limit: 1 }],
20
},
21
-
{ event: "appview/sync-document-metadata" },
22
async ({ event, step }) => {
23
-
const { document_uri, bsky_post_uri } = event.data;
24
-
25
-
const did = new AtUri(document_uri).host;
26
27
const handleResult = await step.run("resolve-handle", async () => {
28
const doc = await idResolver.did.resolve(did);
···
39
});
40
if (!handleResult) return { error: "No Handle" };
41
42
-
await step.run("set-indexed", async () => {
43
-
return await supabaseServerClient
0
0
0
0
44
.from("documents")
45
-
.update({ indexed: !handleResult.isBridgy })
46
-
.eq("uri", document_uri)
47
-
.select();
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
48
});
49
50
-
if (!bsky_post_uri || handleResult.isBridgy) {
51
return { handle: handleResult.handle };
52
}
53
···
1
import { inngest } from "../client";
2
import { supabaseServerClient } from "supabase/serverClient";
3
+
import { AtpAgent } from "@atproto/api";
4
import { idResolver } from "app/(home-pages)/reader/idResolver";
5
6
// 1m, 2m, 4m, 8m, 16m, 32m, 1h, 2h, 4h, 8h, 8h, 8h (~37h total)
7
const SLEEP_INTERVALS = [
8
+
"1m",
9
+
"2m",
10
+
"4m",
11
+
"8m",
12
+
"16m",
13
+
"32m",
14
+
"1h",
15
+
"2h",
16
+
"4h",
17
+
"8h",
18
+
"8h",
19
+
"8h",
20
];
21
22
+
export const index_document = inngest.createFunction(
23
{
24
+
id: "index_document_v2",
25
debounce: {
26
key: "event.data.document_uri",
27
period: "60s",
···
29
},
30
concurrency: [{ key: "event.data.document_uri", limit: 1 }],
31
},
32
+
{ event: "appview/index-document" },
33
async ({ event, step }) => {
34
+
const { document_uri, document_data, bsky_post_uri, publication, did } =
35
+
event.data;
0
36
37
const handleResult = await step.run("resolve-handle", async () => {
38
const doc = await idResolver.did.resolve(did);
···
49
});
50
if (!handleResult) return { error: "No Handle" };
51
52
+
if (handleResult.isBridgy) {
53
+
return { handle: handleResult.handle, skipped: true };
54
+
}
55
+
56
+
await step.run("write-document", async () => {
57
+
const docResult = await supabaseServerClient
58
.from("documents")
59
+
.upsert({
60
+
uri: document_uri,
61
+
data: document_data,
62
+
indexed: true,
63
+
});
64
+
if (docResult.error) console.log(docResult.error);
65
+
66
+
if (publication) {
67
+
const docInPubResult = await supabaseServerClient
68
+
.from("documents_in_publications")
69
+
.upsert({
70
+
publication,
71
+
document: document_uri,
72
+
});
73
+
await supabaseServerClient
74
+
.from("documents_in_publications")
75
+
.delete()
76
+
.neq("publication", publication)
77
+
.eq("document", document_uri);
78
+
if (docInPubResult.error) console.log(docInPubResult.error);
79
+
}
80
});
81
82
+
if (!bsky_post_uri) {
83
return { handle: handleResult.handle };
84
}
85
+2
-2
app/api/inngest/route.tsx
···
13
check_oauth_session,
14
} from "./functions/cleanup_expired_oauth_sessions";
15
import { write_records_to_pds } from "./functions/write_records_to_pds";
16
-
import { sync_document_metadata } from "./functions/sync_document_metadata";
17
18
export const { GET, POST, PUT } = serve({
19
client: inngest,
···
29
cleanup_expired_oauth_sessions,
30
check_oauth_session,
31
write_records_to_pds,
32
-
sync_document_metadata,
33
],
34
});
···
13
check_oauth_session,
14
} from "./functions/cleanup_expired_oauth_sessions";
15
import { write_records_to_pds } from "./functions/write_records_to_pds";
16
+
import { index_document } from "./functions/index_document";
17
18
export const { GET, POST, PUT } = serve({
19
client: inngest,
···
29
cleanup_expired_oauth_sessions,
30
check_oauth_session,
31
write_records_to_pds,
32
+
index_document,
33
],
34
});
+24
-55
appview/index.ts
···
104
console.log(record.error);
105
return;
106
}
107
-
let docResult = await supabase.from("documents").upsert({
108
-
uri: evt.uri.toString(),
109
-
data: record.value as Json,
110
-
});
111
-
if (docResult.error) console.log(docResult.error);
112
-
await inngest.send({
113
-
name: "appview/sync-document-metadata",
114
-
data: {
115
-
document_uri: evt.uri.toString(),
116
-
bsky_post_uri: record.value.postRef?.uri,
117
-
},
118
-
});
119
if (record.value.publication) {
120
let publicationURI = new AtUri(record.value.publication);
121
-
122
if (publicationURI.host !== evt.uri.host) {
123
console.log("Unauthorized to create post!");
124
return;
125
}
126
-
let docInPublicationResult = await supabase
127
-
.from("documents_in_publications")
128
-
.upsert({
129
-
publication: record.value.publication,
130
-
document: evt.uri.toString(),
131
-
});
132
-
await supabase
133
-
.from("documents_in_publications")
134
-
.delete()
135
-
.neq("publication", record.value.publication)
136
-
.eq("document", evt.uri.toString());
137
-
138
-
if (docInPublicationResult.error)
139
-
console.log(docInPublicationResult.error);
140
}
0
0
0
0
0
0
0
0
0
0
141
}
142
if (evt.event === "delete") {
143
await supabase.from("documents").delete().eq("uri", evt.uri.toString());
···
271
console.log(record.error);
272
return;
273
}
274
-
let docResult = await supabase.from("documents").upsert({
275
-
uri: evt.uri.toString(),
276
-
data: record.value as Json,
277
-
});
278
-
if (docResult.error) console.log(docResult.error);
279
-
await inngest.send({
280
-
name: "appview/sync-document-metadata",
281
-
data: {
282
-
document_uri: evt.uri.toString(),
283
-
bsky_post_uri: record.value.bskyPostRef?.uri,
284
-
},
285
-
});
286
-
287
// site.standard.document uses "site" field to reference the publication
288
// For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
289
// For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
290
// Only link to publications table for AT-URI sites
0
291
if (record.value.site && record.value.site.startsWith("at://")) {
292
let siteURI = new AtUri(record.value.site);
293
-
294
if (siteURI.host !== evt.uri.host) {
295
console.log("Unauthorized to create document in site!");
296
return;
297
}
298
-
let docInPublicationResult = await supabase
299
-
.from("documents_in_publications")
300
-
.upsert({
301
-
publication: record.value.site,
302
-
document: evt.uri.toString(),
303
-
});
304
-
await supabase
305
-
.from("documents_in_publications")
306
-
.delete()
307
-
.neq("publication", record.value.site)
308
-
.eq("document", evt.uri.toString());
309
-
310
-
if (docInPublicationResult.error)
311
-
console.log(docInPublicationResult.error);
312
}
0
0
0
0
0
0
0
0
0
0
313
}
314
if (evt.event === "delete") {
315
await supabase.from("documents").delete().eq("uri", evt.uri.toString());
···
104
console.log(record.error);
105
return;
106
}
107
+
let publication: string | null = null;
0
0
0
0
0
0
0
0
0
0
0
108
if (record.value.publication) {
109
let publicationURI = new AtUri(record.value.publication);
0
110
if (publicationURI.host !== evt.uri.host) {
111
console.log("Unauthorized to create post!");
112
return;
113
}
114
+
publication = record.value.publication;
0
0
0
0
0
0
0
0
0
0
0
0
0
115
}
116
+
await inngest.send({
117
+
name: "appview/index-document",
118
+
data: {
119
+
document_uri: evt.uri.toString(),
120
+
document_data: record.value as Json,
121
+
bsky_post_uri: record.value.postRef?.uri,
122
+
publication,
123
+
did: evt.did,
124
+
},
125
+
});
126
}
127
if (evt.event === "delete") {
128
await supabase.from("documents").delete().eq("uri", evt.uri.toString());
···
256
console.log(record.error);
257
return;
258
}
0
0
0
0
0
0
0
0
0
0
0
0
0
259
// site.standard.document uses "site" field to reference the publication
260
// For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
261
// For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
262
// Only link to publications table for AT-URI sites
263
+
let publication: string | null = null;
264
if (record.value.site && record.value.site.startsWith("at://")) {
265
let siteURI = new AtUri(record.value.site);
0
266
if (siteURI.host !== evt.uri.host) {
267
console.log("Unauthorized to create document in site!");
268
return;
269
}
270
+
publication = record.value.site;
0
0
0
0
0
0
0
0
0
0
0
0
0
271
}
272
+
await inngest.send({
273
+
name: "appview/index-document",
274
+
data: {
275
+
document_uri: evt.uri.toString(),
276
+
document_data: record.value as Json,
277
+
bsky_post_uri: record.value.bskyPostRef?.uri,
278
+
publication,
279
+
did: evt.did,
280
+
},
281
+
});
282
}
283
if (evt.event === "delete") {
284
await supabase.from("documents").delete().eq("uri", evt.uri.toString());