package database import ( "context" "database/sql" "fmt" "log/slog" ) func createJetstreamTable(db *sql.DB) error { createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "did" TEXT, "cursor" INTEGER, UNIQUE(did) );` slog.Info("Create jetstream table...") statement, err := db.Prepare(createJetstreamTableSQL) if err != nil { return fmt.Errorf("prepare DB statement to create jetstream table: %w", err) } _, err = statement.Exec() if err != nil { return fmt.Errorf("exec sql statement to create jetstream table: %w", err) } slog.Info("jetstream table created") return nil } func (d *DB) SaveCursor(ctx context.Context, did string, cursor int64) error { sql := `INSERT INTO jetstream (did, cursor) VALUES (?, ?) ON CONFLICT(did) DO UPDATE SET cursor = ?;` _, err := d.db.Exec(sql, did, cursor, cursor) if err != nil { return fmt.Errorf("exec insert or update cursor: %w", err) } return nil } func (d *DB) GetCursor(ctx context.Context, did string) (int64, error) { sql := "SELECT cursor FROM jetstream where did = ?;" rows, err := d.db.Query(sql, did) if err != nil { return 0, fmt.Errorf("run query to get cursor: %w", err) } defer rows.Close() cursor := 0 for rows.Next() { if err := rows.Scan(&cursor); err != nil { return 0, fmt.Errorf("scan row: %w", err) } return int64(cursor), nil } return 0, fmt.Errorf("not found") }