a tool for shared writing and social publishing
1"use server";
2
3import { supabaseServerClient } from "supabase/serverClient";
4import { Tables, TablesInsert } from "supabase/database.types";
5import { AtUri } from "@atproto/syntax";
6import { idResolver } from "app/(home-pages)/reader/idResolver";
7
/** A stored notification, typed from the generated `notifications` table definition. */
type NotificationRow = Tables<"notifications">;

/**
 * Insert shape for the `notifications` table, with the generated `data`
 * column type swapped for the structured {@link NotificationData} union.
 */
export type Notification = { data: NotificationData } & Omit<
  TablesInsert<"notifications">,
  "data"
>;
13
/**
 * Structured payload carried in a notification's `data` field, discriminated
 * by `type`. The `mention` and `comment_mention` variants are further
 * discriminated by `mention_type`; only their `"publication"` and
 * `"document"` forms carry a `mentioned_uri`.
 */
export type NotificationData =
  // A comment; `parent_uri` is optional and, when set, names a second comment.
  | {
      type: "comment";
      comment_uri: string;
      parent_uri?: string;
    }
  // A publication subscription.
  | {
      type: "subscribe";
      subscription_uri: string;
    }
  // A Bluesky post paired with a document.
  | {
      type: "quote";
      bsky_post_uri: string;
      document_uri: string;
    }
  // Mentions located in a document.
  | {
      type: "mention";
      document_uri: string;
      mention_type: "did";
    }
  | {
      type: "mention";
      document_uri: string;
      mention_type: "publication";
      mentioned_uri: string;
    }
  | {
      type: "mention";
      document_uri: string;
      mention_type: "document";
      mentioned_uri: string;
    }
  // Mentions located in a comment.
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "did";
    }
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "publication";
      mentioned_uri: string;
    }
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "document";
      mentioned_uri: string;
    };
24
/**
 * A notification joined with the records it refers to. Each member is the
 * element type returned by the matching per-type hydrator in this file and
 * carries a literal `type` tag.
 */
export type HydratedNotification =
  | HydratedCommentNotification
  | HydratedSubscribeNotification
  | HydratedQuoteNotification
  | HydratedMentionNotification
  | HydratedCommentMentionNotification;

/**
 * Hydrates a mixed batch of notification rows.
 *
 * Every per-type hydrator receives the full batch and picks out the rows of
 * its own `type`; the hydrators run concurrently and their results are merged.
 *
 * @param notifications - Notification rows of any type.
 * @returns All hydrated notifications, sorted by `created_at`, newest first.
 */
export async function hydrateNotifications(
  notifications: NotificationRow[],
): Promise<Array<HydratedNotification>> {
  // Kick off every hydrator at once; each one filters the batch itself.
  const hydratedGroups = await Promise.all([
    hydrateCommentNotifications(notifications),
    hydrateSubscribeNotifications(notifications),
    hydrateQuoteNotifications(notifications),
    hydrateMentionNotifications(notifications),
    hydrateCommentMentionNotifications(notifications),
  ]);

  // Flatten the per-type groups into one list, keeping the group order.
  const merged: HydratedNotification[] = [];
  for (const group of hydratedGroups) {
    merged.push(...group);
  }

  // Order by creation time, most recent first.
  const createdAtMillis = (entry: HydratedNotification) =>
    new Date(entry.created_at).getTime();
  merged.sort(
    (left, right) => createdAtMillis(right) - createdAtMillis(left),
  );

  return merged;
}
54
/**
 * Helper type (not a runtime guard): selects the members of
 * {@link NotificationData} whose `type` tag is `TType`. The per-type
 * hydrators use it to describe a row's narrowed `data` payload.
 */
type ExtractNotificationType<TType extends NotificationData["type"]> = Extract<
  NotificationData,
  { type: TType }
>;
60
/** Element type produced by `hydrateCommentNotifications`. */
export type HydratedCommentNotification = Awaited<
  ReturnType<typeof hydrateCommentNotifications>
>[0];

