Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 439 lines 13 kB view raw
//// Pagination utilities including cursor encoding/decoding and ORDER BY building.
////
//// Cursors encode the position in a result set as base64(field1|field2|...|cid)
//// to enable stable pagination even when new records are inserted.
////
//// The cursor format:
//// - All sort field values are included in the cursor
//// - Values are separated by pipe (|) characters
//// - CID is always the last element as the ultimate tiebreaker

import database/executor.{type Executor}
import database/types.{type Record}
import gleam/bit_array
import gleam/dict
import gleam/dynamic
import gleam/dynamic/decode
import gleam/float
import gleam/int
import gleam/json
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/result
import gleam/string

// ===== Cursor Types =====

/// Decoded cursor components for pagination
pub type DecodedCursor {
  DecodedCursor(
    /// Field values in the order they appear in sortBy
    field_values: List(String),
    /// CID (always the last element)
    cid: String,
  )
}

// ===== Base64 Encoding/Decoding =====

/// Encodes a string to URL-safe base64 without padding.
///
/// The UTF-8 bytes of `input` are encoded with the URL-safe alphabet and no
/// trailing `=` characters, so the result can be passed around as an opaque
/// cursor string.
pub fn encode_base64(input: String) -> String {
  input
  |> bit_array.from_string
  |> bit_array.base64_url_encode(False)
}

/// Decodes a URL-safe base64 string (as produced by `encode_base64`) back
/// into a UTF-8 string.
///
/// Returns `Error("Failed to decode base64")` when `input` is not valid
/// base64, and `Error("Invalid UTF-8 in cursor")` when the decoded bytes are
/// not valid UTF-8.
pub fn decode_base64(input: String) -> Result(String, String) {
  use bytes <- result.try(
    input
    |> bit_array.base64_url_decode
    |> result.replace_error("Failed to decode base64"),
  )
  bytes
  |> bit_array.to_string
  |> result.replace_error("Invalid UTF-8 in cursor")
}
// ===== Field Value Extraction =====

/// Extracts a field value from a record as a string.
///
/// The record-table columns (`uri`, `cid`, `did`, `collection`, `indexed_at`,
/// `rkey`) are read directly from the record. Any other name is treated as a
/// dot-separated path into the record's JSON body (e.g. `"reply.parent.uri"`).
/// Unparseable JSON, a missing path segment, or a non-scalar value all yield
/// the string `"NULL"`.
pub fn extract_field_value(record: Record, field: String) -> String {
  case field {
    "uri" -> record.uri
    "cid" -> record.cid
    "did" -> record.did
    "collection" -> record.collection
    "indexed_at" -> record.indexed_at
    "rkey" -> record.rkey
    json_path -> extract_json_field(record.json, json_path)
  }
}

/// Parses `json_str` as a JSON object and follows the dot-separated `field`
/// path through it, returning `"NULL"` if the text is not a JSON object.
fn extract_json_field(json_str: String, field: String) -> String {
  let object_decoder = decode.dict(decode.string, decode.dynamic)
  json_str
  |> json.parse(object_decoder)
  |> result.map(fn(root) { extract_from_dict(root, string.split(field, ".")) })
  |> result.unwrap("NULL")
}

/// Walks `path` through nested JSON objects and stringifies the value found
/// at its end. Any missing key, or an intermediate value that is not an
/// object, produces `"NULL"`.
fn extract_from_dict(
  d: dict.Dict(String, dynamic.Dynamic),
  path: List(String),
) -> String {
  case path {
    [] -> "NULL"
    [key, ..rest] ->
      case dict.get(d, key), rest {
        Error(_), _ -> "NULL"
        Ok(leaf), [] -> dynamic_to_string(leaf)
        Ok(child), _ ->
          case decode.run(child, decode.dict(decode.string, decode.dynamic)) {
            Ok(child_dict) -> extract_from_dict(child_dict, rest)
            Error(_) -> "NULL"
          }
      }
  }
}

/// Renders a scalar JSON value as a string. It tries, in order: string
/// (verbatim), int, float, and bool (`"true"`/`"false"`). Anything else
/// (null, arrays, objects) becomes `"NULL"`.
fn dynamic_to_string(value: dynamic.Dynamic) -> String {
  let scalar_decoder =
    decode.one_of(decode.string, [
      decode.map(decode.int, int.to_string),
      decode.map(decode.float, float.to_string),
      decode.map(decode.bool, fn(flag) {
        case flag {
          True -> "true"
          False -> "false"
        }
      }),
    ])

  case decode.run(value, scalar_decoder) {
    Ok(text) -> text
    Error(_) -> "NULL"
  }
}
// ===== Cursor Generation and Decoding =====

