A very experimental PLC implementation which uses BFT consensus for decentralization

Tentatively bring back badger and fix transaction state management

leveldb causes too much write amplification on SSDs to the point of being ridiculous, especially when paired with iavl's merkle tree updates.
badger uses _much_ more memory but if we separate the operation values (not the tree inner nodes) into its value log, it seems to write significantly fewer GBs to the disk and compactions are either less frequent or much less noticeable (i.e. they don't stall the normal operation of the application as much).
(writing to the iavl tree still causes many more GBs to be written to disk than the data that's actually being inserted into the tree, but such appears to be the cost of using merkle trees)

gbl08ma.com 6554bd55 7f639431

verified
+337 -31
+5 -3
abciapp/app.go
··· 23 23 runnerContext context.Context 24 24 plc plc.PLC 25 25 txFactory *transaction.Factory 26 + indexDB dbm.DB 26 27 tree *iavl.MutableTree 27 28 fullyClearApplicationData func() error 28 29 ··· 39 40 } 40 41 41 42 // store and plc must be able to share transaction objects 42 - func NewDIDPLCApplication(treeDB dbm.DB, indexDB dbm.DB, clearData func() (dbm.DB, dbm.DB), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 43 + func NewDIDPLCApplication(treeDB dbm.DB, indexDB dbm.DB, clearData func(), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 43 44 mkTree := func() *iavl.MutableTree { 44 - return iavl.NewMutableTree(dbadapter.Adapt(treeDB), 2048, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 45 + return iavl.NewMutableTree(dbadapter.Adapt(treeDB), 500000, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 45 46 } 46 47 47 48 tree := mkTree() ··· 61 62 d := &DIDPLCApplication{ 62 63 runnerContext: context.Background(), 63 64 tree: tree, 65 + indexDB: indexDB, 64 66 txFactory: transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence), 65 67 snapshotDirectory: snapshotDirectory, 66 68 aocsByPLC: make(map[string]*authoritativeOperationsCache), ··· 74 76 return stacktrace.Propagate(err, "") 75 77 } 76 78 77 - treeDB, indexDB = clearData() 79 + clearData() 78 80 79 81 *d.tree = *mkTree() 80 82
+7
abciapp/execution.go
··· 103 103 104 104 // ProcessProposal implements [types.Application]. 105 105 func (d *DIDPLCApplication) ProcessProposal(ctx context.Context, req *abcitypes.RequestProcessProposal) (*abcitypes.ResponseProcessProposal, error) { 106 + // always reset state before processing a new proposal 107 + d.DiscardChanges() 108 + // do not unconditionally defer DiscardChanges because we want to re-use the results in FinalizeBlock when we vote accept 109 + 106 110 // do not rollback tree in this method, in case the changes can be reused in FinalizeBlock 107 111 if req.Height != d.tree.WorkingVersion() { 108 112 // our tree went out of sync, this should never happen ··· 218 222 cb() 219 223 } 220 224 } 225 + 226 + d.ongoingWrite = nil 227 + d.ongoingRead = nil 221 228 222 229 return &abcitypes.ResponseCommit{ 223 230 // TODO only discard actual blockchain history based on settings
+315
badger.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "errors" 6 + "fmt" 7 + "os" 8 + "path/filepath" 9 + 10 + dbm "github.com/cometbft/cometbft-db" 11 + "github.com/dgraph-io/badger/v4" 12 + ) 13 + 14 + // Much of this code is lifted from cometbft-db, slightly changed to give us: 15 + // - Ability to use badger without specifying additional build tags (an annoyance) 16 + // - Ability to set specific options on the badger database 17 + // - Ability to get a reference to the underlying badger database (e.g. for dropping all entries on snapshot import) 18 + 19 + var ( 20 + // errKeyEmpty is returned when attempting to use an empty or nil key. 21 + errKeyEmpty = errors.New("key cannot be empty") 22 + 23 + // errValueNil is returned when attempting to set a nil value. 24 + errValueNil = errors.New("value cannot be nil") 25 + ) 26 + 27 + func NewBadgerDB(dbName, dir string) (*badger.DB, *BadgerDB, error) { 28 + // Since Badger doesn't support database names, we join both to obtain 29 + // the final directory to use for the database. 30 + path := filepath.Join(dir, dbName) 31 + 32 + if err := os.MkdirAll(path, 0o755); err != nil { 33 + return nil, nil, err 34 + } 35 + opts := badger.DefaultOptions(path) 36 + opts.SyncWrites = false // note that we have Sync methods 37 + //opts.Logger = nil // badger is too chatty by default 38 + opts.NumCompactors = 8 39 + 40 + // use low ValueThreshold to force operation values into the value logs 41 + // sadly this means they won't be compressed at all (TODO implement compression ourselves, maybe based on a fixed zstd dictionary?) 42 + // because these large values don't seem to change (unlike the non-leaf nodes within the iavl tree, which are changing all the time), 43 + // this makes compaction much faster and decreases SSD thrashing 44 + opts.ValueThreshold = 256 45 + return NewBadgerDBWithOptions(opts) 46 + } 47 + 48 + // NewBadgerDBWithOptions creates a BadgerDB key value store 49 + // gives the flexibility of initializing a database with the 50 + // respective options. 51 + func NewBadgerDBWithOptions(opts badger.Options) (*badger.DB, *BadgerDB, error) { 52 + db, err := badger.Open(opts) 53 + if err != nil { 54 + return nil, nil, err 55 + } 56 + return db, &BadgerDB{db: db}, nil 57 + } 58 + 59 + type BadgerDB struct { 60 + db *badger.DB 61 + } 62 + 63 + var _ dbm.DB = (*BadgerDB)(nil) 64 + 65 + func (b *BadgerDB) Get(key []byte) ([]byte, error) { 66 + if len(key) == 0 { 67 + return nil, errKeyEmpty 68 + } 69 + var val []byte 70 + err := b.db.View(func(txn *badger.Txn) error { 71 + item, err := txn.Get(key) 72 + if err == badger.ErrKeyNotFound { 73 + return nil 74 + } else if err != nil { 75 + return err 76 + } 77 + val, err = item.ValueCopy(nil) 78 + if err == nil && val == nil { 79 + val = []byte{} 80 + } 81 + return err 82 + }) 83 + return val, err 84 + } 85 + 86 + func (b *BadgerDB) Has(key []byte) (bool, error) { 87 + if len(key) == 0 { 88 + return false, errKeyEmpty 89 + } 90 + var found bool 91 + err := b.db.View(func(txn *badger.Txn) error { 92 + _, err := txn.Get(key) 93 + if err != nil && err != badger.ErrKeyNotFound { 94 + return err 95 + } 96 + found = (err != badger.ErrKeyNotFound) 97 + return nil 98 + }) 99 + return found, err 100 + } 101 + 102 + func (b *BadgerDB) Set(key, value []byte) error { 103 + if len(key) == 0 { 104 + return errKeyEmpty 105 + } 106 + if value == nil { 107 + return errValueNil 108 + } 109 + return b.db.Update(func(txn *badger.Txn) error { 110 + return txn.Set(key, value) 111 + }) 112 + } 113 + 114 + func withSync(db *badger.DB, err error) error { 115 + if err != nil { 116 + return err 117 + } 118 + return db.Sync() 119 + } 120 + 121 + func (b *BadgerDB) SetSync(key, value []byte) error { 122 + return withSync(b.db, b.Set(key, value)) 123 + } 124 + 125 + func (b *BadgerDB) Delete(key []byte) error { 126 + if len(key) == 0 { 127 + return errKeyEmpty 128 + } 129 + return b.db.Update(func(txn *badger.Txn) error { 130 + return txn.Delete(key) 131 + }) 132 + } 133 + 134 + func (b *BadgerDB) DeleteSync(key []byte) error { 135 + return withSync(b.db, b.Delete(key)) 136 + } 137 + 138 + func (b *BadgerDB) Close() error { 139 + return b.db.Close() 140 + } 141 + 142 + func (b *BadgerDB) Print() error { 143 + return nil 144 + } 145 + 146 + func (b *BadgerDB) iteratorOpts(start, end []byte, opts badger.IteratorOptions) (*badgerDBIterator, error) { 147 + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { 148 + return nil, errKeyEmpty 149 + } 150 + txn := b.db.NewTransaction(false) 151 + iter := txn.NewIterator(opts) 152 + iter.Rewind() 153 + iter.Seek(start) 154 + if opts.Reverse && iter.Valid() && bytes.Equal(iter.Item().Key(), start) { 155 + // If we're going in reverse, our starting point was "end", 156 + // which is exclusive. 157 + iter.Next() 158 + } 159 + return &badgerDBIterator{ 160 + reverse: opts.Reverse, 161 + start: start, 162 + end: end, 163 + 164 + txn: txn, 165 + iter: iter, 166 + }, nil 167 + } 168 + 169 + func (b *BadgerDB) Iterator(start, end []byte) (dbm.Iterator, error) { 170 + opts := badger.DefaultIteratorOptions 171 + return b.iteratorOpts(start, end, opts) 172 + } 173 + 174 + func (b *BadgerDB) ReverseIterator(start, end []byte) (dbm.Iterator, error) { 175 + opts := badger.DefaultIteratorOptions 176 + opts.Reverse = true 177 + return b.iteratorOpts(end, start, opts) 178 + } 179 + 180 + func (b *BadgerDB) Stats() map[string]string { 181 + return nil 182 + } 183 + 184 + func (b *BadgerDB) Compact(start, end []byte) error { 185 + // Explicit compaction is not currently supported in badger 186 + return nil 187 + } 188 + 189 + func (b *BadgerDB) NewBatch() dbm.Batch { 190 + wb := &badgerDBBatch{ 191 + db: b.db, 192 + wb: b.db.NewWriteBatch(), 193 + firstFlush: make(chan struct{}, 1), 194 + } 195 + wb.firstFlush <- struct{}{} 196 + return wb 197 + } 198 + 199 + var _ dbm.Batch = (*badgerDBBatch)(nil) 200 + 201 + type badgerDBBatch struct { 202 + db *badger.DB 203 + wb *badger.WriteBatch 204 + 205 + // Calling db.Flush twice panics, so we must keep track of whether we've 206 + // flushed already on our own. If Write can receive from the firstFlush 207 + // channel, then it's the first and only Flush call we should do. 208 + // 209 + // Upstream bug report: 210 + // https://github.com/dgraph-io/badger/issues/1394 211 + firstFlush chan struct{} 212 + } 213 + 214 + func (b *badgerDBBatch) Set(key, value []byte) error { 215 + if len(key) == 0 { 216 + return errKeyEmpty 217 + } 218 + if value == nil { 219 + return errValueNil 220 + } 221 + return b.wb.Set(key, value) 222 + } 223 + 224 + func (b *badgerDBBatch) Delete(key []byte) error { 225 + if len(key) == 0 { 226 + return errKeyEmpty 227 + } 228 + return b.wb.Delete(key) 229 + } 230 + 231 + func (b *badgerDBBatch) Write() error { 232 + select { 233 + case <-b.firstFlush: 234 + return b.wb.Flush() 235 + default: 236 + return fmt.Errorf("batch already flushed") 237 + } 238 + } 239 + 240 + func (b *badgerDBBatch) WriteSync() error { 241 + return withSync(b.db, b.Write()) 242 + } 243 + 244 + func (b *badgerDBBatch) Close() error { 245 + select { 246 + case <-b.firstFlush: // a Flush after Cancel panics too 247 + default: 248 + } 249 + b.wb.Cancel() 250 + return nil 251 + } 252 + 253 + type badgerDBIterator struct { 254 + reverse bool 255 + start, end []byte 256 + 257 + txn *badger.Txn 258 + iter *badger.Iterator 259 + 260 + lastErr error 261 + } 262 + 263 + func (i *badgerDBIterator) Close() error { 264 + i.iter.Close() 265 + i.txn.Discard() 266 + return nil 267 + } 268 + 269 + func (i *badgerDBIterator) Domain() (start, end []byte) { return i.start, i.end } 270 + func (i *badgerDBIterator) Error() error { return i.lastErr } 271 + 272 + func (i *badgerDBIterator) Next() { 273 + if !i.Valid() { 274 + panic("iterator is invalid") 275 + } 276 + i.iter.Next() 277 + } 278 + 279 + func (i *badgerDBIterator) Valid() bool { 280 + if !i.iter.Valid() { 281 + return false 282 + } 283 + if len(i.end) > 0 { 284 + key := i.iter.Item().Key() 285 + if c := bytes.Compare(key, i.end); (!i.reverse && c >= 0) || (i.reverse && c < 0) { 286 + // We're at the end key, or past the end. 287 + return false 288 + } 289 + } 290 + return true 291 + } 292 + 293 + // Key implements Iterator. 294 + // The caller should not modify the contents of the returned slice. 295 + // Instead, the caller should make a copy and work on the copy. 296 + func (i *badgerDBIterator) Key() []byte { 297 + if !i.Valid() { 298 + panic("iterator is invalid") 299 + } 300 + return i.iter.Item().Key() 301 + } 302 + 303 + // Value implements Iterator. 304 + // The returned slice is a copy of the original data, therefore it is safe to modify. 305 + func (i *badgerDBIterator) Value() []byte { 306 + if !i.Valid() { 307 + panic("iterator is invalid") 308 + } 309 + 310 + val, err := i.iter.Item().ValueCopy(nil) 311 + if err != nil { 312 + i.lastErr = err 313 + } 314 + return val 315 + }
+1 -1
go.mod
··· 9 9 github.com/cometbft/cometbft-db v0.14.1 10 10 github.com/cosmos/iavl v1.3.5 11 11 github.com/cosmos/ics23/go v0.10.0 12 + github.com/dgraph-io/badger/v4 v4.9.0 12 13 github.com/did-method-plc/go-didplc v0.0.0-20251125183445-342320c327e2 13 14 github.com/google/uuid v1.6.0 14 15 github.com/ipfs/go-cid v0.4.1 ··· 38 39 github.com/cosmos/gogoproto v1.7.0 // indirect 39 40 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect 40 41 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect 41 - github.com/dgraph-io/badger/v4 v4.9.0 // indirect 42 42 github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect 43 43 github.com/dustin/go-humanize v1.0.1 // indirect 44 44 github.com/emicklei/dot v1.6.2 // indirect
+9 -27
main.go
··· 6 6 "log" 7 7 "os" 8 8 "os/signal" 9 - "path" 10 9 "path/filepath" 11 10 "sync" 12 11 "syscall" 13 12 "time" 14 13 15 - db "github.com/cometbft/cometbft-db" 16 14 "github.com/cometbft/cometbft/p2p" 17 15 "github.com/cometbft/cometbft/privval" 18 16 "github.com/cometbft/cometbft/proxy" ··· 56 54 var wg sync.WaitGroup 57 55 closeGoroutinesCh := make(chan struct{}) 58 56 59 - treeDBContext := &bftconfig.DBContext{ID: "apptree", Config: config.Config} 60 - treeDB, err := bftconfig.DefaultDBProvider(treeDBContext) 57 + underlyingTreeDB, treeDB, err := NewBadgerDB("apptree", config.Config.DBDir()) 61 58 if err != nil { 62 - log.Fatalf("failed to create application database: %v", err) 59 + log.Fatalf("failed to create application tree database: %v", err) 63 60 } 64 61 65 - indexDBContext := &bftconfig.DBContext{ID: "appindex", Config: config.Config} 66 - indexDB, err := bftconfig.DefaultDBProvider(indexDBContext) 62 + underlyingIndexDB, indexDB, err := NewBadgerDB("appindex", config.Config.DBDir()) 67 63 if err != nil { 68 - log.Fatalf("failed to create application database: %v", err) 64 + log.Fatalf("failed to create application index database: %v", err) 69 65 } 70 66 71 67 defer func() { ··· 77 73 } 78 74 }() 79 75 80 - recreateDatabases := func() (db.DB, db.DB) { 81 - if err := treeDB.Close(); err != nil { 82 - log.Printf("Closing application tree database for clearing: %v", err) 83 - } 84 - if err := indexDB.Close(); err != nil { 85 - log.Printf("Closing application index database for clearing: %v", err) 86 - } 87 - 88 - // we're depending on an implementation detail of cometbft, but I'm yet to find a more elegant way to do this 89 - _ = os.RemoveAll(path.Join(homeDir, "data/appindex.db")) 90 - _ = os.RemoveAll(path.Join(homeDir, "data/apptree.db")) 91 - 92 - var err error 93 - treeDB, err = bftconfig.DefaultDBProvider(treeDBContext) 76 + recreateDatabases := func() { 77 + err := underlyingTreeDB.DropAll() 94 78 if err != nil { 95 - log.Fatalf("failed to create application database: %v", err) 79 + log.Fatalf("failed to drop application tree database: %v", err) 96 80 } 97 81 98 - indexDB, err = bftconfig.DefaultDBProvider(indexDBContext) 82 + err = underlyingIndexDB.DropAll() 99 83 if err != nil { 100 - log.Fatalf("failed to create application database: %v", err) 84 + log.Fatalf("failed to drop application index database: %v", err) 101 85 } 102 - 103 - return treeDB, indexDB 104 86 } 105 87 106 88 app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication(treeDB, indexDB, recreateDatabases, filepath.Join(homeDir, "snapshots"))