a tool for shared writing and social publishing
at feature/at-mentions 369 lines 14 kB view raw
"use server";

import { supabaseServerClient } from "supabase/serverClient";
import { Tables, TablesInsert } from "supabase/database.types";
import { AtUri } from "@atproto/syntax";
import { idResolver } from "app/(home-pages)/reader/idResolver";

type NotificationRow = Tables<"notifications">;

/** A notification ready for insertion, with its `data` column strongly typed. */
export type Notification = Omit<TablesInsert<"notifications">, "data"> & {
  data: NotificationData;
};

/**
 * Payload stored in the `data` column of a notification row.
 * `type` discriminates the notification kind; for the two mention kinds
 * `mention_type` further discriminates what was mentioned.
 */
export type NotificationData =
  | { type: "comment"; comment_uri: string; parent_uri?: string }
  | { type: "subscribe"; subscription_uri: string }
  | { type: "quote"; bsky_post_uri: string; document_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "did" }
  | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "did" }
  | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string };

// Type-level helper: selects the NotificationData variants whose `type` is T.
type ExtractNotificationType<T extends NotificationData["type"]> = Extract<
  NotificationData,
  { type: T }
>;

// Payload variants that carry a `mention_type` discriminant.
type MentionData = ExtractNotificationType<"mention" | "comment_mention">;

export type HydratedNotification =
  | HydratedCommentNotification
  | HydratedSubscribeNotification
  | HydratedQuoteNotification
  | HydratedMentionNotification
  | HydratedCommentMentionNotification;

/**
 * Hydrates raw notification rows with the related records they point at.
 *
 * Every type-specific hydrator runs in parallel over the same input and picks
 * out the rows it understands; rows whose `data.type` matches none of the
 * hydrators are omitted from the result.
 *
 * @param notifications Raw rows from the `notifications` table.
 * @returns Hydrated notifications sorted by `created_at`, newest first.
 */
export async function hydrateNotifications(
  notifications: NotificationRow[],
): Promise<Array<HydratedNotification>> {
  const hydratedGroups = await Promise.all([
    hydrateCommentNotifications(notifications),
    hydrateSubscribeNotifications(notifications),
    hydrateQuoteNotifications(notifications),
    hydrateMentionNotifications(notifications),
    hydrateCommentMentionNotifications(notifications),
  ]);

  const allHydrated: HydratedNotification[] = hydratedGroups.flat();

  // The hydrators return their results grouped by type, so the combined list
  // has to be re-sorted (newest first).
  allHydrated.sort(
    (a, b) =>
      new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
  );

  return allHydrated;
}

// Keeps only the rows whose payload has the given `type`, narrowing `data`.
function notificationsOfType<T extends NotificationData["type"]>(
  notifications: NotificationRow[],
  type: T,
) {
  return notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<T> } =>
      (n.data as NotificationData)?.type === type,
  );
}

// Indexes query results by `uri` so lookups are O(1) instead of a scan per notification.
function indexByUri<T extends { uri: string | null }>(
  rows: readonly T[] | null | undefined,
): Map<string, T> {
  const byUri = new Map<string, T>();
  for (const row of rows ?? []) {
    // Like `find`, keep the first row seen for a given uri.
    if (row.uri != null && !byUri.has(row.uri)) {
      byUri.set(row.uri, row);
    }
  }
  return byUri;
}

// Returns the authority (DID) of an AT URI, or null when the URI cannot be parsed.
function didFromAtUri(uri: string): string | null {
  try {
    return new AtUri(uri).host;
  } catch (error) {
    // AtUri throws on malformed input; one bad row must not fail the whole batch.
    console.error(`Invalid AT URI ${uri}:`, error);
    return null;
  }
}

// Resolves a DID to its handle via the DID document; null when unavailable.
async function resolveHandle(did: string): Promise<string | null> {
  const AT_PREFIX = "at://";
  try {
    const resolved = await idResolver.did.resolve(did);
    // `alsoKnownAs` may list non-atproto aliases, so pick the first `at://`
    // entry instead of assuming it sits at index 0.
    const alias = resolved?.alsoKnownAs?.find((aka) =>
      aka.startsWith(AT_PREFIX),
    );
    return alias ? alias.slice(AT_PREFIX.length) : null;
  } catch (error) {
    console.error(`Failed to resolve DID ${did}:`, error);
    return null;
  }
}

// Maps each given AT URI to the handle of its authority DID, resolving each
// distinct DID only once and in parallel.
async function resolveHandlesByUri(
  uris: readonly string[],
): Promise<Map<string, string | null>> {
  const didByUri = new Map<string, string | null>();
  for (const uri of uris) {
    if (!didByUri.has(uri)) {
      didByUri.set(uri, didFromAtUri(uri));
    }
  }

  const uniqueDids = new Set<string>();
  for (const did of didByUri.values()) {
    if (did !== null) {
      uniqueDids.add(did);
    }
  }

  const handleByDid = new Map<string, string | null>();
  await Promise.all(
    [...uniqueDids].map(async (did) => {
      handleByDid.set(did, await resolveHandle(did));
    }),
  );

  const handleByUri = new Map<string, string | null>();
  for (const [uri, did] of didByUri) {
    handleByUri.set(uri, did === null ? null : (handleByDid.get(did) ?? null));
  }
  return handleByUri;
}

