A very experimental PLC implementation which uses BFT consensus for decentralization
at main 269 lines 7.0 kB view raw
1package dbmtoiavldb 2 3import ( 4 "bytes" 5 "slices" 6 "sync" 7 8 "cosmossdk.io/core/store" 9 dbm "github.com/cometbft/cometbft-db" 10 iavldbm "github.com/cosmos/iavl/db" 11 "github.com/gbl08ma/stacktrace" 12 "github.com/klauspost/compress/zstd" 13) 14 15type AdaptedDB struct { 16 underlying dbm.DB 17 18 // these two may be nil when not compressing: 19 zstdEncoderPool sync.Pool 20 zstdDecoder *zstd.Decoder 21} 22 23type myEncoder struct { 24 zstdEncoder *zstd.Encoder 25 buf *bytes.Buffer 26} 27 28func Adapt(underlying dbm.DB) *AdaptedDB { 29 return &AdaptedDB{ 30 underlying: underlying, 31 } 32} 33 34func AdaptWithCompression(underlying dbm.DB, level zstd.EncoderLevel, dict []byte) *AdaptedDB { 35 zstdDecoder, _ := zstd.NewReader(nil, zstd.WithDecoderDicts(dict)) 36 37 return &AdaptedDB{ 38 underlying: underlying, 39 zstdEncoderPool: sync.Pool{ 40 New: func() any { 41 buf := bytes.NewBuffer(make([]byte, 0, 4096)) 42 enc, _ := zstd.NewWriter(buf, zstd.WithEncoderDict(dict), zstd.WithEncoderLevel(level)) 43 return &myEncoder{ 44 zstdEncoder: enc, 45 buf: buf, 46 } 47 }, 48 }, 49 zstdDecoder: zstdDecoder, 50 } 51} 52 53var _ iavldbm.DB = (*AdaptedDB)(nil) 54 55// Close implements [iavldbm.DB]. 56func (b *AdaptedDB) Close() error { 57 return b.underlying.Close() 58} 59 60// Get implements [iavldbm.DB]. 61func (b *AdaptedDB) Get(key []byte) ([]byte, error) { 62 v, err := b.underlying.Get(key) 63 if err != nil { 64 return nil, stacktrace.Propagate(err) 65 } 66 v, err = decompressValue(b.zstdDecoder, v) 67 return v, stacktrace.Propagate(err) 68} 69 70// Has implements [iavldbm.DB]. 71func (b *AdaptedDB) Has(key []byte) (bool, error) { 72 return b.underlying.Has(key) 73} 74 75// Delete implements [db.DB]. 76func (b *AdaptedDB) Delete(key []byte) error { 77 batch := b.NewBatch() 78 defer batch.Close() 79 err := batch.Delete(key) 80 if err != nil { 81 return stacktrace.Propagate(err) 82 } 83 return stacktrace.Propagate(batch.Write()) 84} 85 86// Set implements [db.DB]. 87func (b *AdaptedDB) Set(key []byte, value []byte) error { 88 batch := b.NewBatch() 89 defer batch.Close() 90 err := batch.Set(key, value) 91 if err != nil { 92 return stacktrace.Propagate(err) 93 } 94 return stacktrace.Propagate(batch.Write()) 95} 96 97// AdaptedIterator adapts badger.Iterator to store.Iterator 98type AdaptedIterator struct { 99 zstdDecoder *zstd.Decoder 100 underlying dbm.Iterator 101 calledNextOnce bool 102} 103 104func (i *AdaptedIterator) Domain() (start, end []byte) { 105 return i.underlying.Domain() 106} 107 108func (i *AdaptedIterator) Valid() bool { 109 return i.underlying.Valid() 110} 111 112func (i *AdaptedIterator) Next() { 113 i.underlying.Next() 114} 115 116func (i *AdaptedIterator) Key() []byte { 117 // dbm.Iterator says the result of underlying.Key() is not safe for modification, but 118 // corestore.Iterator (used by iavldbm) says "the key returned should be a copy and thus safe for modification" 119 return slices.Clone(i.underlying.Key()) 120} 121 122func (i *AdaptedIterator) Value() []byte { 123 // dbm.Iterator says the result of underlying.Value() is not safe for modification, but 124 // corestore.Iterator (used by iavldbm) says "the value returned should be a copy and thus safe for modification" 125 v, _ := decompressValue(i.zstdDecoder, i.underlying.Value()) 126 return v 127} 128 129func (i *AdaptedIterator) Error() error { 130 return i.underlying.Error() 131} 132 133func (i *AdaptedIterator) Close() error { 134 return i.underlying.Close() 135} 136 137// Iterator implements [iavldbm.DB]. 138func (b *AdaptedDB) Iterator(start []byte, end []byte) (store.Iterator, error) { 139 i, err := b.underlying.Iterator(start, end) 140 if err != nil { 141 return nil, err 142 } 143 return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil 144} 145 146// ReverseIterator implements [iavldbm.DB]. 147func (b *AdaptedDB) ReverseIterator(start []byte, end []byte) (store.Iterator, error) { 148 i, err := b.underlying.ReverseIterator(start, end) 149 if err != nil { 150 return nil, err 151 } 152 return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil 153} 154 155// NewBatch implements [db.DB]. 156func (b *AdaptedDB) NewBatch() store.Batch { 157 return &AdaptedBatch{ 158 underlying: b.underlying.NewBatch(), 159 zstdEncoderPool: &b.zstdEncoderPool, 160 } 161} 162 163// NewBatchWithSize implements [db.DB]. 164func (b *AdaptedDB) NewBatchWithSize(int) store.Batch { 165 return b.NewBatch() 166} 167 168type AdaptedBatch struct { 169 underlying dbm.Batch 170 zstdEncoderPool *sync.Pool 171} 172 173// Close implements [store.Batch]. 174func (a *AdaptedBatch) Close() error { 175 return a.underlying.Close() 176} 177 178// Delete implements [store.Batch]. 179func (a *AdaptedBatch) Delete(key []byte) error { 180 return a.underlying.Delete(key) 181} 182 183// Set implements [store.Batch]. 184func (a *AdaptedBatch) Set(key []byte, value []byte) error { 185 v, err := compressValue(a.zstdEncoderPool, value) 186 if err != nil { 187 return stacktrace.Propagate(err) 188 } 189 return stacktrace.Propagate(a.underlying.Set(key, v)) 190} 191 192// Write implements [store.Batch]. 193func (a *AdaptedBatch) Write() error { 194 return a.underlying.Write() 195} 196 197// WriteSync implements [store.Batch]. 198func (a *AdaptedBatch) WriteSync() error { 199 return a.underlying.WriteSync() 200} 201 202// GetByteSize implements [store.Batch]. 203func (a *AdaptedBatch) GetByteSize() (int, error) { 204 return 0, nil 205} 206 207func compressValue(encoderPool *sync.Pool, value []byte) ([]byte, error) { 208 if encoderPool == nil { 209 return value, nil 210 } 211 if len(value) < 192 { 212 // this is probably a inner node of the iavl tree and we don't gain anything from compressing those 50-ish byte values 213 return prepend(value, 0x00), nil 214 } 215 216 encoder := encoderPool.Get().(*myEncoder) 217 _, err := encoder.zstdEncoder.Write(value) 218 if err != nil { 219 return nil, stacktrace.Propagate(err) 220 } 221 err = encoder.zstdEncoder.Close() 222 if err != nil { 223 return nil, stacktrace.Propagate(err) 224 } 225 226 out := make([]byte, encoder.buf.Len()+1) 227 out[0] = 0x01 228 // buf Read can't fail, can ignore err 229 _, _ = encoder.buf.Read(out[1:]) 230 231 go func() { 232 // zstd reset takes time, reduce Set latency by moving it to a goroutine 233 // (this is why we don't use EncodeAll) 234 encoder.buf.Reset() 235 encoder.zstdEncoder.Reset(encoder.buf) 236 encoderPool.Put(encoder) 237 }() 238 return out, nil 239} 240 241func decompressValue(decoder *zstd.Decoder, value []byte) ([]byte, error) { 242 // always create a copy of the value, see comment on Value() about iavldb expectations 243 if decoder == nil || len(value) == 0 { 244 return slices.Clone(value), nil 245 } else if value[0] == 0x00 { 246 return slices.Clone(value[1:]), nil 247 } 248 // passing a nil output buffer to DecodeAll means it'll optimistically start by allocating len(value)*2 249 // but we observe compression ratios better than 50% frequently, so we allocate a slice ourselves with cap len(value)*3 250 value, err := decoder.DecodeAll(value[1:], make([]byte, 0, len(value)*3)) 251 return value, stacktrace.Propagate(err) 252} 253 254// this is a simplified version of slices.Insert for prepending a single element to a slice, returning the modified slice 255func prepend[S ~[]E, E any](s S, v E) S { 256 n := len(s) 257 258 if n >= cap(s) { 259 s2 := make(S, n+1) 260 s2[0] = v 261 copy(s2[1:], s) 262 return s2 263 } 264 s = s[:n+1] 265 copy(s[1:], s) 266 s[0] = v 267 268 return s 269}