a tool for shared writing and social publishing
at feature/analytics 423 lines 14 kB view raw
import { createClient } from "@supabase/supabase-js";
import { Database, Json } from "supabase/database.types";
import { IdResolver } from "@atproto/identity";
import { Firehose, MemoryRunner, Event } from "@atproto/sync";
import { ids } from "lexicons/api/lexicons";
import {
  PubLeafletDocument,
  PubLeafletGraphSubscription,
  PubLeafletPublication,
  PubLeafletComment,
  PubLeafletPollVote,
  PubLeafletPollDefinition,
  PubLeafletInteractionsRecommend,
  SiteStandardDocument,
  SiteStandardPublication,
  SiteStandardGraphSubscription,
} from "lexicons/api";
import {
  AppBskyEmbedExternal,
  AppBskyEmbedRecordWithMedia,
  AppBskyFeedPost,
  AppBskyRichtextFacet,
} from "@atproto/api";
import { AtUri } from "@atproto/syntax";
import { writeFile, readFile } from "fs/promises";
import { inngest } from "app/api/inngest/client";

const idResolver = new IdResolver();

// NOTE(review): `cursorFile` (and the `writeFile`/`readFile` imports) are not
// referenced anywhere in the code shown here, so no firehose cursor appears to
// be persisted — confirm whether that is intentional.
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

// Service-role Supabase client shared by every handler below.
const supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

// Path fragment identifying an external-link embed that quotes a document.
const QUOTE_PARAM = "/l-quote/";

/**
 * Logs a failed Supabase call. The calls in this file surface failures through
 * the returned `error` field, so results must be inspected explicitly.
 *
 * @param action - Short description of what was attempted, used in the log line.
 * @param result - The awaited Supabase response.
 */
function logSupabaseError(action: string, result: { error: unknown }): void {
  if (result.error) console.log(`Error ${action}:`, result.error);
}

/**
 * Upserts a row in `identities` for the given DID (conflict target `atp_did`).
 * Handlers call this before writing rows that carry the DID.
 */
async function ensureIdentity(did: string): Promise<void> {
  logSupabaseError(
    "upserting identity",
    await supabase
      .from("identities")
      .upsert({ atp_did: did }, { onConflict: "atp_did" }),
  );
}

/**
 * Links `document` to `publication` in `documents_in_publications` and removes
 * any rows linking the same document to a different publication.
 */
async function linkDocumentToPublication(
  document: string,
  publication: string,
): Promise<void> {
  const linkResult = await supabase
    .from("documents_in_publications")
    .upsert({ publication, document });
  const staleResult = await supabase
    .from("documents_in_publications")
    .delete()
    .neq("publication", publication)
    .eq("document", document);

  if (linkResult.error) console.log(linkResult.error);
  logSupabaseError("removing stale publication links", staleResult);
}

/** Returns true when `uri` is a string containing {@link QUOTE_PARAM}. */
function containsQuoteParam(uri: unknown): boolean {
  return typeof uri === "string" && uri.includes(QUOTE_PARAM);
}

/**
 * Starts the firehose consumer and registers SIGINT/SIGTERM handlers that tear
 * down the firehose and runner before exiting the process.
 */
