a tool for shared writing and social publishing
at feature/post-options 285 lines 9.1 kB view raw
import { createClient } from "@supabase/supabase-js";
import { Database, Json } from "supabase/database.types";
import { IdResolver } from "@atproto/identity";
const idResolver = new IdResolver();
import { Firehose, MemoryRunner, Event } from "@atproto/sync";
import { ids } from "lexicons/api/lexicons";
import {
  PubLeafletDocument,
  PubLeafletGraphSubscription,
  PubLeafletPublication,
  PubLeafletComment,
  PubLeafletPollVote,
  PubLeafletPollDefinition,
} from "lexicons/api";
import {
  AppBskyEmbedExternal,
  AppBskyEmbedRecordWithMedia,
  AppBskyFeedPost,
  AppBskyRichtextFacet,
} from "@atproto/api";
import { AtUri } from "@atproto/syntax";
import { writeFile, readFile } from "fs/promises";
import { inngest } from "app/api/inngest/client";

const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

const supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

/** Substring that marks an external-embed URI as a quote link we want to index. */
const QUOTE_PARAM = "/l-quote/";

/**
 * Logs a failed database operation; does nothing when `error` is null/undefined.
 *
 * @param action - Short description of what was attempted, e.g. "upserting comment".
 * @param error - The `error` field of a Supabase query result.
 */
function logDbError(action: string, error: unknown): void {
  if (error) console.error(`Error ${action}:`, error);
}

/**
 * Returns true only when `uri` is a string containing {@link QUOTE_PARAM}.
 * Non-string values are rejected instead of having `.includes` called on them,
 * because this runs on post records that have not been validated yet.
 */
function isQuoteLink(uri: unknown): uri is string {
  return typeof uri === "string" && uri.includes(QUOTE_PARAM);
}

/**
 * Cheap pre-filter run on an unvalidated post record: reports whether its
 * embed (either `embed.external` or `embed.media.external`) carries a URI
 * containing {@link QUOTE_PARAM}.
 */
function mightContainQuoteLink(record: unknown): boolean {
  if (!record || typeof record !== "object" || !("embed" in record)) {
    return false;
  }
  type LooseExternal = { external?: { uri?: unknown } | null };
  const embed = (
    record as { embed?: (LooseExternal & { media?: LooseExternal | null }) | null }
  ).embed;
  return (
    isQuoteLink(embed?.external?.uri) ||
    isQuoteLink(embed?.media?.external?.uri)
  );
}

/**
 * Builds the firehose subscription, starts it, and installs SIGINT/SIGTERM
 * handlers that tear the firehose and its runner down before exiting.
 */
async function main(): Promise<void> {
  const runner = new MemoryRunner({});
  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  // Deliberately not awaited: the signal handlers below must be registered
  // right away rather than after the subscription finishes.
  void firehose.start();

  let cleaningUp = false;
  const cleanup = async (): Promise<void> => {
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    try {
      await firehose.destroy();
      await runner.destroy();
    } catch (err) {
      console.error("error while shutting down firehose:", err);
      process.exitCode = 1;
    } finally {
      // Always exit, even when teardown failed, so a signal never leaves the
      // process hanging.
      process.exit();
    }
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

main().catch((err: unknown) => {
  console.error("firehose consumer failed to start:", err);
  process.exitCode = 1;
});

/**
 * Handles one firehose event by mirroring it into Supabase.
 *
 * - `identity` events update the stored handle for the DID.
 * - `account`, `identity` and `sync` events are otherwise ignored.
 * - Create/update commits in the Leaflet collections are validated against
 *   their lexicon and upserted; delete commits remove the row by URI.
 * - Newly created `app.bsky.feed.post` records whose external embed URI
 *   contains {@link QUOTE_PARAM} are forwarded to inngest for indexing.
 *
 * Database failures are logged and do not abort the handler.
 *
 * @param evt - The event delivered by the firehose.
 */
async function handleEvent(evt: Event): Promise<void> {
  if (evt.event === "identity") {
    // NOTE(review): the firehose is configured with `excludeIdentity: true`,
    // so this branch presumably never fires — confirm before relying on it.
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      logDbError("updating profile handle", error);
    }
  }
  if (
    evt.event === "account" ||
    evt.event === "identity" ||
    evt.event === "sync"
  )
    return;

  // Posts are far too frequent to log individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  const uri = evt.uri.toString();

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);
      if (record.value.publication) {
        const publicationURI = new AtUri(record.value.publication);

        // A document may only be attached to a publication in the same repo.
        if (publicationURI.host !== evt.uri.host) {
          console.log("Unauthorized to create post!");
          return;
        }
        const docInPublicationResult = await supabase
          .from("documents_in_publications")
          .upsert({
            publication: record.value.publication,
            document: uri,
          });
        // Drop links from this document to any other publication.
        const staleLinksResult = await supabase
          .from("documents_in_publications")
          .delete()
          .neq("publication", record.value.publication)
          .eq("document", uri);

        if (docInPublicationResult.error)
          console.log(docInPublicationResult.error);
        logDbError(
          "removing stale document publication links",
          staleLinksResult.error,
        );
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      logDbError("deleting document", error);
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logDbError("upserting identity", identityResult.error);
      const publicationResult = await supabase.from("publications").upsert({
        uri,
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logDbError("upserting publication", publicationResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      logDbError("deleting publication", error);
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri,
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logDbError("upserting comment", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", uri);
      logDbError("deleting comment", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri,
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      logDbError("upserting poll vote", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", uri);
      logDbError("deleting poll vote", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri,
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      logDbError("upserting poll definition", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", uri);
      logDbError("deleting poll definition", error);
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logDbError("upserting identity", identityResult.error);
      const subscriptionResult = await supabase
        .from("publication_subscriptions")
        .upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        });
      logDbError("upserting publication subscription", subscriptionResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      logDbError("deleting publication subscription", error);
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Cheap structural check first, so full lexicon validation only runs on
    // the posts that actually carry our quote link.
    if (!mightContainQuoteLink(evt.record)) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) return;

    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log("processing post mention: " + embed + " in " + uri);
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: uri, document_link: embed },
      });
    }
  }
}