a tool for shared writing and social publishing
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14} from "lexicons/api";
15import {
16 AppBskyEmbedExternal,
17 AppBskyEmbedRecordWithMedia,
18 AppBskyFeedPost,
19 AppBskyRichtextFacet,
20} from "@atproto/api";
21import { AtUri } from "@atproto/syntax";
22import { writeFile, readFile } from "fs/promises";
23import { createIdentity } from "actions/createIdentity";
24import { drizzle } from "drizzle-orm/node-postgres";
25import { inngest } from "app/api/inngest/client";
26import { Client } from "pg";
27
28const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";
29
30let supabase = createClient<Database>(
31 process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
32 process.env.SUPABASE_SERVICE_ROLE_KEY as string,
33);
34const QUOTE_PARAM = "/l-quote/";
35async function main() {
36 const runner = new MemoryRunner({});
37 let firehose = new Firehose({
38 service: "wss://relay1.us-west.bsky.network",
39 subscriptionReconnectDelay: 3000,
40 excludeAccount: true,
41 excludeIdentity: true,
42 runner,
43 idResolver,
44 filterCollections: [
45 ids.PubLeafletDocument,
46 ids.PubLeafletPublication,
47 ids.PubLeafletGraphSubscription,
48 ids.PubLeafletComment,
49 ids.PubLeafletPollVote,
50 ids.PubLeafletPollDefinition,
51 // ids.AppBskyActorProfile,
52 "app.bsky.feed.post",
53 ],
54 handleEvent,
55 onError: (err) => {
56 console.error(err);
57 },
58 });
59 console.log("starting firehose consumer");
60 firehose.start();
61 let cleaningUp = false;
62 const cleanup = async () => {
63 if (cleaningUp) return;
64 cleaningUp = true;
65 console.log("shutting down firehose...");
66 await firehose.destroy();
67 await runner.destroy();
68 process.exit();
69 };
70
71 process.on("SIGINT", cleanup);
72 process.on("SIGTERM", cleanup);
73}
74
75main();
76
77async function handleEvent(evt: Event) {
78 if (evt.event === "identity") {
79 if (evt.handle)
80 await supabase
81 .from("bsky_profiles")
82 .update({ handle: evt.handle })
83 .eq("did", evt.did);
84 }
85 if (
86 evt.event == "account" ||
87 evt.event === "identity" ||
88 evt.event === "sync"
89 )
90 return;
91 if (evt.collection !== "app.bsky.feed.post")
92 console.log(
93 `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
94 );
95 if (evt.collection === ids.PubLeafletDocument) {
96 if (evt.event === "create" || evt.event === "update") {
97 let record = PubLeafletDocument.validateRecord(evt.record);
98 if (!record.success) {
99 console.log(record.error);
100 return;
101 }
102 let docResult = await supabase.from("documents").upsert({
103 uri: evt.uri.toString(),
104 data: record.value as Json,
105 });
106 if (docResult.error) console.log(docResult.error);
107 let publicationURI = new AtUri(record.value.publication);
108
109 if (publicationURI.host !== evt.uri.host) {
110 console.log("Unauthorized to create post!");
111 return;
112 }
113 let docInPublicationResult = await supabase
114 .from("documents_in_publications")
115 .upsert({
116 publication: record.value.publication,
117 document: evt.uri.toString(),
118 });
119 if (docInPublicationResult.error)
120 console.log(docInPublicationResult.error);
121 }
122 if (evt.event === "delete") {
123 await supabase.from("documents").delete().eq("uri", evt.uri.toString());
124 }
125 }
126 if (evt.collection === ids.PubLeafletPublication) {
127 if (evt.event === "create" || evt.event === "update") {
128 let record = PubLeafletPublication.validateRecord(evt.record);
129 if (!record.success) return;
130 let { error } = await supabase.from("publications").upsert({
131 uri: evt.uri.toString(),
132 identity_did: evt.did,
133 name: record.value.name,
134 record: record.value as Json,
135 });
136
137 if (error && error.code === "23503") {
138 console.log("creating identity");
139 let client = new Client({ connectionString: process.env.DB_URL });
140 let db = drizzle(client);
141 await createIdentity(db, { atp_did: evt.did });
142 client.end();
143 await supabase.from("publications").upsert({
144 uri: evt.uri.toString(),
145 identity_did: evt.did,
146 name: record.value.name,
147 record: record.value as Json,
148 });
149 }
150 }
151 if (evt.event === "delete") {
152 await supabase
153 .from("publications")
154 .delete()
155 .eq("uri", evt.uri.toString());
156 }
157 }
158 if (evt.collection === ids.PubLeafletComment) {
159 if (evt.event === "create" || evt.event === "update") {
160 let record = PubLeafletComment.validateRecord(evt.record);
161 if (!record.success) return;
162 let { error } = await supabase.from("comments_on_documents").upsert({
163 uri: evt.uri.toString(),
164 profile: evt.did,
165 document: record.value.subject,
166 record: record.value as Json,
167 });
168 }
169 if (evt.event === "delete") {
170 await supabase
171 .from("comments_on_documents")
172 .delete()
173 .eq("uri", evt.uri.toString());
174 }
175 }
176 if (evt.collection === ids.PubLeafletPollVote) {
177 if (evt.event === "create" || evt.event === "update") {
178 let record = PubLeafletPollVote.validateRecord(evt.record);
179 if (!record.success) return;
180 let { error } = await supabase.from("atp_poll_votes").upsert({
181 uri: evt.uri.toString(),
182 voter_did: evt.did,
183 poll_uri: record.value.poll.uri,
184 poll_cid: record.value.poll.cid,
185 record: record.value as Json,
186 });
187 }
188 if (evt.event === "delete") {
189 await supabase
190 .from("atp_poll_votes")
191 .delete()
192 .eq("uri", evt.uri.toString());
193 }
194 }
195 if (evt.collection === ids.PubLeafletPollDefinition) {
196 if (evt.event === "create" || evt.event === "update") {
197 let record = PubLeafletPollDefinition.validateRecord(evt.record);
198 if (!record.success) return;
199 let { error } = await supabase.from("atp_poll_records").upsert({
200 uri: evt.uri.toString(),
201 cid: evt.cid.toString(),
202 record: record.value as Json,
203 });
204 if (error) console.log("Error upserting poll definition:", error);
205 }
206 if (evt.event === "delete") {
207 await supabase
208 .from("atp_poll_records")
209 .delete()
210 .eq("uri", evt.uri.toString());
211 }
212 }
213 if (evt.collection === ids.PubLeafletGraphSubscription) {
214 if (evt.event === "create" || evt.event === "update") {
215 let record = PubLeafletGraphSubscription.validateRecord(evt.record);
216 if (!record.success) return;
217 let { error } = await supabase.from("publication_subscriptions").upsert({
218 uri: evt.uri.toString(),
219 identity: evt.did,
220 publication: record.value.publication,
221 record: record.value as Json,
222 });
223 if (error && error.code === "23503") {
224 console.log("creating identity");
225 let client = new Client({ connectionString: process.env.DB_URL });
226 let db = drizzle(client);
227 await createIdentity(db, { atp_did: evt.did });
228 client.end();
229 await supabase.from("publication_subscriptions").upsert({
230 uri: evt.uri.toString(),
231 identity: evt.did,
232 publication: record.value.publication,
233 record: record.value as Json,
234 });
235 }
236 }
237 if (evt.event === "delete") {
238 await supabase
239 .from("publication_subscriptions")
240 .delete()
241 .eq("uri", evt.uri.toString());
242 }
243 }
244 // if (evt.collection === ids.AppBskyActorProfile) {
245 // //only listen to updates because we should fetch it for the first time when they subscribe!
246 // if (evt.event === "update") {
247 // await supabaseServerClient
248 // .from("bsky_profiles")
249 // .update({ record: evt.record as Json })
250 // .eq("did", evt.did);
251 // }
252 // }
253 if (evt.collection === "app.bsky.feed.post") {
254 if (evt.event !== "create") return;
255
256 // Early exit if no embed
257 if (
258 !evt.record ||
259 typeof evt.record !== "object" ||
260 !("embed" in evt.record)
261 )
262 return;
263
264 // Check if embed contains our quote param using optional chaining
265 const embedRecord = evt.record as any;
266 const hasQuoteParam =
267 embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) ||
268 embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM);
269
270 if (!hasQuoteParam) return;
271 console.log("FOUND EMBED!!!");
272
273 // Now validate the record since we know it contains our quote param
274 let record = AppBskyFeedPost.validateRecord(evt.record);
275 if (!record.success) return;
276
277 let embed: string | null = null;
278 if (
279 AppBskyEmbedExternal.isMain(record.value.embed) &&
280 record.value.embed.external.uri.includes(QUOTE_PARAM)
281 ) {
282 embed = record.value.embed.external.uri;
283 }
284 if (
285 AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
286 AppBskyEmbedExternal.isMain(record.value.embed.media) &&
287 record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
288 ) {
289 embed = record.value.embed.media.external.uri;
290 }
291 if (embed) {
292 console.log(
293 "processing post mention: " + embed + " in " + evt.uri.toString(),
294 );
295 await inngest.send({
296 name: "appview/index-bsky-post-mention",
297 data: { post_uri: evt.uri.toString(), document_link: embed },
298 });
299 }
300 }
301}