// Package dbmtoiavldb exposes a cometbft dbm.DB through the iavldbm.DB
// interface, optionally running stored values through zstd compression.
package dbmtoiavldb

import (
	"bytes"
	"slices"
	"sync"

	"cosmossdk.io/core/store"
	dbm "github.com/cometbft/cometbft-db"
	iavldbm "github.com/cosmos/iavl/db"
	"github.com/gbl08ma/stacktrace"
	"github.com/klauspost/compress/zstd"
)

// AdaptedDB wraps a dbm.DB so it satisfies iavldbm.DB. When built through
// AdaptWithCompression, values are zstd-compressed on write and
// decompressed on read.
type AdaptedDB struct {
	underlying dbm.DB
	// when not compressing, the pool below is the zero value and the
	// decoder is nil:
	zstdEncoderPool sync.Pool
	zstdDecoder     *zstd.Decoder
}

// myEncoder bundles a zstd encoder with the scratch buffer it writes into,
// so both can be pooled together.
type myEncoder struct {
	zstdEncoder *zstd.Encoder
	buf         *bytes.Buffer
}

// Adapt wraps underlying without any value compression.
func Adapt(underlying dbm.DB) *AdaptedDB {
	return &AdaptedDB{
		underlying: underlying,
	}
}

// AdaptWithCompression wraps underlying so that values are zstd-compressed
// at the given level using the provided dictionary.
func AdaptWithCompression(underlying dbm.DB, level zstd.EncoderLevel, dict []byte) *AdaptedDB {
	// NOTE(review): construction errors are deliberately ignored here and in
	// the pool's New below — they can only arise from invalid options;
	// confirm dict is always well-formed.
	dec, _ := zstd.NewReader(nil, zstd.WithDecoderDicts(dict))
	db := &AdaptedDB{
		underlying:  underlying,
		zstdDecoder: dec,
	}
	db.zstdEncoderPool = sync.Pool{
		New: func() any {
			scratch := bytes.NewBuffer(make([]byte, 0, 4096))
			w, _ := zstd.NewWriter(scratch, zstd.WithEncoderDict(dict), zstd.WithEncoderLevel(level))
			return &myEncoder{
				zstdEncoder: w,
				buf:         scratch,
			}
		},
	}
	return db
}

var _ iavldbm.DB = (*AdaptedDB)(nil)

// Close implements [iavldbm.DB].
func (b *AdaptedDB) Close() error {
	return b.underlying.Close()
}

// Get implements [iavldbm.DB]. The stored value is decompressed (and always
// copied) before being returned.
func (b *AdaptedDB) Get(key []byte) ([]byte, error) {
	raw, err := b.underlying.Get(key)
	if err != nil {
		return nil, stacktrace.Propagate(err)
	}
	decoded, err := decompressValue(b.zstdDecoder, raw)
	return decoded, stacktrace.Propagate(err)
}

// Has implements [iavldbm.DB]. Keys are stored uncompressed, so the check
// is delegated directly.
func (b *AdaptedDB) Has(key []byte) (bool, error) {
	return b.underlying.Has(key)
}

// Delete implements [iavldbm.DB]. It goes through a single-operation batch
// so that delete semantics match batched writes.
func (b *AdaptedDB) Delete(key []byte) error {
	batch := b.NewBatch()
	defer batch.Close()
	if err := batch.Delete(key); err != nil {
		return stacktrace.Propagate(err)
	}
	return stacktrace.Propagate(batch.Write())
}