/**
 * Hydrates the `comment` notifications contained in `notifications`.
 *
 * Rows whose `data.type` is not `"comment"` are ignored. The referenced
 * comments (plus the `parent_uri` comment when one is set) are loaded from
 * `comments_on_documents` in a single query, together with the joined
 * `bsky_profiles` and `documents` (including the publications reached through
 * `documents_in_publications`).
 *
 * @param notifications - Notification rows of any type.
 * @returns One entry per comment notification, in input order; an empty array
 *   (without issuing a query) when there are none.
 *
 * NOTE(review): `commentData` and `parentData` are non-null asserted. When a
 * referenced row is absent from the query result, or the query itself failed,
 * they are `undefined` at runtime despite their declared types. The assertion
 * is kept so the exported `HydratedCommentNotification` shape is unchanged;
 * confirm that consumers tolerate a missing row.
 */
async function hydrateCommentNotifications(notifications: NotificationRow[]) {
  const commentNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } =>
      (n.data as NotificationData)?.type === "comment",
  );

  if (commentNotifications.length === 0) {
    return [];
  }

  // Collect every comment URI we need: the comment itself and, when present,
  // its parent, so that both are covered by one query.
  const commentUris = commentNotifications.flatMap((n) =>
    n.data.parent_uri
      ? [n.data.comment_uri, n.data.parent_uri]
      : [n.data.comment_uri],
  );
  const { data: comments, error } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);

  // Supabase reports failures through `error` instead of throwing; surface it
  // so a failed query is distinguishable from rows that simply do not exist.
  if (error) {
    console.error("Failed to load comments for comment notifications:", error);
  }

  return commentNotifications.map((notification) => ({
    id: notification.id,
    recipient: notification.recipient,
    created_at: notification.created_at,
    type: "comment" as const,
    comment_uri: notification.data.comment_uri,
    parentData: notification.data.parent_uri
      ? comments?.find((c) => c.uri === notification.data.parent_uri)!
      : undefined,
    commentData: comments?.find(
      (c) => c.uri === notification.data.comment_uri,
    )!,
  }));
}
102
/** Element type produced by `hydrateSubscribeNotifications`. */
export type HydratedSubscribeNotification = Awaited<
  ReturnType<typeof hydrateSubscribeNotifications>
>[0];

/**
 * Hydrates the `subscribe` notifications contained in `notifications`.
 *
 * Rows whose `data.type` is not `"subscribe"` are ignored. The referenced
 * subscriptions are loaded from `publication_subscriptions` together with the
 * joined `identities(bsky_profiles)` and `publications`.
 *
 * @param notifications - Notification rows of any type.
 * @returns One entry per subscribe notification, in input order; an empty
 *   array (without issuing a query) when there are none.
 *
 * NOTE(review): `subscriptionData` is non-null asserted. When the referenced
 * row is absent from the query result, or the query failed, it is `undefined`
 * at runtime despite its declared type. The assertion is kept so the exported
 * `HydratedSubscribeNotification` shape is unchanged.
 */
async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
  const subscribeNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } =>
      (n.data as NotificationData)?.type === "subscribe",
  );

  if (subscribeNotifications.length === 0) {
    return [];
  }

  // Fetch subscription data from the database with related data
  const subscriptionUris = subscribeNotifications.map(
    (n) => n.data.subscription_uri,
  );
  const { data: subscriptions, error } = await supabaseServerClient
    .from("publication_subscriptions")
    .select("*, identities(bsky_profiles(*)), publications(*)")
    .in("uri", subscriptionUris);

  // Supabase reports failures through `error` instead of throwing; surface it
  // so a failed query is distinguishable from rows that simply do not exist.
  if (error) {
    console.error(
      "Failed to load subscriptions for subscribe notifications:",
      error,
    );
  }

  return subscribeNotifications.map((notification) => ({
    id: notification.id,
    recipient: notification.recipient,
    created_at: notification.created_at,
    type: "subscribe" as const,
    subscription_uri: notification.data.subscription_uri,
    subscriptionData: subscriptions?.find(
      (s) => s.uri === notification.data.subscription_uri,
    )!,
  }));
}
139
/** Element type produced by `hydrateQuoteNotifications`. */
export type HydratedQuoteNotification = Awaited<
  ReturnType<typeof hydrateQuoteNotifications>
