Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1/// End-to-end integration tests for GraphQL aggregated queries
2///
3/// Tests the complete aggregation flow:
4/// 1. Database setup with lexicons and records
5/// 2. GraphQL schema building with aggregate fields
6/// 3. Aggregated query execution with various parameters
7/// 4. Result formatting and verification
8import database/executor.{type Executor}
9import database/queries/aggregates
10import database/repositories/lexicons
11import database/repositories/records
12import database/types
13import gleam/dict
14import gleam/http
15import gleam/json
16import gleam/list
17import gleam/option.{None}
18import gleam/result
19import gleam/string
20import gleeunit
21import gleeunit/should
22import handlers/graphql as graphql_handler
23import lib/oauth/did_cache
24import test_helpers
25import wisp
26import wisp/simulate
27
/// Entry point for the test module; delegates to `gleeunit.main`.
pub fn main() -> Nil {
  gleeunit.main()
}
31
/// Builds the JSON text of a minimal `app.bsky.feed.post` lexicon.
///
/// The `main` record definition declares three string properties (`text`,
/// `author`, `lang`) and one integer property (`likes`).
fn create_post_lexicon() -> String {
  // Property schema of the form {"type": <type_name>}.
  let typed = fn(type_name: String) {
    json.object([#("type", json.string(type_name))])
  }

  let properties =
    json.object([
      #("text", typed("string")),
      #("author", typed("string")),
      #("lang", typed("string")),
      #("likes", typed("integer")),
    ])

  let main_def =
    json.object([
      #("type", json.string("record")),
      #("record", json.object([#("properties", properties)])),
    ])

  json.to_string(
    json.object([
      #("lexicon", json.int(1)),
      #("id", json.string("app.bsky.feed.post")),
      #("defs", json.object([#("main", main_def)])),
    ]),
  )
}
65
/// Builds the JSON text of a minimal `xyz.statusphere.status` lexicon.
///
/// The `main` record definition declares a string `status` property and a
/// string `createdAt` property carrying the `datetime` format.
fn create_status_lexicon() -> String {
  let status_schema = json.object([#("type", json.string("string"))])

  let created_at_schema =
    json.object([
      #("type", json.string("string")),
      #("format", json.string("datetime")),
    ])

  let properties =
    json.object([
      #("status", status_schema),
      #("createdAt", created_at_schema),
    ])

  let main_def =
    json.object([
      #("type", json.string("record")),
      #("record", json.object([#("properties", properties)])),
    ])

  json.to_string(
    json.object([
      #("lexicon", json.int(1)),
      #("id", json.string("xyz.statusphere.status")),
      #("defs", json.object([#("main", main_def)])),
    ]),
  )
}
103
/// Inserts one fixture record, encoding `fields` as the record's JSON body.
///
/// Any error from `records.insert` is replaced by a message that names the
/// record URI, so a failing fixture can be identified from the error alone.
fn insert_fixture_record(
  exec: Executor,
  uri: String,
  cid: String,
  did: String,
  collection: String,
  fields: List(#(String, json.Json)),
) -> Result(Nil, String) {
  records.insert(
    exec,
    uri,
    cid,
    did,
    collection,
    json.object(fields) |> json.to_string,
  )
  |> result.map(fn(_) { Nil })
  |> result.map_error(fn(_) { "Failed to insert record " <> uri })
}

/// Creates a test database populated with the fixtures used by every
/// aggregation test in this module.
///
/// Steps, each short-circuiting with a descriptive `Error(String)`:
/// 1. open the test database and create the lexicon and record tables;
/// 2. register the post and status lexicons;
/// 3. insert five posts and three status records.
///
/// Fixture summary (the tests' expected aggregates derive from this):
/// - posts by author: alice 2, bob 2, charlie 1
/// - posts by lang: en 3 (alice x2, charlie), fr 2 (bob x2)
/// - post likes: 100, 50, 75, 200, 25
/// - statuses: "👍" x2 (alice, bob), "🔥" x1 (charlie)
///
/// Returns the executor for the populated database.
fn setup_aggregation_test_db() -> Result(Executor, String) {
  use exec <- result.try(
    test_helpers.create_test_db()
    |> result.map_error(fn(_) { "Failed to connect" }),
  )
  use _ <- result.try(
    test_helpers.create_lexicon_table(exec)
    |> result.map_error(fn(_) { "Failed to create lexicon table" }),
  )
  use _ <- result.try(
    test_helpers.create_record_table(exec)
    |> result.map_error(fn(_) { "Failed to create record table" }),
  )

  // Register both lexicons.
  use _ <- result.try(
    lexicons.insert(exec, "app.bsky.feed.post", create_post_lexicon())
    |> result.map_error(fn(_) { "Failed to insert post lexicon" }),
  )
  use _ <- result.try(
    lexicons.insert(exec, "xyz.statusphere.status", create_status_lexicon())
    |> result.map_error(fn(_) { "Failed to insert status lexicon" }),
  )

  // Post fixtures: #(author, record key, cid, text, lang, likes).
  // The author name doubles as the DID suffix (did:plc:<author>).
  let posts = [
    #("alice", "1", "cid1", "Hello world", "en", 100),
    #("alice", "2", "cid2", "Another post", "en", 50),
    #("bob", "1", "cid3", "Bonjour", "fr", 75),
    #("charlie", "1", "cid4", "Hello from Charlie", "en", 200),
    #("bob", "2", "cid5", "Salut", "fr", 25),
  ]
  use _ <- result.try(
    list.try_each(posts, fn(post) {
      let #(author, rkey, cid, text, lang, likes) = post
      let did = "did:plc:" <> author
      insert_fixture_record(
        exec,
        "at://" <> did <> "/app.bsky.feed.post/" <> rkey,
        cid,
        did,
        "app.bsky.feed.post",
        [
          #("text", json.string(text)),
          #("author", json.string(author)),
          #("lang", json.string(lang)),
          #("likes", json.int(likes)),
        ],
      )
    }),
  )

  // Status fixtures: #(DID suffix, cid, status emoji, createdAt).
  let statuses = [
    #("alice", "scid1", "👍", "2024-01-15T10:00:00Z"),
    #("bob", "scid2", "👍", "2024-01-15T11:00:00Z"),
    #("charlie", "scid3", "🔥", "2024-01-16T10:00:00Z"),
  ]
  use _ <- result.try(
    list.try_each(statuses, fn(entry) {
      let #(owner, cid, status, created_at) = entry
      let did = "did:plc:" <> owner
      insert_fixture_record(
        exec,
        "at://" <> did <> "/xyz.statusphere.status/1",
        cid,
        did,
        "xyz.statusphere.status",
        [
          #("status", json.string(status)),
          #("createdAt", json.string(created_at)),
        ],
      )
    }),
  )

  Ok(exec)
}
276
/// Single-field aggregation through the full GraphQL handler.
///
/// Posts the `appBskyFeedPostAggregated` query grouped by `author` and checks
/// that the response is a 200 whose text body mentions `data`, the aggregated
/// field name, and each of the three fixture authors. The per-author counts
/// (alice 2, bob 2, charlie 1) are not asserted here.
pub fn graphql_simple_aggregation_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  let graphql =
    "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}]) { author count } }"
  let payload = json.to_string(json.object([#("query", json.string(graphql))]))

  let req =
    simulate.request(http.Post, "/graphql")
    |> simulate.string_body(payload)
    |> simulate.header("content-type", "application/json")

  let assert Ok(cache) = did_cache.start()
  let resp =
    graphql_handler.handle_graphql_request(
      req,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )

  should.equal(resp.status, 200)

  let assert wisp.Text(text) = resp.body

  // Response envelope first, then one substring per expected author group.
  ["data", "appBskyFeedPostAggregated", "alice", "bob", "charlie"]
  |> list.each(fn(expected) {
    should.be_true(string.contains(text, expected))
  })
}
323
/// Multi-field aggregation through the GraphQL handler.
///
/// Groups posts by `author` and `lang`, then checks for a 200 response whose
/// text body mentions two of the authors and both languages. This is a
/// substring check only; individual author+lang counts are not asserted.
pub fn graphql_multi_field_aggregation_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  let graphql =
    "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}, {field: \"lang\"}]) { author lang count } }"
  let payload = json.to_string(json.object([#("query", json.string(graphql))]))

  let req =
    simulate.request(http.Post, "/graphql")
    |> simulate.string_body(payload)
    |> simulate.header("content-type", "application/json")

  let assert Ok(cache) = did_cache.start()
  let resp =
    graphql_handler.handle_graphql_request(
      req,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )

  should.equal(resp.status, 200)

  let assert wisp.Text(text) = resp.body

  // Each author+lang combination surfaces these values somewhere in the body.
  ["alice", "bob", "en", "fr"]
  |> list.each(fn(expected) {
    should.be_true(string.contains(text, expected))
  })
}
367
/// Aggregation combined with a `where` filter.
///
/// Runs three queries against the same fixture database, all grouped by
/// `lang`:
/// 1. no filter (baseline: aggregation alone works);
/// 2. a string filter, `author eq "alice"`;
/// 3. an integer filter, `likes gte 50`.
///
/// Every response must be a 200 with a text body. Only the third response is
/// inspected further: the fixture post with 25 likes (bob's second post) is
/// excluded, leaving en = 3 (alice x2, charlie) and fr = 1 (bob).
pub fn graphql_aggregation_with_where_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  // Sends one GraphQL query through the handler. A fresh DID cache is started
  // per request, exactly as when each request was built by hand.
  let run = fn(graphql: String) {
    let payload =
      json.object([#("query", json.string(graphql))])
      |> json.to_string

    let request =
      simulate.request(http.Post, "/graphql")
      |> simulate.string_body(payload)
      |> simulate.header("content-type", "application/json")

    let assert Ok(cache) = did_cache.start()
    graphql_handler.handle_graphql_request(
      request,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )
  }

  // 1. Baseline without a filter.
  let unfiltered =
    run(
      "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}]) { lang count } }",
    )
  unfiltered.status |> should.equal(200)
  let assert wisp.Text(_) = unfiltered.body

  // 2. Filter on a string field.
  let by_author =
    run(
      "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}], where: {author: {eq: \"alice\"}}) { lang count } }",
    )
  by_author.status |> should.equal(200)
  let assert wisp.Text(_) = by_author.body

  // 3. Filter on an integer field (GraphQL integer literals are unquoted).
  let response =
    run(
      "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}], where: {likes: {gte: 50}}) { lang count } }",
    )
  response.status |> should.equal(200)

  let assert wisp.Text(body) = response.body

  // Response envelope.
  string.contains(body, "data") |> should.be_true
  string.contains(body, "appBskyFeedPostAggregated") |> should.be_true

  // Both language groups survive the filter.
  string.contains(body, "lang") |> should.be_true
  string.contains(body, "en") |> should.be_true
  string.contains(body, "fr") |> should.be_true

  // Expected counts: en = 3, fr = 1.
  // NOTE(review): these are bare substring checks, so they do not tie a count
  // to its language group; tighten once the exact response shape is pinned.
  string.contains(body, "count") |> should.be_true
  string.contains(body, "3") |> should.be_true
  string.contains(body, "1") |> should.be_true
}
484
/// Aggregation with an `orderBy` argument.
///
/// Groups posts by `lang` with `orderBy: {count: ASC}` and checks that the
/// handler answers 200 with a text body mentioning `lang` and `count`. The
/// order of the returned groups is not asserted.
pub fn graphql_aggregation_with_order_by_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  let graphql =
    "{ appBskyFeedPostAggregated(groupBy: [{field: \"lang\"}], orderBy: {count: ASC}) { lang count } }"
  let payload = json.to_string(json.object([#("query", json.string(graphql))]))

  let req =
    simulate.request(http.Post, "/graphql")
    |> simulate.string_body(payload)
    |> simulate.header("content-type", "application/json")

  let assert Ok(cache) = did_cache.start()
  let resp =
    graphql_handler.handle_graphql_request(
      req,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )

  should.equal(resp.status, 200)

  let assert wisp.Text(text) = resp.body

  should.be_true(string.contains(text, "lang"))
  should.be_true(string.contains(text, "count"))
}
525
/// Aggregation with a `limit` argument.
///
/// Groups posts by `author` with `limit: 2`. The fixtures contain three
/// authors (alice, bob, charlie), so a correctly limited response must
/// mention exactly two of them. Which two is not asserted, because the query
/// specifies no ordering.
pub fn graphql_aggregation_with_limit_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  // Query: group by author, limit to 2 results
  let query =
    json.object([
      #(
        "query",
        json.string(
          "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}], limit: 2) { author count } }",
        ),
      ),
    ])
    |> json.to_string

  let request =
    simulate.request(http.Post, "/graphql")
    |> simulate.string_body(query)
    |> simulate.header("content-type", "application/json")

  let assert Ok(cache) = did_cache.start()
  let response =
    graphql_handler.handle_graphql_request(
      request,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )

  // Verify response
  response.status |> should.equal(200)

  let assert wisp.Text(body) = response.body

  string.contains(body, "author") |> should.be_true
  string.contains(body, "count") |> should.be_true

  // The limit must actually be applied: 3 author groups exist, 2 are allowed.
  ["alice", "bob", "charlie"]
  |> list.filter(string.contains(body, _))
  |> list.length
  |> should.equal(2)
}
566
/// Aggregation over the status collection, grouping by the emoji `status`.
///
/// The fixtures hold "👍" twice and "🔥" once; this test checks for a 200
/// response whose text body contains both emoji. Counts are not asserted.
pub fn graphql_status_aggregation_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  let graphql =
    "{ xyzStatusphereStatusAggregated(groupBy: [{field: \"status\"}]) { status count } }"
  let payload = json.to_string(json.object([#("query", json.string(graphql))]))

  let req =
    simulate.request(http.Post, "/graphql")
    |> simulate.string_body(payload)
    |> simulate.header("content-type", "application/json")

  let assert Ok(cache) = did_cache.start()
  let resp =
    graphql_handler.handle_graphql_request(
      req,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )

  should.equal(resp.status, 200)

  let assert wisp.Text(text) = resp.body

  // Both status groups must be represented.
  ["👍", "🔥"]
  |> list.each(fn(emoji) { should.be_true(string.contains(text, emoji)) })
}
608
/// Aggregation called directly on the database layer, bypassing the GraphQL
/// handler.
///
/// Groups posts by `author` and expects exactly three groups (alice, bob,
/// charlie), each carrying a single group value and a positive count.
pub fn database_aggregation_integration_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  let group_by = [types.SimpleField("author")]
  let assert Ok(groups) =
    aggregates.get_aggregated_records(
      exec,
      "app.bsky.feed.post",
      group_by,
      None,
      True,
      10,
    )

  // One group per distinct author in the fixtures.
  groups |> list.length |> should.equal(3)

  groups
  |> list.each(fn(group) {
    // Exactly one grouped field (author) per row...
    group.group_values |> dict.size |> should.equal(1)
    // ...and every group contains at least one record.
    { group.count > 0 } |> should.be_true
  })
}
635
/// Database-level aggregation grouped by two fields, `author` and `lang`.
///
/// In the fixtures every author posts in a single language (alice/en, bob/fr,
/// charlie/en), so exactly three author+lang groups are expected, each
/// exposing two group values.
pub fn database_multi_field_aggregation_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  // Group by author and lang
  let assert Ok(results) =
    aggregates.get_aggregated_records(
      exec,
      "app.bsky.feed.post",
      [types.SimpleField("author"), types.SimpleField("lang")],
      None,
      True,
      10,
    )

  // Exactly one group per distinct author+lang pair. An exact match also
  // catches over-grouping, which a lower bound would let through.
  list.length(results) |> should.equal(3)

  // Each result should have 2 fields (author and lang)
  list.each(results, fn(result) {
    dict.size(result.group_values) |> should.equal(2)
  })
}
659
/// Aggregation grouped by `did`, which the fixtures supply as a record-level
/// argument to `records.insert` rather than as a JSON field of the record.
///
/// Expects a 200 response whose text body contains the DID of each of the
/// three fixture authors.
pub fn graphql_table_column_aggregation_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  let graphql =
    "{ appBskyFeedPostAggregated(groupBy: [{field: \"did\"}]) { did count } }"
  let payload = json.to_string(json.object([#("query", json.string(graphql))]))

  let req =
    simulate.request(http.Post, "/graphql")
    |> simulate.string_body(payload)
    |> simulate.header("content-type", "application/json")

  let assert Ok(cache) = did_cache.start()
  let resp =
    graphql_handler.handle_graphql_request(
      req,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )

  should.equal(resp.status, 200)

  let assert wisp.Text(text) = resp.body

  // One group per DID.
  ["did:plc:alice", "did:plc:bob", "did:plc:charlie"]
  |> list.each(fn(did) { should.be_true(string.contains(text, did)) })
}
702
/// Aggregation whose filter matches no records.
///
/// The highest `likes` value in the fixtures is 200, so `likes gte 1000`
/// selects nothing. The handler must still answer 200 with the usual response
/// envelope, and no fixture author may appear in the body.
pub fn graphql_empty_aggregation_test() {
  let assert Ok(exec) = setup_aggregation_test_db()

  // Query: filter that matches no records
  let query =
    json.object([
      #(
        "query",
        json.string(
          "{ appBskyFeedPostAggregated(groupBy: [{field: \"author\"}], where: {likes: {gte: 1000}}) { author count } }",
        ),
      ),
    ])
    |> json.to_string

  let request =
    simulate.request(http.Post, "/graphql")
    |> simulate.string_body(query)
    |> simulate.header("content-type", "application/json")

  let assert Ok(cache) = did_cache.start()
  let response =
    graphql_handler.handle_graphql_request(
      request,
      exec,
      cache,
      None,
      "",
      "https://plc.directory",
    )

  // Verify response (should still be 200 with empty results)
  response.status |> should.equal(200)

  let assert wisp.Text(body) = response.body

  // Envelope is present...
  string.contains(body, "data") |> should.be_true
  string.contains(body, "appBskyFeedPostAggregated") |> should.be_true

  // ...but carries no groups: none of the fixture authors may be returned.
  ["alice", "bob", "charlie"]
  |> list.each(fn(author) { string.contains(body, author) |> should.be_false })
}