Auto-indexing service and GraphQL API for AT Protocol records — quickslice.slices.network
atproto gleam graphql
at main 208 lines 6.5 kB view raw
import database/executor.{type DbError, type Dialect, type Executor, Text}
import database/queries/where_clause
import database/types.{
  type DateInterval, type GroupByField, Day, Hour, Month, SimpleField,
  TruncatedField, Week,
}
import gleam/dict
import gleam/dynamic/decode
import gleam/int
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/result
import gleam/string
import lexicon_graphql/output/aggregate

// ===== Aggregation Support =====

/// Get aggregated records grouped by the specified fields.
///
/// Builds and executes a `SELECT ... GROUP BY` query over the `record`
/// table and returns one `AggregateResult` per group: a dict mapping the
/// synthetic column aliases `field_0 .. field_(N-1)` (in `group_by` order)
/// to the grouped values, plus the `COUNT(*)` for that group.
///
/// - `where`: optional extra filter, ANDed with the mandatory
///   `record.collection = <collection>` filter. A LEFT JOIN on `actor`
///   is added only when the clause requires it.
/// - `order_by_count_desc`: True sorts groups by count descending,
///   False ascending.
/// - `limit`: maximum number of groups returned.
///
/// NOTE(review): grouped field names are spliced into the SQL text
/// unescaped (see build_field_select). If they can originate from
/// untrusted GraphQL input they must be validated against an allow-list
/// upstream — confirm callers guarantee this.
pub fn get_aggregated_records(
  exec: Executor,
  collection: String,
  group_by: List(GroupByField),
  where: Option(where_clause.WhereClause),
  order_by_count_desc: Bool,
  limit: Int,
) -> Result(List(aggregate.AggregateResult), DbError) {
  let dialect = executor.dialect(exec)

  // Synthetic output aliases, one per grouped field. Deriving them with
  // index_map (rather than list.range) is safe for an empty group_by,
  // where list.range(0, -1) would yield [0, -1] and produce a bogus
  // "field_0, field_-1" GROUP BY clause. Computed once and reused below.
  let field_names =
    list.index_map(group_by, fn(_, index) { "field_" <> int.to_string(index) })

  // SELECT clause: one expression per grouped field, plus the count.
  let select_parts =
    group_by
    |> list.index_map(fn(field, index) {
      let field_name = "field_" <> int.to_string(index)
      case field {
        SimpleField(f) -> build_field_select(dialect, f, field_name)
        TruncatedField(f, interval) ->
          build_date_truncate_select(dialect, f, interval, field_name)
      }
    })
    |> list.append(["COUNT(*) as count"])
    |> string.join(", ")

  // GROUP BY the synthetic aliases, in declaration order.
  let group_by_clause = string.join(field_names, ", ")

  // Only join the actor table when the where clause actually needs it.
  let needs_actor_join = case where {
    Some(wc) -> where_clause.requires_actor_join(wc)
    None -> False
  }

  // FROM clause with optional LEFT JOIN.
  let from_clause = case needs_actor_join {
    True -> "record LEFT JOIN actor ON record.did = actor.did"
    False -> "record"
  }

  // WHERE always starts with the collection filter; the placeholder style
  // is dialect-specific. Reuses the `dialect` local computed above instead
  // of calling executor.dialect(exec) a second time.
  let collection_placeholder = case dialect {
    executor.SQLite -> "?"
    executor.PostgreSQL -> "$1"
  }
  let base_where_parts = ["record.collection = " <> collection_placeholder]
  let base_bind_values = [Text(collection)]

  // Append caller-supplied conditions, if any.
  // Always use table prefixes (True) because FROM names the table "record".
  // Parameter numbering continues after the collection bind value.
  let #(where_parts, bind_values) = case where {
    Some(wc) -> {
      case where_clause.is_clause_empty(wc) {
        True -> #(base_where_parts, base_bind_values)
        False -> {
          let #(where_sql, where_params) =
            where_clause.build_where_sql(
              exec,
              wc,
              True,
              list.length(base_bind_values) + 1,
            )
          #(
            list.append(base_where_parts, [where_sql]),
            list.append(base_bind_values, where_params),
          )
        }
      }
    }
    None -> #(base_where_parts, base_bind_values)
  }

  // ORDER BY direction on the aggregate count.
  let order_by = case order_by_count_desc {
    True -> "count DESC"
    False -> "count ASC"
  }

  // Assemble the final SQL query.
  let sql = "
    SELECT " <> select_parts <> "
    FROM " <> from_clause <> "
    WHERE " <> string.join(where_parts, " AND ") <> "
    GROUP BY " <> group_by_clause <> "
    ORDER BY " <> order_by <> "
    LIMIT " <> int.to_string(limit)

  // Each row decodes as a plain list of dynamics: the first N entries are
  // the group values, the last is the count. The shape is only known at
  // runtime, so decoding is done in two phases.
  let num_fields = list.length(group_by)
  let decoder = decode.list(decode.dynamic)

  // Execute and post-process rows into AggregateResult values.
  executor.query(exec, sql, bind_values, decoder)
  |> result.map(fn(rows) {
    list.map(rows, fn(row_values) {
      // Take the first N values as the group key, the last as the count.
      let group_values = list.take(row_values, num_fields)
      // A count that is missing or fails to decode degrades to 0 rather
      // than failing the whole query (preserves prior behaviour).
      let count = case list.last(row_values) {
        Ok(count_dynamic) ->
          case decode.run(count_dynamic, decode.int) {
            Ok(n) -> n
            Error(_) -> 0
          }
        Error(_) -> 0
      }

      // Pair the precomputed aliases with their decoded values.
      let field_dict = dict.from_list(list.zip(field_names, group_values))

      aggregate.AggregateResult(field_dict, count)
    })
  })
}

