Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1import database/executor.{type DbError, type Executor, Int}
2import gleam/dynamic/decode
3import gleam/option.{type Option, None, Some}
4import gleam/result
5
6// ===== Jetstream Cursor Functions =====
7
8/// Gets the current Jetstream cursor value
9pub fn get_cursor(exec: Executor) -> Result(Option(Int), DbError) {
10 let sql = "SELECT cursor FROM jetstream_cursor WHERE id = 1"
11
12 let decoder = {
13 use cursor <- decode.field(0, decode.int)
14 decode.success(cursor)
15 }
16
17 case executor.query(exec, sql, [], decoder) {
18 Ok([cursor]) -> Ok(Some(cursor))
19 Ok([]) -> Ok(None)
20 Ok(_) -> Ok(None)
21 Error(err) -> Error(err)
22 }
23}
24
25/// Sets or updates the Jetstream cursor value
26pub fn set_cursor(exec: Executor, cursor: Int) -> Result(Nil, DbError) {
27 let sql = case executor.dialect(exec) {
28 executor.SQLite ->
29 "INSERT INTO jetstream_cursor (id, cursor, updated_at)
30 VALUES (1, ?, datetime('now'))
31 ON CONFLICT(id) DO UPDATE SET
32 cursor = excluded.cursor,
33 updated_at = datetime('now')"
34 executor.PostgreSQL ->
35 "INSERT INTO jetstream_cursor (id, cursor, updated_at)
36 VALUES (1, $1, NOW())
37 ON CONFLICT(id) DO UPDATE SET
38 cursor = EXCLUDED.cursor,
39 updated_at = NOW()"
40 }
41
42 use _ <- result.try(executor.query(exec, sql, [Int(cursor)], decode.string))
43 Ok(Nil)
44}
45
46/// Clears the Jetstream cursor (for dev reset)
47pub fn clear_cursor(exec: Executor) -> Result(Nil, DbError) {
48 let sql = "DELETE FROM jetstream_cursor WHERE id = 1"
49 executor.exec(exec, sql, [])
50}