async function main() {
  const runner = new MemoryRunner({});
  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      ids.PubLeafletInteractionsRecommend,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
      ids.SiteStandardDocument,
      ids.SiteStandardPublication,
      ids.SiteStandardGraphSubscription,
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  firehose.start();

  let cleaningUp = false;
  const cleanup = async () => {
    // Guard against a second signal arriving while teardown is in progress.
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    try {
      await firehose.destroy();
      await runner.destroy();
    } catch (err) {
      // A failed teardown must still end the process, with a failing status.
      console.error(err);
      process.exitCode = 1;
    } finally {
      process.exit();
    }
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

/**
 * Handles a single firehose event by mirroring the record into Supabase and,
 * for documents and quoting posts, sending an Inngest event.
 *
 * Create/update events validate the record against its lexicon first; invalid
 * records are dropped. Delete events remove the row keyed by the record URI.
 *
 * @param evt - Event delivered by the firehose.
 */
async function handleEvent(evt: Event): Promise<void> {
  // NOTE(review): the Firehose above is configured with `excludeIdentity: true`,
  // so this branch presumably never runs — confirm before relying on it.
  if (evt.event === "identity") {
    if (evt.handle) {
      logSupabaseError(
        "updating bsky profile handle",
        await supabase
          .from("bsky_profiles")
          .update({ handle: evt.handle })
          .eq("did", evt.did),
      );
    }
  }
  if (
    evt.event === "account" ||
    evt.event === "identity" ||
    evt.event === "sync"
  )
    return;

  const uri = evt.uri.toString();

  // Posts are not logged individually; every other collection is.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);
      await inngest.send({
        name: "appview/sync-document-metadata",
        data: {
          document_uri: uri,
          bsky_post_uri: record.value.postRef?.uri,
        },
      });
      if (record.value.publication) {
        const publicationURI = new AtUri(record.value.publication);

        // Only the publication's own repo may attach documents to it. The
        // document row itself has already been upserted above; only the
        // publication link is skipped.
        if (publicationURI.host !== evt.uri.host) {
          console.log("Unauthorized to create post!");
          return;
        }
        await linkDocumentToPublication(uri, record.value.publication);
      }
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting document",
        await supabase.from("documents").delete().eq("uri", uri),
      );
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      logSupabaseError(
        "upserting publication",
        await supabase.from("publications").upsert({
          uri,
          identity_did: evt.did,
          name: record.value.name,
          record: record.value as Json,
        }),
      );
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting publication",
        await supabase.from("publications").delete().eq("uri", uri),
      );
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri,
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      if (error) console.log("Error upserting comment:", error);
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting comment",
        await supabase.from("comments_on_documents").delete().eq("uri", uri),
      );
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri,
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      if (error) console.log("Error upserting poll vote:", error);
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting poll vote",
        await supabase.from("atp_poll_votes").delete().eq("uri", uri),
      );
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri,
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      if (error) console.log("Error upserting poll definition:", error);
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting poll definition",
        await supabase.from("atp_poll_records").delete().eq("uri", uri),
      );
    }
  }

  if (evt.collection === ids.PubLeafletInteractionsRecommend) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletInteractionsRecommend.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("recommends_on_documents").upsert({
        uri,
        recommender_did: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      if (error) console.log("Error upserting recommend:", error);
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting recommend",
        await supabase.from("recommends_on_documents").delete().eq("uri", uri),
      );
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      logSupabaseError(
        "upserting publication subscription",
        await supabase.from("publication_subscriptions").upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        }),
      );
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting publication subscription",
        await supabase
          .from("publication_subscriptions")
          .delete()
          .eq("uri", uri),
      );
    }
  }

  // site.standard.document records go into the main "documents" table
  // The normalization layer handles reading both pub.leaflet and site.standard formats
  if (evt.collection === ids.SiteStandardDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);
      await inngest.send({
        name: "appview/sync-document-metadata",
        data: {
          document_uri: uri,
          bsky_post_uri: record.value.bskyPostRef?.uri,
        },
      });

      // site.standard.document uses "site" field to reference the publication
      // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
      // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
      // Only link to publications table for AT-URI sites
      if (record.value.site && record.value.site.startsWith("at://")) {
        const siteURI = new AtUri(record.value.site);

        // Same ownership rule as pub.leaflet documents: the site must live in
        // the repo that emitted this event.
        if (siteURI.host !== evt.uri.host) {
          console.log("Unauthorized to create document in site!");
          return;
        }
        await linkDocumentToPublication(uri, record.value.site);
      }
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting document",
        await supabase.from("documents").delete().eq("uri", uri),
      );
    }
  }

  // site.standard.publication records go into the main "publications" table
  if (evt.collection === ids.SiteStandardPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardPublication.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      logSupabaseError(
        "upserting publication",
        await supabase.from("publications").upsert({
          uri,
          identity_did: evt.did,
          name: record.value.name,
          record: record.value as Json,
        }),
      );
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting publication",
        await supabase.from("publications").delete().eq("uri", uri),
      );
    }
  }

  // site.standard.graph.subscription records go into the main "publication_subscriptions" table
  if (evt.collection === ids.SiteStandardGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      logSupabaseError(
        "upserting publication subscription",
        await supabase.from("publication_subscriptions").upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        }),
      );
    }
    if (evt.event === "delete") {
      logSupabaseError(
        "deleting publication subscription",
        await supabase
          .from("publication_subscriptions")
          .delete()
          .eq("uri", uri),
      );
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Early exit if no embed
    if (
      !evt.record ||
      typeof evt.record !== "object" ||
      !("embed" in evt.record)
    )
      return;

    // Cheap pre-check on the unvalidated record so that full lexicon
    // validation only runs for posts that might quote a document. The record
    // is untrusted here, so the uri is type-checked before calling `includes`.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- shape is unknown until validated below
    const rawRecord = evt.record as any;
    const hasQuoteParam =
      containsQuoteParam(rawRecord.embed?.external?.uri) ||
      containsQuoteParam(rawRecord.embed?.media?.external?.uri);

    if (!hasQuoteParam) return;

    // Now validate the record since we know it contains our quote param
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }

    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log(
        "processing post mention: " + embed + " in " + evt.uri.toString(),
      );
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: uri, document_link: embed },
      });
    }
  }
}