a tool for shared writing and social publishing
at update/reader 155 lines 4.9 kB view raw
1"use server"; 2 3import { AtpBaseClient } from "lexicons/api"; 4import { AppBskyActorDefs, Agent as BskyAgent } from "@atproto/api"; 5import { getIdentityData } from "actions/getIdentityData"; 6import { restoreOAuthSession, OAuthSessionError } from "src/atproto-oauth"; 7import { TID } from "@atproto/common"; 8import { supabaseServerClient } from "supabase/serverClient"; 9import { revalidatePath } from "next/cache"; 10import { AtUri } from "@atproto/syntax"; 11import { redirect } from "next/navigation"; 12import { encodeActionToSearchParam } from "app/api/oauth/[route]/afterSignInActions"; 13import { Json } from "supabase/database.types"; 14import { IdResolver } from "@atproto/identity"; 15import { 16 Notification, 17 pingIdentityToUpdateNotification, 18} from "src/notifications"; 19import { v7 } from "uuid"; 20 21let leafletFeedURI = 22 "at://did:plc:btxrwcaeyodrap5mnjw2fvmz/app.bsky.feed.generator/subscribedPublications"; 23let idResolver = new IdResolver(); 24 25type SubscribeResult = 26 | { success: true; hasFeed: boolean } 27 | { success: false; error: OAuthSessionError }; 28 29export async function subscribeToPublication( 30 publication: string, 31 redirectRoute?: string, 32): Promise<SubscribeResult | never> { 33 let identity = await getIdentityData(); 34 if (!identity || !identity.atp_did) { 35 return redirect( 36 `/api/oauth/login?redirect_url=${redirectRoute}&action=${encodeActionToSearchParam({ action: "subscribe", publication })}`, 37 ); 38 } 39 40 const sessionResult = await restoreOAuthSession(identity.atp_did); 41 if (!sessionResult.ok) { 42 return { success: false, error: sessionResult.error }; 43 } 44 let credentialSession = sessionResult.value; 45 let agent = new AtpBaseClient( 46 credentialSession.fetchHandler.bind(credentialSession), 47 ); 48 let record = await agent.site.standard.graph.subscription.create( 49 { repo: credentialSession.did!, rkey: TID.nextStr() }, 50 { 51 publication, 52 }, 53 ); 54 let { error } = await supabaseServerClient 55 .from("publication_subscriptions") 56 .insert({ 57 uri: record.uri, 58 record, 59 publication, 60 identity: credentialSession.did!, 61 }); 62 63 // Create notification for the publication owner 64 let publicationOwner = new AtUri(publication).host; 65 if (publicationOwner !== credentialSession.did) { 66 let notification: Notification = { 67 id: v7(), 68 recipient: publicationOwner, 69 data: { 70 type: "subscribe", 71 subscription_uri: record.uri, 72 }, 73 }; 74 await supabaseServerClient.from("notifications").insert(notification); 75 await pingIdentityToUpdateNotification(publicationOwner); 76 } 77 78 let bsky = new BskyAgent(credentialSession); 79 let [profile, resolveDid] = await Promise.all([ 80 bsky.app.bsky.actor.profile 81 .get({ 82 repo: credentialSession.did!, 83 rkey: "self", 84 }) 85 .catch(), 86 idResolver.did.resolve(credentialSession.did!), 87 ]); 88 if (!identity.bsky_profiles && profile.value) { 89 await supabaseServerClient.from("bsky_profiles").insert({ 90 did: identity.atp_did, 91 record: profile.value as Json, 92 handle: resolveDid?.alsoKnownAs?.[0]?.slice(5), 93 }); 94 } 95 revalidatePath("/lish/[did]/[publication]", "layout"); 96 return { 97 success: true, 98 hasFeed: true, 99 }; 100} 101 102type UnsubscribeResult = 103 | { success: true } 104 | { success: false; error: OAuthSessionError }; 105 106export async function unsubscribeToPublication( 107 publication: string, 108): Promise<UnsubscribeResult> { 109 let identity = await getIdentityData(); 110 if (!identity || !identity.atp_did) { 111 return { 112 success: false, 113 error: { 114 type: "oauth_session_expired", 115 message: "Not authenticated", 116 did: "", 117 }, 118 }; 119 } 120 121 const sessionResult = await restoreOAuthSession(identity.atp_did); 122 if (!sessionResult.ok) { 123 return { success: false, error: sessionResult.error }; 124 } 125 let credentialSession = sessionResult.value; 126 let agent = new AtpBaseClient( 127 credentialSession.fetchHandler.bind(credentialSession), 128 ); 129 let { data: existingSubscription } = await supabaseServerClient 130 .from("publication_subscriptions") 131 .select("*") 132 .eq("identity", identity.atp_did) 133 .eq("publication", publication) 134 .single(); 135 if (!existingSubscription) return { success: true }; 136 137 // Delete from both collections (old and new schema) - one or both may exist 138 let rkey = new AtUri(existingSubscription.uri).rkey; 139 await Promise.all([ 140 agent.pub.leaflet.graph.subscription 141 .delete({ repo: credentialSession.did!, rkey }) 142 .catch(() => {}), 143 agent.site.standard.graph.subscription 144 .delete({ repo: credentialSession.did!, rkey }) 145 .catch(() => {}), 146 ]); 147 148 await supabaseServerClient 149 .from("publication_subscriptions") 150 .delete() 151 .eq("identity", identity.atp_did) 152 .eq("publication", publication); 153 revalidatePath("/lish/[did]/[publication]", "layout"); 154 return { success: true }; 155}