a tool for shared writing and social publishing
1"use server";
2
3import { supabaseServerClient } from "supabase/serverClient";
4import { Tables, TablesInsert } from "supabase/database.types";
5import { AtUri } from "@atproto/syntax";
6import { idResolver } from "app/(home-pages)/reader/idResolver";
7import {
8 normalizeDocumentRecord,
9 normalizePublicationRecord,
10 type NormalizedDocument,
11 type NormalizedPublication,
12} from "src/utils/normalizeRecords";
13
/** Row type of the `notifications` table, as declared in `supabase/database.types`. */
type NotificationRow = Tables<"notifications">;
15
/**
 * Insert-shaped `notifications` record whose `data` column is typed as
 * {@link NotificationData} instead of the type declared by `TablesInsert`.
 */
export type Notification = Omit<TablesInsert<"notifications">, "data"> & {
  data: NotificationData;
};
19
/**
 * Payload stored in a notification's `data` column, discriminated by `type`.
 *
 * The `mention` and `comment_mention` variants are further discriminated by
 * `mention_type`: the `"publication"` and `"document"` forms carry a
 * `mentioned_uri`, the `"did"` form does not.
 */
export type NotificationData =
  | { type: "comment"; comment_uri: string; parent_uri?: string }
  | { type: "subscribe"; subscription_uri: string }
  | { type: "quote"; bsky_post_uri: string; document_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "did" }
  | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "did" }
  | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string };
30
/**
 * Any notification produced by {@link hydrateNotifications}: the union of
 * the element types returned by the per-type hydrators.
 */
export type HydratedNotification =
  | HydratedCommentNotification
  | HydratedSubscribeNotification
  | HydratedQuoteNotification
  | HydratedMentionNotification
  | HydratedCommentMentionNotification;
/**
 * Hydrates raw notification rows into display-ready notifications.
 *
 * The full list is handed to every per-type hydrator; the hydrators run
 * concurrently and each one picks out the rows it is responsible for. Their
 * results are merged into a new array and ordered newest-first by
 * `created_at`. Rows that no hydrator returns are absent from the result.
 *
 * @param notifications - Rows read from the `notifications` table.
 * @returns The hydrated notifications, most recent first.
 */
export async function hydrateNotifications(
  notifications: NotificationRow[],
): Promise<Array<HydratedNotification>> {
  // The hydrators are independent of each other, so run them in parallel.
  const [
    commentNotifications,
    subscribeNotifications,
    quoteNotifications,
    mentionNotifications,
    commentMentionNotifications,
  ] = await Promise.all([
    hydrateCommentNotifications(notifications),
    hydrateSubscribeNotifications(notifications),
    hydrateQuoteNotifications(notifications),
    hydrateMentionNotifications(notifications),
    hydrateCommentMentionNotifications(notifications),
  ]);

  const allHydrated = [
    ...commentNotifications,
    ...subscribeNotifications,
    ...quoteNotifications,
    ...mentionNotifications,
    ...commentMentionNotifications,
  ];

  // Grouping by type lost the chronological order; restore it, newest first.
  allHydrated.sort(
    (a, b) =>
      new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
  );

  return allHydrated;
}
60
/**
 * Selects the member(s) of {@link NotificationData} whose `type`
 * discriminant is `T`. This is a type-level helper only; it performs no
 * runtime check.
 */
type ExtractNotificationType<T extends NotificationData["type"]> = Extract<
  NotificationData,
  { type: T }
>;
66
/** Element type of the array resolved by {@link hydrateCommentNotifications}. */
export type HydratedCommentNotification = Awaited<
  ReturnType<typeof hydrateCommentNotifications>
>[0];

/**
 * Hydrates the notifications whose `data.type` is `"comment"`.
 *
 * Loads the referenced comments (and parent comments, when `parent_uri` is
 * set) from `comments_on_documents` together with their `bsky_profiles` and
 * document/publication relations, then attaches the normalized document and
 * publication records. A notification whose comment row is not returned by
 * the query is dropped.
 *
 * @param notifications - Rows of any notification type; others are ignored.
 * @returns One hydrated entry per comment notification that could be matched.
 */
