a tool for shared writing and social publishing

update migration workflow to handle old/new records

+125
+125
app/api/inngest/functions/migrate_user_to_standard.ts
··· 38 const stats = { 39 publicationsMigrated: 0, 40 documentsMigrated: 0, 41 userSubscriptionsMigrated: 0, 42 referencesUpdated: 0, 43 errors: [] as string[], ··· 320 stats.errors.push( 321 `Document ${result.oldUri}: Database error: ${result.error}`, 322 ); 323 } 324 } 325
··· 38 const stats = { 39 publicationsMigrated: 0, 40 documentsMigrated: 0, 41 + standardDocumentsFixed: 0, 42 userSubscriptionsMigrated: 0, 43 referencesUpdated: 0, 44 errors: [] as string[], ··· 321 stats.errors.push( 322 `Document ${result.oldUri}: Database error: ${result.error}`, 323 ); 324 + } 325 + } 326 + 327 + // Step 4b: Fix existing site.standard.document records that reference pub.leaflet.publication 328 + // This handles the case where site.standard.document records were created pointing to 329 + // pub.leaflet.publication URIs before the publication was migrated to site.standard.publication 330 + const existingStandardDocs = await step.run( 331 + "fetch-existing-standard-documents", 332 + async () => { 333 + const { data, error } = await supabaseServerClient 334 + .from("documents") 335 + .select("uri, data") 336 + .like("uri", `at://${did}/site.standard.document/%`); 337 + 338 + if (error) 339 + throw new Error( 340 + `Failed to fetch existing standard documents: ${error.message}`, 341 + ); 342 + return data || []; 343 + }, 344 + ); 345 + 346 + // Find documents that reference pub.leaflet.publication and need their site field updated 347 + const standardDocsToFix = existingStandardDocs 348 + .map((doc) => { 349 + const data = doc.data as SiteStandardDocument.Record; 350 + const site = data?.site; 351 + 352 + // Check if site field references a pub.leaflet.publication 353 + if (!site || !site.includes("/pub.leaflet.publication/")) { 354 + return null; 355 + } 356 + 357 + try { 358 + const oldPubAturi = new AtUri(site); 359 + const newPubUri = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`; 360 + 361 + // Only fix if we have the new publication in our map (meaning it was migrated) 362 + // or if the new publication exists (check against all migrated publications) 363 + if ( 364 + publicationUriMap[site] || 365 + Object.values(publicationUriMap).includes(newPubUri) 366 + ) { 367 + const docAturi = new AtUri(doc.uri); 368 + const updatedRecord: SiteStandardDocument.Record = { 369 + ...data, 370 + site: newPubUri, 371 + }; 372 + 373 + return { 374 + doc, 375 + rkey: docAturi.rkey, 376 + oldSite: site, 377 + newSite: newPubUri, 378 + updatedRecord, 379 + }; 380 + } 381 + } catch (e) { 382 + stats.errors.push(`Invalid site URI in document ${doc.uri}: ${site}`); 383 + } 384 + 385 + return null; 386 + }) 387 + .filter((x) => x !== null); 388 + 389 + // Update these documents on PDS and in database 390 + if (standardDocsToFix.length > 0) { 391 + const fixResults = await Promise.all( 392 + standardDocsToFix.map(({ doc, rkey, oldSite, newSite, updatedRecord }) => 393 + step.run(`fix-standard-document-${doc.uri}`, async () => { 394 + // PDS write to update the site field 395 + const agent = await createAuthenticatedAgent(did); 396 + await agent.com.atproto.repo.putRecord({ 397 + repo: did, 398 + collection: "site.standard.document", 399 + rkey, 400 + record: updatedRecord, 401 + validate: false, 402 + }); 403 + 404 + // DB write 405 + const { error: dbError } = await supabaseServerClient 406 + .from("documents") 407 + .update({ data: updatedRecord as Json }) 408 + .eq("uri", doc.uri); 409 + 410 + if (dbError) { 411 + return { 412 + success: false as const, 413 + uri: doc.uri, 414 + error: dbError.message, 415 + }; 416 + } 417 + 418 + // Update documents_in_publications to point to new publication URI 419 + await supabaseServerClient 420 + .from("documents_in_publications") 421 + .upsert({ 422 + publication: newSite, 423 + document: doc.uri, 424 + }); 425 + 426 + // Remove old publication reference if different 427 + if (oldSite !== newSite) { 428 + await supabaseServerClient 429 + .from("documents_in_publications") 430 + .delete() 431 + .eq("publication", oldSite) 432 + .eq("document", doc.uri); 433 + } 434 + 435 + return { success: true as const, uri: doc.uri }; 436 + }), 437 + ), 438 + ); 439 + 440 + for (const result of fixResults) { 441 + if (result.success) { 442 + stats.standardDocumentsFixed++; 443 + } else { 444 + stats.errors.push( 445 + `Fix standard document ${result.uri}: Database error: ${result.error}`, 446 + ); 447 + } 448 } 449 } 450