Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 743 lines 20 kB view raw
1/// End-to-end integration tests for GraphQL aggregated queries 2/// 3/// Tests the complete aggregation flow: 4/// 1. Database setup with lexicons and records 5/// 2. GraphQL schema building with aggregate fields 6/// 3. Aggregated query execution with various parameters 7/// 4. Result formatting and verification 8import database/executor.{type Executor} 9import database/queries/aggregates 10import database/repositories/lexicons 11import database/repositories/records 12import database/types 13import gleam/dict 14import gleam/http 15import gleam/json 16import gleam/list 17import gleam/option.{None} 18import gleam/result 19import gleam/string 20import gleeunit 21import gleeunit/should 22import handlers/graphql as graphql_handler 23import lib/oauth/did_cache 24import test_helpers 25import wisp 26import wisp/simulate 27 28pub fn main() { 29 gleeunit.main() 30} 31 32// Helper to create a simple post lexicon 33fn create_post_lexicon() -> String { 34 json.object([ 35 #("lexicon", json.int(1)), 36 #("id", json.string("app.bsky.feed.post")), 37 #( 38 "defs", 39 json.object([ 40 #( 41 "main", 42 json.object([ 43 #("type", json.string("record")), 44 #( 45 "record", 46 json.object([ 47 #( 48 "properties", 49 json.object([ 50 #("text", json.object([#("type", json.string("string"))])), 51 #("author", json.object([#("type", json.string("string"))])), 52 #("lang", json.object([#("type", json.string("string"))])), 53 #("likes", json.object([#("type", json.string("integer"))])), 54 ]), 55 ), 56 ]), 57 ), 58 ]), 59 ), 60 ]), 61 ), 62 ]) 63 |> json.to_string 64} 65 66// Helper to create a status lexicon 67fn create_status_lexicon() -> String { 68 json.object([ 69 #("lexicon", json.int(1)), 70 #("id", json.string("xyz.statusphere.status")), 71 #( 72 "defs", 73 json.object([ 74 #( 75 "main", 76 json.object([ 77 #("type", json.string("record")), 78 #( 79 "record", 80 json.object([ 81 #( 82 "properties", 83 json.object([ 84 #("status", json.object([#("type", json.string("string"))])), 85 #( 86 "createdAt", 87 json.object([ 88 #("type", json.string("string")), 89 #("format", json.string("datetime")), 90 ]), 91 ), 92 ]), 93 ), 94 ]), 95 ), 96 ]), 97 ), 98 ]), 99 ), 100 ]) 101 |> json.to_string 102} 103 104// Helper to setup test database with aggregatable records 105fn setup_aggregation_test_db() -> Result(Executor, String) { 106 use exec <- result.try( 107 test_helpers.create_test_db() 108 |> result.map_error(fn(_) { "Failed to connect" }), 109 ) 110 use _ <- result.try( 111 test_helpers.create_lexicon_table(exec) 112 |> result.map_error(fn(_) { "Failed to create lexicon table" }), 113 ) 114 use _ <- result.try( 115 test_helpers.create_record_table(exec) 116 |> result.map_error(fn(_) { "Failed to create record table" }), 117 ) 118 119 // Insert post lexicon 120 let post_lexicon = create_post_lexicon() 121 use _ <- result.try( 122 lexicons.insert(exec, "app.bsky.feed.post", post_lexicon) 123 |> result.map_error(fn(_) { "Failed to insert post lexicon" }), 124 ) 125 126 // Insert status lexicon 127 let status_lexicon = create_status_lexicon() 128 use _ <- result.try( 129 lexicons.insert(exec, "xyz.statusphere.status", status_lexicon) 130 |> result.map_error(fn(_) { "Failed to insert status lexicon" }), 131 ) 132 133 // Insert test records with varying fields for aggregation 134 // Posts from different authors with different languages 135 use _ <- result.try( 136 records.insert( 137 exec, 138 "at://did:plc:alice/app.bsky.feed.post/1", 139 "cid1", 140 "did:plc:alice", 141 "app.bsky.feed.post", 142 json.object([ 143 #("text", json.string("Hello world")), 144 #("author", json.string("alice")), 145 #("lang", json.string("en")), 146 #("likes", json.int(100)), 147 ]) 148 |> json.to_string, 149 ) 150 |> result.map_error(fn(_) { "insert failed" }), 151 ) 152 153 use _ <- result.try( 154 records.insert( 155 exec, 156 "at://did:plc:alice/app.bsky.feed.post/2", 157 "cid2", 158 "did:plc:alice", 159 "app.bsky.feed.post", 160 json.object([ 161 #("text", json.string("Another post")), 162 #("author", json.string("alice")), 163 #("lang", json.string("en")), 164 #("likes", json.int(50)), 165 ]) 166 |> json.to_string, 167 ) 168 |> result.map_error(fn(_) { "insert failed" }), 169 ) 170 171 use _ <- result.try( 172 records.insert( 173 exec, 174 "at://did:plc:bob/app.bsky.feed.post/1", 175 "cid3", 176 "did:plc:bob", 177 "app.bsky.feed.post", 178 json.object([ 179 #("text", json.string("Bonjour")), 180 #("author", json.string("bob")), 181 #("lang", json.string("fr")), 182 #("likes", json.int(75)), 183 ]) 184 |> json.to_string, 185 ) 186 |> result.map_error(fn(_) { "insert failed" }), 187 ) 188 189 use _ <- result.try( 190 records.insert( 191 exec, 192 "at://did:plc:charlie/app.bsky.feed.post/1", 193 "cid4", 194 "did:plc:charlie", 195 "app.bsky.feed.post", 196 json.object([ 197 #("text", json.string("Hello from Charlie")), 198 #("author", json.string("charlie")), 199 #("lang", json.string("en")), 200 #("likes", json.int(200)), 201 ]) 202 |> json.to_string, 203 ) 204 |> result.map_error(fn(_) { "insert failed" }), 205 ) 206 207 use _ <- result.try( 208 records.insert( 209 exec, 210 "at://did:plc:bob/app.bsky.feed.post/2", 211 "cid5", 212 "did:plc:bob", 213 "app.bsky.feed.post", 214 json.object([ 215 #("text", json.string("Salut")), 216 #("author", json.string("bob")), 217 #("lang", json.string("fr")), 218 #("likes", json.int(25)), 219 ]) 220 |> json.to_string, 221 ) 222 |> result.map_error(fn(_) { "insert failed" }), 223 ) 224 225 // Insert status records 226 use _ <- result.try( 227 records.insert( 228 exec, 229 "at://did:plc:alice/xyz.statusphere.status/1", 230 "scid1", 231 "did:plc:alice", 232 "xyz.statusphere.status", 233 json.object([ 234 #("status", json.string("👍")), 235 #("createdAt", json.string("2024-01-15T10:00:00Z")), 236 ]) 237 |> json.to_string, 238 ) 239 |> result.map_error(fn(_) { "insert failed" }), 240 ) 241 242 use _ <- result.try( 243 records.insert( 244 exec, 245 "at://did:plc:bob/xyz.statusphere.status/1", 246 "scid2", 247 "did:plc:bob", 248 "xyz.statusphere.status", 249 json.object([ 250 #("status", json.string("👍")), 251 #("createdAt", json.string("2024-01-15T11:00:00Z")), 252 ]) 253 |> json.to_string, 254 ) 255 |> result.map_error(fn(_) { "insert failed" }), 256 ) 257 258 use _ <- result.try( 259 records.insert( 260 exec, 261 "at://did:plc:charlie/xyz.statusphere.status/1", 262 "scid3", 263 "did:plc:charlie", 264 "xyz.statusphere.status", 265 json.object([ 266 #("status", json.string("🔥")), 267 #("createdAt", json.string("2024-01-16T10:00:00Z")), 268 ]) 269 |> json.to_string, 270 ) 271 |> result.map_error(fn(_) { "Failed to insert record" }), 272 ) 273 274 Ok(exec) 275} 276 277// Test: Simple single-field aggregation through full GraphQL stack 278pub fn graphql_simple_aggregation_test() { 279 let assert Ok(exec) = setup_aggregation_test_db() 280 281 // Query: Group posts by author 282 let query = 283 json.object([ 284 #( 285 "query", 286 json.string( 287 "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}]) { author count } }", 288 ), 289 ), 290 ]) 291 |> json.to_string 292 293 let request = 294 simulate.request(http.Post, "/graphql") 295 |> simulate.string_body(query) 296 |> simulate.header("content-type", "application/json") 297 298 let assert Ok(cache) = did_cache.start() 299 let response = 300 graphql_handler.handle_graphql_request( 301 request, 302 exec, 303 cache, 304 None, 305 "", 306 "https://plc.directory", 307 ) 308 309 // Verify response 310 response.status |> should.equal(200) 311 312 let assert wisp.Text(body) = response.body 313 314 // Verify response structure 315 string.contains(body, "data") |> should.be_true 316 string.contains(body, "appBskyFeedPostAggregated") |> should.be_true 317 318 // Should have counts for alice (2), bob (2), charlie (1) 319 string.contains(body, "alice") |> should.be_true 320 string.contains(body, "bob") |> should.be_true 321 string.contains(body, "charlie") |> should.be_true 322} 323 324// Test: Multi-field aggregation through GraphQL 325pub fn graphql_multi_field_aggregation_test() { 326 let assert Ok(exec) = setup_aggregation_test_db() 327 328 // Query: Group posts by author AND lang 329 let query = 330 json.object([ 331 #( 332 "query", 333 json.string( 334 "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}, {field: \"lang\"}]) { author lang count } }", 335 ), 336 ), 337 ]) 338 |> json.to_string 339 340 let request = 341 simulate.request(http.Post, "/graphql") 342 |> simulate.string_body(query) 343 |> simulate.header("content-type", "application/json") 344 345 let assert Ok(cache) = did_cache.start() 346 let response = 347 graphql_handler.handle_graphql_request( 348 request, 349 exec, 350 cache, 351 None, 352 "", 353 "https://plc.directory", 354 ) 355 356 // Verify response 357 response.status |> should.equal(200) 358 359 let assert wisp.Text(body) = response.body 360 361 // Should have separate counts for each author+lang combination 362 string.contains(body, "alice") |> should.be_true 363 string.contains(body, "bob") |> should.be_true 364 string.contains(body, "en") |> should.be_true 365 string.contains(body, "fr") |> should.be_true 366} 367 368// Test: Aggregation with WHERE clause filtering 369pub fn graphql_aggregation_with_where_test() { 370 let assert Ok(exec) = setup_aggregation_test_db() 371 372 // First test without WHERE to ensure aggregation works 373 let query_no_where = 374 json.object([ 375 #( 376 "query", 377 json.string( 378 "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}]) { lang count } }", 379 ), 380 ), 381 ]) 382 |> json.to_string 383 384 let request_no_where = 385 simulate.request(http.Post, "/graphql") 386 |> simulate.string_body(query_no_where) 387 |> simulate.header("content-type", "application/json") 388 389 let assert Ok(cache2) = did_cache.start() 390 let response_no_where = 391 graphql_handler.handle_graphql_request( 392 request_no_where, 393 exec, 394 cache2, 395 None, 396 "", 397 "https://plc.directory", 398 ) 399 400 let assert wisp.Text(_body_no_where) = response_no_where.body 401 402 // Try with string field instead of integer 403 let query_string = 404 json.object([ 405 #( 406 "query", 407 json.string( 408 "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}], where: {author: {eq: \"alice\"}}) { lang count } }", 409 ), 410 ), 411 ]) 412 |> json.to_string 413 414 let request_string = 415 simulate.request(http.Post, "/graphql") 416 |> simulate.string_body(query_string) 417 |> simulate.header("content-type", "application/json") 418 419 let assert Ok(cache3) = did_cache.start() 420 let response_string = 421 graphql_handler.handle_graphql_request( 422 request_string, 423 exec, 424 cache3, 425 None, 426 "", 427 "https://plc.directory", 428 ) 429 430 let assert wisp.Text(_body_string) = response_string.body 431 432 // Query: Group posts by lang, but only for posts with likes >= 50 433 // Note: GraphQL integers in queries don't need quotes 434 let query = 435 json.object([ 436 #( 437 "query", 438 json.string( 439 "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}], where: {likes: {gte: 50}}) { lang count } }", 440 ), 441 ), 442 ]) 443 |> json.to_string 444 445 let request = 446 simulate.request(http.Post, "/graphql") 447 |> simulate.string_body(query) 448 |> simulate.header("content-type", "application/json") 449 450 let assert Ok(cache) = did_cache.start() 451 let response = 452 graphql_handler.handle_graphql_request( 453 request, 454 exec, 455 cache, 456 None, 457 "", 458 "https://plc.directory", 459 ) 460 461 // Verify response 462 response.status |> should.equal(200) 463 464 let assert wisp.Text(body) = response.body 465 466 // Should filter out posts with likes < 50 (bob/post/2 with 25 likes) 467 // Remaining: alice/post/1 (100), alice/post/2 (50), bob/post/1 (75), charlie/post/1 (200) 468 // By lang: en=3 (alice x2, charlie), fr=1 (bob) 469 response.status |> should.equal(200) 470 string.contains(body, "data") |> should.be_true 471 string.contains(body, "appBskyFeedPostAggregated") |> should.be_true 472 473 // Verify both language groups are present (JSON without quotes on keys) 474 string.contains(body, "lang") |> should.be_true 475 string.contains(body, "en") |> should.be_true 476 string.contains(body, "fr") |> should.be_true 477 478 // Verify counts are correct (en should have count 3, fr should have count 1) 479 // The response should contain both groups with their counts 480 string.contains(body, "count") |> should.be_true 481 string.contains(body, "3") |> should.be_true 482 string.contains(body, "1") |> should.be_true 483} 484 485// Test: Aggregation with ORDER BY 486pub fn graphql_aggregation_with_order_by_test() { 487 let assert Ok(exec) = setup_aggregation_test_db() 488 489 // Query: Group by lang, order by count ascending 490 let query = 491 json.object([ 492 #( 493 "query", 494 json.string( 495 "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}], orderBy: {count: ASC}) { lang count } }", 496 ), 497 ), 498 ]) 499 |> json.to_string 500 501 let request = 502 simulate.request(http.Post, "/graphql") 503 |> simulate.string_body(query) 504 |> simulate.header("content-type", "application/json") 505 506 let assert Ok(cache) = did_cache.start() 507 let response = 508 graphql_handler.handle_graphql_request( 509 request, 510 exec, 511 cache, 512 None, 513 "", 514 "https://plc.directory", 515 ) 516 517 // Verify response 518 response.status |> should.equal(200) 519 520 let assert wisp.Text(body) = response.body 521 522 string.contains(body, "lang") |> should.be_true 523 string.contains(body, "count") |> should.be_true 524} 525 526// Test: Aggregation with LIMIT 527pub fn graphql_aggregation_with_limit_test() { 528 let assert Ok(exec) = setup_aggregation_test_db() 529 530 // Query: Group by author, limit to 2 results 531 let query = 532 json.object([ 533 #( 534 "query", 535 json.string( 536 "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}], limit: 2) { author count } }", 537 ), 538 ), 539 ]) 540 |> json.to_string 541 542 let request = 543 simulate.request(http.Post, "/graphql") 544 |> simulate.string_body(query) 545 |> simulate.header("content-type", "application/json") 546 547 let assert Ok(cache) = did_cache.start() 548 let response = 549 graphql_handler.handle_graphql_request( 550 request, 551 exec, 552 cache, 553 None, 554 "", 555 "https://plc.directory", 556 ) 557 558 // Verify response 559 response.status |> should.equal(200) 560 561 let assert wisp.Text(body) = response.body 562 563 string.contains(body, "author") |> should.be_true 564 string.contains(body, "count") |> should.be_true 565} 566 567// Test: Aggregation on status field (emoji grouping) 568pub fn graphql_status_aggregation_test() { 569 let assert Ok(exec) = setup_aggregation_test_db() 570 571 // Query: Group status records by status emoji 572 let query = 573 json.object([ 574 #( 575 "query", 576 json.string( 577 "{ xyzStatusphereStatusAggregated(groupBy: [{field: \"status\"}]) { status count } }", 578 ), 579 ), 580 ]) 581 |> json.to_string 582 583 let request = 584 simulate.request(http.Post, "/graphql") 585 |> simulate.string_body(query) 586 |> simulate.header("content-type", "application/json") 587 588 let assert Ok(cache) = did_cache.start() 589 let response = 590 graphql_handler.handle_graphql_request( 591 request, 592 exec, 593 cache, 594 None, 595 "", 596 "https://plc.directory", 597 ) 598 599 // Verify response 600 response.status |> should.equal(200) 601 602 let assert wisp.Text(body) = response.body 603 604 // Should have 👍 (count=2) and 🔥 (count=1) 605 string.contains(body, "👍") |> should.be_true 606 string.contains(body, "🔥") |> should.be_true 607} 608 609// Test: Direct database aggregation (not through GraphQL handler) 610pub fn database_aggregation_integration_test() { 611 let assert Ok(exec) = setup_aggregation_test_db() 612 613 // Test simple grouping by author 614 let assert Ok(results) = 615 aggregates.get_aggregated_records( 616 exec, 617 "app.bsky.feed.post", 618 [types.SimpleField("author")], 619 None, 620 True, 621 10, 622 ) 623 624 // Should have 3 groups: alice, bob, charlie 625 list.length(results) |> should.equal(3) 626 627 // Verify each result has the expected structure 628 list.each(results, fn(result) { 629 // Each result should have field_0 (author) and count 630 dict.size(result.group_values) |> should.equal(1) 631 // Count should be positive 632 should.be_true(result.count > 0) 633 }) 634} 635 636// Test: Database aggregation with multi-field grouping 637pub fn database_multi_field_aggregation_test() { 638 let assert Ok(exec) = setup_aggregation_test_db() 639 640 // Group by author and lang 641 let assert Ok(results) = 642 aggregates.get_aggregated_records( 643 exec, 644 "app.bsky.feed.post", 645 [types.SimpleField("author"), types.SimpleField("lang")], 646 None, 647 True, 648 10, 649 ) 650 651 // Should have groups for each author+lang combination 652 should.be_true(list.length(results) >= 3) 653 654 // Each result should have 2 fields (author and lang) 655 list.each(results, fn(result) { 656 dict.size(result.group_values) |> should.equal(2) 657 }) 658} 659 660// Test: Aggregation on table column (did) 661pub fn graphql_table_column_aggregation_test() { 662 let assert Ok(exec) = setup_aggregation_test_db() 663 664 // Query: Group posts by did (table column, not JSON field) 665 let query = 666 json.object([ 667 #( 668 "query", 669 json.string( 670 "{ appBskyFeedPostAggregated(groupBy: [{field: \"did\"}]) { did count } }", 671 ), 672 ), 673 ]) 674 |> json.to_string 675 676 let request = 677 simulate.request(http.Post, "/graphql") 678 |> simulate.string_body(query) 679 |> simulate.header("content-type", "application/json") 680 681 let assert Ok(cache) = did_cache.start() 682 let response = 683 graphql_handler.handle_graphql_request( 684 request, 685 exec, 686 cache, 687 None, 688 "", 689 "https://plc.directory", 690 ) 691 692 // Verify response 693 response.status |> should.equal(200) 694 695 let assert wisp.Text(body) = response.body 696 697 // Should group by DID values 698 string.contains(body, "did:plc:alice") |> should.be_true 699 string.contains(body, "did:plc:bob") |> should.be_true 700 string.contains(body, "did:plc:charlie") |> should.be_true 701} 702 703// Test: Empty aggregation result 704pub fn graphql_empty_aggregation_test() { 705 let assert Ok(exec) = setup_aggregation_test_db() 706 707 // Query: Filter that matches no records 708 let query = 709 json.object([ 710 #( 711 "query", 712 json.string( 713 "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}], where: {likes: {gte: 1000}}) { author count } }", 714 ), 715 ), 716 ]) 717 |> json.to_string 718 719 let request = 720 simulate.request(http.Post, "/graphql") 721 |> simulate.string_body(query) 722 |> simulate.header("content-type", "application/json") 723 724 let assert Ok(cache) = did_cache.start() 725 let response = 726 graphql_handler.handle_graphql_request( 727 request, 728 exec, 729 cache, 730 None, 731 "", 732 "https://plc.directory", 733 ) 734 735 // Verify response (should still be 200 with empty results) 736 response.status |> should.equal(200) 737 738 let assert wisp.Text(body) = response.body 739 740 // Should have data field but empty array 741 string.contains(body, "data") |> should.be_true 742 string.contains(body, "appBskyFeedPostAggregated") |> should.be_true 743}