a tool for shared writing and social publishing
at feature/footnotes 393 lines 13 kB view raw
import { createClient } from "@supabase/supabase-js";
import { Database, Json } from "supabase/database.types";
import { IdResolver } from "@atproto/identity";
const idResolver = new IdResolver();
import { Firehose, MemoryRunner, Event } from "@atproto/sync";
import { ids } from "lexicons/api/lexicons";
import {
  PubLeafletDocument,
  PubLeafletGraphSubscription,
  PubLeafletPublication,
  PubLeafletComment,
  PubLeafletPollVote,
  PubLeafletPollDefinition,
  PubLeafletInteractionsRecommend,
  SiteStandardDocument,
  SiteStandardPublication,
  SiteStandardGraphSubscription,
} from "lexicons/api";
import {
  AppBskyEmbedExternal,
  AppBskyEmbedRecordWithMedia,
  AppBskyFeedPost,
  AppBskyRichtextFacet,
} from "@atproto/api";
import { AtUri } from "@atproto/syntax";
import { writeFile, readFile } from "fs/promises";
import { inngest } from "app/api/inngest/client";

// NOTE(review): cursorFile (and the fs/promises imports) are not used anywhere
// in this file as shown — confirm whether cursor persistence is still intended.
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

// Supabase client created with the service-role key from the environment.
let supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

// Substring that marks an external link in a Bluesky post as a quote link.
const QUOTE_PARAM = "/l-quote/";

/**
 * Loose, pre-validation view of the parts of a post embed that can carry an
 * external link. Everything is optional/unknown because the record has not
 * been validated yet when this shape is used.
 */
type UnvalidatedEmbed = {
  external?: { uri?: unknown } | null;
  media?: { external?: { uri?: unknown } | null } | null;
};

/**
 * Cheap pre-filter for posts: reports whether the (not yet validated) record
 * has an external embed URI containing QUOTE_PARAM, either directly on the
 * embed or on its `media` part.
 *
 * Only string URIs are inspected, so a malformed record whose `uri` is some
 * other type is treated as "no match" instead of throwing.
 */
function embedMentionsQuoteLink(rawRecord: object): boolean {
  const rawEmbed = (rawRecord as { embed?: UnvalidatedEmbed | null }).embed;
  const candidateUris: unknown[] = [
    rawEmbed?.external?.uri,
    rawEmbed?.media?.external?.uri,
  ];
  return candidateUris.some(
    (uri) => typeof uri === "string" && uri.includes(QUOTE_PARAM),
  );
}

/**
 * Builds the firehose subscription, starts it, and installs SIGINT/SIGTERM
 * handlers that tear the subscription and runner down before exiting.
 */
