a tool for shared writing and social publishing
at feature/reader 260 lines 8.3 kB view raw
import { createClient } from "@supabase/supabase-js";
import { Database, Json } from "supabase/database.types";
import { IdResolver } from "@atproto/identity";
const idResolver = new IdResolver();
import { Firehose, MemoryRunner, Event } from "@atproto/sync";
import { ids } from "lexicons/api/lexicons";
import {
  PubLeafletDocument,
  PubLeafletGraphSubscription,
  PubLeafletPublication,
  PubLeafletComment,
} from "lexicons/api";
import {
  AppBskyEmbedExternal,
  AppBskyEmbedRecordWithMedia,
  AppBskyFeedPost,
  AppBskyRichtextFacet,
} from "@atproto/api";
import { AtUri } from "@atproto/syntax";
import { writeFile, readFile } from "fs/promises";
import { createIdentity } from "actions/createIdentity";
import { drizzle } from "drizzle-orm/node-postgres";
import { inngest } from "app/api/inngest/client";
import { Client } from "pg";

const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

const supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

/** URL fragment that marks an external embed as a quote link handled by this consumer. */
const QUOTE_PARAM = "/l-quote/";

/**
 * Error code that triggers the "create identity, then retry the upsert" path.
 * In PostgreSQL, SQLSTATE 23503 is `foreign_key_violation`.
 */
const FOREIGN_KEY_VIOLATION = "23503";

/** Firehose events that carry a collection and record URI (everything except account/identity/sync). */
type CommitEvent = Extract<Event, { event: "create" | "update" | "delete" }>;

/**
 * Starts the firehose subscription and installs SIGINT/SIGTERM handlers that
 * destroy the firehose and its runner before exiting the process.
 */
async function main() {
  const runner = new MemoryRunner({});
  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  // Deliberately not awaited, so the signal handlers below are registered right away.
  void firehose.start();

  let cleaningUp = false;
  const cleanup = async () => {
    // Both signals share this handler; only the first one does the work.
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    try {
      await firehose.destroy();
      await runner.destroy();
    } catch (err) {
      console.error(err);
      process.exitCode = 1;
    } finally {
      // Always exit, even when teardown failed, so a signal can never leave the process hanging.
      process.exit();
    }
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

main().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});

/**
 * Entry point handed to the firehose: routes each event to the handler for its
 * collection. Account, identity and sync events never reach the collection
 * handlers.
 *
 * @param evt - Event delivered by the firehose subscription.
 */
async function handleEvent(evt: Event) {
  if (evt.event === "identity") {
    // NOTE(review): the firehose above is configured with `excludeIdentity: true`,
    // so this branch may never run - confirm whether that is intended.
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      if (error) console.log(error);
    }
  }
  if (
    evt.event === "account" ||
    evt.event === "identity" ||
    evt.event === "sync"
  )
    return;

  // Posts are by far the noisiest collection, so they are not logged individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  if (evt.collection === ids.PubLeafletDocument)
    return handleDocumentEvent(evt);
  if (evt.collection === ids.PubLeafletPublication)
    return handlePublicationEvent(evt);
  if (evt.collection === ids.PubLeafletComment) return handleCommentEvent(evt);
  if (evt.collection === ids.PubLeafletGraphSubscription)
    return handleSubscriptionEvent(evt);
  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }
  if (evt.collection === "app.bsky.feed.post") return handlePostEvent(evt);
}

/**
 * Creates the identity row for `did` through a short-lived Postgres client.
 * The client is closed even when identity creation throws.
 */
async function createIdentityForDid(did: string) {
  console.log("creating identity");
  // NOTE(review): `client.connect()` is never called here - presumably
  // `createIdentity`/drizzle takes care of it; confirm.
  const client = new Client({ connectionString: process.env.DB_URL });
  try {
    const db = drizzle(client);
    await createIdentity(db, { atp_did: did });
  } finally {
    await client.end();
  }
}

/** Upserts or deletes a document row, and links it to its publication when the repo hosts match. */
async function handleDocumentEvent(evt: CommitEvent) {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletDocument.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }
    const docResult = await supabase.from("documents").upsert({
      uri: evt.uri.toString(),
      data: record.value as Json,
    });
    if (docResult.error) console.log(docResult.error);

    // Only link the document when it lives in the same repo (host) as the publication.
    const publicationURI = new AtUri(record.value.publication);
    if (publicationURI.host !== evt.uri.host) {
      console.log("Unauthorized to create post!");
      return;
    }
    const docInPublicationResult = await supabase
      .from("documents_in_publications")
      .upsert({
        publication: record.value.publication,
        document: evt.uri.toString(),
      });
    if (docInPublicationResult.error)
      console.log(docInPublicationResult.error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("documents")
      .delete()
      .eq("uri", evt.uri.toString());
    if (error) console.log(error);
  }
}

