a tool for shared writing and social publishing
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14 PubLeafletInteractionsRecommend,
15 SiteStandardDocument,
16 SiteStandardPublication,
17 SiteStandardGraphSubscription,
18} from "lexicons/api";
19import {
20 AppBskyEmbedExternal,
21 AppBskyEmbedRecordWithMedia,
22 AppBskyFeedPost,
23 AppBskyRichtextFacet,
24} from "@atproto/api";
25import { AtUri } from "@atproto/syntax";
26import { writeFile, readFile } from "fs/promises";
27import { inngest } from "app/api/inngest/client";
28
/**
 * Path of the firehose cursor file; overridable through the CURSOR_FILE
 * environment variable.
 * NOTE(review): nothing in this file as shown reads or writes this path —
 * confirm whether cursor persistence is still intended.
 */
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

/**
 * Supabase client created with the service-role key. The binding is never
 * reassigned, so it is declared `const`.
 * The `as string` assertions only satisfy the type checker: a missing
 * environment variable is still passed to `createClient` as `undefined`.
 */
const supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

/** URL path fragment used to recognise quote links inside Bluesky post embeds. */
const QUOTE_PARAM = "/l-quote/";
/**
 * Creates the firehose subscription, starts consuming events with
 * `handleEvent`, and installs SIGINT/SIGTERM handlers that tear the
 * subscription down before exiting the process.
 */
