// A tool for shared writing and social publishing.
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
// Identity resolver instance handed to the Firehose constructor in main().
const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14} from "lexicons/api";
15import {
16 AppBskyEmbedExternal,
17 AppBskyEmbedRecordWithMedia,
18 AppBskyFeedPost,
19 AppBskyRichtextFacet,
20} from "@atproto/api";
21import { AtUri } from "@atproto/syntax";
22import { writeFile, readFile } from "fs/promises";
23import { inngest } from "app/api/inngest/client";
24
// Cursor file path: CURSOR_FILE when set to a non-empty value, otherwise
// "/cursor/cursor". NOTE(review): nothing in the visible part of this file
// reads this constant — confirm whether cursor persistence is still wanted.
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

// Supabase connection settings. Both values are asserted to be strings rather
// than checked, so a missing variable is only noticed by createClient itself.
const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_API_URL as string;
const supabaseServiceRoleKey = process.env.SUPABASE_SERVICE_ROLE_KEY as string;

// Typed Supabase client used for every table write in handleEvent.
let supabase = createClient<Database>(supabaseUrl, supabaseServiceRoleKey);

// URL fragment that marks an external link embed as a quote link; Bluesky
// posts whose embed URI contains it are forwarded for indexing.
const QUOTE_PARAM = "/l-quote/";
/**
 * Entry point of the firehose consumer.
 *
 * Builds a Firehose subscription against the relay, restricted to the record
 * collections this service indexes, starts it, and installs SIGINT/SIGTERM
 * handlers that tear the subscription and its runner down before exiting.
 */
async function main() {
  const runner = new MemoryRunner({});

  // Record collections the subscription is filtered to.
  const watchedCollections = [
    ids.PubLeafletDocument,
    ids.PubLeafletPublication,
    ids.PubLeafletGraphSubscription,
    ids.PubLeafletComment,
    ids.PubLeafletPollVote,
    ids.PubLeafletPollDefinition,
    // ids.AppBskyActorProfile,
    "app.bsky.feed.post",
  ];

  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: watchedCollections,
    handleEvent,
    onError: (err) => console.error(err),
  });

  console.log("starting firehose consumer");
  // start() is not awaited; execution continues straight on to register the
  // signal handlers below.
  firehose.start();

  // Guards against running the teardown twice when several signals arrive.
  let shuttingDown = false;

  async function shutdown() {
    if (shuttingDown) return;
    shuttingDown = true;
    console.log("shutting down firehose...");
    await firehose.destroy();
    await runner.destroy();
    process.exit();
  }

  for (const signal of ["SIGINT", "SIGTERM"] as const) {
    process.on(signal, shutdown);
  }
}

main();
73
/**
 * Logs a failed Supabase operation together with a short description of what
 * was being attempted. Does nothing when `error` is null or undefined.
 *
 * @param action - What was being attempted, e.g. "upserting document".
 * @param error - The `error` field of a Supabase response.
 */
function logWriteError(action: string, error: unknown): void {
  if (error) console.log(`Error ${action}:`, error);
}

/**
 * Reads `embed.external.uri` from an embed that has NOT been validated yet.
 *
 * @param embed - Arbitrary value taken from an unvalidated record.
 * @returns The URI when every step of the path exists and the URI is a
 *   string; `undefined` otherwise. Never throws on malformed input.
 */
function externalEmbedUri(embed: unknown): string | undefined {
  if (typeof embed !== "object" || embed === null) return undefined;
  const external = (embed as { external?: unknown }).external;
  if (typeof external !== "object" || external === null) return undefined;
  const uri = (external as { uri?: unknown }).uri;
  return typeof uri === "string" ? uri : undefined;
}

