A very experimental PLC implementation which uses BFT consensus for decentralization

Compress keys outside of iavl tree for better guarantees of determinism

gbl08ma.com 69c6f7c4 308f1b19

verified
+120 -42
+6 -1
abciapp/app.go
··· 11 11 dbm "github.com/cometbft/cometbft-db" 12 12 abcitypes "github.com/cometbft/cometbft/abci/types" 13 13 "github.com/cosmos/iavl" 14 + "github.com/klauspost/compress/zstd" 14 15 "github.com/palantir/stacktrace" 15 16 "github.com/samber/lo" 16 17 "tangled.org/gbl08ma.com/didplcbft/dbadapter" 18 + "tangled.org/gbl08ma.com/didplcbft/dbadapter/zstddict" 17 19 "tangled.org/gbl08ma.com/didplcbft/plc" 18 20 "tangled.org/gbl08ma.com/didplcbft/store" 19 21 "tangled.org/gbl08ma.com/didplcbft/transaction" ··· 42 44 // store and plc must be able to share transaction objects 43 45 func NewDIDPLCApplication(treeDB dbm.DB, indexDB dbm.DB, clearData func(), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 44 46 mkTree := func() *iavl.MutableTree { 45 - return iavl.NewMutableTree(dbadapter.Adapt(treeDB), 500000, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 47 + // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 48 + // Using SpeedBetterCompression appears to cause the processing time to double again 49 + // By using SpeedFastest we seem to give up on like 5% size reduction, it's not worth using the slower speeds 50 + return iavl.NewMutableTree(dbadapter.AdaptWithCompression(treeDB, zstd.SpeedFastest, zstddict.PLCZstdDict), 500000, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 46 51 } 47 52 48 53 tree := mkTree()
+102 -7
dbadapter/adapter.go
··· 4 4 "cosmossdk.io/core/store" 5 5 dbm "github.com/cometbft/cometbft-db" 6 6 iavldbm "github.com/cosmos/iavl/db" 7 + "github.com/klauspost/compress/zstd" 8 + "github.com/palantir/stacktrace" 7 9 ) 8 10 9 11 type AdaptedDB struct { 10 12 underlying dbm.DB 13 + 14 + // these two may be nil when not compressing: 15 + zstdEncoder *zstd.Encoder 16 + zstdDecoder *zstd.Decoder 11 17 } 12 18 13 19 func Adapt(underlying dbm.DB) *AdaptedDB { ··· 16 22 } 17 23 } 18 24 25 + func AdaptWithCompression(underlying dbm.DB, level zstd.EncoderLevel, dict []byte) *AdaptedDB { 26 + zstdEncoder, _ := zstd.NewWriter(nil, zstd.WithEncoderDict(dict), zstd.WithEncoderLevel(level)) 27 + zstdDecoder, _ := zstd.NewReader(nil, zstd.WithDecoderDicts(dict)) 28 + 29 + return &AdaptedDB{ 30 + underlying: underlying, 31 + zstdEncoder: zstdEncoder, 32 + zstdDecoder: zstdDecoder, 33 + } 34 + } 35 + 19 36 var _ iavldbm.DB = (*AdaptedDB)(nil) 20 37 21 38 // Close implements [iavldbm.DB]. ··· 25 42 26 43 // Get implements [iavldbm.DB]. 27 44 func (b *AdaptedDB) Get(key []byte) ([]byte, error) { 28 - return b.underlying.Get(key) 45 + v, err := b.underlying.Get(key) 46 + if err != nil { 47 + return nil, stacktrace.Propagate(err, "") 48 + } 49 + v, err = decompressValue(b.zstdDecoder, v) 50 + return v, stacktrace.Propagate(err, "") 29 51 } 30 52 31 53 // Has implements [iavldbm.DB]. ··· 35 57 36 58 // AdaptedIterator adapts badger.Iterator to store.Iterator 37 59 type AdaptedIterator struct { 60 + zstdDecoder *zstd.Decoder 38 61 underlying dbm.Iterator 39 62 calledNextOnce bool 40 63 } ··· 60 83 } 61 84 62 85 func (i *AdaptedIterator) Value() []byte { 63 - return i.underlying.Value() 86 + v, _ := decompressValue(i.zstdDecoder, i.underlying.Value()) 87 + return v 64 88 } 65 89 66 90 func (i *AdaptedIterator) Error() error { ··· 77 101 if err != nil { 78 102 return nil, err 79 103 } 80 - return &AdaptedIterator{underlying: i}, nil 104 + return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil 81 105 } 82 106 83 107 // ReverseIterator implements [iavldbm.DB]. ··· 86 110 if err != nil { 87 111 return nil, err 88 112 } 89 - return &AdaptedIterator{underlying: i}, nil 113 + return &AdaptedIterator{underlying: i, zstdDecoder: b.zstdDecoder}, nil 90 114 } 91 115 92 116 // NewBatch implements [db.DB]. 93 117 func (b *AdaptedDB) NewBatch() store.Batch { 94 - return &AdaptedBatch{b.underlying.NewBatch()} 118 + return &AdaptedBatch{ 119 + underlying: b.underlying.NewBatch(), 120 + zstdEncoder: b.zstdEncoder, 121 + } 95 122 } 96 123 97 124 // NewBatchWithSize implements [db.DB]. 98 125 func (b *AdaptedDB) NewBatchWithSize(int) store.Batch { 99 - return &AdaptedBatch{b.underlying.NewBatch()} 126 + return b.NewBatch() 100 127 } 101 128 102 129 type AdaptedBatch struct { 103 - dbm.Batch 130 + underlying dbm.Batch 131 + zstdEncoder *zstd.Encoder 132 + } 133 + 134 + // Close implements [store.Batch]. 135 + func (a *AdaptedBatch) Close() error { 136 + return a.underlying.Close() 137 + } 138 + 139 + // Delete implements [store.Batch]. 140 + func (a *AdaptedBatch) Delete(key []byte) error { 141 + return a.underlying.Delete(key) 142 + } 143 + 144 + // Set implements [store.Batch]. 145 + func (a *AdaptedBatch) Set(key []byte, value []byte) error { 146 + v := compressValue(a.zstdEncoder, value) 147 + return stacktrace.Propagate(a.underlying.Set(key, v), "") 148 + } 149 + 150 + // Write implements [store.Batch]. 151 + func (a *AdaptedBatch) Write() error { 152 + return a.underlying.Write() 153 + } 154 + 155 + // WriteSync implements [store.Batch]. 156 + func (a *AdaptedBatch) WriteSync() error { 157 + return a.underlying.WriteSync() 104 158 } 105 159 106 160 // GetByteSize implements [store.Batch]. 107 161 func (a *AdaptedBatch) GetByteSize() (int, error) { 108 162 return 0, nil 109 163 } 164 + 165 + func compressValue(encoder *zstd.Encoder, value []byte) []byte { 166 + if encoder == nil { 167 + return value 168 + } 169 + if len(value) < 192 { 170 + // this is probably a inner node of the iavl tree and we don't gain anything from compressing those 50-ish byte values 171 + return prepend(value, 0x00) 172 + } 173 + buf := make([]byte, 0, len(value)+5) // a bit of an extra buffer because, rarely, the value increases in size and this way we save on one reallocation 174 + return prepend(encoder.EncodeAll(value, buf), 0x01) 175 + } 176 + 177 + func decompressValue(decoder *zstd.Decoder, value []byte) ([]byte, error) { 178 + if decoder == nil || len(value) == 0 { 179 + return value, nil 180 + } else if value[0] == 0x00 { 181 + return value[1:], nil 182 + } 183 + // passing a nil output buffer to DecodeAll means it'll optimistically start by allocating len(value)*2 184 + // but we observe compression ratios better than 50% frequently, so we allocate a slice ourselves with cap len(value)*3 185 + value, err := decoder.DecodeAll(value[1:], make([]byte, 0, len(value)*3)) 186 + return value, stacktrace.Propagate(err, "") 187 + } 188 + 189 + // this is a simplified version of slices.Insert for prepending a single element to a slice, returning the modified slice 190 + func prepend[S ~[]E, E any](s S, v E) S { 191 + n := len(s) 192 + 193 + if n >= cap(s) { 194 + s2 := make(S, n+1) 195 + s2[0] = v 196 + copy(s2[1:], s) 197 + return s2 198 + } 199 + s = s[:n+1] 200 + copy(s[1:], s) 201 + s[0] = v 202 + 203 + return s 204 + }
dbadapter/zstddict/plcvalues

This is a binary file and will not be displayed.

+6
dbadapter/zstddict/plcvalues.go
··· 1 + package zstddict 2 + 3 + import _ "embed" 4 + 5 + //go:embed plcvalues 6 + var PLCZstdDict []byte
-2
plc/impl.go
··· 17 17 type plcImpl struct { 18 18 mu sync.Mutex // probably redundant, but let's keep for now 19 19 validator OperationValidator 20 - 21 - nextSeq uint64 22 20 } 23 21 24 22 var _ PLC = (*plcImpl)(nil)
+6 -32
store/tree.go
··· 15 15 ics23 "github.com/cosmos/ics23/go" 16 16 "github.com/did-method-plc/go-didplc" 17 17 cbornode "github.com/ipfs/go-ipld-cbor" 18 - "github.com/klauspost/compress/zstd" 19 18 "github.com/palantir/stacktrace" 20 19 "github.com/polydawn/refmt/obj/atlas" 21 20 "github.com/samber/lo" ··· 49 48 50 49 // TreeStore exists just to groups methods nicely 51 50 type TreeStore struct{} 52 - 53 - //go:embed zstddict/plcops 54 - var plcOpsZstdDict []byte 55 - 56 - // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 57 - // Using SpeedBetterCompression appears to cause the processing time to double again 58 - // By using SpeedFastest we seem to give up on like 5% size reduction, it's not worth using the slower speeds 59 - var zstdOpEncoder, _ = zstd.NewWriter(nil, zstd.WithEncoderDict(plcOpsZstdDict), zstd.WithEncoderLevel(zstd.SpeedFastest)) 60 - var zstdOpDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderDicts(plcOpsZstdDict)) 61 51 62 52 func (t *TreeStore) ProduceOperationExamples(tx transaction.Read, interval, count int) iter.Seq[[]byte] { 63 53 return func(yield func([]byte) bool) { ··· 290 280 if err != nil { 291 281 return stacktrace.Propagate(err, "") 292 282 } 293 - operationValue, err = markCompressedOperationValueNullified(operationValue) 294 - if err != nil { 295 - return stacktrace.Propagate(err, "") 296 - } 297 283 298 - updated, err := tx.Tree().Set(opKey, operationValue) 284 + updated, err := tx.Tree().Set(opKey, markOperationValueNullified(operationValue)) 299 285 if err != nil { 300 286 return stacktrace.Propagate(err, "") 301 287 } ··· 477 463 binary.BigEndian.PutUint64(o[16:24], ts) 478 464 copy(o[24:], opAsBytes) 479 465 480 - return zstdOpEncoder.EncodeAll(o, make([]byte, 0, len(o))) 466 + return o 481 467 } 482 468 483 469 func unmarshalOperationValue(value []byte) (bool, string, time.Time, didplc.OpEnum, error) { 484 - // passing a nil output buffer to DecodeAll means it'll optimistically start by allocating len(value)*2 485 - // but we observe compression ratios better than 50% sometimes, so allocate len(value)*3 instead 486 - value, err := zstdOpDecoder.DecodeAll(value, make([]byte, 0, len(value)*3)) 487 - if err != nil { 488 - return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err, "") 489 - } 490 - 491 470 nullified := value[0] != 0 492 471 493 472 did, err := bytesToDID(value[1:16]) ··· 506 485 return nullified, did, createdAt, opEnum, nil 507 486 } 508 487 509 - func markCompressedOperationValueNullified(value []byte) ([]byte, error) { 510 - value, err := zstdOpDecoder.DecodeAll(value, make([]byte, 0, len(value)*3)) 511 - if err != nil { 512 - return nil, stacktrace.Propagate(err, "") 513 - } 514 - 515 - value[0] = 1 516 - 517 - return zstdOpEncoder.EncodeAll(value, make([]byte, 0, len(value))), nil 488 + func markOperationValueNullified(value []byte) []byte { 489 + v := slices.Clone(value) 490 + v[0] = 1 491 + return v 518 492 } 519 493 520 494 func unmarshalLogEntry(operationKey, operationValue []byte) (types.SequencedLogEntry, error) {
store/zstddict/plcops

This is a binary file and will not be displayed.