A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at main 551 lines 15 kB view raw
1// Package db contains a vendored from github.com/bluesky-social/indigo/carstore/sqlite_store.go 2// Source: github.com/bluesky-social/indigo@v0.0.0-20260203235305-a86f3ae1f8ec/carstore/ 3// Reason: indigo's carstore hardcodes mattn/go-sqlite3, which conflicts with go-libsql 4// (both bundle SQLite C libraries and cannot coexist in the same binary). 5// 6// This package replaces the mattn driver with go-libsql and removes Prometheus metrics. 7// Once upstream accepts a driver-agnostic constructor, this vendored copy can be removed. 8// Modifications: 9// - Replaced mattn/go-sqlite3 driver with go-libsql 10// - Removed all Prometheus metric counters and .Inc() calls 11// - Changed package from 'carstore' to 'db' 12// - Added NewSQLiteStoreWithDB constructor for injecting an existing *sql.DB 13// - Changed sql.Open("sqlite3", path) to sql.Open("libsql", ...) with proper DSN 14package db 15 16import ( 17 "bytes" 18 "context" 19 "database/sql" 20 "errors" 21 "fmt" 22 "io" 23 "log/slog" 24 "os" 25 "path/filepath" 26 "strings" 27 28 "go.opentelemetry.io/otel/attribute" 29 30 "github.com/bluesky-social/indigo/models" 31 blockformat "github.com/ipfs/go-block-format" 32 "github.com/ipfs/go-cid" 33 "github.com/ipfs/go-libipfs/blocks" 34 "github.com/ipld/go-car" 35 _ "github.com/tursodatabase/go-libsql" 36 "go.opentelemetry.io/otel" 37) 38 39// CarShard represents metadata about a stored shard. 40// Stripped of gorm tags since we don't use gorm in the SQLite store. 41type CarShard struct { 42 Root models.DbCID 43 DataStart int64 44 Seq int 45 Path string 46 Usr models.Uid 47 Rev string 48} 49 50type SQLiteStore struct { 51 dbPath string 52 db *sql.DB 53 ownsDB bool // true when this store opened the connection itself 54 55 log *slog.Logger 56 57 lastShardCache lastShardCache 58} 59 60func ensureDir(path string) error { 61 fi, err := os.Stat(path) 62 if err != nil { 63 if os.IsNotExist(err) { 64 return os.MkdirAll(path, 0755) 65 } 66 return err 67 } 68 if fi.IsDir() { 69 return nil 70 } 71 return fmt.Errorf("%s exists but is not a directory", path) 72} 73 74func NewSqliteStore(csdir string) (*SQLiteStore, error) { 75 if err := ensureDir(csdir); err != nil { 76 return nil, err 77 } 78 dbpath := filepath.Join(csdir, "db.sqlite3") 79 out := new(SQLiteStore) 80 err := out.Open(dbpath) 81 if err != nil { 82 return nil, err 83 } 84 return out, nil 85} 86 87// NewSQLiteStoreWithDB creates a SQLiteStore using an existing *sql.DB connection. 88// This allows callers to configure the driver independently (e.g., using go-libsql 89// embedded replicas). The caller is responsible for the DB lifecycle. 90func NewSQLiteStoreWithDB(dbPath string, db *sql.DB) (*SQLiteStore, error) { 91 sqs := &SQLiteStore{ 92 dbPath: dbPath, 93 db: db, 94 log: slog.Default(), 95 } 96 if err := sqs.createTables(); err != nil { 97 return nil, fmt.Errorf("%s: sqlite could not create tables, %w", dbPath, err) 98 } 99 sqs.lastShardCache.source = sqs 100 sqs.lastShardCache.Init() 101 return sqs, nil 102} 103 104func (sqs *SQLiteStore) Open(path string) error { 105 if sqs.log == nil { 106 sqs.log = slog.Default() 107 } 108 sqs.log.Debug("open db", "path", path) 109 110 // Build DSN for go-libsql 111 dsn := path 112 if path == ":memory:" { 113 dsn = ":memory:" 114 } else if !strings.HasPrefix(path, "file:") { 115 dsn = "file:" + path 116 } 117 118 db, err := sql.Open("libsql", dsn) 119 if err != nil { 120 return fmt.Errorf("%s: sqlite could not open, %w", path, err) 121 } 122 sqs.db = db 123 sqs.dbPath = path 124 sqs.ownsDB = true 125 err = sqs.createTables() 126 if err != nil { 127 return fmt.Errorf("%s: sqlite could not create tables, %w", path, err) 128 } 129 sqs.lastShardCache.source = sqs 130 sqs.lastShardCache.Init() 131 return nil 132} 133 134func (sqs *SQLiteStore) createTables() error { 135 tx, err := sqs.db.Begin() 136 if err != nil { 137 return err 138 } 139 defer tx.Rollback() 140 _, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));") 141 if err != nil { 142 return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err) 143 } 144 _, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)") 145 if err != nil { 146 return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err) 147 } 148 return tx.Commit() 149} 150 151// writeNewShard needed for DeltaSession.CloseWithRoot 152func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 153 sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks)) 154 ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard") 155 defer span.End() 156 157 buf := new(bytes.Buffer) 158 hnw, err := WriteCarHeader(buf, root) 159 if err != nil { 160 return nil, fmt.Errorf("failed to write car header: %w", err) 161 } 162 offset := hnw 163 164 tx, err := sqs.db.BeginTx(ctx, nil) 165 if err != nil { 166 return nil, fmt.Errorf("bad block insert tx, %w", err) 167 } 168 defer tx.Rollback() 169 insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block") 170 if err != nil { 171 return nil, fmt.Errorf("bad block insert sql, %w", err) 172 } 173 defer insertStatement.Close() 174 175 dbroot := models.DbCID{CID: root} 176 177 span.SetAttributes(attribute.Int("blocks", len(blks))) 178 179 for bcid, block := range blks { 180 nw, err := LdWrite(buf, bcid.Bytes(), block.RawData()) 181 if err != nil { 182 return nil, fmt.Errorf("failed to write block: %w", err) 183 } 184 offset += nw 185 186 dbcid := models.DbCID{CID: bcid} 187 blockbytes := block.RawData() 188 _, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes) 189 if err != nil { 190 return nil, fmt.Errorf("(uid,cid) block store failed, %w", err) 191 } 192 sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes)) 193 } 194 err = tx.Commit() 195 if err != nil { 196 return nil, fmt.Errorf("bad block insert commit, %w", err) 197 } 198 199 shard := CarShard{ 200 Root: models.DbCID{CID: root}, 201 DataStart: hnw, 202 Seq: seq, 203 Usr: user, 204 Rev: rev, 205 } 206 207 sqs.lastShardCache.put(&shard) 208 209 return buf.Bytes(), nil 210} 211 212var ErrNothingThere = errors.New("nothing to read)") 213 214// GetLastShard needed for NewDeltaSession indirectly through lastShardCache 215func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) { 216 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 217 if err != nil { 218 return nil, fmt.Errorf("bad last shard tx, %w", err) 219 } 220 defer tx.Rollback() 221 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1") 222 if err != nil { 223 return nil, fmt.Errorf("bad last shard sql, %w", err) 224 } 225 rows, err := qstmt.QueryContext(ctx, uid) 226 if err != nil { 227 return nil, fmt.Errorf("last shard err, %w", err) 228 } 229 if rows.Next() { 230 var rev string 231 var rootb models.DbCID 232 err = rows.Scan(&rev, &rootb) 233 if err != nil { 234 return nil, fmt.Errorf("last shard bad scan, %w", err) 235 } 236 return &CarShard{ 237 Root: rootb, 238 Rev: rev, 239 }, nil 240 } 241 return nil, nil 242} 243 244func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { 245 sqs.log.Warn("TODO: don't call compaction") 246 return nil, nil 247} 248 249func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { 250 sqs.log.Warn("TODO: don't call compaction targets") 251 return nil, nil 252} 253 254func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 255 lastShard, err := sqs.lastShardCache.get(ctx, user) 256 if err != nil { 257 return cid.Undef, err 258 } 259 if lastShard == nil { 260 return cid.Undef, nil 261 } 262 263 return lastShard.Root.CID, nil 264} 265 266func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 267 lastShard, err := sqs.lastShardCache.get(ctx, user) 268 if err != nil { 269 return "", err 270 } 271 if lastShard == nil { 272 return "", nil 273 } 274 275 return lastShard.Rev, nil 276} 277 278func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { 279 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") 280 defer span.End() 281 282 carr, err := car.NewCarReader(bytes.NewReader(carslice)) 283 if err != nil { 284 return cid.Undef, nil, err 285 } 286 287 if len(carr.Header.Roots) != 1 { 288 return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) 289 } 290 291 ds, err := sqs.NewDeltaSession(ctx, uid, since) 292 if err != nil { 293 return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) 294 } 295 296 for { 297 blk, err := carr.Next() 298 if err != nil { 299 if err == io.EOF { 300 break 301 } 302 return cid.Undef, nil, err 303 } 304 305 if err := ds.Put(ctx, blk); err != nil { 306 return cid.Undef, nil, err 307 } 308 } 309 310 return carr.Header.Roots[0], ds, nil 311} 312 313var zeroShard CarShard 314 315func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { 316 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") 317 defer span.End() 318 319 lastShard, err := sqs.lastShardCache.get(ctx, user) 320 if err != nil { 321 return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) 322 } 323 324 if lastShard == nil { 325 lastShard = &zeroShard 326 } 327 328 if since != nil && *since != lastShard.Rev { 329 return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) 330 } 331 332 return &DeltaSession{ 333 blks: make(map[cid.Cid]blockformat.Block), 334 base: &sqliteUserView{ 335 uid: user, 336 sqs: sqs, 337 }, 338 user: user, 339 baseCid: lastShard.Root.CID, 340 cs: sqs, 341 seq: lastShard.Seq + 1, 342 lastRev: lastShard.Rev, 343 }, nil 344} 345 346func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { 347 return &DeltaSession{ 348 base: &sqliteUserView{ 349 uid: user, 350 sqs: sqs, 351 }, 352 readonly: true, 353 user: user, 354 cs: sqs, 355 }, nil 356} 357 358// ReadUserCar writes a CAR file for the user's blocks since sinceRev. 359func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { 360 ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") 361 defer span.End() 362 363 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 364 if err != nil { 365 return fmt.Errorf("rcar tx, %w", err) 366 } 367 defer tx.Rollback() 368 qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC") 369 if err != nil { 370 return fmt.Errorf("rcar sql, %w", err) 371 } 372 defer qstmt.Close() 373 rows, err := qstmt.QueryContext(ctx, user, sinceRev) 374 if err != nil { 375 return fmt.Errorf("rcar err, %w", err) 376 } 377 nblocks := 0 378 first := true 379 for rows.Next() { 380 var xcid models.DbCID 381 var xrev string 382 var xroot models.DbCID 383 var xblock []byte 384 err = rows.Scan(&xcid, &xrev, &xroot, &xblock) 385 if err != nil { 386 return fmt.Errorf("rcar bad scan, %w", err) 387 } 388 if first { 389 if err := car.WriteHeader(&car.CarHeader{ 390 Roots: []cid.Cid{xroot.CID}, 391 Version: 1, 392 }, shardOut); err != nil { 393 return fmt.Errorf("rcar bad header, %w", err) 394 } 395 first = false 396 } 397 nblocks++ 398 _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock) 399 if err != nil { 400 return fmt.Errorf("rcar bad write, %w", err) 401 } 402 } 403 sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) 404 return nil 405} 406 407// Stat is only used in a debugging admin handler 408func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { 409 sqs.log.Warn("Stat debugging method not implemented for sqlite store") 410 return nil, nil 411} 412 413func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error { 414 ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") 415 defer span.End() 416 tx, err := sqs.db.BeginTx(ctx, nil) 417 if err != nil { 418 return fmt.Errorf("wipe tx, %w", err) 419 } 420 defer tx.Rollback() 421 _, err = tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user) 422 if err == nil { 423 err = tx.Commit() 424 } 425 return err 426} 427 428// go-libsql does not support ReadOnly transactions, so we use default options. 429var txReadOnly = sql.TxOptions{} 430 431// HasUIDCid needed for NewDeltaSession userView 432func (sqs *SQLiteStore) HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { 433 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 434 if err != nil { 435 return false, fmt.Errorf("hasUC tx, %w", err) 436 } 437 defer tx.Rollback() 438 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") 439 if err != nil { 440 return false, fmt.Errorf("hasUC sql, %w", err) 441 } 442 defer qstmt.Close() 443 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) 444 if err != nil { 445 return false, fmt.Errorf("hasUC err, %w", err) 446 } 447 if rows.Next() { 448 var rev string 449 var rootb models.DbCID 450 err = rows.Scan(&rev, &rootb) 451 if err != nil { 452 return false, fmt.Errorf("hasUC bad scan, %w", err) 453 } 454 return true, nil 455 } 456 return false, nil 457} 458 459func (sqs *SQLiteStore) CarStore() CarStore { 460 return sqs 461} 462 463func (sqs *SQLiteStore) Close() error { 464 if sqs.ownsDB { 465 return sqs.db.Close() 466 } 467 return nil 468} 469 470func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { 471 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 472 if err != nil { 473 return nil, fmt.Errorf("getb tx, %w", err) 474 } 475 defer tx.Rollback() 476 qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") 477 if err != nil { 478 return nil, fmt.Errorf("getb sql, %w", err) 479 } 480 defer qstmt.Close() 481 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) 482 if err != nil { 483 return nil, fmt.Errorf("getb err, %w", err) 484 } 485 if rows.Next() { 486 var blockb []byte 487 err = rows.Scan(&blockb) 488 if err != nil { 489 return nil, fmt.Errorf("getb bad scan, %w", err) 490 } 491 blk, err := blocks.NewBlockWithCid(blockb, bcid) 492 if err != nil { 493 return nil, fmt.Errorf("getb bad block, %w", err) 494 } 495 return blk, nil 496 } 497 return nil, ErrNothingThere 498} 499 500func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { 501 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 502 if err != nil { 503 return 0, fmt.Errorf("getbs tx, %w", err) 504 } 505 defer tx.Rollback() 506 qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") 507 if err != nil { 508 return 0, fmt.Errorf("getbs sql, %w", err) 509 } 510 defer qstmt.Close() 511 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) 512 if err != nil { 513 return 0, fmt.Errorf("getbs err, %w", err) 514 } 515 if rows.Next() { 516 var out int64 517 err = rows.Scan(&out) 518 if err != nil { 519 return 0, fmt.Errorf("getbs bad scan, %w", err) 520 } 521 return out, nil 522 } 523 return 0, nil 524} 525 526type sqliteUserViewInner interface { 527 HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) 528 getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) 529 getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) 530} 531 532type sqliteUserView struct { 533 sqs sqliteUserViewInner 534 uid models.Uid 535} 536 537func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) { 538 return s.sqs.HasUIDCid(ctx, s.uid, c) 539} 540 541func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { 542 return s.sqs.getBlock(ctx, s.uid, c) 543} 544 545func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) { 546 bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c) 547 return int(bigsize), err 548} 549 550// ensure we implement the interface 551var _ minBlockstore = (*sqliteUserView)(nil)