a tool for shared writing and social publishing
at update/looseleafs 309 lines 10 kB view raw
1import { createClient } from "@supabase/supabase-js"; 2import { Database, Json } from "supabase/database.types"; 3import { IdResolver } from "@atproto/identity"; 4const idResolver = new IdResolver(); 5import { Firehose, MemoryRunner, Event } from "@atproto/sync"; 6import { ids } from "lexicons/api/lexicons"; 7import { 8 PubLeafletDocument, 9 PubLeafletGraphSubscription, 10 PubLeafletPublication, 11 PubLeafletComment, 12 PubLeafletPollVote, 13 PubLeafletPollDefinition, 14} from "lexicons/api"; 15import { 16 AppBskyEmbedExternal, 17 AppBskyEmbedRecordWithMedia, 18 AppBskyFeedPost, 19 AppBskyRichtextFacet, 20} from "@atproto/api"; 21import { AtUri } from "@atproto/syntax"; 22import { writeFile, readFile } from "fs/promises"; 23import { createIdentity } from "actions/createIdentity"; 24import { drizzle } from "drizzle-orm/node-postgres"; 25import { inngest } from "app/api/inngest/client"; 26import { Client } from "pg"; 27 28const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor"; 29 30let supabase = createClient<Database>( 31 process.env.NEXT_PUBLIC_SUPABASE_API_URL as string, 32 process.env.SUPABASE_SERVICE_ROLE_KEY as string, 33); 34const QUOTE_PARAM = "/l-quote/"; 35async function main() { 36 const runner = new MemoryRunner({}); 37 let firehose = new Firehose({ 38 service: "wss://relay1.us-west.bsky.network", 39 subscriptionReconnectDelay: 3000, 40 excludeAccount: true, 41 excludeIdentity: true, 42 runner, 43 idResolver, 44 filterCollections: [ 45 ids.PubLeafletDocument, 46 ids.PubLeafletPublication, 47 ids.PubLeafletGraphSubscription, 48 ids.PubLeafletComment, 49 ids.PubLeafletPollVote, 50 ids.PubLeafletPollDefinition, 51 // ids.AppBskyActorProfile, 52 "app.bsky.feed.post", 53 ], 54 handleEvent, 55 onError: (err) => { 56 console.error(err); 57 }, 58 }); 59 console.log("starting firehose consumer"); 60 firehose.start(); 61 let cleaningUp = false; 62 const cleanup = async () => { 63 if (cleaningUp) return; 64 cleaningUp = true; 65 console.log("shutting down firehose..."); 66 await firehose.destroy(); 67 await runner.destroy(); 68 process.exit(); 69 }; 70 71 process.on("SIGINT", cleanup); 72 process.on("SIGTERM", cleanup); 73} 74 75main(); 76 77async function handleEvent(evt: Event) { 78 if (evt.event === "identity") { 79 if (evt.handle) 80 await supabase 81 .from("bsky_profiles") 82 .update({ handle: evt.handle }) 83 .eq("did", evt.did); 84 } 85 if ( 86 evt.event == "account" || 87 evt.event === "identity" || 88 evt.event === "sync" 89 ) 90 return; 91 if (evt.collection !== "app.bsky.feed.post") 92 console.log( 93 `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`, 94 ); 95 if (evt.collection === ids.PubLeafletDocument) { 96 if (evt.event === "create" || evt.event === "update") { 97 let record = PubLeafletDocument.validateRecord(evt.record); 98 if (!record.success) { 99 console.log(record.error); 100 return; 101 } 102 let docResult = await supabase.from("documents").upsert({ 103 uri: evt.uri.toString(), 104 data: record.value as Json, 105 }); 106 if (docResult.error) console.log(docResult.error); 107 if (record.value.publication) { 108 let publicationURI = new AtUri(record.value.publication); 109 110 if (publicationURI.host !== evt.uri.host) { 111 console.log("Unauthorized to create post!"); 112 return; 113 } 114 let docInPublicationResult = await supabase 115 .from("documents_in_publications") 116 .upsert({ 117 publication: record.value.publication, 118 document: evt.uri.toString(), 119 }); 120 await supabase 121 .from("documents_in_publications") 122 .delete() 123 .neq("publication", record.value.publication) 124 .eq("document", evt.uri.toString()); 125 126 if (docInPublicationResult.error) 127 console.log(docInPublicationResult.error); 128 } 129 } 130 if (evt.event === "delete") { 131 await supabase.from("documents").delete().eq("uri", evt.uri.toString()); 132 } 133 } 134 if (evt.collection === ids.PubLeafletPublication) { 135 if (evt.event === "create" || evt.event === "update") { 136 let record = PubLeafletPublication.validateRecord(evt.record); 137 if (!record.success) return; 138 let { error } = await supabase.from("publications").upsert({ 139 uri: evt.uri.toString(), 140 identity_did: evt.did, 141 name: record.value.name, 142 record: record.value as Json, 143 }); 144 145 if (error && error.code === "23503") { 146 console.log("creating identity"); 147 let client = new Client({ connectionString: process.env.DB_URL }); 148 let db = drizzle(client); 149 await createIdentity(db, { atp_did: evt.did }); 150 client.end(); 151 await supabase.from("publications").upsert({ 152 uri: evt.uri.toString(), 153 identity_did: evt.did, 154 name: record.value.name, 155 record: record.value as Json, 156 }); 157 } 158 } 159 if (evt.event === "delete") { 160 await supabase 161 .from("publications") 162 .delete() 163 .eq("uri", evt.uri.toString()); 164 } 165 } 166 if (evt.collection === ids.PubLeafletComment) { 167 if (evt.event === "create" || evt.event === "update") { 168 let record = PubLeafletComment.validateRecord(evt.record); 169 if (!record.success) return; 170 let { error } = await supabase.from("comments_on_documents").upsert({ 171 uri: evt.uri.toString(), 172 profile: evt.did, 173 document: record.value.subject, 174 record: record.value as Json, 175 }); 176 } 177 if (evt.event === "delete") { 178 await supabase 179 .from("comments_on_documents") 180 .delete() 181 .eq("uri", evt.uri.toString()); 182 } 183 } 184 if (evt.collection === ids.PubLeafletPollVote) { 185 if (evt.event === "create" || evt.event === "update") { 186 let record = PubLeafletPollVote.validateRecord(evt.record); 187 if (!record.success) return; 188 let { error } = await supabase.from("atp_poll_votes").upsert({ 189 uri: evt.uri.toString(), 190 voter_did: evt.did, 191 poll_uri: record.value.poll.uri, 192 poll_cid: record.value.poll.cid, 193 record: record.value as Json, 194 }); 195 } 196 if (evt.event === "delete") { 197 await supabase 198 .from("atp_poll_votes") 199 .delete() 200 .eq("uri", evt.uri.toString()); 201 } 202 } 203 if (evt.collection === ids.PubLeafletPollDefinition) { 204 if (evt.event === "create" || evt.event === "update") { 205 let record = PubLeafletPollDefinition.validateRecord(evt.record); 206 if (!record.success) return; 207 let { error } = await supabase.from("atp_poll_records").upsert({ 208 uri: evt.uri.toString(), 209 cid: evt.cid.toString(), 210 record: record.value as Json, 211 }); 212 if (error) console.log("Error upserting poll definition:", error); 213 } 214 if (evt.event === "delete") { 215 await supabase 216 .from("atp_poll_records") 217 .delete() 218 .eq("uri", evt.uri.toString()); 219 } 220 } 221 if (evt.collection === ids.PubLeafletGraphSubscription) { 222 if (evt.event === "create" || evt.event === "update") { 223 let record = PubLeafletGraphSubscription.validateRecord(evt.record); 224 if (!record.success) return; 225 let { error } = await supabase.from("publication_subscriptions").upsert({ 226 uri: evt.uri.toString(), 227 identity: evt.did, 228 publication: record.value.publication, 229 record: record.value as Json, 230 }); 231 if (error && error.code === "23503") { 232 console.log("creating identity"); 233 let client = new Client({ connectionString: process.env.DB_URL }); 234 let db = drizzle(client); 235 await createIdentity(db, { atp_did: evt.did }); 236 client.end(); 237 await supabase.from("publication_subscriptions").upsert({ 238 uri: evt.uri.toString(), 239 identity: evt.did, 240 publication: record.value.publication, 241 record: record.value as Json, 242 }); 243 } 244 } 245 if (evt.event === "delete") { 246 await supabase 247 .from("publication_subscriptions") 248 .delete() 249 .eq("uri", evt.uri.toString()); 250 } 251 } 252 // if (evt.collection === ids.AppBskyActorProfile) { 253 // //only listen to updates because we should fetch it for the first time when they subscribe! 254 // if (evt.event === "update") { 255 // await supabaseServerClient 256 // .from("bsky_profiles") 257 // .update({ record: evt.record as Json }) 258 // .eq("did", evt.did); 259 // } 260 // } 261 if (evt.collection === "app.bsky.feed.post") { 262 if (evt.event !== "create") return; 263 264 // Early exit if no embed 265 if ( 266 !evt.record || 267 typeof evt.record !== "object" || 268 !("embed" in evt.record) 269 ) 270 return; 271 272 // Check if embed contains our quote param using optional chaining 273 const embedRecord = evt.record as any; 274 const hasQuoteParam = 275 embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) || 276 embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM); 277 278 if (!hasQuoteParam) return; 279 console.log("FOUND EMBED!!!"); 280 281 // Now validate the record since we know it contains our quote param 282 let record = AppBskyFeedPost.validateRecord(evt.record); 283 if (!record.success) return; 284 285 let embed: string | null = null; 286 if ( 287 AppBskyEmbedExternal.isMain(record.value.embed) && 288 record.value.embed.external.uri.includes(QUOTE_PARAM) 289 ) { 290 embed = record.value.embed.external.uri; 291 } 292 if ( 293 AppBskyEmbedRecordWithMedia.isMain(record.value.embed) && 294 AppBskyEmbedExternal.isMain(record.value.embed.media) && 295 record.value.embed.media?.external?.uri.includes(QUOTE_PARAM) 296 ) { 297 embed = record.value.embed.media.external.uri; 298 } 299 if (embed) { 300 console.log( 301 "processing post mention: " + embed + " in " + evt.uri.toString(), 302 ); 303 await inngest.send({ 304 name: "appview/index-bsky-post-mention", 305 data: { post_uri: evt.uri.toString(), document_link: embed }, 306 }); 307 } 308 } 309}