An implementation of the ATProto statusphere example app but in Go
1package database
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8)
9
10func createJetstreamTable(db *sql.DB) error {
11 createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream (
12 "id" integer NOT NULL PRIMARY KEY,
13 "cursor" INTEGER,
14 UNIQUE(id)
15 );`
16
17 slog.Info("Create jetstream table...")
18 statement, err := db.Prepare(createJetstreamTableSQL)
19 if err != nil {
20 return fmt.Errorf("prepare DB statement to create jetstream table: %w", err)
21 }
22 _, err = statement.Exec()
23 if err != nil {
24 return fmt.Errorf("exec sql statement to create jetstream table: %w", err)
25 }
26 slog.Info("jetstream table created")
27
28 return nil
29}
30
31func (d *DB) SaveCursor(ctx context.Context, cursor int64) error {
32 sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;`
33 _, err := d.db.Exec(sql, cursor, cursor)
34 if err != nil {
35 return fmt.Errorf("exec insert or update cursor: %w", err)
36 }
37
38 return nil
39}
40
41func (d *DB) GetCursor(ctx context.Context) (int64, error) {
42 sql := "SELECT cursor FROM jetstream where id = 1;"
43 rows, err := d.db.Query(sql)
44 if err != nil {
45 return 0, fmt.Errorf("run query to get cursor: %w", err)
46 }
47 defer rows.Close()
48
49 cursor := 0
50 for rows.Next() {
51 if err := rows.Scan(&cursor); err != nil {
52 return 0, fmt.Errorf("scan row: %w", err)
53 }
54
55 return int64(cursor), nil
56 }
57 return 0, fmt.Errorf("not found")
58}