Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1/// Pagination utilities including cursor encoding/decoding and ORDER BY building.
2///
3/// Cursors encode the position in a result set as base64(field1|field2|...|cid)
4/// to enable stable pagination even when new records are inserted.
5///
6/// The cursor format:
7/// - All sort field values are included in the cursor
8/// - Values are separated by pipe (|) characters
9/// - CID is always the last element as the ultimate tiebreaker
10import database/executor.{type Executor}
11import database/types.{type Record}
12import gleam/bit_array
13import gleam/dict
14import gleam/dynamic
15import gleam/dynamic/decode
16import gleam/float
17import gleam/int
18import gleam/json
19import gleam/list
20import gleam/option.{type Option, None, Some}
21import gleam/result
22import gleam/string
23
24// ===== Cursor Types =====
25
26/// Decoded cursor components for pagination
27pub type DecodedCursor {
28 DecodedCursor(
29 /// Field values in the order they appear in sortBy
30 field_values: List(String),
31 /// CID (always the last element)
32 cid: String,
33 )
34}
35
36// ===== Base64 Encoding/Decoding =====
37
38/// Encodes a string to URL-safe base64 without padding
39pub fn encode_base64(input: String) -> String {
40 let bytes = bit_array.from_string(input)
41 bit_array.base64_url_encode(bytes, False)
42}
43
44/// Decodes a URL-safe base64 string without padding
45pub fn decode_base64(input: String) -> Result(String, String) {
46 case bit_array.base64_url_decode(input) {
47 Ok(bytes) ->
48 case bit_array.to_string(bytes) {
49 Ok(str) -> Ok(str)
50 Error(_) -> Error("Invalid UTF-8 in cursor")
51 }
52 Error(_) -> Error("Failed to decode base64")
53 }
54}
55
56// ===== Field Value Extraction =====
57
58/// Extracts a field value from a record.
59///
60/// Handles both table columns and JSON fields with nested paths.
61pub fn extract_field_value(record: Record, field: String) -> String {
62 case field {
63 "uri" -> record.uri
64 "cid" -> record.cid
65 "did" -> record.did
66 "collection" -> record.collection
67 "indexed_at" -> record.indexed_at
68 "rkey" -> record.rkey
69 _ -> extract_json_field(record.json, field)
70 }
71}
72
73/// Extracts a value from a JSON string using a field path
74fn extract_json_field(json_str: String, field: String) -> String {
75 let decoder = decode.dict(decode.string, decode.dynamic)
76 case json.parse(json_str, decoder) {
77 Error(_) -> "NULL"
78 Ok(parsed_dict) -> {
79 let path_parts = string.split(field, ".")
80 extract_from_dict(parsed_dict, path_parts)
81 }
82 }
83}
84
85/// Recursively extracts a value from a dict using a path
86fn extract_from_dict(
87 d: dict.Dict(String, dynamic.Dynamic),
88 path: List(String),
89) -> String {
90 case path {
91 [] -> "NULL"
92 [key] -> {
93 case dict.get(d, key) {
94 Ok(val) -> dynamic_to_string(val)
95 Error(_) -> "NULL"
96 }
97 }
98 [key, ..rest] -> {
99 case dict.get(d, key) {
100 Ok(val) -> {
101 case decode.run(val, decode.dict(decode.string, decode.dynamic)) {
102 Ok(nested_dict) -> extract_from_dict(nested_dict, rest)
103 Error(_) -> "NULL"
104 }
105 }
106 Error(_) -> "NULL"
107 }
108 }
109 }
110}
111
112/// Converts a dynamic JSON value to a string representation
113fn dynamic_to_string(value: dynamic.Dynamic) -> String {
114 case decode.run(value, decode.string) {
115 Ok(s) -> s
116 Error(_) ->
117 case decode.run(value, decode.int) {
118 Ok(i) -> int.to_string(i)
119 Error(_) ->
120 case decode.run(value, decode.float) {
121 Ok(f) -> float.to_string(f)
122 Error(_) ->
123 case decode.run(value, decode.bool) {
124 Ok(b) ->
125 case b {
126 True -> "true"
127 False -> "false"
128 }
129 Error(_) -> "NULL"
130 }
131 }
132 }
133 }
134}
135
136// ===== Cursor Generation and Decoding =====
137
138/// Generates a cursor from a record based on the sort configuration.
139///
140/// Extracts all sort field values from the record and encodes them along with the CID.
141/// Format: `base64(field1_value|field2_value|...|cid)`
142pub fn generate_cursor_from_record(
143 record: Record,
144 sort_by: Option(List(#(String, String))),
145) -> String {
146 let cursor_parts = case sort_by {
147 None -> []
148 Some(sort_fields) -> {
149 list.map(sort_fields, fn(sort_field) {
150 let #(field, _direction) = sort_field
151 extract_field_value(record, field)
152 })
153 }
154 }
155
156 let all_parts = list.append(cursor_parts, [record.cid])
157 let cursor_content = string.join(all_parts, "|")
158 encode_base64(cursor_content)
159}
160
161/// Decodes a base64-encoded cursor back into its components.
162///
163/// The cursor format is: `base64(field1|field2|...|cid)`
164pub fn decode_cursor(
165 cursor: String,
166 sort_by: Option(List(#(String, String))),
167) -> Result(DecodedCursor, String) {
168 use decoded_str <- result.try(decode_base64(cursor))
169
170 let parts = string.split(decoded_str, "|")
171
172 let expected_parts = case sort_by {
173 None -> 1
174 Some(fields) -> list.length(fields) + 1
175 }
176
177 case list.length(parts) == expected_parts {
178 False ->
179 Error(
180 "Invalid cursor format: expected "
181 <> int.to_string(expected_parts)
182 <> " parts, got "
183 <> int.to_string(list.length(parts)),
184 )
185 True -> {
186 case list.reverse(parts) {
187 [cid, ..rest_reversed] -> {
188 let field_values = list.reverse(rest_reversed)
189 Ok(DecodedCursor(field_values: field_values, cid: cid))
190 }
191 [] -> Error("Cursor has no parts")
192 }
193 }
194 }
195}
196
197// ===== Cursor WHERE Clause Building =====
198
199/// Builds cursor-based WHERE conditions for proper multi-field pagination.
200///
201/// Creates progressive equality checks for stable multi-field sorting.
202/// For each field, we OR together:
203/// 1. field1 > cursor_value1
204/// 2. field1 = cursor_value1 AND field2 > cursor_value2
205/// 3. field1 = cursor_value1 AND field2 = cursor_value2 AND field3 > cursor_value3
206/// ... and so on
207/// Finally: all fields equal AND cid > cursor_cid
208///
209/// Returns: #(where_clause_sql, bind_values)
210pub fn build_cursor_where_clause(
211 exec: Executor,
212 decoded_cursor: DecodedCursor,
213 sort_by: Option(List(#(String, String))),
214 is_before: Bool,
215 start_index: Int,
216) -> #(String, List(String)) {
217 let sort_fields = case sort_by {
218 None -> []
219 Some(fields) -> fields
220 }
221
222 case list.is_empty(sort_fields) {
223 True -> #("1=1", [])
224 False -> {
225 let clauses =
226 build_progressive_clauses(
227 exec,
228 sort_fields,
229 decoded_cursor.field_values,
230 decoded_cursor.cid,
231 is_before,
232 start_index,
233 )
234
235 let sql = "(" <> string.join(clauses.0, " OR ") <> ")"
236 #(sql, clauses.1)
237 }
238 }
239}
240
241/// Builds progressive equality clauses for cursor pagination
242fn build_progressive_clauses(
243 exec: Executor,
244 sort_fields: List(#(String, String)),
245 field_values: List(String),
246 cid: String,
247 is_before: Bool,
248 start_index: Int,
249) -> #(List(String), List(String)) {
250 // Build clauses with tracked parameter index
251 let #(clauses, params, next_index) =
252 list.index_fold(sort_fields, #([], [], start_index), fn(acc, field, i) {
253 let #(acc_clauses, acc_params, param_index) = acc
254
255 // Build equality parts for prior fields
256 let #(equality_parts, equality_params, idx_after_eq) = case i {
257 0 -> #([], [], param_index)
258 _ -> {
259 list.index_fold(
260 list.take(sort_fields, i),
261 #([], [], param_index),
262 fn(eq_acc, prior_field, j) {
263 let #(eq_parts, eq_params, eq_idx) = eq_acc
264 let value = list_at(field_values, j) |> result.unwrap("")
265 let field_ref = build_cursor_field_reference(exec, prior_field.0)
266 let placeholder = executor.placeholder(exec, eq_idx)
267 let new_part = field_ref <> " = " <> placeholder
268 #(
269 list.append(eq_parts, [new_part]),
270 list.append(eq_params, [value]),
271 eq_idx + 1,
272 )
273 },
274 )
275 }
276 }
277
278 let value = list_at(field_values, i) |> result.unwrap("")
279 let comparison_op = get_comparison_operator(field.1, is_before)
280 let field_ref = build_cursor_field_reference(exec, field.0)
281 let placeholder = executor.placeholder(exec, idx_after_eq)
282
283 let comparison_part =
284 field_ref <> " " <> comparison_op <> " " <> placeholder
285 let all_parts = list.append(equality_parts, [comparison_part])
286 let all_params = list.append(equality_params, [value])
287
288 let clause = "(" <> string.join(all_parts, " AND ") <> ")"
289
290 #(
291 list.append(acc_clauses, [clause]),
292 list.append(acc_params, all_params),
293 idx_after_eq + 1,
294 )
295 })
296
297 // Build final clause with all fields equal and CID comparison
298 let #(final_equality_parts, final_equality_params, idx_after_final_eq) =
299 list.index_fold(sort_fields, #([], [], next_index), fn(acc, field, j) {
300 let #(parts, params, idx) = acc
301 let value = list_at(field_values, j) |> result.unwrap("")
302 let field_ref = build_cursor_field_reference(exec, field.0)
303 let placeholder = executor.placeholder(exec, idx)
304 #(
305 list.append(parts, [field_ref <> " = " <> placeholder]),
306 list.append(params, [value]),
307 idx + 1,
308 )
309 })
310
311 let last_field = list.last(sort_fields) |> result.unwrap(#("", "desc"))
312 let cid_comparison_op = get_comparison_operator(last_field.1, is_before)
313 let cid_placeholder = executor.placeholder(exec, idx_after_final_eq)
314
315 let final_parts =
316 list.append(final_equality_parts, [
317 "cid " <> cid_comparison_op <> " " <> cid_placeholder,
318 ])
319 let final_params = list.append(final_equality_params, [cid])
320
321 let final_clause = "(" <> string.join(final_parts, " AND ") <> ")"
322 let all_clauses = list.append(clauses, [final_clause])
323 let all_params = list.append(params, final_params)
324
325 #(all_clauses, all_params)
326}
327
328/// Builds a field reference for cursor SQL queries (handles JSON fields)
329fn build_cursor_field_reference(exec: Executor, field: String) -> String {
330 case field {
331 "uri" | "cid" | "did" | "collection" | "indexed_at" | "rkey" -> field
332 _ -> executor.json_extract(exec, "json", field)
333 }
334}
335
336/// Gets the comparison operator based on sort direction and pagination direction
337fn get_comparison_operator(direction: String, is_before: Bool) -> String {
338 let is_desc = string.lowercase(direction) == "desc"
339
340 case is_before {
341 True ->
342 case is_desc {
343 True -> ">"
344 False -> "<"
345 }
346 False ->
347 case is_desc {
348 True -> "<"
349 False -> ">"
350 }
351 }
352}
353
354/// Helper to get an element at an index from a list
355fn list_at(l: List(a), index: Int) -> Result(a, Nil) {
356 l
357 |> list.drop(index)
358 |> list.first
359}
360
361// ===== Sort Direction Helpers =====
362
363/// Reverses sort direction for backward pagination
364pub fn reverse_sort_direction(direction: String) -> String {
365 case string.lowercase(direction) {
366 "asc" -> "desc"
367 "desc" -> "asc"
368 _ -> "asc"
369 }
370}
371
372/// Reverses all sort fields for backward pagination
373pub fn reverse_sort_fields(
374 sort_fields: List(#(String, String)),
375) -> List(#(String, String)) {
376 list.map(sort_fields, fn(field) {
377 let #(field_name, direction) = field
378 #(field_name, reverse_sort_direction(direction))
379 })
380}
381
382// ===== ORDER BY Building =====
383
384/// Builds an ORDER BY clause from sort fields
385/// use_table_prefix: if True, prefixes table columns with "record." for joins
386pub fn build_order_by(
387 exec: Executor,
388 sort_fields: List(#(String, String)),
389 use_table_prefix: Bool,
390) -> String {
391 let order_parts =
392 list.map(sort_fields, fn(field) {
393 let #(field_name, direction) = field
394 let table_prefix = case use_table_prefix {
395 True -> "record."
396 False -> ""
397 }
398 let field_ref = case field_name {
399 "uri" | "cid" | "did" | "collection" | "indexed_at" | "rkey" ->
400 table_prefix <> field_name
401 "createdAt" | "indexedAt" -> {
402 let json_field =
403 executor.json_extract(exec, table_prefix <> "json", field_name)
404 // Validate datetime - syntax differs by dialect
405 case executor.dialect(exec) {
406 executor.SQLite -> "CASE
407 WHEN " <> json_field <> " IS NULL THEN NULL
408 WHEN datetime(" <> json_field <> ") IS NULL THEN NULL
409 ELSE " <> json_field <> "
410 END"
411 executor.PostgreSQL ->
412 // PostgreSQL: check if value is a valid timestamp format
413 "CASE
414 WHEN " <> json_field <> " IS NULL THEN NULL
415 WHEN " <> json_field <> " ~ '^\\d{4}-\\d{2}-\\d{2}' THEN " <> json_field <> "
416 ELSE NULL
417 END"
418 }
419 }
420 _ -> executor.json_extract(exec, table_prefix <> "json", field_name)
421 }
422 let dir = case string.lowercase(direction) {
423 "asc" -> "ASC"
424 _ -> "DESC"
425 }
426 field_ref <> " " <> dir <> " NULLS LAST"
427 })
428
429 case list.is_empty(order_parts) {
430 True -> {
431 let prefix = case use_table_prefix {
432 True -> "record."
433 False -> ""
434 }
435 prefix <> "indexed_at DESC NULLS LAST"
436 }
437 False -> string.join(order_parts, ", ")
438 }
439}