a tool for shared writing and social publishing

don't persist cursor for now

+182 -199
+182 -199
appview/index.ts
··· 31 31 ); 32 32 const QUOTE_PARAM = "/l-quote/"; 33 33 async function main() { 34 - let startCursor; 35 - try { 36 - let file = (await readFile(cursorFile)).toString(); 37 - console.log("START CURSOR: " + file); 38 - startCursor = parseInt(file); 39 - console.log(startCursor); 40 - if (Number.isNaN(startCursor)) startCursor = undefined; 41 - } catch (e) {} 42 - 43 - async function handleEvent(evt: Event) { 44 - if (evt.event === "identity") { 45 - if (evt.handle) 46 - await supabase 47 - .from("bsky_profiles") 48 - .update({ handle: evt.handle }) 49 - .eq("did", evt.did); 50 - } 51 - if ( 52 - evt.event == "account" || 53 - evt.event === "identity" || 54 - evt.event === "sync" 55 - ) 56 - return; 57 - if (evt.collection !== "app.bsky.feed.post") 58 - console.log(`${evt.event} in ${evt.collection} @ ${evt.seq}: ${evt.uri}`); 59 - if (evt.collection === ids.PubLeafletDocument) { 60 - if (evt.event === "create" || evt.event === "update") { 61 - let record = PubLeafletDocument.validateRecord(evt.record); 62 - if (!record.success) { 63 - return; 64 - } 65 - await supabase.from("documents").upsert({ 66 - uri: evt.uri.toString(), 67 - data: record.value as Json, 68 - }); 69 - let publicationURI = new AtUri(record.value.publication); 70 - 71 - if (publicationURI.host !== evt.uri.host) { 72 - console.log("Unauthorized to create post!"); 73 - return; 74 - } 75 - await supabase.from("documents_in_publications").insert({ 76 - publication: record.value.publication, 77 - document: evt.uri.toString(), 78 - }); 79 - } 80 - if (evt.event === "delete") { 81 - await supabase.from("documents").delete().eq("uri", evt.uri.toString()); 82 - } 83 - } 84 - if (evt.collection === ids.PubLeafletPublication) { 85 - if (evt.event === "create" || evt.event === "update") { 86 - let record = PubLeafletPublication.validateRecord(evt.record); 87 - if (!record.success) return; 88 - let { error } = await supabase.from("publications").upsert({ 89 - uri: evt.uri.toString(), 90 - identity_did: evt.did, 91 - name: record.value.name, 92 - record: record.value as Json, 93 - }); 94 - 95 - if (error && error.code === "23503") { 96 - console.log("creating identity"); 97 - let client = new Client({ connectionString: process.env.DB_URL }); 98 - let db = drizzle(client); 99 - await createIdentity(db, { atp_did: evt.did }); 100 - client.end(); 101 - await supabase.from("publications").upsert({ 102 - uri: evt.uri.toString(), 103 - identity_did: evt.did, 104 - name: record.value.name, 105 - record: record.value as Json, 106 - }); 107 - } 108 - } 109 - if (evt.event === "delete") { 110 - await supabase 111 - .from("publications") 112 - .delete() 113 - .eq("uri", evt.uri.toString()); 114 - } 115 - } 116 - if (evt.collection === ids.PubLeafletComment) { 117 - if (evt.event === "create" || evt.event === "update") { 118 - let record = PubLeafletComment.validateRecord(evt.record); 119 - if (!record.success) return; 120 - let { error } = await supabase.from("comments_on_documents").upsert({ 121 - uri: evt.uri.toString(), 122 - profile: evt.did, 123 - document: record.value.subject, 124 - record: record.value as Json, 125 - }); 126 - } 127 - if (evt.event === "delete") { 128 - await supabase 129 - .from("comments_on_documents") 130 - .delete() 131 - .eq("uri", evt.uri.toString()); 132 - } 133 - } 134 - if (evt.collection === ids.PubLeafletGraphSubscription) { 135 - if (evt.event === "create" || evt.event === "update") { 136 - let record = PubLeafletGraphSubscription.validateRecord(evt.record); 137 - if (!record.success) return; 138 - let { error } = await supabase 139 - .from("publication_subscriptions") 140 - .upsert({ 141 - uri: evt.uri.toString(), 142 - identity: evt.did, 143 - publication: record.value.publication, 144 - record: record.value as Json, 145 - }); 146 - if (error && error.code === "23503") { 147 - console.log("creating identity"); 148 - let client = new Client({ connectionString: process.env.DB_URL }); 149 - let db = drizzle(client); 150 - await createIdentity(db, { atp_did: evt.did }); 151 - client.end(); 152 - await supabase.from("publication_subscriptions").upsert({ 153 - uri: evt.uri.toString(), 154 - identity: evt.did, 155 - publication: record.value.publication, 156 - record: record.value as Json, 157 - }); 158 - } 159 - } 160 - if (evt.event === "delete") { 161 - await supabase 162 - .from("publication_subscriptions") 163 - .delete() 164 - .eq("uri", evt.uri.toString()); 165 - } 166 - } 167 - // if (evt.collection === ids.AppBskyActorProfile) { 168 - // //only listen to updates because we should fetch it for the first time when they subscribe! 169 - // if (evt.event === "update") { 170 - // await supabaseServerClient 171 - // .from("bsky_profiles") 172 - // .update({ record: evt.record as Json }) 173 - // .eq("did", evt.did); 174 - // } 175 - // } 176 - if (evt.collection === "app.bsky.feed.post") { 177 - if (evt.event !== "create") return; 178 - 179 - // Early exit if no embed 180 - if ( 181 - !evt.record || 182 - typeof evt.record !== "object" || 183 - !("embed" in evt.record) 184 - ) 185 - return; 186 - 187 - // Check if embed contains our quote param using optional chaining 188 - const embedRecord = evt.record as any; 189 - const hasQuoteParam = 190 - embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) || 191 - embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM); 192 - 193 - if (!hasQuoteParam) return; 194 - console.log("FOUND EMBED!!!"); 195 - 196 - // Now validate the record since we know it contains our quote param 197 - let record = AppBskyFeedPost.validateRecord(evt.record); 198 - if (!record.success) return; 199 - 200 - let embed: string | null = null; 201 - if ( 202 - AppBskyEmbedExternal.isMain(record.value.embed) && 203 - record.value.embed.external.uri.includes(QUOTE_PARAM) 204 - ) { 205 - embed = record.value.embed.external.uri; 206 - } 207 - if ( 208 - AppBskyEmbedRecordWithMedia.isMain(record.value.embed) && 209 - AppBskyEmbedExternal.isMain(record.value.embed.media) && 210 - record.value.embed.media?.external?.uri.includes(QUOTE_PARAM) 211 - ) { 212 - embed = record.value.embed.media.external.uri; 213 - } 214 - if (embed) { 215 - console.log( 216 - "processing post mention: " + embed + " in " + evt.uri.toString(), 217 - ); 218 - await inngest.send({ 219 - name: "appview/index-bsky-post-mention", 220 - data: { post_uri: evt.uri.toString(), document_link: embed }, 221 - }); 222 - } 223 - } 224 - } 225 - 226 - const runner = new MemoryRunner({ 227 - startCursor, 228 - setCursor: async (cursor) => { 229 - // persist cursor 230 - await writeFile(cursorFile, cursor.toString(), { flush: true }); 231 - }, 232 - }); 34 + const runner = new MemoryRunner({}); 233 35 let firehose = new Firehose({ 234 36 service: "wss://relay1.us-west.bsky.network", 235 37 subscriptionReconnectDelay: 3000, ··· 267 69 } 268 70 269 71 main(); 72 + 73 + async function handleEvent(evt: Event) { 74 + if (evt.event === "identity") { 75 + if (evt.handle) 76 + await supabase 77 + .from("bsky_profiles") 78 + .update({ handle: evt.handle }) 79 + .eq("did", evt.did); 80 + } 81 + if ( 82 + evt.event == "account" || 83 + evt.event === "identity" || 84 + evt.event === "sync" 85 + ) 86 + return; 87 + if (evt.collection !== "app.bsky.feed.post") 88 + console.log(`${evt.event} in ${evt.collection} @ ${evt.seq}: ${evt.uri}`); 89 + if (evt.collection === ids.PubLeafletDocument) { 90 + if (evt.event === "create" || evt.event === "update") { 91 + let record = PubLeafletDocument.validateRecord(evt.record); 92 + if (!record.success) { 93 + return; 94 + } 95 + await supabase.from("documents").upsert({ 96 + uri: evt.uri.toString(), 97 + data: record.value as Json, 98 + }); 99 + let publicationURI = new AtUri(record.value.publication); 100 + 101 + if (publicationURI.host !== evt.uri.host) { 102 + console.log("Unauthorized to create post!"); 103 + return; 104 + } 105 + await supabase.from("documents_in_publications").insert({ 106 + publication: record.value.publication, 107 + document: evt.uri.toString(), 108 + }); 109 + } 110 + if (evt.event === "delete") { 111 + await supabase.from("documents").delete().eq("uri", evt.uri.toString()); 112 + } 113 + } 114 + if (evt.collection === ids.PubLeafletPublication) { 115 + if (evt.event === "create" || evt.event === "update") { 116 + let record = PubLeafletPublication.validateRecord(evt.record); 117 + if (!record.success) return; 118 + let { error } = await supabase.from("publications").upsert({ 119 + uri: evt.uri.toString(), 120 + identity_did: evt.did, 121 + name: record.value.name, 122 + record: record.value as Json, 123 + }); 124 + 125 + if (error && error.code === "23503") { 126 + console.log("creating identity"); 127 + let client = new Client({ connectionString: process.env.DB_URL }); 128 + let db = drizzle(client); 129 + await createIdentity(db, { atp_did: evt.did }); 130 + client.end(); 131 + await supabase.from("publications").upsert({ 132 + uri: evt.uri.toString(), 133 + identity_did: evt.did, 134 + name: record.value.name, 135 + record: record.value as Json, 136 + }); 137 + } 138 + } 139 + if (evt.event === "delete") { 140 + await supabase 141 + .from("publications") 142 + .delete() 143 + .eq("uri", evt.uri.toString()); 144 + } 145 + } 146 + if (evt.collection === ids.PubLeafletComment) { 147 + if (evt.event === "create" || evt.event === "update") { 148 + let record = PubLeafletComment.validateRecord(evt.record); 149 + if (!record.success) return; 150 + let { error } = await supabase.from("comments_on_documents").upsert({ 151 + uri: evt.uri.toString(), 152 + profile: evt.did, 153 + document: record.value.subject, 154 + record: record.value as Json, 155 + }); 156 + } 157 + if (evt.event === "delete") { 158 + await supabase 159 + .from("comments_on_documents") 160 + .delete() 161 + .eq("uri", evt.uri.toString()); 162 + } 163 + } 164 + if (evt.collection === ids.PubLeafletGraphSubscription) { 165 + if (evt.event === "create" || evt.event === "update") { 166 + let record = PubLeafletGraphSubscription.validateRecord(evt.record); 167 + if (!record.success) return; 168 + let { error } = await supabase.from("publication_subscriptions").upsert({ 169 + uri: evt.uri.toString(), 170 + identity: evt.did, 171 + publication: record.value.publication, 172 + record: record.value as Json, 173 + }); 174 + if (error && error.code === "23503") { 175 + console.log("creating identity"); 176 + let client = new Client({ connectionString: process.env.DB_URL }); 177 + let db = drizzle(client); 178 + await createIdentity(db, { atp_did: evt.did }); 179 + client.end(); 180 + await supabase.from("publication_subscriptions").upsert({ 181 + uri: evt.uri.toString(), 182 + identity: evt.did, 183 + publication: record.value.publication, 184 + record: record.value as Json, 185 + }); 186 + } 187 + } 188 + if (evt.event === "delete") { 189 + await supabase 190 + .from("publication_subscriptions") 191 + .delete() 192 + .eq("uri", evt.uri.toString()); 193 + } 194 + } 195 + // if (evt.collection === ids.AppBskyActorProfile) { 196 + // //only listen to updates because we should fetch it for the first time when they subscribe! 197 + // if (evt.event === "update") { 198 + // await supabaseServerClient 199 + // .from("bsky_profiles") 200 + // .update({ record: evt.record as Json }) 201 + // .eq("did", evt.did); 202 + // } 203 + // } 204 + if (evt.collection === "app.bsky.feed.post") { 205 + if (evt.event !== "create") return; 206 + 207 + // Early exit if no embed 208 + if ( 209 + !evt.record || 210 + typeof evt.record !== "object" || 211 + !("embed" in evt.record) 212 + ) 213 + return; 214 + 215 + // Check if embed contains our quote param using optional chaining 216 + const embedRecord = evt.record as any; 217 + const hasQuoteParam = 218 + embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) || 219 + embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM); 220 + 221 + if (!hasQuoteParam) return; 222 + console.log("FOUND EMBED!!!"); 223 + 224 + // Now validate the record since we know it contains our quote param 225 + let record = AppBskyFeedPost.validateRecord(evt.record); 226 + if (!record.success) return; 227 + 228 + let embed: string | null = null; 229 + if ( 230 + AppBskyEmbedExternal.isMain(record.value.embed) && 231 + record.value.embed.external.uri.includes(QUOTE_PARAM) 232 + ) { 233 + embed = record.value.embed.external.uri; 234 + } 235 + if ( 236 + AppBskyEmbedRecordWithMedia.isMain(record.value.embed) && 237 + AppBskyEmbedExternal.isMain(record.value.embed.media) && 238 + record.value.embed.media?.external?.uri.includes(QUOTE_PARAM) 239 + ) { 240 + embed = record.value.embed.media.external.uri; 241 + } 242 + if (embed) { 243 + console.log( 244 + "processing post mention: " + embed + " in " + evt.uri.toString(), 245 + ); 246 + await inngest.send({ 247 + name: "appview/index-bsky-post-mention", 248 + data: { post_uri: evt.uri.toString(), document_link: embed }, 249 + }); 250 + } 251 + } 252 + }