a tool for shared writing and social publishing
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14 PubLeafletInteractionsRecommend,
15 SiteStandardDocument,
16 SiteStandardPublication,
17 SiteStandardGraphSubscription,
18} from "lexicons/api";
19import {
20 AppBskyEmbedExternal,
21 AppBskyEmbedRecordWithMedia,
22 AppBskyFeedPost,
23 AppBskyRichtextFacet,
24} from "@atproto/api";
25import { AtUri } from "@atproto/syntax";
26import { writeFile, readFile } from "fs/promises";
27import { inngest } from "app/api/inngest/client";
28
29const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";
30
31let supabase = createClient<Database>(
32 process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
33 process.env.SUPABASE_SERVICE_ROLE_KEY as string,
34);
35const QUOTE_PARAM = "/l-quote/";
36async function main() {
37 const runner = new MemoryRunner({});
38 let firehose = new Firehose({
39 service: "wss://relay1.us-west.bsky.network",
40 subscriptionReconnectDelay: 3000,
41 excludeAccount: true,
42 excludeIdentity: true,
43 runner,
44 idResolver,
45 filterCollections: [
46 ids.PubLeafletDocument,
47 ids.PubLeafletPublication,
48 ids.PubLeafletGraphSubscription,
49 ids.PubLeafletComment,
50 ids.PubLeafletPollVote,
51 ids.PubLeafletPollDefinition,
52 ids.PubLeafletInteractionsRecommend,
53 // ids.AppBskyActorProfile,
54 "app.bsky.feed.post",
55 ids.SiteStandardDocument,
56 ids.SiteStandardPublication,
57 ids.SiteStandardGraphSubscription,
58 ],
59 handleEvent,
60 onError: (err) => {
61 console.error(err);
62 },
63 });
64 console.log("starting firehose consumer");
65 firehose.start();
66 let cleaningUp = false;
67 const cleanup = async () => {
68 if (cleaningUp) return;
69 cleaningUp = true;
70 console.log("shutting down firehose...");
71 await firehose.destroy();
72 await runner.destroy();
73 process.exit();
74 };
75
76 process.on("SIGINT", cleanup);
77 process.on("SIGTERM", cleanup);
78}
79
80main();
81
82async function handleEvent(evt: Event) {
83 if (evt.event === "identity") {
84 if (evt.handle)
85 await supabase
86 .from("bsky_profiles")
87 .update({ handle: evt.handle })
88 .eq("did", evt.did);
89 }
90 if (
91 evt.event == "account" ||
92 evt.event === "identity" ||
93 evt.event === "sync"
94 )
95 return;
96 if (evt.collection !== "app.bsky.feed.post")
97 console.log(
98 `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
99 );
100 if (evt.collection === ids.PubLeafletDocument) {
101 if (evt.event === "create" || evt.event === "update") {
102 let record = PubLeafletDocument.validateRecord(evt.record);
103 if (!record.success) {
104 console.log(record.error);
105 return;
106 }
107 let docResult = await supabase.from("documents").upsert({
108 uri: evt.uri.toString(),
109 data: record.value as Json,
110 });
111 if (docResult.error) console.log(docResult.error);
112 await inngest.send({
113 name: "appview/sync-document-metadata",
114 data: {
115 document_uri: evt.uri.toString(),
116 bsky_post_uri: record.value.postRef?.uri,
117 },
118 });
119 if (record.value.publication) {
120 let publicationURI = new AtUri(record.value.publication);
121
122 if (publicationURI.host !== evt.uri.host) {
123 console.log("Unauthorized to create post!");
124 return;
125 }
126 let docInPublicationResult = await supabase
127 .from("documents_in_publications")
128 .upsert({
129 publication: record.value.publication,
130 document: evt.uri.toString(),
131 });
132 await supabase
133 .from("documents_in_publications")
134 .delete()
135 .neq("publication", record.value.publication)
136 .eq("document", evt.uri.toString());
137
138 if (docInPublicationResult.error)
139 console.log(docInPublicationResult.error);
140 }
141 }
142 if (evt.event === "delete") {
143 await supabase.from("documents").delete().eq("uri", evt.uri.toString());
144 }
145 }
146 if (evt.collection === ids.PubLeafletPublication) {
147 if (evt.event === "create" || evt.event === "update") {
148 let record = PubLeafletPublication.validateRecord(evt.record);
149 if (!record.success) return;
150 await supabase
151 .from("identities")
152 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
153 await supabase.from("publications").upsert({
154 uri: evt.uri.toString(),
155 identity_did: evt.did,
156 name: record.value.name,
157 record: record.value as Json,
158 });
159 }
160 if (evt.event === "delete") {
161 await supabase
162 .from("publications")
163 .delete()
164 .eq("uri", evt.uri.toString());
165 }
166 }
167 if (evt.collection === ids.PubLeafletComment) {
168 if (evt.event === "create" || evt.event === "update") {
169 let record = PubLeafletComment.validateRecord(evt.record);
170 if (!record.success) return;
171 let { error } = await supabase.from("comments_on_documents").upsert({
172 uri: evt.uri.toString(),
173 profile: evt.did,
174 document: record.value.subject,
175 record: record.value as Json,
176 });
177 }
178 if (evt.event === "delete") {
179 await supabase
180 .from("comments_on_documents")
181 .delete()
182 .eq("uri", evt.uri.toString());
183 }
184 }
185 if (evt.collection === ids.PubLeafletPollVote) {
186 if (evt.event === "create" || evt.event === "update") {
187 let record = PubLeafletPollVote.validateRecord(evt.record);
188 if (!record.success) return;
189 let { error } = await supabase.from("atp_poll_votes").upsert({
190 uri: evt.uri.toString(),
191 voter_did: evt.did,
192 poll_uri: record.value.poll.uri,
193 poll_cid: record.value.poll.cid,
194 record: record.value as Json,
195 });
196 }
197 if (evt.event === "delete") {
198 await supabase
199 .from("atp_poll_votes")
200 .delete()
201 .eq("uri", evt.uri.toString());
202 }
203 }
204 if (evt.collection === ids.PubLeafletPollDefinition) {
205 if (evt.event === "create" || evt.event === "update") {
206 let record = PubLeafletPollDefinition.validateRecord(evt.record);
207 if (!record.success) return;
208 let { error } = await supabase.from("atp_poll_records").upsert({
209 uri: evt.uri.toString(),
210 cid: evt.cid.toString(),
211 record: record.value as Json,
212 });
213 if (error) console.log("Error upserting poll definition:", error);
214 }
215 if (evt.event === "delete") {
216 await supabase
217 .from("atp_poll_records")
218 .delete()
219 .eq("uri", evt.uri.toString());
220 }
221 }
222 if (evt.collection === ids.PubLeafletInteractionsRecommend) {
223 if (evt.event === "create" || evt.event === "update") {
224 let record = PubLeafletInteractionsRecommend.validateRecord(evt.record);
225 if (!record.success) return;
226 await supabase
227 .from("identities")
228 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
229 let { error } = await supabase.from("recommends_on_documents").upsert({
230 uri: evt.uri.toString(),
231 recommender_did: evt.did,
232 document: record.value.subject,
233 record: record.value as Json,
234 });
235 if (error) console.log("Error upserting recommend:", error);
236 }
237 if (evt.event === "delete") {
238 await supabase
239 .from("recommends_on_documents")
240 .delete()
241 .eq("uri", evt.uri.toString());
242 }
243 }
244 if (evt.collection === ids.PubLeafletGraphSubscription) {
245 if (evt.event === "create" || evt.event === "update") {
246 let record = PubLeafletGraphSubscription.validateRecord(evt.record);
247 if (!record.success) return;
248 await supabase
249 .from("identities")
250 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
251 await supabase.from("publication_subscriptions").upsert({
252 uri: evt.uri.toString(),
253 identity: evt.did,
254 publication: record.value.publication,
255 record: record.value as Json,
256 });
257 }
258 if (evt.event === "delete") {
259 await supabase
260 .from("publication_subscriptions")
261 .delete()
262 .eq("uri", evt.uri.toString());
263 }
264 }
265 // site.standard.document records go into the main "documents" table
266 // The normalization layer handles reading both pub.leaflet and site.standard formats
267 if (evt.collection === ids.SiteStandardDocument) {
268 if (evt.event === "create" || evt.event === "update") {
269 let record = SiteStandardDocument.validateRecord(evt.record);
270 if (!record.success) {
271 console.log(record.error);
272 return;
273 }
274 let docResult = await supabase.from("documents").upsert({
275 uri: evt.uri.toString(),
276 data: record.value as Json,
277 });
278 if (docResult.error) console.log(docResult.error);
279 await inngest.send({
280 name: "appview/sync-document-metadata",
281 data: {
282 document_uri: evt.uri.toString(),
283 bsky_post_uri: record.value.bskyPostRef?.uri,
284 },
285 });
286
287 // site.standard.document uses "site" field to reference the publication
288 // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
289 // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
290 // Only link to publications table for AT-URI sites
291 if (record.value.site && record.value.site.startsWith("at://")) {
292 let siteURI = new AtUri(record.value.site);
293
294 if (siteURI.host !== evt.uri.host) {
295 console.log("Unauthorized to create document in site!");
296 return;
297 }
298 let docInPublicationResult = await supabase
299 .from("documents_in_publications")
300 .upsert({
301 publication: record.value.site,
302 document: evt.uri.toString(),
303 });
304 await supabase
305 .from("documents_in_publications")
306 .delete()
307 .neq("publication", record.value.site)
308 .eq("document", evt.uri.toString());
309
310 if (docInPublicationResult.error)
311 console.log(docInPublicationResult.error);
312 }
313 }
314 if (evt.event === "delete") {
315 await supabase.from("documents").delete().eq("uri", evt.uri.toString());
316 }
317 }
318
319 // site.standard.publication records go into the main "publications" table
320 if (evt.collection === ids.SiteStandardPublication) {
321 if (evt.event === "create" || evt.event === "update") {
322 let record = SiteStandardPublication.validateRecord(evt.record);
323 if (!record.success) return;
324 await supabase
325 .from("identities")
326 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
327 await supabase.from("publications").upsert({
328 uri: evt.uri.toString(),
329 identity_did: evt.did,
330 name: record.value.name,
331 record: record.value as Json,
332 });
333 }
334 if (evt.event === "delete") {
335 await supabase
336 .from("publications")
337 .delete()
338 .eq("uri", evt.uri.toString());
339 }
340 }
341
342 // site.standard.graph.subscription records go into the main "publication_subscriptions" table
343 if (evt.collection === ids.SiteStandardGraphSubscription) {
344 if (evt.event === "create" || evt.event === "update") {
345 let record = SiteStandardGraphSubscription.validateRecord(evt.record);
346 if (!record.success) return;
347 await supabase
348 .from("identities")
349 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
350 await supabase.from("publication_subscriptions").upsert({
351 uri: evt.uri.toString(),
352 identity: evt.did,
353 publication: record.value.publication,
354 record: record.value as Json,
355 });
356 }
357 if (evt.event === "delete") {
358 await supabase
359 .from("publication_subscriptions")
360 .delete()
361 .eq("uri", evt.uri.toString());
362 }
363 }
364 // if (evt.collection === ids.AppBskyActorProfile) {
365 // //only listen to updates because we should fetch it for the first time when they subscribe!
366 // if (evt.event === "update") {
367 // await supabaseServerClient
368 // .from("bsky_profiles")
369 // .update({ record: evt.record as Json })
370 // .eq("did", evt.did);
371 // }
372 // }
373 if (evt.collection === "app.bsky.feed.post") {
374 if (evt.event !== "create") return;
375
376 // Early exit if no embed
377 if (
378 !evt.record ||
379 typeof evt.record !== "object" ||
380 !("embed" in evt.record)
381 )
382 return;
383
384 // Check if embed contains our quote param using optional chaining
385 const embedRecord = evt.record as any;
386 const hasQuoteParam =
387 embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) ||
388 embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM);
389
390 if (!hasQuoteParam) return;
391
392 // Now validate the record since we know it contains our quote param
393 let record = AppBskyFeedPost.validateRecord(evt.record);
394 if (!record.success) {
395 console.log(record.error);
396 return;
397 }
398
399 let embed: string | null = null;
400 if (
401 AppBskyEmbedExternal.isMain(record.value.embed) &&
402 record.value.embed.external.uri.includes(QUOTE_PARAM)
403 ) {
404 embed = record.value.embed.external.uri;
405 }
406 if (
407 AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
408 AppBskyEmbedExternal.isMain(record.value.embed.media) &&
409 record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
410 ) {
411 embed = record.value.embed.media.external.uri;
412 }
413 if (embed) {
414 console.log(
415 "processing post mention: " + embed + " in " + evt.uri.toString(),
416 );
417 await inngest.send({
418 name: "appview/index-bsky-post-mention",
419 data: { post_uri: evt.uri.toString(), document_link: embed },
420 });
421 }
422 }
423}