A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1// Package db contains a vendored from github.com/bluesky-social/indigo/carstore/sqlite_store.go
2// Source: github.com/bluesky-social/indigo@v0.0.0-20260203235305-a86f3ae1f8ec/carstore/
3// Reason: indigo's carstore hardcodes mattn/go-sqlite3, which conflicts with go-libsql
4// (both bundle SQLite C libraries and cannot coexist in the same binary).
5//
6// This package replaces the mattn driver with go-libsql and removes Prometheus metrics.
7// Once upstream accepts a driver-agnostic constructor, this vendored copy can be removed.
8// Modifications:
9// - Replaced mattn/go-sqlite3 driver with go-libsql
10// - Removed all Prometheus metric counters and .Inc() calls
11// - Changed package from 'carstore' to 'db'
12// - Added NewSQLiteStoreWithDB constructor for injecting an existing *sql.DB
13// - Changed sql.Open("sqlite3", path) to sql.Open("libsql", ...) with proper DSN
14package db
15
16import (
17 "bytes"
18 "context"
19 "database/sql"
20 "errors"
21 "fmt"
22 "io"
23 "log/slog"
24 "os"
25 "path/filepath"
26 "strings"
27
28 "go.opentelemetry.io/otel/attribute"
29
30 "github.com/bluesky-social/indigo/models"
31 blockformat "github.com/ipfs/go-block-format"
32 "github.com/ipfs/go-cid"
33 "github.com/ipfs/go-libipfs/blocks"
34 "github.com/ipld/go-car"
35 _ "github.com/tursodatabase/go-libsql"
36 "go.opentelemetry.io/otel"
37)
38
39// CarShard represents metadata about a stored shard.
40// Stripped of gorm tags since we don't use gorm in the SQLite store.
41type CarShard struct {
42 Root models.DbCID
43 DataStart int64
44 Seq int
45 Path string
46 Usr models.Uid
47 Rev string
48}
49
50type SQLiteStore struct {
51 dbPath string
52 db *sql.DB
53 ownsDB bool // true when this store opened the connection itself
54
55 log *slog.Logger
56
57 lastShardCache lastShardCache
58}
59
60func ensureDir(path string) error {
61 fi, err := os.Stat(path)
62 if err != nil {
63 if os.IsNotExist(err) {
64 return os.MkdirAll(path, 0755)
65 }
66 return err
67 }
68 if fi.IsDir() {
69 return nil
70 }
71 return fmt.Errorf("%s exists but is not a directory", path)
72}
73
74func NewSqliteStore(csdir string) (*SQLiteStore, error) {
75 if err := ensureDir(csdir); err != nil {
76 return nil, err
77 }
78 dbpath := filepath.Join(csdir, "db.sqlite3")
79 out := new(SQLiteStore)
80 err := out.Open(dbpath)
81 if err != nil {
82 return nil, err
83 }
84 return out, nil
85}
86
87// NewSQLiteStoreWithDB creates a SQLiteStore using an existing *sql.DB connection.
88// This allows callers to configure the driver independently (e.g., using go-libsql
89// embedded replicas). The caller is responsible for the DB lifecycle.
90func NewSQLiteStoreWithDB(dbPath string, db *sql.DB) (*SQLiteStore, error) {
91 sqs := &SQLiteStore{
92 dbPath: dbPath,
93 db: db,
94 log: slog.Default(),
95 }
96 if err := sqs.createTables(); err != nil {
97 return nil, fmt.Errorf("%s: sqlite could not create tables, %w", dbPath, err)
98 }
99 sqs.lastShardCache.source = sqs
100 sqs.lastShardCache.Init()
101 return sqs, nil
102}
103
104func (sqs *SQLiteStore) Open(path string) error {
105 if sqs.log == nil {
106 sqs.log = slog.Default()
107 }
108 sqs.log.Debug("open db", "path", path)
109
110 // Build DSN for go-libsql
111 dsn := path
112 if path == ":memory:" {
113 dsn = ":memory:"
114 } else if !strings.HasPrefix(path, "file:") {
115 dsn = "file:" + path
116 }
117
118 db, err := sql.Open("libsql", dsn)
119 if err != nil {
120 return fmt.Errorf("%s: sqlite could not open, %w", path, err)
121 }
122 sqs.db = db
123 sqs.dbPath = path
124 sqs.ownsDB = true
125 err = sqs.createTables()
126 if err != nil {
127 return fmt.Errorf("%s: sqlite could not create tables, %w", path, err)
128 }
129 sqs.lastShardCache.source = sqs
130 sqs.lastShardCache.Init()
131 return nil
132}
133
134func (sqs *SQLiteStore) createTables() error {
135 tx, err := sqs.db.Begin()
136 if err != nil {
137 return err
138 }
139 defer tx.Rollback()
140 _, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));")
141 if err != nil {
142 return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err)
143 }
144 _, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)")
145 if err != nil {
146 return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err)
147 }
148 return tx.Commit()
149}
150
151// writeNewShard needed for DeltaSession.CloseWithRoot
152func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
153 sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
154 ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
155 defer span.End()
156
157 buf := new(bytes.Buffer)
158 hnw, err := WriteCarHeader(buf, root)
159 if err != nil {
160 return nil, fmt.Errorf("failed to write car header: %w", err)
161 }
162 offset := hnw
163
164 tx, err := sqs.db.BeginTx(ctx, nil)
165 if err != nil {
166 return nil, fmt.Errorf("bad block insert tx, %w", err)
167 }
168 defer tx.Rollback()
169 insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block")
170 if err != nil {
171 return nil, fmt.Errorf("bad block insert sql, %w", err)
172 }
173 defer insertStatement.Close()
174
175 dbroot := models.DbCID{CID: root}
176
177 span.SetAttributes(attribute.Int("blocks", len(blks)))
178
179 for bcid, block := range blks {
180 nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
181 if err != nil {
182 return nil, fmt.Errorf("failed to write block: %w", err)
183 }
184 offset += nw
185
186 dbcid := models.DbCID{CID: bcid}
187 blockbytes := block.RawData()
188 _, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes)
189 if err != nil {
190 return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
191 }
192 sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
193 }
194 err = tx.Commit()
195 if err != nil {
196 return nil, fmt.Errorf("bad block insert commit, %w", err)
197 }
198
199 shard := CarShard{
200 Root: models.DbCID{CID: root},
201 DataStart: hnw,
202 Seq: seq,
203 Usr: user,
204 Rev: rev,
205 }
206
207 sqs.lastShardCache.put(&shard)
208
209 return buf.Bytes(), nil
210}
211
212var ErrNothingThere = errors.New("nothing to read)")
213
214// GetLastShard needed for NewDeltaSession indirectly through lastShardCache
215func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
216 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
217 if err != nil {
218 return nil, fmt.Errorf("bad last shard tx, %w", err)
219 }
220 defer tx.Rollback()
221 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1")
222 if err != nil {
223 return nil, fmt.Errorf("bad last shard sql, %w", err)
224 }
225 rows, err := qstmt.QueryContext(ctx, uid)
226 if err != nil {
227 return nil, fmt.Errorf("last shard err, %w", err)
228 }
229 if rows.Next() {
230 var rev string
231 var rootb models.DbCID
232 err = rows.Scan(&rev, &rootb)
233 if err != nil {
234 return nil, fmt.Errorf("last shard bad scan, %w", err)
235 }
236 return &CarShard{
237 Root: rootb,
238 Rev: rev,
239 }, nil
240 }
241 return nil, nil
242}
243
244func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
245 sqs.log.Warn("TODO: don't call compaction")
246 return nil, nil
247}
248
249func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
250 sqs.log.Warn("TODO: don't call compaction targets")
251 return nil, nil
252}
253
254func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
255 lastShard, err := sqs.lastShardCache.get(ctx, user)
256 if err != nil {
257 return cid.Undef, err
258 }
259 if lastShard == nil {
260 return cid.Undef, nil
261 }
262
263 return lastShard.Root.CID, nil
264}
265
266func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
267 lastShard, err := sqs.lastShardCache.get(ctx, user)
268 if err != nil {
269 return "", err
270 }
271 if lastShard == nil {
272 return "", nil
273 }
274
275 return lastShard.Rev, nil
276}
277
278func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
279 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
280 defer span.End()
281
282 carr, err := car.NewCarReader(bytes.NewReader(carslice))
283 if err != nil {
284 return cid.Undef, nil, err
285 }
286
287 if len(carr.Header.Roots) != 1 {
288 return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
289 }
290
291 ds, err := sqs.NewDeltaSession(ctx, uid, since)
292 if err != nil {
293 return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
294 }
295
296 for {
297 blk, err := carr.Next()
298 if err != nil {
299 if err == io.EOF {
300 break
301 }
302 return cid.Undef, nil, err
303 }
304
305 if err := ds.Put(ctx, blk); err != nil {
306 return cid.Undef, nil, err
307 }
308 }
309
310 return carr.Header.Roots[0], ds, nil
311}
312
313var zeroShard CarShard
314
315func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
316 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
317 defer span.End()
318
319 lastShard, err := sqs.lastShardCache.get(ctx, user)
320 if err != nil {
321 return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err)
322 }
323
324 if lastShard == nil {
325 lastShard = &zeroShard
326 }
327
328 if since != nil && *since != lastShard.Rev {
329 return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
330 }
331
332 return &DeltaSession{
333 blks: make(map[cid.Cid]blockformat.Block),
334 base: &sqliteUserView{
335 uid: user,
336 sqs: sqs,
337 },
338 user: user,
339 baseCid: lastShard.Root.CID,
340 cs: sqs,
341 seq: lastShard.Seq + 1,
342 lastRev: lastShard.Rev,
343 }, nil
344}
345
346func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
347 return &DeltaSession{
348 base: &sqliteUserView{
349 uid: user,
350 sqs: sqs,
351 },
352 readonly: true,
353 user: user,
354 cs: sqs,
355 }, nil
356}
357
358// ReadUserCar writes a CAR file for the user's blocks since sinceRev.
359func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
360 ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
361 defer span.End()
362
363 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
364 if err != nil {
365 return fmt.Errorf("rcar tx, %w", err)
366 }
367 defer tx.Rollback()
368 qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC")
369 if err != nil {
370 return fmt.Errorf("rcar sql, %w", err)
371 }
372 defer qstmt.Close()
373 rows, err := qstmt.QueryContext(ctx, user, sinceRev)
374 if err != nil {
375 return fmt.Errorf("rcar err, %w", err)
376 }
377 nblocks := 0
378 first := true
379 for rows.Next() {
380 var xcid models.DbCID
381 var xrev string
382 var xroot models.DbCID
383 var xblock []byte
384 err = rows.Scan(&xcid, &xrev, &xroot, &xblock)
385 if err != nil {
386 return fmt.Errorf("rcar bad scan, %w", err)
387 }
388 if first {
389 if err := car.WriteHeader(&car.CarHeader{
390 Roots: []cid.Cid{xroot.CID},
391 Version: 1,
392 }, shardOut); err != nil {
393 return fmt.Errorf("rcar bad header, %w", err)
394 }
395 first = false
396 }
397 nblocks++
398 _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock)
399 if err != nil {
400 return fmt.Errorf("rcar bad write, %w", err)
401 }
402 }
403 sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev)
404 return nil
405}
406
407// Stat is only used in a debugging admin handler
408func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
409 sqs.log.Warn("Stat debugging method not implemented for sqlite store")
410 return nil, nil
411}
412
413func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error {
414 ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData")
415 defer span.End()
416 tx, err := sqs.db.BeginTx(ctx, nil)
417 if err != nil {
418 return fmt.Errorf("wipe tx, %w", err)
419 }
420 defer tx.Rollback()
421 _, err = tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user)
422 if err == nil {
423 err = tx.Commit()
424 }
425 return err
426}
427
428// go-libsql does not support ReadOnly transactions, so we use default options.
429var txReadOnly = sql.TxOptions{}
430
431// HasUIDCid needed for NewDeltaSession userView
432func (sqs *SQLiteStore) HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) {
433 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
434 if err != nil {
435 return false, fmt.Errorf("hasUC tx, %w", err)
436 }
437 defer tx.Rollback()
438 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
439 if err != nil {
440 return false, fmt.Errorf("hasUC sql, %w", err)
441 }
442 defer qstmt.Close()
443 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
444 if err != nil {
445 return false, fmt.Errorf("hasUC err, %w", err)
446 }
447 if rows.Next() {
448 var rev string
449 var rootb models.DbCID
450 err = rows.Scan(&rev, &rootb)
451 if err != nil {
452 return false, fmt.Errorf("hasUC bad scan, %w", err)
453 }
454 return true, nil
455 }
456 return false, nil
457}
458
459func (sqs *SQLiteStore) CarStore() CarStore {
460 return sqs
461}
462
463func (sqs *SQLiteStore) Close() error {
464 if sqs.ownsDB {
465 return sqs.db.Close()
466 }
467 return nil
468}
469
470func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
471 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
472 if err != nil {
473 return nil, fmt.Errorf("getb tx, %w", err)
474 }
475 defer tx.Rollback()
476 qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
477 if err != nil {
478 return nil, fmt.Errorf("getb sql, %w", err)
479 }
480 defer qstmt.Close()
481 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
482 if err != nil {
483 return nil, fmt.Errorf("getb err, %w", err)
484 }
485 if rows.Next() {
486 var blockb []byte
487 err = rows.Scan(&blockb)
488 if err != nil {
489 return nil, fmt.Errorf("getb bad scan, %w", err)
490 }
491 blk, err := blocks.NewBlockWithCid(blockb, bcid)
492 if err != nil {
493 return nil, fmt.Errorf("getb bad block, %w", err)
494 }
495 return blk, nil
496 }
497 return nil, ErrNothingThere
498}
499
500func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) {
501 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
502 if err != nil {
503 return 0, fmt.Errorf("getbs tx, %w", err)
504 }
505 defer tx.Rollback()
506 qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
507 if err != nil {
508 return 0, fmt.Errorf("getbs sql, %w", err)
509 }
510 defer qstmt.Close()
511 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
512 if err != nil {
513 return 0, fmt.Errorf("getbs err, %w", err)
514 }
515 if rows.Next() {
516 var out int64
517 err = rows.Scan(&out)
518 if err != nil {
519 return 0, fmt.Errorf("getbs bad scan, %w", err)
520 }
521 return out, nil
522 }
523 return 0, nil
524}
525
526type sqliteUserViewInner interface {
527 HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
528 getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error)
529 getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error)
530}
531
532type sqliteUserView struct {
533 sqs sqliteUserViewInner
534 uid models.Uid
535}
536
537func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) {
538 return s.sqs.HasUIDCid(ctx, s.uid, c)
539}
540
541func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
542 return s.sqs.getBlock(ctx, s.uid, c)
543}
544
545func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) {
546 bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c)
547 return int(bigsize), err
548}
549
550// ensure we implement the interface
551var _ minBlockstore = (*sqliteUserView)(nil)