a tool for shared writing and social publishing
at update/thread-viewer 213 lines 7.1 kB view raw
1import { supabaseServerClient } from "supabase/serverClient"; 2import { inngest } from "../client"; 3import { restoreOAuthSession } from "src/atproto-oauth"; 4import { AtpBaseClient, SiteStandardDocument } from "lexicons/api"; 5import { AtUri } from "@atproto/syntax"; 6import { Json } from "supabase/database.types"; 7 8async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> { 9 const result = await restoreOAuthSession(did); 10 if (!result.ok) { 11 throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 12 } 13 const credentialSession = result.value; 14 return new AtpBaseClient( 15 credentialSession.fetchHandler.bind(credentialSession), 16 ); 17} 18 19/** 20 * Fixes site.standard.document records that have stale pub.leaflet.publication 21 * references in their site field. Updates both the PDS record and database. 22 */ 23export const fix_standard_document_publications = inngest.createFunction( 24 { id: "fix_standard_document_publications" }, 25 { event: "documents/fix-publication-references" }, 26 async ({ event, step }) => { 27 const { documentUris } = event.data as { documentUris: string[] }; 28 29 const stats = { 30 documentsFixed: 0, 31 joinEntriesFixed: 0, 32 errors: [] as string[], 33 }; 34 35 if (!documentUris || documentUris.length === 0) { 36 return { success: true, stats, message: "No documents to fix" }; 37 } 38 39 // Group documents by DID (author) for efficient OAuth session handling 40 const docsByDid = new Map<string, string[]>(); 41 for (const uri of documentUris) { 42 try { 43 const aturi = new AtUri(uri); 44 const did = aturi.hostname; 45 const existing = docsByDid.get(did) || []; 46 existing.push(uri); 47 docsByDid.set(did, existing); 48 } catch (e) { 49 stats.errors.push(`Invalid URI: ${uri}`); 50 } 51 } 52 53 // Process each DID's documents 54 for (const [did, uris] of docsByDid) { 55 // Verify OAuth session for this user 56 const oauthValid = await step.run( 57 `verify-oauth-${did.slice(-8)}`, 58 async () => { 59 const result = await restoreOAuthSession(did); 60 return result.ok; 61 }, 62 ); 63 64 if (!oauthValid) { 65 stats.errors.push(`No valid OAuth session for ${did}`); 66 continue; 67 } 68 69 // Fix each document 70 for (const docUri of uris) { 71 const result = await step.run( 72 `fix-doc-${docUri.slice(-12)}`, 73 async () => { 74 // Fetch the document 75 const { data: doc, error: fetchError } = await supabaseServerClient 76 .from("documents") 77 .select("uri, data") 78 .eq("uri", docUri) 79 .single(); 80 81 if (fetchError || !doc) { 82 return { 83 success: false as const, 84 error: `Document not found: ${fetchError?.message || "no data"}`, 85 }; 86 } 87 88 const data = doc.data as SiteStandardDocument.Record; 89 const oldSite = data?.site; 90 91 if (!oldSite || !oldSite.includes("/pub.leaflet.publication/")) { 92 return { 93 success: false as const, 94 error: "Document does not have a pub.leaflet.publication site reference", 95 }; 96 } 97 98 // Convert to new publication URI 99 const oldPubAturi = new AtUri(oldSite); 100 const newSite = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`; 101 102 // Update the record 103 const updatedRecord: SiteStandardDocument.Record = { 104 ...data, 105 site: newSite, 106 }; 107 108 // Write to PDS 109 const docAturi = new AtUri(docUri); 110 const agent = await createAuthenticatedAgent(did); 111 await agent.com.atproto.repo.putRecord({ 112 repo: did, 113 collection: "site.standard.document", 114 rkey: docAturi.rkey, 115 record: updatedRecord, 116 validate: false, 117 }); 118 119 // Update database 120 const { error: dbError } = await supabaseServerClient 121 .from("documents") 122 .update({ data: updatedRecord as Json }) 123 .eq("uri", docUri); 124 125 if (dbError) { 126 return { 127 success: false as const, 128 error: `Database update failed: ${dbError.message}`, 129 }; 130 } 131 132 return { 133 success: true as const, 134 oldSite, 135 newSite, 136 }; 137 }, 138 ); 139 140 if (result.success) { 141 stats.documentsFixed++; 142 143 // Fix the documents_in_publications entry 144 const joinResult = await step.run( 145 `fix-join-${docUri.slice(-12)}`, 146 async () => { 147 // Find the publication URI that exists in the database 148 const { data: doc } = await supabaseServerClient 149 .from("documents") 150 .select("data") 151 .eq("uri", docUri) 152 .single(); 153 154 const newSite = (doc?.data as any)?.site; 155 if (!newSite) { 156 return { success: false as const, error: "Could not read updated document" }; 157 } 158 159 // Check which publication URI exists 160 const newPubAturi = new AtUri(newSite); 161 const oldPubUri = `at://${newPubAturi.hostname}/pub.leaflet.publication/${newPubAturi.rkey}`; 162 163 const { data: pubs } = await supabaseServerClient 164 .from("publications") 165 .select("uri") 166 .in("uri", [newSite, oldPubUri]); 167 168 const existingPubUri = pubs?.find((p) => p.uri === newSite)?.uri || 169 pubs?.find((p) => p.uri === oldPubUri)?.uri; 170 171 if (!existingPubUri) { 172 return { success: false as const, error: "No matching publication found" }; 173 } 174 175 // Delete any existing entries for this document 176 await supabaseServerClient 177 .from("documents_in_publications") 178 .delete() 179 .eq("document", docUri); 180 181 // Insert the correct entry 182 const { error: insertError } = await supabaseServerClient 183 .from("documents_in_publications") 184 .insert({ 185 document: docUri, 186 publication: existingPubUri, 187 }); 188 189 if (insertError) { 190 return { success: false as const, error: insertError.message }; 191 } 192 193 return { success: true as const, publication: existingPubUri }; 194 }, 195 ); 196 197 if (joinResult.success) { 198 stats.joinEntriesFixed++; 199 } else { 200 stats.errors.push(`Join table fix failed for ${docUri}: ${"error" in joinResult ? joinResult.error : "unknown error"}`); 201 } 202 } else { 203 stats.errors.push(`${docUri}: ${result.error}`); 204 } 205 } 206 } 207 208 return { 209 success: stats.errors.length === 0, 210 stats, 211 }; 212 }, 213);