// URI of the mentioned publication/document; undefined for plain DID mentions.
function mentionedUriOf(data: MentionData): string | undefined {
  return data.mention_type === "did" ? undefined : data.mentioned_uri;
}

// Collects the `mentioned_uri` of every payload with the given mention kind.
function collectMentionedUris(
  items: ReadonlyArray<{ data: MentionData }>,
  kind: "publication" | "document",
): string[] {
  return items.flatMap(({ data }) => {
    const uri = mentionedUriOf(data);
    return data.mention_type === kind && uri !== undefined ? [uri] : [];
  });
}

// Loads the publications and documents referenced by mention payloads,
// indexed by uri. Queries are skipped when there is nothing to look up.
async function fetchMentionTargets(
  items: ReadonlyArray<{ data: MentionData }>,
) {
  const mentionedPublicationUris = collectMentionedUris(items, "publication");
  const mentionedDocumentUris = collectMentionedUris(items, "document");

  const [{ data: mentionedPublications }, { data: mentionedDocuments }] =
    await Promise.all([
      mentionedPublicationUris.length > 0
        ? supabaseServerClient
            .from("publications")
            .select("*")
            .in("uri", mentionedPublicationUris)
        : Promise.resolve({ data: [] }),
      mentionedDocumentUris.length > 0
        ? supabaseServerClient
            .from("documents")
            .select("*, documents_in_publications(publications(*))")
            .in("uri", mentionedDocumentUris)
        : Promise.resolve({ data: [] }),
    ]);

  return {
    publicationsByUri: indexByUri(mentionedPublications),
    documentsByUri: indexByUri(mentionedDocuments),
  };
}

export type HydratedCommentNotification = Awaited<
  ReturnType<typeof hydrateCommentNotifications>
>[0];

/**
 * Hydrates "comment" notifications with the comment row and, when the
 * notification has a `parent_uri`, the parent comment row.
 *
 * NOTE: `commentData`/`parentData` are typed as present, but are undefined at
 * runtime when the query returns no row for the uri.
 */
async function hydrateCommentNotifications(notifications: NotificationRow[]) {
  const commentNotifications = notificationsOfType(notifications, "comment");

  if (commentNotifications.length === 0) {
    return [];
  }

  // Fetch the comments and their parents in a single query.
  const commentUris = commentNotifications.flatMap((n) =>
    n.data.parent_uri
      ? [n.data.comment_uri, n.data.parent_uri]
      : [n.data.comment_uri],
  );
  const { data: comments } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);
  const commentsByUri = indexByUri(comments);

  return commentNotifications.map((notification) => ({
    id: notification.id,
    recipient: notification.recipient,
    created_at: notification.created_at,
    type: "comment" as const,
    comment_uri: notification.data.comment_uri,
    parentData: notification.data.parent_uri
      ? commentsByUri.get(notification.data.parent_uri)!
      : undefined,
    commentData: commentsByUri.get(notification.data.comment_uri)!,
  }));
}

export type HydratedSubscribeNotification = Awaited<
  ReturnType<typeof hydrateSubscribeNotifications>
>[0];

/**
 * Hydrates "subscribe" notifications with the subscription row, including the
 * subscriber's profile and the publication.
 *
 * NOTE: `subscriptionData` is typed as present, but is undefined at runtime
 * when the query returns no row for the uri.
 */
async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
  const subscribeNotifications = notificationsOfType(
    notifications,
    "subscribe",
  );

  if (subscribeNotifications.length === 0) {
    return [];
  }

  const subscriptionUris = subscribeNotifications.map(
    (n) => n.data.subscription_uri,
  );
  const { data: subscriptions } = await supabaseServerClient
    .from("publication_subscriptions")
    .select("*, identities(bsky_profiles(*)), publications(*)")
    .in("uri", subscriptionUris);
  const subscriptionsByUri = indexByUri(subscriptions);

  return subscribeNotifications.map((notification) => ({
    id: notification.id,
    recipient: notification.recipient,
    created_at: notification.created_at,
    type: "subscribe" as const,
    subscription_uri: notification.data.subscription_uri,
    subscriptionData: subscriptionsByUri.get(
      notification.data.subscription_uri,
    )!,
  }));
}

export type HydratedQuoteNotification = Awaited<
  ReturnType<typeof hydrateQuoteNotifications>
>[0];

/**
 * Hydrates "quote" notifications with the quoting Bluesky post row and the
 * quoted document row.
 *
 * NOTE: `bskyPost`/`document` are typed as present, but are undefined at
 * runtime when the query returns no row for the uri.
 */