>[0];

/**
 * Hydrates the `quote` notifications contained in `notifications`.
 *
 * Rows whose `data.type` is not `"quote"` are ignored. For each remaining row
 * the referenced `bsky_posts` row and the referenced `documents` row (with
 * the publications reached through `documents_in_publications`) are attached.
 *
 * @param notifications - Notification rows of any type.
 * @returns One entry per quote notification, in input order; an empty array
 *   (without issuing a query) when there are none.
 *
 * NOTE(review): `bskyPost` and `document` are non-null asserted. When a
 * referenced row is absent from its query result, or that query failed, the
 * field is `undefined` at runtime despite its declared type. The assertions
 * are kept so the exported `HydratedQuoteNotification` shape is unchanged.
 */
async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
  const quoteNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } =>
      (n.data as NotificationData)?.type === "quote",
  );

  if (quoteNotifications.length === 0) {
    return [];
  }

  const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri);
  const documentUris = quoteNotifications.map((n) => n.data.document_uri);

  // The two lookups do not depend on each other, so run them concurrently
  // rather than paying for two sequential round trips.
  const [
    { data: bskyPosts, error: bskyPostsError },
    { data: documents, error: documentsError },
  ] = await Promise.all([
    supabaseServerClient.from("bsky_posts").select("*").in("uri", bskyPostUris),
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
  ]);

  // Supabase reports failures through `error` instead of throwing; surface
  // them so a failed query is distinguishable from rows that do not exist.
  if (bskyPostsError) {
    console.error(
      "Failed to load bsky posts for quote notifications:",
      bskyPostsError,
    );
  }
  if (documentsError) {
    console.error(
      "Failed to load documents for quote notifications:",
      documentsError,
    );
  }

  return quoteNotifications.map((notification) => ({
    id: notification.id,
    recipient: notification.recipient,
    created_at: notification.created_at,
    type: "quote" as const,
    bsky_post_uri: notification.data.bsky_post_uri,
    document_uri: notification.data.document_uri,
    bskyPost: bskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri)!,
    document: documents?.find((d) => d.uri === notification.data.document_uri)!,
  }));
}
179
/** Element type produced by `hydrateMentionNotifications`. */
export type HydratedMentionNotification = Awaited<
  ReturnType<typeof hydrateMentionNotifications>
>[0];

/**
 * Hydrates the `mention` notifications contained in `notifications`.
 *
 * Rows whose `data.type` is not `"mention"` are ignored. For each remaining
 * row this attaches:
 * - `document`: the `documents` row for `document_uri` (with publications);
 * - `documentCreatorHandle`: the handle resolved from the DID found in the
 *   host part of `document_uri`, or `null` when it cannot be resolved;
 * - `mentionedPublication` / `mentionedDocument`: the row matching
 *   `mentioned_uri`, when the mention targets a publication or a document.
 *
 * @param notifications - Notification rows of any type.
 * @returns One entry per mention notification, in input order; an empty array
 *   (without issuing any query) when there are none.
 *
 * NOTE(review): `document` is non-null asserted. When the row is absent from
 * the query result, or the query failed, it is `undefined` at runtime despite
 * its declared type. The assertion is kept so the exported
 * `HydratedMentionNotification` shape is unchanged.
 */
