A very experimental PLC implementation that uses BFT consensus for decentralization

Use a zstd encoder pool to speed up writes made by the iavl tree

It turns out that resetting the zstd encoder takes a significant amount of time (about as much as compressing the average compressible KV value). When using the more convenient EncodeAll function, this reset must happen synchronously, blocking the write path. By not using this helper function and switching to a pool of encoders, the resets can happen asynchronously without blocking the writes.

gbl08ma.com 44edad51 f5130e0a

verified
+55 -15
+55 -15
dbmtoiavldb/adapter.go
··· 1 1 package dbmtoiavldb 2 2 3 3 import ( 4 + "bytes" 4 5 "slices" 6 + "sync" 5 7 6 8 "cosmossdk.io/core/store" 7 9 dbm "github.com/cometbft/cometbft-db" ··· 14 16 underlying dbm.DB 15 17 16 18 // these two may be nil when not compressing: 19 + zstdEncoderPool sync.Pool 20 + zstdDecoder *zstd.Decoder 21 + } 22 + 23 + type myEncoder struct { 17 24 zstdEncoder *zstd.Encoder 18 - zstdDecoder *zstd.Decoder 25 + buf *bytes.Buffer 19 26 } 20 27 21 28 func Adapt(underlying dbm.DB) *AdaptedDB { ··· 25 32 } 26 33 27 34 func AdaptWithCompression(underlying dbm.DB, level zstd.EncoderLevel, dict []byte) *AdaptedDB { 28 - zstdEncoder, _ := zstd.NewWriter(nil, zstd.WithEncoderDict(dict), zstd.WithEncoderLevel(level)) 29 35 zstdDecoder, _ := zstd.NewReader(nil, zstd.WithDecoderDicts(dict)) 30 36 31 37 return &AdaptedDB{ 32 - underlying: underlying, 33 - zstdEncoder: zstdEncoder, 38 + underlying: underlying, 39 + zstdEncoderPool: sync.Pool{ 40 + New: func() any { 41 + buf := bytes.NewBuffer(make([]byte, 0, 4096)) 42 + enc, _ := zstd.NewWriter(buf, zstd.WithEncoderDict(dict), zstd.WithEncoderLevel(level)) 43 + return &myEncoder{ 44 + zstdEncoder: enc, 45 + buf: buf, 46 + } 47 + }, 48 + }, 34 49 zstdDecoder: zstdDecoder, 35 50 } 36 51 } ··· 140 155 // NewBatch implements [db.DB]. 141 156 func (b *AdaptedDB) NewBatch() store.Batch { 142 157 return &AdaptedBatch{ 143 - underlying: b.underlying.NewBatch(), 144 - zstdEncoder: b.zstdEncoder, 158 + underlying: b.underlying.NewBatch(), 159 + zstdEncoderPool: &b.zstdEncoderPool, 145 160 } 146 161 } 147 162 ··· 151 166 } 152 167 153 168 type AdaptedBatch struct { 154 - underlying dbm.Batch 155 - zstdEncoder *zstd.Encoder 169 + underlying dbm.Batch 170 + zstdEncoderPool *sync.Pool 156 171 } 157 172 158 173 // Close implements [store.Batch]. ··· 167 182 168 183 // Set implements [store.Batch]. 
169 184 func (a *AdaptedBatch) Set(key []byte, value []byte) error { 170 - v := compressValue(a.zstdEncoder, value) 185 + v, err := compressValue(a.zstdEncoderPool, value) 186 + if err != nil { 187 + return stacktrace.Propagate(err) 188 + } 171 189 return stacktrace.Propagate(a.underlying.Set(key, v)) 172 190 } 173 191 ··· 186 204 return 0, nil 187 205 } 188 206 189 - func compressValue(encoder *zstd.Encoder, value []byte) []byte { 190 - if encoder == nil { 191 - return value 207 + func compressValue(encoderPool *sync.Pool, value []byte) ([]byte, error) { 208 + if encoderPool == nil { 209 + return value, nil 192 210 } 193 211 if len(value) < 192 { 194 212 // this is probably a inner node of the iavl tree and we don't gain anything from compressing those 50-ish byte values 195 - return prepend(value, 0x00) 213 + return prepend(value, 0x00), nil 214 + } 215 + 216 + encoder := encoderPool.Get().(*myEncoder) 217 + _, err := encoder.zstdEncoder.Write(value) 218 + if err != nil { 219 + return nil, stacktrace.Propagate(err) 220 + } 221 + err = encoder.zstdEncoder.Close() 222 + if err != nil { 223 + return nil, stacktrace.Propagate(err) 196 224 } 197 - buf := make([]byte, 0, len(value)+5) // a bit of an extra buffer because, rarely, the value increases in size and this way we save on one reallocation 198 - return prepend(encoder.EncodeAll(value, buf), 0x01) 225 + 226 + out := make([]byte, encoder.buf.Len()+1) 227 + out[0] = 0x01 228 + // buf Read can't fail, can ignore err 229 + _, _ = encoder.buf.Read(out[1:]) 230 + 231 + go func() { 232 + // zstd reset takes time, reduce Set latency by moving it to a goroutine 233 + // (this is why we don't use EncodeAll) 234 + encoder.buf.Reset() 235 + encoder.zstdEncoder.Reset(encoder.buf) 236 + encoderPool.Put(encoder) 237 + }() 238 + return out, nil 199 239 } 200 240 201 241 func decompressValue(decoder *zstd.Decoder, value []byte) ([]byte, error) {