Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 50 lines 1.5 kB view raw
1import database/executor.{type DbError, type Executor, Int} 2import gleam/dynamic/decode 3import gleam/option.{type Option, None, Some} 4import gleam/result 5 6// ===== Jetstream Cursor Functions ===== 7 8/// Gets the current Jetstream cursor value 9pub fn get_cursor(exec: Executor) -> Result(Option(Int), DbError) { 10 let sql = "SELECT cursor FROM jetstream_cursor WHERE id = 1" 11 12 let decoder = { 13 use cursor <- decode.field(0, decode.int) 14 decode.success(cursor) 15 } 16 17 case executor.query(exec, sql, [], decoder) { 18 Ok([cursor]) -> Ok(Some(cursor)) 19 Ok([]) -> Ok(None) 20 Ok(_) -> Ok(None) 21 Error(err) -> Error(err) 22 } 23} 24 25/// Sets or updates the Jetstream cursor value 26pub fn set_cursor(exec: Executor, cursor: Int) -> Result(Nil, DbError) { 27 let sql = case executor.dialect(exec) { 28 executor.SQLite -> 29 "INSERT INTO jetstream_cursor (id, cursor, updated_at) 30 VALUES (1, ?, datetime('now')) 31 ON CONFLICT(id) DO UPDATE SET 32 cursor = excluded.cursor, 33 updated_at = datetime('now')" 34 executor.PostgreSQL -> 35 "INSERT INTO jetstream_cursor (id, cursor, updated_at) 36 VALUES (1, $1, NOW()) 37 ON CONFLICT(id) DO UPDATE SET 38 cursor = EXCLUDED.cursor, 39 updated_at = NOW()" 40 } 41 42 use _ <- result.try(executor.query(exec, sql, [Int(cursor)], decode.string)) 43 Ok(Nil) 44} 45 46/// Clears the Jetstream cursor (for dev reset) 47pub fn clear_cursor(exec: Executor) -> Result(Nil, DbError) { 48 let sql = "DELETE FROM jetstream_cursor WHERE id = 1" 49 executor.exec(exec, sql, []) 50}