/// Generates a cursor from a record based on the sort configuration.
///
/// Extracts all sort field values from the record and encodes them along with
/// the CID. Format: `base64(field1_value|field2_value|...|cid)`.
///
/// Every part is passed through `escape_cursor_part` before joining. A value
/// that itself contains `|` (for example free text read from a JSON field)
/// therefore cannot be mistaken for a separator by `decode_cursor`.
pub fn generate_cursor_from_record(
  record: Record,
  sort_by: Option(List(#(String, String))),
) -> String {
  let field_parts = case sort_by {
    None -> []
    Some(sort_fields) ->
      list.map(sort_fields, fn(sort_field) {
        let #(field, _direction) = sort_field
        extract_field_value(record, field)
      })
  }

  list.append(field_parts, [record.cid])
  |> list.map(escape_cursor_part)
  |> string.join("|")
  |> encode_base64
}

/// Escapes one cursor part so that it contains no `|` separator.
///
/// `%` is encoded first (as `%25`), so the `%7C` produced for `|` cannot be
/// confused with a literal `%7C` in the original value. Values containing
/// neither `%` nor `|` are returned unchanged.
fn escape_cursor_part(part: String) -> String {
  part
  |> string.replace("%", "%25")
  |> string.replace("|", "%7C")
}

/// Reverses `escape_cursor_part`.
///
/// After escaping, every `%` begins either `%7C` or `%25`. Replacing `%7C`
/// first and `%25` second therefore restores the original text exactly.
fn unescape_cursor_part(part: String) -> String {
  part
  |> string.replace("%7C", "|")
  |> string.replace("%25", "%")
}

/// Decodes a base64-encoded cursor back into its components.
///
/// The cursor format is `base64(field1|field2|...|cid)`, with each part escaped
/// by `escape_cursor_part`. The number of parts must equal the number of sort
/// fields plus one (for the CID). Otherwise an error describing the mismatch
/// is returned. Base64/UTF-8 failures are propagated from `decode_base64`.
pub fn decode_cursor(
  cursor: String,
  sort_by: Option(List(#(String, String))),
) -> Result(DecodedCursor, String) {
  use decoded_str <- result.try(decode_base64(cursor))

  let parts =
    decoded_str
    |> string.split("|")
    |> list.map(unescape_cursor_part)
  let part_count = list.length(parts)

  let expected_parts = case sort_by {
    None -> 1
    Some(fields) -> list.length(fields) + 1
  }

  case part_count == expected_parts, list.reverse(parts) {
    False, _ ->
      Error(
        "Invalid cursor format: expected "
        <> int.to_string(expected_parts)
        <> " parts, got "
        <> int.to_string(part_count),
      )
    True, [cid, ..rest_reversed] ->
      Ok(DecodedCursor(field_values: list.reverse(rest_reversed), cid: cid))
    True, [] -> Error("Cursor has no parts")
  }
}

// ===== Cursor WHERE Clause Building =====

/// Builds cursor-based WHERE conditions for proper multi-field pagination.
///
/// Creates progressive equality checks for stable multi-field sorting.
/// The following conditions are OR-ed together:
/// 1. field1 > cursor_value1
/// 2. field1 = cursor_value1 AND field2 > cursor_value2
/// 3. field1 = cursor_value1 AND field2 = cursor_value2 AND field3 > cursor_value3
/// ... and so on
/// Finally: all fields equal AND cid > cursor_cid
///
/// `>` is shown for an ascending field paged forward. The actual operator
/// comes from `get_comparison_operator` and depends on each field's direction
/// and on `is_before`. The CID comparison follows the last sort field's
/// direction.
///
/// Placeholder numbering starts at `start_index`, presumably so the clause can
/// follow earlier bind parameters in the same query.
///
/// Returns: #(where_clause_sql, bind_values), with bind values in placeholder
/// order.
///
/// NOTE(review): with no sort fields this returns `1=1`, so the cursor's CID
/// does not filter anything. Confirm that callers never page with
/// `sort_by: None`, or that they handle that case separately.
pub fn build_cursor_where_clause(
  exec: Executor,
  decoded_cursor: DecodedCursor,
  sort_by: Option(List(#(String, String))),
  is_before: Bool,
  start_index: Int,
) -> #(String, List(String)) {
  case option.unwrap(sort_by, []) {
    [] -> #("1=1", [])
    sort_fields -> {
      let #(clauses, params) =
        build_progressive_clauses(
          exec,
          sort_fields,
          decoded_cursor.field_values,
          decoded_cursor.cid,
          is_before,
          start_index,
        )
      #("(" <> string.join(clauses, " OR ") <> ")", params)
    }
  }
}

/// Builds progressive equality clauses for cursor pagination.
///
/// Produces one parenthesised clause per sort field, plus a final CID
/// tiebreaker clause. Each clause and its bind values are emitted in
/// placeholder order, starting at `start_index`.
fn build_progressive_clauses(
  exec: Executor,
  sort_fields: List(#(String, String)),
  field_values: List(String),
  cid: String,
  is_before: Bool,
  start_index: Int,
) -> #(List(String), List(String)) {
  let #(clauses, params, next_index) =
    list.index_fold(sort_fields, #([], [], start_index), fn(acc, field, i) {
      let #(acc_clauses, acc_params, param_index) = acc

      // Every earlier sort field must equal its cursor value...
      let #(equality_parts, equality_params, comparison_index) =
        build_equality_conditions(
          exec,
          list.take(sort_fields, i),
          field_values,
          param_index,
        )

      // ...and this field must lie strictly past its cursor value.
      let value = list_at(field_values, i) |> result.unwrap("")
      let comparison_part =
        build_cursor_field_reference(exec, field.0)
        <> " "
        <> get_comparison_operator(field.1, is_before)
        <> " "
        <> executor.placeholder(exec, comparison_index)

      let clause =
        "("
        <> string.join(list.append(equality_parts, [comparison_part]), " AND ")
        <> ")"

      #(
        list.append(acc_clauses, [clause]),
        list.append(acc_params, list.append(equality_params, [value])),
        comparison_index + 1,
      )
    })

  // Tiebreaker: all sort fields equal and the CID past the cursor's CID.
  let #(final_equality_parts, final_equality_params, cid_index) =
    build_equality_conditions(exec, sort_fields, field_values, next_index)
  // The fallback is only reached for an empty field list, which
  // build_cursor_where_clause never passes.
  let last_field = list.last(sort_fields) |> result.unwrap(#("", "desc"))
  let cid_part =
    "cid "
    <> get_comparison_operator(last_field.1, is_before)
    <> " "
    <> executor.placeholder(exec, cid_index)
  let final_clause =
    "("
    <> string.join(list.append(final_equality_parts, [cid_part]), " AND ")
    <> ")"

  #(
    list.append(clauses, [final_clause]),
    list.append(params, list.append(final_equality_params, [cid])),
  )
}

/// Builds `field = <placeholder>` conditions for `fields`, pairing each field
/// positionally with `field_values` (a missing value binds as "").
/// Placeholders are numbered from `start_index`. Returns the conditions,
/// their bind values in the same order, and the next unused placeholder index.
fn build_equality_conditions(
  exec: Executor,
  fields: List(#(String, String)),
  field_values: List(String),
  start_index: Int,
) -> #(List(String), List(String), Int) {
  list.index_fold(fields, #([], [], start_index), fn(acc, field, j) {
    let #(parts, params, index) = acc
    let value = list_at(field_values, j) |> result.unwrap("")
    let condition =
      build_cursor_field_reference(exec, field.0)
      <> " = "
      <> executor.placeholder(exec, index)
    #(list.append(parts, [condition]), list.append(params, [value]), index + 1)
  })
}
/// Builds a field reference for cursor SQL queries (handles JSON fields).
///
/// Table columns are referenced by bare name. Any other field is read from
/// the `json` column via `executor.json_extract`.
///
/// NOTE(review): `build_order_by` wraps `createdAt`/`indexedAt` in a datetime
/// validity check, but this reference does not. Records with malformed dates
/// therefore sort as NULL yet are compared by their raw value here. Confirm
/// that this mismatch is acceptable.
fn build_cursor_field_reference(exec: Executor, field: String) -> String {
  case is_table_column(field) {
    True -> field
    False -> executor.json_extract(exec, "json", field)
  }
}

/// Returns True for fields stored as dedicated columns on the record table,
/// as opposed to values inside its `json` document.
fn is_table_column(field: String) -> Bool {
  case field {
    "uri" | "cid" | "did" | "collection" | "indexed_at" | "rkey" -> True
    _ -> False
  }
}

/// Normalizes a sort direction to its SQL keyword. Returns `"ASC"` only when
/// the direction is `"asc"` (case-insensitive) and `"DESC"` otherwise. This
/// is the single definition of direction semantics shared by ORDER BY
/// building, cursor comparisons, and direction reversal.
fn sql_direction(direction: String) -> String {
  case string.lowercase(direction) {
    "asc" -> "ASC"
    _ -> "DESC"
  }
}

/// Gets the comparison operator based on sort direction and pagination direction.
///
/// Paging forward (`is_before == False`) selects rows after the cursor, and
/// paging backward selects rows before it. Directions other than `"asc"` are
/// treated as descending, matching the `DESC` that `build_order_by` emits for
/// them.
fn get_comparison_operator(direction: String, is_before: Bool) -> String {
  let is_asc = sql_direction(direction) == "ASC"
  case is_asc, is_before {
    // Forward over ascending, or backward over descending: larger values.
    True, False | False, True -> ">"
    // Forward over descending, or backward over ascending: smaller values.
    True, True | False, False -> "<"
  }
}

/// Returns the element at the 0-based `index`, or `Error(Nil)` when the list
/// is too short.
fn list_at(l: List(a), index: Int) -> Result(a, Nil) {
  l
  |> list.drop(index)
  |> list.first
}

// ===== Sort Direction Helpers =====

/// Reverses sort direction for backward pagination.
///
/// `"asc"` (case-insensitive) becomes `"desc"`. Every other value, which is
/// treated as descending, becomes `"asc"`.
pub fn reverse_sort_direction(direction: String) -> String {
  case sql_direction(direction) {
    "ASC" -> "desc"
    _ -> "asc"
  }
}

/// Reverses all sort fields for backward pagination, keeping field names and
/// order.
pub fn reverse_sort_fields(
  sort_fields: List(#(String, String)),
) -> List(#(String, String)) {
  list.map(sort_fields, fn(field) {
    let #(field_name, direction) = field
    #(field_name, reverse_sort_direction(direction))
  })
}

// ===== ORDER BY Building =====

/// Builds an ORDER BY clause (without the `ORDER BY` keyword) from sort fields.
///
/// Each field becomes `<expr> ASC|DESC NULLS LAST`. A direction of `"asc"`
/// (case-insensitive) sorts ascending and anything else sorts descending.
/// With no sort fields the clause defaults to `indexed_at DESC NULLS LAST`.
///
/// use_table_prefix: if True, prefixes table columns (and the `json` column)
/// with "record." for joins.
///
/// NOTE(review): the cursor WHERE clause uses `cid` as the final tiebreaker,
/// but no `cid` term is added here. Confirm that callers append one;
/// otherwise rows tied on every sort field may be skipped or repeated across
/// pages.
pub fn build_order_by(
  exec: Executor,
  sort_fields: List(#(String, String)),
  use_table_prefix: Bool,
) -> String {
  let table_prefix = case use_table_prefix {
    True -> "record."
    False -> ""
  }

  case sort_fields {
    [] -> table_prefix <> "indexed_at DESC NULLS LAST"
    _ ->
      sort_fields
      |> list.map(fn(sort_field) {
        let #(field_name, direction) = sort_field
        order_by_field_reference(exec, field_name, table_prefix)
        <> " "
        <> sql_direction(direction)
        <> " NULLS LAST"
      })
      |> string.join(", ")
  }
}

/// Returns the SQL expression ORDER BY sorts on for `field_name`: the prefixed
/// column for table fields, otherwise a JSON extraction (datetime-validated
/// for `createdAt` / `indexedAt`).
fn order_by_field_reference(
  exec: Executor,
  field_name: String,
  table_prefix: String,
) -> String {
  case is_table_column(field_name) {
    True -> table_prefix <> field_name
    False -> {
      let json_field =
        executor.json_extract(exec, table_prefix <> "json", field_name)
      case field_name {
        "createdAt" | "indexedAt" ->
          validated_datetime_expression(exec, json_field)
        _ -> json_field
      }
    }
  }
}

/// Wraps a JSON datetime expression so that values which do not look like
/// dates become NULL, and therefore sort last under NULLS LAST. The check is
/// dialect-specific. SQLite uses `datetime(...)`, which yields NULL for input
/// it cannot parse. PostgreSQL uses a regex on a leading `YYYY-MM-DD`.
fn validated_datetime_expression(exec: Executor, json_field: String) -> String {
  case executor.dialect(exec) {
    executor.SQLite -> "CASE
      WHEN " <> json_field <> " IS NULL THEN NULL
      WHEN datetime(" <> json_field <> ") IS NULL THEN NULL
      ELSE " <> json_field <> "
    END"
    executor.PostgreSQL -> "CASE
      WHEN " <> json_field <> " IS NULL THEN NULL
      WHEN " <> json_field <> " ~ '^\\d{4}-\\d{2}-\\d{2}' THEN " <> json_field <> "
      ELSE NULL
    END"
  }
}