/** Upserts or deletes a publication row, creating the owner's identity first when the upsert needs it. */
async function handlePublicationEvent(evt: CommitEvent) {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletPublication.validateRecord(evt.record);
    if (!record.success) return;
    const row = {
      uri: evt.uri.toString(),
      identity_did: evt.did,
      name: record.value.name,
      record: record.value as Json,
    };
    let { error } = await supabase.from("publications").upsert(row);
    if (error && error.code === FOREIGN_KEY_VIOLATION) {
      // Create the missing identity, then retry the same upsert once.
      await createIdentityForDid(evt.did);
      ({ error } = await supabase.from("publications").upsert(row));
    }
    if (error) console.log(error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("publications")
      .delete()
      .eq("uri", evt.uri.toString());
    if (error) console.log(error);
  }
}

/** Upserts or deletes a comment row keyed by the comment record's URI. */
async function handleCommentEvent(evt: CommitEvent) {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletComment.validateRecord(evt.record);
    if (!record.success) return;
    const { error } = await supabase.from("comments_on_documents").upsert({
      uri: evt.uri.toString(),
      profile: evt.did,
      document: record.value.subject,
      record: record.value as Json,
    });
    if (error) console.log(error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("comments_on_documents")
      .delete()
      .eq("uri", evt.uri.toString());
    if (error) console.log(error);
  }
}

/** Upserts or deletes a publication subscription, creating the subscriber's identity first when the upsert needs it. */
async function handleSubscriptionEvent(evt: CommitEvent) {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletGraphSubscription.validateRecord(evt.record);
    if (!record.success) return;
    const row = {
      uri: evt.uri.toString(),
      identity: evt.did,
      publication: record.value.publication,
      record: record.value as Json,
    };
    let { error } = await supabase
      .from("publication_subscriptions")
      .upsert(row);
    if (error && error.code === FOREIGN_KEY_VIOLATION) {
      // Create the missing identity, then retry the same upsert once.
      await createIdentityForDid(evt.did);
      ({ error } = await supabase
        .from("publication_subscriptions")
        .upsert(row));
    }
    if (error) console.log(error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("publication_subscriptions")
      .delete()
      .eq("uri", evt.uri.toString());
    if (error) console.log(error);
  }
}

/** Reads `key` from `value` when it is a non-null object; otherwise returns `undefined`. */
function readProp(value: unknown, key: string): unknown {
  if (typeof value !== "object" || value === null) return undefined;
  return (value as Record<string, unknown>)[key];
}

/** True when `uri` is a string containing the quote marker. */
function isQuoteLink(uri: unknown): boolean {
  return typeof uri === "string" && uri.includes(QUOTE_PARAM);
}

/** Sends an indexing event for newly created posts whose external embed links to a quote URL. */
async function handlePostEvent(evt: CommitEvent) {
  if (evt.event !== "create") return;

  // Cheap structural pre-filter on the raw, unvalidated record, so full
  // validation only runs for posts that mention the quote marker. The raw
  // record is untrusted, hence the type guards instead of `any`.
  const rawEmbed = readProp(evt.record, "embed");
  const hasQuoteParam =
    isQuoteLink(readProp(readProp(rawEmbed, "external"), "uri")) ||
    isQuoteLink(
      readProp(readProp(readProp(rawEmbed, "media"), "external"), "uri"),
    );
  if (!hasQuoteParam) return;
  console.log("FOUND EMBED!!!");

  // Now validate the record since we know it contains our quote param
  const record = AppBskyFeedPost.validateRecord(evt.record);
  if (!record.success) return;

  let documentLink: string | null = null;
  if (
    AppBskyEmbedExternal.isMain(record.value.embed) &&
    record.value.embed.external.uri.includes(QUOTE_PARAM)
  ) {
    documentLink = record.value.embed.external.uri;
  }
  if (
    AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
    AppBskyEmbedExternal.isMain(record.value.embed.media) &&
    record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
  ) {
    documentLink = record.value.embed.media.external.uri;
  }
  if (documentLink) {
    console.log(
      "processing post mention: " +
        documentLink +
        " in " +
        evt.uri.toString(),
    );
    await inngest.send({
      name: "appview/index-bsky-post-mention",
      data: { post_uri: evt.uri.toString(), document_link: documentLink },
    });
  }
}