/// Build a SELECT expression for a grouped field: a direct column
/// reference when the field is a known `record` table column, otherwise a
/// dialect-specific JSON extraction from `record.json`, aliased to `alias`.
///
/// NOTE(review): `field` is interpolated into the SQL text unescaped —
/// callers must guarantee it is a validated identifier.
fn build_field_select(dialect: Dialect, field: String, alias: String) -> String {
  case is_table_column_for_aggregate(field) {
    True -> "record." <> field <> " as " <> alias
    False ->
      // Dialect-specific JSON extraction.
      case dialect {
        executor.SQLite ->
          "json_extract(record.json, '$." <> field <> "') as " <> alias
        executor.PostgreSQL -> "record.json->>'" <> field <> "' as " <> alias
      }
  }
}

/// Build a SELECT expression that truncates a (text) timestamp field to
/// the given interval, aliased to `alias`.
///
/// SQLite uses strftime formats; PostgreSQL casts the value to timestamp
/// and formats with TO_CHAR. Week labels differ between dialects by
/// design of the underlying functions: SQLite's %W (weeks starting
/// Monday, zero-based) vs PostgreSQL's IW (ISO-8601 week number).
fn build_date_truncate_select(
  dialect: Dialect,
  field: String,
  interval: DateInterval,
  alias: String,
) -> String {
  // Resolve the field to either a table column or a JSON extraction.
  let field_ref = case is_table_column_for_aggregate(field) {
    True -> "record." <> field
    False ->
      case dialect {
        executor.SQLite -> "json_extract(record.json, '$." <> field <> "')"
        executor.PostgreSQL -> "record.json->>'" <> field <> "'"
      }
  }

  // Dialect-specific date truncation.
  case dialect {
    executor.SQLite ->
      case interval {
        Hour ->
          "strftime('%Y-%m-%d %H:00:00', " <> field_ref <> ") as " <> alias
        Day -> "strftime('%Y-%m-%d', " <> field_ref <> ") as " <> alias
        Week -> "strftime('%Y-W%W', " <> field_ref <> ") as " <> alias
        Month -> "strftime('%Y-%m', " <> field_ref <> ") as " <> alias
      }
    executor.PostgreSQL ->
      case interval {
        Hour ->
          "TO_CHAR(("
          <> field_ref
          <> ")::timestamp, 'YYYY-MM-DD HH24:00:00') as "
          <> alias
        Day ->
          "TO_CHAR((" <> field_ref <> ")::timestamp, 'YYYY-MM-DD') as " <> alias
        Week ->
          "TO_CHAR(("
          <> field_ref
          <> ")::timestamp, 'YYYY-\"W\"IW') as "
          <> alias
        Month ->
          "TO_CHAR((" <> field_ref <> ")::timestamp, 'YYYY-MM') as " <> alias
      }
  }
}

/// True when `field` is a physical column of the `record` table (in the
/// aggregation context); anything else is looked up inside `record.json`.
fn is_table_column_for_aggregate(field: String) -> Bool {
  case field {
    "uri" | "cid" | "did" | "collection" | "indexed_at" -> True
    _ -> False
  }
}