a tool for shared writing and social publishing
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12} from "lexicons/api";
13import {
14 AppBskyEmbedExternal,
15 AppBskyEmbedRecordWithMedia,
16 AppBskyFeedPost,
17 AppBskyRichtextFacet,
18} from "@atproto/api";
19import { AtUri } from "@atproto/syntax";
20import { writeFile, readFile } from "fs/promises";
21import { createIdentity } from "actions/createIdentity";
22import { drizzle } from "drizzle-orm/node-postgres";
23import { inngest } from "app/api/inngest/client";
24import { Client } from "pg";
25
// Cursor file path: CURSOR_FILE when set to a non-empty value, otherwise the
// default below. NOTE(review): not referenced anywhere in the visible code.
const DEFAULT_CURSOR_FILE = "/cursor/cursor";
const cursorFile = process.env.CURSOR_FILE || DEFAULT_CURSOR_FILE;

// Supabase client built from the API URL and the service-role key. Both env
// vars are asserted to be strings; nothing here checks that they are set.
const supabaseApiUrl = process.env.NEXT_PUBLIC_SUPABASE_API_URL as string;
const supabaseServiceRoleKey = process.env.SUPABASE_SERVICE_ROLE_KEY as string;
let supabase = createClient<Database>(supabaseApiUrl, supabaseServiceRoleKey);

// Substring looked for in external-embed URIs of Bluesky posts.
const QUOTE_PARAM = "/l-quote/";
/**
 * Builds the firehose consumer, starts it, and registers SIGINT/SIGTERM
 * handlers that destroy the consumer and its runner before exiting.
 */
async function main() {
  const eventRunner = new MemoryRunner({});

  // Collections whose commits are delivered to handleEvent.
  const watchedCollections = [
    ids.PubLeafletDocument,
    ids.PubLeafletPublication,
    ids.PubLeafletGraphSubscription,
    ids.PubLeafletComment,
    // ids.AppBskyActorProfile,
    "app.bsky.feed.post",
  ];

  const consumer = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner: eventRunner,
    idResolver,
    filterCollections: watchedCollections,
    handleEvent,
    onError(err) {
      console.error(err);
    },
  });

  console.log("starting firehose consumer");
  // The result of start() is not awaited; setup continues immediately.
  consumer.start();

  // Guards against running the teardown twice when several signals arrive.
  let shutdownStarted = false;
  async function shutdown() {
    if (shutdownStarted) return;
    shutdownStarted = true;
    console.log("shutting down firehose...");
    await consumer.destroy();
    await eventRunner.destroy();
    process.exit();
  }

  for (const signal of ["SIGINT", "SIGTERM"] as const) {
    process.on(signal, shutdown);
  }
}

main();
72
/**
 * Creates the identity row for `did` through `createIdentity`, using a
 * short-lived pg client that is always closed afterwards.
 *
 * NOTE(review): the client is never explicitly connected here; presumably
 * `createIdentity`/drizzle handles that — confirm against `createIdentity`.
 *
 * @param did - DID passed to `createIdentity` as `atp_did`.
 * @throws Whatever `createIdentity` (or closing the client) throws.
 */
async function createMissingIdentity(did: string): Promise<void> {
  const client = new Client({ connectionString: process.env.DB_URL });
  try {
    await createIdentity(drizzle(client), { atp_did: did });
  } finally {
    // Close the client even when createIdentity throws, so a failed attempt
    // does not leave a dangling database connection behind.
    await client.end();
  }
}

/**
 * Firehose event handler: mirrors Leaflet documents, publications, comments
 * and subscriptions into Supabase, and forwards Bluesky posts that embed a
 * quote link to inngest.
 *
 * Supabase failures are logged, not thrown; validation failures end the
 * handler early.
 *
 * @param evt - Event delivered by the firehose.
 */
async function handleEvent(evt: Event) {
  if (evt.event === "identity") {
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      if (error) console.log(error);
    }
    return;
  }
  // Only commit events (create/update/delete) are processed below.
  if (evt.event === "account" || evt.event === "sync") return;

  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri: evt.uri.toString(),
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);

      // The document is only linked to the publication when both live in
      // the same repo (same host in the AT URI).
      const publicationURI = new AtUri(record.value.publication);
      if (publicationURI.host !== evt.uri.host) {
        console.log("Unauthorized to create post!");
        return;
      }
      const docInPublicationResult = await supabase
        .from("documents_in_publications")
        .upsert({
          publication: record.value.publication,
          document: evt.uri.toString(),
        });
      if (docInPublicationResult.error)
        console.log(docInPublicationResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", evt.uri.toString());
      if (error) console.log(error);
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      const publicationRow = {
        uri: evt.uri.toString(),
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      };
      const { error } = await supabase
        .from("publications")
        .upsert(publicationRow);

      if (error && error.code === "23503") {
        // 23503 is PostgreSQL's foreign_key_violation: create the identity
        // and try the upsert once more.
        console.log("creating identity");
        await createMissingIdentity(evt.did);
        const retry = await supabase
          .from("publications")
          .upsert(publicationRow);
        if (retry.error) console.log(retry.error);
      } else if (error) {
        console.log(error);
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", evt.uri.toString());
      if (error) console.log(error);
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri: evt.uri.toString(),
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      if (error) console.log(error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", evt.uri.toString());
      if (error) console.log(error);
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      const subscriptionRow = {
        uri: evt.uri.toString(),
        identity: evt.did,
        publication: record.value.publication,
        record: record.value as Json,
      };
      const { error } = await supabase
        .from("publication_subscriptions")
        .upsert(subscriptionRow);

      if (error && error.code === "23503") {
        // Same foreign-key recovery as for publications.
        console.log("creating identity");
        await createMissingIdentity(evt.did);
        const retry = await supabase
          .from("publication_subscriptions")
          .upsert(subscriptionRow);
        if (retry.error) console.log(retry.error);
      } else if (error) {
        console.log(error);
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", evt.uri.toString());
      if (error) console.log(error);
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Early exit if no embed
    if (
      !evt.record ||
      typeof evt.record !== "object" ||
      !("embed" in evt.record)
    )
      return;

    // Cheap pre-filter on the raw, not-yet-validated record so that full
    // validation only runs for posts that mention the quote param.
    const embedRecord = evt.record as any;
    const hasQuoteParam =
      embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) ||
      embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM);

    if (!hasQuoteParam) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) return;

    // The link can sit directly in an external embed or in the media part
    // of a record-with-media embed.
    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log(
        "processing post mention: " + embed + " in " + evt.uri.toString(),
      );
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: evt.uri.toString(), document_link: embed },
      });
    }
  }
}