async function main(): Promise<void> {
  const runner = new MemoryRunner({});
  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    // NOTE(review): with identity events excluded here, the "identity" branch
    // at the top of handleEvent looks unreachable — confirm which is intended.
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      ids.PubLeafletInteractionsRecommend,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
      ids.SiteStandardDocument,
      ids.SiteStandardPublication,
      ids.SiteStandardGraphSubscription,
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  // Deliberately not awaited: main() only needs to kick the consumer off and
  // then register the shutdown handlers below.
  firehose.start();

  // Guards against running the teardown twice when several signals arrive.
  let cleaningUp = false;
  const cleanup = async (): Promise<void> => {
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    try {
      await firehose.destroy();
      await runner.destroy();
    } catch (err) {
      // A failed teardown must not leave the process hanging or end in an
      // unhandled rejection: report it and flag a non-zero exit status.
      console.error(err);
      process.exitCode = 1;
    } finally {
      // With no argument, process.exit() uses process.exitCode when it is set.
      process.exit();
    }
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

// Surface start-up failures explicitly instead of leaving a floating promise.
main().catch((err: unknown) => {
  console.error(err);
  process.exit(1);
});
81
/**
 * Logs a failed Supabase call. The Supabase calls in this file hand failures
 * back in the `error` field of their result, so each call site has to inspect
 * it or the failure goes unnoticed.
 *
 * @param context - Short description of the operation that failed.
 * @param error - The `error` field of the Supabase response (null on success).
 */
function logSupabaseError(context: string, error: unknown): void {
  if (error) console.log(context, error);
}

/**
 * Upserts a row for `did` into the "identities" table, keyed on `atp_did`.
 * Failures are logged and do not stop the caller.
 */
async function ensureIdentity(did: string): Promise<void> {
  const { error } = await supabase
    .from("identities")
    .upsert({ atp_did: did }, { onConflict: "atp_did" });
  logSupabaseError("Error upserting identity:", error);
}

/**
 * Walks `keys` through nested plain objects and returns the value found, or
 * `undefined` as soon as a step is not an object. Used to probe unvalidated
 * records without assuming their shape.
 */
function readNestedField(value: unknown, ...keys: string[]): unknown {
  let current: unknown = value;
  for (const key of keys) {
    if (typeof current !== "object" || current === null) return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

/** True when `uri` is a string that contains the quote-link marker. */
function isQuoteLink(uri: unknown): boolean {
  return typeof uri === "string" && uri.includes(QUOTE_PARAM);
}

/**
 * Handles a single firehose event.
 *
 * - "identity" events update the stored handle of the matching profile;
 *   "account" and "sync" events are ignored.
 * - create/update commits are validated against the lexicon of their
 *   collection and then either upserted into the matching Supabase table or
 *   forwarded to Inngest for indexing (documents and Bluesky post mentions).
 * - delete commits remove the row with the same URI.
 *
 * Supabase failures are logged rather than thrown. Errors thrown by
 * `inngest.send` or by `AtUri` parsing propagate to the caller.
 *
 * @param evt - The event delivered by the firehose subscription.
 */
async function handleEvent(evt: Event): Promise<void> {
  if (evt.event === "identity") {
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      logSupabaseError("Error updating profile handle:", error);
    }
    return;
  }
  if (evt.event === "account" || evt.event === "sync") return;

  // Posts are far too frequent to log individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  const uri = evt.uri.toString();

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      let publication: string | null = null;
      if (record.value.publication) {
        // A document may only be attached to a publication that lives in the
        // same repository as the document itself.
        const publicationURI = new AtUri(record.value.publication);
        if (publicationURI.host !== evt.uri.host) {
          console.log("Unauthorized to create post!");
          return;
        }
        publication = record.value.publication;
      }
      await inngest.send({
        name: "appview/index-document",
        data: {
          document_uri: uri,
          document_data: record.value as Json,
          bsky_post_uri: record.value.postRef?.uri,
          publication,
          did: evt.did,
        },
      });
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting document:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("publications").upsert({
        uri,
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logSupabaseError("Error upserting publication:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting publication:", error);
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri,
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logSupabaseError("Error upserting comment:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting comment:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri,
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      logSupabaseError("Error upserting poll vote:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting poll vote:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri,
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      logSupabaseError("Error upserting poll definition:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting poll definition:", error);
    }
  }

  if (evt.collection === ids.PubLeafletInteractionsRecommend) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletInteractionsRecommend.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("recommends_on_documents").upsert({
        uri,
        recommender_did: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logSupabaseError("Error upserting recommend:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("recommends_on_documents")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting recommend:", error);
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase
        .from("publication_subscriptions")
        .upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        });
      logSupabaseError("Error upserting publication subscription:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting publication subscription:", error);
    }
  }

  // site.standard.document records go into the main "documents" table.
  // The normalization layer handles reading both pub.leaflet and site.standard formats.
  if (evt.collection === ids.SiteStandardDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      // site.standard.document uses the "site" field to reference the publication.
      // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey).
      // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx).
      // Only AT-URI sites are linked to the publications table.
      let publication: string | null = null;
      if (record.value.site && record.value.site.startsWith("at://")) {
        const siteURI = new AtUri(record.value.site);
        if (siteURI.host !== evt.uri.host) {
          console.log("Unauthorized to create document in site!");
          return;
        }
        publication = record.value.site;
      }
      await inngest.send({
        name: "appview/index-document",
        data: {
          document_uri: uri,
          document_data: record.value as Json,
          bsky_post_uri: record.value.bskyPostRef?.uri,
          publication,
          did: evt.did,
        },
      });
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting document:", error);
    }
  }

  // site.standard.publication records go into the main "publications" table.
  if (evt.collection === ids.SiteStandardPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardPublication.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("publications").upsert({
        uri,
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logSupabaseError("Error upserting publication:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting publication:", error);
    }
  }

  // site.standard.graph.subscription records go into the main "publication_subscriptions" table.
  if (evt.collection === ids.SiteStandardGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase
        .from("publication_subscriptions")
        .upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        });
      logSupabaseError("Error upserting publication subscription:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      logSupabaseError("Error deleting publication subscription:", error);
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Cheap pre-filter on the raw, unvalidated record: only posts whose
    // external embed links to a quote URL are worth validating. The probe
    // tolerates any record shape (missing embed, non-string uri, ...).
    const hasQuoteParam =
      isQuoteLink(readNestedField(evt.record, "embed", "external", "uri")) ||
      isQuoteLink(
        readNestedField(evt.record, "embed", "media", "external", "uri"),
      );
    if (!hasQuoteParam) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param.
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }

    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log("processing post mention: " + embed + " in " + uri);
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: uri, document_link: embed },
      });
    }
  }
}