A very experimental PLC implementation which uses BFT consensus for decentralization

More performance improvements

- Avoid fetching next seq from tree on every op store
- Misc cleanups

gbl08ma.com d65facfb 096b0f2c

verified
+106 -97
+16 -2
abciapp/app.go
··· 80 80 } 81 81 82 82 *d.tree = *mkTree() 83 + 84 + err = d.plc.RefreshTreeData() 85 + if err != nil { 86 + return stacktrace.Propagate(err, "") 87 + } 83 88 return nil 84 89 } 85 90 ··· 151 156 // ImmutableTree implements [plc.TreeProvider]. 152 157 func (d *DIDPLCApplication) ImmutableTree(version plc.TreeVersion) (store.ReadOnlyTree, error) { 153 158 if version.IsMutable() { 154 - return store.AdaptMutableTree(d.tree), nil 159 + return d.tree, nil 155 160 } 156 161 var v int64 157 162 if version.IsCommitted() { ··· 173 178 } 174 179 175 180 // MutableTree implements [plc.TreeProvider]. 176 - func (d *DIDPLCApplication) MutableTree() (*iavl.MutableTree, error) { 181 + func (d *DIDPLCApplication) MutableTree() (store.UnifiedTree, error) { 177 182 return d.tree, nil 178 183 } 184 + 185 + func (d *DIDPLCApplication) DiscardChanges() { 186 + d.tree.Rollback() 187 + err := d.plc.RefreshTreeData() 188 + if err != nil { 189 + // TODO proper logging mechanism 190 + fmt.Println(stacktrace.Propagate(err, "DiscardChanges RefreshNextOperationSequence failed")) 191 + } 192 + }
+3 -3
abciapp/execution.go
··· 21 21 22 22 // PrepareProposal implements [types.Application]. 23 23 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 24 - defer d.tree.Rollback() 24 + defer d.DiscardChanges() 25 25 26 26 if req.Height == 2 { 27 27 tx := Transaction[SetAuthoritativePlcArguments]{ ··· 114 114 if d.lastProcessedProposalHash == nil { 115 115 // we didn't vote ACCEPT 116 116 // we could rollback only eventually on FinalizeBlock, but why wait - rollback now for safety 117 - d.tree.Rollback() 117 + d.DiscardChanges() 118 118 } 119 119 }() 120 120 ··· 179 179 } 180 180 // a block other than the one we processed in ProcessProposal was decided 181 181 // discard the current modified state, and process the decided block 182 - d.tree.Rollback() 182 + d.DiscardChanges() 183 183 184 184 txResults := make([]*processResult, len(req.Txs)) 185 185 for i, tx := range req.Txs {
+4 -1
abciapp/snapshots.go
··· 171 171 172 172 err := d.snapshotApplier.Apply(int(req.Index), req.Chunk) 173 173 if err != nil { 174 - fmt.Println("SNAPSHOT APPLY FAILED:", err.Error()) 175 174 if errors.Is(err, errMalformedChunk) { 176 175 return &abcitypes.ResponseApplySnapshotChunk{ 177 176 Result: abcitypes.ResponseApplySnapshotChunk_RETRY, ··· 189 188 190 189 if d.snapshotApplier.Done() { 191 190 d.snapshotApplier = nil 191 + err := d.plc.RefreshTreeData() 192 + if err != nil { 193 + return nil, stacktrace.Propagate(err, "") 194 + } 192 195 } 193 196 194 197 return &abcitypes.ResponseApplySnapshotChunk{
+34 -6
plc/impl.go
··· 7 7 "time" 8 8 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 - "github.com/cosmos/iavl" 11 10 "github.com/did-method-plc/go-didplc" 12 11 "github.com/palantir/stacktrace" 13 12 "github.com/samber/lo" ··· 17 16 ) 18 17 19 18 type TreeProvider interface { 20 - MutableTree() (*iavl.MutableTree, error) 19 + MutableTree() (store.UnifiedTree, error) 21 20 ImmutableTree(version TreeVersion) (store.ReadOnlyTree, error) 22 21 } 23 22 ··· 25 24 mu sync.Mutex // probably redundant, but let's keep for now 26 25 treeProvider TreeProvider 27 26 validator OperationValidator 27 + 28 + nextSeq uint64 28 29 } 29 30 30 31 var _ PLC = (*plcImpl)(nil) ··· 37 38 p.validator = NewV0OperationValidator(&inMemoryAuditLogFetcher{ 38 39 plc: p, 39 40 }) 41 + _ = p.RefreshTreeData() // to populate nextSeq 40 42 return p 41 43 } 42 44 45 + func (plc *plcImpl) RefreshTreeData() error { 46 + plc.mu.Lock() 47 + defer plc.mu.Unlock() 48 + 49 + tree, err := plc.treeProvider.MutableTree() 50 + if err != nil { 51 + return stacktrace.Propagate(err, "") 52 + } 53 + 54 + plc.nextSeq, err = store.Tree.NextOperationSequence(tree) 55 + if err != nil { 56 + return stacktrace.Propagate(err, "") 57 + } 58 + 59 + return nil 60 + } 61 + 62 + func (plc *plcImpl) incrSeqWithinMutex() uint64 { 63 + // the nextSeq has already been incremented by NextOperationSequence 64 + // so we must return before incrementing 65 + // (it is ok if we skip sequence numbers occasionally, e.g. in case of error, but let's not make it a habit) 66 + n := plc.nextSeq 67 + plc.nextSeq++ 68 + return n 69 + } 70 + 43 71 func (plc *plcImpl) ValidateOperation(ctx context.Context, atHeight TreeVersion, at time.Time, did string, opBytes []byte) error { 44 72 plc.mu.Lock() 45 73 defer plc.mu.Unlock() ··· 72 100 return stacktrace.Propagate(err, "failed to obtain mutable tree") 73 101 } 74 102 75 - err = store.Tree.StoreOperation(tree, effects.NewLogEntry, effects.NullifiedEntriesStartingIndex) 103 + err = store.Tree.StoreOperation(tree, effects.NewLogEntry, effects.NullifiedEntriesStartingIndex, plc.incrSeqWithinMutex()) 76 104 if err != nil { 77 105 return stacktrace.Propagate(err, "failed to commit operation") 78 106 } ··· 111 139 return stacktrace.Propagate(plc.importOp(ctx, tree, newEntry), "") 112 140 } 113 141 114 - func (plc *plcImpl) importOp(ctx context.Context, tree *iavl.MutableTree, newEntry didplc.LogEntry) error { 142 + func (plc *plcImpl) importOp(ctx context.Context, tree store.UnifiedTree, newEntry didplc.LogEntry) error { 115 143 newCID := newEntry.CID 116 144 newPrev := newEntry.Operation.AsOperation().PrevCIDStr() 117 145 ··· 163 191 // there's nothing to do but store the operation, no nullification involved 164 192 newEntry.Nullified = false 165 193 166 - err := store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 194 + err := store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex, plc.incrSeqWithinMutex()) 167 195 return stacktrace.Propagate(err, "failed to commit operation") 168 196 } 169 197 ··· 180 208 } 181 209 182 210 newEntry.Nullified = false 183 - err := store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 211 + err := store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex, plc.incrSeqWithinMutex()) 184 212 return stacktrace.Propagate(err, "failed to commit operation") 185 213 } 186 214
+1
plc/plc.go
··· 65 65 } 66 66 67 67 type WritePLC interface { 68 + RefreshTreeData() error 68 69 ExecuteOperation(ctx context.Context, timestamp time.Time, did string, opBytes []byte) error 69 70 ImportOperationFromAuthoritativeSource(ctx context.Context, entry didplc.LogEntry) error 70 71 ImportOperationsFromAuthoritativeSource(ctx context.Context, entries []didplc.LogEntry) error
+7 -14
plc/plc_test.go
··· 12 12 13 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 14 "github.com/did-method-plc/go-didplc" 15 - "github.com/samber/lo" 16 15 "github.com/stretchr/testify/require" 17 16 "tangled.org/gbl08ma.com/didplcbft/plc" 18 17 "tangled.org/gbl08ma.com/didplcbft/types" ··· 144 143 treeProvider := NewTestTreeProvider() 145 144 testPLC := plc.NewPLC(treeProvider) 146 145 147 - tree, err := treeProvider.MutableTree() 148 - require.NoError(t, err) 149 - _, origVersion, err := tree.SaveVersion() 146 + _, origVersion, err := treeProvider.tree.SaveVersion() 150 147 require.NoError(t, err) 151 148 152 149 // resolving a unknown DID should return an error ··· 160 157 } else { 161 158 require.NoError(t, err) 162 159 } 163 - _, _, err = tree.SaveVersion() 160 + _, _, err = treeProvider.tree.SaveVersion() 164 161 require.NoError(t, err) 165 162 } 166 163 ··· 260 257 } 261 258 } 262 259 263 - _, newVersion, err := lo.Must(treeProvider.MutableTree()).SaveVersion() 260 + _, newVersion, err := treeProvider.tree.SaveVersion() 264 261 require.NoError(t, err) 265 262 266 263 for i, testDID := range testDIDs { ··· 402 399 treeProvider := NewTestTreeProvider() 403 400 testPLC := plc.NewPLC(treeProvider) 404 401 405 - tree, err := treeProvider.MutableTree() 406 - require.NoError(t, err) 407 - _, _, err = tree.SaveVersion() 402 + _, _, err := treeProvider.tree.SaveVersion() 408 403 require.NoError(t, err) 409 404 410 405 seenCIDs := map[string]struct{}{} ··· 420 415 } 421 416 } 422 417 423 - _, _, err = tree.SaveVersion() 418 + _, _, err = treeProvider.tree.SaveVersion() 424 419 require.NoError(t, err) 425 420 426 421 exportedEntries, err := testPLC.Export(ctx, plc.CommittedTreeVersion, 0, len(seenCIDs)+1) ··· 449 444 treeProvider := NewTestTreeProvider() 450 445 testPLC := plc.NewPLC(treeProvider) 451 446 452 - tree, err := treeProvider.MutableTree() 453 - require.NoError(t, err) 454 - _, _, err = tree.SaveVersion() 447 + _, _, err := treeProvider.tree.SaveVersion() 455 448 require.NoError(t, err) 456 449 457 450 for _, entry := range toImport { ··· 460 453 require.NoError(t, err) 461 454 } 462 455 463 - _, _, err = tree.SaveVersion() 456 + _, _, err = treeProvider.tree.SaveVersion() 464 457 require.NoError(t, err) 465 458 466 459 exportedEntries, err := testPLC.Export(ctx, plc.CommittedTreeVersion, 0, len(toImport)+1)
+2 -2
plc/testutil_test.go
··· 20 20 21 21 func (t *testTreeProvider) ImmutableTree(version plc.TreeVersion) (store.ReadOnlyTree, error) { 22 22 if version.IsMutable() { 23 - return store.AdaptMutableTree(t.tree), nil 23 + return t.tree, nil 24 24 } 25 25 var v int64 26 26 if version.IsCommitted() { ··· 41 41 return store.AdaptImmutableTree(it), stacktrace.Propagate(err, "") 42 42 } 43 43 44 - func (t *testTreeProvider) MutableTree() (*iavl.MutableTree, error) { 44 + func (t *testTreeProvider) MutableTree() (store.UnifiedTree, error) { 45 45 return t.tree, nil 46 46 }
+17 -53
store/iavl_adapter.go
··· 1 1 package store 2 2 3 3 import ( 4 + corestore "cosmossdk.io/core/store" 4 5 "github.com/cosmos/iavl" 5 6 ics23 "github.com/cosmos/ics23/go" 6 - "github.com/palantir/stacktrace" 7 7 ) 8 8 9 9 type ReadOnlyTree interface { 10 10 Has(key []byte) (bool, error) 11 11 Get(key []byte) ([]byte, error) 12 12 GetProof(key []byte) (*ics23.CommitmentProof, error) // won't actually work on mutable trees, but we don't need it to 13 + Iterator(start, end []byte, ascending bool) (corestore.Iterator, error) 13 14 IterateRange(start, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) 14 15 } 15 16 16 - type mutableToUnifiedTree struct { 17 - tree *iavl.MutableTree 18 - } 19 - 20 - var _ ReadOnlyTree = (*mutableToUnifiedTree)(nil) 21 - 22 - func AdaptMutableTree(tree *iavl.MutableTree) ReadOnlyTree { 23 - return &mutableToUnifiedTree{ 24 - tree: tree, 25 - } 26 - } 27 - 28 - // Has implements [ReadOnlyTree]. 29 - func (m *mutableToUnifiedTree) Has(key []byte) (bool, error) { 30 - return m.tree.Has(key) 31 - } 32 - 33 - // Get implements [ReadOnlyTree]. 34 - func (m *mutableToUnifiedTree) Get(key []byte) ([]byte, error) { 35 - return m.tree.Get(key) 36 - } 37 - 38 - // GetProof implements [ReadOnlyTree]. 39 - func (m *mutableToUnifiedTree) GetProof(key []byte) (*ics23.CommitmentProof, error) { 40 - return nil, stacktrace.NewError("proof calculation not possible over mutable tree") 41 - } 42 - 43 - // IterateRange implements [ReadOnlyTree]. 44 - func (m *mutableToUnifiedTree) IterateRange(start []byte, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { 45 - // it might look like MutableTree implements IterateRange but it doesn't, 46 - // most iteration methods actually come from the embedded ImmutableTree we're not meant to use 47 - // (terrible API) 48 - itr, err := m.tree.Iterator(start, end, ascending) 49 - if err != nil { 50 - return false 51 - } 52 - 53 - defer itr.Close() 54 - 55 - for ; itr.Valid(); itr.Next() { 56 - if fn(itr.Key(), itr.Value()) { 57 - return true 58 - } 59 - } 60 - return false 17 + type UnifiedTree interface { 18 + ReadOnlyTree 19 + Set(key, value []byte) (bool, error) 61 20 } 62 21 63 - type immutableToUnifiedTree struct { 22 + type immutableToReadOnlyTree struct { 64 23 tree *iavl.ImmutableTree 65 24 } 66 25 67 - var _ ReadOnlyTree = (*immutableToUnifiedTree)(nil) 26 + var _ ReadOnlyTree = (*immutableToReadOnlyTree)(nil) 68 27 69 28 func AdaptImmutableTree(tree *iavl.ImmutableTree) ReadOnlyTree { 70 - return &immutableToUnifiedTree{ 29 + return &immutableToReadOnlyTree{ 71 30 tree: tree, 72 31 } 73 32 } 74 33 75 34 // Has implements [ReadOnlyTree]. 76 - func (i *immutableToUnifiedTree) Has(key []byte) (bool, error) { 35 + func (i *immutableToReadOnlyTree) Has(key []byte) (bool, error) { 77 36 return i.tree.Has(key) 78 37 } 79 38 80 39 // Get implements [ReadOnlyTree]. 81 - func (i *immutableToUnifiedTree) Get(key []byte) ([]byte, error) { 40 + func (i *immutableToReadOnlyTree) Get(key []byte) ([]byte, error) { 82 41 return i.tree.Get(key) 83 42 } 84 43 85 44 // GetProof implements [ReadOnlyTree]. 86 - func (i *immutableToUnifiedTree) GetProof(key []byte) (*ics23.CommitmentProof, error) { 45 + func (i *immutableToReadOnlyTree) GetProof(key []byte) (*ics23.CommitmentProof, error) { 87 46 return i.tree.GetProof(key) 88 47 } 89 48 90 49 // IterateRange implements [ReadOnlyTree]. 91 - func (i *immutableToUnifiedTree) IterateRange(start []byte, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { 50 + func (i *immutableToReadOnlyTree) IterateRange(start []byte, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { 92 51 return i.tree.IterateRange(start, end, ascending, fn) 93 52 } 53 + 54 + // Iterator implements [ReadOnlyTree]. 55 + func (m *immutableToReadOnlyTree) Iterator(start, end []byte, ascending bool) (corestore.Iterator, error) { 56 + return m.tree.Iterator(start, end, ascending) 57 + }
+22 -16
store/tree.go
··· 11 11 "time" 12 12 13 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 - "github.com/cosmos/iavl" 15 14 ics23 "github.com/cosmos/ics23/go" 16 15 "github.com/did-method-plc/go-didplc" 17 16 cbornode "github.com/ipfs/go-ipld-cbor" ··· 28 27 AuditLog(ctx context.Context, tree ReadOnlyTree, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) 29 28 AuditLogReverseIterator(ctx context.Context, tree ReadOnlyTree, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 30 29 ExportOperations(ctx context.Context, tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 31 - StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 32 - SetOperationCreatedAt(tree *iavl.MutableTree, seqID uint64, createdAt time.Time) error 30 + StoreOperation(tree UnifiedTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int], sequence uint64) error 31 + SetOperationCreatedAt(tree UnifiedTree, seqID uint64, createdAt time.Time) error 32 + 33 + NextOperationSequence(tree ReadOnlyTree) (uint64, error) 33 34 34 35 AuthoritativePLC(tree ReadOnlyTree) (string, error) 35 - SetAuthoritativePLC(tree *iavl.MutableTree, url string) error 36 + SetAuthoritativePLC(tree UnifiedTree, url string) error 36 37 37 38 AuthoritativeImportProgress(tree ReadOnlyTree) (uint64, error) 38 - SetAuthoritativeImportProgress(tree *iavl.MutableTree, nextCursor uint64) error 39 + SetAuthoritativeImportProgress(tree UnifiedTree, nextCursor uint64) error 39 40 } 40 41 41 42 var _ PLCTreeStore = (*TreeStore)(nil) ··· 188 189 return entries, nil 189 190 } 190 191 191 - func (t *TreeStore) StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error { 192 + // StoreOperation stores an operation in the tree, nullifying existing operations whose index within the DID's history 193 + // is lower or equal to the specified optional integer. 194 + // 195 + // Even though this function is not meant to overwrite operations (it will error) we ask the caller to provide the sequence 196 + // The caller is responsible for managing the sequence and invalidating it when needed (e.g. after a rollback) using 197 + // [[TreeStore.NextOperationSequence]]. 198 + // Pushing the responsibility to the caller is preferable in terms of performance, even if it leads to less safe code, 199 + // because getting the sequence from the tree within this function every time has a significant performance hit 200 + func (t *TreeStore) StoreOperation(tree UnifiedTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int], sequence uint64) error { 192 201 didBytes, err := DIDToBytes(entry.DID) 193 202 if err != nil { 194 203 return stacktrace.Propagate(err, "") ··· 230 239 return stacktrace.Propagate(err, "invalid CreatedAt") 231 240 } 232 241 233 - seq, err := getNextSeqID(tree) 234 - if err != nil { 235 - return stacktrace.Propagate(err, "") 236 - } 237 - 238 242 operation := entry.Operation.AsOperation() 239 - opKey := marshalOperationKey(seq) 243 + opKey := marshalOperationKey(sequence) 240 244 opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation) 241 245 242 246 updated, err := tree.Set(opKey, opValue) ··· 244 248 return stacktrace.Propagate(err, "") 245 249 } 246 250 if updated { 251 + // if we get to this point we have a mistake in our program, and the data is now inconsistent 252 + // we are not supposed to be able to recover from this error without rolling back the tree 247 253 return stacktrace.NewError("expected to be writing to a new operation key but updated instead") 248 254 } 249 255 ··· 256 262 return nil 257 263 } 258 264 259 - func (t *TreeStore) SetOperationCreatedAt(tree *iavl.MutableTree, seqID uint64, createdAt time.Time) error { 265 + func (t *TreeStore) SetOperationCreatedAt(tree UnifiedTree, seqID uint64, createdAt time.Time) error { 260 266 opKey := marshalOperationKey(seqID) 261 267 262 268 opValue, err := tree.Get(opKey) ··· 279 285 var minOperationKey = marshalOperationKey(0) 280 286 var maxOperationKey = marshalOperationKey(math.MaxInt64) 281 287 282 - func getNextSeqID(tree *iavl.MutableTree) (uint64, error) { 288 + func (t *TreeStore) NextOperationSequence(tree ReadOnlyTree) (uint64, error) { 283 289 seq := uint64(0) 284 290 285 291 itr, err := tree.Iterator(minOperationKey, maxOperationKey, false) ··· 472 478 return string(url), nil 473 479 } 474 480 475 - func (t *TreeStore) SetAuthoritativePLC(tree *iavl.MutableTree, url string) error { 481 + func (t *TreeStore) SetAuthoritativePLC(tree UnifiedTree, url string) error { 476 482 _, err := tree.Set([]byte("aPLCURL"), []byte(url)) 477 483 return stacktrace.Propagate(err, "") 478 484 } ··· 488 494 return binary.BigEndian.Uint64(progBytes), nil 489 495 } 490 496 491 - func (t *TreeStore) SetAuthoritativeImportProgress(tree *iavl.MutableTree, nextCursor uint64) error { 497 + func (t *TreeStore) SetAuthoritativeImportProgress(tree UnifiedTree, nextCursor uint64) error { 492 498 value := make([]byte, 8) 493 499 binary.BigEndian.PutUint64(value, nextCursor) 494 500