Quickslice — an auto-indexing service and GraphQL API for AT Protocol records.
Site: https://quickslice.slices.network/
Topics: atproto, gleam, graphql
1import database/executor.{type DbError, type Dialect, type Executor, Text}
2import database/queries/where_clause
3import database/types.{
4 type DateInterval, type GroupByField, Day, Hour, Month, SimpleField,
5 TruncatedField, Week,
6}
7import gleam/dict
8import gleam/dynamic/decode
9import gleam/int
10import gleam/list
11import gleam/option.{type Option, None, Some}
12import gleam/result
13import gleam/string
14import lexicon_graphql/output/aggregate
15
16// ===== Aggregation Support =====
17
18/// Get aggregated records grouped by specified fields
19pub fn get_aggregated_records(
20 exec: Executor,
21 collection: String,
22 group_by: List(GroupByField),
23 where: Option(where_clause.WhereClause),
24 order_by_count_desc: Bool,
25 limit: Int,
26) -> Result(List(aggregate.AggregateResult), DbError) {
27 let dialect = executor.dialect(exec)
28
29 // Build SELECT clause with grouped fields
30 let select_parts =
31 group_by
32 |> list.index_map(fn(field, index) {
33 let field_name = "field_" <> int.to_string(index)
34 case field {
35 SimpleField(f) -> build_field_select(dialect, f, field_name)
36 TruncatedField(f, interval) ->
37 build_date_truncate_select(dialect, f, interval, field_name)
38 }
39 })
40 |> list.append(["COUNT(*) as count"])
41 |> string.join(", ")
42
43 // Build GROUP BY clause
44 let group_by_clause =
45 list.range(0, list.length(group_by) - 1)
46 |> list.map(fn(i) { "field_" <> int.to_string(i) })
47 |> string.join(", ")
48
49 // Check if we need to join with actor table
50 let needs_actor_join = case where {
51 Some(wc) -> where_clause.requires_actor_join(wc)
52 None -> False
53 }
54
55 // Build FROM clause with optional LEFT JOIN
56 let from_clause = case needs_actor_join {
57 True -> "record LEFT JOIN actor ON record.did = actor.did"
58 False -> "record"
59 }
60
61 // Build WHERE clause parts - start with collection filter (dialect-aware placeholder)
62 let collection_placeholder = case executor.dialect(exec) {
63 executor.SQLite -> "?"
64 executor.PostgreSQL -> "$1"
65 }
66 let mut_where_parts = ["record.collection = " <> collection_placeholder]
67 let mut_bind_values = [Text(collection)]
68
69 // Add where clause conditions if provided
70 // Note: Always use table prefix (True) because the FROM clause uses "record" as the table name
71 let #(where_parts, bind_values) = case where {
72 Some(wc) -> {
73 case where_clause.is_clause_empty(wc) {
74 True -> #(mut_where_parts, mut_bind_values)
75 False -> {
76 let #(where_sql, where_params) =
77 where_clause.build_where_sql(
78 exec,
79 wc,
80 True,
81 list.length(mut_bind_values) + 1,
82 )
83 let new_where = list.append(mut_where_parts, [where_sql])
84 let new_binds = list.append(mut_bind_values, where_params)
85 #(new_where, new_binds)
86 }
87 }
88 }
89 None -> #(mut_where_parts, mut_bind_values)
90 }
91
92 // Build ORDER BY clause
93 let order_by = case order_by_count_desc {
94 True -> "count DESC"
95 False -> "count ASC"
96 }
97
98 // Build the SQL query
99 let sql = "
100 SELECT " <> select_parts <> "
101 FROM " <> from_clause <> "
102 WHERE " <> string.join(where_parts, " AND ") <> "
103 GROUP BY " <> group_by_clause <> "
104 ORDER BY " <> order_by <> "
105 LIMIT " <> int.to_string(limit)
106
107 // Create decoder - we need to build it dynamically based on number of fields
108 let num_fields = list.length(group_by)
109
110 // Decode as list of dynamics, then post-process
111 let decoder = decode.list(decode.dynamic)
112
113 // Execute query and map results
114 executor.query(exec, sql, bind_values, decoder)
115 |> result.map(fn(rows) {
116 rows
117 |> list.map(fn(row_values) {
118 // Take first N as group fields, last as count
119 let group_values = list.take(row_values, num_fields)
120 let count = case list.last(row_values) {
121 Ok(count_dynamic) ->
122 case decode.run(count_dynamic, decode.int) {
123 Ok(n) -> n
124 Error(_) -> 0
125 }
126 Error(_) -> 0
127 }
128
129 // Build dict from field names to values
130 let field_names =
131 list.range(0, num_fields - 1)
132 |> list.map(fn(i) { "field_" <> int.to_string(i) })
133 let field_dict = dict.from_list(list.zip(field_names, group_values))
134
135 aggregate.AggregateResult(field_dict, count)
136 })
137 })
138}
139
140/// Build SELECT expression for a field (table column or JSON field)
/// Render the SELECT expression for one grouped field, aliased as `alias`.
/// Physical table columns are referenced directly on `record`; any other
/// field is extracted from the record's JSON document using the syntax of
/// the active dialect.
fn build_field_select(dialect: Dialect, field: String, alias: String) -> String {
  let expr = case is_table_column_for_aggregate(field), dialect {
    True, _ -> "record." <> field
    False, executor.SQLite ->
      "json_extract(record.json, '$." <> field <> "')"
    False, executor.PostgreSQL -> "record.json->>'" <> field <> "'"
  }
  expr <> " as " <> alias
}
154
155/// Build SELECT expression for date truncation
156fn build_date_truncate_select(
157 dialect: Dialect,
158 field: String,
159 interval: DateInterval,
160 alias: String,
161) -> String {
162 let field_ref = case is_table_column_for_aggregate(field) {
163 True -> "record." <> field
164 False -> {
165 case dialect {
166 executor.SQLite -> "json_extract(record.json, '$." <> field <> "')"
167 executor.PostgreSQL -> "record.json->>'" <> field <> "'"
168 }
169 }
170 }
171
172 // Use dialect-specific date truncation
173 case dialect {
174 executor.SQLite ->
175 case interval {
176 Hour ->
177 "strftime('%Y-%m-%d %H:00:00', " <> field_ref <> ") as " <> alias
178 Day -> "strftime('%Y-%m-%d', " <> field_ref <> ") as " <> alias
179 Week -> "strftime('%Y-W%W', " <> field_ref <> ") as " <> alias
180 Month -> "strftime('%Y-%m', " <> field_ref <> ") as " <> alias
181 }
182 executor.PostgreSQL ->
183 case interval {
184 Hour ->
185 "TO_CHAR(("
186 <> field_ref
187 <> ")::timestamp, 'YYYY-MM-DD HH24:00:00') as "
188 <> alias
189 Day ->
190 "TO_CHAR((" <> field_ref <> ")::timestamp, 'YYYY-MM-DD') as " <> alias
191 Week ->
192 "TO_CHAR(("
193 <> field_ref
194 <> ")::timestamp, 'YYYY-\"W\"IW') as "
195 <> alias
196 Month ->
197 "TO_CHAR((" <> field_ref <> ")::timestamp, 'YYYY-MM') as " <> alias
198 }
199 }
200}
201
202/// Check if field is a table column (for aggregation context)
/// Whether `field` names a physical column on the record table (as opposed
/// to a key inside the record's JSON document), in the aggregation context.
fn is_table_column_for_aggregate(field: String) -> Bool {
  ["uri", "cid", "did", "collection", "indexed_at"]
  |> list.contains(field)
}