async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
  const quoteNotifications = notificationsOfType(notifications, "quote");

  if (quoteNotifications.length === 0) {
    return [];
  }

  const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri);
  const documentUris = quoteNotifications.map((n) => n.data.document_uri);

  // The two lookups are independent, so run them concurrently.
  const [{ data: bskyPosts }, { data: documents }] = await Promise.all([
    supabaseServerClient.from("bsky_posts").select("*").in("uri", bskyPostUris),
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
  ]);
  const bskyPostsByUri = indexByUri(bskyPosts);
  const documentsByUri = indexByUri(documents);

  return quoteNotifications.map((notification) => ({
    id: notification.id,
    recipient: notification.recipient,
    created_at: notification.created_at,
    type: "quote" as const,
    bsky_post_uri: notification.data.bsky_post_uri,
    document_uri: notification.data.document_uri,
    bskyPost: bskyPostsByUri.get(notification.data.bsky_post_uri)!,
    document: documentsByUri.get(notification.data.document_uri)!,
  }));
}

export type HydratedMentionNotification = Awaited<
  ReturnType<typeof hydrateMentionNotifications>
>[0];

/**
 * Hydrates "mention" notifications with the mentioning document, the handle
 * resolved from that document URI's DID, and the mentioned publication or
 * document (when the mention is not a plain DID mention).
 *
 * NOTE: `document` is typed as present, but is undefined at runtime when the
 * query returns no row for the uri. `documentCreatorHandle` is null when the
 * URI cannot be parsed or the DID cannot be resolved.
 */
async function hydrateMentionNotifications(notifications: NotificationRow[]) {
  const mentionNotifications = notificationsOfType(notifications, "mention");

  if (mentionNotifications.length === 0) {
    return [];
  }

  const documentUris = mentionNotifications.map((n) => n.data.document_uri);
  const { data: documents } = await supabaseServerClient
    .from("documents")
    .select("*, documents_in_publications(publications(*))")
    .in("uri", documentUris);
  const documentsByUri = indexByUri(documents);

  const handleByDocumentUri = await resolveHandlesByUri(documentUris);
  const { publicationsByUri, documentsByUri: mentionedDocumentsByUri } =
    await fetchMentionTargets(mentionNotifications);

  return mentionNotifications.map((notification) => {
    const mentionedUri = mentionedUriOf(notification.data);
    const documentCreatorHandle =
      handleByDocumentUri.get(notification.data.document_uri) ?? null;

    return {
      id: notification.id,
      recipient: notification.recipient,
      created_at: notification.created_at,
      type: "mention" as const,
      document_uri: notification.data.document_uri,
      mention_type: notification.data.mention_type,
      mentioned_uri: mentionedUri,
      document: documentsByUri.get(notification.data.document_uri)!,
      documentCreatorHandle,
      mentionedPublication: mentionedUri
        ? publicationsByUri.get(mentionedUri)
        : undefined,
      mentionedDocument: mentionedUri
        ? mentionedDocumentsByUri.get(mentionedUri)
        : undefined,
    };
  });
}

export type HydratedCommentMentionNotification = Awaited<
  ReturnType<typeof hydrateCommentMentionNotifications>
>[0];

/**
 * Hydrates "comment_mention" notifications with the mentioning comment, the
 * handle resolved from that comment URI's DID, and the mentioned publication
 * or document (when the mention is not a plain DID mention).
 *
 * NOTE: `commentData` is typed as present, but is undefined at runtime when
 * the query returns no row for the uri. `commenterHandle` is null when the
 * URI cannot be parsed or the DID cannot be resolved.
 */
async function hydrateCommentMentionNotifications(
  notifications: NotificationRow[],
) {
  const commentMentionNotifications = notificationsOfType(
    notifications,
    "comment_mention",
  );

  if (commentMentionNotifications.length === 0) {
    return [];
  }

  const commentUris = commentMentionNotifications.map(
    (n) => n.data.comment_uri,
  );
  const { data: comments } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);
  const commentsByUri = indexByUri(comments);

  const handleByCommentUri = await resolveHandlesByUri(commentUris);
  const { publicationsByUri, documentsByUri } = await fetchMentionTargets(
    commentMentionNotifications,
  );

  return commentMentionNotifications.map((notification) => {
    const mentionedUri = mentionedUriOf(notification.data);
    const commenterHandle =
      handleByCommentUri.get(notification.data.comment_uri) ?? null;
    const commentData = commentsByUri.get(notification.data.comment_uri);

    return {
      id: notification.id,
      recipient: notification.recipient,
      created_at: notification.created_at,
      type: "comment_mention" as const,
      comment_uri: notification.data.comment_uri,
      mention_type: notification.data.mention_type,
      mentioned_uri: mentionedUri,
      commentData: commentData!,
      commenterHandle,
      mentionedPublication: mentionedUri
        ? publicationsByUri.get(mentionedUri)
        : undefined,
      mentionedDocument: mentionedUri
        ? documentsByUri.get(mentionedUri)
        : undefined,
    };
  });
}

/**
 * Broadcasts a "notification" event on the identity's realtime channel so
 * that listeners on that channel can react to new notifications.
 *
 * @param did DID of the identity to poke.
 */
export async function pingIdentityToUpdateNotification(did: string) {
  const channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
  try {
    await channel.send({
      type: "broadcast",
      event: "notification",
      payload: { message: "poke" },
    });
  } finally {
    // Always release the channel, even when the broadcast fails.
    await supabaseServerClient.removeChannel(channel);
  }
}