// A tool for shared writing and social publishing.
1import { supabaseServerClient } from "supabase/serverClient";
2import { inngest } from "../client";
3import { restoreOAuthSession } from "src/atproto-oauth";
4import { AtpBaseClient, SiteStandardDocument } from "lexicons/api";
5import { AtUri } from "@atproto/syntax";
6import { Json } from "supabase/database.types";
7
8async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
9 const result = await restoreOAuthSession(did);
10 if (!result.ok) {
11 throw new Error(`Failed to restore OAuth session: ${result.error.message}`);
12 }
13 const credentialSession = result.value;
14 return new AtpBaseClient(
15 credentialSession.fetchHandler.bind(credentialSession),
16 );
17}
18
19/**
20 * Fixes site.standard.document records that have stale pub.leaflet.publication
21 * references in their site field. Updates both the PDS record and database.
22 */
23export const fix_standard_document_publications = inngest.createFunction(
24 { id: "fix_standard_document_publications" },
25 { event: "documents/fix-publication-references" },
26 async ({ event, step }) => {
27 const { documentUris } = event.data as { documentUris: string[] };
28
29 const stats = {
30 documentsFixed: 0,
31 joinEntriesFixed: 0,
32 errors: [] as string[],
33 };
34
35 if (!documentUris || documentUris.length === 0) {
36 return { success: true, stats, message: "No documents to fix" };
37 }
38
39 // Group documents by DID (author) for efficient OAuth session handling
40 const docsByDid = new Map<string, string[]>();
41 for (const uri of documentUris) {
42 try {
43 const aturi = new AtUri(uri);
44 const did = aturi.hostname;
45 const existing = docsByDid.get(did) || [];
46 existing.push(uri);
47 docsByDid.set(did, existing);
48 } catch (e) {
49 stats.errors.push(`Invalid URI: ${uri}`);
50 }
51 }
52
53 // Process each DID's documents
54 for (const [did, uris] of docsByDid) {
55 // Verify OAuth session for this user
56 const oauthValid = await step.run(
57 `verify-oauth-${did.slice(-8)}`,
58 async () => {
59 const result = await restoreOAuthSession(did);
60 return result.ok;
61 },
62 );
63
64 if (!oauthValid) {
65 stats.errors.push(`No valid OAuth session for ${did}`);
66 continue;
67 }
68
69 // Fix each document
70 for (const docUri of uris) {
71 const result = await step.run(
72 `fix-doc-${docUri.slice(-12)}`,
73 async () => {
74 // Fetch the document
75 const { data: doc, error: fetchError } = await supabaseServerClient
76 .from("documents")
77 .select("uri, data")
78 .eq("uri", docUri)
79 .single();
80
81 if (fetchError || !doc) {
82 return {
83 success: false as const,
84 error: `Document not found: ${fetchError?.message || "no data"}`,
85 };
86 }
87
88 const data = doc.data as SiteStandardDocument.Record;
89 const oldSite = data?.site;
90
91 if (!oldSite || !oldSite.includes("/pub.leaflet.publication/")) {
92 return {
93 success: false as const,
94 error: "Document does not have a pub.leaflet.publication site reference",
95 };
96 }
97
98 // Convert to new publication URI
99 const oldPubAturi = new AtUri(oldSite);
100 const newSite = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`;
101
102 // Update the record
103 const updatedRecord: SiteStandardDocument.Record = {
104 ...data,
105 site: newSite,
106 };
107
108 // Write to PDS
109 const docAturi = new AtUri(docUri);
110 const agent = await createAuthenticatedAgent(did);
111 await agent.com.atproto.repo.putRecord({
112 repo: did,
113 collection: "site.standard.document",
114 rkey: docAturi.rkey,
115 record: updatedRecord,
116 validate: false,
117 });
118
119 // Update database
120 const { error: dbError } = await supabaseServerClient
121 .from("documents")
122 .update({ data: updatedRecord as Json })
123 .eq("uri", docUri);
124
125 if (dbError) {
126 return {
127 success: false as const,
128 error: `Database update failed: ${dbError.message}`,
129 };
130 }
131
132 return {
133 success: true as const,
134 oldSite,
135 newSite,
136 };
137 },
138 );
139
140 if (result.success) {
141 stats.documentsFixed++;
142
143 // Fix the documents_in_publications entry
144 const joinResult = await step.run(
145 `fix-join-${docUri.slice(-12)}`,
146 async () => {
147 // Find the publication URI that exists in the database
148 const { data: doc } = await supabaseServerClient
149 .from("documents")
150 .select("data")
151 .eq("uri", docUri)
152 .single();
153
154 const newSite = (doc?.data as any)?.site;
155 if (!newSite) {
156 return { success: false as const, error: "Could not read updated document" };
157 }
158
159 // Check which publication URI exists
160 const newPubAturi = new AtUri(newSite);
161 const oldPubUri = `at://${newPubAturi.hostname}/pub.leaflet.publication/${newPubAturi.rkey}`;
162
163 const { data: pubs } = await supabaseServerClient
164 .from("publications")
165 .select("uri")
166 .in("uri", [newSite, oldPubUri]);
167
168 const existingPubUri = pubs?.find((p) => p.uri === newSite)?.uri ||
169 pubs?.find((p) => p.uri === oldPubUri)?.uri;
170
171 if (!existingPubUri) {
172 return { success: false as const, error: "No matching publication found" };
173 }
174
175 // Delete any existing entries for this document
176 await supabaseServerClient
177 .from("documents_in_publications")
178 .delete()
179 .eq("document", docUri);
180
181 // Insert the correct entry
182 const { error: insertError } = await supabaseServerClient
183 .from("documents_in_publications")
184 .insert({
185 document: docUri,
186 publication: existingPubUri,
187 });
188
189 if (insertError) {
190 return { success: false as const, error: insertError.message };
191 }
192
193 return { success: true as const, publication: existingPubUri };
194 },
195 );
196
197 if (joinResult.success) {
198 stats.joinEntriesFixed++;
199 } else {
200 stats.errors.push(`Join table fix failed for ${docUri}: ${"error" in joinResult ? joinResult.error : "unknown error"}`);
201 }
202 } else {
203 stats.errors.push(`${docUri}: ${result.error}`);
204 }
205 }
206 }
207
208 return {
209 success: stats.errors.length === 0,
210 stats,
211 };
212 },
213);