async function hydrateMentionNotifications(notifications: NotificationRow[]) {
  const mentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } =>
      (n.data as NotificationData)?.type === "mention",
  );

  if (mentionNotifications.length === 0) {
    return [];
  }

  // Fetch document data from the database
  const documentUris = mentionNotifications.map((n) => n.data.document_uri);
  const { data: documents, error: documentsError } = await supabaseServerClient
    .from("documents")
    .select("*, documents_in_publications(publications(*))")
    .in("uri", documentUris);

  // Supabase reports failures through `error` instead of throwing; surface it
  // so a failed query is distinguishable from rows that simply do not exist.
  if (documentsError) {
    console.error(
      "Failed to load documents for mention notifications:",
      documentsError,
    );
  }

  // Extract unique DIDs from document URIs to resolve handles
  const documentCreatorDids = [
    ...new Set(documentUris.map((uri) => new AtUri(uri).host)),
  ];

  // Resolve DIDs to handles in parallel. A failed resolution is logged and
  // recorded as `null` so one bad DID does not fail the whole batch.
  const atUriPrefix = "at://";
  const didToHandleMap = new Map<string, string | null>();
  await Promise.all(
    documentCreatorDids.map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // `alsoKnownAs` may list several aliases and the handle is not
        // guaranteed to sit at index 0: take the first `at://` entry instead
        // of blindly stripping five characters from the first alias.
        const handleUri = resolved?.alsoKnownAs?.find((alias: string) =>
          alias.startsWith(atUriPrefix),
        );
        didToHandleMap.set(
          did,
          handleUri ? handleUri.slice(atUriPrefix.length) : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        didToHandleMap.set(did, null);
      }
    }),
  );

  // Fetch mentioned publications and documents
  const mentionedPublicationUris = mentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"mention">,
            { mention_type: "publication" }
          >
        ).mentioned_uri,
    );

  const mentionedDocumentUris = mentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"mention">,
            { mention_type: "document" }
          >
        ).mentioned_uri,
    );

  // Skip a query entirely when there is nothing to look up for that kind.
  const [
    { data: mentionedPublications, error: mentionedPublicationsError },
    { data: mentionedDocuments, error: mentionedDocumentsError },
  ] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [], error: null }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [], error: null }),
  ]);

  if (mentionedPublicationsError) {
    console.error(
      "Failed to load mentioned publications for mention notifications:",
      mentionedPublicationsError,
    );
  }
  if (mentionedDocumentsError) {
    console.error(
      "Failed to load mentioned documents for mention notifications:",
      mentionedDocumentsError,
    );
  }

  return mentionNotifications.map((notification) => {
    // Only publication/document mentions carry a `mentioned_uri`.
    const mentionedUri = notification.data.mention_type !== "did"
      ? (
          notification.data as Extract<
            ExtractNotificationType<"mention">,
            { mentioned_uri: string }
          >
        ).mentioned_uri
      : undefined;

    const documentCreatorDid = new AtUri(notification.data.document_uri).host;
    const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null;

    return {
      id: notification.id,
      recipient: notification.recipient,
      created_at: notification.created_at,
      type: "mention" as const,
      document_uri: notification.data.document_uri,
      mention_type: notification.data.mention_type,
      mentioned_uri: mentionedUri,
      document: documents?.find((d) => d.uri === notification.data.document_uri)!,
      documentCreatorHandle,
      mentionedPublication: mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined,
      mentionedDocument: mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined,
    };
  });
}
268
/** Element type produced by `hydrateCommentMentionNotifications`. */
export type HydratedCommentMentionNotification = Awaited<
  ReturnType<typeof hydrateCommentMentionNotifications>
>[0];

/**
 * Hydrates the `comment_mention` notifications contained in `notifications`.
 *
 * Rows whose `data.type` is not `"comment_mention"` are ignored. For each
 * remaining row this attaches:
 * - `commentData`: the `comments_on_documents` row for `comment_uri` (with
 *   the joined `bsky_profiles` and `documents`);
 * - `commenterHandle`: the handle resolved from the DID found in the host
 *   part of `comment_uri`, or `null` when it cannot be resolved;
 * - `mentionedPublication` / `mentionedDocument`: the row matching
 *   `mentioned_uri`, when the mention targets a publication or a document.
 *
 * @param notifications - Notification rows of any type.
 * @returns One entry per comment-mention notification, in input order; an
 *   empty array (without issuing any query) when there are none.
 *
 * NOTE(review): `commentData` is non-null asserted. When the row is absent
 * from the query result, or the query failed, it is `undefined` at runtime
 * despite its declared type. The assertion is kept so the exported
 * `HydratedCommentMentionNotification` shape is unchanged.
 */
