a tool for shared writing and social publishing
1"use server";
2
3import { supabaseServerClient } from "supabase/serverClient";
4import { Tables, TablesInsert } from "supabase/database.types";
5import { AtUri } from "@atproto/syntax";
6import { idResolver } from "app/(home-pages)/reader/idResolver";
7import {
8 normalizeDocumentRecord,
9 normalizePublicationRecord,
10 type NormalizedDocument,
11 type NormalizedPublication,
12} from "src/utils/normalizeRecords";
13
14type NotificationRow = Tables<"notifications">;
15
16export type Notification = Omit<TablesInsert<"notifications">, "data"> & {
17 data: NotificationData;
18};
19
20export type NotificationData =
21 | { type: "comment"; comment_uri: string; parent_uri?: string }
22 | { type: "subscribe"; subscription_uri: string }
23 | { type: "quote"; bsky_post_uri: string; document_uri: string }
24 | { type: "bsky_post_embed"; document_uri: string; bsky_post_uri: string }
25 | { type: "mention"; document_uri: string; mention_type: "did" }
26 | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string }
27 | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string }
28 | { type: "comment_mention"; comment_uri: string; mention_type: "did" }
29 | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string }
30 | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string }
31 | { type: "recommend"; document_uri: string; recommend_uri: string };
32
33export type HydratedNotification =
34 | HydratedCommentNotification
35 | HydratedSubscribeNotification
36 | HydratedQuoteNotification
37 | HydratedBskyPostEmbedNotification
38 | HydratedMentionNotification
39 | HydratedCommentMentionNotification
40 | HydratedRecommendNotification;
41export async function hydrateNotifications(
42 notifications: NotificationRow[],
43): Promise<Array<HydratedNotification>> {
44 // Call all hydrators in parallel
45 const [commentNotifications, subscribeNotifications, quoteNotifications, bskyPostEmbedNotifications, mentionNotifications, commentMentionNotifications, recommendNotifications] = await Promise.all([
46 hydrateCommentNotifications(notifications),
47 hydrateSubscribeNotifications(notifications),
48 hydrateQuoteNotifications(notifications),
49 hydrateBskyPostEmbedNotifications(notifications),
50 hydrateMentionNotifications(notifications),
51 hydrateCommentMentionNotifications(notifications),
52 hydrateRecommendNotifications(notifications),
53 ]);
54
55 // Combine all hydrated notifications
56 const allHydrated = [...commentNotifications, ...subscribeNotifications, ...quoteNotifications, ...bskyPostEmbedNotifications, ...mentionNotifications, ...commentMentionNotifications, ...recommendNotifications];
57
58 // Sort by created_at to maintain order
59 allHydrated.sort(
60 (a, b) =>
61 new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
62 );
63
64 return allHydrated;
65}
66
67// Type guard to extract notification type
68type ExtractNotificationType<T extends NotificationData["type"]> = Extract<
69 NotificationData,
70 { type: T }
71>;
72
73export type HydratedCommentNotification = Awaited<
74 ReturnType<typeof hydrateCommentNotifications>
75>[0];
76
77async function hydrateCommentNotifications(notifications: NotificationRow[]) {
78 const commentNotifications = notifications.filter(
79 (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } =>
80 (n.data as NotificationData)?.type === "comment",
81 );
82
83 if (commentNotifications.length === 0) {
84 return [];
85 }
86
87 // Fetch comment data from the database
88 const commentUris = commentNotifications.flatMap((n) =>
89 n.data.parent_uri
90 ? [n.data.comment_uri, n.data.parent_uri]
91 : [n.data.comment_uri],
92 );
93 const { data: comments } = await supabaseServerClient
94 .from("comments_on_documents")
95 .select(
96 "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
97 )
98 .in("uri", commentUris);
99
100 return commentNotifications
101 .map((notification) => {
102 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri);
103 if (!commentData) return null;
104 return {
105 id: notification.id,
106 recipient: notification.recipient,
107 created_at: notification.created_at,
108 type: "comment" as const,
109 comment_uri: notification.data.comment_uri,
110 parentData: notification.data.parent_uri
111 ? comments?.find((c) => c.uri === notification.data.parent_uri)
112 : undefined,
113 commentData,
114 normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri),
115 normalizedPublication: normalizePublicationRecord(
116 commentData.documents?.documents_in_publications[0]?.publications?.record,
117 ),
118 };
119 })
120 .filter((n) => n !== null);
121}
122
123export type HydratedSubscribeNotification = Awaited<
124 ReturnType<typeof hydrateSubscribeNotifications>
125>[0];
126
127async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
128 const subscribeNotifications = notifications.filter(
129 (
130 n,
131 ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } =>
132 (n.data as NotificationData)?.type === "subscribe",
133 );
134
135 if (subscribeNotifications.length === 0) {
136 return [];
137 }
138
139 // Fetch subscription data from the database with related data
140 const subscriptionUris = subscribeNotifications.map(
141 (n) => n.data.subscription_uri,
142 );
143 const { data: subscriptions } = await supabaseServerClient
144 .from("publication_subscriptions")
145 .select("*, identities(bsky_profiles(*)), publications(*)")
146 .in("uri", subscriptionUris);
147
148 return subscribeNotifications
149 .map((notification) => {
150 const subscriptionData = subscriptions?.find((s) => s.uri === notification.data.subscription_uri);
151 if (!subscriptionData) return null;
152 return {
153 id: notification.id,
154 recipient: notification.recipient,
155 created_at: notification.created_at,
156 type: "subscribe" as const,
157 subscription_uri: notification.data.subscription_uri,
158 subscriptionData,
159 normalizedPublication: normalizePublicationRecord(subscriptionData.publications?.record),
160 };
161 })
162 .filter((n) => n !== null);
163}
164
165export type HydratedQuoteNotification = Awaited<
166 ReturnType<typeof hydrateQuoteNotifications>
167>[0];
168
169async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
170 const quoteNotifications = notifications.filter(
171 (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } =>
172 (n.data as NotificationData)?.type === "quote",
173 );
174
175 if (quoteNotifications.length === 0) {
176 return [];
177 }
178
179 // Fetch bsky post data and document data
180 const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri);
181 const documentUris = quoteNotifications.map((n) => n.data.document_uri);
182
183 const { data: bskyPosts } = await supabaseServerClient
184 .from("bsky_posts")
185 .select("*")
186 .in("uri", bskyPostUris);
187
188 const { data: documents } = await supabaseServerClient
189 .from("documents")
190 .select("*, documents_in_publications(publications(*))")
191 .in("uri", documentUris);
192
193 return quoteNotifications
194 .map((notification) => {
195 const bskyPost = bskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri);
196 const document = documents?.find((d) => d.uri === notification.data.document_uri);
197 if (!bskyPost || !document) return null;
198 return {
199 id: notification.id,
200 recipient: notification.recipient,
201 created_at: notification.created_at,
202 type: "quote" as const,
203 bsky_post_uri: notification.data.bsky_post_uri,
204 document_uri: notification.data.document_uri,
205 bskyPost,
206 document,
207 normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
208 normalizedPublication: normalizePublicationRecord(
209 document.documents_in_publications[0]?.publications?.record,
210 ),
211 };
212 })
213 .filter((n) => n !== null);
214}
215
216export type HydratedBskyPostEmbedNotification = Awaited<
217 ReturnType<typeof hydrateBskyPostEmbedNotifications>
218>[0];
219
220async function hydrateBskyPostEmbedNotifications(notifications: NotificationRow[]) {
221 const bskyPostEmbedNotifications = notifications.filter(
222 (n): n is NotificationRow & { data: ExtractNotificationType<"bsky_post_embed"> } =>
223 (n.data as NotificationData)?.type === "bsky_post_embed",
224 );
225
226 if (bskyPostEmbedNotifications.length === 0) {
227 return [];
228 }
229
230 // Fetch document data (the leaflet that embedded the post)
231 const documentUris = bskyPostEmbedNotifications.map((n) => n.data.document_uri);
232 const bskyPostUris = bskyPostEmbedNotifications.map((n) => n.data.bsky_post_uri);
233
234 const [{ data: documents }, { data: cachedBskyPosts }] = await Promise.all([
235 supabaseServerClient
236 .from("documents")
237 .select("*, documents_in_publications(publications(*))")
238 .in("uri", documentUris),
239 supabaseServerClient
240 .from("bsky_posts")
241 .select("*")
242 .in("uri", bskyPostUris),
243 ]);
244
245 // Find which posts we need to fetch from the API
246 const cachedPostUris = new Set(cachedBskyPosts?.map((p) => p.uri) ?? []);
247 const missingPostUris = bskyPostUris.filter((uri) => !cachedPostUris.has(uri));
248
249 // Fetch missing posts from Bluesky API
250 const fetchedPosts = new Map<string, { text: string } | null>();
251 if (missingPostUris.length > 0) {
252 try {
253 const { AtpAgent } = await import("@atproto/api");
254 const agent = new AtpAgent({ service: "https://public.api.bsky.app" });
255 const response = await agent.app.bsky.feed.getPosts({ uris: missingPostUris });
256 for (const post of response.data.posts) {
257 const record = post.record as { text?: string };
258 fetchedPosts.set(post.uri, { text: record.text ?? "" });
259 }
260 } catch (error) {
261 console.error("Failed to fetch Bluesky posts:", error);
262 }
263 }
264
265 // Extract unique DIDs from document URIs to resolve handles
266 const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))];
267
268 // Resolve DIDs to handles in parallel
269 const didToHandleMap = new Map<string, string | null>();
270 await Promise.all(
271 documentCreatorDids.map(async (did) => {
272 try {
273 const resolved = await idResolver.did.resolve(did);
274 const handle = resolved?.alsoKnownAs?.[0]
275 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix
276 : null;
277 didToHandleMap.set(did, handle);
278 } catch (error) {
279 console.error(`Failed to resolve DID ${did}:`, error);
280 didToHandleMap.set(did, null);
281 }
282 }),
283 );
284
285 return bskyPostEmbedNotifications
286 .map((notification) => {
287 const document = documents?.find((d) => d.uri === notification.data.document_uri);
288 if (!document) return null;
289
290 const documentCreatorDid = new AtUri(notification.data.document_uri).host;
291 const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null;
292
293 // Get post text from cache or fetched data
294 const cachedPost = cachedBskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri);
295 const postView = cachedPost?.post_view as { record?: { text?: string } } | undefined;
296 const bskyPostText = postView?.record?.text ?? fetchedPosts.get(notification.data.bsky_post_uri)?.text ?? null;
297
298 return {
299 id: notification.id,
300 recipient: notification.recipient,
301 created_at: notification.created_at,
302 type: "bsky_post_embed" as const,
303 document_uri: notification.data.document_uri,
304 bsky_post_uri: notification.data.bsky_post_uri,
305 document,
306 documentCreatorHandle,
307 bskyPostText,
308 normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
309 normalizedPublication: normalizePublicationRecord(
310 document.documents_in_publications[0]?.publications?.record,
311 ),
312 };
313 })
314 .filter((n) => n !== null);
315}
316
317export type HydratedMentionNotification = Awaited<
318 ReturnType<typeof hydrateMentionNotifications>
319>[0];
320
321async function hydrateMentionNotifications(notifications: NotificationRow[]) {
322 const mentionNotifications = notifications.filter(
323 (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } =>
324 (n.data as NotificationData)?.type === "mention",
325 );
326
327 if (mentionNotifications.length === 0) {
328 return [];
329 }
330
331 // Fetch document data from the database
332 const documentUris = mentionNotifications.map((n) => n.data.document_uri);
333 const { data: documents } = await supabaseServerClient
334 .from("documents")
335 .select("*, documents_in_publications(publications(*))")
336 .in("uri", documentUris);
337
338 // Extract unique DIDs from document URIs to resolve handles
339 const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))];
340
341 // Resolve DIDs to handles in parallel
342 const didToHandleMap = new Map<string, string | null>();
343 await Promise.all(
344 documentCreatorDids.map(async (did) => {
345 try {
346 const resolved = await idResolver.did.resolve(did);
347 const handle = resolved?.alsoKnownAs?.[0]
348 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix
349 : null;
350 didToHandleMap.set(did, handle);
351 } catch (error) {
352 console.error(`Failed to resolve DID ${did}:`, error);
353 didToHandleMap.set(did, null);
354 }
355 }),
356 );
357
358 // Fetch mentioned publications and documents
359 const mentionedPublicationUris = mentionNotifications
360 .filter((n) => n.data.mention_type === "publication")
361 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "publication" }>).mentioned_uri);
362
363 const mentionedDocumentUris = mentionNotifications
364 .filter((n) => n.data.mention_type === "document")
365 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "document" }>).mentioned_uri);
366
367 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([
368 mentionedPublicationUris.length > 0
369 ? supabaseServerClient
370 .from("publications")
371 .select("*")
372 .in("uri", mentionedPublicationUris)
373 : Promise.resolve({ data: [] }),
374 mentionedDocumentUris.length > 0
375 ? supabaseServerClient
376 .from("documents")
377 .select("*, documents_in_publications(publications(*))")
378 .in("uri", mentionedDocumentUris)
379 : Promise.resolve({ data: [] }),
380 ]);
381
382 return mentionNotifications
383 .map((notification) => {
384 const document = documents?.find((d) => d.uri === notification.data.document_uri);
385 if (!document) return null;
386
387 const mentionedUri = notification.data.mention_type !== "did"
388 ? (notification.data as Extract<ExtractNotificationType<"mention">, { mentioned_uri: string }>).mentioned_uri
389 : undefined;
390
391 const documentCreatorDid = new AtUri(notification.data.document_uri).host;
392 const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null;
393
394 const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined;
395 const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined;
396
397 return {
398 id: notification.id,
399 recipient: notification.recipient,
400 created_at: notification.created_at,
401 type: "mention" as const,
402 document_uri: notification.data.document_uri,
403 mention_type: notification.data.mention_type,
404 mentioned_uri: mentionedUri,
405 document,
406 documentCreatorHandle,
407 mentionedPublication,
408 mentionedDocument: mentionedDoc,
409 normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
410 normalizedPublication: normalizePublicationRecord(
411 document.documents_in_publications[0]?.publications?.record,
412 ),
413 normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record),
414 normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri),
415 };
416 })
417 .filter((n) => n !== null);
418}
419
420export type HydratedCommentMentionNotification = Awaited<
421 ReturnType<typeof hydrateCommentMentionNotifications>
422>[0];
423
424async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) {
425 const commentMentionNotifications = notifications.filter(
426 (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } =>
427 (n.data as NotificationData)?.type === "comment_mention",
428 );
429
430 if (commentMentionNotifications.length === 0) {
431 return [];
432 }
433
434 // Fetch comment data from the database
435 const commentUris = commentMentionNotifications.map((n) => n.data.comment_uri);
436 const { data: comments } = await supabaseServerClient
437 .from("comments_on_documents")
438 .select(
439 "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
440 )
441 .in("uri", commentUris);
442
443 // Extract unique DIDs from comment URIs to resolve handles
444 const commenterDids = [...new Set(commentUris.map((uri) => new AtUri(uri).host))];
445
446 // Resolve DIDs to handles in parallel
447 const didToHandleMap = new Map<string, string | null>();
448 await Promise.all(
449 commenterDids.map(async (did) => {
450 try {
451 const resolved = await idResolver.did.resolve(did);
452 const handle = resolved?.alsoKnownAs?.[0]
453 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix
454 : null;
455 didToHandleMap.set(did, handle);
456 } catch (error) {
457 console.error(`Failed to resolve DID ${did}:`, error);
458 didToHandleMap.set(did, null);
459 }
460 }),
461 );
462
463 // Fetch mentioned publications and documents
464 const mentionedPublicationUris = commentMentionNotifications
465 .filter((n) => n.data.mention_type === "publication")
466 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "publication" }>).mentioned_uri);
467
468 const mentionedDocumentUris = commentMentionNotifications
469 .filter((n) => n.data.mention_type === "document")
470 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "document" }>).mentioned_uri);
471
472 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([
473 mentionedPublicationUris.length > 0
474 ? supabaseServerClient
475 .from("publications")
476 .select("*")
477 .in("uri", mentionedPublicationUris)
478 : Promise.resolve({ data: [] }),
479 mentionedDocumentUris.length > 0
480 ? supabaseServerClient
481 .from("documents")
482 .select("*, documents_in_publications(publications(*))")
483 .in("uri", mentionedDocumentUris)
484 : Promise.resolve({ data: [] }),
485 ]);
486
487 return commentMentionNotifications
488 .map((notification) => {
489 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri);
490 if (!commentData) return null;
491
492 const mentionedUri = notification.data.mention_type !== "did"
493 ? (notification.data as Extract<ExtractNotificationType<"comment_mention">, { mentioned_uri: string }>).mentioned_uri
494 : undefined;
495
496 const commenterDid = new AtUri(notification.data.comment_uri).host;
497 const commenterHandle = didToHandleMap.get(commenterDid) ?? null;
498
499 const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined;
500 const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined;
501
502 return {
503 id: notification.id,
504 recipient: notification.recipient,
505 created_at: notification.created_at,
506 type: "comment_mention" as const,
507 comment_uri: notification.data.comment_uri,
508 mention_type: notification.data.mention_type,
509 mentioned_uri: mentionedUri,
510 commentData,
511 commenterHandle,
512 mentionedPublication,
513 mentionedDocument: mentionedDoc,
514 normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri),
515 normalizedPublication: normalizePublicationRecord(
516 commentData.documents?.documents_in_publications[0]?.publications?.record,
517 ),
518 normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record),
519 normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri),
520 };
521 })
522 .filter((n) => n !== null);
523}
524
525export type HydratedRecommendNotification = Awaited<
526 ReturnType<typeof hydrateRecommendNotifications>
527>[0];
528
529async function hydrateRecommendNotifications(notifications: NotificationRow[]) {
530 const recommendNotifications = notifications.filter(
531 (n): n is NotificationRow & { data: ExtractNotificationType<"recommend"> } =>
532 (n.data as NotificationData)?.type === "recommend",
533 );
534
535 if (recommendNotifications.length === 0) {
536 return [];
537 }
538
539 // Fetch recommend data from the database
540 const recommendUris = recommendNotifications.map((n) => n.data.recommend_uri);
541 const documentUris = recommendNotifications.map((n) => n.data.document_uri);
542
543 const [{ data: recommends }, { data: documents }] = await Promise.all([
544 supabaseServerClient
545 .from("recommends_on_documents")
546 .select("*, identities(bsky_profiles(*))")
547 .in("uri", recommendUris),
548 supabaseServerClient
549 .from("documents")
550 .select("*, documents_in_publications(publications(*))")
551 .in("uri", documentUris),
552 ]);
553
554 return recommendNotifications
555 .map((notification) => {
556 const recommendData = recommends?.find((r) => r.uri === notification.data.recommend_uri);
557 const document = documents?.find((d) => d.uri === notification.data.document_uri);
558 if (!recommendData || !document) return null;
559 return {
560 id: notification.id,
561 recipient: notification.recipient,
562 created_at: notification.created_at,
563 type: "recommend" as const,
564 recommend_uri: notification.data.recommend_uri,
565 document_uri: notification.data.document_uri,
566 recommendData,
567 document,
568 normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
569 normalizedPublication: normalizePublicationRecord(
570 document.documents_in_publications[0]?.publications?.record,
571 ),
572 };
573 })
574 .filter((n) => n !== null);
575}
576
577export async function pingIdentityToUpdateNotification(did: string) {
578 let channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
579 await channel.send({
580 type: "broadcast",
581 event: "notification",
582 payload: { message: "poke" },
583 });
584 await supabaseServerClient.removeChannel(channel);
585}