a tool for shared writing and social publishing
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14} from "lexicons/api";
15import {
16 AppBskyEmbedExternal,
17 AppBskyEmbedRecordWithMedia,
18 AppBskyFeedPost,
19 AppBskyRichtextFacet,
20} from "@atproto/api";
21import { AtUri } from "@atproto/syntax";
22import { writeFile, readFile } from "fs/promises";
23import { createIdentity } from "actions/createIdentity";
24import { drizzle } from "drizzle-orm/node-postgres";
25import { inngest } from "app/api/inngest/client";
26import { Client } from "pg";
27
28const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";
29
30let supabase = createClient<Database>(
31 process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
32 process.env.SUPABASE_SERVICE_ROLE_KEY as string,
33);
34const QUOTE_PARAM = "/l-quote/";
35async function main() {
36 const runner = new MemoryRunner({});
37 let firehose = new Firehose({
38 service: "wss://relay1.us-west.bsky.network",
39 subscriptionReconnectDelay: 3000,
40 excludeAccount: true,
41 excludeIdentity: true,
42 runner,
43 idResolver,
44 filterCollections: [
45 ids.PubLeafletDocument,
46 ids.PubLeafletPublication,
47 ids.PubLeafletGraphSubscription,
48 ids.PubLeafletComment,
49 ids.PubLeafletPollVote,
50 ids.PubLeafletPollDefinition,
51 // ids.AppBskyActorProfile,
52 "app.bsky.feed.post",
53 ],
54 handleEvent,
55 onError: (err) => {
56 console.error(err);
57 },
58 });
59 console.log("starting firehose consumer");
60 firehose.start();
61 let cleaningUp = false;
62 const cleanup = async () => {
63 if (cleaningUp) return;
64 cleaningUp = true;
65 console.log("shutting down firehose...");
66 await firehose.destroy();
67 await runner.destroy();
68 process.exit();
69 };
70
71 process.on("SIGINT", cleanup);
72 process.on("SIGTERM", cleanup);
73}
74
75main();
76
77async function handleEvent(evt: Event) {
78 if (evt.event === "identity") {
79 if (evt.handle)
80 await supabase
81 .from("bsky_profiles")
82 .update({ handle: evt.handle })
83 .eq("did", evt.did);
84 }
85 if (
86 evt.event == "account" ||
87 evt.event === "identity" ||
88 evt.event === "sync"
89 )
90 return;
91 if (evt.collection !== "app.bsky.feed.post")
92 console.log(
93 `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
94 );
95 if (evt.collection === ids.PubLeafletDocument) {
96 if (evt.event === "create" || evt.event === "update") {
97 let record = PubLeafletDocument.validateRecord(evt.record);
98 if (!record.success) {
99 console.log(record.error);
100 return;
101 }
102 let docResult = await supabase.from("documents").upsert({
103 uri: evt.uri.toString(),
104 data: record.value as Json,
105 });
106 if (docResult.error) console.log(docResult.error);
107 if (record.value.publication) {
108 let publicationURI = new AtUri(record.value.publication);
109
110 if (publicationURI.host !== evt.uri.host) {
111 console.log("Unauthorized to create post!");
112 return;
113 }
114 let docInPublicationResult = await supabase
115 .from("documents_in_publications")
116 .upsert({
117 publication: record.value.publication,
118 document: evt.uri.toString(),
119 });
120 await supabase
121 .from("documents_in_publications")
122 .delete()
123 .neq("publication", record.value.publication)
124 .eq("document", evt.uri.toString());
125
126 if (docInPublicationResult.error)
127 console.log(docInPublicationResult.error);
128 }
129 }
130 if (evt.event === "delete") {
131 await supabase.from("documents").delete().eq("uri", evt.uri.toString());
132 }
133 }
134 if (evt.collection === ids.PubLeafletPublication) {
135 if (evt.event === "create" || evt.event === "update") {
136 let record = PubLeafletPublication.validateRecord(evt.record);
137 if (!record.success) return;
138 let { error } = await supabase.from("publications").upsert({
139 uri: evt.uri.toString(),
140 identity_did: evt.did,
141 name: record.value.name,
142 record: record.value as Json,
143 });
144
145 if (error && error.code === "23503") {
146 console.log("creating identity");
147 let client = new Client({ connectionString: process.env.DB_URL });
148 let db = drizzle(client);
149 await createIdentity(db, { atp_did: evt.did });
150 client.end();
151 await supabase.from("publications").upsert({
152 uri: evt.uri.toString(),
153 identity_did: evt.did,
154 name: record.value.name,
155 record: record.value as Json,
156 });
157 }
158 }
159 if (evt.event === "delete") {
160 await supabase
161 .from("publications")
162 .delete()
163 .eq("uri", evt.uri.toString());
164 }
165 }
166 if (evt.collection === ids.PubLeafletComment) {
167 if (evt.event === "create" || evt.event === "update") {
168 let record = PubLeafletComment.validateRecord(evt.record);
169 if (!record.success) return;
170 let { error } = await supabase.from("comments_on_documents").upsert({
171 uri: evt.uri.toString(),
172 profile: evt.did,
173 document: record.value.subject,
174 record: record.value as Json,
175 });
176 }
177 if (evt.event === "delete") {
178 await supabase
179 .from("comments_on_documents")
180 .delete()
181 .eq("uri", evt.uri.toString());
182 }
183 }
184 if (evt.collection === ids.PubLeafletPollVote) {
185 if (evt.event === "create" || evt.event === "update") {
186 let record = PubLeafletPollVote.validateRecord(evt.record);
187 if (!record.success) return;
188 let { error } = await supabase.from("atp_poll_votes").upsert({
189 uri: evt.uri.toString(),
190 voter_did: evt.did,
191 poll_uri: record.value.poll.uri,
192 poll_cid: record.value.poll.cid,
193 record: record.value as Json,
194 });
195 }
196 if (evt.event === "delete") {
197 await supabase
198 .from("atp_poll_votes")
199 .delete()
200 .eq("uri", evt.uri.toString());
201 }
202 }
203 if (evt.collection === ids.PubLeafletPollDefinition) {
204 if (evt.event === "create" || evt.event === "update") {
205 let record = PubLeafletPollDefinition.validateRecord(evt.record);
206 if (!record.success) return;
207 let { error } = await supabase.from("atp_poll_records").upsert({
208 uri: evt.uri.toString(),
209 cid: evt.cid.toString(),
210 record: record.value as Json,
211 });
212 if (error) console.log("Error upserting poll definition:", error);
213 }
214 if (evt.event === "delete") {
215 await supabase
216 .from("atp_poll_records")
217 .delete()
218 .eq("uri", evt.uri.toString());
219 }
220 }
221 if (evt.collection === ids.PubLeafletGraphSubscription) {
222 if (evt.event === "create" || evt.event === "update") {
223 let record = PubLeafletGraphSubscription.validateRecord(evt.record);
224 if (!record.success) return;
225 let { error } = await supabase.from("publication_subscriptions").upsert({
226 uri: evt.uri.toString(),
227 identity: evt.did,
228 publication: record.value.publication,
229 record: record.value as Json,
230 });
231 if (error && error.code === "23503") {
232 console.log("creating identity");
233 let client = new Client({ connectionString: process.env.DB_URL });
234 let db = drizzle(client);
235 await createIdentity(db, { atp_did: evt.did });
236 client.end();
237 await supabase.from("publication_subscriptions").upsert({
238 uri: evt.uri.toString(),
239 identity: evt.did,
240 publication: record.value.publication,
241 record: record.value as Json,
242 });
243 }
244 }
245 if (evt.event === "delete") {
246 await supabase
247 .from("publication_subscriptions")
248 .delete()
249 .eq("uri", evt.uri.toString());
250 }
251 }
252 // if (evt.collection === ids.AppBskyActorProfile) {
253 // //only listen to updates because we should fetch it for the first time when they subscribe!
254 // if (evt.event === "update") {
255 // await supabaseServerClient
256 // .from("bsky_profiles")
257 // .update({ record: evt.record as Json })
258 // .eq("did", evt.did);
259 // }
260 // }
261 if (evt.collection === "app.bsky.feed.post") {
262 if (evt.event !== "create") return;
263
264 // Early exit if no embed
265 if (
266 !evt.record ||
267 typeof evt.record !== "object" ||
268 !("embed" in evt.record)
269 )
270 return;
271
272 // Check if embed contains our quote param using optional chaining
273 const embedRecord = evt.record as any;
274 const hasQuoteParam =
275 embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) ||
276 embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM);
277
278 if (!hasQuoteParam) return;
279 console.log("FOUND EMBED!!!");
280
281 // Now validate the record since we know it contains our quote param
282 let record = AppBskyFeedPost.validateRecord(evt.record);
283 if (!record.success) return;
284
285 let embed: string | null = null;
286 if (
287 AppBskyEmbedExternal.isMain(record.value.embed) &&
288 record.value.embed.external.uri.includes(QUOTE_PARAM)
289 ) {
290 embed = record.value.embed.external.uri;
291 }
292 if (
293 AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
294 AppBskyEmbedExternal.isMain(record.value.embed.media) &&
295 record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
296 ) {
297 embed = record.value.embed.media.external.uri;
298 }
299 if (embed) {
300 console.log(
301 "processing post mention: " + embed + " in " + evt.uri.toString(),
302 );
303 await inngest.send({
304 name: "appview/index-bsky-post-mention",
305 data: { post_uri: evt.uri.toString(), document_link: embed },
306 });
307 }
308 }
309}