a tool for shared writing and social publishing
at refactor/standard.site 426 lines 16 kB view raw
"use server";

import { supabaseServerClient } from "supabase/serverClient";
import { Tables, TablesInsert } from "supabase/database.types";
import { AtUri } from "@atproto/syntax";
import { idResolver } from "app/(home-pages)/reader/idResolver";
import {
  normalizeDocumentRecord,
  normalizePublicationRecord,
  type NormalizedDocument,
  type NormalizedPublication,
} from "src/utils/normalizeRecords";

type NotificationRow = Tables<"notifications">;

/** A notification row as inserted, with its `data` payload strongly typed. */
export type Notification = Omit<TablesInsert<"notifications">, "data"> & {
  data: NotificationData;
};

/** Payload stored in a notification's `data` column, discriminated by `type`. */
export type NotificationData =
  | { type: "comment"; comment_uri: string; parent_uri?: string }
  | { type: "subscribe"; subscription_uri: string }
  | { type: "quote"; bsky_post_uri: string; document_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "did" }
  | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "did" }
  | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string };

/** Any notification after its referenced rows have been loaded. */
export type HydratedNotification =
  | HydratedCommentNotification
  | HydratedSubscribeNotification
  | HydratedQuoteNotification
  | HydratedMentionNotification
  | HydratedCommentMentionNotification;

/** Prefix that marks an `alsoKnownAs` entry of a DID document as a handle. */
const AT_URI_PREFIX = "at://";

/**
 * Logs a failed database query. The hydrators treat a failed query like an
 * empty result (the affected notifications are dropped), so logging is the only
 * trace that something went wrong.
 */
function logQueryError(source: string, error: unknown): void {
  if (error) {
    console.error(`Failed to load ${source} for notifications:`, error);
  }
}

/**
 * Resolves the handle of the authority (host) of each AT URI.
 *
 * @param uris AT URIs whose host should be resolved; duplicates are fine.
 * @returns A map from each parseable input URI to the resolved handle, or
 *   `null` when the DID could not be resolved or declares no `at://` alias.
 *   URIs that cannot be parsed are logged and left out of the map.
 */
async function resolveHandlesByUri(
  uris: string[],
): Promise<Map<string, string | null>> {
  // Parse defensively: one malformed URI must not reject the whole hydration.
  const didByUri = new Map<string, string>();
  for (const uri of new Set(uris)) {
    try {
      didByUri.set(uri, new AtUri(uri).host);
    } catch (error) {
      console.error(`Failed to parse AT URI ${uri}:`, error);
    }
  }

  // Resolve each distinct DID once, in parallel.
  const handleByDid = new Map<string, string | null>();
  await Promise.all(
    [...new Set(didByUri.values())].map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // Only an `at://` alias is a handle; other aliases may precede it.
        const handleUri = resolved?.alsoKnownAs?.find((alias) =>
          alias.startsWith(AT_URI_PREFIX),
        );
        handleByDid.set(
          did,
          handleUri !== undefined
            ? handleUri.slice(AT_URI_PREFIX.length)
            : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        handleByDid.set(did, null);
      }
    }),
  );

  const handleByUri = new Map<string, string | null>();
  for (const [uri, did] of didByUri) {
    handleByUri.set(uri, handleByDid.get(did) ?? null);
  }
  return handleByUri;
}

/**
 * Loads the rows referenced by each notification and returns the hydrated
 * notifications sorted newest first.
 *
 * Notifications whose referenced rows cannot be found (or whose `data.type` is
 * not recognised) are omitted from the result.
 */
export async function hydrateNotifications(
  notifications: NotificationRow[],
): Promise<Array<HydratedNotification>> {
  // Each hydrator picks out its own notification type; run them in parallel.
  const [
    commentNotifications,
    subscribeNotifications,
    quoteNotifications,
    mentionNotifications,
    commentMentionNotifications,
  ] = await Promise.all([
    hydrateCommentNotifications(notifications),
    hydrateSubscribeNotifications(notifications),
    hydrateQuoteNotifications(notifications),
    hydrateMentionNotifications(notifications),
    hydrateCommentMentionNotifications(notifications),
  ]);

  const allHydrated = [
    ...commentNotifications,
    ...subscribeNotifications,
    ...quoteNotifications,
    ...mentionNotifications,
    ...commentMentionNotifications,
  ];

  // Grouping by type lost the original ordering; restore newest-first.
  allHydrated.sort(
    (a, b) =>
      new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
  );

  return allHydrated;
}

// Utility type: the NotificationData variant(s) carrying the given `type` tag.
type ExtractNotificationType<T extends NotificationData["type"]> = Extract<
  NotificationData,
  { type: T }
>;

export type HydratedCommentNotification = Awaited<
  ReturnType<typeof hydrateCommentNotifications>
>[0];

/**
 * Hydrates "comment" notifications with the comment, its parent comment (for
 * replies) and the normalized document/publication the comment belongs to.
 */