async function main() {
  const runner = new MemoryRunner({});
  let firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      ids.PubLeafletInteractionsRecommend,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
      ids.SiteStandardDocument,
      ids.SiteStandardPublication,
      ids.SiteStandardGraphSubscription,
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  // Intentionally not awaited, as in the original.
  // NOTE(review): presumably start() runs for the lifetime of the process and
  // reports failures through onError above — confirm against @atproto/sync.
  firehose.start();

  // Guards against running the shutdown sequence twice when both signals
  // arrive (or one arrives repeatedly).
  let cleaningUp = false;
  const cleanup = async () => {
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    await firehose.destroy();
    await runner.destroy();
    process.exit();
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

// Surface a startup failure explicitly instead of leaving a floating promise.
main().catch((err: unknown) => {
  console.error(err);
  process.exit(1);
});

/**
 * Handles a single firehose event.
 *
 * Depending on the event's collection, the record is validated against its
 * lexicon and then either written to / deleted from a Supabase table or
 * forwarded to Inngest for indexing:
 *
 * - pub.leaflet / site.standard documents: create/update sends
 *   "appview/index-document"; delete removes the row from "documents".
 * - pub.leaflet / site.standard publications: upsert/delete in "publications".
 * - pub.leaflet / site.standard subscriptions: upsert/delete in
 *   "publication_subscriptions".
 * - comments, poll votes, poll definitions, recommends: upsert/delete in their
 *   respective tables.
 * - app.bsky.feed.post: on create only, posts whose external embed contains
 *   QUOTE_PARAM are sent as "appview/index-bsky-post-mention".
 *
 * Records that fail validation are skipped (some branches log the validation
 * error, others return silently, as in the original).
 *
 * @param evt the event delivered by the Firehose subscription.
 */
async function handleEvent(evt: Event) {
  // NOTE(review): main() creates the Firehose with excludeIdentity: true, so
  // this branch may never run — confirm whether identity events are meant to
  // be consumed.
  if (evt.event === "identity") {
    if (evt.handle)
      await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
  }
  // Everything below only applies to record-level events.
  if (
    evt.event === "account" ||
    evt.event === "identity" ||
    evt.event === "sync"
  )
    return;

  // Posts are skipped here; every other collection gets a trace line.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      let record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      let publication: string | null = null;
      if (record.value.publication) {
        // The referenced publication must live under the same URI host as the
        // document record itself; otherwise the event is rejected.
        let publicationURI = new AtUri(record.value.publication);
        if (publicationURI.host !== evt.uri.host) {
          console.log("Unauthorized to create post!");
          return;
        }
        publication = record.value.publication;
      }
      await inngest.send({
        name: "appview/index-document",
        data: {
          document_uri: evt.uri.toString(),
          document_data: record.value as Json,
          bsky_post_uri: record.value.postRef?.uri,
          publication,
          did: evt.did,
        },
      });
    }
    if (evt.event === "delete") {
      await supabase.from("documents").delete().eq("uri", evt.uri.toString());
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      let record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      // Make sure an identity row exists for the author before the
      // publication row that references it is written.
      await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      await supabase.from("publications").upsert({
        uri: evt.uri.toString(),
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
    }
    if (evt.event === "delete") {
      await supabase
        .from("publications")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      let record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      let { error } = await supabase.from("comments_on_documents").upsert({
        uri: evt.uri.toString(),
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      // Previously the error was destructured but never inspected.
      if (error) console.log("Error upserting comment:", error);
    }
    if (evt.event === "delete") {
      await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      let record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      let { error } = await supabase.from("atp_poll_votes").upsert({
        uri: evt.uri.toString(),
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      // Previously the error was destructured but never inspected.
      if (error) console.log("Error upserting poll vote:", error);
    }
    if (evt.event === "delete") {
      await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      let record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      let { error } = await supabase.from("atp_poll_records").upsert({
        uri: evt.uri.toString(),
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      if (error) console.log("Error upserting poll definition:", error);
    }
    if (evt.event === "delete") {
      await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  if (evt.collection === ids.PubLeafletInteractionsRecommend) {
    if (evt.event === "create" || evt.event === "update") {
      let record = PubLeafletInteractionsRecommend.validateRecord(evt.record);
      if (!record.success) return;
      await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      let { error } = await supabase.from("recommends_on_documents").upsert({
        uri: evt.uri.toString(),
        recommender_did: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      if (error) console.log("Error upserting recommend:", error);
    }
    if (evt.event === "delete") {
      await supabase
        .from("recommends_on_documents")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      let record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      await supabase.from("publication_subscriptions").upsert({
        uri: evt.uri.toString(),
        identity: evt.did,
        publication: record.value.publication,
        record: record.value as Json,
      });
    }
    if (evt.event === "delete") {
      await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  // site.standard.document records go into the main "documents" table
  // The normalization layer handles reading both pub.leaflet and site.standard formats
  if (evt.collection === ids.SiteStandardDocument) {
    if (evt.event === "create" || evt.event === "update") {
      let record = SiteStandardDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      // site.standard.document uses "site" field to reference the publication
      // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
      // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
      // Only link to publications table for AT-URI sites
      let publication: string | null = null;
      if (record.value.site && record.value.site.startsWith("at://")) {
        // Same host check as for pub.leaflet documents: the site must live
        // under the same URI host as the document record.
        let siteURI = new AtUri(record.value.site);
        if (siteURI.host !== evt.uri.host) {
          console.log("Unauthorized to create document in site!");
          return;
        }
        publication = record.value.site;
      }
      await inngest.send({
        name: "appview/index-document",
        data: {
          document_uri: evt.uri.toString(),
          document_data: record.value as Json,
          bsky_post_uri: record.value.bskyPostRef?.uri,
          publication,
          did: evt.did,
        },
      });
    }
    if (evt.event === "delete") {
      await supabase.from("documents").delete().eq("uri", evt.uri.toString());
    }
  }

  // site.standard.publication records go into the main "publications" table
  if (evt.collection === ids.SiteStandardPublication) {
    if (evt.event === "create" || evt.event === "update") {
      let record = SiteStandardPublication.validateRecord(evt.record);
      if (!record.success) return;
      await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      await supabase.from("publications").upsert({
        uri: evt.uri.toString(),
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
    }
    if (evt.event === "delete") {
      await supabase
        .from("publications")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  // site.standard.graph.subscription records go into the main "publication_subscriptions" table
  if (evt.collection === ids.SiteStandardGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      let record = SiteStandardGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      await supabase.from("publication_subscriptions").upsert({
        uri: evt.uri.toString(),
        identity: evt.did,
        publication: record.value.publication,
        record: record.value as Json,
      });
    }
    if (evt.event === "delete") {
      await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", evt.uri.toString());
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Early exit if no embed
    if (
      !evt.record ||
      typeof evt.record !== "object" ||
      !("embed" in evt.record)
    )
      return;

    // Cheap substring check first, so full validation only runs on posts
    // that actually carry the quote param.
    if (!embedMentionsQuoteLink(evt.record)) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param
    let record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }

    // Pick the quote link from either a plain external embed or the media
    // part of a record-with-media embed; the latter wins if both match.
    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log(
        "processing post mention: " + embed + " in " + evt.uri.toString(),
      );
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: evt.uri.toString(), document_link: embed },
      });
    }
  }
}