// A very experimental PLC implementation which uses BFT consensus for decentralization.
1package dbmtoiavldb
2
3import (
4 "bytes"
5 "slices"
6 "sync"
7
8 "cosmossdk.io/core/store"
9 dbm "github.com/cometbft/cometbft-db"
10 iavldbm "github.com/cosmos/iavl/db"
11 "github.com/gbl08ma/stacktrace"
12 "github.com/klauspost/compress/zstd"
13)
14
// AdaptedDB adapts a cometbft-db DB to the iavl db interface, optionally
// compressing stored values with zstd (see AdaptWithCompression).
type AdaptedDB struct {
	underlying dbm.DB

	// these two may be nil when not compressing:
	// NOTE(review): zstdEncoderPool is a value, not a pointer — when
	// compression is disabled it is the zero sync.Pool (nil New func),
	// not nil itself; only zstdDecoder can literally be nil.
	zstdEncoderPool sync.Pool
	zstdDecoder *zstd.Decoder
}
22
// myEncoder bundles a zstd encoder with the buffer it writes into, so the
// two can be pooled and reused together (see compressValue).
type myEncoder struct {
	zstdEncoder *zstd.Encoder
	buf *bytes.Buffer
}
27
// Adapt wraps underlying in an AdaptedDB without compression: values are
// stored and returned as-is.
func Adapt(underlying dbm.DB) *AdaptedDB {
	return &AdaptedDB{
		underlying: underlying,
	}
}
33
34func AdaptWithCompression(underlying dbm.DB, level zstd.EncoderLevel, dict []byte) *AdaptedDB {
35 zstdDecoder, _ := zstd.NewReader(nil, zstd.WithDecoderDicts(dict))
36
37 return &AdaptedDB{
38 underlying: underlying,
39 zstdEncoderPool: sync.Pool{
40 New: func() any {
41 buf := bytes.NewBuffer(make([]byte, 0, 4096))
42 enc, _ := zstd.NewWriter(buf, zstd.WithEncoderDict(dict), zstd.WithEncoderLevel(level))
43 return &myEncoder{
44 zstdEncoder: enc,
45 buf: buf,
46 }
47 },
48 },
49 zstdDecoder: zstdDecoder,
50 }
51}
52
// compile-time check that *AdaptedDB satisfies the iavl db interface
var _ iavldbm.DB = (*AdaptedDB)(nil)
54
// Close implements [iavldbm.DB]. It closes the underlying database; the
// shared zstd decoder (if any) is not explicitly released here.
func (b *AdaptedDB) Close() error {
	return b.underlying.Close()
}
59
60// Get implements [iavldbm.DB].
61func (b *AdaptedDB) Get(key []byte) ([]byte, error) {
62 v, err := b.underlying.Get(key)
63 if err != nil {
64 return nil, stacktrace.Propagate(err)
65 }
66 v, err = decompressValue(b.zstdDecoder, v)
67 return v, stacktrace.Propagate(err)
68}
69
// Has implements [iavldbm.DB]. It consults the underlying DB directly;
// the compression prefix only affects values, never keys.
func (b *AdaptedDB) Has(key []byte) (bool, error) {
	return b.underlying.Has(key)
}
74
75// Delete implements [db.DB].
76func (b *AdaptedDB) Delete(key []byte) error {
77 batch := b.NewBatch()
78 defer batch.Close()
79 err := batch.Delete(key)
80 if err != nil {
81 return stacktrace.Propagate(err)
82 }
83 return stacktrace.Propagate(batch.Write())
84}
85
86// Set implements [db.DB].
87func (b *AdaptedDB) Set(key []byte, value []byte) error {
88 batch := b.NewBatch()
89 defer batch.Close()
90 err := batch.Set(key, value)
91 if err != nil {
92 return stacktrace.Propagate(err)
93 }
94 return stacktrace.Propagate(batch.Write())
95}
96
// AdaptedIterator adapts a dbm.Iterator to store.Iterator, decompressing
// values on the fly when a zstd decoder is present.
type AdaptedIterator struct {
	zstdDecoder *zstd.Decoder
	underlying dbm.Iterator
	// NOTE(review): calledNextOnce is never read or written by any method
	// in this file — confirm it is needed before removing.
	calledNextOnce bool
}
103
// Domain returns the inclusive-start/exclusive-end range of the underlying iterator.
func (i *AdaptedIterator) Domain() (start, end []byte) {
	return i.underlying.Domain()
}
107
// Valid reports whether the underlying iterator is positioned on an entry.
func (i *AdaptedIterator) Valid() bool {
	return i.underlying.Valid()
}
111
// Next advances the underlying iterator to the following entry.
func (i *AdaptedIterator) Next() {
	i.underlying.Next()
}
115
// Key returns a copy of the current key.
func (i *AdaptedIterator) Key() []byte {
	// dbm.Iterator says the result of underlying.Key() is not safe for modification, but
	// corestore.Iterator (used by iavldbm) says "the key returned should be a copy and thus safe for modification"
	return slices.Clone(i.underlying.Key())
}
121
// Value returns a decompressed copy of the current value.
func (i *AdaptedIterator) Value() []byte {
	// dbm.Iterator says the result of underlying.Value() is not safe for modification, but
	// corestore.Iterator (used by iavldbm) says "the value returned should be a copy and thus safe for modification"
	// (decompressValue always returns a fresh copy, satisfying that contract)
	// NOTE(review): a decompression error is swallowed here because the
	// interface has no error return; the caller would see a nil value and
	// Error() would not report it — confirm this is acceptable.
	v, _ := decompressValue(i.zstdDecoder, i.underlying.Value())
	return v
}
128
// Error reports any error encountered by the underlying iterator.
func (i *AdaptedIterator) Error() error {
	return i.underlying.Error()
}
132
// Close releases the underlying iterator.
func (i *AdaptedIterator) Close() error {
	return i.underlying.Close()
}
136
137// Iterator implements [iavldbm.DB].
138func (b *AdaptedDB) Iterator(start []byte, end []byte) (store.Iterator, error) {
139 i, err := b.underlying.Iterator(start, end)
140 if err != nil {
141 return nil, err
142 }
143 return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil
144}
145
146// ReverseIterator implements [iavldbm.DB].
147func (b *AdaptedDB) ReverseIterator(start []byte, end []byte) (store.Iterator, error) {
148 i, err := b.underlying.ReverseIterator(start, end)
149 if err != nil {
150 return nil, err
151 }
152 return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil
153}
154
155// NewBatch implements [db.DB].
156func (b *AdaptedDB) NewBatch() store.Batch {
157 return &AdaptedBatch{
158 underlying: b.underlying.NewBatch(),
159 zstdEncoderPool: &b.zstdEncoderPool,
160 }
161}
162
// NewBatchWithSize implements [db.DB]. The size hint is ignored and a
// regular batch is returned.
func (b *AdaptedDB) NewBatchWithSize(int) store.Batch {
	return b.NewBatch()
}
167
// AdaptedBatch adapts a dbm.Batch to store.Batch, compressing values in
// Set when zstdEncoderPool is non-nil.
type AdaptedBatch struct {
	underlying dbm.Batch
	// nil when compression is disabled; see compressValue
	zstdEncoderPool *sync.Pool
}
172
// Close implements [store.Batch]. It releases the underlying batch.
func (a *AdaptedBatch) Close() error {
	return a.underlying.Close()
}
177
// Delete implements [store.Batch]. Keys are never compressed, so the
// deletion passes straight through.
func (a *AdaptedBatch) Delete(key []byte) error {
	return a.underlying.Delete(key)
}
182
183// Set implements [store.Batch].
184func (a *AdaptedBatch) Set(key []byte, value []byte) error {
185 v, err := compressValue(a.zstdEncoderPool, value)
186 if err != nil {
187 return stacktrace.Propagate(err)
188 }
189 return stacktrace.Propagate(a.underlying.Set(key, v))
190}
191
// Write implements [store.Batch]. It flushes the batch without fsync.
func (a *AdaptedBatch) Write() error {
	return a.underlying.Write()
}
196
// WriteSync implements [store.Batch]. It flushes the batch and syncs to disk.
func (a *AdaptedBatch) WriteSync() error {
	return a.underlying.WriteSync()
}
201
// GetByteSize implements [store.Batch].
// NOTE(review): this is a stub that always reports 0 — confirm no caller
// relies on an accurate batch size.
func (a *AdaptedBatch) GetByteSize() (int, error) {
	return 0, nil
}
206
207func compressValue(encoderPool *sync.Pool, value []byte) ([]byte, error) {
208 if encoderPool == nil {
209 return value, nil
210 }
211 if len(value) < 192 {
212 // this is probably a inner node of the iavl tree and we don't gain anything from compressing those 50-ish byte values
213 return prepend(value, 0x00), nil
214 }
215
216 encoder := encoderPool.Get().(*myEncoder)
217 _, err := encoder.zstdEncoder.Write(value)
218 if err != nil {
219 return nil, stacktrace.Propagate(err)
220 }
221 err = encoder.zstdEncoder.Close()
222 if err != nil {
223 return nil, stacktrace.Propagate(err)
224 }
225
226 out := make([]byte, encoder.buf.Len()+1)
227 out[0] = 0x01
228 // buf Read can't fail, can ignore err
229 _, _ = encoder.buf.Read(out[1:])
230
231 go func() {
232 // zstd reset takes time, reduce Set latency by moving it to a goroutine
233 // (this is why we don't use EncodeAll)
234 encoder.buf.Reset()
235 encoder.zstdEncoder.Reset(encoder.buf)
236 encoderPool.Put(encoder)
237 }()
238 return out, nil
239}
240
241func decompressValue(decoder *zstd.Decoder, value []byte) ([]byte, error) {
242 // always create a copy of the value, see comment on Value() about iavldb expectations
243 if decoder == nil || len(value) == 0 {
244 return slices.Clone(value), nil
245 } else if value[0] == 0x00 {
246 return slices.Clone(value[1:]), nil
247 }
248 // passing a nil output buffer to DecodeAll means it'll optimistically start by allocating len(value)*2
249 // but we observe compression ratios better than 50% frequently, so we allocate a slice ourselves with cap len(value)*3
250 value, err := decoder.DecodeAll(value[1:], make([]byte, 0, len(value)*3))
251 return value, stacktrace.Propagate(err)
252}
253
// prepend is a simplified version of slices.Insert for inserting a single
// element at the front of a slice, returning the modified slice. When the
// slice has spare capacity the shift happens in place (mutating the
// backing array); otherwise a new array of exactly len(s)+1 is allocated.
func prepend[S ~[]E, E any](s S, v E) S {
	if len(s) < cap(s) {
		s = append(s, v)  // extend length within existing capacity
		copy(s[1:], s)    // shift everything right by one (copy handles overlap)
		s[0] = v
		return s
	}

	out := make(S, len(s)+1)
	out[0] = v
	copy(out[1:], s)
	return out
}