async function hydrateCommentNotifications(notifications: NotificationRow[]) {
  const commentNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } =>
      (n.data as NotificationData)?.type === "comment",
  );

  if (commentNotifications.length === 0) {
    return [];
  }

  // A single query covers both comments and their parents. De-duplicate so a
  // URI referenced by several notifications is only sent once.
  const commentUris = [
    ...new Set(
      commentNotifications.flatMap((n) =>
        n.data.parent_uri
          ? [n.data.comment_uri, n.data.parent_uri]
          : [n.data.comment_uri],
      ),
    ),
  ];
  const { data: comments, error } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);
  if (error) {
    // Best effort: without rows every notification below is simply dropped,
    // but the failure should not go unnoticed.
    console.error("Failed to fetch comments for notifications:", error);
  }

  return commentNotifications
    .map((notification) => {
      const commentData = comments?.find(
        (c) => c.uri === notification.data.comment_uri,
      );
      if (!commentData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment" as const,
        comment_uri: notification.data.comment_uri,
        parentData: notification.data.parent_uri
          ? comments?.find((c) => c.uri === notification.data.parent_uri)
          : undefined,
        commentData,
        normalizedDocument: normalizeDocumentRecord(
          commentData.documents?.data,
          commentData.documents?.uri,
        ),
        // Only the first publication link of the document is considered.
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}
116
/** Element type of the array resolved by {@link hydrateSubscribeNotifications}. */
export type HydratedSubscribeNotification = Awaited<
  ReturnType<typeof hydrateSubscribeNotifications>
>[0];

/**
 * Hydrates the notifications whose `data.type` is `"subscribe"`.
 *
 * Loads the referenced rows from `publication_subscriptions` together with
 * the `identities(bsky_profiles)` and `publications` relations and attaches
 * the normalized publication record. A notification whose subscription row
 * is not returned by the query is dropped.
 *
 * @param notifications - Rows of any notification type; others are ignored.
 * @returns One hydrated entry per subscribe notification that could be matched.
 */
async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
  const subscribeNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } =>
      (n.data as NotificationData)?.type === "subscribe",
  );

  if (subscribeNotifications.length === 0) {
    return [];
  }

  // De-duplicate so each subscription URI is only sent to the query once.
  const subscriptionUris = [
    ...new Set(subscribeNotifications.map((n) => n.data.subscription_uri)),
  ];
  const { data: subscriptions, error } = await supabaseServerClient
    .from("publication_subscriptions")
    .select("*, identities(bsky_profiles(*)), publications(*)")
    .in("uri", subscriptionUris);
  if (error) {
    // Best effort: the notifications below are dropped, but log the cause.
    console.error("Failed to fetch subscriptions for notifications:", error);
  }

  return subscribeNotifications
    .map((notification) => {
      const subscriptionData = subscriptions?.find(
        (s) => s.uri === notification.data.subscription_uri,
      );
      if (!subscriptionData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "subscribe" as const,
        subscription_uri: notification.data.subscription_uri,
        subscriptionData,
        normalizedPublication: normalizePublicationRecord(
          subscriptionData.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}
158
/** Element type of the array resolved by {@link hydrateQuoteNotifications}. */
export type HydratedQuoteNotification = Awaited<
  ReturnType<typeof hydrateQuoteNotifications>
>[0];

/**
 * Hydrates the notifications whose `data.type` is `"quote"`.
 *
 * Loads the referenced rows from `bsky_posts` and `documents` (with the
 * document's publication relation) and attaches the normalized document and
 * publication records. A notification is dropped unless both its post row
 * and its document row are returned.
 *
 * @param notifications - Rows of any notification type; others are ignored.
 * @returns One hydrated entry per quote notification that could be matched.
 */
async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
  const quoteNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } =>
      (n.data as NotificationData)?.type === "quote",
  );

  if (quoteNotifications.length === 0) {
    return [];
  }

  // Several quotes may point at the same document (or post); query each URI once.
  const bskyPostUris = [
    ...new Set(quoteNotifications.map((n) => n.data.bsky_post_uri)),
  ];
  const documentUris = [
    ...new Set(quoteNotifications.map((n) => n.data.document_uri)),
  ];

  // The two lookups do not depend on each other, so issue them concurrently.
  const [
    { data: bskyPosts, error: bskyPostsError },
    { data: documents, error: documentsError },
  ] = await Promise.all([
    supabaseServerClient.from("bsky_posts").select("*").in("uri", bskyPostUris),
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
  ]);
  if (bskyPostsError) {
    console.error("Failed to fetch bsky posts for notifications:", bskyPostsError);
  }
  if (documentsError) {
    console.error("Failed to fetch documents for notifications:", documentsError);
  }

  return quoteNotifications
    .map((notification) => {
      const bskyPost = bskyPosts?.find(
        (p) => p.uri === notification.data.bsky_post_uri,
      );
      const document = documents?.find(
        (d) => d.uri === notification.data.document_uri,
      );
      if (!bskyPost || !document) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "quote" as const,
        bsky_post_uri: notification.data.bsky_post_uri,
        document_uri: notification.data.document_uri,
        bskyPost,
        document,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        // Only the first publication link of the document is considered.
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}
209
/**
 * Returns the `host` segment of an AT URI, or `null` when the string cannot
 * be parsed. `AtUri`'s constructor throws on malformed input; catching here
 * keeps one bad URI from failing the whole batch.
 */
function hostOfAtUri(uri: string): string | null {
  try {
    return new AtUri(uri).host;
  } catch (error) {
    console.error(`Failed to parse AT URI ${uri}:`, error);
    return null;
  }
}

/**
 * Resolves each distinct DID to a handle, concurrently.
 *
 * The handle is taken from the first `alsoKnownAs` entry that uses the
 * `at://` scheme, with that prefix removed. A DID maps to `null` when it has
 * no such entry or when resolution throws (the error is logged).
 */
async function resolveHandlesForDids(
  dids: Iterable<string>,
): Promise<Map<string, string | null>> {
  const entries = await Promise.all(
    [...new Set(dids)].map(
      async (did): Promise<[string, string | null]> => {
        try {
          const resolved = await idResolver.did.resolve(did);
          // `alsoKnownAs` may list non-handle aliases; only an `at://` entry
          // can be turned into a handle by stripping the scheme.
          const alias = resolved?.alsoKnownAs?.find((aka) =>
            aka.startsWith("at://"),
          );
          return [did, alias ? alias.slice("at://".length) : null];
        } catch (error) {
          console.error(`Failed to resolve DID ${did}:`, error);
          return [did, null];
        }
      },
    ),
  );
  return new Map(entries);
}

/**
 * Loads the publications and documents referenced by mention notifications.
 * Both lookups run concurrently; an empty URI list skips its query.
 */
async function fetchMentionTargets(
  publicationUris: string[],
  documentUris: string[],
) {
  const uniquePublicationUris = [...new Set(publicationUris)];
  const uniqueDocumentUris = [...new Set(documentUris)];

  const [publications, documents] = await Promise.all([
    uniquePublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", uniquePublicationUris)
      : Promise.resolve({ data: [], error: null }),
    uniqueDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", uniqueDocumentUris)
      : Promise.resolve({ data: [], error: null }),
  ]);
  if (publications.error) {
    console.error("Failed to fetch mentioned publications:", publications.error);
  }
  if (documents.error) {
    console.error("Failed to fetch mentioned documents:", documents.error);
  }

  return {
    mentionedPublications: publications.data,
    mentionedDocuments: documents.data,
  };
}

/** Element type of the array resolved by {@link hydrateMentionNotifications}. */
export type HydratedMentionNotification = Awaited<
  ReturnType<typeof hydrateMentionNotifications>
>[0];

/**
 * Hydrates the notifications whose `data.type` is `"mention"`.
 *
 * Loads the mentioning document, resolves the handle for the `host` of the
 * document's AT URI, and loads the mentioned publication or document when
 * `mention_type` is not `"did"`. A notification whose document row is not
 * returned by the query is dropped.
 *
 * @param notifications - Rows of any notification type; others are ignored.
 * @returns One hydrated entry per mention notification that could be matched.
 */
async function hydrateMentionNotifications(notifications: NotificationRow[]) {
  const mentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } =>
      (n.data as NotificationData)?.type === "mention",
  );

  if (mentionNotifications.length === 0) {
    return [];
  }

  const documentUris = [
    ...new Set(mentionNotifications.map((n) => n.data.document_uri)),
  ];
  const documentCreatorDids = documentUris
    .map((uri) => hostOfAtUri(uri))
    .filter((did): did is string => did !== null);

  const mentionedPublicationUris = mentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "publication" }>).mentioned_uri);

  const mentionedDocumentUris = mentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "document" }>).mentioned_uri);

  // None of these lookups depends on another, so run them concurrently.
  const [
    { data: documents, error: documentsError },
    didToHandleMap,
    { mentionedPublications, mentionedDocuments },
  ] = await Promise.all([
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
    resolveHandlesForDids(documentCreatorDids),
    fetchMentionTargets(mentionedPublicationUris, mentionedDocumentUris),
  ]);
  if (documentsError) {
    console.error("Failed to fetch documents for mention notifications:", documentsError);
  }

  return mentionNotifications
    .map((notification) => {
      const document = documents?.find((d) => d.uri === notification.data.document_uri);
      if (!document) return null;

      const mentionedUri = notification.data.mention_type !== "did"
        ? (notification.data as Extract<ExtractNotificationType<"mention">, { mentioned_uri: string }>).mentioned_uri
        : undefined;

      const documentCreatorDid = hostOfAtUri(notification.data.document_uri);
      const documentCreatorHandle =
        documentCreatorDid === null
          ? null
          : (didToHandleMap.get(documentCreatorDid) ?? null);

      const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined;
      const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "mention" as const,
        document_uri: notification.data.document_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        document,
        documentCreatorHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        // Only the first publication link of the document is considered.
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record),
        normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri),
      };
    })
    .filter((n) => n !== null);
}