/**
 * Handles a single firehose event by mirroring it into Supabase.
 *
 * - identity events: update the stored handle for the DID (when one is given)
 * - account / sync events: ignored
 * - Leaflet documents, publications, comments, poll votes, poll definitions
 *   and subscriptions: validate the record, then upsert on create/update and
 *   delete the row on delete
 * - `app.bsky.feed.post` creates: when the post embeds an external link
 *   containing QUOTE_PARAM, send an "appview/index-bsky-post-mention" event
 *
 * Supabase failures are logged, not thrown. Exceptions raised elsewhere
 * (e.g. by `new AtUri(...)` or `inngest.send`) are not caught here and
 * propagate to the caller.
 *
 * @param evt - The firehose event to process.
 */
async function handleEvent(evt: Event): Promise<void> {
  // main() in this file sets excludeIdentity/excludeAccount on the firehose,
  // so these branches only matter if that configuration changes.
  if (evt.event === "identity") {
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      logWriteError("updating profile handle", error);
    }
    return;
  }
  if (evt.event === "account" || evt.event === "sync") return;

  // Posts are far too frequent to log individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  const uri = evt.uri.toString();

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      // The document row is written before the publication ownership check
      // below; a failed check only prevents the publication link.
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      logWriteError("upserting document", docResult.error);

      if (record.value.publication) {
        const publicationURI = new AtUri(record.value.publication);

        // A document may only be attached to a publication that lives in the
        // same repo (same host in the AT URI) as the document itself.
        if (publicationURI.host !== evt.uri.host) {
          console.log("Unauthorized to create post!");
          return;
        }
        const linkResult = await supabase
          .from("documents_in_publications")
          .upsert({
            publication: record.value.publication,
            document: uri,
          });
        logWriteError("linking document to publication", linkResult.error);

        // Drop links from this document to any other publication.
        const unlinkResult = await supabase
          .from("documents_in_publications")
          .delete()
          .neq("publication", record.value.publication)
          .eq("document", uri);
        logWriteError(
          "removing stale document publication links",
          unlinkResult.error,
        );
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      logWriteError("deleting document", error);
    }
    return;
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      // Upsert the identity row for this DID before writing the publication.
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logWriteError("upserting identity", identityResult.error);
      const publicationResult = await supabase.from("publications").upsert({
        uri,
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logWriteError("upserting publication", publicationResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      logWriteError("deleting publication", error);
    }
    return;
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri,
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logWriteError("upserting comment", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", uri);
      logWriteError("deleting comment", error);
    }
    return;
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri,
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      logWriteError("upserting poll vote", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", uri);
      logWriteError("deleting poll vote", error);
    }
    return;
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri,
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      logWriteError("upserting poll definition", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", uri);
      logWriteError("deleting poll definition", error);
    }
    return;
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      // Upsert the identity row for this DID before writing the subscription.
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logWriteError("upserting identity", identityResult.error);
      const subscriptionResult = await supabase
        .from("publication_subscriptions")
        .upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        });
      logWriteError(
        "upserting publication subscription",
        subscriptionResult.error,
      );
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      logWriteError("deleting publication subscription", error);
    }
    return;
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Early exit if no embed
    if (
      !evt.record ||
      typeof evt.record !== "object" ||
      !("embed" in evt.record)
    )
      return;

    // Cheap pre-filter on the still-unvalidated record: look for the quote
    // marker in a plain external embed or in the media part of a
    // record-with-media embed. Only string URIs are considered, so malformed
    // records are skipped instead of throwing.
    const rawEmbed: unknown = (evt.record as { embed?: unknown }).embed;
    const rawMedia: unknown =
      typeof rawEmbed === "object" && rawEmbed !== null
        ? (rawEmbed as { media?: unknown }).media
        : undefined;
    const hasQuoteParam = [
      externalEmbedUri(rawEmbed),
      externalEmbedUri(rawMedia),
    ].some((candidate) => candidate !== undefined && candidate.includes(QUOTE_PARAM));

    if (!hasQuoteParam) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) return;

    let documentLink: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      documentLink = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      documentLink = record.value.embed.media.external.uri;
    }
    if (documentLink) {
      console.log(
        "processing post mention: " + documentLink + " in " + uri,
      );
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: uri, document_link: documentLink },
      });
    }
  }
}