tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
27
pulls
pipelines
add inngest function to fix postref issue
awarm.space
1 month ago
5626c866
8c91fcf1
+203
3 changed files
expand all
collapse all
unified
split
app
api
inngest
client.ts
functions
fix_standard_document_postref.ts
route.tsx
+5
app/api/inngest/client.ts
···
46
46
did: string;
47
47
};
48
48
};
49
49
+
"documents/fix-postref": {
50
50
+
data: {
51
51
+
documentUris?: string[];
52
52
+
};
53
53
+
};
49
54
};
50
55
51
56
// Create a client to send and receive events
+196
app/api/inngest/functions/fix_standard_document_postref.ts
···
1
1
+
import { supabaseServerClient } from "supabase/serverClient";
2
2
+
import { inngest } from "../client";
3
3
+
import { restoreOAuthSession } from "src/atproto-oauth";
4
4
+
import {
5
5
+
AtpBaseClient,
6
6
+
SiteStandardDocument,
7
7
+
ComAtprotoRepoStrongRef,
8
8
+
} from "lexicons/api";
9
9
+
import { AtUri } from "@atproto/syntax";
10
10
+
import { Json } from "supabase/database.types";
11
11
+
12
12
+
async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
13
13
+
const result = await restoreOAuthSession(did);
14
14
+
if (!result.ok) {
15
15
+
throw new Error(`Failed to restore OAuth session: ${result.error.message}`);
16
16
+
}
17
17
+
const credentialSession = result.value;
18
18
+
return new AtpBaseClient(
19
19
+
credentialSession.fetchHandler.bind(credentialSession),
20
20
+
);
21
21
+
}
22
22
+
23
23
+
/**
24
24
+
* Fixes site.standard.document records that have the legacy `postRef` field set.
25
25
+
* Migrates the value to `bskyPostRef` (the correct field for site.standard.document)
26
26
+
* and removes the legacy `postRef` field.
27
27
+
*
28
28
+
* Can be triggered with specific document URIs, or will find all affected documents
29
29
+
* if no URIs are provided.
30
30
+
*/
31
31
+
export const fix_standard_document_postref = inngest.createFunction(
32
32
+
{ id: "fix_standard_document_postref" },
33
33
+
{ event: "documents/fix-postref" },
34
34
+
async ({ event, step }) => {
35
35
+
const { documentUris: providedUris } = event.data as {
36
36
+
documentUris?: string[];
37
37
+
};
38
38
+
39
39
+
const stats = {
40
40
+
documentsFound: 0,
41
41
+
documentsFixed: 0,
42
42
+
documentsSkipped: 0,
43
43
+
errors: [] as string[],
44
44
+
};
45
45
+
46
46
+
// Step 1: Find documents to fix (either provided or query for them)
47
47
+
const documentUris = await step.run("find-documents", async () => {
48
48
+
if (providedUris && providedUris.length > 0) {
49
49
+
return providedUris;
50
50
+
}
51
51
+
52
52
+
// Find all site.standard.document records with postRef set
53
53
+
const { data: documents, error } = await supabaseServerClient
54
54
+
.from("documents")
55
55
+
.select("uri")
56
56
+
.like("uri", "at://%/site.standard.document/%")
57
57
+
.not("data->postRef", "is", null);
58
58
+
59
59
+
if (error) {
60
60
+
throw new Error(`Failed to query documents: ${error.message}`);
61
61
+
}
62
62
+
63
63
+
return (documents || []).map((d) => d.uri);
64
64
+
});
65
65
+
66
66
+
stats.documentsFound = documentUris.length;
67
67
+
68
68
+
if (documentUris.length === 0) {
69
69
+
return {
70
70
+
success: true,
71
71
+
message: "No documents found with postRef field",
72
72
+
stats,
73
73
+
};
74
74
+
}
75
75
+
76
76
+
// Step 2: Group documents by DID for efficient OAuth session handling
77
77
+
const docsByDid = new Map<string, string[]>();
78
78
+
for (const uri of documentUris) {
79
79
+
try {
80
80
+
const aturi = new AtUri(uri);
81
81
+
const did = aturi.hostname;
82
82
+
const existing = docsByDid.get(did) || [];
83
83
+
existing.push(uri);
84
84
+
docsByDid.set(did, existing);
85
85
+
} catch (e) {
86
86
+
stats.errors.push(`Invalid URI: ${uri}`);
87
87
+
}
88
88
+
}
89
89
+
90
90
+
// Step 3: Process each DID's documents
91
91
+
for (const [did, uris] of docsByDid) {
92
92
+
// Verify OAuth session for this user
93
93
+
const oauthValid = await step.run(
94
94
+
`verify-oauth-${did.slice(-8)}`,
95
95
+
async () => {
96
96
+
const result = await restoreOAuthSession(did);
97
97
+
return result.ok;
98
98
+
},
99
99
+
);
100
100
+
101
101
+
if (!oauthValid) {
102
102
+
stats.errors.push(`No valid OAuth session for ${did}`);
103
103
+
stats.documentsSkipped += uris.length;
104
104
+
continue;
105
105
+
}
106
106
+
107
107
+
// Fix each document
108
108
+
for (const docUri of uris) {
109
109
+
const result = await step.run(
110
110
+
`fix-doc-${docUri.slice(-12)}`,
111
111
+
async () => {
112
112
+
// Fetch the document
113
113
+
const { data: doc, error: fetchError } = await supabaseServerClient
114
114
+
.from("documents")
115
115
+
.select("uri, data")
116
116
+
.eq("uri", docUri)
117
117
+
.single();
118
118
+
119
119
+
if (fetchError || !doc) {
120
120
+
return {
121
121
+
success: false as const,
122
122
+
error: `Document not found: ${fetchError?.message || "no data"}`,
123
123
+
};
124
124
+
}
125
125
+
126
126
+
const data = doc.data as Record<string, unknown>;
127
127
+
const postRef = data.postRef as
128
128
+
| ComAtprotoRepoStrongRef.Main
129
129
+
| undefined;
130
130
+
131
131
+
if (!postRef) {
132
132
+
return {
133
133
+
success: false as const,
134
134
+
skipped: true,
135
135
+
error: "Document does not have postRef field",
136
136
+
};
137
137
+
}
138
138
+
139
139
+
// Build updated record: move postRef to bskyPostRef
140
140
+
const { postRef: _, ...restData } = data;
141
141
+
let updatedRecord: SiteStandardDocument.Record = {
142
142
+
...(restData as SiteStandardDocument.Record),
143
143
+
};
144
144
+
145
145
+
updatedRecord.bskyPostRef = data.bskyPostRef
146
146
+
? (data.bskyPostRef as ComAtprotoRepoStrongRef.Main)
147
147
+
: postRef;
148
148
+
149
149
+
// Write to PDS
150
150
+
const docAturi = new AtUri(docUri);
151
151
+
const agent = await createAuthenticatedAgent(did);
152
152
+
await agent.com.atproto.repo.putRecord({
153
153
+
repo: did,
154
154
+
collection: "site.standard.document",
155
155
+
rkey: docAturi.rkey,
156
156
+
record: updatedRecord,
157
157
+
validate: false,
158
158
+
});
159
159
+
160
160
+
// Update database
161
161
+
const { error: dbError } = await supabaseServerClient
162
162
+
.from("documents")
163
163
+
.update({ data: updatedRecord as Json })
164
164
+
.eq("uri", docUri);
165
165
+
166
166
+
if (dbError) {
167
167
+
return {
168
168
+
success: false as const,
169
169
+
error: `Database update failed: ${dbError.message}`,
170
170
+
};
171
171
+
}
172
172
+
173
173
+
return {
174
174
+
success: true as const,
175
175
+
postRef,
176
176
+
bskyPostRef: updatedRecord.bskyPostRef,
177
177
+
};
178
178
+
},
179
179
+
);
180
180
+
181
181
+
if (result.success) {
182
182
+
stats.documentsFixed++;
183
183
+
} else if ("skipped" in result && result.skipped) {
184
184
+
stats.documentsSkipped++;
185
185
+
} else {
186
186
+
stats.errors.push(`${docUri}: ${result.error}`);
187
187
+
}
188
188
+
}
189
189
+
}
190
190
+
191
191
+
return {
192
192
+
success: stats.errors.length === 0,
193
193
+
stats,
194
194
+
};
195
195
+
},
196
196
+
);
+2
app/api/inngest/route.tsx
···
7
7
import { migrate_user_to_standard } from "./functions/migrate_user_to_standard";
8
8
import { fix_standard_document_publications } from "./functions/fix_standard_document_publications";
9
9
import { fix_incorrect_site_values } from "./functions/fix_incorrect_site_values";
10
10
+
import { fix_standard_document_postref } from "./functions/fix_standard_document_postref";
10
11
import {
11
12
cleanup_expired_oauth_sessions,
12
13
check_oauth_session,
···
22
23
migrate_user_to_standard,
23
24
fix_standard_document_publications,
24
25
fix_incorrect_site_values,
26
26
+
fix_standard_document_postref,
25
27
cleanup_expired_oauth_sessions,
26
28
check_oauth_session,
27
29
],