// Set implements [iavldbm.DB].
func (b *AdaptedDB) Set(key []byte, value []byte) error { batch := b.NewBatch() defer batch.Close() err := batch.Set(key, value) if err != nil { return stacktrace.Propagate(err) } return stacktrace.Propagate(batch.Write()) } // AdaptedIterator adapts badger.Iterator to store.Iterator type AdaptedIterator struct { zstdDecoder *zstd.Decoder underlying dbm.Iterator calledNextOnce bool } func (i *AdaptedIterator) Domain() (start, end []byte) { return i.underlying.Domain() } func (i *AdaptedIterator) Valid() bool { return i.underlying.Valid() } func (i *AdaptedIterator) Next() { i.underlying.Next() } func (i *AdaptedIterator) Key() []byte { // dbm.Iterator says the result of underlying.Key() is not safe for modification, but // corestore.Iterator (used by iavldbm) says "the key returned should be a copy and thus safe for modification" return slices.Clone(i.underlying.Key()) } func (i *AdaptedIterator) Value() []byte { // dbm.Iterator says the result of underlying.Value() is not safe for modification, but // corestore.Iterator (used by iavldbm) says "the value returned should be a copy and thus safe for modification" v, _ := decompressValue(i.zstdDecoder, i.underlying.Value()) return v } func (i *AdaptedIterator) Error() error { return i.underlying.Error() } func (i *AdaptedIterator) Close() error { return i.underlying.Close() } // Iterator implements [iavldbm.DB]. func (b *AdaptedDB) Iterator(start []byte, end []byte) (store.Iterator, error) { i, err := b.underlying.Iterator(start, end) if err != nil { return nil, err } return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil } // ReverseIterator implements [iavldbm.DB]. func (b *AdaptedDB) ReverseIterator(start []byte, end []byte) (store.Iterator, error) { i, err := b.underlying.ReverseIterator(start, end) if err != nil { return nil, err } return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil } // NewBatch implements [db.DB]. 
func (b *AdaptedDB) NewBatch() store.Batch { return &AdaptedBatch{ underlying: b.underlying.NewBatch(), zstdEncoderPool: &b.zstdEncoderPool, } } // NewBatchWithSize implements [db.DB]. func (b *AdaptedDB) NewBatchWithSize(int) store.Batch { return b.NewBatch() } type AdaptedBatch struct { underlying dbm.Batch zstdEncoderPool *sync.Pool } // Close implements [store.Batch]. func (a *AdaptedBatch) Close() error { return a.underlying.Close() } // Delete implements [store.Batch]. func (a *AdaptedBatch) Delete(key []byte) error { return a.underlying.Delete(key) } // Set implements [store.Batch]. func (a *AdaptedBatch) Set(key []byte, value []byte) error { v, err := compressValue(a.zstdEncoderPool, value) if err != nil { return stacktrace.Propagate(err) } return stacktrace.Propagate(a.underlying.Set(key, v)) } // Write implements [store.Batch]. func (a *AdaptedBatch) Write() error { return a.underlying.Write() } // WriteSync implements [store.Batch]. func (a *AdaptedBatch) WriteSync() error { return a.underlying.WriteSync() } // GetByteSize implements [store.Batch]. 
func (a *AdaptedBatch) GetByteSize() (int, error) { return 0, nil } func compressValue(encoderPool *sync.Pool, value []byte) ([]byte, error) { if encoderPool == nil { return value, nil } if len(value) < 192 { // this is probably a inner node of the iavl tree and we don't gain anything from compressing those 50-ish byte values return prepend(value, 0x00), nil } encoder := encoderPool.Get().(*myEncoder) _, err := encoder.zstdEncoder.Write(value) if err != nil { return nil, stacktrace.Propagate(err) } err = encoder.zstdEncoder.Close() if err != nil { return nil, stacktrace.Propagate(err) } out := make([]byte, encoder.buf.Len()+1) out[0] = 0x01 // buf Read can't fail, can ignore err _, _ = encoder.buf.Read(out[1:]) go func() { // zstd reset takes time, reduce Set latency by moving it to a goroutine // (this is why we don't use EncodeAll) encoder.buf.Reset() encoder.zstdEncoder.Reset(encoder.buf) encoderPool.Put(encoder) }() return out, nil } func decompressValue(decoder *zstd.Decoder, value []byte) ([]byte, error) { // always create a copy of the value, see comment on Value() about iavldb expectations if decoder == nil || len(value) == 0 { return slices.Clone(value), nil } else if value[0] == 0x00 { return slices.Clone(value[1:]), nil } // passing a nil output buffer to DecodeAll means it'll optimistically start by allocating len(value)*2 // but we observe compression ratios better than 50% frequently, so we allocate a slice ourselves with cap len(value)*3 value, err := decoder.DecodeAll(value[1:], make([]byte, 0, len(value)*3)) return value, stacktrace.Propagate(err) } // this is a simplified version of slices.Insert for prepending a single element to a slice, returning the modified slice func prepend[S ~[]E, E any](s S, v E) S { n := len(s) if n >= cap(s) { s2 := make(S, n+1) s2[0] = v copy(s2[1:], s) return s2 } s = s[:n+1] copy(s[1:], s) s[0] = v return s }