a tool for shared writing and social publishing
at feature/analytics 585 lines 23 kB view raw
1"use server"; 2 3import { supabaseServerClient } from "supabase/serverClient"; 4import { Tables, TablesInsert } from "supabase/database.types"; 5import { AtUri } from "@atproto/syntax"; 6import { idResolver } from "app/(home-pages)/reader/idResolver"; 7import { 8 normalizeDocumentRecord, 9 normalizePublicationRecord, 10 type NormalizedDocument, 11 type NormalizedPublication, 12} from "src/utils/normalizeRecords"; 13 14type NotificationRow = Tables<"notifications">; 15 16export type Notification = Omit<TablesInsert<"notifications">, "data"> & { 17 data: NotificationData; 18}; 19 20export type NotificationData = 21 | { type: "comment"; comment_uri: string; parent_uri?: string } 22 | { type: "subscribe"; subscription_uri: string } 23 | { type: "quote"; bsky_post_uri: string; document_uri: string } 24 | { type: "bsky_post_embed"; document_uri: string; bsky_post_uri: string } 25 | { type: "mention"; document_uri: string; mention_type: "did" } 26 | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string } 27 | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string } 28 | { type: "comment_mention"; comment_uri: string; mention_type: "did" } 29 | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string } 30 | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string } 31 | { type: "recommend"; document_uri: string; recommend_uri: string }; 32 33export type HydratedNotification = 34 | HydratedCommentNotification 35 | HydratedSubscribeNotification 36 | HydratedQuoteNotification 37 | HydratedBskyPostEmbedNotification 38 | HydratedMentionNotification 39 | HydratedCommentMentionNotification 40 | HydratedRecommendNotification; 41export async function hydrateNotifications( 42 notifications: NotificationRow[], 43): Promise<Array<HydratedNotification>> { 44 // Call all hydrators in parallel 45 const [commentNotifications, subscribeNotifications, quoteNotifications, bskyPostEmbedNotifications, mentionNotifications, commentMentionNotifications, recommendNotifications] = await Promise.all([ 46 hydrateCommentNotifications(notifications), 47 hydrateSubscribeNotifications(notifications), 48 hydrateQuoteNotifications(notifications), 49 hydrateBskyPostEmbedNotifications(notifications), 50 hydrateMentionNotifications(notifications), 51 hydrateCommentMentionNotifications(notifications), 52 hydrateRecommendNotifications(notifications), 53 ]); 54 55 // Combine all hydrated notifications 56 const allHydrated = [...commentNotifications, ...subscribeNotifications, ...quoteNotifications, ...bskyPostEmbedNotifications, ...mentionNotifications, ...commentMentionNotifications, ...recommendNotifications]; 57 58 // Sort by created_at to maintain order 59 allHydrated.sort( 60 (a, b) => 61 new Date(b.created_at).getTime() - new Date(a.created_at).getTime(), 62 ); 63 64 return allHydrated; 65} 66 67// Type guard to extract notification type 68type ExtractNotificationType<T extends NotificationData["type"]> = Extract< 69 NotificationData, 70 { type: T } 71>; 72 73export type HydratedCommentNotification = Awaited< 74 ReturnType<typeof hydrateCommentNotifications> 75>[0]; 76 77async function hydrateCommentNotifications(notifications: NotificationRow[]) { 78 const commentNotifications = notifications.filter( 79 (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } => 80 (n.data as NotificationData)?.type === "comment", 81 ); 82 83 if (commentNotifications.length === 0) { 84 return []; 85 } 86 87 // Fetch comment data from the database 88 const commentUris = commentNotifications.flatMap((n) => 89 n.data.parent_uri 90 ? [n.data.comment_uri, n.data.parent_uri] 91 : [n.data.comment_uri], 92 ); 93 const { data: comments } = await supabaseServerClient 94 .from("comments_on_documents") 95 .select( 96 "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))", 97 ) 98 .in("uri", commentUris); 99 100 return commentNotifications 101 .map((notification) => { 102 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri); 103 if (!commentData) return null; 104 return { 105 id: notification.id, 106 recipient: notification.recipient, 107 created_at: notification.created_at, 108 type: "comment" as const, 109 comment_uri: notification.data.comment_uri, 110 parentData: notification.data.parent_uri 111 ? comments?.find((c) => c.uri === notification.data.parent_uri) 112 : undefined, 113 commentData, 114 normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri), 115 normalizedPublication: normalizePublicationRecord( 116 commentData.documents?.documents_in_publications[0]?.publications?.record, 117 ), 118 }; 119 }) 120 .filter((n) => n !== null); 121} 122 123export type HydratedSubscribeNotification = Awaited< 124 ReturnType<typeof hydrateSubscribeNotifications> 125>[0]; 126 127async function hydrateSubscribeNotifications(notifications: NotificationRow[]) { 128 const subscribeNotifications = notifications.filter( 129 ( 130 n, 131 ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } => 132 (n.data as NotificationData)?.type === "subscribe", 133 ); 134 135 if (subscribeNotifications.length === 0) { 136 return []; 137 } 138 139 // Fetch subscription data from the database with related data 140 const subscriptionUris = subscribeNotifications.map( 141 (n) => n.data.subscription_uri, 142 ); 143 const { data: subscriptions } = await supabaseServerClient 144 .from("publication_subscriptions") 145 .select("*, identities(bsky_profiles(*)), publications(*)") 146 .in("uri", subscriptionUris); 147 148 return subscribeNotifications 149 .map((notification) => { 150 const subscriptionData = subscriptions?.find((s) => s.uri === notification.data.subscription_uri); 151 if (!subscriptionData) return null; 152 return { 153 id: notification.id, 154 recipient: notification.recipient, 155 created_at: notification.created_at, 156 type: "subscribe" as const, 157 subscription_uri: notification.data.subscription_uri, 158 subscriptionData, 159 normalizedPublication: normalizePublicationRecord(subscriptionData.publications?.record), 160 }; 161 }) 162 .filter((n) => n !== null); 163} 164 165export type HydratedQuoteNotification = Awaited< 166 ReturnType<typeof hydrateQuoteNotifications> 167>[0]; 168 169async function hydrateQuoteNotifications(notifications: NotificationRow[]) { 170 const quoteNotifications = notifications.filter( 171 (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } => 172 (n.data as NotificationData)?.type === "quote", 173 ); 174 175 if (quoteNotifications.length === 0) { 176 return []; 177 } 178 179 // Fetch bsky post data and document data 180 const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri); 181 const documentUris = quoteNotifications.map((n) => n.data.document_uri); 182 183 const { data: bskyPosts } = await supabaseServerClient 184 .from("bsky_posts") 185 .select("*") 186 .in("uri", bskyPostUris); 187 188 const { data: documents } = await supabaseServerClient 189 .from("documents") 190 .select("*, documents_in_publications(publications(*))") 191 .in("uri", documentUris); 192 193 return quoteNotifications 194 .map((notification) => { 195 const bskyPost = bskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri); 196 const document = documents?.find((d) => d.uri === notification.data.document_uri); 197 if (!bskyPost || !document) return null; 198 return { 199 id: notification.id, 200 recipient: notification.recipient, 201 created_at: notification.created_at, 202 type: "quote" as const, 203 bsky_post_uri: notification.data.bsky_post_uri, 204 document_uri: notification.data.document_uri, 205 bskyPost, 206 document, 207 normalizedDocument: normalizeDocumentRecord(document.data, document.uri), 208 normalizedPublication: normalizePublicationRecord( 209 document.documents_in_publications[0]?.publications?.record, 210 ), 211 }; 212 }) 213 .filter((n) => n !== null); 214} 215 216export type HydratedBskyPostEmbedNotification = Awaited< 217 ReturnType<typeof hydrateBskyPostEmbedNotifications> 218>[0]; 219 220async function hydrateBskyPostEmbedNotifications(notifications: NotificationRow[]) { 221 const bskyPostEmbedNotifications = notifications.filter( 222 (n): n is NotificationRow & { data: ExtractNotificationType<"bsky_post_embed"> } => 223 (n.data as NotificationData)?.type === "bsky_post_embed", 224 ); 225 226 if (bskyPostEmbedNotifications.length === 0) { 227 return []; 228 } 229 230 // Fetch document data (the leaflet that embedded the post) 231 const documentUris = bskyPostEmbedNotifications.map((n) => n.data.document_uri); 232 const bskyPostUris = bskyPostEmbedNotifications.map((n) => n.data.bsky_post_uri); 233 234 const [{ data: documents }, { data: cachedBskyPosts }] = await Promise.all([ 235 supabaseServerClient 236 .from("documents") 237 .select("*, documents_in_publications(publications(*))") 238 .in("uri", documentUris), 239 supabaseServerClient 240 .from("bsky_posts") 241 .select("*") 242 .in("uri", bskyPostUris), 243 ]); 244 245 // Find which posts we need to fetch from the API 246 const cachedPostUris = new Set(cachedBskyPosts?.map((p) => p.uri) ?? []); 247 const missingPostUris = bskyPostUris.filter((uri) => !cachedPostUris.has(uri)); 248 249 // Fetch missing posts from Bluesky API 250 const fetchedPosts = new Map<string, { text: string } | null>(); 251 if (missingPostUris.length > 0) { 252 try { 253 const { AtpAgent } = await import("@atproto/api"); 254 const agent = new AtpAgent({ service: "https://public.api.bsky.app" }); 255 const response = await agent.app.bsky.feed.getPosts({ uris: missingPostUris }); 256 for (const post of response.data.posts) { 257 const record = post.record as { text?: string }; 258 fetchedPosts.set(post.uri, { text: record.text ?? "" }); 259 } 260 } catch (error) { 261 console.error("Failed to fetch Bluesky posts:", error); 262 } 263 } 264 265 // Extract unique DIDs from document URIs to resolve handles 266 const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))]; 267 268 // Resolve DIDs to handles in parallel 269 const didToHandleMap = new Map<string, string | null>(); 270 await Promise.all( 271 documentCreatorDids.map(async (did) => { 272 try { 273 const resolved = await idResolver.did.resolve(did); 274 const handle = resolved?.alsoKnownAs?.[0] 275 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 276 : null; 277 didToHandleMap.set(did, handle); 278 } catch (error) { 279 console.error(`Failed to resolve DID ${did}:`, error); 280 didToHandleMap.set(did, null); 281 } 282 }), 283 ); 284 285 return bskyPostEmbedNotifications 286 .map((notification) => { 287 const document = documents?.find((d) => d.uri === notification.data.document_uri); 288 if (!document) return null; 289 290 const documentCreatorDid = new AtUri(notification.data.document_uri).host; 291 const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null; 292 293 // Get post text from cache or fetched data 294 const cachedPost = cachedBskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri); 295 const postView = cachedPost?.post_view as { record?: { text?: string } } | undefined; 296 const bskyPostText = postView?.record?.text ?? fetchedPosts.get(notification.data.bsky_post_uri)?.text ?? null; 297 298 return { 299 id: notification.id, 300 recipient: notification.recipient, 301 created_at: notification.created_at, 302 type: "bsky_post_embed" as const, 303 document_uri: notification.data.document_uri, 304 bsky_post_uri: notification.data.bsky_post_uri, 305 document, 306 documentCreatorHandle, 307 bskyPostText, 308 normalizedDocument: normalizeDocumentRecord(document.data, document.uri), 309 normalizedPublication: normalizePublicationRecord( 310 document.documents_in_publications[0]?.publications?.record, 311 ), 312 }; 313 }) 314 .filter((n) => n !== null); 315} 316 317export type HydratedMentionNotification = Awaited< 318 ReturnType<typeof hydrateMentionNotifications> 319>[0]; 320 321async function hydrateMentionNotifications(notifications: NotificationRow[]) { 322 const mentionNotifications = notifications.filter( 323 (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } => 324 (n.data as NotificationData)?.type === "mention", 325 ); 326 327 if (mentionNotifications.length === 0) { 328 return []; 329 } 330 331 // Fetch document data from the database 332 const documentUris = mentionNotifications.map((n) => n.data.document_uri); 333 const { data: documents } = await supabaseServerClient 334 .from("documents") 335 .select("*, documents_in_publications(publications(*))") 336 .in("uri", documentUris); 337 338 // Extract unique DIDs from document URIs to resolve handles 339 const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))]; 340 341 // Resolve DIDs to handles in parallel 342 const didToHandleMap = new Map<string, string | null>(); 343 await Promise.all( 344 documentCreatorDids.map(async (did) => { 345 try { 346 const resolved = await idResolver.did.resolve(did); 347 const handle = resolved?.alsoKnownAs?.[0] 348 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 349 : null; 350 didToHandleMap.set(did, handle); 351 } catch (error) { 352 console.error(`Failed to resolve DID ${did}:`, error); 353 didToHandleMap.set(did, null); 354 } 355 }), 356 ); 357 358 // Fetch mentioned publications and documents 359 const mentionedPublicationUris = mentionNotifications 360 .filter((n) => n.data.mention_type === "publication") 361 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "publication" }>).mentioned_uri); 362 363 const mentionedDocumentUris = mentionNotifications 364 .filter((n) => n.data.mention_type === "document") 365 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "document" }>).mentioned_uri); 366 367 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([ 368 mentionedPublicationUris.length > 0 369 ? supabaseServerClient 370 .from("publications") 371 .select("*") 372 .in("uri", mentionedPublicationUris) 373 : Promise.resolve({ data: [] }), 374 mentionedDocumentUris.length > 0 375 ? supabaseServerClient 376 .from("documents") 377 .select("*, documents_in_publications(publications(*))") 378 .in("uri", mentionedDocumentUris) 379 : Promise.resolve({ data: [] }), 380 ]); 381 382 return mentionNotifications 383 .map((notification) => { 384 const document = documents?.find((d) => d.uri === notification.data.document_uri); 385 if (!document) return null; 386 387 const mentionedUri = notification.data.mention_type !== "did" 388 ? (notification.data as Extract<ExtractNotificationType<"mention">, { mentioned_uri: string }>).mentioned_uri 389 : undefined; 390 391 const documentCreatorDid = new AtUri(notification.data.document_uri).host; 392 const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null; 393 394 const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined; 395 const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined; 396 397 return { 398 id: notification.id, 399 recipient: notification.recipient, 400 created_at: notification.created_at, 401 type: "mention" as const, 402 document_uri: notification.data.document_uri, 403 mention_type: notification.data.mention_type, 404 mentioned_uri: mentionedUri, 405 document, 406 documentCreatorHandle, 407 mentionedPublication, 408 mentionedDocument: mentionedDoc, 409 normalizedDocument: normalizeDocumentRecord(document.data, document.uri), 410 normalizedPublication: normalizePublicationRecord( 411 document.documents_in_publications[0]?.publications?.record, 412 ), 413 normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record), 414 normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri), 415 }; 416 }) 417 .filter((n) => n !== null); 418} 419 420export type HydratedCommentMentionNotification = Awaited< 421 ReturnType<typeof hydrateCommentMentionNotifications> 422>[0]; 423 424async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) { 425 const commentMentionNotifications = notifications.filter( 426 (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } => 427 (n.data as NotificationData)?.type === "comment_mention", 428 ); 429 430 if (commentMentionNotifications.length === 0) { 431 return []; 432 } 433 434 // Fetch comment data from the database 435 const commentUris = commentMentionNotifications.map((n) => n.data.comment_uri); 436 const { data: comments } = await supabaseServerClient 437 .from("comments_on_documents") 438 .select( 439 "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))", 440 ) 441 .in("uri", commentUris); 442 443 // Extract unique DIDs from comment URIs to resolve handles 444 const commenterDids = [...new Set(commentUris.map((uri) => new AtUri(uri).host))]; 445 446 // Resolve DIDs to handles in parallel 447 const didToHandleMap = new Map<string, string | null>(); 448 await Promise.all( 449 commenterDids.map(async (did) => { 450 try { 451 const resolved = await idResolver.did.resolve(did); 452 const handle = resolved?.alsoKnownAs?.[0] 453 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 454 : null; 455 didToHandleMap.set(did, handle); 456 } catch (error) { 457 console.error(`Failed to resolve DID ${did}:`, error); 458 didToHandleMap.set(did, null); 459 } 460 }), 461 ); 462 463 // Fetch mentioned publications and documents 464 const mentionedPublicationUris = commentMentionNotifications 465 .filter((n) => n.data.mention_type === "publication") 466 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "publication" }>).mentioned_uri); 467 468 const mentionedDocumentUris = commentMentionNotifications 469 .filter((n) => n.data.mention_type === "document") 470 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "document" }>).mentioned_uri); 471 472 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([ 473 mentionedPublicationUris.length > 0 474 ? supabaseServerClient 475 .from("publications") 476 .select("*") 477 .in("uri", mentionedPublicationUris) 478 : Promise.resolve({ data: [] }), 479 mentionedDocumentUris.length > 0 480 ? supabaseServerClient 481 .from("documents") 482 .select("*, documents_in_publications(publications(*))") 483 .in("uri", mentionedDocumentUris) 484 : Promise.resolve({ data: [] }), 485 ]); 486 487 return commentMentionNotifications 488 .map((notification) => { 489 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri); 490 if (!commentData) return null; 491 492 const mentionedUri = notification.data.mention_type !== "did" 493 ? (notification.data as Extract<ExtractNotificationType<"comment_mention">, { mentioned_uri: string }>).mentioned_uri 494 : undefined; 495 496 const commenterDid = new AtUri(notification.data.comment_uri).host; 497 const commenterHandle = didToHandleMap.get(commenterDid) ?? null; 498 499 const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined; 500 const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined; 501 502 return { 503 id: notification.id, 504 recipient: notification.recipient, 505 created_at: notification.created_at, 506 type: "comment_mention" as const, 507 comment_uri: notification.data.comment_uri, 508 mention_type: notification.data.mention_type, 509 mentioned_uri: mentionedUri, 510 commentData, 511 commenterHandle, 512 mentionedPublication, 513 mentionedDocument: mentionedDoc, 514 normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri), 515 normalizedPublication: normalizePublicationRecord( 516 commentData.documents?.documents_in_publications[0]?.publications?.record, 517 ), 518 normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record), 519 normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri), 520 }; 521 }) 522 .filter((n) => n !== null); 523} 524 525export type HydratedRecommendNotification = Awaited< 526 ReturnType<typeof hydrateRecommendNotifications> 527>[0]; 528 529async function hydrateRecommendNotifications(notifications: NotificationRow[]) { 530 const recommendNotifications = notifications.filter( 531 (n): n is NotificationRow & { data: ExtractNotificationType<"recommend"> } => 532 (n.data as NotificationData)?.type === "recommend", 533 ); 534 535 if (recommendNotifications.length === 0) { 536 return []; 537 } 538 539 // Fetch recommend data from the database 540 const recommendUris = recommendNotifications.map((n) => n.data.recommend_uri); 541 const documentUris = recommendNotifications.map((n) => n.data.document_uri); 542 543 const [{ data: recommends }, { data: documents }] = await Promise.all([ 544 supabaseServerClient 545 .from("recommends_on_documents") 546 .select("*, identities(bsky_profiles(*))") 547 .in("uri", recommendUris), 548 supabaseServerClient 549 .from("documents") 550 .select("*, documents_in_publications(publications(*))") 551 .in("uri", documentUris), 552 ]); 553 554 return recommendNotifications 555 .map((notification) => { 556 const recommendData = recommends?.find((r) => r.uri === notification.data.recommend_uri); 557 const document = documents?.find((d) => d.uri === notification.data.document_uri); 558 if (!recommendData || !document) return null; 559 return { 560 id: notification.id, 561 recipient: notification.recipient, 562 created_at: notification.created_at, 563 type: "recommend" as const, 564 recommend_uri: notification.data.recommend_uri, 565 document_uri: notification.data.document_uri, 566 recommendData, 567 document, 568 normalizedDocument: normalizeDocumentRecord(document.data, document.uri), 569 normalizedPublication: normalizePublicationRecord( 570 document.documents_in_publications[0]?.publications?.record, 571 ), 572 }; 573 }) 574 .filter((n) => n !== null); 575} 576 577export async function pingIdentityToUpdateNotification(did: string) { 578 let channel = supabaseServerClient.channel(`identity.atp_did:${did}`); 579 await channel.send({ 580 type: "broadcast", 581 event: "notification", 582 payload: { message: "poke" }, 583 }); 584 await supabaseServerClient.removeChannel(channel); 585}