a tool for shared writing and social publishing
at feature/atp-polls 301 lines 9.7 kB view raw
import { createClient } from "@supabase/supabase-js";
import { Database, Json } from "supabase/database.types";
import { IdResolver } from "@atproto/identity";
import { Firehose, MemoryRunner, Event } from "@atproto/sync";
import { ids } from "lexicons/api/lexicons";
import {
  PubLeafletDocument,
  PubLeafletGraphSubscription,
  PubLeafletPublication,
  PubLeafletComment,
  PubLeafletPollVote,
  PubLeafletPollDefinition,
} from "lexicons/api";
import {
  AppBskyEmbedExternal,
  AppBskyEmbedRecordWithMedia,
  AppBskyFeedPost,
  AppBskyRichtextFacet,
} from "@atproto/api";
import { AtUri } from "@atproto/syntax";
import { writeFile, readFile } from "fs/promises";
import { createIdentity } from "actions/createIdentity";
import { drizzle } from "drizzle-orm/node-postgres";
import { inngest } from "app/api/inngest/client";
import { Client } from "pg";

const idResolver = new IdResolver();

const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

const supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

/** Path fragment that marks an external embed link as a quote link to index. */
const QUOTE_PARAM = "/l-quote/";

/** Error code checked on a failed upsert before creating the identity row (Postgres foreign_key_violation). */
const FOREIGN_KEY_VIOLATION = "23503";

/**
 * Builds the firehose subscription, starts it, and installs SIGINT/SIGTERM
 * handlers that tear the firehose and its runner down before exiting.
 */
async function main(): Promise<void> {
  const runner = new MemoryRunner({});
  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  // Deliberately not awaited: the signal handlers below must be registered
  // while the subscription is running.
  firehose.start();

  // Guards against a second signal re-entering the teardown.
  let cleaningUp = false;
  const cleanup = async () => {
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    await firehose.destroy();
    await runner.destroy();
    process.exit();
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

main().catch((err: unknown) => {
  // Surface startup failures explicitly instead of leaving a floating promise.
  console.error(err);
  process.exit(1);
});

/** Logs `error` prefixed with `context` when a Supabase call reported one. */
function logIfError(context: string, error: unknown): void {
  if (error) console.log(context, error);
}

/**
 * Walks `keys` through nested plain objects, returning `undefined` as soon as
 * a step is not an object. Lets us probe an unvalidated record without `any`.
 */
function readPath(value: unknown, keys: readonly string[]): unknown {
  let current: unknown = value;
  for (const key of keys) {
    if (typeof current !== "object" || current === null) return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

/** True when `value` is a string containing {@link QUOTE_PARAM}. */
function containsQuoteParam(value: unknown): boolean {
  return typeof value === "string" && value.includes(QUOTE_PARAM);
}

/**
 * Creates the identity row for `did` through a short-lived Postgres client.
 * The client is always closed, even if `createIdentity` throws.
 *
 * NOTE(review): no explicit `client.connect()` is visible in this file —
 * confirm the connection is established on the `createIdentity` path.
 */
async function ensureIdentity(did: string): Promise<void> {
  console.log("creating identity");
  const client = new Client({ connectionString: process.env.DB_URL });
  try {
    const db = drizzle(client);
    await createIdentity(db, { atp_did: did });
  } finally {
    await client.end();
  }
}

/**
 * Handles a single firehose event by mirroring it into Supabase.
 *
 * - `identity` events update the stored handle; `account`, `identity` and
 *   `sync` events are otherwise ignored.
 * - Leaflet documents, publications, comments, poll votes, poll definitions
 *   and subscriptions are upserted on create/update and removed on delete.
 * - Newly created `app.bsky.feed.post` records whose external embed links
 *   contain {@link QUOTE_PARAM} are forwarded to inngest for indexing.
 *
 * Supabase failures are logged rather than thrown.
 */
async function handleEvent(evt: Event): Promise<void> {
  if (evt.event === "identity") {
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      logIfError("Error updating profile handle:", error);
    }
  }
  if (
    evt.event === "account" ||
    evt.event === "identity" ||
    evt.event === "sync"
  )
    return;

  // Posts are far too frequent to log individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  const uri = evt.uri.toString();

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);

      // Only link the document to the publication when both records live in
      // the same repo.
      const publicationURI = new AtUri(record.value.publication);
      if (publicationURI.host !== evt.uri.host) {
        console.log("Unauthorized to create post!");
        return;
      }
      const docInPublicationResult = await supabase
        .from("documents_in_publications")
        .upsert({
          publication: record.value.publication,
          document: uri,
        });
      if (docInPublicationResult.error)
        console.log(docInPublicationResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      logIfError("Error deleting document:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      const upsertPublication = () =>
        supabase.from("publications").upsert({
          uri,
          identity_did: evt.did,
          name: record.value.name,
          record: record.value as Json,
        });

      const { error } = await upsertPublication();
      if (error && error.code === FOREIGN_KEY_VIOLATION) {
        // On this error code, create the identity for the DID and retry once.
        await ensureIdentity(evt.did);
        const retry = await upsertPublication();
        logIfError("Error upserting publication after retry:", retry.error);
      } else {
        logIfError("Error upserting publication:", error);
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      logIfError("Error deleting publication:", error);
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri,
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logIfError("Error upserting comment:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", uri);
      logIfError("Error deleting comment:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri,
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      logIfError("Error upserting poll vote:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", uri);
      logIfError("Error deleting poll vote:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri,
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      if (error) console.log("Error upserting poll definition:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", uri);
      logIfError("Error deleting poll definition:", error);
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      const upsertSubscription = () =>
        supabase.from("publication_subscriptions").upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        });

      const { error } = await upsertSubscription();
      if (error && error.code === FOREIGN_KEY_VIOLATION) {
        // On this error code, create the identity for the DID and retry once.
        await ensureIdentity(evt.did);
        const retry = await upsertSubscription();
        logIfError("Error upserting subscription after retry:", retry.error);
      } else {
        logIfError("Error upserting subscription:", error);
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      logIfError("Error deleting subscription:", error);
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Cheap pre-filter on the raw record so that full lexicon validation only
    // runs for posts that actually embed one of our quote links.
    const hasQuoteParam =
      containsQuoteParam(readPath(evt.record, ["embed", "external", "uri"])) ||
      containsQuoteParam(
        readPath(evt.record, ["embed", "media", "external", "uri"]),
      );
    if (!hasQuoteParam) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) return;

    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log("processing post mention: " + embed + " in " + uri);
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: uri, document_link: embed },
      });
    }
  }
}