a tool for shared writing and social publishing
at update/thread-viewer 158 lines 5.1 kB view raw
1"use server"; 2 3import { AtpBaseClient } from "lexicons/api"; 4import { AppBskyActorDefs, Agent as BskyAgent } from "@atproto/api"; 5import { getIdentityData } from "actions/getIdentityData"; 6import { 7 restoreOAuthSession, 8 OAuthSessionError, 9} from "src/atproto-oauth"; 10import { TID } from "@atproto/common"; 11import { supabaseServerClient } from "supabase/serverClient"; 12import { revalidatePath } from "next/cache"; 13import { AtUri } from "@atproto/syntax"; 14import { redirect } from "next/navigation"; 15import { encodeActionToSearchParam } from "app/api/oauth/[route]/afterSignInActions"; 16import { Json } from "supabase/database.types"; 17import { IdResolver } from "@atproto/identity"; 18import { 19 Notification, 20 pingIdentityToUpdateNotification, 21} from "src/notifications"; 22import { v7 } from "uuid"; 23 24let leafletFeedURI = 25 "at://did:plc:btxrwcaeyodrap5mnjw2fvmz/app.bsky.feed.generator/subscribedPublications"; 26let idResolver = new IdResolver(); 27 28type SubscribeResult = 29 | { success: true; hasFeed: boolean } 30 | { success: false; error: OAuthSessionError }; 31 32export async function subscribeToPublication( 33 publication: string, 34 redirectRoute?: string, 35): Promise<SubscribeResult | never> { 36 let identity = await getIdentityData(); 37 if (!identity || !identity.atp_did) { 38 return redirect( 39 `/api/oauth/login?redirect_url=${redirectRoute}&action=${encodeActionToSearchParam({ action: "subscribe", publication })}`, 40 ); 41 } 42 43 const sessionResult = await restoreOAuthSession(identity.atp_did); 44 if (!sessionResult.ok) { 45 return { success: false, error: sessionResult.error }; 46 } 47 let credentialSession = sessionResult.value; 48 let agent = new AtpBaseClient( 49 credentialSession.fetchHandler.bind(credentialSession), 50 ); 51 let record = await agent.site.standard.graph.subscription.create( 52 { repo: credentialSession.did!, rkey: TID.nextStr() }, 53 { 54 publication, 55 }, 56 ); 57 let { error } = await supabaseServerClient 58 .from("publication_subscriptions") 59 .insert({ 60 uri: record.uri, 61 record, 62 publication, 63 identity: credentialSession.did!, 64 }); 65 66 // Create notification for the publication owner 67 let publicationOwner = new AtUri(publication).host; 68 if (publicationOwner !== credentialSession.did) { 69 let notification: Notification = { 70 id: v7(), 71 recipient: publicationOwner, 72 data: { 73 type: "subscribe", 74 subscription_uri: record.uri, 75 }, 76 }; 77 await supabaseServerClient.from("notifications").insert(notification); 78 await pingIdentityToUpdateNotification(publicationOwner); 79 } 80 81 let bsky = new BskyAgent(credentialSession); 82 let [prefs, profile, resolveDid] = await Promise.all([ 83 bsky.app.bsky.actor.getPreferences(), 84 bsky.app.bsky.actor.profile 85 .get({ 86 repo: credentialSession.did!, 87 rkey: "self", 88 }) 89 .catch(), 90 idResolver.did.resolve(credentialSession.did!), 91 ]); 92 if (!identity.bsky_profiles && profile.value) { 93 await supabaseServerClient.from("bsky_profiles").insert({ 94 did: identity.atp_did, 95 record: profile.value as Json, 96 handle: resolveDid?.alsoKnownAs?.[0]?.slice(5), 97 }); 98 } 99 let savedFeeds = prefs.data.preferences.find( 100 (pref) => pref.$type === "app.bsky.actor.defs#savedFeedsPrefV2", 101 ) as AppBskyActorDefs.SavedFeedsPrefV2; 102 revalidatePath("/lish/[did]/[publication]", "layout"); 103 return { 104 success: true, 105 hasFeed: !!savedFeeds.items.find((feed) => feed.value === leafletFeedURI), 106 }; 107} 108 109type UnsubscribeResult = 110 | { success: true } 111 | { success: false; error: OAuthSessionError }; 112 113export async function unsubscribeToPublication( 114 publication: string 115): Promise<UnsubscribeResult> { 116 let identity = await getIdentityData(); 117 if (!identity || !identity.atp_did) { 118 return { 119 success: false, 120 error: { 121 type: "oauth_session_expired", 122 message: "Not authenticated", 123 did: "", 124 }, 125 }; 126 } 127 128 const sessionResult = await restoreOAuthSession(identity.atp_did); 129 if (!sessionResult.ok) { 130 return { success: false, error: sessionResult.error }; 131 } 132 let credentialSession = sessionResult.value; 133 let agent = new AtpBaseClient( 134 credentialSession.fetchHandler.bind(credentialSession), 135 ); 136 let { data: existingSubscription } = await supabaseServerClient 137 .from("publication_subscriptions") 138 .select("*") 139 .eq("identity", identity.atp_did) 140 .eq("publication", publication) 141 .single(); 142 if (!existingSubscription) return { success: true }; 143 144 // Delete from both collections (old and new schema) - one or both may exist 145 let rkey = new AtUri(existingSubscription.uri).rkey; 146 await Promise.all([ 147 agent.pub.leaflet.graph.subscription.delete({ repo: credentialSession.did!, rkey }).catch(() => {}), 148 agent.site.standard.graph.subscription.delete({ repo: credentialSession.did!, rkey }).catch(() => {}), 149 ]); 150 151 await supabaseServerClient 152 .from("publication_subscriptions") 153 .delete() 154 .eq("identity", identity.atp_did) 155 .eq("publication", publication); 156 revalidatePath("/lish/[did]/[publication]", "layout"); 157 return { success: true }; 158}