// Package db contains a vendored copy of github.com/bluesky-social/indigo/carstore/sqlite_store.go
// Source: github.com/bluesky-social/indigo@v0.0.0-20260203235305-a86f3ae1f8ec/carstore/
// Reason: indigo's carstore hardcodes mattn/go-sqlite3, which conflicts with go-libsql
// (both bundle SQLite C libraries and cannot coexist in the same binary).
//
// This package replaces the mattn driver with go-libsql and removes Prometheus metrics.
// Once upstream accepts a driver-agnostic constructor, this vendored copy can be removed.
// Modifications:
// - Replaced mattn/go-sqlite3 driver with go-libsql
// - Removed all Prometheus metric counters and .Inc() calls
// - Changed package from 'carstore' to 'db'
// - Added NewSQLiteStoreWithDB constructor for injecting an existing *sql.DB
// - Changed sql.Open("sqlite3", path) to sql.Open("libsql", ...) with proper DSN
package db

import (
	"bytes"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"strings"

	"go.opentelemetry.io/otel/attribute"

	"github.com/bluesky-social/indigo/models"
	blockformat "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-libipfs/blocks"
	"github.com/ipld/go-car"
	_ "github.com/tursodatabase/go-libsql"
	"go.opentelemetry.io/otel"
)

// CarShard represents metadata about a stored shard.
// Stripped of gorm tags since we don't use gorm in the SQLite store.
type CarShard struct { Root models.DbCID DataStart int64 Seq int Path string Usr models.Uid Rev string } type SQLiteStore struct { dbPath string db *sql.DB ownsDB bool // true when this store opened the connection itself log *slog.Logger lastShardCache lastShardCache } func ensureDir(path string) error { fi, err := os.Stat(path) if err != nil { if os.IsNotExist(err) { return os.MkdirAll(path, 0755) } return err } if fi.IsDir() { return nil } return fmt.Errorf("%s exists but is not a directory", path) } func NewSqliteStore(csdir string) (*SQLiteStore, error) { if err := ensureDir(csdir); err != nil { return nil, err } dbpath := filepath.Join(csdir, "db.sqlite3") out := new(SQLiteStore) err := out.Open(dbpath) if err != nil { return nil, err } return out, nil } // NewSQLiteStoreWithDB creates a SQLiteStore using an existing *sql.DB connection. // This allows callers to configure the driver independently (e.g., using go-libsql // embedded replicas). The caller is responsible for the DB lifecycle. 
func NewSQLiteStoreWithDB(dbPath string, db *sql.DB) (*SQLiteStore, error) { sqs := &SQLiteStore{ dbPath: dbPath, db: db, log: slog.Default(), } if err := sqs.createTables(); err != nil { return nil, fmt.Errorf("%s: sqlite could not create tables, %w", dbPath, err) } sqs.lastShardCache.source = sqs sqs.lastShardCache.Init() return sqs, nil } func (sqs *SQLiteStore) Open(path string) error { if sqs.log == nil { sqs.log = slog.Default() } sqs.log.Debug("open db", "path", path) // Build DSN for go-libsql dsn := path if path == ":memory:" { dsn = ":memory:" } else if !strings.HasPrefix(path, "file:") { dsn = "file:" + path } db, err := sql.Open("libsql", dsn) if err != nil { return fmt.Errorf("%s: sqlite could not open, %w", path, err) } sqs.db = db sqs.dbPath = path sqs.ownsDB = true err = sqs.createTables() if err != nil { return fmt.Errorf("%s: sqlite could not create tables, %w", path, err) } sqs.lastShardCache.source = sqs sqs.lastShardCache.Init() return nil } func (sqs *SQLiteStore) createTables() error { tx, err := sqs.db.Begin() if err != nil { return err } defer tx.Rollback() _, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));") if err != nil { return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err) } _, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)") if err != nil { return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err) } return tx.Commit() } // writeNewShard needed for DeltaSession.CloseWithRoot func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks)) ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard") defer span.End() buf := new(bytes.Buffer) hnw, err := WriteCarHeader(buf, root) if 
err != nil { return nil, fmt.Errorf("failed to write car header: %w", err) } offset := hnw tx, err := sqs.db.BeginTx(ctx, nil) if err != nil { return nil, fmt.Errorf("bad block insert tx, %w", err) } defer tx.Rollback() insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block") if err != nil { return nil, fmt.Errorf("bad block insert sql, %w", err) } defer insertStatement.Close() dbroot := models.DbCID{CID: root} span.SetAttributes(attribute.Int("blocks", len(blks))) for bcid, block := range blks { nw, err := LdWrite(buf, bcid.Bytes(), block.RawData()) if err != nil { return nil, fmt.Errorf("failed to write block: %w", err) } offset += nw dbcid := models.DbCID{CID: bcid} blockbytes := block.RawData() _, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes) if err != nil { return nil, fmt.Errorf("(uid,cid) block store failed, %w", err) } sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes)) } err = tx.Commit() if err != nil { return nil, fmt.Errorf("bad block insert commit, %w", err) } shard := CarShard{ Root: models.DbCID{CID: root}, DataStart: hnw, Seq: seq, Usr: user, Rev: rev, } sqs.lastShardCache.put(&shard) return buf.Bytes(), nil } var ErrNothingThere = errors.New("nothing to read)") // GetLastShard needed for NewDeltaSession indirectly through lastShardCache func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) { tx, err := sqs.db.BeginTx(ctx, &txReadOnly) if err != nil { return nil, fmt.Errorf("bad last shard tx, %w", err) } defer tx.Rollback() qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? 
ORDER BY rev DESC LIMIT 1") if err != nil { return nil, fmt.Errorf("bad last shard sql, %w", err) } rows, err := qstmt.QueryContext(ctx, uid) if err != nil { return nil, fmt.Errorf("last shard err, %w", err) } if rows.Next() { var rev string var rootb models.DbCID err = rows.Scan(&rev, &rootb) if err != nil { return nil, fmt.Errorf("last shard bad scan, %w", err) } return &CarShard{ Root: rootb, Rev: rev, }, nil } return nil, nil } func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { sqs.log.Warn("TODO: don't call compaction") return nil, nil } func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { sqs.log.Warn("TODO: don't call compaction targets") return nil, nil } func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { lastShard, err := sqs.lastShardCache.get(ctx, user) if err != nil { return cid.Undef, err } if lastShard == nil { return cid.Undef, nil } return lastShard.Root.CID, nil } func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { lastShard, err := sqs.lastShardCache.get(ctx, user) if err != nil { return "", err } if lastShard == nil { return "", nil } return lastShard.Rev, nil } func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") defer span.End() carr, err := car.NewCarReader(bytes.NewReader(carslice)) if err != nil { return cid.Undef, nil, err } if len(carr.Header.Roots) != 1 { return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) } ds, err := sqs.NewDeltaSession(ctx, uid, since) if err != nil { return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) } for { blk, err := carr.Next() if err != nil { if err == io.EOF { 
break } return cid.Undef, nil, err } if err := ds.Put(ctx, blk); err != nil { return cid.Undef, nil, err } } return carr.Header.Roots[0], ds, nil } var zeroShard CarShard func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") defer span.End() lastShard, err := sqs.lastShardCache.get(ctx, user) if err != nil { return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) } if lastShard == nil { lastShard = &zeroShard } if since != nil && *since != lastShard.Rev { return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) } return &DeltaSession{ blks: make(map[cid.Cid]blockformat.Block), base: &sqliteUserView{ uid: user, sqs: sqs, }, user: user, baseCid: lastShard.Root.CID, cs: sqs, seq: lastShard.Seq + 1, lastRev: lastShard.Rev, }, nil } func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { return &DeltaSession{ base: &sqliteUserView{ uid: user, sqs: sqs, }, readonly: true, user: user, cs: sqs, }, nil } // ReadUserCar writes a CAR file for the user's blocks since sinceRev. func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") defer span.End() tx, err := sqs.db.BeginTx(ctx, &txReadOnly) if err != nil { return fmt.Errorf("rcar tx, %w", err) } defer tx.Rollback() qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? 
ORDER BY rev DESC") if err != nil { return fmt.Errorf("rcar sql, %w", err) } defer qstmt.Close() rows, err := qstmt.QueryContext(ctx, user, sinceRev) if err != nil { return fmt.Errorf("rcar err, %w", err) } nblocks := 0 first := true for rows.Next() { var xcid models.DbCID var xrev string var xroot models.DbCID var xblock []byte err = rows.Scan(&xcid, &xrev, &xroot, &xblock) if err != nil { return fmt.Errorf("rcar bad scan, %w", err) } if first { if err := car.WriteHeader(&car.CarHeader{ Roots: []cid.Cid{xroot.CID}, Version: 1, }, shardOut); err != nil { return fmt.Errorf("rcar bad header, %w", err) } first = false } nblocks++ _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock) if err != nil { return fmt.Errorf("rcar bad write, %w", err) } } sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) return nil } // Stat is only used in a debugging admin handler func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { sqs.log.Warn("Stat debugging method not implemented for sqlite store") return nil, nil } func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error { ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") defer span.End() tx, err := sqs.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("wipe tx, %w", err) } defer tx.Rollback() _, err = tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user) if err == nil { err = tx.Commit() } return err } // go-libsql does not support ReadOnly transactions, so we use default options. var txReadOnly = sql.TxOptions{} // HasUIDCid needed for NewDeltaSession userView func (sqs *SQLiteStore) HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { tx, err := sqs.db.BeginTx(ctx, &txReadOnly) if err != nil { return false, fmt.Errorf("hasUC tx, %w", err) } defer tx.Rollback() qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? 
LIMIT 1") if err != nil { return false, fmt.Errorf("hasUC sql, %w", err) } defer qstmt.Close() rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) if err != nil { return false, fmt.Errorf("hasUC err, %w", err) } if rows.Next() { var rev string var rootb models.DbCID err = rows.Scan(&rev, &rootb) if err != nil { return false, fmt.Errorf("hasUC bad scan, %w", err) } return true, nil } return false, nil } func (sqs *SQLiteStore) CarStore() CarStore { return sqs } func (sqs *SQLiteStore) Close() error { if sqs.ownsDB { return sqs.db.Close() } return nil } func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { tx, err := sqs.db.BeginTx(ctx, &txReadOnly) if err != nil { return nil, fmt.Errorf("getb tx, %w", err) } defer tx.Rollback() qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") if err != nil { return nil, fmt.Errorf("getb sql, %w", err) } defer qstmt.Close() rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) if err != nil { return nil, fmt.Errorf("getb err, %w", err) } if rows.Next() { var blockb []byte err = rows.Scan(&blockb) if err != nil { return nil, fmt.Errorf("getb bad scan, %w", err) } blk, err := blocks.NewBlockWithCid(blockb, bcid) if err != nil { return nil, fmt.Errorf("getb bad block, %w", err) } return blk, nil } return nil, ErrNothingThere } func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { tx, err := sqs.db.BeginTx(ctx, &txReadOnly) if err != nil { return 0, fmt.Errorf("getbs tx, %w", err) } defer tx.Rollback() qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? 
LIMIT 1") if err != nil { return 0, fmt.Errorf("getbs sql, %w", err) } defer qstmt.Close() rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) if err != nil { return 0, fmt.Errorf("getbs err, %w", err) } if rows.Next() { var out int64 err = rows.Scan(&out) if err != nil { return 0, fmt.Errorf("getbs bad scan, %w", err) } return out, nil } return 0, nil } type sqliteUserViewInner interface { HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) } type sqliteUserView struct { sqs sqliteUserViewInner uid models.Uid } func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) { return s.sqs.HasUIDCid(ctx, s.uid, c) } func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { return s.sqs.getBlock(ctx, s.uid, c) } func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) { bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c) return int(bigsize), err } // ensure we implement the interface var _ minBlockstore = (*sqliteUserView)(nil)