async function hydrateCommentNotifications(notifications: NotificationRow[]) {
  const commentNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } =>
      (n.data as NotificationData)?.type === "comment",
  );

  if (commentNotifications.length === 0) {
    return [];
  }

  // Fetch every comment and, for replies, the parent it responds to.
  const commentUris = commentNotifications.flatMap((n) =>
    n.data.parent_uri
      ? [n.data.comment_uri, n.data.parent_uri]
      : [n.data.comment_uri],
  );
  const { data: comments, error } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);
  logQueryError("comments", error);

  return commentNotifications
    .map((notification) => {
      const commentData = comments?.find(
        (c) => c.uri === notification.data.comment_uri,
      );
      // The comment may have been deleted since the notification was created.
      if (!commentData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment" as const,
        comment_uri: notification.data.comment_uri,
        parentData: notification.data.parent_uri
          ? comments?.find((c) => c.uri === notification.data.parent_uri)
          : undefined,
        commentData,
        normalizedDocument: normalizeDocumentRecord(
          commentData.documents?.data,
          commentData.documents?.uri,
        ),
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications
            ?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedSubscribeNotification = Awaited<
  ReturnType<typeof hydrateSubscribeNotifications>
>[0];

/**
 * Hydrates "subscribe" notifications with the subscription row, the
 * subscriber's profile and the normalized publication subscribed to.
 */
async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
  const subscribeNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } =>
      (n.data as NotificationData)?.type === "subscribe",
  );

  if (subscribeNotifications.length === 0) {
    return [];
  }

  // Fetch subscription data from the database with related data
  const subscriptionUris = subscribeNotifications.map(
    (n) => n.data.subscription_uri,
  );
  const { data: subscriptions, error } = await supabaseServerClient
    .from("publication_subscriptions")
    .select("*, identities(bsky_profiles(*)), publications(*)")
    .in("uri", subscriptionUris);
  logQueryError("publication subscriptions", error);

  return subscribeNotifications
    .map((notification) => {
      const subscriptionData = subscriptions?.find(
        (s) => s.uri === notification.data.subscription_uri,
      );
      if (!subscriptionData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "subscribe" as const,
        subscription_uri: notification.data.subscription_uri,
        subscriptionData,
        normalizedPublication: normalizePublicationRecord(
          subscriptionData.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedQuoteNotification = Awaited<
  ReturnType<typeof hydrateQuoteNotifications>
>[0];

/**
 * Hydrates "quote" notifications with the quoting Bluesky post and the quoted
 * document. A notification is dropped unless both rows are found.
 */
async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
  const quoteNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } =>
      (n.data as NotificationData)?.type === "quote",
  );

  if (quoteNotifications.length === 0) {
    return [];
  }

  const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri);
  const documentUris = quoteNotifications.map((n) => n.data.document_uri);

  // The two lookups are independent, so issue them concurrently.
  const [
    { data: bskyPosts, error: bskyPostsError },
    { data: documents, error: documentsError },
  ] = await Promise.all([
    supabaseServerClient.from("bsky_posts").select("*").in("uri", bskyPostUris),
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
  ]);
  logQueryError("bsky posts", bskyPostsError);
  logQueryError("quoted documents", documentsError);

  return quoteNotifications
    .map((notification) => {
      const bskyPost = bskyPosts?.find(
        (p) => p.uri === notification.data.bsky_post_uri,
      );
      const document = documents?.find(
        (d) => d.uri === notification.data.document_uri,
      );
      if (!bskyPost || !document) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "quote" as const,
        bsky_post_uri: notification.data.bsky_post_uri,
        document_uri: notification.data.document_uri,
        bskyPost,
        document,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedMentionNotification = Awaited<
  ReturnType<typeof hydrateMentionNotifications>
>[0];

/**
 * Hydrates "mention" notifications (a document mentioning a DID, publication
 * or document) with the mentioning document, its creator's handle and, when
 * the mention targets a publication or document, the mentioned row.
 */
async function hydrateMentionNotifications(notifications: NotificationRow[]) {
  const mentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } =>
      (n.data as NotificationData)?.type === "mention",
  );

  if (mentionNotifications.length === 0) {
    return [];
  }

  // Fetch document data from the database
  const documentUris = mentionNotifications.map((n) => n.data.document_uri);
  const { data: documents, error: documentsError } = await supabaseServerClient
    .from("documents")
    .select("*, documents_in_publications(publications(*))")
    .in("uri", documentUris);
  logQueryError("mentioning documents", documentsError);

  // The host of a document URI identifies its creator; resolve it to a handle.
  const creatorHandleByUri = await resolveHandlesByUri(documentUris);

  // Fetch mentioned publications and documents
  const mentionedPublicationUris = mentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"mention">,
            { mention_type: "publication" }
          >
        ).mentioned_uri,
    );

  const mentionedDocumentUris = mentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"mention">,
            { mention_type: "document" }
          >
        ).mentioned_uri,
    );

  // Skip a query entirely when there is nothing to look up.
  const [
    { data: mentionedPublications, error: mentionedPublicationsError },
    { data: mentionedDocuments, error: mentionedDocumentsError },
  ] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [], error: null }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [], error: null }),
  ]);
  logQueryError("mentioned publications", mentionedPublicationsError);
  logQueryError("mentioned documents", mentionedDocumentsError);

  return mentionNotifications
    .map((notification) => {
      const document = documents?.find(
        (d) => d.uri === notification.data.document_uri,
      );
      if (!document) return null;

      // Only publication/document mentions carry a `mentioned_uri`.
      const mentionedUri =
        notification.data.mention_type !== "did"
          ? (
              notification.data as Extract<
                ExtractNotificationType<"mention">,
                { mentioned_uri: string }
              >
            ).mentioned_uri
          : undefined;

      const documentCreatorHandle =
        creatorHandleByUri.get(notification.data.document_uri) ?? null;

      const mentionedPublication = mentionedUri
        ? mentionedPublications?.find((p) => p.uri === mentionedUri)
        : undefined;
      const mentionedDoc = mentionedUri
        ? mentionedDocuments?.find((d) => d.uri === mentionedUri)
        : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "mention" as const,
        document_uri: notification.data.document_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        document,
        documentCreatorHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(
          mentionedPublication?.record,
        ),
        normalizedMentionedDocument: normalizeDocumentRecord(
          mentionedDoc?.data,
          mentionedDoc?.uri,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedCommentMentionNotification = Awaited<
  ReturnType<typeof hydrateCommentMentionNotifications>
>[0];

/**
 * Hydrates "comment_mention" notifications (a comment mentioning a DID,
 * publication or document) with the comment, the commenter's handle and, when
 * the mention targets a publication or document, the mentioned row.
 */
async function hydrateCommentMentionNotifications(
  notifications: NotificationRow[],
) {
  const commentMentionNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & {
      data: ExtractNotificationType<"comment_mention">;
    } => (n.data as NotificationData)?.type === "comment_mention",
  );

  if (commentMentionNotifications.length === 0) {
    return [];
  }

  // Fetch comment data from the database
  const commentUris = commentMentionNotifications.map(
    (n) => n.data.comment_uri,
  );
  const { data: comments, error: commentsError } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);
  logQueryError("mentioning comments", commentsError);

  // The host of a comment URI identifies the commenter; resolve it to a handle.
  const commenterHandleByUri = await resolveHandlesByUri(commentUris);

  // Fetch mentioned publications and documents
  const mentionedPublicationUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"comment_mention">,
            { mention_type: "publication" }
          >
        ).mentioned_uri,
    );

  const mentionedDocumentUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"comment_mention">,
            { mention_type: "document" }
          >
        ).mentioned_uri,
    );

  // Skip a query entirely when there is nothing to look up.
  const [
    { data: mentionedPublications, error: mentionedPublicationsError },
    { data: mentionedDocuments, error: mentionedDocumentsError },
  ] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [], error: null }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [], error: null }),
  ]);
  logQueryError("mentioned publications", mentionedPublicationsError);
  logQueryError("mentioned documents", mentionedDocumentsError);

  return commentMentionNotifications
    .map((notification) => {
      const commentData = comments?.find(
        (c) => c.uri === notification.data.comment_uri,
      );
      if (!commentData) return null;

      // Only publication/document mentions carry a `mentioned_uri`.
      const mentionedUri =
        notification.data.mention_type !== "did"
          ? (
              notification.data as Extract<
                ExtractNotificationType<"comment_mention">,
                { mentioned_uri: string }
              >
            ).mentioned_uri
          : undefined;

      const commenterHandle =
        commenterHandleByUri.get(notification.data.comment_uri) ?? null;

      const mentionedPublication = mentionedUri
        ? mentionedPublications?.find((p) => p.uri === mentionedUri)
        : undefined;
      const mentionedDoc = mentionedUri
        ? mentionedDocuments?.find((d) => d.uri === mentionedUri)
        : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment_mention" as const,
        comment_uri: notification.data.comment_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        commentData,
        commenterHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(
          commentData.documents?.data,
          commentData.documents?.uri,
        ),
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications
            ?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(
          mentionedPublication?.record,
        ),
        normalizedMentionedDocument: normalizeDocumentRecord(
          mentionedDoc?.data,
          mentionedDoc?.uri,
        ),
      };
    })
    .filter((n) => n !== null);
}

/**
 * Broadcasts a "notification" event on the identity's realtime channel.
 *
 * @param did DID of the identity whose channel should be pinged.
 */
export async function pingIdentityToUpdateNotification(
  did: string,
): Promise<void> {
  const channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
  try {
    await channel.send({
      type: "broadcast",
      event: "notification",
      payload: { message: "poke" },
    });
  } finally {
    // Always release the channel, even when the broadcast fails.
    await supabaseServerClient.removeChannel(channel);
  }
}