Auto-indexing service and GraphQL API for AT Protocol Records

feat: add notifications query with rkey-based sorting

Add cross-collection notifications query that finds records mentioning a DID:
- notifications(viewerDid, collections, first, after) GraphQL query
- NotificationRecord union type for type-safe results
- RecordCollection enum for collection filtering
- Real-time notificationCreated subscription with event filtering
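
A sketch of how the new query might be called (argument and type names taken from the list above; the exact selection shape is an assumption):

```graphql
query {
  notifications(viewerDid: "did:plc:target", collections: [APP_BSKY_FEED_LIKE], first: 20) {
    edges {
      node {
        ... on AppBskyFeedLike { uri }
      }
      cursor
    }
    pageInfo { hasNextPage endCursor }
  }
}
```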

Sort notifications by rkey (TID) for chronological ordering:
- Add rkey generated column to record table (VIRTUAL for SQLite, STORED for PostgreSQL)
- Update pagination to support rkey field extraction
- Cursor pagination uses rkey|uri format
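
As an illustrative sketch of the generated column (the column expressions are assumptions, not the actual migration): the rkey is the last segment of `at://<did>/<collection>/<rkey>`, and SQLite's `ALTER TABLE` can only add VIRTUAL generated columns, which is why the two backends differ:

```sql
-- PostgreSQL: rkey is the fifth '/'-separated segment of the AT-URI (STORED)
ALTER TABLE record
  ADD COLUMN rkey TEXT GENERATED ALWAYS AS (split_part(uri, '/', 5)) STORED;

-- SQLite: only VIRTUAL generated columns can be added via ALTER TABLE;
-- one possible expression strips everything up to the last '/':
-- ALTER TABLE record ADD COLUMN rkey TEXT
--   GENERATED ALWAYS AS (replace(uri, rtrim(uri, replace(uri, '/', '')), '')) VIRTUAL;
```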

Includes comprehensive test coverage for repository, e2e, and subscription handlers.

+2724 -77
+11
CHANGELOG.md
```diff
  The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
  and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+ ## v0.18.0
+
+ ### Added
+ - Add `notifications` GraphQL query for cross-collection DID mention search
+ - Add `notificationCreated` GraphQL subscription for real-time notifications
+ - Auto-generate `RecordCollection` enum from registered lexicons
+ - Auto-generate `NotificationRecord` union type from record types
+
+ ### Changed
+ - Upgrade swell to 2.1.3 (fixes NonNull-wrapped union type resolution)
+
  ## v0.17.5

  ### Fixed
```
+1187
dev-docs/plans/2025-12-26-notifications.md
··· 1 + # Notifications GraphQL Query Implementation Plan 2 + 3 + > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. 4 + 5 + **Goal:** Add a `notifications` query and `notificationCreated` subscription that returns all records referencing a given DID, with typed results based on collection. 6 + 7 + **Architecture:** Cross-collection search using SQL LIKE on record JSON. Returns a union type of all record types auto-generated from lexicons. Uses existing cursor-based pagination. Subscription hooks into existing PubSub system. 8 + 9 + **Tech Stack:** Gleam, SQLite/PostgreSQL via Executor abstraction, GraphQL via lexicon_graphql 10 + 11 + --- 12 + 13 + ## Task 1: Database Repository - Add get_notifications function 14 + 15 + **Files:** 16 + - Modify: `/Users/chadmiller/code/quickslice/server/src/database/repositories/records.gleam` 17 + - Test: `/Users/chadmiller/code/quickslice/server/test/database/repositories/notifications_test.gleam` 18 + 19 + ### Step 1: Write the failing test 20 + 21 + Create a new test file: 22 + 23 + ```gleam 24 + // /Users/chadmiller/code/quickslice/server/test/database/repositories/notifications_test.gleam 25 + import database/executor 26 + import database/repositories/records 27 + import database/types.{Record} 28 + import gleam/option.{None, Some} 29 + import gleeunit/should 30 + import test_helpers 31 + 32 + pub fn get_notifications_returns_records_mentioning_did_test() { 33 + let assert Ok(db) = test_helpers.create_test_db() 34 + let assert Ok(_) = test_helpers.create_record_table(db) 35 + 36 + // Insert a record that mentions did:plc:target 37 + let assert Ok(_) = 38 + records.insert( 39 + db, 40 + "at://did:plc:author/app.bsky.feed.like/abc", 41 + "bafy123", 42 + "did:plc:author", 43 + "app.bsky.feed.like", 44 + "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 45 + ) 46 + 47 + // Insert a record by the target (should be excluded) 48 + let assert Ok(_) = 49 
+ records.insert( 50 + db, 51 + "at://did:plc:target/app.bsky.feed.post/xyz", 52 + "bafy456", 53 + "did:plc:target", 54 + "app.bsky.feed.post", 55 + "{\"text\":\"Hello world\"}", 56 + ) 57 + 58 + // Insert a record that doesn't mention the target 59 + let assert Ok(_) = 60 + records.insert( 61 + db, 62 + "at://did:plc:other/app.bsky.feed.post/zzz", 63 + "bafy789", 64 + "did:plc:other", 65 + "app.bsky.feed.post", 66 + "{\"text\":\"Unrelated post\"}", 67 + ) 68 + 69 + let assert Ok(#(results, _cursor, _has_next, _has_prev)) = 70 + records.get_notifications(db, "did:plc:target", None, None, None) 71 + 72 + // Should only return the like, not the target's own post or unrelated post 73 + results |> should.equal([ 74 + Record( 75 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 76 + cid: "bafy123", 77 + did: "did:plc:author", 78 + collection: "app.bsky.feed.like", 79 + json: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 80 + indexed_at: results |> list.first |> result.unwrap(Record("", "", "", "", "", "")) |> fn(r) { r.indexed_at }, 81 + ), 82 + ]) 83 + } 84 + ``` 85 + 86 + ### Step 2: Run test to verify it fails 87 + 88 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=get_notifications` 89 + Expected: FAIL with "function get_notifications not defined" or similar 90 + 91 + ### Step 3: Write minimal implementation 92 + 93 + Add to `/Users/chadmiller/code/quickslice/server/src/database/repositories/records.gleam`: 94 + 95 + ```gleam 96 + /// Get records that mention the given DID (excluding records authored by that DID) 97 + pub fn get_notifications( 98 + exec: Executor, 99 + did: String, 100 + collections: option.Option(List(String)), 101 + first: option.Option(Int), 102 + after: option.Option(String), 103 + ) -> Result(#(List(Record), option.Option(String), Bool, Bool), DbError) { 104 + let limit = option.unwrap(first, 50) 105 + let pattern = "%" <> did <> "%" 106 + 107 + // Build collection filter 108 + let 
#(collection_clause, collection_params) = case collections { 109 + option.None -> #("", []) 110 + option.Some([]) -> #("", []) 111 + option.Some(cols) -> { 112 + let placeholders = 113 + cols 114 + |> list.index_map(fn(_, i) { executor.placeholder(exec, i + 3) }) 115 + |> string.join(", ") 116 + #(" AND collection IN (" <> placeholders <> ")", list.map(cols, Text)) 117 + } 118 + } 119 + 120 + // Build cursor clause 121 + let #(cursor_clause, cursor_params) = case after { 122 + option.None -> #("", []) 123 + option.Some(cursor) -> { 124 + case pagination.decode_cursor(cursor) { 125 + Ok(decoded) -> { 126 + let idx = 3 + list.length(collection_params) 127 + #( 128 + " AND (COALESCE(" <> executor.json_extract(exec, "json", "updatedAt") <> ", " <> executor.json_extract(exec, "json", "createdAt") <> "), uri) < (" <> executor.placeholder(exec, idx) <> ", " <> executor.placeholder(exec, idx + 1) <> ")", 129 + [Text(decoded.field_values |> list.first |> result.unwrap("")), Text(decoded.cid)], 130 + ) 131 + } 132 + Error(_) -> #("", []) 133 + } 134 + } 135 + } 136 + 137 + let sql = 138 + "SELECT " 139 + <> record_columns(exec) 140 + <> " FROM record WHERE json LIKE " 141 + <> executor.placeholder(exec, 1) 142 + <> " AND did != " 143 + <> executor.placeholder(exec, 2) 144 + <> collection_clause 145 + <> cursor_clause 146 + <> " ORDER BY COALESCE(" 147 + <> executor.json_extract(exec, "json", "updatedAt") 148 + <> ", " 149 + <> executor.json_extract(exec, "json", "createdAt") 150 + <> ") DESC, uri DESC LIMIT " 151 + <> int.to_string(limit + 1) 152 + 153 + let params = 154 + [Text(pattern), Text(did)] 155 + |> list.append(collection_params) 156 + |> list.append(cursor_params) 157 + 158 + use results <- result.try(executor.query(exec, sql, params, record_decoder())) 159 + 160 + let has_next = list.length(results) > limit 161 + let trimmed = results |> list.take(limit) 162 + let end_cursor = case list.last(trimmed) { 163 + Ok(record) -> { 164 + let sort_value = 165 + record.json 
166 + |> json.decode(dynamic.dynamic) 167 + |> result.map(fn(d) { 168 + d 169 + |> dynamic.field("updatedAt", dynamic.string) 170 + |> result.lazy_or(fn() { 171 + dynamic.field("createdAt", dynamic.string)(d) 172 + }) 173 + |> result.unwrap(record.indexed_at) 174 + }) 175 + |> result.unwrap(record.indexed_at) 176 + option.Some(pagination.encode_cursor([sort_value], record.uri)) 177 + } 178 + Error(_) -> option.None 179 + } 180 + 181 + Ok(#(trimmed, end_cursor, has_next, False)) 182 + } 183 + ``` 184 + 185 + ### Step 4: Run test to verify it passes 186 + 187 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=get_notifications` 188 + Expected: PASS 189 + 190 + ### Step 5: Commit 191 + 192 + ```bash 193 + git add server/src/database/repositories/records.gleam server/test/database/repositories/notifications_test.gleam 194 + git commit -m "feat(db): add get_notifications repository function" 195 + ``` 196 + 197 + --- 198 + 199 + ## Task 2: Add collection filter test 200 + 201 + **Files:** 202 + - Test: `/Users/chadmiller/code/quickslice/server/test/database/repositories/notifications_test.gleam` 203 + 204 + ### Step 1: Write the failing test 205 + 206 + Add to the test file: 207 + 208 + ```gleam 209 + pub fn get_notifications_filters_by_collection_test() { 210 + let assert Ok(db) = test_helpers.create_test_db() 211 + let assert Ok(_) = test_helpers.create_record_table(db) 212 + 213 + // Insert a like mentioning target 214 + let assert Ok(_) = 215 + records.insert( 216 + db, 217 + "at://did:plc:author/app.bsky.feed.like/abc", 218 + "bafy123", 219 + "did:plc:author", 220 + "app.bsky.feed.like", 221 + "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 222 + ) 223 + 224 + // Insert a follow mentioning target 225 + let assert Ok(_) = 226 + records.insert( 227 + db, 228 + "at://did:plc:author/app.bsky.graph.follow/def", 229 + "bafy456", 230 + "did:plc:author", 231 + "app.bsky.graph.follow", 232 + 
"{\"subject\":\"did:plc:target\"}", 233 + ) 234 + 235 + // Filter to only likes 236 + let assert Ok(#(results, _, _, _)) = 237 + records.get_notifications( 238 + db, 239 + "did:plc:target", 240 + Some(["app.bsky.feed.like"]), 241 + None, 242 + None, 243 + ) 244 + 245 + list.length(results) |> should.equal(1) 246 + let assert Ok(first) = list.first(results) 247 + first.collection |> should.equal("app.bsky.feed.like") 248 + } 249 + ``` 250 + 251 + ### Step 2: Run test to verify it passes 252 + 253 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=filters_by_collection` 254 + Expected: PASS (implementation already supports this) 255 + 256 + ### Step 3: Commit 257 + 258 + ```bash 259 + git add server/test/database/repositories/notifications_test.gleam 260 + git commit -m "test(db): add collection filter test for notifications" 261 + ``` 262 + 263 + --- 264 + 265 + ## Task 3: Schema Generation - Add RecordCollection enum 266 + 267 + **Files:** 268 + - Modify: `/Users/chadmiller/code/quickslice/lexicon_graphql/src/lexicon_graphql/schema/database.gleam` 269 + - Test: `/Users/chadmiller/code/quickslice/lexicon_graphql/test/schema/notifications_schema_test.gleam` 270 + 271 + ### Step 1: Write the failing test 272 + 273 + ```gleam 274 + // /Users/chadmiller/code/quickslice/lexicon_graphql/test/schema/notifications_schema_test.gleam 275 + import gleam/list 276 + import gleeunit/should 277 + import lexicon_graphql 278 + import lexicon_graphql/schema/database 279 + 280 + pub fn builds_record_collection_enum_test() { 281 + let lexicon_json = 282 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.post\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"text\":{\"type\":\"string\"}}}}}}" 283 + 284 + let assert Ok(parsed) = lexicon_graphql.parse_lexicon(lexicon_json) 285 + 286 + let enum_result = database.build_record_collection_enum([parsed]) 287 + 288 + enum_result |> should.be_ok() 289 + let assert Ok(enum_type) = enum_result 290 + // Enum 
should have the collection NSID as a value 291 + // APP_BSKY_FEED_POST maps to "app.bsky.feed.post" 292 + } 293 + ``` 294 + 295 + ### Step 2: Run test to verify it fails 296 + 297 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- --filter=builds_record_collection_enum` 298 + Expected: FAIL with "function build_record_collection_enum not defined" 299 + 300 + ### Step 3: Write minimal implementation 301 + 302 + Add to `/Users/chadmiller/code/quickslice/lexicon_graphql/src/lexicon_graphql/schema/database.gleam`: 303 + 304 + ```gleam 305 + /// Convert NSID to GraphQL enum value (app.bsky.feed.post -> APP_BSKY_FEED_POST) 306 + pub fn nsid_to_enum_value(nsid: String) -> String { 307 + nsid 308 + |> string.replace(".", "_") 309 + |> string.uppercase() 310 + } 311 + 312 + /// Convert GraphQL enum value back to NSID (APP_BSKY_FEED_POST -> app.bsky.feed.post) 313 + pub fn enum_value_to_nsid(enum_value: String) -> String { 314 + enum_value 315 + |> string.lowercase() 316 + |> string.replace("_", ".") 317 + } 318 + 319 + /// Build RecordCollection enum from parsed lexicons 320 + pub fn build_record_collection_enum( 321 + lexicons: List(types.Lexicon), 322 + ) -> Result(schema.Type, String) { 323 + let record_nsids = 324 + lexicons 325 + |> list.filter_map(fn(lex) { 326 + case dict.get(lex.defs, "main") { 327 + Ok(def) -> 328 + case def { 329 + types.RecordDef(_) -> Ok(lex.id) 330 + _ -> Error(Nil) 331 + } 332 + Error(_) -> Error(Nil) 333 + } 334 + }) 335 + 336 + let enum_values = 337 + record_nsids 338 + |> list.map(fn(nsid) { 339 + schema.enum_value(nsid_to_enum_value(nsid), "Collection: " <> nsid) 340 + }) 341 + 342 + Ok(schema.enum_type( 343 + "RecordCollection", 344 + "Available record collection types", 345 + enum_values, 346 + )) 347 + } 348 + ``` 349 + 350 + ### Step 4: Run test to verify it passes 351 + 352 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- --filter=builds_record_collection_enum` 353 + Expected: 
PASS 354 + 355 + ### Step 5: Commit 356 + 357 + ```bash 358 + git add lexicon_graphql/src/lexicon_graphql/schema/database.gleam lexicon_graphql/test/schema/notifications_schema_test.gleam 359 + git commit -m "feat(schema): add RecordCollection enum generation" 360 + ``` 361 + 362 + --- 363 + 364 + ## Task 4: Schema Generation - Add NotificationRecord union type 365 + 366 + **Files:** 367 + - Modify: `/Users/chadmiller/code/quickslice/lexicon_graphql/src/lexicon_graphql/schema/database.gleam` 368 + - Test: `/Users/chadmiller/code/quickslice/lexicon_graphql/test/schema/notifications_schema_test.gleam` 369 + 370 + ### Step 1: Write the failing test 371 + 372 + Add to test file: 373 + 374 + ```gleam 375 + pub fn builds_notification_record_union_test() { 376 + let post_lexicon = 377 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.post\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"text\":{\"type\":\"string\"}}}}}}" 378 + let like_lexicon = 379 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.like\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"subject\":{\"type\":\"ref\"}}}}}}" 380 + 381 + let assert Ok(post_parsed) = lexicon_graphql.parse_lexicon(post_lexicon) 382 + let assert Ok(like_parsed) = lexicon_graphql.parse_lexicon(like_lexicon) 383 + 384 + let union_result = 385 + database.build_notification_record_union([post_parsed, like_parsed]) 386 + 387 + union_result |> should.be_ok() 388 + } 389 + ``` 390 + 391 + ### Step 2: Run test to verify it fails 392 + 393 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- --filter=builds_notification_record_union` 394 + Expected: FAIL 395 + 396 + ### Step 3: Write minimal implementation 397 + 398 + Add to database.gleam: 399 + 400 + ```gleam 401 + /// Build NotificationRecord union from all record types 402 + pub fn build_notification_record_union( 403 + lexicons: List(types.Lexicon), 404 + ) -> Result(schema.Type, String) { 405 + let record_type_names = 406 + lexicons 
407 + |> list.filter_map(fn(lex) { 408 + case dict.get(lex.defs, "main") { 409 + Ok(def) -> 410 + case def { 411 + types.RecordDef(_) -> Ok(nsid_to_type_name(lex.id)) 412 + _ -> Error(Nil) 413 + } 414 + Error(_) -> Error(Nil) 415 + } 416 + }) 417 + 418 + Ok(schema.union_type( 419 + "NotificationRecord", 420 + "Union of all record types for notifications", 421 + record_type_names, 422 + )) 423 + } 424 + 425 + /// Convert NSID to GraphQL type name (app.bsky.feed.post -> AppBskyFeedPost) 426 + fn nsid_to_type_name(nsid: String) -> String { 427 + nsid 428 + |> string.split(".") 429 + |> list.map(string.capitalise) 430 + |> string.join("") 431 + } 432 + ``` 433 + 434 + ### Step 4: Run test to verify it passes 435 + 436 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- --filter=builds_notification_record_union` 437 + Expected: PASS 438 + 439 + ### Step 5: Commit 440 + 441 + ```bash 442 + git add lexicon_graphql/src/lexicon_graphql/schema/database.gleam lexicon_graphql/test/schema/notifications_schema_test.gleam 443 + git commit -m "feat(schema): add NotificationRecord union type generation" 444 + ``` 445 + 446 + --- 447 + 448 + ## Task 5: Schema Generation - Add notifications query field 449 + 450 + **Files:** 451 + - Modify: `/Users/chadmiller/code/quickslice/lexicon_graphql/src/lexicon_graphql/schema/database.gleam` 452 + 453 + ### Step 1: Write the failing test 454 + 455 + Add to test file: 456 + 457 + ```gleam 458 + pub fn schema_includes_notifications_query_test() { 459 + let lexicon_json = 460 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.post\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"text\":{\"type\":\"string\"}}}}}}" 461 + 462 + let assert Ok(parsed) = lexicon_graphql.parse_lexicon(lexicon_json) 463 + 464 + // Build schema with mock fetchers 465 + let mock_fetcher = fn(_, _) { Ok(#([], None, False, False, None)) } 466 + let mock_notification_fetcher = fn(_, _, _, _) { Ok(#([], None, False, False)) } 467 + 468 + 
let schema_result = 469 + database.build_schema_with_subscriptions( 470 + [parsed], 471 + mock_fetcher, 472 + None, 473 + None, 474 + None, 475 + None, 476 + None, 477 + Some(mock_notification_fetcher), 478 + ) 479 + 480 + schema_result |> should.be_ok() 481 + // Schema should include "notifications" query field 482 + } 483 + ``` 484 + 485 + ### Step 2: Run test to verify it fails 486 + 487 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- --filter=schema_includes_notifications_query` 488 + Expected: FAIL 489 + 490 + ### Step 3: Write minimal implementation 491 + 492 + Modify `build_schema_with_subscriptions` to accept notification fetcher and add the query field: 493 + 494 + ```gleam 495 + /// Type for notification fetcher function 496 + pub type NotificationFetcher = 497 + fn(String, option.Option(List(String)), option.Option(Int), option.Option(String)) -> 498 + Result(#(List(#(value.Value, String)), option.Option(String), Bool, Bool), String) 499 + 500 + pub fn build_schema_with_subscriptions( 501 + lexicons: List(types.Lexicon), 502 + record_fetcher: RecordFetcher, 503 + batch_fetcher: option.Option(BatchFetcher), 504 + paginated_batch_fetcher: option.Option(PaginatedBatchFetcher), 505 + mutation_create_factory: option.Option(MutationCreateFactory), 506 + mutation_update_factory: option.Option(MutationUpdateFactory), 507 + mutation_delete_factory: option.Option(MutationDeleteFactory), 508 + notification_fetcher: option.Option(NotificationFetcher), 509 + ) -> Result(schema.Schema, String) { 510 + // ... existing code ... 
511 + 512 + // Build notification types if fetcher provided 513 + let notification_fields = case notification_fetcher { 514 + option.None -> [] 515 + option.Some(fetcher) -> { 516 + let assert Ok(collection_enum) = build_record_collection_enum(lexicons) 517 + let assert Ok(notification_union) = build_notification_record_union(lexicons) 518 + 519 + let notification_edge = schema.object_type( 520 + "NotificationEdge", 521 + "An edge in a notification connection", 522 + [ 523 + schema.field("node", schema.non_null(notification_union), "The notification record", fn(ctx) { 524 + case ctx.parent { 525 + option.Some(value.Object(fields)) -> 526 + list.find(fields, fn(f) { f.0 == "node" }) 527 + |> result.map(fn(f) { f.1 }) 528 + |> result.replace_error("node not found") 529 + _ -> Error("Invalid parent") 530 + } 531 + }), 532 + schema.field("cursor", schema.non_null(schema.string()), "Cursor for pagination", fn(ctx) { 533 + case ctx.parent { 534 + option.Some(value.Object(fields)) -> 535 + list.find(fields, fn(f) { f.0 == "cursor" }) 536 + |> result.map(fn(f) { f.1 }) 537 + |> result.replace_error("cursor not found") 538 + _ -> Error("Invalid parent") 539 + } 540 + }), 541 + ], 542 + ) 543 + 544 + let notification_connection = schema.object_type( 545 + "NotificationConnection", 546 + "A connection to a list of notifications", 547 + [ 548 + schema.field("edges", schema.non_null(schema.list(schema.non_null(notification_edge))), "The edges", fn(ctx) { 549 + case ctx.parent { 550 + option.Some(value.Object(fields)) -> 551 + list.find(fields, fn(f) { f.0 == "edges" }) 552 + |> result.map(fn(f) { f.1 }) 553 + |> result.replace_error("edges not found") 554 + _ -> Error("Invalid parent") 555 + } 556 + }), 557 + schema.field("pageInfo", schema.non_null(page_info_type), "Pagination info", fn(ctx) { 558 + case ctx.parent { 559 + option.Some(value.Object(fields)) -> 560 + list.find(fields, fn(f) { f.0 == "pageInfo" }) 561 + |> result.map(fn(f) { f.1 }) 562 + |> 
result.replace_error("pageInfo not found") 563 + _ -> Error("Invalid parent") 564 + } 565 + }), 566 + ], 567 + ) 568 + 569 + [ 570 + schema.field( 571 + "notifications", 572 + schema.non_null(notification_connection), 573 + "Get records that mention the given DID", 574 + [ 575 + schema.argument("did", schema.non_null(schema.string()), "The DID to find notifications for"), 576 + schema.argument("collections", schema.list(schema.non_null(collection_enum)), "Filter to specific collections"), 577 + schema.argument("first", schema.int(), "Number of results to return"), 578 + schema.argument("after", schema.string(), "Cursor for pagination"), 579 + ], 580 + fn(ctx) { 581 + let did = get_string_arg(ctx.arguments, "did") |> result.unwrap("") 582 + let collections = get_string_list_arg(ctx.arguments, "collections") 583 + let first = get_int_arg(ctx.arguments, "first") 584 + let after = get_string_arg_option(ctx.arguments, "after") 585 + 586 + // Convert enum values back to NSIDs 587 + let nsid_collections = option.map(collections, fn(cols) { 588 + list.map(cols, enum_value_to_nsid) 589 + }) 590 + 591 + use results <- result.try(fetcher(did, nsid_collections, first, after)) 592 + let #(records, end_cursor, has_next, has_prev) = results 593 + 594 + let edges = list.map(records, fn(r) { 595 + let #(record_value, cursor) = r 596 + value.Object([ 597 + #("node", record_value), 598 + #("cursor", value.String(cursor)), 599 + ]) 600 + }) 601 + 602 + Ok(value.Object([ 603 + #("edges", value.List(edges)), 604 + #("pageInfo", value.Object([ 605 + #("hasNextPage", value.Boolean(has_next)), 606 + #("hasPreviousPage", value.Boolean(has_prev)), 607 + #("startCursor", case list.first(records) { 608 + Ok(#(_, c)) -> value.String(c) 609 + Error(_) -> value.Null 610 + }), 611 + #("endCursor", case end_cursor { 612 + option.Some(c) -> value.String(c) 613 + option.None -> value.Null 614 + }), 615 + ])), 616 + ])) 617 + }, 618 + ), 619 + ] 620 + } 621 + } 622 + 623 + // Add notification_fields 
to query_fields 624 + let all_query_fields = list.append(query_fields, notification_fields) 625 + 626 + // ... rest of existing code using all_query_fields ... 627 + } 628 + ``` 629 + 630 + ### Step 4: Run test to verify it passes 631 + 632 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- --filter=schema_includes_notifications_query` 633 + Expected: PASS 634 + 635 + ### Step 5: Commit 636 + 637 + ```bash 638 + git add lexicon_graphql/src/lexicon_graphql/schema/database.gleam lexicon_graphql/test/schema/notifications_schema_test.gleam 639 + git commit -m "feat(schema): add notifications query field to schema" 640 + ``` 641 + 642 + --- 643 + 644 + ## Task 6: Schema Generation - Add notificationCreated subscription 645 + 646 + **Files:** 647 + - Modify: `/Users/chadmiller/code/quickslice/lexicon_graphql/src/lexicon_graphql/schema/database.gleam` 648 + 649 + ### Step 1: Write the failing test 650 + 651 + Add to test file: 652 + 653 + ```gleam 654 + pub fn schema_includes_notification_created_subscription_test() { 655 + let lexicon_json = 656 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.post\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"text\":{\"type\":\"string\"}}}}}}" 657 + 658 + let assert Ok(parsed) = lexicon_graphql.parse_lexicon(lexicon_json) 659 + 660 + let mock_fetcher = fn(_, _) { Ok(#([], None, False, False, None)) } 661 + let mock_notification_fetcher = fn(_, _, _, _) { Ok(#([], None, False, False)) } 662 + 663 + let schema_result = 664 + database.build_schema_with_subscriptions( 665 + [parsed], 666 + mock_fetcher, 667 + None, 668 + None, 669 + None, 670 + None, 671 + None, 672 + Some(mock_notification_fetcher), 673 + ) 674 + 675 + schema_result |> should.be_ok() 676 + // Schema subscription type should include "notificationCreated" field 677 + } 678 + ``` 679 + 680 + ### Step 2: Run test to verify it fails 681 + 682 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- 
--filter=notification_created_subscription` 683 + Expected: FAIL 684 + 685 + ### Step 3: Write minimal implementation 686 + 687 + Add to the subscription type building in `build_subscription_type`: 688 + 689 + ```gleam 690 + fn build_subscription_type( 691 + record_types: List(RecordType), 692 + object_types: dict.Dict(String, schema.Type), 693 + notification_union: option.Option(schema.Type), 694 + collection_enum: option.Option(schema.Type), 695 + ) -> schema.Type { 696 + // Existing subscription fields for each record type 697 + let record_subscription_fields = list.flat_map(record_types, fn(record_type) { 698 + // ... existing code for Created/Updated/Deleted per type ... 699 + }) 700 + 701 + // Add notificationCreated subscription if union provided 702 + let notification_subscription_fields = case notification_union, collection_enum { 703 + option.Some(union_type), option.Some(enum_type) -> [ 704 + schema.subscription_field( 705 + "notificationCreated", 706 + schema.non_null(union_type), 707 + "Emitted when a new record is created that mentions the subscribed DID", 708 + [ 709 + schema.argument("did", schema.non_null(schema.string()), "The DID to watch for mentions"), 710 + schema.argument("collections", schema.list(schema.non_null(enum_type)), "Filter to specific collections"), 711 + ], 712 + fn(ctx) { 713 + case ctx.data { 714 + option.Some(data) -> Ok(data) 715 + option.None -> Error("Subscription resolver called without event data") 716 + } 717 + }, 718 + ), 719 + ] 720 + _, _ -> [] 721 + } 722 + 723 + let all_subscription_fields = 724 + list.append(record_subscription_fields, notification_subscription_fields) 725 + 726 + schema.object_type("Subscription", "GraphQL subscription root", all_subscription_fields) 727 + } 728 + ``` 729 + 730 + ### Step 4: Run test to verify it passes 731 + 732 + Run: `cd /Users/chadmiller/code/quickslice/lexicon_graphql && gleam test -- --filter=notification_created_subscription` 733 + Expected: PASS 734 + 735 + ### Step 5: 
Commit 736 + 737 + ```bash 738 + git add lexicon_graphql/src/lexicon_graphql/schema/database.gleam lexicon_graphql/test/schema/notifications_schema_test.gleam 739 + git commit -m "feat(schema): add notificationCreated subscription field" 740 + ``` 741 + 742 + --- 743 + 744 + ## Task 7: Server Integration - Add notification fetcher 745 + 746 + **Files:** 747 + - Modify: `/Users/chadmiller/code/quickslice/server/src/graphql/lexicon/fetchers.gleam` 748 + - Modify: `/Users/chadmiller/code/quickslice/server/src/graphql/lexicon/schema.gleam` 749 + 750 + ### Step 1: Write the failing test 751 + 752 + Create integration test: 753 + 754 + ```gleam 755 + // /Users/chadmiller/code/quickslice/server/test/graphql/notifications_integration_test.gleam 756 + import database/repositories/lexicons 757 + import database/repositories/records 758 + import gleam/json 759 + import gleeunit/should 760 + import graphql/lexicon/schema as lexicon_schema 761 + import test_helpers 762 + 763 + pub fn notifications_query_returns_mentioning_records_test() { 764 + let assert Ok(db) = test_helpers.create_test_db() 765 + let assert Ok(_) = test_helpers.create_record_table(db) 766 + let assert Ok(_) = test_helpers.create_actor_table(db) 767 + let assert Ok(_) = test_helpers.create_lexicon_table(db) 768 + 769 + // Insert lexicons 770 + let post_lexicon = 771 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.post\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"text\":{\"type\":\"string\"},\"createdAt\":{\"type\":\"string\",\"format\":\"datetime\"}}}}}}" 772 + let like_lexicon = 773 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.like\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"subject\":{\"type\":\"ref\"},\"createdAt\":{\"type\":\"string\",\"format\":\"datetime\"}}}}}}" 774 + 775 + let assert Ok(_) = lexicons.insert(db, "app.bsky.feed.post", post_lexicon) 776 + let assert Ok(_) = lexicons.insert(db, "app.bsky.feed.like", like_lexicon) 777 + 778 + // Insert a like 
that mentions the target 779 + let assert Ok(_) = 780 + records.insert( 781 + db, 782 + "at://did:plc:author/app.bsky.feed.like/abc", 783 + "bafy123", 784 + "did:plc:author", 785 + "app.bsky.feed.like", 786 + "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"},\"createdAt\":\"2024-01-01T00:00:00Z\"}", 787 + ) 788 + 789 + // Execute notifications query 790 + let query = 791 + "query { notifications(did: \"did:plc:target\", first: 10) { edges { node { ... on AppBskyFeedLike { uri value { createdAt } } } cursor } pageInfo { hasNextPage } } }" 792 + 793 + let assert Ok(result) = lexicon_schema.execute_query_with_db(db, query, "{}", None) 794 + 795 + // Should return the like 796 + result |> should.not_equal("{\"data\":{\"notifications\":{\"edges\":[],\"pageInfo\":{\"hasNextPage\":false}}}}") 797 + } 798 + ``` 799 + 800 + ### Step 2: Run test to verify it fails 801 + 802 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=notifications_query_returns` 803 + Expected: FAIL 804 + 805 + ### Step 3: Write minimal implementation 806 + 807 + Add to `/Users/chadmiller/code/quickslice/server/src/graphql/lexicon/fetchers.gleam`: 808 + 809 + ```gleam 810 + /// Create a notification fetcher that queries records mentioning a DID 811 + pub fn notification_fetcher(db: Executor) { 812 + fn( 813 + did: String, 814 + collections: option.Option(List(String)), 815 + first: option.Option(Int), 816 + after: option.Option(String), 817 + ) -> Result(#(List(#(value.Value, String)), option.Option(String), Bool, Bool), String) { 818 + use result <- result.try( 819 + records.get_notifications(db, did, collections, first, after) 820 + |> result.map_error(fn(e) { "Database error: " <> string.inspect(e) }), 821 + ) 822 + 823 + let #(records_list, end_cursor, has_next, has_prev) = result 824 + 825 + let converted = 826 + list.map(records_list, fn(record) { 827 + let record_value = converters.record_to_graphql_value(record, db) 828 + let cursor = 
pagination.encode_cursor_from_record(record) 829 + #(record_value, cursor) 830 + }) 831 + 832 + Ok(#(converted, end_cursor, has_next, has_prev)) 833 + } 834 + } 835 + ``` 836 + 837 + Update `/Users/chadmiller/code/quickslice/server/src/graphql/lexicon/schema.gleam` to pass the notification fetcher: 838 + 839 + ```gleam 840 + pub fn build_schema_from_db(db: Executor) -> Result(schema.Schema, String) { 841 + // ... existing code ... 842 + 843 + database.build_schema_with_subscriptions( 844 + parsed_lexicons, 845 + record_fetcher(db), 846 + option.Some(batch_fetcher(db)), 847 + option.Some(paginated_batch_fetcher(db)), 848 + mutation_create_factory, 849 + mutation_update_factory, 850 + mutation_delete_factory, 851 + option.Some(notification_fetcher(db)), // Add this 852 + ) 853 + } 854 + ``` 855 + 856 + ### Step 4: Run test to verify it passes 857 + 858 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=notifications_query_returns` 859 + Expected: PASS 860 + 861 + ### Step 5: Commit 862 + 863 + ```bash 864 + git add server/src/graphql/lexicon/fetchers.gleam server/src/graphql/lexicon/schema.gleam server/test/graphql/notifications_integration_test.gleam 865 + git commit -m "feat(server): integrate notification fetcher with schema" 866 + ``` 867 + 868 + --- 869 + 870 + ## Task 8: Subscription Handler - Filter events for notification subscriptions 871 + 872 + **Files:** 873 + - Modify: `/Users/chadmiller/code/quickslice/server/src/handlers/graphql_ws.gleam` 874 + 875 + ### Step 1: Write the failing test 876 + 877 + ```gleam 878 + // /Users/chadmiller/code/quickslice/server/test/handlers/notification_subscription_test.gleam 879 + import gleam/string 880 + import gleeunit/should 881 + import handlers/graphql_ws 882 + import pubsub 883 + 884 + pub fn notification_event_matches_did_test() { 885 + let event = 886 + pubsub.RecordEvent( 887 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 888 + cid: "bafy123", 889 + did: "did:plc:author", 890 + 
collection: "app.bsky.feed.like", 891 + value: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 892 + indexed_at: "2024-01-01T00:00:00Z", 893 + operation: pubsub.Create, 894 + ) 895 + 896 + let matches = 897 + graphql_ws.event_matches_notification_subscription( 898 + event, 899 + "did:plc:target", 900 + None, 901 + ) 902 + 903 + matches |> should.be_true() 904 + } 905 + 906 + pub fn notification_event_excludes_self_authored_test() { 907 + let event = 908 + pubsub.RecordEvent( 909 + uri: "at://did:plc:target/app.bsky.feed.post/xyz", 910 + cid: "bafy123", 911 + did: "did:plc:target", 912 + collection: "app.bsky.feed.post", 913 + value: "{\"text\":\"Hello\"}", 914 + indexed_at: "2024-01-01T00:00:00Z", 915 + operation: pubsub.Create, 916 + ) 917 + 918 + let matches = 919 + graphql_ws.event_matches_notification_subscription( 920 + event, 921 + "did:plc:target", 922 + None, 923 + ) 924 + 925 + matches |> should.be_false() 926 + } 927 + 928 + pub fn notification_event_filters_by_collection_test() { 929 + let event = 930 + pubsub.RecordEvent( 931 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 932 + cid: "bafy123", 933 + did: "did:plc:author", 934 + collection: "app.bsky.feed.like", 935 + value: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 936 + indexed_at: "2024-01-01T00:00:00Z", 937 + operation: pubsub.Create, 938 + ) 939 + 940 + // Filter to only posts, like should not match 941 + let matches = 942 + graphql_ws.event_matches_notification_subscription( 943 + event, 944 + "did:plc:target", 945 + Some(["app.bsky.feed.post"]), 946 + ) 947 + 948 + matches |> should.be_false() 949 + } 950 + ``` 951 + 952 + ### Step 2: Run test to verify it fails 953 + 954 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=notification_event` 955 + Expected: FAIL 956 + 957 + ### Step 3: Write minimal implementation 958 + 959 + Add to `/Users/chadmiller/code/quickslice/server/src/handlers/graphql_ws.gleam`: 
960 + 961 + ```gleam 962 + /// Check if a record event matches a notification subscription 963 + pub fn event_matches_notification_subscription( 964 + event: pubsub.RecordEvent, 965 + subscribed_did: String, 966 + collections: option.Option(List(String)), 967 + ) -> Bool { 968 + // Event must contain the subscribed DID 969 + let contains_did = string.contains(event.value, subscribed_did) 970 + 971 + // Event must not be authored by the subscribed DID 972 + let not_self_authored = event.did != subscribed_did 973 + 974 + // Event must be a Create operation 975 + let is_create = event.operation == pubsub.Create 976 + 977 + // Event collection must match filter (if provided) 978 + let matches_collection = case collections { 979 + option.None -> True 980 + option.Some([]) -> True 981 + option.Some(cols) -> list.contains(cols, event.collection) 982 + } 983 + 984 + contains_did && not_self_authored && is_create && matches_collection 985 + } 986 + ``` 987 + 988 + Then update the subscription event loop to use this for `notificationCreated` subscriptions: 989 + 990 + ```gleam 991 + fn handle_subscription_event( 992 + event: pubsub.RecordEvent, 993 + subscription: ActiveSubscription, 994 + // ... other params 995 + ) { 996 + case subscription.field_name { 997 + "notificationCreated" -> { 998 + let subscriber_did = get_subscription_arg(subscription, "subscriberDid") 999 + let collections = get_subscription_arg_list(subscription, "collections") 1000 + case event_matches_notification_subscription(event, subscriber_did, collections) { 1001 + True -> send_subscription_event(subscription, event) 1002 + False -> Nil 1003 + } 1004 + } 1005 + // ... existing record subscription handling ... 
1006 + } 1007 + } 1008 + ``` 1009 + 1010 + ### Step 4: Run test to verify it passes 1011 + 1012 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=notification_event` 1013 + Expected: PASS 1014 + 1015 + ### Step 5: Commit 1016 + 1017 + ```bash 1018 + git add server/src/handlers/graphql_ws.gleam server/test/handlers/notification_subscription_test.gleam 1019 + git commit -m "feat(ws): add notification subscription event filtering" 1020 + ``` 1021 + 1022 + --- 1023 + 1024 + ## Task 9: End-to-end test 1025 + 1026 + **Files:** 1027 + - Test: `/Users/chadmiller/code/quickslice/server/test/graphql/notifications_e2e_test.gleam` 1028 + 1029 + ### Step 1: Write comprehensive e2e test 1030 + 1031 + ```gleam 1032 + // /Users/chadmiller/code/quickslice/server/test/graphql/notifications_e2e_test.gleam 1033 + import database/repositories/actors 1034 + import database/repositories/lexicons 1035 + import database/repositories/records 1036 + import gleam/option.{None} 1037 + import gleam/string 1038 + import gleeunit/should 1039 + import graphql/lexicon/schema as lexicon_schema 1040 + import test_helpers 1041 + 1042 + pub fn notifications_e2e_test() { 1043 + let assert Ok(db) = test_helpers.create_test_db() 1044 + let assert Ok(_) = test_helpers.create_record_table(db) 1045 + let assert Ok(_) = test_helpers.create_actor_table(db) 1046 + let assert Ok(_) = test_helpers.create_lexicon_table(db) 1047 + 1048 + // Setup lexicons 1049 + let post_lexicon = 1050 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.post\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"text\":{\"type\":\"string\"},\"createdAt\":{\"type\":\"string\",\"format\":\"datetime\"}}}}}}" 1051 + let like_lexicon = 1052 + "{\"lexicon\":1,\"id\":\"app.bsky.feed.like\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"subject\":{\"type\":\"ref\"},\"createdAt\":{\"type\":\"string\",\"format\":\"datetime\"}}}}}}" 1053 + let follow_lexicon = 1054 + 
"{\"lexicon\":1,\"id\":\"app.bsky.graph.follow\",\"defs\":{\"main\":{\"type\":\"record\",\"record\":{\"properties\":{\"subject\":{\"type\":\"string\"},\"createdAt\":{\"type\":\"string\",\"format\":\"datetime\"}}}}}}" 1055 + 1056 + let assert Ok(_) = lexicons.insert(db, "app.bsky.feed.post", post_lexicon) 1057 + let assert Ok(_) = lexicons.insert(db, "app.bsky.feed.like", like_lexicon) 1058 + let assert Ok(_) = lexicons.insert(db, "app.bsky.graph.follow", follow_lexicon) 1059 + 1060 + // Setup actors 1061 + let assert Ok(_) = actors.upsert(db, "did:plc:target", "target.bsky.social") 1062 + let assert Ok(_) = actors.upsert(db, "did:plc:alice", "alice.bsky.social") 1063 + let assert Ok(_) = actors.upsert(db, "did:plc:bob", "bob.bsky.social") 1064 + 1065 + // Target's own post (should NOT appear in notifications) 1066 + let assert Ok(_) = 1067 + records.insert( 1068 + db, 1069 + "at://did:plc:target/app.bsky.feed.post/post1", 1070 + "bafy001", 1071 + "did:plc:target", 1072 + "app.bsky.feed.post", 1073 + "{\"text\":\"Hello world\",\"createdAt\":\"2024-01-01T00:00:00Z\"}", 1074 + ) 1075 + 1076 + // Alice likes target's post (SHOULD appear) 1077 + let assert Ok(_) = 1078 + records.insert( 1079 + db, 1080 + "at://did:plc:alice/app.bsky.feed.like/like1", 1081 + "bafy002", 1082 + "did:plc:alice", 1083 + "app.bsky.feed.like", 1084 + "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/post1\"},\"createdAt\":\"2024-01-02T00:00:00Z\"}", 1085 + ) 1086 + 1087 + // Bob follows target (SHOULD appear) 1088 + let assert Ok(_) = 1089 + records.insert( 1090 + db, 1091 + "at://did:plc:bob/app.bsky.graph.follow/follow1", 1092 + "bafy003", 1093 + "did:plc:bob", 1094 + "app.bsky.graph.follow", 1095 + "{\"subject\":\"did:plc:target\",\"createdAt\":\"2024-01-03T00:00:00Z\"}", 1096 + ) 1097 + 1098 + // Alice's unrelated post (should NOT appear) 1099 + let assert Ok(_) = 1100 + records.insert( 1101 + db, 1102 + "at://did:plc:alice/app.bsky.feed.post/post2", 1103 + "bafy004", 1104 + 
"did:plc:alice", 1105 + "app.bsky.feed.post", 1106 + "{\"text\":\"Unrelated post\",\"createdAt\":\"2024-01-04T00:00:00Z\"}", 1107 + ) 1108 + 1109 + // Query all notifications 1110 + let query = 1111 + "query { notifications(did: \"did:plc:target\", first: 10) { edges { node { ... on AppBskyFeedLike { uri did } ... on AppBskyGraphFollow { uri did } } } } }" 1112 + 1113 + let assert Ok(result) = lexicon_schema.execute_query_with_db(db, query, "{}", None) 1114 + 1115 + // Should have 2 notifications (like + follow) 1116 + result |> string.contains("did:plc:alice") |> should.be_true() 1117 + result |> string.contains("did:plc:bob") |> should.be_true() 1118 + result |> string.contains("like1") |> should.be_true() 1119 + result |> string.contains("follow1") |> should.be_true() 1120 + 1121 + // Query with collection filter (only likes) 1122 + let filtered_query = 1123 + "query { notifications(did: \"did:plc:target\", collections: [APP_BSKY_FEED_LIKE], first: 10) { edges { node { ... on AppBskyFeedLike { uri } } } } }" 1124 + 1125 + let assert Ok(filtered_result) = 1126 + lexicon_schema.execute_query_with_db(db, filtered_query, "{}", None) 1127 + 1128 + // Should only have the like 1129 + filtered_result |> string.contains("like1") |> should.be_true() 1130 + filtered_result |> string.contains("follow1") |> should.be_false() 1131 + } 1132 + ``` 1133 + 1134 + ### Step 2: Run test 1135 + 1136 + Run: `cd /Users/chadmiller/code/quickslice/server && gleam test -- --filter=notifications_e2e` 1137 + Expected: PASS 1138 + 1139 + ### Step 3: Commit 1140 + 1141 + ```bash 1142 + git add server/test/graphql/notifications_e2e_test.gleam 1143 + git commit -m "test: add notifications e2e test" 1144 + ``` 1145 + 1146 + --- 1147 + 1148 + ## Task 10: Final cleanup and documentation 1149 + 1150 + ### Step 1: Run full test suite 1151 + 1152 + Run: `cd /Users/chadmiller/code/quickslice && gleam test` 1153 + Expected: All tests pass 1154 + 1155 + ### Step 2: Update CHANGELOG if exists 1156 + 
1157 + ### Step 3: Final commit 1158 + 1159 + ```bash 1160 + git add -A 1161 + git commit -m "feat: notifications GraphQL query and subscription 1162 + 1163 + - Add notifications(viewerDid, collections, first, after) query 1164 + - Add notificationCreated(subscriberDid, collections) subscription 1165 + - Auto-generate RecordCollection enum from lexicons 1166 + - Auto-generate NotificationRecord union from record types 1167 + - Cross-collection DID search using SQL LIKE 1168 + - Exclude self-authored records 1169 + - Sort newest-first by rkey (TID)" 1170 + ``` 1171 + 1172 + --- 1173 + 1174 + ## Summary 1175 + 1176 + | Task | Description | Files | 1177 + |------|-------------|-------| 1178 + | 1 | Database get_notifications function | records.gleam, notifications_test.gleam | 1179 + | 2 | Collection filter test | notifications_test.gleam | 1180 + | 3 | RecordCollection enum generation | database.gleam | 1181 + | 4 | NotificationRecord union generation | database.gleam | 1182 + | 5 | notifications query field | database.gleam | 1183 + | 6 | notificationCreated subscription | database.gleam | 1184 + | 7 | Server notification fetcher integration | fetchers.gleam, schema.gleam | 1185 + | 8 | Subscription event filtering | graphql_ws.gleam | 1186 + | 9 | E2E test | notifications_e2e_test.gleam | 1187 + | 10 | Final cleanup | - |
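The chronological ordering the plan depends on comes from rkeys being TIDs: per the AT Protocol spec, a TID packs 53 bits of microseconds-since-epoch plus a 10-bit clock id into 13 characters of a base32-sortable alphabet, so lexicographic order on rkey strings matches creation order. A short Python sketch (illustrative only — `encode_tid` is a stand-in, not project code) shows why sorting and the `rkey|uri` cursor format work:

```python
# Why sorting by rkey gives chronological order: TIDs encode a 64-bit
# value (53 bits of microseconds + 10-bit clock id) in an alphabet
# chosen so string comparison matches numeric comparison.
B32 = "234567abcdefghijklmnopqrstuvwxyz"

def encode_tid(micros: int, clock_id: int = 0) -> str:
    n = (micros << 10) | clock_id
    chars = []
    for _ in range(13):          # 13 chars x 5 bits = 65 bits, top bit unused
        chars.append(B32[n & 31])
        n >>= 5
    return "".join(reversed(chars))

early = encode_tid(1_700_000_000_000_000)
late = encode_tid(1_700_000_000_000_001)
assert early < late              # string order == time order

# Cursor format used for pagination: "rkey|uri"
cursor = f"{late}|at://did:plc:alice/app.bsky.feed.like/{late}"
rkey, uri = cursor.split("|", 1)
assert rkey == late
```

Splitting on the first `|` keeps the cursor unambiguous even though at-URIs themselves never contain `|`.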
+1 -1
lexicon_graphql/gleam.toml
··· 15 15 [dependencies] 16 16 gleam_stdlib = ">= 0.44.0 and < 2.0.0" 17 17 gleam_json = ">= 3.0.0 and < 4.0.0" 18 - swell = ">= 2.0.0 and < 3.0.0" 18 + swell = ">= 2.1.3 and < 3.0.0" 19 19 20 20 [dev-dependencies] 21 21 gleeunit = ">= 1.0.0 and < 2.0.0"
+2 -2
lexicon_graphql/manifest.toml
··· 21 21 { name = "rank", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "rank", source = "hex", outer_checksum = "5660E361F0E49CBB714CC57CC4C89C63415D8986F05B2DA0C719D5642FAD91C9" }, 22 22 { name = "simplifile", version = "2.3.1", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "957E0E5B75927659F1D2A1B7B75D7B9BA96FAA8D0C53EA71C4AD9CD0C6B848F6" }, 23 23 { name = "splitter", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "splitter", source = "hex", outer_checksum = "3DFD6B6C49E61EDAF6F7B27A42054A17CFF6CA2135FF553D0CB61C234D281DD0" }, 24 - { name = "swell", version = "2.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "swell", source = "hex", outer_checksum = "98177585C75E2273CA6B5D63D9D8BE072B9B0CBA6F66DA0310A44D379B89A6D1" }, 24 + { name = "swell", version = "2.1.3", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "swell", source = "hex", outer_checksum = "018FF9B3F775F101E72208B4E0EA15A670A51E59C83247DD0302C3AD8C2FE9FF" }, 25 25 { name = "term_size", version = "1.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "term_size", source = "hex", outer_checksum = "D00BD2BC8FB3EBB7E6AE076F3F1FF2AC9D5ED1805F004D0896C784D06C6645F1" }, 26 26 { name = "tom", version = "2.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "gleam_time"], otp_app = "tom", source = "hex", outer_checksum = "74D0C5A3761F7A7D06994755D4D5AD854122EF8E9F9F76A3E7547606D8C77091" }, 27 27 { name = "trie_again", version = "1.1.4", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "trie_again", source = "hex", outer_checksum = "E3BD66B4E126EF567EA8C4944EAB216413392ADF6C16C36047AF79EE5EF13466" }, ··· 32 32 gleam_json = { version = ">= 3.0.0 and < 4.0.0" } 33 33 gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } 34 34 gleeunit = { version = ">= 1.0.0 
and < 2.0.0" } 35 - swell = { version = ">= 2.0.0 and < 3.0.0" } 35 + swell = { version = ">= 2.1.3 and < 3.0.0" }
+5
lexicon_graphql/src/lexicon_graphql.gleam
··· 42 42 pub type ViewerFetcher = 43 43 db_schema_builder.ViewerFetcher 44 44 45 + pub type NotificationFetcher = 46 + db_schema_builder.NotificationFetcher 47 + 45 48 // Re-export main schema building functions 46 49 pub fn build_schema(lexicons: List(Lexicon)) { 47 50 schema_builder.build_schema(lexicons) ··· 58 61 upload_blob_factory: Option(mutation_builder.UploadBlobResolverFactory), 59 62 aggregate_fetcher: Option(db_schema_builder.AggregateFetcher), 60 63 viewer_fetcher: Option(ViewerFetcher), 64 + notification_fetcher: Option(NotificationFetcher), 61 65 ) { 62 66 db_schema_builder.build_schema_with_subscriptions( 63 67 lexicons, ··· 70 74 upload_blob_factory, 71 75 aggregate_fetcher, 72 76 viewer_fetcher, 77 + notification_fetcher, 73 78 ) 74 79 } 75 80
+327 -3
lexicon_graphql/src/lexicon_graphql/schema/database.gleam
··· 126 126 pub type ViewerFetcher = 127 127 fn(String) -> Result(#(String, option.Option(String)), String) 128 128 129 + /// Type for notification fetcher function 130 + /// Takes: did, optional collection filter, first (limit), after (cursor) 131 + /// Returns: (records_with_cursors, end_cursor, has_next_page, has_previous_page) 132 + pub type NotificationFetcher = 133 + fn( 134 + String, 135 + option.Option(List(String)), 136 + option.Option(Int), 137 + option.Option(String), 138 + ) -> 139 + Result( 140 + #(List(#(value.Value, String)), option.Option(String), Bool, Bool), 141 + String, 142 + ) 143 + 129 144 /// Build a GraphQL schema from lexicons with database-backed resolvers 130 145 /// 131 146 /// The fetcher parameter should be a function that queries the database for records with pagination ··· 163 178 batch_fetcher, 164 179 paginated_batch_fetcher, 165 180 option.None, 181 + option.None, 182 + option.None, 183 + option.None, 166 184 ) 167 185 168 186 // Build the mutation type with provided resolver factories ··· 198 216 upload_blob_factory: option.Option(mutation_builder.UploadBlobResolverFactory), 199 217 aggregate_fetcher: option.Option(AggregateFetcher), 200 218 viewer_fetcher: option.Option(ViewerFetcher), 219 + notification_fetcher: option.Option(NotificationFetcher), 201 220 ) -> Result(schema.Schema, String) { 202 221 case lexicons { 203 222 [] -> Error("Cannot build schema from empty lexicon list") ··· 210 229 paginated_batch_fetcher, 211 230 ) 212 231 232 + // Build notification types if fetcher provided 233 + let #(notification_union, collection_enum) = case notification_fetcher { 234 + option.Some(_) -> { 235 + let assert Ok(union) = build_notification_record_union(lexicons) 236 + let assert Ok(enum) = build_record_collection_enum(lexicons) 237 + #(option.Some(union), option.Some(enum)) 238 + } 239 + option.None -> #(option.None, option.None) 240 + } 241 + 213 242 // Build the query type (pass field_type_registry for aggregation validation) 
214 243 let query_type = 215 244 build_query_type( ··· 221 250 batch_fetcher, 222 251 paginated_batch_fetcher, 223 252 viewer_fetcher, 253 + notification_fetcher, 254 + notification_union, 255 + collection_enum, 224 256 ) 225 257 226 258 // Build the mutation type ··· 236 268 237 269 // Build the subscription type 238 270 let subscription_type = 239 - build_subscription_type(record_types, object_types) 271 + build_subscription_type( 272 + record_types, 273 + object_types, 274 + notification_union, 275 + collection_enum, 276 + ) 240 277 241 278 // Create the schema with queries, mutations, and subscriptions 242 279 Ok(schema.schema_with_subscriptions( ··· 1561 1598 batch_fetcher: option.Option(dataloader.BatchFetcher), 1562 1599 paginated_batch_fetcher: option.Option(dataloader.PaginatedBatchFetcher), 1563 1600 viewer_fetcher: option.Option(ViewerFetcher), 1601 + notification_fetcher: option.Option(NotificationFetcher), 1602 + notification_union: option.Option(schema.Type), 1603 + collection_enum: option.Option(schema.Type), 1564 1604 ) -> schema.Type { 1565 1605 // Build regular query fields 1566 1606 let query_fields = ··· 1899 1939 option.None -> [] 1900 1940 } 1901 1941 1942 + // Build notifications query field if fetcher provided 1943 + let notification_fields = case notification_fetcher, notification_union { 1944 + option.Some(notif_fetcher), option.Some(union_type) -> { 1945 + // Build connection types for notifications 1946 + let edge_type = connection.edge_type("NotificationRecord", union_type) 1947 + let connection_type = 1948 + connection.connection_type("NotificationRecord", edge_type) 1949 + 1950 + // Build notification-specific args 1951 + let notification_args = build_notification_query_args(collection_enum) 1952 + 1953 + [ 1954 + schema.field_with_args( 1955 + "notifications", 1956 + connection_type, 1957 + "Query notifications for the authenticated user (records mentioning them)", 1958 + notification_args, 1959 + fn(ctx: schema.Context) { 1960 + // 
Get viewer DID from context (viewer field should be resolved first) 1961 + case schema.get_argument(ctx, "viewerDid") { 1962 + option.Some(value.String(viewer_did)) -> { 1963 + // Extract collection filter 1964 + let collections = case schema.get_argument(ctx, "collections") { 1965 + option.Some(value.List(items)) -> { 1966 + let nsids = 1967 + list.filter_map(items, fn(item) { 1968 + case item { 1969 + value.String(enum_val) -> 1970 + Ok(enum_value_to_nsid(enum_val)) 1971 + _ -> Error(Nil) 1972 + } 1973 + }) 1974 + case nsids { 1975 + [] -> option.None 1976 + _ -> option.Some(nsids) 1977 + } 1978 + } 1979 + _ -> option.None 1980 + } 1981 + 1982 + // Extract pagination params 1983 + let first = case schema.get_argument(ctx, "first") { 1984 + option.Some(value.Int(n)) -> option.Some(n) 1985 + _ -> option.None 1986 + } 1987 + let after = case schema.get_argument(ctx, "after") { 1988 + option.Some(value.String(cursor)) -> option.Some(cursor) 1989 + _ -> option.None 1990 + } 1991 + 1992 + // Call the notification fetcher 1993 + use #(records_with_cursors, end_cursor, has_next, has_prev) <- result.try( 1994 + notif_fetcher(viewer_did, collections, first, after), 1995 + ) 1996 + 1997 + // Build edges from records with their cursors 1998 + let edges = 1999 + list.map(records_with_cursors, fn(record_tuple) { 2000 + let #(record_value, record_cursor) = record_tuple 2001 + connection.Edge(node: record_value, cursor: record_cursor) 2002 + }) 2003 + 2004 + // Build PageInfo 2005 + let page_info = 2006 + connection.PageInfo( 2007 + has_next_page: has_next, 2008 + has_previous_page: has_prev, 2009 + start_cursor: case list.first(edges) { 2010 + Ok(edge) -> option.Some(edge.cursor) 2011 + Error(_) -> option.None 2012 + }, 2013 + end_cursor: end_cursor, 2014 + ) 2015 + 2016 + // Build Connection 2017 + let conn = 2018 + connection.Connection( 2019 + edges: edges, 2020 + page_info: page_info, 2021 + total_count: option.None, 2022 + ) 2023 + Ok(connection.connection_to_value(conn)) 
2024 + } 2025 + _ -> Error("notifications query requires viewerDid argument") 2026 + } 2027 + }, 2028 + ), 2029 + ] 2030 + } 2031 + _, _ -> [] 2032 + } 2033 + 1902 2034 // Combine all query fields 1903 2035 let all_query_fields = 1904 - list.flatten([query_fields, aggregate_query_fields, viewer_field]) 2036 + list.flatten([ 2037 + query_fields, 2038 + aggregate_query_fields, 2039 + viewer_field, 2040 + notification_fields, 2041 + ]) 1905 2042 1906 2043 schema.object_type("Query", "Root query type", all_query_fields) 1907 2044 } 1908 2045 2046 + /// Build notification query arguments 2047 + fn build_notification_query_args( 2048 + collection_enum: option.Option(schema.Type), 2049 + ) -> List(schema.Argument) { 2050 + let base_args = [ 2051 + schema.argument( 2052 + "viewerDid", 2053 + schema.non_null(schema.string_type()), 2054 + "DID of the viewer to get notifications for", 2055 + option.None, 2056 + ), 2057 + schema.argument( 2058 + "first", 2059 + schema.int_type(), 2060 + "Number of notifications to return", 2061 + option.Some(value.Int(50)), 2062 + ), 2063 + schema.argument( 2064 + "after", 2065 + schema.string_type(), 2066 + "Cursor for pagination", 2067 + option.None, 2068 + ), 2069 + ] 2070 + 2071 + case collection_enum { 2072 + option.Some(enum_type) -> { 2073 + list.append(base_args, [ 2074 + schema.argument( 2075 + "collections", 2076 + schema.list_type(enum_type), 2077 + "Filter notifications by collection types", 2078 + option.None, 2079 + ), 2080 + ]) 2081 + } 2082 + option.None -> base_args 2083 + } 2084 + } 2085 + 1909 2086 /// Extract pagination parameters from GraphQL context 1910 2087 fn extract_pagination_params(ctx: schema.Context) -> dataloader.PaginationParams { 1911 2088 // Extract sortBy argument ··· 2196 2373 fn build_subscription_type( 2197 2374 record_types: List(RecordType), 2198 2375 object_types: dict.Dict(String, schema.Type), 2376 + notification_union: option.Option(schema.Type), 2377 + collection_enum: option.Option(schema.Type), 
2199 2378 ) -> schema.Type { 2200 2379 let subscription_fields = 2201 2380 list.flat_map(record_types, fn(record_type) { ··· 2255 2434 2256 2435 [created_field, updated_field, deleted_field] 2257 2436 }) 2437 + 2438 + // Build notification subscription field if union type provided 2439 + let notification_subscription_fields = case notification_union { 2440 + option.Some(union_type) -> { 2441 + let notification_args = 2442 + build_notification_subscription_args(collection_enum) 2443 + [ 2444 + schema.field_with_args( 2445 + "notificationCreated", 2446 + schema.non_null(union_type), 2447 + "Emitted when a new notification is created for the subscriber", 2448 + notification_args, 2449 + fn(ctx) { 2450 + // Event data is passed via ctx.data 2451 + case ctx.data { 2452 + option.Some(data) -> Ok(data) 2453 + option.None -> 2454 + Error("Subscription resolver called without event data") 2455 + } 2456 + }, 2457 + ), 2458 + ] 2459 + } 2460 + option.None -> [] 2461 + } 2462 + 2463 + let all_subscription_fields = 2464 + list.append(subscription_fields, notification_subscription_fields) 2258 2465 2259 2466 schema.object_type( 2260 2467 "Subscription", 2261 2468 "GraphQL subscription root", 2262 - subscription_fields, 2469 + all_subscription_fields, 2263 2470 ) 2471 + } 2472 + 2473 + /// Build notification subscription arguments 2474 + fn build_notification_subscription_args( 2475 + collection_enum: option.Option(schema.Type), 2476 + ) -> List(schema.Argument) { 2477 + let base_args = [ 2478 + schema.argument( 2479 + "subscriberDid", 2480 + schema.non_null(schema.string_type()), 2481 + "DID of the subscriber to receive notifications for", 2482 + option.None, 2483 + ), 2484 + ] 2485 + 2486 + case collection_enum { 2487 + option.Some(enum_type) -> { 2488 + list.append(base_args, [ 2489 + schema.argument( 2490 + "collections", 2491 + schema.list_type(enum_type), 2492 + "Filter notifications to specific collection types", 2493 + option.None, 2494 + ), 2495 + ]) 2496 + } 2497 + 
option.None -> base_args 2498 + } 2264 2499 } 2265 2500 2266 2501 // ===== Aggregated Query Support ===== ··· 2763 2998 } 2764 2999 } 2765 3000 } 3001 + 3002 + // ===== Notification Schema Helpers ===== 3003 + 3004 + /// Convert NSID to GraphQL enum value (app.bsky.feed.post -> APP_BSKY_FEED_POST) 3005 + pub fn nsid_to_enum_value(nsid: String) -> String { 3006 + nsid 3007 + |> string.replace(".", "_") 3008 + |> string.uppercase() 3009 + } 3010 + 3011 + /// Convert GraphQL enum value back to NSID (APP_BSKY_FEED_POST -> app.bsky.feed.post) 3012 + pub fn enum_value_to_nsid(enum_value: String) -> String { 3013 + enum_value 3014 + |> string.lowercase() 3015 + |> string.replace("_", ".") 3016 + } 3017 + 3018 + /// Build RecordCollection enum from parsed lexicons 3019 + /// Only includes record-type lexicons (excludes object types like RichtextFacet) 3020 + pub fn build_record_collection_enum( 3021 + lexicons: List(types.Lexicon), 3022 + ) -> Result(schema.Type, String) { 3023 + // Only include lexicons where main.type_ is "record" (not "object") 3024 + let record_nsids = 3025 + lexicons 3026 + |> list.filter_map(fn(lex) { 3027 + case lex.defs.main { 3028 + option.Some(types.RecordDef(type_: "record", key: _, properties: _)) -> 3029 + Ok(lex.id) 3030 + _ -> Error(Nil) 3031 + } 3032 + }) 3033 + 3034 + let enum_values = 3035 + record_nsids 3036 + |> list.map(fn(nsid) { 3037 + schema.enum_value(nsid_to_enum_value(nsid), "Collection: " <> nsid) 3038 + }) 3039 + 3040 + Ok(schema.enum_type( 3041 + "RecordCollection", 3042 + "Available record collection types for notifications", 3043 + enum_values, 3044 + )) 3045 + } 3046 + 3047 + /// Build NotificationRecord union from all record types in lexicons 3048 + /// This union represents any record that can appear as a notification 3049 + pub fn build_notification_record_union( 3050 + lexicons: List(types.Lexicon), 3051 + ) -> Result(schema.Type, String) { 3052 + // Only include lexicons where main.type_ is "record" (not "object") 3053 
+ let record_type_names = 3054 + lexicons 3055 + |> list.filter_map(fn(lex) { 3056 + case lex.defs.main { 3057 + option.Some(types.RecordDef(type_: "record", key: _, properties: _)) -> 3058 + Ok(nsid.to_type_name(lex.id)) 3059 + _ -> Error(Nil) 3060 + } 3061 + }) 3062 + 3063 + // Build placeholder object types for the union 3064 + // These just need the correct names - actual resolution uses the real types 3065 + let possible_types = 3066 + record_type_names 3067 + |> list.map(fn(type_name) { 3068 + schema.object_type(type_name, "Record type: " <> type_name, []) 3069 + }) 3070 + 3071 + // Type resolver examines the "collection" field to determine the concrete type 3072 + let type_resolver = fn(ctx: schema.Context) -> Result(String, String) { 3073 + case get_field_from_context(ctx, "collection") { 3074 + Ok(collection_nsid) -> { 3075 + // Convert NSID to type name (e.g., "app.bsky.feed.post" -> "AppBskyFeedPost") 3076 + Ok(nsid.to_type_name(collection_nsid)) 3077 + } 3078 + Error(_) -> 3079 + Error("Could not determine record type: missing 'collection' field") 3080 + } 3081 + } 3082 + 3083 + Ok(schema.union_type( 3084 + "NotificationRecord", 3085 + "Union of all record types for notifications", 3086 + possible_types, 3087 + type_resolver, 3088 + )) 3089 + }
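Note that `enum_value_to_nsid` is the inverse of `nsid_to_enum_value` only while NSID segments contain neither underscores nor hyphens — true for the `app.bsky.*` lexicons registered here, but not guaranteed by the NSID grammar in general. A Python mirror of the two helpers (illustrative, not project code) makes the round-trip contract explicit:

```python
# Mirror of the Gleam enum helpers, to show the round-trip and its limit.
def nsid_to_enum_value(nsid: str) -> str:
    # app.bsky.feed.post -> APP_BSKY_FEED_POST
    return nsid.replace(".", "_").upper()

def enum_value_to_nsid(enum_value: str) -> str:
    # APP_BSKY_FEED_POST -> app.bsky.feed.post
    return enum_value.lower().replace("_", ".")

assert nsid_to_enum_value("app.bsky.feed.post") == "APP_BSKY_FEED_POST"
assert enum_value_to_nsid("APP_BSKY_FEED_POST") == "app.bsky.feed.post"

# Round-trips cleanly only when segments have no '_' or '-';
# e.g. "com.my-app.post" would yield "COM_MY-APP_POST", which is not
# a valid GraphQL enum value name.
nsid = "app.bsky.graph.follow"
assert enum_value_to_nsid(nsid_to_enum_value(nsid)) == nsid
```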
+157
lexicon_graphql/test/notifications_schema_test.gleam
··· 1 + /// Tests for notifications schema generation 2 + import gleam/dict 3 + import gleam/option.{None, Some} 4 + import gleeunit/should 5 + import lexicon_graphql/schema/database 6 + import lexicon_graphql/types 7 + 8 + pub fn builds_record_collection_enum_test() { 9 + let lexicon = 10 + types.Lexicon( 11 + id: "app.bsky.feed.post", 12 + defs: types.Defs( 13 + main: Some( 14 + types.RecordDef(type_: "record", key: None, properties: [ 15 + #( 16 + "text", 17 + types.Property( 18 + type_: "string", 19 + required: False, 20 + format: None, 21 + ref: None, 22 + refs: None, 23 + items: None, 24 + ), 25 + ), 26 + ]), 27 + ), 28 + others: dict.new(), 29 + ), 30 + ) 31 + 32 + let enum_result = database.build_record_collection_enum([lexicon]) 33 + 34 + enum_result |> should.be_ok() 35 + } 36 + 37 + pub fn builds_record_collection_enum_from_multiple_lexicons_test() { 38 + let post_lexicon = 39 + types.Lexicon( 40 + id: "app.bsky.feed.post", 41 + defs: types.Defs( 42 + main: Some( 43 + types.RecordDef(type_: "record", key: None, properties: [ 44 + #( 45 + "text", 46 + types.Property( 47 + type_: "string", 48 + required: False, 49 + format: None, 50 + ref: None, 51 + refs: None, 52 + items: None, 53 + ), 54 + ), 55 + ]), 56 + ), 57 + others: dict.new(), 58 + ), 59 + ) 60 + 61 + let like_lexicon = 62 + types.Lexicon( 63 + id: "app.bsky.feed.like", 64 + defs: types.Defs( 65 + main: Some( 66 + types.RecordDef(type_: "record", key: None, properties: [ 67 + #( 68 + "subject", 69 + types.Property( 70 + type_: "ref", 71 + required: True, 72 + format: None, 73 + ref: None, 74 + refs: None, 75 + items: None, 76 + ), 77 + ), 78 + ]), 79 + ), 80 + others: dict.new(), 81 + ), 82 + ) 83 + 84 + let enum_result = 85 + database.build_record_collection_enum([post_lexicon, like_lexicon]) 86 + 87 + enum_result |> should.be_ok() 88 + } 89 + 90 + pub fn nsid_to_enum_value_converts_correctly_test() { 91 + database.nsid_to_enum_value("app.bsky.feed.post") 92 + |> 
should.equal("APP_BSKY_FEED_POST") 93 + 94 + database.nsid_to_enum_value("app.bsky.graph.follow") 95 + |> should.equal("APP_BSKY_GRAPH_FOLLOW") 96 + } 97 + 98 + pub fn enum_value_to_nsid_converts_correctly_test() { 99 + database.enum_value_to_nsid("APP_BSKY_FEED_POST") 100 + |> should.equal("app.bsky.feed.post") 101 + 102 + database.enum_value_to_nsid("APP_BSKY_GRAPH_FOLLOW") 103 + |> should.equal("app.bsky.graph.follow") 104 + } 105 + 106 + pub fn builds_notification_record_union_test() { 107 + let post_lexicon = 108 + types.Lexicon( 109 + id: "app.bsky.feed.post", 110 + defs: types.Defs( 111 + main: Some( 112 + types.RecordDef(type_: "record", key: None, properties: [ 113 + #( 114 + "text", 115 + types.Property( 116 + type_: "string", 117 + required: False, 118 + format: None, 119 + ref: None, 120 + refs: None, 121 + items: None, 122 + ), 123 + ), 124 + ]), 125 + ), 126 + others: dict.new(), 127 + ), 128 + ) 129 + 130 + let like_lexicon = 131 + types.Lexicon( 132 + id: "app.bsky.feed.like", 133 + defs: types.Defs( 134 + main: Some( 135 + types.RecordDef(type_: "record", key: None, properties: [ 136 + #( 137 + "subject", 138 + types.Property( 139 + type_: "ref", 140 + required: True, 141 + format: None, 142 + ref: None, 143 + refs: None, 144 + items: None, 145 + ), 146 + ), 147 + ]), 148 + ), 149 + others: dict.new(), 150 + ), 151 + ) 152 + 153 + let union_result = 154 + database.build_notification_record_union([post_lexicon, like_lexicon]) 155 + 156 + union_result |> should.be_ok() 157 + }
+1
lexicon_graphql/test/sorting_test.gleam
··· 48 48 option.None, 49 49 option.Some(aggregate_fetcher), 50 50 option.None, 51 + option.None, 51 52 ) 52 53 { 53 54 Ok(s) -> s
+10
lexicon_graphql/test/subscription_schema_test.gleam
··· 60 60 // aggregate_fetcher 61 61 None, 62 62 // viewer_fetcher 63 + None, 64 + // notification_fetcher 63 65 ) 64 66 { 65 67 Ok(s) -> { ··· 113 115 // aggregate_fetcher 114 116 None, 115 117 // viewer_fetcher 118 + None, 119 + // notification_fetcher 116 120 ) 117 121 { 118 122 Ok(s) -> { ··· 188 192 // aggregate_fetcher 189 193 None, 190 194 // viewer_fetcher 195 + None, 196 + // notification_fetcher 191 197 ) 192 198 { 193 199 Ok(s) -> { ··· 256 262 // aggregate_fetcher 257 263 None, 258 264 // viewer_fetcher 265 + None, 266 + // notification_fetcher 259 267 ) 260 268 { 261 269 Ok(s) -> { ··· 344 352 // aggregate_fetcher 345 353 None, 346 354 // viewer_fetcher 355 + None, 356 + // notification_fetcher 347 357 ) 348 358 { 349 359 Ok(s) -> {
+1
lexicon_graphql/test/where_schema_test.gleam
··· 48 48 option.None, 49 49 option.Some(aggregate_fetcher), 50 50 option.None, 51 + option.None, 51 52 ) 52 53 { 53 54 Ok(s) -> s
+18
server/db/migrations/20241227000001_add_rkey_column.sql
··· 1 + -- migrate:up 2 + 3 + -- Add rkey as a generated column extracted from uri 4 + -- URI format: at://did/collection/rkey 5 + -- We extract the third path segment (the rkey) 6 + -- Note: Using VIRTUAL instead of STORED because SQLite doesn't allow 7 + -- adding STORED columns via ALTER TABLE 8 + ALTER TABLE record ADD COLUMN rkey TEXT 9 + GENERATED ALWAYS AS ( 10 + substr(uri, instr(substr(uri, instr(substr(uri, 6), '/') + 6), '/') + instr(substr(uri, 6), '/') + 6) 11 + ) VIRTUAL; 12 + 13 + -- Note: Cannot create index on VIRTUAL column in SQLite 14 + -- The column will be computed on-the-fly during queries 15 + 16 + -- migrate:down 17 + 18 + -- Note: DROP COLUMN requires SQLite 3.35.0+; older versions need a table rebuild
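The nested `substr`/`instr` expression is hard to eyeball: it skips the `at://` prefix, finds the slash after the DID, then the slash after the collection, and takes everything from there. It can be checked against a sample URI with Python's bundled SQLite (this requires SQLite 3.31+ for generated columns; the table here is a trimmed stand-in for the real `record` schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE record (uri TEXT PRIMARY KEY NOT NULL)")
# Same expression as the migration: extract the third path
# segment of at://did/collection/rkey as a VIRTUAL column.
conn.execute("""
    ALTER TABLE record ADD COLUMN rkey TEXT
    GENERATED ALWAYS AS (
      substr(uri, instr(substr(uri, instr(substr(uri, 6), '/') + 6), '/') + instr(substr(uri, 6), '/') + 6)
    ) VIRTUAL
""")
conn.execute(
    "INSERT INTO record (uri) VALUES ('at://did:plc:abc123/app.bsky.feed.post/3kxyz')"
)
row = conn.execute("SELECT rkey FROM record").fetchone()
assert row[0] == "3kxyz"
```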
+17
server/db/migrations_postgres/20241227000001_add_rkey_column.sql
··· 1 + -- migrate:up 2 + 3 + -- Add rkey as a generated column extracted from uri 4 + -- URI format: at://did/collection/rkey 5 + -- We extract everything after the last '/' 6 + ALTER TABLE record ADD COLUMN rkey TEXT 7 + GENERATED ALWAYS AS ( 8 + substring(uri from '[^/]+$') 9 + ) STORED; 10 + 11 + -- Index for efficient sorting by rkey (TID-based chronological order) 12 + CREATE INDEX IF NOT EXISTS idx_record_rkey ON record(rkey DESC); 13 + 14 + -- migrate:down 15 + 16 + DROP INDEX IF EXISTS idx_record_rkey; 17 + ALTER TABLE record DROP COLUMN rkey;
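The PostgreSQL variant uses a POSIX regex, `substring(uri from '[^/]+$')`, which literally is "everything after the last `/`". Assuming rkeys never contain a slash (AT Protocol record keys don't), this is equivalent to the SQLite third-segment extraction. The same pattern behaves identically in Python's `re`, which makes the equivalence easy to sanity-check:

```python
import re

def extract_rkey(uri: str) -> str:
    # Same pattern as the Postgres migration: substring(uri from '[^/]+$')
    m = re.search(r"[^/]+$", uri)
    return m.group(0) if m else ""

assert extract_rkey("at://did:plc:abc123/app.bsky.feed.post/3kxyz") == "3kxyz"
assert extract_rkey("at://did:plc:x/app.bsky.graph.follow/abc") == "abc"
```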
+6 -2
server/db/schema.sql
··· 6 6 collection TEXT NOT NULL, 7 7 json TEXT NOT NULL, 8 8 indexed_at TEXT NOT NULL DEFAULT (datetime('now')) 9 - ); 9 + , rkey TEXT 10 + GENERATED ALWAYS AS ( 11 + substr(uri, instr(substr(uri, instr(substr(uri, 6), '/') + 6), '/') + instr(substr(uri, 6), '/') + 6) 12 + ) VIRTUAL); 10 13 CREATE INDEX idx_record_did ON record(did); 11 14 CREATE INDEX idx_record_collection ON record(collection); 12 15 CREATE INDEX idx_record_did_collection ON record(did, collection); ··· 190 193 CREATE INDEX idx_admin_session_atp_session_id ON admin_session(atp_session_id); 191 194 -- Dbmate schema migrations 192 195 INSERT INTO "schema_migrations" (version) VALUES 193 - ('20241210000001'); 196 + ('20241210000001'), 197 + ('20241227000001');
+1 -1
server/gleam.toml
··· 37 37 gleam_crypto = ">= 1.5.1 and < 2.0.0" 38 38 logging = ">= 1.3.0 and < 2.0.0" 39 39 group_registry = ">= 1.0.0 and < 2.0.0" 40 - swell = ">= 2.0.0 and < 3.0.0" 40 + swell = ">= 2.1.3 and < 3.0.0" 41 41 honk = ">= 1.0.0 and < 2.0.0" 42 42 43 43 [dev-dependencies]
+4 -4
server/manifest.toml
··· 17 17 { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, 18 18 { name = "gleam_crypto", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_crypto", source = "hex", outer_checksum = "50774BAFFF1144E7872814C566C5D653D83A3EBF23ACC3156B757A1B6819086E" }, 19 19 { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, 20 - { name = "gleam_hackney", version = "1.3.2", build_tools = ["gleam"], requirements = ["gleam_http", "gleam_stdlib", "hackney"], otp_app = "gleam_hackney", source = "hex", outer_checksum = "CF6B627BC3E3726D14D220C30ACE8EF32433F19C33CE96BBF70C2068DFF04ACD" }, 20 + { name = "gleam_hackney", version = "1.3.3", build_tools = ["gleam"], requirements = ["gleam_http", "gleam_stdlib", "hackney"], otp_app = "gleam_hackney", source = "hex", outer_checksum = "03B4125E5E6DFD6BC20CC1FAD0D35B8474D9515DFDFFB69255EFE6D7B49BEB07" }, 21 21 { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" }, 22 22 { name = "gleam_httpc", version = "5.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_http", "gleam_stdlib"], otp_app = "gleam_httpc", source = "hex", outer_checksum = "C545172618D07811494E97AAA4A0FB34DA6F6D0061FDC8041C2F8E3BE2B2E48F" }, 23 23 { name = "gleam_json", version = "3.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "44FDAA8847BE8FC48CA7A1C089706BD54BADCC4C45B237A992EDDF9F2CDB2836" }, ··· 49 49 { name = "pgo", version = 
"0.20.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "2F11E6649CEB38E569EF56B16BE1D04874AE5B11A02867080A2817CE423C683B" }, 50 50 { name = "platform", version = "1.0.0", build_tools = ["gleam"], requirements = [], otp_app = "platform", source = "hex", outer_checksum = "8339420A95AD89AAC0F82F4C3DB8DD401041742D6C3F46132A8739F6AEB75391" }, 51 51 { name = "pog", version = "4.1.0", build_tools = ["gleam"], requirements = ["exception", "gleam_erlang", "gleam_otp", "gleam_stdlib", "gleam_time", "pgo"], otp_app = "pog", source = "hex", outer_checksum = "E4AFBA39A5FAA2E77291836C9683ADE882E65A06AB28CA7D61AE7A3AD61EBBD5" }, 52 - { name = "simplifile", version = "2.3.1", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "957E0E5B75927659F1D2A1B7B75D7B9BA96FAA8D0C53EA71C4AD9CD0C6B848F6" }, 52 + { name = "simplifile", version = "2.3.2", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "E049B4DACD4D206D87843BCF4C775A50AE0F50A52031A2FFB40C9ED07D6EC70A" }, 53 53 { name = "sqlight", version = "1.0.3", build_tools = ["gleam"], requirements = ["esqlite", "gleam_stdlib"], otp_app = "sqlight", source = "hex", outer_checksum = "CADD79663C9B61D4BAC960A47CC2D42CA8F48EAF5804DBEB79977287750F4B16" }, 54 54 { name = "ssl_verify_fun", version = "1.1.7", build_tools = ["mix", "rebar3", "make"], requirements = [], otp_app = "ssl_verify_fun", source = "hex", outer_checksum = "FE4C190E8F37401D30167C8C405EDA19469F34577987C76DDE613E838BBC67F8" }, 55 - { name = "swell", version = "2.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "swell", source = "hex", outer_checksum = "98177585C75E2273CA6B5D63D9D8BE072B9B0CBA6F66DA0310A44D379B89A6D1" }, 55 + { name = "swell", version = "2.1.3", build_tools = ["gleam"], requirements = 
["gleam_stdlib"], otp_app = "swell", source = "hex", outer_checksum = "018FF9B3F775F101E72208B4E0EA15A670A51E59C83247DD0302C3AD8C2FE9FF" }, 56 56 { name = "telemetry", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "telemetry", source = "hex", outer_checksum = "7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6" }, 57 57 { name = "unicode_util_compat", version = "0.7.1", build_tools = ["rebar3"], requirements = [], otp_app = "unicode_util_compat", source = "hex", outer_checksum = "B3A917854CE3AE233619744AD1E0102E05673136776FB2FA76234F3E03B23642" }, 58 58 { name = "wisp", version = "2.1.1", build_tools = ["gleam"], requirements = ["directories", "exception", "filepath", "gleam_crypto", "gleam_erlang", "gleam_http", "gleam_json", "gleam_stdlib", "houdini", "logging", "marceau", "mist", "simplifile"], otp_app = "wisp", source = "hex", outer_checksum = "46E2E31DECD61A3748CF6CB317D9AC432BBC8D8A6E65655A9E787BDC69389DE0" }, ··· 84 84 pog = { version = ">= 4.0.0 and < 5.0.0" } 85 85 simplifile = { version = ">= 2.0.0 and < 3.0.0" } 86 86 sqlight = { version = ">= 1.0.0 and < 2.0.0" } 87 - swell = { version = ">= 2.0.0 and < 3.0.0" } 87 + swell = { version = ">= 2.1.3 and < 3.0.0" } 88 88 wisp = { version = ">= 2.1.0 and < 3.0.0" }
+1
server/src/backfill.gleam
··· 229 229 collection: car_record.collection, 230 230 json: atproto_car.record_to_json(car_record), 231 231 indexed_at: now, 232 + rkey: car_record.rkey, 232 233 ) 233 234 } 234 235
+3 -2
server/src/database/queries/pagination.gleam
··· 65 65 "did" -> record.did 66 66 "collection" -> record.collection 67 67 "indexed_at" -> record.indexed_at 68 + "rkey" -> record.rkey 68 69 _ -> extract_json_field(record.json, field) 69 70 } 70 71 } ··· 308 309 /// Builds a field reference for cursor SQL queries (handles JSON fields) 309 310 fn build_cursor_field_reference(exec: Executor, field: String) -> String { 310 311 case field { 311 - "uri" | "cid" | "did" | "collection" | "indexed_at" -> field 312 + "uri" | "cid" | "did" | "collection" | "indexed_at" | "rkey" -> field 312 313 _ -> executor.json_extract(exec, "json", field) 313 314 } 314 315 } ··· 376 377 False -> "" 377 378 } 378 379 let field_ref = case field_name { 379 - "uri" | "cid" | "did" | "collection" | "indexed_at" -> 380 + "uri" | "cid" | "did" | "collection" | "indexed_at" | "rkey" -> 380 381 table_prefix <> field_name 381 382 "createdAt" | "indexedAt" -> { 382 383 let json_field =
+115 -5
server/src/database/repositories/records.gleam
··· 19 19 /// SQLite: both are TEXT 20 20 fn record_columns(exec: Executor) -> String { 21 21 case executor.dialect(exec) { 22 - executor.SQLite -> "uri, cid, did, collection, json, indexed_at" 22 + executor.SQLite -> "uri, cid, did, collection, json, indexed_at, rkey" 23 23 executor.PostgreSQL -> 24 - "uri, cid, did, collection, json::text, indexed_at::text" 24 + "uri, cid, did, collection, json::text, indexed_at::text, rkey" 25 25 } 26 26 } 27 27 ··· 29 29 fn record_columns_prefixed(exec: Executor) -> String { 30 30 case executor.dialect(exec) { 31 31 executor.SQLite -> 32 - "record.uri, record.cid, record.did, record.collection, record.json, record.indexed_at" 32 + "record.uri, record.cid, record.did, record.collection, record.json, record.indexed_at, record.rkey" 33 33 executor.PostgreSQL -> 34 - "record.uri, record.cid, record.did, record.collection, record.json::text, record.indexed_at::text" 34 + "record.uri, record.cid, record.did, record.collection, record.json::text, record.indexed_at::text, record.rkey" 35 35 } 36 36 } 37 37 ··· 394 394 use collection <- decode.field(3, decode.string) 395 395 use json <- decode.field(4, decode.string) 396 396 use indexed_at <- decode.field(5, decode.string) 397 - decode.success(Record(uri:, cid:, did:, collection:, json:, indexed_at:)) 397 + use rkey <- decode.field(6, decode.string) 398 + decode.success(Record( 399 + uri:, 400 + cid:, 401 + did:, 402 + collection:, 403 + json:, 404 + indexed_at:, 405 + rkey:, 406 + )) 398 407 } 399 408 400 409 // ===== Statistics Functions ===== ··· 1061 1070 executor.query(exec, sql, params, record_decoder()) 1062 1071 } 1063 1072 } 1073 + } 1074 + 1075 + /// Get records that mention the given DID (excluding records authored by that DID) 1076 + /// This is used for notifications - finding all records that reference a user. 
1077 + /// Returns: (records, next_cursor, has_next_page, has_previous_page) 1078 + pub fn get_notifications( 1079 + exec: Executor, 1080 + did: String, 1081 + collections: Option(List(String)), 1082 + first: Option(Int), 1083 + after: Option(String), 1084 + ) -> Result(#(List(Record), Option(String), Bool, Bool), DbError) { 1085 + let limit = option.unwrap(first, 50) 1086 + let pattern = "%" <> did <> "%" 1087 + 1088 + // Start building params - pattern is $1, did is $2 1089 + let mut_params: List(Value) = [Text(pattern), Text(did)] 1090 + let mut_param_count = 2 1091 + 1092 + // Build collection filter 1093 + let #(collection_clause, collection_params, param_count_after_cols) = case 1094 + collections 1095 + { 1096 + None -> #("", [], mut_param_count) 1097 + Some([]) -> #("", [], mut_param_count) 1098 + Some(cols) -> { 1099 + let placeholders = 1100 + cols 1101 + |> list.index_map(fn(_, i) { 1102 + executor.placeholder(exec, mut_param_count + i + 1) 1103 + }) 1104 + |> string.join(", ") 1105 + let new_count = mut_param_count + list.length(cols) 1106 + #( 1107 + " AND collection IN (" <> placeholders <> ")", 1108 + list.map(cols, Text), 1109 + new_count, 1110 + ) 1111 + } 1112 + } 1113 + 1114 + // Build cursor clause 1115 + let #(cursor_clause, cursor_params) = case after { 1116 + None -> #("", []) 1117 + Some(cursor) -> { 1118 + case pagination.decode_cursor(cursor, None) { 1119 + Ok(decoded) -> { 1120 + // Cursor format: rkey|uri for TID-based chronological sorting 1121 + let rkey_value = 1122 + decoded.field_values |> list.first |> result.unwrap("") 1123 + let uri_value = decoded.cid 1124 + let p1 = executor.placeholder(exec, param_count_after_cols + 1) 1125 + let p2 = executor.placeholder(exec, param_count_after_cols + 2) 1126 + #(" AND (rkey, uri) < (" <> p1 <> ", " <> p2 <> ")", [ 1127 + Text(rkey_value), 1128 + Text(uri_value), 1129 + ]) 1130 + } 1131 + Error(_) -> #("", []) 1132 + } 1133 + } 1134 + } 1135 + 1136 + // Combine all params 1137 + let 
all_params = 1138 + mut_params 1139 + |> list.append(collection_params) 1140 + |> list.append(cursor_params) 1141 + 1142 + let sql = 1143 + "SELECT " 1144 + <> record_columns(exec) 1145 + <> " FROM record WHERE json LIKE " 1146 + <> executor.placeholder(exec, 1) 1147 + <> " AND did != " 1148 + <> executor.placeholder(exec, 2) 1149 + <> collection_clause 1150 + <> cursor_clause 1151 + <> " ORDER BY rkey DESC, uri DESC LIMIT " 1152 + <> int.to_string(limit + 1) 1153 + 1154 + use results <- result.try(executor.query( 1155 + exec, 1156 + sql, 1157 + all_params, 1158 + record_decoder(), 1159 + )) 1160 + 1161 + let has_next = list.length(results) > limit 1162 + let trimmed = results |> list.take(limit) 1163 + 1164 + let end_cursor = case list.last(trimmed) { 1165 + Ok(record) -> { 1166 + // Encode cursor as rkey|uri for notification pagination 1167 + let cursor_content = record.rkey <> "|" <> record.uri 1168 + Some(pagination.encode_base64(cursor_content)) 1169 + } 1170 + Error(_) -> None 1171 + } 1172 + 1173 + Ok(#(trimmed, end_cursor, has_next, False)) 1064 1174 } 1065 1175 1066 1176 /// Get records by DID and collection with pagination (for DID joins with connections)
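The cursor scheme in `get_notifications` pairs a `rkey|uri` payload with a keyset predicate — `(rkey, uri) < (?, ?)` under `ORDER BY rkey DESC, uri DESC` — so pagination resumes exactly after the last row even when two records share an rkey. A hedged round-trip sketch of the assumed encoding (the diff doesn't show whether `pagination.encode_base64` uses the standard or URL-safe alphabet; standard base64 is assumed here):

```python
import base64

def encode_cursor(rkey: str, uri: str) -> str:
    # Cursor payload format from get_notifications: "rkey|uri", base64-encoded
    return base64.b64encode(f"{rkey}|{uri}".encode()).decode()

def decode_cursor(cursor: str) -> tuple[str, str]:
    # Split on the first '|' only, since AT-URIs cannot contain '|'
    rkey, uri = base64.b64decode(cursor).decode().split("|", 1)
    return rkey, uri

uri = "at://did:plc:alice/app.bsky.feed.like/3kxyz"
cursor = encode_cursor("3kxyz", uri)
assert decode_cursor(cursor) == ("3kxyz", uri)
```

Because rkeys are TIDs (monotonic, sortable timestamps), descending rkey order gives newest-first notifications without consulting `indexed_at`.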
+1
server/src/database/types.gleam
··· 11 11 collection: String, 12 12 json: String, 13 13 indexed_at: String, 14 + rkey: String, 14 15 ) 15 16 } 16 17
+31
server/src/graphql/lexicon/fetchers.gleam
··· 348 348 } 349 349 } 350 350 } 351 + 352 + /// Create a notification fetcher for cross-collection DID mention queries 353 + pub fn notification_fetcher(db: Executor) { 354 + fn( 355 + did: String, 356 + collections: option.Option(List(String)), 357 + first: option.Option(Int), 358 + after: option.Option(String), 359 + ) -> Result( 360 + #(List(#(value.Value, String)), option.Option(String), Bool, Bool), 361 + String, 362 + ) { 363 + use result <- result.try( 364 + records.get_notifications(db, did, collections, first, after) 365 + |> result.map_error(fn(e) { "Database error: " <> string.inspect(e) }), 366 + ) 367 + 368 + let #(records_list, end_cursor, has_next, has_prev) = result 369 + 370 + // Convert database records to GraphQL values with cursors 371 + let converted = 372 + list.map(records_list, fn(record) { 373 + let graphql_value = converters.record_to_graphql_value(record, db) 374 + // Generate cursor for this record (no sort_by for notifications) 375 + let cursor = pagination.generate_cursor_from_record(record, option.None) 376 + #(graphql_value, cursor) 377 + }) 378 + 379 + Ok(#(converted, end_cursor, has_next, has_prev)) 380 + } 381 + }
+5 -1
server/src/graphql/lexicon/schema.gleam
··· 118 118 mutations.upload_blob_resolver_factory(mutation_ctx) 119 119 }) 120 120 121 - // Step 6: Build schema with database-backed resolvers, mutations, and subscriptions 121 + // Step 6: Create notification fetcher 122 + let notification_fetcher = fetchers.notification_fetcher(db) 123 + 124 + // Step 7: Build schema with database-backed resolvers, mutations, and subscriptions 122 125 database.build_schema_with_subscriptions( 123 126 parsed_lexicons, 124 127 record_fetcher, ··· 130 133 upload_blob_factory, 131 134 option.Some(aggregate_fetcher), 132 135 option.Some(viewer_fetcher), 136 + option.Some(notification_fetcher), 133 137 ) 134 138 } 135 139 }
+96 -1
server/src/handlers/graphql_ws.gleam
··· 484 484 event_field == subscription_field 485 485 } 486 486 487 + /// Check if a record event matches a notification subscription 488 + /// 489 + /// A notification event matches when: 490 + /// 1. The event value contains the subscribed DID (is a mention) 491 + /// 2. The event is NOT authored by the subscribed DID (excludes self) 492 + /// 3. The operation is Create (notifications are for new records only) 493 + /// 4. The collection matches the filter (if provided) 494 + pub fn event_matches_notification_subscription( 495 + event: pubsub.RecordEvent, 496 + subscribed_did: String, 497 + collections: Option(List(String)), 498 + ) -> Bool { 499 + // Event value must contain the subscribed DID (mentioning them) 500 + let contains_did = string.contains(event.value, subscribed_did) 501 + 502 + // Event must NOT be authored by the subscribed DID (exclude self) 503 + let not_self_authored = event.did != subscribed_did 504 + 505 + // Event must be a Create operation (notifications for new records only) 506 + let is_create = event.operation == pubsub.Create 507 + 508 + // Event collection must match filter (if provided) 509 + let matches_collection = case collections { 510 + None -> True 511 + Some([]) -> True 512 + Some(cols) -> list.contains(cols, event.collection) 513 + } 514 + 515 + contains_did && not_self_authored && is_create && matches_collection 516 + } 517 + 518 + /// Extract a string value from variables dict 519 + fn get_variable_string( 520 + variables: Dict(String, value.Value), 521 + key: String, 522 + ) -> Option(String) { 523 + case dict.get(variables, key) { 524 + Ok(value.String(s)) -> Some(s) 525 + _ -> None 526 + } 527 + } 528 + 529 + /// Extract a list of strings from variables dict (for enum list values) 530 + fn get_variable_string_list( 531 + variables: Dict(String, value.Value), 532 + key: String, 533 + ) -> Option(List(String)) { 534 + case dict.get(variables, key) { 535 + Ok(value.List(items)) -> { 536 + let strings = 537 + 
list.filter_map(items, fn(item) { 538 + case item { 539 + value.String(s) -> Ok(s) 540 + value.Enum(e) -> Ok(e) 541 + _ -> Error(Nil) 542 + } 543 + }) 544 + Some(strings) 545 + } 546 + _ -> None 547 + } 548 + } 549 + 487 550 /// Process an event and send it to the WebSocket client if it matches 488 551 fn process_event( 489 552 event: pubsub.RecordEvent, ··· 495 558 db: Executor, 496 559 graphql_schema: schema.Schema, 497 560 ) -> Nil { 498 - case event_matches_subscription(event, subscription_field) { 561 + // Check if this is a notification subscription 562 + let matches = case subscription_field { 563 + "notificationCreated" -> { 564 + // For notifications, extract did and collections from variables 565 + let subscribed_did = case get_variable_string(variables, "did") { 566 + Some(did) -> did 567 + None -> "" 568 + } 569 + // Convert enum values to NSIDs (APP_BSKY_FEED_LIKE -> app.bsky.feed.like) 570 + let collections = case 571 + get_variable_string_list(variables, "collections") 572 + { 573 + Some(enum_values) -> { 574 + let nsids = 575 + list.map(enum_values, fn(enum_val) { 576 + enum_val 577 + |> string.lowercase() 578 + |> string.replace("_", ".") 579 + }) 580 + Some(nsids) 581 + } 582 + None -> None 583 + } 584 + event_matches_notification_subscription( 585 + event, 586 + subscribed_did, 587 + collections, 588 + ) 589 + } 590 + _ -> event_matches_subscription(event, subscription_field) 591 + } 592 + 593 + case matches { 499 594 True -> { 500 595 // Execute the GraphQL subscription query with the event data and variables 501 596 case
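The four-part predicate in `event_matches_notification_subscription` (mention, not self-authored, Create only, optional collection filter) is worth seeing in one place. A Python sketch of the same logic — the `RecordEvent` shape and the string `"create"` operation tag are stand-ins for the Gleam `pubsub` types, which are not shown in this diff:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class RecordEvent:
    did: str            # author of the record
    collection: str     # NSID of the record's collection
    value: str          # record JSON as a string
    operation: str      # "create" | "update" | "delete" (stand-in for pubsub.Operation)

def matches_notification(event: RecordEvent, subscribed_did: str,
                         collections: Optional[List[str]]) -> bool:
    # Mirrors event_matches_notification_subscription: the record must mention
    # the DID, must not be self-authored, must be a Create, and must pass the
    # collection filter (None or empty list means "no filter").
    return (subscribed_did in event.value
            and event.did != subscribed_did
            and event.operation == "create"
            and (not collections or event.collection in collections))

ev = RecordEvent(
    did="did:plc:alice",
    collection="app.bsky.feed.like",
    value='{"subject":"at://did:plc:target/app.bsky.feed.post/x"}',
    operation="create",
)
assert matches_notification(ev, "did:plc:target", None)            # mention
assert not matches_notification(ev, "did:plc:alice", None)         # self-authored
assert not matches_notification(ev, "did:plc:target",
                                ["app.bsky.graph.follow"])         # filtered out
```

Like the repository's LIKE query, the substring check is intentionally coarse: any occurrence of the DID in the JSON counts as a mention, which can over-match (e.g. a DID quoted inside unrelated text).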
+13 -54
server/test/blob_integration_test.gleam
··· 5 5 /// 2. Insert records with blob data in AT Protocol format 6 6 /// 3. Execute GraphQL queries with blob field selection 7 7 /// 4. Verify blob fields are resolved correctly with all sub-fields 8 - import database/executor 9 8 import database/repositories/lexicons 10 9 import database/repositories/records 11 - import database/sqlite/connection as db_connection 12 10 import gleam/http 13 11 import gleam/json 14 12 import gleam/option ··· 16 14 import gleeunit/should 17 15 import handlers/graphql as graphql_handler 18 16 import lib/oauth/did_cache 17 + import test_helpers 19 18 import wisp 20 19 import wisp/simulate 21 20 ··· 67 66 68 67 pub fn blob_field_query_test() { 69 68 // Create in-memory database 70 - let assert Ok(exec) = db_connection.connect("sqlite::memory:") 71 - let assert Ok(_) = 72 - executor.exec( 73 - exec, 74 - "CREATE TABLE IF NOT EXISTS lexicon (id TEXT PRIMARY KEY NOT NULL, json TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')))", 75 - [], 76 - ) 77 - let assert Ok(_) = 78 - executor.exec( 79 - exec, 80 - "CREATE TABLE IF NOT EXISTS record (uri TEXT PRIMARY KEY NOT NULL, cid TEXT NOT NULL, did TEXT NOT NULL, collection TEXT NOT NULL, json TEXT NOT NULL, indexed_at TEXT NOT NULL DEFAULT (datetime('now')))", 81 - [], 82 - ) 69 + let assert Ok(exec) = test_helpers.create_test_db() 70 + let assert Ok(_) = test_helpers.create_lexicon_table(exec) 71 + let assert Ok(_) = test_helpers.create_record_table(exec) 83 72 84 73 // Insert profile lexicon with blob fields 85 74 let lexicon = create_profile_lexicon() ··· 176 165 177 166 pub fn blob_field_with_different_presets_test() { 178 167 // Create in-memory database 179 - let assert Ok(exec) = db_connection.connect("sqlite::memory:") 180 - let assert Ok(_) = 181 - executor.exec( 182 - exec, 183 - "CREATE TABLE IF NOT EXISTS lexicon (id TEXT PRIMARY KEY NOT NULL, json TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')))", 184 - [], 185 - ) 186 - let assert Ok(_) = 187 - 
executor.exec( 188 - exec, 189 - "CREATE TABLE IF NOT EXISTS record (uri TEXT PRIMARY KEY NOT NULL, cid TEXT NOT NULL, did TEXT NOT NULL, collection TEXT NOT NULL, json TEXT NOT NULL, indexed_at TEXT NOT NULL DEFAULT (datetime('now')))", 190 - [], 191 - ) 168 + let assert Ok(exec) = test_helpers.create_test_db() 169 + let assert Ok(_) = test_helpers.create_lexicon_table(exec) 170 + let assert Ok(_) = test_helpers.create_record_table(exec) 192 171 193 172 // Insert profile lexicon 194 173 let lexicon = create_profile_lexicon() ··· 262 241 263 242 pub fn blob_field_default_preset_test() { 264 243 // Test that when no preset is specified, feed_fullsize is used 265 - let assert Ok(exec) = db_connection.connect("sqlite::memory:") 266 - let assert Ok(_) = 267 - executor.exec( 268 - exec, 269 - "CREATE TABLE IF NOT EXISTS lexicon (id TEXT PRIMARY KEY NOT NULL, json TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')))", 270 - [], 271 - ) 272 - let assert Ok(_) = 273 - executor.exec( 274 - exec, 275 - "CREATE TABLE IF NOT EXISTS record (uri TEXT PRIMARY KEY NOT NULL, cid TEXT NOT NULL, did TEXT NOT NULL, collection TEXT NOT NULL, json TEXT NOT NULL, indexed_at TEXT NOT NULL DEFAULT (datetime('now')))", 276 - [], 277 - ) 244 + let assert Ok(exec) = test_helpers.create_test_db() 245 + let assert Ok(_) = test_helpers.create_lexicon_table(exec) 246 + let assert Ok(_) = test_helpers.create_record_table(exec) 278 247 279 248 let lexicon = create_profile_lexicon() 280 249 let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon) ··· 344 313 345 314 pub fn blob_field_null_when_missing_test() { 346 315 // Test that blob fields return null when not present in record 347 - let assert Ok(exec) = db_connection.connect("sqlite::memory:") 348 - let assert Ok(_) = 349 - executor.exec( 350 - exec, 351 - "CREATE TABLE IF NOT EXISTS lexicon (id TEXT PRIMARY KEY NOT NULL, json TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')))", 352 - [], 353 - ) 354 - 
let assert Ok(_) = 355 - executor.exec( 356 - exec, 357 - "CREATE TABLE IF NOT EXISTS record (uri TEXT PRIMARY KEY NOT NULL, cid TEXT NOT NULL, did TEXT NOT NULL, collection TEXT NOT NULL, json TEXT NOT NULL, indexed_at TEXT NOT NULL DEFAULT (datetime('now')))", 358 - [], 359 - ) 316 + let assert Ok(exec) = test_helpers.create_test_db() 317 + let assert Ok(_) = test_helpers.create_lexicon_table(exec) 318 + let assert Ok(_) = test_helpers.create_record_table(exec) 360 319 361 320 let lexicon = create_profile_lexicon() 362 321 let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon)
+96
server/test/database/repositories/notifications_test.gleam
··· 1 + // Test suite for get_notifications repository function 2 + import database/repositories/records 3 + import database/types.{type Record} 4 + import gleam/list 5 + import gleam/option.{None, Some} 6 + import gleeunit/should 7 + import test_helpers 8 + 9 + pub fn get_notifications_returns_records_mentioning_did_test() { 10 + let assert Ok(db) = test_helpers.create_test_db() 11 + let assert Ok(_) = test_helpers.create_record_table(db) 12 + 13 + // Insert a record that mentions did:plc:target 14 + let assert Ok(_) = 15 + records.insert( 16 + db, 17 + "at://did:plc:author/app.bsky.feed.like/abc", 18 + "bafy123", 19 + "did:plc:author", 20 + "app.bsky.feed.like", 21 + "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 22 + ) 23 + 24 + // Insert a record by the target (should be excluded) 25 + let assert Ok(_) = 26 + records.insert( 27 + db, 28 + "at://did:plc:target/app.bsky.feed.post/xyz", 29 + "bafy456", 30 + "did:plc:target", 31 + "app.bsky.feed.post", 32 + "{\"text\":\"Hello world\"}", 33 + ) 34 + 35 + // Insert a record that doesn't mention the target 36 + let assert Ok(_) = 37 + records.insert( 38 + db, 39 + "at://did:plc:other/app.bsky.feed.post/zzz", 40 + "bafy789", 41 + "did:plc:other", 42 + "app.bsky.feed.post", 43 + "{\"text\":\"Unrelated post\"}", 44 + ) 45 + 46 + let assert Ok(#(results, _cursor, _has_next, _has_prev)) = 47 + records.get_notifications(db, "did:plc:target", None, None, None) 48 + 49 + // Should only return the like, not the target's own post or unrelated post 50 + list.length(results) |> should.equal(1) 51 + let assert Ok(first): Result(Record, _) = list.first(results) 52 + first.uri |> should.equal("at://did:plc:author/app.bsky.feed.like/abc") 53 + first.did |> should.equal("did:plc:author") 54 + first.collection |> should.equal("app.bsky.feed.like") 55 + } 56 + 57 + pub fn get_notifications_filters_by_collection_test() { 58 + let assert Ok(db) = test_helpers.create_test_db() 59 + let assert Ok(_) = 
test_helpers.create_record_table(db) 60 + 61 + // Insert a like mentioning target 62 + let assert Ok(_) = 63 + records.insert( 64 + db, 65 + "at://did:plc:author/app.bsky.feed.like/abc", 66 + "bafy123", 67 + "did:plc:author", 68 + "app.bsky.feed.like", 69 + "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 70 + ) 71 + 72 + // Insert a follow mentioning target 73 + let assert Ok(_) = 74 + records.insert( 75 + db, 76 + "at://did:plc:author/app.bsky.graph.follow/def", 77 + "bafy456", 78 + "did:plc:author", 79 + "app.bsky.graph.follow", 80 + "{\"subject\":\"did:plc:target\"}", 81 + ) 82 + 83 + // Filter to only likes 84 + let assert Ok(#(results, _, _, _)) = 85 + records.get_notifications( 86 + db, 87 + "did:plc:target", 88 + Some(["app.bsky.feed.like"]), 89 + None, 90 + None, 91 + ) 92 + 93 + list.length(results) |> should.equal(1) 94 + let assert Ok(first): Result(Record, _) = list.first(results) 95 + first.collection |> should.equal("app.bsky.feed.like") 96 + }
+434
server/test/graphql/notifications_e2e_test.gleam
··· 1 + /// End-to-end tests for notifications GraphQL query 2 + /// 3 + /// Tests verify that: 4 + /// - notifications query returns records mentioning the given DID 5 + /// - Self-authored records are excluded 6 + /// - Collection filtering works correctly 7 + /// - Union type resolution works across different record types 8 + import database/repositories/actors 9 + import database/repositories/lexicons 10 + import database/repositories/records 11 + import gleam/json 12 + import gleam/option 13 + import gleam/string 14 + import gleeunit/should 15 + import graphql/lexicon/schema as lexicon_schema 16 + import lib/oauth/did_cache 17 + import test_helpers 18 + 19 + // Helper to create a post lexicon JSON 20 + fn create_post_lexicon() -> String { 21 + json.object([ 22 + #("lexicon", json.int(1)), 23 + #("id", json.string("app.bsky.feed.post")), 24 + #( 25 + "defs", 26 + json.object([ 27 + #( 28 + "main", 29 + json.object([ 30 + #("type", json.string("record")), 31 + #("key", json.string("tid")), 32 + #( 33 + "record", 34 + json.object([ 35 + #("type", json.string("object")), 36 + #( 37 + "required", 38 + json.array([json.string("text")], of: fn(x) { x }), 39 + ), 40 + #( 41 + "properties", 42 + json.object([ 43 + #( 44 + "text", 45 + json.object([ 46 + #("type", json.string("string")), 47 + #("maxLength", json.int(300)), 48 + ]), 49 + ), 50 + #( 51 + "createdAt", 52 + json.object([ 53 + #("type", json.string("string")), 54 + #("format", json.string("datetime")), 55 + ]), 56 + ), 57 + ]), 58 + ), 59 + ]), 60 + ), 61 + ]), 62 + ), 63 + ]), 64 + ), 65 + ]) 66 + |> json.to_string 67 + } 68 + 69 + // Helper to create a like lexicon JSON with subject field 70 + fn create_like_lexicon() -> String { 71 + json.object([ 72 + #("lexicon", json.int(1)), 73 + #("id", json.string("app.bsky.feed.like")), 74 + #( 75 + "defs", 76 + json.object([ 77 + #( 78 + "main", 79 + json.object([ 80 + #("type", json.string("record")), 81 + #("key", json.string("tid")), 82 + #( 83 + "record", 84 + 
json.object([ 85 + #("type", json.string("object")), 86 + #( 87 + "required", 88 + json.array([json.string("subject")], of: fn(x) { x }), 89 + ), 90 + #( 91 + "properties", 92 + json.object([ 93 + #( 94 + "subject", 95 + json.object([ 96 + #("type", json.string("string")), 97 + #("format", json.string("at-uri")), 98 + ]), 99 + ), 100 + #( 101 + "createdAt", 102 + json.object([ 103 + #("type", json.string("string")), 104 + #("format", json.string("datetime")), 105 + ]), 106 + ), 107 + ]), 108 + ), 109 + ]), 110 + ), 111 + ]), 112 + ), 113 + ]), 114 + ), 115 + ]) 116 + |> json.to_string 117 + } 118 + 119 + // Helper to create a follow lexicon JSON with subject field (DID) 120 + fn create_follow_lexicon() -> String { 121 + json.object([ 122 + #("lexicon", json.int(1)), 123 + #("id", json.string("app.bsky.graph.follow")), 124 + #( 125 + "defs", 126 + json.object([ 127 + #( 128 + "main", 129 + json.object([ 130 + #("type", json.string("record")), 131 + #("key", json.string("tid")), 132 + #( 133 + "record", 134 + json.object([ 135 + #("type", json.string("object")), 136 + #( 137 + "required", 138 + json.array([json.string("subject")], of: fn(x) { x }), 139 + ), 140 + #( 141 + "properties", 142 + json.object([ 143 + #( 144 + "subject", 145 + json.object([#("type", json.string("string"))]), 146 + ), 147 + #( 148 + "createdAt", 149 + json.object([ 150 + #("type", json.string("string")), 151 + #("format", json.string("datetime")), 152 + ]), 153 + ), 154 + ]), 155 + ), 156 + ]), 157 + ), 158 + ]), 159 + ), 160 + ]), 161 + ), 162 + ]) 163 + |> json.to_string 164 + } 165 + 166 + // Test: notifications query returns records mentioning the target DID 167 + pub fn notifications_returns_mentioning_records_test() { 168 + // Setup database 169 + let assert Ok(exec) = test_helpers.create_test_db() 170 + let assert Ok(_) = test_helpers.create_lexicon_table(exec) 171 + let assert Ok(_) = test_helpers.create_record_table(exec) 172 + let assert Ok(_) = 
test_helpers.create_actor_table(exec) 173 + 174 + // Insert lexicons 175 + let assert Ok(_) = 176 + lexicons.insert(exec, "app.bsky.feed.post", create_post_lexicon()) 177 + let assert Ok(_) = 178 + lexicons.insert(exec, "app.bsky.feed.like", create_like_lexicon()) 179 + let assert Ok(_) = 180 + lexicons.insert(exec, "app.bsky.graph.follow", create_follow_lexicon()) 181 + 182 + // Setup actors 183 + let assert Ok(_) = actors.upsert(exec, "did:plc:target", "target.bsky.social") 184 + let assert Ok(_) = actors.upsert(exec, "did:plc:alice", "alice.bsky.social") 185 + let assert Ok(_) = actors.upsert(exec, "did:plc:bob", "bob.bsky.social") 186 + 187 + // Target's own post (should NOT appear in notifications) 188 + let assert Ok(_) = 189 + records.insert( 190 + exec, 191 + "at://did:plc:target/app.bsky.feed.post/post1", 192 + "bafy001", 193 + "did:plc:target", 194 + "app.bsky.feed.post", 195 + "{\"text\":\"Hello world\",\"createdAt\":\"2024-01-01T00:00:00Z\"}", 196 + ) 197 + 198 + // Alice likes target's post (SHOULD appear) 199 + let assert Ok(_) = 200 + records.insert( 201 + exec, 202 + "at://did:plc:alice/app.bsky.feed.like/like1", 203 + "bafy002", 204 + "did:plc:alice", 205 + "app.bsky.feed.like", 206 + "{\"subject\":\"at://did:plc:target/app.bsky.feed.post/post1\",\"createdAt\":\"2024-01-02T00:00:00Z\"}", 207 + ) 208 + 209 + // Bob follows target (SHOULD appear) 210 + let assert Ok(_) = 211 + records.insert( 212 + exec, 213 + "at://did:plc:bob/app.bsky.graph.follow/follow1", 214 + "bafy003", 215 + "did:plc:bob", 216 + "app.bsky.graph.follow", 217 + "{\"subject\":\"did:plc:target\",\"createdAt\":\"2024-01-03T00:00:00Z\"}", 218 + ) 219 + 220 + // Alice's unrelated post (should NOT appear) 221 + let assert Ok(_) = 222 + records.insert( 223 + exec, 224 + "at://did:plc:alice/app.bsky.feed.post/post2", 225 + "bafy004", 226 + "did:plc:alice", 227 + "app.bsky.feed.post", 228 + "{\"text\":\"Unrelated post\",\"createdAt\":\"2024-01-04T00:00:00Z\"}", 229 + ) 230 + 231 + // 
Query all notifications - verify union type resolution works correctly 232 + let query = 233 + " 234 + query { 235 + notifications(viewerDid: \"did:plc:target\", first: 10) { 236 + edges { 237 + cursor 238 + node { 239 + __typename 240 + ... on AppBskyFeedLike { 241 + uri 242 + } 243 + ... on AppBskyGraphFollow { 244 + uri 245 + } 246 + } 247 + } 248 + pageInfo { 249 + hasNextPage 250 + hasPreviousPage 251 + } 252 + } 253 + } 254 + " 255 + 256 + let assert Ok(cache) = did_cache.start() 257 + let assert Ok(response_json) = 258 + lexicon_schema.execute_query_with_db( 259 + exec, 260 + query, 261 + "{}", 262 + Error(Nil), 263 + cache, 264 + option.None, 265 + "", 266 + "https://plc.directory", 267 + ) 268 + 269 + // Verify union type resolution returns concrete types 270 + string.contains(response_json, "AppBskyFeedLike") 271 + |> should.be_true 272 + 273 + string.contains(response_json, "AppBskyGraphFollow") 274 + |> should.be_true 275 + 276 + // Verify URIs are returned from inline fragments 277 + string.contains(response_json, "like1") 278 + |> should.be_true 279 + 280 + string.contains(response_json, "follow1") 281 + |> should.be_true 282 + 283 + // Should have pagination info 284 + string.contains(response_json, "hasNextPage") 285 + |> should.be_true 286 + } 287 + 288 + // Test: notifications query with collection filter 289 + pub fn notifications_filters_by_collection_test() { 290 + // Setup database 291 + let assert Ok(exec) = test_helpers.create_test_db() 292 + let assert Ok(_) = test_helpers.create_lexicon_table(exec) 293 + let assert Ok(_) = test_helpers.create_record_table(exec) 294 + let assert Ok(_) = test_helpers.create_actor_table(exec) 295 + 296 + // Insert lexicons 297 + let assert Ok(_) = 298 + lexicons.insert(exec, "app.bsky.feed.post", create_post_lexicon()) 299 + let assert Ok(_) = 300 + lexicons.insert(exec, "app.bsky.feed.like", create_like_lexicon()) 301 + let assert Ok(_) = 302 + lexicons.insert(exec, "app.bsky.graph.follow", 
create_follow_lexicon()) 303 + 304 + // Setup actors 305 + let assert Ok(_) = actors.upsert(exec, "did:plc:target", "target.bsky.social") 306 + let assert Ok(_) = actors.upsert(exec, "did:plc:alice", "alice.bsky.social") 307 + let assert Ok(_) = actors.upsert(exec, "did:plc:bob", "bob.bsky.social") 308 + 309 + // Alice likes target's post 310 + let assert Ok(_) = 311 + records.insert( 312 + exec, 313 + "at://did:plc:alice/app.bsky.feed.like/like1", 314 + "bafy002", 315 + "did:plc:alice", 316 + "app.bsky.feed.like", 317 + "{\"subject\":\"at://did:plc:target/app.bsky.feed.post/post1\",\"createdAt\":\"2024-01-02T00:00:00Z\"}", 318 + ) 319 + 320 + // Bob follows target 321 + let assert Ok(_) = 322 + records.insert( 323 + exec, 324 + "at://did:plc:bob/app.bsky.graph.follow/follow1", 325 + "bafy003", 326 + "did:plc:bob", 327 + "app.bsky.graph.follow", 328 + "{\"subject\":\"did:plc:target\",\"createdAt\":\"2024-01-03T00:00:00Z\"}", 329 + ) 330 + 331 + // Query only likes (not follows) 332 + let query = 333 + " 334 + query { 335 + notifications(viewerDid: \"did:plc:target\", collections: [APP_BSKY_FEED_LIKE], first: 10) { 336 + edges { 337 + cursor 338 + node { 339 + __typename 340 + ... 
on AppBskyFeedLike { 341 + uri 342 + } 343 + } 344 + } 345 + } 346 + } 347 + " 348 + 349 + let assert Ok(cache) = did_cache.start() 350 + let assert Ok(response_json) = 351 + lexicon_schema.execute_query_with_db( 352 + exec, 353 + query, 354 + "{}", 355 + Error(Nil), 356 + cache, 357 + option.None, 358 + "", 359 + "https://plc.directory", 360 + ) 361 + 362 + // Should have the like with correct type 363 + string.contains(response_json, "AppBskyFeedLike") 364 + |> should.be_true 365 + 366 + string.contains(response_json, "like1") 367 + |> should.be_true 368 + 369 + // Should NOT have the follow (filtered out) 370 + string.contains(response_json, "follow1") 371 + |> should.be_false 372 + 373 + string.contains(response_json, "AppBskyGraphFollow") 374 + |> should.be_false 375 + } 376 + 377 + // Test: notifications query excludes self-authored records 378 + pub fn notifications_excludes_self_authored_test() { 379 + // Setup database 380 + let assert Ok(exec) = test_helpers.create_test_db() 381 + let assert Ok(_) = test_helpers.create_lexicon_table(exec) 382 + let assert Ok(_) = test_helpers.create_record_table(exec) 383 + let assert Ok(_) = test_helpers.create_actor_table(exec) 384 + 385 + // Insert lexicons 386 + let assert Ok(_) = 387 + lexicons.insert(exec, "app.bsky.feed.post", create_post_lexicon()) 388 + 389 + // Setup actors 390 + let assert Ok(_) = actors.upsert(exec, "did:plc:target", "target.bsky.social") 391 + 392 + // Target's own post that mentions themselves (should NOT appear) 393 + let assert Ok(_) = 394 + records.insert( 395 + exec, 396 + "at://did:plc:target/app.bsky.feed.post/post1", 397 + "bafy001", 398 + "did:plc:target", 399 + "app.bsky.feed.post", 400 + "{\"text\":\"Talking about did:plc:target\",\"createdAt\":\"2024-01-01T00:00:00Z\"}", 401 + ) 402 + 403 + let query = 404 + " 405 + query { 406 + notifications(viewerDid: \"did:plc:target\", first: 10) { 407 + edges { 408 + cursor 409 + node { 410 + __typename 411 + } 412 + } 413 + } 414 + } 415 + 
" 416 + 417 + let assert Ok(cache) = did_cache.start() 418 + let assert Ok(response_json) = 419 + lexicon_schema.execute_query_with_db( 420 + exec, 421 + query, 422 + "{}", 423 + Error(Nil), 424 + cache, 425 + option.None, 426 + "", 427 + "https://plc.directory", 428 + ) 429 + 430 + // Should have empty edges since self-authored is excluded 431 + // Check for empty edges array (with space after colon) 432 + string.contains(response_json, "\"edges\": []") 433 + |> should.be_true 434 + }
+2
server/test/groupby_enum_validation_test.gleam
··· 56 56 option.None, 57 57 option.Some(stub_aggregate_fetcher), 58 58 option.None, 59 + option.None, 59 60 ) 60 61 61 62 // Introspection query to check if AppBskyFeedPostGroupByField enum exists ··· 204 205 option.None, 205 206 option.None, 206 207 option.Some(stub_aggregate_fetcher), 208 + option.None, 207 209 option.None, 208 210 ) 209 211
+166
server/test/handlers/notification_subscription_test.gleam
··· 1 + /// Tests for notification subscription event filtering 2 + /// 3 + /// Verifies that notification subscription events correctly match 4 + /// mentions of the subscribed DID while excluding self-authored records. 5 + import gleam/option.{None, Some} 6 + import gleeunit/should 7 + import handlers/graphql_ws 8 + import pubsub 9 + 10 + pub fn notification_event_matches_did_test() { 11 + let event = 12 + pubsub.RecordEvent( 13 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 14 + cid: "bafy123", 15 + did: "did:plc:author", 16 + collection: "app.bsky.feed.like", 17 + value: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 18 + indexed_at: "2024-01-01T00:00:00Z", 19 + operation: pubsub.Create, 20 + ) 21 + 22 + let matches = 23 + graphql_ws.event_matches_notification_subscription( 24 + event, 25 + "did:plc:target", 26 + None, 27 + ) 28 + 29 + matches |> should.be_true() 30 + } 31 + 32 + pub fn notification_event_excludes_self_authored_test() { 33 + let event = 34 + pubsub.RecordEvent( 35 + uri: "at://did:plc:target/app.bsky.feed.post/xyz", 36 + cid: "bafy123", 37 + did: "did:plc:target", 38 + collection: "app.bsky.feed.post", 39 + value: "{\"text\":\"Hello\"}", 40 + indexed_at: "2024-01-01T00:00:00Z", 41 + operation: pubsub.Create, 42 + ) 43 + 44 + let matches = 45 + graphql_ws.event_matches_notification_subscription( 46 + event, 47 + "did:plc:target", 48 + None, 49 + ) 50 + 51 + matches |> should.be_false() 52 + } 53 + 54 + pub fn notification_event_filters_by_collection_test() { 55 + let event = 56 + pubsub.RecordEvent( 57 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 58 + cid: "bafy123", 59 + did: "did:plc:author", 60 + collection: "app.bsky.feed.like", 61 + value: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 62 + indexed_at: "2024-01-01T00:00:00Z", 63 + operation: pubsub.Create, 64 + ) 65 + 66 + // Filter to only posts, like should not match 67 + let matches = 68 + 
graphql_ws.event_matches_notification_subscription( 69 + event, 70 + "did:plc:target", 71 + Some(["app.bsky.feed.post"]), 72 + ) 73 + 74 + matches |> should.be_false() 75 + } 76 + 77 + pub fn notification_event_accepts_matching_collection_test() { 78 + let event = 79 + pubsub.RecordEvent( 80 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 81 + cid: "bafy123", 82 + did: "did:plc:author", 83 + collection: "app.bsky.feed.like", 84 + value: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 85 + indexed_at: "2024-01-01T00:00:00Z", 86 + operation: pubsub.Create, 87 + ) 88 + 89 + // Filter includes likes, should match 90 + let matches = 91 + graphql_ws.event_matches_notification_subscription( 92 + event, 93 + "did:plc:target", 94 + Some(["app.bsky.feed.like", "app.bsky.graph.follow"]), 95 + ) 96 + 97 + matches |> should.be_true() 98 + } 99 + 100 + pub fn notification_event_excludes_update_operation_test() { 101 + let event = 102 + pubsub.RecordEvent( 103 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 104 + cid: "bafy123", 105 + did: "did:plc:author", 106 + collection: "app.bsky.feed.like", 107 + value: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 108 + indexed_at: "2024-01-01T00:00:00Z", 109 + operation: pubsub.Update, 110 + ) 111 + 112 + // Only Create operations should match notifications 113 + let matches = 114 + graphql_ws.event_matches_notification_subscription( 115 + event, 116 + "did:plc:target", 117 + None, 118 + ) 119 + 120 + matches |> should.be_false() 121 + } 122 + 123 + pub fn notification_event_excludes_delete_operation_test() { 124 + let event = 125 + pubsub.RecordEvent( 126 + uri: "at://did:plc:author/app.bsky.feed.like/abc", 127 + cid: "bafy123", 128 + did: "did:plc:author", 129 + collection: "app.bsky.feed.like", 130 + value: "{\"subject\":{\"uri\":\"at://did:plc:target/app.bsky.feed.post/xyz\"}}", 131 + indexed_at: "2024-01-01T00:00:00Z", 132 + operation: pubsub.Delete, 133 + ) 134 + 135 + 
// Only Create operations should match notifications 136 + let matches = 137 + graphql_ws.event_matches_notification_subscription( 138 + event, 139 + "did:plc:target", 140 + None, 141 + ) 142 + 143 + matches |> should.be_false() 144 + } 145 + 146 + pub fn notification_event_excludes_unrelated_did_test() { 147 + let event = 148 + pubsub.RecordEvent( 149 + uri: "at://did:plc:author/app.bsky.feed.post/abc", 150 + cid: "bafy123", 151 + did: "did:plc:author", 152 + collection: "app.bsky.feed.post", 153 + value: "{\"text\":\"Hello world, no mentions\"}", 154 + indexed_at: "2024-01-01T00:00:00Z", 155 + operation: pubsub.Create, 156 + ) 157 + 158 + let matches = 159 + graphql_ws.event_matches_notification_subscription( 160 + event, 161 + "did:plc:target", 162 + None, 163 + ) 164 + 165 + matches |> should.be_false() 166 + }
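Taken together, these tests pin down four filtering rules for `event_matches_notification_subscription`. A hypothetical sketch of that predicate (not the Gleam implementation; names are illustrative):

```python
def event_matches(operation, author_did, collection, value, viewer_did,
                  collections=None):
    """Sketch of the notification-event filter the tests above encode."""
    if operation != "create":
        return False  # updates and deletes never notify
    if author_did == viewer_did:
        return False  # exclude self-authored records
    if collections is not None and collection not in collections:
        return False  # optional collection filter
    return viewer_did in value  # substring match on the record JSON

print(event_matches(
    "create", "did:plc:author", "app.bsky.feed.like",
    '{"subject":"at://did:plc:target/app.bsky.feed.post/xyz"}',
    "did:plc:target",
))  # True
```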
+9
server/test/pagination_test.gleam
··· 14 14 collection: "app.bsky.feed.post", 15 15 json: "{\"text\":\"Hello world\",\"createdAt\":\"2025-01-15T12:00:00Z\"}", 16 16 indexed_at: "2025-01-15 12:00:00", 17 + rkey: "123", 17 18 ) 18 19 19 20 let result = pagination.generate_cursor_from_record(record, None) ··· 34 35 collection: "app.bsky.feed.post", 35 36 json: "{\"text\":\"Hello world\",\"createdAt\":\"2025-01-15T12:00:00Z\"}", 36 37 indexed_at: "2025-01-15 12:00:00", 38 + rkey: "123", 37 39 ) 38 40 39 41 let sort_by = Some([#("indexed_at", "desc")]) ··· 56 58 collection: "app.bsky.feed.post", 57 59 json: "{\"text\":\"Hello world\",\"createdAt\":\"2025-01-15T12:00:00Z\"}", 58 60 indexed_at: "2025-01-15 12:00:00", 61 + rkey: "123", 59 62 ) 60 63 61 64 let sort_by = Some([#("text", "desc")]) ··· 77 80 collection: "app.bsky.feed.post", 78 81 json: "{\"author\":{\"name\":\"Alice\"},\"createdAt\":\"2025-01-15T12:00:00Z\"}", 79 82 indexed_at: "2025-01-15 12:00:00", 83 + rkey: "123", 80 84 ) 81 85 82 86 let sort_by = Some([#("author.name", "asc")]) ··· 98 102 collection: "app.bsky.feed.post", 99 103 json: "{\"text\":\"Hello\",\"createdAt\":\"2025-01-15T12:00:00Z\"}", 100 104 indexed_at: "2025-01-15 12:00:00", 105 + rkey: "123", 101 106 ) 102 107 103 108 let sort_by = Some([#("text", "desc"), #("createdAt", "desc")]) ··· 179 184 collection: "app.bsky.feed.post", 180 185 json: "{}", 181 186 indexed_at: "2025-01-15 12:00:00", 187 + rkey: "123", 182 188 ) 183 189 184 190 pagination.extract_field_value(record, "uri") ··· 207 213 collection: "app.bsky.feed.post", 208 214 json: "{\"text\":\"Hello world\",\"createdAt\":\"2025-01-15T12:00:00Z\",\"likeCount\":42}", 209 215 indexed_at: "2025-01-15 12:00:00", 216 + rkey: "123", 210 217 ) 211 218 212 219 pagination.extract_field_value(record, "text") ··· 229 236 collection: "app.bsky.feed.post", 230 237 json: "{\"author\":{\"name\":\"Alice\",\"did\":\"did:plc:alice\"}}", 231 238 indexed_at: "2025-01-15 12:00:00", 239 + rkey: "123", 232 240 ) 233 241 234 242 
pagination.extract_field_value(record, "author.name") ··· 248 256 collection: "app.bsky.feed.post", 249 257 json: "{\"text\":\"Hello\"}", 250 258 indexed_at: "2025-01-15 12:00:00", 259 + rkey: "123", 251 260 ) 252 261 253 262 pagination.extract_field_value(record, "nonexistent")
+4 -1
server/test/test_helpers.gleam
··· 20 20 did TEXT NOT NULL, 21 21 collection TEXT NOT NULL, 22 22 json TEXT NOT NULL, 23 - indexed_at TEXT NOT NULL DEFAULT (datetime('now')) 23 + indexed_at TEXT NOT NULL DEFAULT (datetime('now')), 24 + rkey TEXT GENERATED ALWAYS AS ( 25 + substr(uri, instr(substr(uri, instr(substr(uri, 6), '/') + 6), '/') + instr(substr(uri, 6), '/') + 6) 26 + ) STORED 24 27 )", 25 28 [], 26 29 )
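The nested `substr`/`instr` expression in the generated column extracts the rkey from an `at://<did>/<collection>/<rkey>` URI: it skips the 5-character `at://` prefix, locates the `/` ending the DID segment, then the `/` ending the collection segment, and takes everything after it. A quick check of the exact expression against a sample URI (requires SQLite 3.31+ for generated columns):

```python
import sqlite3

# Verify the generated-column expression from the test helper above.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE record (
      uri TEXT PRIMARY KEY,
      rkey TEXT GENERATED ALWAYS AS (
        substr(uri, instr(substr(uri, instr(substr(uri, 6), '/') + 6), '/')
                    + instr(substr(uri, 6), '/') + 6)
      ) STORED
    )
    """
)
conn.execute(
    "INSERT INTO record (uri) "
    "VALUES ('at://did:plc:alice/app.bsky.feed.like/3jzfcijpj2z2a')"
)
rkey = conn.execute("SELECT rkey FROM record").fetchone()[0]
print(rkey)  # 3jzfcijpj2z2a
```

Because rkeys are TIDs (timestamp identifiers), sorting on this column yields chronological order without parsing the record JSON.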