a tool for shared writing and social publishing
1"use server";
2
3import { AtpBaseClient } from "lexicons/api";
4import { AppBskyActorDefs, Agent as BskyAgent } from "@atproto/api";
5import { getIdentityData } from "actions/getIdentityData";
6import { restoreOAuthSession, OAuthSessionError } from "src/atproto-oauth";
7import { TID } from "@atproto/common";
8import { supabaseServerClient } from "supabase/serverClient";
9import { revalidatePath } from "next/cache";
10import { AtUri } from "@atproto/syntax";
11import { redirect } from "next/navigation";
12import { encodeActionToSearchParam } from "app/api/oauth/[route]/afterSignInActions";
13import { Json } from "supabase/database.types";
14import { IdResolver } from "@atproto/identity";
15import {
16 Notification,
17 pingIdentityToUpdateNotification,
18} from "src/notifications";
19import { v7 } from "uuid";
20
21let leafletFeedURI =
22 "at://did:plc:btxrwcaeyodrap5mnjw2fvmz/app.bsky.feed.generator/subscribedPublications";
23let idResolver = new IdResolver();
24
25type SubscribeResult =
26 | { success: true; hasFeed: boolean }
27 | { success: false; error: OAuthSessionError };
28
29export async function subscribeToPublication(
30 publication: string,
31 redirectRoute?: string,
32): Promise<SubscribeResult | never> {
33 let identity = await getIdentityData();
34 if (!identity || !identity.atp_did) {
35 return redirect(
36 `/api/oauth/login?redirect_url=${redirectRoute}&action=${encodeActionToSearchParam({ action: "subscribe", publication })}`,
37 );
38 }
39
40 const sessionResult = await restoreOAuthSession(identity.atp_did);
41 if (!sessionResult.ok) {
42 return { success: false, error: sessionResult.error };
43 }
44 let credentialSession = sessionResult.value;
45 let agent = new AtpBaseClient(
46 credentialSession.fetchHandler.bind(credentialSession),
47 );
48 let record = await agent.site.standard.graph.subscription.create(
49 { repo: credentialSession.did!, rkey: TID.nextStr() },
50 {
51 publication,
52 },
53 );
54 let { error } = await supabaseServerClient
55 .from("publication_subscriptions")
56 .insert({
57 uri: record.uri,
58 record,
59 publication,
60 identity: credentialSession.did!,
61 });
62
63 // Create notification for the publication owner
64 let publicationOwner = new AtUri(publication).host;
65 if (publicationOwner !== credentialSession.did) {
66 let notification: Notification = {
67 id: v7(),
68 recipient: publicationOwner,
69 data: {
70 type: "subscribe",
71 subscription_uri: record.uri,
72 },
73 };
74 await supabaseServerClient.from("notifications").insert(notification);
75 await pingIdentityToUpdateNotification(publicationOwner);
76 }
77
78 let bsky = new BskyAgent(credentialSession);
79 let [profile, resolveDid] = await Promise.all([
80 bsky.app.bsky.actor.profile
81 .get({
82 repo: credentialSession.did!,
83 rkey: "self",
84 })
85 .catch(),
86 idResolver.did.resolve(credentialSession.did!),
87 ]);
88 if (!identity.bsky_profiles && profile.value) {
89 await supabaseServerClient.from("bsky_profiles").insert({
90 did: identity.atp_did,
91 record: profile.value as Json,
92 handle: resolveDid?.alsoKnownAs?.[0]?.slice(5),
93 });
94 }
95 revalidatePath("/lish/[did]/[publication]", "layout");
96 return {
97 success: true,
98 hasFeed: true,
99 };
100}
101
102type UnsubscribeResult =
103 | { success: true }
104 | { success: false; error: OAuthSessionError };
105
106export async function unsubscribeToPublication(
107 publication: string,
108): Promise<UnsubscribeResult> {
109 let identity = await getIdentityData();
110 if (!identity || !identity.atp_did) {
111 return {
112 success: false,
113 error: {
114 type: "oauth_session_expired",
115 message: "Not authenticated",
116 did: "",
117 },
118 };
119 }
120
121 const sessionResult = await restoreOAuthSession(identity.atp_did);
122 if (!sessionResult.ok) {
123 return { success: false, error: sessionResult.error };
124 }
125 let credentialSession = sessionResult.value;
126 let agent = new AtpBaseClient(
127 credentialSession.fetchHandler.bind(credentialSession),
128 );
129 let { data: existingSubscription } = await supabaseServerClient
130 .from("publication_subscriptions")
131 .select("*")
132 .eq("identity", identity.atp_did)
133 .eq("publication", publication)
134 .single();
135 if (!existingSubscription) return { success: true };
136
137 // Delete from both collections (old and new schema) - one or both may exist
138 let rkey = new AtUri(existingSubscription.uri).rkey;
139 await Promise.all([
140 agent.pub.leaflet.graph.subscription
141 .delete({ repo: credentialSession.did!, rkey })
142 .catch(() => {}),
143 agent.site.standard.graph.subscription
144 .delete({ repo: credentialSession.did!, rkey })
145 .catch(() => {}),
146 ]);
147
148 await supabaseServerClient
149 .from("publication_subscriptions")
150 .delete()
151 .eq("identity", identity.atp_did)
152 .eq("publication", publication);
153 revalidatePath("/lish/[did]/[publication]", "layout");
154 return { success: true };
155}