A very experimental PLC implementation which uses BFT consensus for decentralization

Speed up import operations by 10x

And all it took was disabling prefetching on the iterator (the bloom filter then applied an additional 2x speedup)

gbl08ma.com e6c75929 a9223d48

verified
+195 -31
+11 -3
abciapp/app.go
··· 42 42 } 43 43 44 44 // store and plc must be able to share transaction objects 45 - func NewDIDPLCApplication(treeDB dbm.DB, indexDB dbm.DB, clearData func(), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 45 + func NewDIDPLCApplication(treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 46 46 mkTree := func() *iavl.MutableTree { 47 47 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 48 48 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 68 68 runnerContext: context.Background(), 69 69 tree: tree, 70 70 indexDB: indexDB, 71 - txFactory: transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence), 72 71 snapshotDirectory: snapshotDirectory, 73 72 aocsByPLC: make(map[string]*authoritativeOperationsCache), 74 73 } 75 74 75 + d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 76 + if err != nil { 77 + return nil, nil, nil, func() {}, stacktrace.Propagate(err, "") 78 + } 79 + 80 + 76 81 d.fullyClearApplicationData = func() error { 77 82 // we assume this is called in a single-threaded context, which should be a safe assumption since we'll only call this during snapshot import 78 83 // and CometBFT only calls one ABCI method at a time ··· 85 90 86 91 *d.tree = *mkTree() 87 92 88 - d.txFactory = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence) 93 + d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 94 + if err != nil { 95 + return stacktrace.Propagate(err, "") 96 + } 89 97 return nil 90 98 } 91 99
+13 -1
abciapp/app_test.go
··· 6 6 7 7 dbm "github.com/cometbft/cometbft-db" 8 8 "github.com/cometbft/cometbft/abci/types" 9 + "github.com/dgraph-io/badger/v4" 9 10 cbornode "github.com/ipfs/go-ipld-cbor" 10 11 "github.com/stretchr/testify/require" 11 12 "tangled.org/gbl08ma.com/didplcbft/abciapp" ··· 21 22 } 22 23 23 24 func TestCheckTx(t *testing.T) { 24 - app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(dbm.NewMemDB(), dbm.NewMemDB(), nil, "") 25 + app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "") 25 26 require.NoError(t, err) 26 27 t.Cleanup(cleanup) 27 28 ··· 63 64 require.NoError(t, err) 64 65 require.Equal(t, uint32(0), response.Code) 65 66 } 67 + 68 + type memDBWrapper struct { 69 + dbm.DB 70 + } 71 + 72 + func (w memDBWrapper) IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) { 73 + if opts.Reverse { 74 + return w.ReverseIterator(start, end) 75 + } 76 + return w.Iterator(start, end) 77 + }
+8 -3
badger.go
··· 144 144 return nil 145 145 } 146 146 147 - func (b *BadgerDB) iteratorOpts(start, end []byte, opts badger.IteratorOptions) (*badgerDBIterator, error) { 147 + func (b *BadgerDB) IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) { 148 148 if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { 149 149 return nil, errKeyEmpty 150 150 } 151 + if opts.Reverse { 152 + start, end = end, start 153 + } 154 + 151 155 txn := b.db.NewTransaction(false) 152 156 iter := txn.NewIterator(opts) 153 157 iter.Rewind() ··· 157 161 // which is exclusive. 158 162 iter.Next() 159 163 } 164 + 160 165 return &badgerDBIterator{ 161 166 reverse: opts.Reverse, 162 167 start: start, ··· 169 174 170 175 func (b *BadgerDB) Iterator(start, end []byte) (dbm.Iterator, error) { 171 176 opts := badger.DefaultIteratorOptions 172 - return b.iteratorOpts(start, end, opts) 177 + return b.IteratorWithOptions(start, end, opts) 173 178 } 174 179 175 180 func (b *BadgerDB) ReverseIterator(start, end []byte) (dbm.Iterator, error) { 176 181 opts := badger.DefaultIteratorOptions 177 182 opts.Reverse = true 178 - return b.iteratorOpts(end, start, opts) 183 + return b.IteratorWithOptions(start, end, opts) 179 184 } 180 185 181 186 func (b *BadgerDB) Stats() map[string]string {
+2
go.mod
··· 4 4 5 5 require ( 6 6 cosmossdk.io/core v0.12.1-0.20240725072823-6a2d039e1212 7 + github.com/bits-and-blooms/bloom/v3 v3.7.1 7 8 github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe 8 9 github.com/cometbft/cometbft v0.38.19 9 10 github.com/cometbft/cometbft-db v0.14.1 ··· 29 30 require ( 30 31 github.com/DataDog/zstd v1.4.5 // indirect 31 32 github.com/beorn7/perks v1.0.1 // indirect 33 + github.com/bits-and-blooms/bitset v1.24.2 // indirect 32 34 github.com/cespare/xxhash/v2 v2.3.0 // indirect 33 35 github.com/cockroachdb/errors v1.11.3 // indirect 34 36 github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
+6
go.sum
··· 15 15 github.com/adlio/schema v1.3.6/go.mod h1:qkxwLgPBd1FgLRHYVCmQT/rrBr3JH38J9LjmVzWNudg= 16 16 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= 17 17 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 18 + github.com/bits-and-blooms/bitset v1.24.2 h1:M7/NzVbsytmtfHbumG+K2bremQPMJuqv1JD3vOaFxp0= 19 + github.com/bits-and-blooms/bitset v1.24.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= 20 + github.com/bits-and-blooms/bloom/v3 v3.7.1 h1:WXovk4TRKZttAMJfoQx6K2DM0zNIt8w+c67UqO+etV0= 21 + github.com/bits-and-blooms/bloom/v3 v3.7.1/go.mod h1:rZzYLLje2dfzXfAkJNxQQHsKurAyK55KUnL43Euk0hU= 18 22 github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo= 19 23 github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck= 20 24 github.com/btcsuite/btcd/btcutil v1.1.6 h1:zFL2+c3Lb9gEgqKNzowKUPQNb8jV7v5Oaodi/AYFd6c= ··· 287 291 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= 288 292 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= 289 293 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= 294 + github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg= 295 + github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= 290 296 github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb h1:Ywfo8sUltxogBpFuMOFRrrSifO788kAFxmvVw31PtQQ= 291 297 github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb/go.mod h1:ikPs9bRWicNw3S7XpJ8sK/smGwU9WcSVU3dy9qahYBM= 292 298 github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
+2 -1
httpapi/server_test.go
··· 110 110 // this tree is just to avoid a nil pointer when creating a transaction with the factory 111 111 // the transactions don't actually get used 112 112 tree := iavl.NewMutableTree(dbm.NewMemDB(), 128, false, iavl.NewNopLogger()) 113 - txFactory := transaction.NewFactory(tree, nil, store.Tree.NextOperationSequence) 113 + txFactory, err := transaction.NewFactory(tree, nil, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 114 + require.NoError(t, err) 114 115 115 116 t.Run("Test Resolve DID", func(t *testing.T) { 116 117 server, err := NewServer(txFactory, mockPLC, nil, "tcp://127.0.0.1:8080", 15*time.Second)
+2 -2
plc/impl.go
··· 29 29 func (plc *plcImpl) ValidateOperation(ctx context.Context, readTx transaction.Read, did string, opBytes []byte) error { 30 30 timestamp := syntax.Datetime(readTx.Timestamp().Format(types.ActualAtprotoDatetimeLayout)) 31 31 32 - // TODO set true to false only while importing old ops 32 + // TODO set last parameter to true only while importing old ops 33 33 _, err := plc.validator.Validate(ctx, readTx, timestamp, did, opBytes, true) 34 34 if err != nil { 35 35 return stacktrace.Propagate(err, "operation failed validation") ··· 41 41 func (plc *plcImpl) ExecuteOperation(ctx context.Context, tx transaction.Write, did string, opBytes []byte) error { 42 42 timestamp := syntax.Datetime(tx.Timestamp().Format(types.ActualAtprotoDatetimeLayout)) 43 43 44 - // TODO set true to false only while importing old ops 44 + // TODO set last parameter to true only while importing old ops 45 45 effects, err := plc.validator.Validate(ctx, tx.Downgrade(), timestamp, did, opBytes, true) 46 46 if err != nil { 47 47 return stacktrace.Propagate(err, "operation failed validation")
+20 -3
plc/testutil_test.go
··· 4 4 dbm "github.com/cometbft/cometbft-db" 5 5 "github.com/cosmos/iavl" 6 6 iavldb "github.com/cosmos/iavl/db" 7 + "github.com/dgraph-io/badger/v4" 7 8 "github.com/palantir/stacktrace" 8 9 "tangled.org/gbl08ma.com/didplcbft/store" 9 10 "tangled.org/gbl08ma.com/didplcbft/transaction" 10 11 ) 11 12 12 - func NewTestTxFactory() (*transaction.Factory, *iavl.MutableTree, dbm.DB) { 13 + func NewTestTxFactory() (*transaction.Factory, *iavl.MutableTree, transaction.ExtendedDB) { 13 14 tree := iavl.NewMutableTree(iavldb.NewMemDB(), 128, false, iavl.NewNopLogger()) 14 15 _, _, err := tree.SaveVersion() 15 16 if err != nil { 16 17 panic(stacktrace.Propagate(err, "")) 17 18 } 18 19 19 - indexDB := dbm.NewMemDB() 20 - return transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence), tree, indexDB 20 + indexDB := memDBWrapper{dbm.NewMemDB()} 21 + factory, err := transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 22 + if err != nil { 23 + panic(stacktrace.Propagate(err, "")) 24 + } 25 + 26 + return factory, tree, indexDB 27 + } 28 + 29 + type memDBWrapper struct { 30 + dbm.DB 31 + } 32 + 33 + func (w memDBWrapper) IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) { 34 + if opts.Reverse { 35 + return w.ReverseIterator(start, end) 36 + } 37 + return w.Iterator(start, end) 21 38 }
+47 -2
store/tree.go
··· 11 11 "strings" 12 12 "time" 13 13 14 + "github.com/bits-and-blooms/bloom/v3" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 15 16 ics23 "github.com/cosmos/ics23/go" 17 + "github.com/dgraph-io/badger/v4" 16 18 "github.com/did-method-plc/go-didplc" 17 19 cbornode "github.com/ipfs/go-ipld-cbor" 18 20 "github.com/palantir/stacktrace" ··· 27 29 var Tree PLCTreeStore = &TreeStore{} 28 30 29 31 type PLCTreeStore interface { 32 + BuildDIDBloomFilter(tx transaction.Read) (*bloom.BloomFilter, error) 30 33 AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) 31 34 AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, err *error) iter.Seq[types.SequencedLogEntry] 32 35 ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited ··· 49 52 // TreeStore exists just to groups methods nicely 50 53 type TreeStore struct{} 51 54 55 + func (t *TreeStore) BuildDIDBloomFilter(tx transaction.Read) (*bloom.BloomFilter, error) { 56 + // TODO find an elegant way to dynamically size the bloom filter adequately as the number of DIDs grows 57 + filter := bloom.NewWithEstimates(100000000, 0.01) 58 + 59 + didRangeStart := marshalDIDLogKey(make([]byte, 15), 0) 60 + didRangeEnd := marshalDIDLogKey(slices.Repeat([]byte{0xff}, 15), math.MaxUint64) 61 + 62 + iterator, err := tx.IndexDB().Iterator(didRangeStart, didRangeEnd) 63 + if err != nil { 64 + return nil, stacktrace.Propagate(err, "") 65 + } 66 + 67 + defer iterator.Close() 68 + 69 + for iterator.Valid() { 70 + filter.Add(iterator.Key()[1:16]) 71 + 72 + iterator.Next() 73 + } 74 + err = iterator.Error() 75 + if err != nil { 76 + return nil, stacktrace.Propagate(err, "") 77 + } 78 + 79 + return filter, nil 80 + } 81 + 52 82 func (t *TreeStore) ProduceOperationExamples(tx transaction.Read, interval, count int) iter.Seq[[]byte] { 53 83 return func(yield func([]byte) bool) { 54 84 for i := 0; i < count*interval; i += interval { ··· 145 175 return 146 176 } 147 177 178 + mayBePresent := tx.TestDIDBloomFilter(didBytes) 179 + if !mayBePresent { 180 + return 181 + } 182 + 148 183 didRangeStart := marshalDIDLogKey(didBytes, 0) 149 184 didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) 150 185 151 - didLogIterator, err := tx.IndexDB().ReverseIterator(didRangeStart, didRangeEnd) 186 + opts := badger.DefaultIteratorOptions 187 + // we rarely have more than one entry for a DID and we see great performance gains from not prefetching 188 + // (iterating on DIDs that don't exist goes from taking over 150us to taking around 35 us) 189 + opts.PrefetchValues = false 190 + opts.Reverse = true 191 + didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts) 152 192 if err != nil { 153 193 *retErr = stacktrace.Propagate(err, "") 154 194 return ··· 249 289 didRangeStart := marshalDIDLogKey(didBytes, nullifyEGt) 250 290 didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) 251 291 252 - didLogIterator, err := tx.IndexDB().ReverseIterator(didRangeStart, didRangeEnd) 292 + opts := badger.DefaultIteratorOptions 293 + opts.PrefetchValues = false 294 + opts.Reverse = true 295 + didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts) 253 296 if err != nil { 254 297 return stacktrace.Propagate(err, "") 255 298 } ··· 330 373 if err != nil { 331 374 return stacktrace.Propagate(err, "") 332 375 } 376 + 377 + tx.AddToDIDBloomFilter(didBytes) 333 378 334 379 return nil 335 380 }
+5
transaction/interface.go
··· 4 4 "time" 5 5 6 6 dbm "github.com/cometbft/cometbft-db" 7 + "github.com/dgraph-io/badger/v4" 7 8 ) 8 9 9 10 type Read interface { ··· 12 13 13 14 Tree() ReadTree 14 15 IndexDB() ReadIndex 16 + TestDIDBloomFilter(did []byte) bool 15 17 16 18 Upgrade() (Write, error) 17 19 } ··· 23 25 NextSequence() (uint64, error) 24 26 Tree() UnifiedTree 25 27 IndexDB() WriteIndex 28 + TestDIDBloomFilter(did []byte) bool 29 + AddToDIDBloomFilter(did []byte) 26 30 27 31 Commit() error 28 32 Rollback() error ··· 34 38 Get([]byte) ([]byte, error) 35 39 Has(key []byte) (bool, error) 36 40 41 + IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) 37 42 Iterator(start, end []byte) (dbm.Iterator, error) 38 43 ReverseIterator(start, end []byte) (dbm.Iterator, error) 39 44 }
+7
transaction/read_on_write_tx.go
··· 33 33 return d.w.readTx.mutableTree 34 34 } 35 35 36 + // TestDIDBloomFilter implements [Read]. 37 + func (d *readOnWriteTx) TestDIDBloomFilter(did []byte) bool { 38 + d.w.readTx.bloomMu.RLock() 39 + defer d.w.readTx.bloomMu.RUnlock() 40 + return d.w.readTx.bloomFilter.Test(did) 41 + } 42 + 36 43 // Upgrade implements [Read]. 37 44 func (d *readOnWriteTx) Upgrade() (Write, error) { 38 45 return d.w, nil
+42 -10
transaction/read_tx.go
··· 1 1 package transaction 2 2 3 3 import ( 4 + "sync" 4 5 "time" 5 6 7 + "github.com/bits-and-blooms/bloom/v3" 6 8 dbm "github.com/cometbft/cometbft-db" 7 9 "github.com/cosmos/iavl" 10 + "github.com/dgraph-io/badger/v4" 8 11 "github.com/palantir/stacktrace" 9 12 ) 10 13 14 + type ExtendedDB interface { 15 + dbm.DB 16 + IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) 17 + } 18 + 11 19 type Factory struct { 12 - db dbm.DB 13 - tree *iavl.MutableTree 20 + db ExtendedDB 21 + mutableTree *iavl.MutableTree 14 22 sequenceGetter func(tx Read) (uint64, error) 23 + 24 + bloomMu sync.RWMutex 25 + bloomFilter *bloom.BloomFilter 15 26 } 16 27 17 - func NewFactory(tree *iavl.MutableTree, indexDB dbm.DB, sequenceGetter func(tx Read) (uint64, error)) *Factory { 18 - return &Factory{ 28 + func NewFactory(tree *iavl.MutableTree, indexDB ExtendedDB, sequenceGetter func(tx Read) (uint64, error), bloomFilterBuilder func(tx Read) (*bloom.BloomFilter, error)) (*Factory, error) { 29 + f := &Factory{ 19 30 db: indexDB, 20 - tree: tree, 31 + mutableTree: tree, 21 32 sequenceGetter: sequenceGetter, 22 33 } 34 + 35 + var err error 36 + // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet 37 + // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is 38 + // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet) 39 + f.bloomFilter, err = bloomFilterBuilder(f.ReadWorking(time.Now())) 40 + return f, stacktrace.Propagate(err, "") 23 41 } 24 42 25 43 type readTx struct { ··· 28 46 29 47 mutableTree *iavl.MutableTree // only present if upgradable 30 48 tree ReadTree 31 - db dbm.DB 49 + db ExtendedDB 50 + 51 + bloomMu *sync.RWMutex 52 + bloomFilter *bloom.BloomFilter 32 53 33 54 sequenceGetter func(tx Read) (uint64, error) 34 55 } ··· 36 57 func (f *Factory) ReadWorking(ts time.Time) Read { 37 58 return &readTx{ 38 59 ts: ts, 39 - height: f.tree.WorkingVersion(), 40 - mutableTree: f.tree, 60 + height: f.mutableTree.WorkingVersion(), 61 + mutableTree: f.mutableTree, 41 62 db: f.db, 42 63 sequenceGetter: f.sequenceGetter, 64 + bloomMu: &f.bloomMu, 65 + bloomFilter: f.bloomFilter, 43 66 } 44 67 } 45 68 46 69 func (f *Factory) ReadCommitted() Read { 47 - tx, err := f.ReadHeight(time.Now(), f.tree.Version()) 70 + tx, err := f.ReadHeight(time.Now(), f.mutableTree.Version()) 48 71 if err != nil { 49 72 // this should never happen, it's not worth making the signature of this function more 50 73 // complex for an error we'll never return unless the ABCI application is yet to be initialized ··· 54 77 } 55 78 56 79 func (f *Factory) ReadHeight(ts time.Time, height int64) (Read, error) { 57 - immutable, err := f.tree.GetImmutable(height) 80 + immutable, err := f.mutableTree.GetImmutable(height) 58 81 if err != nil { 59 82 return nil, stacktrace.Propagate(err, "") 60 83 } ··· 64 87 tree: AdaptImmutableTree(immutable), 65 88 db: f.db, 66 89 sequenceGetter: f.sequenceGetter, 90 + bloomMu: &f.bloomMu, 91 + bloomFilter: f.bloomFilter, 67 92 }, nil 68 93 } 69 94 ··· 88 113 // IndexDB implements [Read]. 89 114 func (t *readTx) IndexDB() ReadIndex { 90 115 return t.db 116 + } 117 + 118 + // TestDIDBloomFilter implements [Read]. 119 + func (t *readTx) TestDIDBloomFilter(did []byte) bool { 120 + t.bloomMu.RLock() 121 + defer t.bloomMu.RUnlock() 122 + return t.bloomFilter.Test(did) 91 123 } 92 124 93 125 // Upgrade implements [Read].
+16 -6
transaction/write_index.go
··· 8 8 9 9 "cosmossdk.io/core/store" 10 10 dbm "github.com/cometbft/cometbft-db" 11 + "github.com/dgraph-io/badger/v4" 11 12 "github.com/palantir/stacktrace" 12 13 ) 13 14 14 15 // writeIndex provides write transactions with read uncommitted behavior 15 16 type writeIndex struct { 16 17 batch dbm.Batch 17 - db dbm.DB 18 + db ExtendedDB 18 19 19 20 unsavedAdditions map[string][]byte 20 21 unsavedRemovals map[string]struct{} ··· 78 79 return v, stacktrace.Propagate(err, "") 79 80 } 80 81 82 + // IteratorWithOptions implements [WriteIndex]. 83 + func (w *writeIndex) IteratorWithOptions(start []byte, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) { 84 + v, err := newUnsavedIterator(start, end, !opts.Reverse, w.db, w.unsavedAdditions, w.unsavedRemovals, &opts) 85 + return v, stacktrace.Propagate(err, "") 86 + } 87 + 81 88 // Iterator implements [WriteIndex]. 82 89 func (w *writeIndex) Iterator(start []byte, end []byte) (dbm.Iterator, error) { 83 - v, err := newUnsavedIterator(start, end, true, w.db, w.unsavedAdditions, w.unsavedRemovals) 90 + v, err := newUnsavedIterator(start, end, true, w.db, w.unsavedAdditions, w.unsavedRemovals, nil) 84 91 return v, stacktrace.Propagate(err, "") 85 92 } 86 93 87 94 // ReverseIterator implements [WriteIndex]. 88 95 func (w *writeIndex) ReverseIterator(start []byte, end []byte) (dbm.Iterator, error) { 89 - v, err := newUnsavedIterator(start, end, false, w.db, w.unsavedAdditions, w.unsavedRemovals) 96 + v, err := newUnsavedIterator(start, end, false, w.db, w.unsavedAdditions, w.unsavedRemovals, nil) 90 97 return v, stacktrace.Propagate(err, "") 91 98 } 92 99 ··· 128 135 129 136 var _ store.Iterator = (*unsavedIterator)(nil) 130 137 131 - func newUnsavedIterator(start, end []byte, ascending bool, db dbm.DB, unsavedNodeAdditions map[string][]byte, unsavedNodeRemovals map[string]struct{}) (*unsavedIterator, error) { 138 + func newUnsavedIterator(start, end []byte, ascending bool, db ExtendedDB, unsavedNodeAdditions map[string][]byte, unsavedNodeRemovals map[string]struct{}, opts *badger.IteratorOptions) (*unsavedIterator, error) { 132 139 iter := &unsavedIterator{ 133 140 start: start, 134 141 end: end, ··· 141 148 } 142 149 143 150 var err error 144 - if ascending { 151 + if opts != nil { 152 + iter.underlyingIterator, err = db.IteratorWithOptions(start, end, *opts) 153 + iter.ascending = !opts.Reverse 154 + } else if ascending { 145 155 iter.underlyingIterator, err = db.Iterator(start, end) 146 156 } else { 147 157 iter.underlyingIterator, err = db.ReverseIterator(start, end) ··· 167 177 } 168 178 169 179 sort.Slice(iter.unsavedKeysToSort, func(i, j int) bool { 170 - if ascending { 180 + if iter.ascending { 171 181 return iter.unsavedKeysToSort[i] < iter.unsavedKeysToSort[j] 172 182 } 173 183 return iter.unsavedKeysToSort[i] > iter.unsavedKeysToSort[j]
+14
transaction/write_tx.go
··· 87 87 return w.readTx.ts 88 88 } 89 89 90 + // TestDIDBloomFilter implements [Write]. 91 + func (t *writeTx) TestDIDBloomFilter(did []byte) bool { 92 + t.readTx.bloomMu.RLock() 93 + defer t.readTx.bloomMu.RUnlock() 94 + return t.readTx.bloomFilter.Test(did) 95 + } 96 + 97 + // AddToDIDBloomFilter implements [Write]. 98 + func (t *writeTx) AddToDIDBloomFilter(did []byte) { 99 + t.readTx.bloomMu.Lock() 100 + defer t.readTx.bloomMu.Unlock() 101 + t.readTx.bloomFilter.Add(did) 102 + } 103 + 90 104 func (w *writeTx) createWriteIndexIfNeeded() { 91 105 if w.writeIndex == nil { 92 106 w.writeIndex = &writeIndex{