async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) {
  const commentMentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } =>
      (n.data as NotificationData)?.type === "comment_mention",
  );

  if (commentMentionNotifications.length === 0) {
    return [];
  }

  // Fetch comment data from the database
  const commentUris = commentMentionNotifications.map((n) => n.data.comment_uri);
  const { data: comments, error: commentsError } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);

  // Supabase reports failures through `error` instead of throwing; surface it
  // so a failed query is distinguishable from rows that simply do not exist.
  if (commentsError) {
    console.error(
      "Failed to load comments for comment mention notifications:",
      commentsError,
    );
  }

  // Extract unique DIDs from comment URIs to resolve handles
  const commenterDids = [
    ...new Set(commentUris.map((uri) => new AtUri(uri).host)),
  ];

  // Resolve DIDs to handles in parallel. A failed resolution is logged and
  // recorded as `null` so one bad DID does not fail the whole batch.
  const atUriPrefix = "at://";
  const didToHandleMap = new Map<string, string | null>();
  await Promise.all(
    commenterDids.map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // `alsoKnownAs` may list several aliases and the handle is not
        // guaranteed to sit at index 0: take the first `at://` entry instead
        // of blindly stripping five characters from the first alias.
        const handleUri = resolved?.alsoKnownAs?.find((alias: string) =>
          alias.startsWith(atUriPrefix),
        );
        didToHandleMap.set(
          did,
          handleUri ? handleUri.slice(atUriPrefix.length) : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        didToHandleMap.set(did, null);
      }
    }),
  );

  // Fetch mentioned publications and documents
  const mentionedPublicationUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"comment_mention">,
            { mention_type: "publication" }
          >
        ).mentioned_uri,
    );

  const mentionedDocumentUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"comment_mention">,
            { mention_type: "document" }
          >
        ).mentioned_uri,
    );

  // Skip a query entirely when there is nothing to look up for that kind.
  const [
    { data: mentionedPublications, error: mentionedPublicationsError },
    { data: mentionedDocuments, error: mentionedDocumentsError },
  ] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [], error: null }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [], error: null }),
  ]);

  if (mentionedPublicationsError) {
    console.error(
      "Failed to load mentioned publications for comment mention notifications:",
      mentionedPublicationsError,
    );
  }
  if (mentionedDocumentsError) {
    console.error(
      "Failed to load mentioned documents for comment mention notifications:",
      mentionedDocumentsError,
    );
  }

  return commentMentionNotifications.map((notification) => {
    // Only publication/document mentions carry a `mentioned_uri`.
    const mentionedUri = notification.data.mention_type !== "did"
      ? (
          notification.data as Extract<
            ExtractNotificationType<"comment_mention">,
            { mentioned_uri: string }
          >
        ).mentioned_uri
      : undefined;

    const commenterDid = new AtUri(notification.data.comment_uri).host;
    const commenterHandle = didToHandleMap.get(commenterDid) ?? null;
    const commentData = comments?.find((c) => c.uri === notification.data.comment_uri);

    return {
      id: notification.id,
      recipient: notification.recipient,
      created_at: notification.created_at,
      type: "comment_mention" as const,
      comment_uri: notification.data.comment_uri,
      mention_type: notification.data.mention_type,
      mentioned_uri: mentionedUri,
      commentData: commentData!,
      commenterHandle,
      mentionedPublication: mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined,
      mentionedDocument: mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined,
    };
  });
}
360
/**
 * Broadcasts a `notification` event with payload `{ message: "poke" }` on the
 * realtime channel `identity.atp_did:<did>`, then removes the channel.
 *
 * Presumably listeners on that channel react by refreshing their
 * notifications; that side is not visible here, so verify against the client.
 *
 * @param did - DID used to build the channel name.
 */
export async function pingIdentityToUpdateNotification(did: string) {
  const channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
  try {
    await channel.send({
      type: "broadcast",
      event: "notification",
      payload: { message: "poke" },
    });
  } finally {
    // Always release the channel, even when the broadcast rejects, so failed
    // pings do not leave channels registered on the shared server client.
    await supabaseServerClient.removeChannel(channel);
  }
}
369}