tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
27
pulls
pipelines
add fix incorrect site values function
awarm.space
1 month ago
70e6083c
ff2ed113
+307
3 changed files
expand all
collapse all
unified
split
app
api
inngest
client.ts
functions
fix_incorrect_site_values.ts
route.tsx
+5
app/api/inngest/client.ts
···
41
41
documentUris: string[];
42
42
};
43
43
};
44
44
+
"documents/fix-incorrect-site-values": {
45
45
+
data: {
46
46
+
did: string;
47
47
+
};
48
48
+
};
44
49
};
45
50
46
51
// Create a client to send and receive events
+300
app/api/inngest/functions/fix_incorrect_site_values.ts
···
1
1
+
import { supabaseServerClient } from "supabase/serverClient";
2
2
+
import { inngest } from "../client";
3
3
+
import { restoreOAuthSession } from "src/atproto-oauth";
4
4
+
import { AtpBaseClient, SiteStandardDocument } from "lexicons/api";
5
5
+
import { AtUri } from "@atproto/syntax";
6
6
+
import { Json } from "supabase/database.types";
7
7
+
8
8
+
async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
9
9
+
const result = await restoreOAuthSession(did);
10
10
+
if (!result.ok) {
11
11
+
throw new Error(`Failed to restore OAuth session: ${result.error.message}`);
12
12
+
}
13
13
+
const credentialSession = result.value;
14
14
+
return new AtpBaseClient(
15
15
+
credentialSession.fetchHandler.bind(credentialSession),
16
16
+
);
17
17
+
}
18
18
+
19
19
+
/**
20
20
+
* Build set of valid site values for a publication.
21
21
+
* A site value is valid if it matches the publication or its legacy equivalent.
22
22
+
*/
23
23
+
function buildValidSiteValues(pubUri: string): Set<string> {
24
24
+
const validValues = new Set<string>([pubUri]);
25
25
+
26
26
+
try {
27
27
+
const aturi = new AtUri(pubUri);
28
28
+
29
29
+
if (pubUri.includes("/site.standard.publication/")) {
30
30
+
// Also accept legacy pub.leaflet.publication
31
31
+
validValues.add(
32
32
+
`at://${aturi.hostname}/pub.leaflet.publication/${aturi.rkey}`,
33
33
+
);
34
34
+
} else if (pubUri.includes("/pub.leaflet.publication/")) {
35
35
+
// Also accept new site.standard.publication
36
36
+
validValues.add(
37
37
+
`at://${aturi.hostname}/site.standard.publication/${aturi.rkey}`,
38
38
+
);
39
39
+
}
40
40
+
} catch (e) {
41
41
+
// Invalid URI, just use the original
42
42
+
}
43
43
+
44
44
+
return validValues;
45
45
+
}
46
46
+
47
47
+
/**
48
48
+
* This function finds and fixes documents that have incorrect site values.
49
49
+
* A document has an incorrect site value if its `site` field doesn't match
50
50
+
* the publication it belongs to (via documents_in_publications).
51
51
+
*
52
52
+
* Takes a DID as input and processes publications owned by that identity.
53
53
+
*/
54
54
+
export const fix_incorrect_site_values = inngest.createFunction(
55
55
+
{ id: "fix_incorrect_site_values" },
56
56
+
{ event: "documents/fix-incorrect-site-values" },
57
57
+
async ({ event, step }) => {
58
58
+
const { did } = event.data;
59
59
+
60
60
+
const stats = {
61
61
+
publicationsChecked: 0,
62
62
+
documentsChecked: 0,
63
63
+
documentsWithIncorrectSite: 0,
64
64
+
documentsFixed: 0,
65
65
+
documentsMissingSite: 0,
66
66
+
errors: [] as string[],
67
67
+
};
68
68
+
69
69
+
// Step 1: Get all publications owned by this identity
70
70
+
const publications = await step.run("fetch-publications", async () => {
71
71
+
const { data, error } = await supabaseServerClient
72
72
+
.from("publications")
73
73
+
.select("uri")
74
74
+
.eq("identity_did", did);
75
75
+
76
76
+
if (error) {
77
77
+
throw new Error(`Failed to fetch publications: ${error.message}`);
78
78
+
}
79
79
+
return data || [];
80
80
+
});
81
81
+
82
82
+
stats.publicationsChecked = publications.length;
83
83
+
84
84
+
if (publications.length === 0) {
85
85
+
return {
86
86
+
success: true,
87
87
+
message: "No publications found for this identity",
88
88
+
stats,
89
89
+
};
90
90
+
}
91
91
+
92
92
+
// Step 2: Get all documents_in_publications entries for these publications
93
93
+
const publicationUris = publications.map((p) => p.uri);
94
94
+
95
95
+
const joinEntries = await step.run(
96
96
+
"fetch-documents-in-publications",
97
97
+
async () => {
98
98
+
const { data, error } = await supabaseServerClient
99
99
+
.from("documents_in_publications")
100
100
+
.select("document, publication")
101
101
+
.in("publication", publicationUris);
102
102
+
103
103
+
if (error) {
104
104
+
throw new Error(
105
105
+
`Failed to fetch documents_in_publications: ${error.message}`,
106
106
+
);
107
107
+
}
108
108
+
return data || [];
109
109
+
},
110
110
+
);
111
111
+
112
112
+
if (joinEntries.length === 0) {
113
113
+
return {
114
114
+
success: true,
115
115
+
message: "No documents found in publications",
116
116
+
stats,
117
117
+
};
118
118
+
}
119
119
+
120
120
+
// Create a map of document URI -> expected publication URI
121
121
+
const documentToPublication = new Map<string, string>();
122
122
+
for (const row of joinEntries) {
123
123
+
documentToPublication.set(row.document, row.publication);
124
124
+
}
125
125
+
126
126
+
// Step 3: Fetch all document records
127
127
+
const documentUris = Array.from(documentToPublication.keys());
128
128
+
129
129
+
const allDocuments = await step.run("fetch-documents", async () => {
130
130
+
const { data, error } = await supabaseServerClient
131
131
+
.from("documents")
132
132
+
.select("uri, data")
133
133
+
.in("uri", documentUris);
134
134
+
135
135
+
if (error) {
136
136
+
throw new Error(`Failed to fetch documents: ${error.message}`);
137
137
+
}
138
138
+
return data || [];
139
139
+
});
140
140
+
141
141
+
stats.documentsChecked = allDocuments.length;
142
142
+
143
143
+
// Step 4: Find documents with incorrect site values
144
144
+
const documentsToFix: Array<{
145
145
+
uri: string;
146
146
+
currentSite: string | null;
147
147
+
correctSite: string;
148
148
+
docData: SiteStandardDocument.Record;
149
149
+
}> = [];
150
150
+
151
151
+
for (const doc of allDocuments) {
152
152
+
const expectedPubUri = documentToPublication.get(doc.uri);
153
153
+
if (!expectedPubUri) continue;
154
154
+
155
155
+
const data = doc.data as unknown as SiteStandardDocument.Record;
156
156
+
const currentSite = data?.site;
157
157
+
158
158
+
if (!currentSite) {
159
159
+
stats.documentsMissingSite++;
160
160
+
continue;
161
161
+
}
162
162
+
163
163
+
const validSiteValues = buildValidSiteValues(expectedPubUri);
164
164
+
165
165
+
if (!validSiteValues.has(currentSite)) {
166
166
+
// Document has incorrect site value - determine the correct one
167
167
+
// Prefer the site.standard.publication format if the doc is site.standard.document
168
168
+
let correctSite = expectedPubUri;
169
169
+
170
170
+
if (doc.uri.includes("/site.standard.document/")) {
171
171
+
// For site.standard.document, use site.standard.publication format
172
172
+
try {
173
173
+
const pubAturi = new AtUri(expectedPubUri);
174
174
+
if (expectedPubUri.includes("/pub.leaflet.publication/")) {
175
175
+
correctSite = `at://${pubAturi.hostname}/site.standard.publication/${pubAturi.rkey}`;
176
176
+
}
177
177
+
} catch (e) {
178
178
+
// Use as-is
179
179
+
}
180
180
+
}
181
181
+
182
182
+
documentsToFix.push({
183
183
+
uri: doc.uri,
184
184
+
currentSite,
185
185
+
correctSite,
186
186
+
docData: data,
187
187
+
});
188
188
+
}
189
189
+
}
190
190
+
191
191
+
stats.documentsWithIncorrectSite = documentsToFix.length;
192
192
+
193
193
+
if (documentsToFix.length === 0) {
194
194
+
return {
195
195
+
success: true,
196
196
+
message: "All documents have correct site values",
197
197
+
stats,
198
198
+
};
199
199
+
}
200
200
+
201
201
+
// Step 5: Group documents by author DID for efficient OAuth session handling
202
202
+
const docsByDid = new Map<string, typeof documentsToFix>();
203
203
+
for (const doc of documentsToFix) {
204
204
+
try {
205
205
+
const aturi = new AtUri(doc.uri);
206
206
+
const authorDid = aturi.hostname;
207
207
+
const existing = docsByDid.get(authorDid) || [];
208
208
+
existing.push(doc);
209
209
+
docsByDid.set(authorDid, existing);
210
210
+
} catch (e) {
211
211
+
stats.errors.push(`Invalid URI: ${doc.uri}`);
212
212
+
}
213
213
+
}
214
214
+
215
215
+
// Step 6: Process each author's documents
216
216
+
for (const [authorDid, docs] of docsByDid) {
217
217
+
// Verify OAuth session for this author
218
218
+
const oauthValid = await step.run(
219
219
+
`verify-oauth-${authorDid.slice(-8)}`,
220
220
+
async () => {
221
221
+
const result = await restoreOAuthSession(authorDid);
222
222
+
return result.ok;
223
223
+
},
224
224
+
);
225
225
+
226
226
+
if (!oauthValid) {
227
227
+
stats.errors.push(`No valid OAuth session for ${authorDid}`);
228
228
+
continue;
229
229
+
}
230
230
+
231
231
+
// Fix each document for this author
232
232
+
for (const docToFix of docs) {
233
233
+
const result = await step.run(
234
234
+
`fix-doc-${docToFix.uri.slice(-12)}`,
235
235
+
async () => {
236
236
+
try {
237
237
+
const docAturi = new AtUri(docToFix.uri);
238
238
+
239
239
+
// Build updated record
240
240
+
const updatedRecord: SiteStandardDocument.Record = {
241
241
+
...docToFix.docData,
242
242
+
site: docToFix.correctSite,
243
243
+
};
244
244
+
245
245
+
// Update on PDS
246
246
+
const agent = await createAuthenticatedAgent(authorDid);
247
247
+
await agent.com.atproto.repo.putRecord({
248
248
+
repo: authorDid,
249
249
+
collection: docAturi.collection,
250
250
+
rkey: docAturi.rkey,
251
251
+
record: updatedRecord,
252
252
+
validate: false,
253
253
+
});
254
254
+
255
255
+
// Update in database
256
256
+
const { error: dbError } = await supabaseServerClient
257
257
+
.from("documents")
258
258
+
.update({ data: updatedRecord as Json })
259
259
+
.eq("uri", docToFix.uri);
260
260
+
261
261
+
if (dbError) {
262
262
+
return {
263
263
+
success: false as const,
264
264
+
error: `Database update failed: ${dbError.message}`,
265
265
+
};
266
266
+
}
267
267
+
268
268
+
return {
269
269
+
success: true as const,
270
270
+
oldSite: docToFix.currentSite,
271
271
+
newSite: docToFix.correctSite,
272
272
+
};
273
273
+
} catch (e) {
274
274
+
return {
275
275
+
success: false as const,
276
276
+
error: e instanceof Error ? e.message : String(e),
277
277
+
};
278
278
+
}
279
279
+
},
280
280
+
);
281
281
+
282
282
+
if (result.success) {
283
283
+
stats.documentsFixed++;
284
284
+
} else {
285
285
+
stats.errors.push(`${docToFix.uri}: ${result.error}`);
286
286
+
}
287
287
+
}
288
288
+
}
289
289
+
290
290
+
return {
291
291
+
success: stats.errors.length === 0,
292
292
+
stats,
293
293
+
documentsToFix: documentsToFix.map((d) => ({
294
294
+
uri: d.uri,
295
295
+
oldSite: d.currentSite,
296
296
+
newSite: d.correctSite,
297
297
+
})),
298
298
+
};
299
299
+
},
300
300
+
);
+2
app/api/inngest/route.tsx
···
6
6
import { index_follows } from "./functions/index_follows";
7
7
import { migrate_user_to_standard } from "./functions/migrate_user_to_standard";
8
8
import { fix_standard_document_publications } from "./functions/fix_standard_document_publications";
9
9
+
import { fix_incorrect_site_values } from "./functions/fix_incorrect_site_values";
9
10
import {
10
11
cleanup_expired_oauth_sessions,
11
12
check_oauth_session,
···
20
21
index_follows,
21
22
migrate_user_to_standard,
22
23
fix_standard_document_publications,
24
24
+
fix_incorrect_site_values,
23
25
cleanup_expired_oauth_sessions,
24
26
check_oauth_session,
25
27
],