An implementation of the ATProto statusphere example app but in Go
at update-oauth-lib 58 lines 1.4 kB view raw
1package database 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8) 9 10func createJetstreamTable(db *sql.DB) error { 11 createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream ( 12 "id" integer NOT NULL PRIMARY KEY, 13 "cursor" INTEGER, 14 UNIQUE(id) 15 );` 16 17 slog.Info("Create jetstream table...") 18 statement, err := db.Prepare(createJetstreamTableSQL) 19 if err != nil { 20 return fmt.Errorf("prepare DB statement to create jetstream table: %w", err) 21 } 22 _, err = statement.Exec() 23 if err != nil { 24 return fmt.Errorf("exec sql statement to create jetstream table: %w", err) 25 } 26 slog.Info("jetstream table created") 27 28 return nil 29} 30 31func (d *DB) SaveCursor(ctx context.Context, cursor int64) error { 32 sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;` 33 _, err := d.db.Exec(sql, cursor, cursor) 34 if err != nil { 35 return fmt.Errorf("exec insert or update cursor: %w", err) 36 } 37 38 return nil 39} 40 41func (d *DB) GetCursor(ctx context.Context) (int64, error) { 42 sql := "SELECT cursor FROM jetstream where id = 1;" 43 rows, err := d.db.Query(sql) 44 if err != nil { 45 return 0, fmt.Errorf("run query to get cursor: %w", err) 46 } 47 defer rows.Close() 48 49 cursor := 0 50 for rows.Next() { 51 if err := rows.Scan(&cursor); err != nil { 52 return 0, fmt.Errorf("scan row: %w", err) 53 } 54 55 return int64(cursor), nil 56 } 57 return 0, fmt.Errorf("not found") 58}