/** Element type of the array resolved by {@link hydrateCommentMentionNotifications}. */
export type HydratedCommentMentionNotification = Awaited<
  ReturnType<typeof hydrateCommentMentionNotifications>
>[0];

/**
 * Hydrates the notifications whose `data.type` is `"comment_mention"`.
 *
 * Loads the mentioning comment (with its profile and document/publication
 * relations), resolves the handle for the `host` of the comment's AT URI,
 * and loads the mentioned publication or document when `mention_type` is not
 * `"did"`. A notification whose comment row is not returned is dropped.
 *
 * @param notifications - Rows of any notification type; others are ignored.
 * @returns One hydrated entry per comment-mention notification that could be matched.
 */
async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) {
  const commentMentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } =>
      (n.data as NotificationData)?.type === "comment_mention",
  );

  if (commentMentionNotifications.length === 0) {
    return [];
  }

  const commentUris = [
    ...new Set(commentMentionNotifications.map((n) => n.data.comment_uri)),
  ];
  const commenterDids = commentUris
    .map((uri) => hostOfAtUri(uri))
    .filter((did): did is string => did !== null);

  const mentionedPublicationUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "publication" }>).mentioned_uri);

  const mentionedDocumentUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "document" }>).mentioned_uri);

  // None of these lookups depends on another, so run them concurrently.
  const [
    { data: comments, error: commentsError },
    didToHandleMap,
    { mentionedPublications, mentionedDocuments },
  ] = await Promise.all([
    supabaseServerClient
      .from("comments_on_documents")
      .select(
        "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
      )
      .in("uri", commentUris),
    resolveHandlesForDids(commenterDids),
    fetchMentionTargets(mentionedPublicationUris, mentionedDocumentUris),
  ]);
  if (commentsError) {
    console.error("Failed to fetch comments for mention notifications:", commentsError);
  }

  return commentMentionNotifications
    .map((notification) => {
      const commentData = comments?.find((c) => c.uri === notification.data.comment_uri);
      if (!commentData) return null;

      const mentionedUri = notification.data.mention_type !== "did"
        ? (notification.data as Extract<ExtractNotificationType<"comment_mention">, { mentioned_uri: string }>).mentioned_uri
        : undefined;

      const commenterDid = hostOfAtUri(notification.data.comment_uri);
      const commenterHandle =
        commenterDid === null ? null : (didToHandleMap.get(commenterDid) ?? null);

      const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined;
      const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment_mention" as const,
        comment_uri: notification.data.comment_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        commentData,
        commenterHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri),
        // Only the first publication link of the document is considered.
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record),
        normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri),
      };
    })
    .filter((n) => n !== null);
}
417
/**
 * Broadcasts a `notification` event with payload `{ message: "poke" }` on the
 * channel `identity.atp_did:<did>`, then removes the channel from the client.
 * Presumably this prompts listeners for that identity to refetch their
 * notifications (the listeners are not visible from this file).
 *
 * The channel is removed even when sending fails; the send error still
 * propagates to the caller.
 *
 * @param did - Identity whose channel should be pinged.
 */
export async function pingIdentityToUpdateNotification(did: string) {
  const channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
  try {
    await channel.send({
      type: "broadcast",
      event: "notification",
      payload: { message: "poke" },
    });
  } finally {
    // Always release the channel so a failed send does not leak it.
    await supabaseServerClient.removeChannel(channel);
  }
}
426}