A very experimental PLC implementation which uses BFT consensus for decentralization

Continue work on range challenges

gbl08ma.com 1fd7d6f9 686b6364

verified
+258 -65
+16 -10
abciapp/app.go
··· 9 9 10 10 dbm "github.com/cometbft/cometbft-db" 11 11 abcitypes "github.com/cometbft/cometbft/abci/types" 12 + "github.com/cometbft/cometbft/crypto" 13 + "github.com/cometbft/cometbft/privval" 12 14 bftstore "github.com/cometbft/cometbft/store" 13 - "github.com/cometbft/cometbft/types" 14 15 "github.com/cosmos/iavl" 15 16 "github.com/klauspost/compress/zstd" 16 17 "github.com/palantir/stacktrace" ··· 30 31 tree *iavl.MutableTree 31 32 fullyClearApplicationData func() error 32 33 33 - privValidator types.PrivValidator 34 + validatorPubKey crypto.PubKey 35 + validatorPrivKey crypto.PrivKey 34 36 35 37 ongoingRead transaction.Read 36 38 ongoingWrite transaction.Write ··· 42 44 lastProcessedProposalExecTxResults []*processResult 43 45 44 46 aocsByPLC map[string]*authoritativeOperationsCache 47 + 48 + blockStore *bftstore.BlockStore 45 49 46 50 blockChallengeCoordinator *blockChallengeCoordinator 47 51 rangeChallengeCoordinator *rangeChallengeCoordinator 48 52 } 49 53 50 54 // store and plc must be able to share transaction objects 51 - func NewDIDPLCApplication(pv types.PrivValidator, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 55 + func NewDIDPLCApplication(pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 52 56 mkTree := func() *iavl.MutableTree { 53 57 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 54 58 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 74 78 75 79 d := &DIDPLCApplication{ 76 80 runnerContext: runnerContext, 77 - privValidator: pv, 78 81 tree: tree, 79 82 indexDB: indexDB, 80 83 snapshotDirectory: snapshotDirectory, 81 84 aocsByPLC: make(map[string]*authoritativeOperationsCache), 85 + } 86 + 87 + if pv != nil { 88 + d.validatorPubKey = pv.Key.PubKey 89 + d.validatorPrivKey = pv.Key.PrivKey 82 90 } 83 91 84 92 d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(didBloomFilterPath)) ··· 185 193 } 186 194 187 195 func (d *DIDPLCApplication) FinishInitializing(blockStore *bftstore.BlockStore) error { 188 - pubKey, err := d.privValidator.GetPubKey() 189 - if err != nil { 190 - return stacktrace.Propagate(err, "") 191 - } 196 + d.blockStore = blockStore 192 197 193 - d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.txFactory, blockStore, pubKey) 198 + var err error 199 + d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.txFactory, blockStore, d.validatorPubKey) 194 200 if err != nil { 195 201 return stacktrace.Propagate(err, "") 196 202 } 197 203 198 - d.rangeChallengeCoordinator, err = newRangeChallengeCoordinator(d.runnerContext, d.txFactory, blockStore, pubKey) 204 + d.rangeChallengeCoordinator, err = newRangeChallengeCoordinator(d.runnerContext, d.txFactory, blockStore, d.validatorPubKey, d.validatorPrivKey) 199 205 if err != nil { 200 206 return stacktrace.Propagate(err, "") 201 207 }
+21 -11
abciapp/block_challenge.go
··· 58 58 59 59 runnerContext context.Context 60 60 61 - validatorAddress []byte 62 - txFactory *transaction.Factory 63 - nodeBlockStore *bftstore.BlockStore 61 + isConfiguredToBeValidator bool 62 + validatorAddress []byte 63 + txFactory *transaction.Factory 64 + nodeBlockStore *bftstore.BlockStore 64 65 65 66 sharedWitnessDataCache *theine.LoadingCache[int64, proof.BlockChallengeCircuit] 66 67 } 67 68 68 69 func newBlockChallengeCoordinator(runnerContext context.Context, txFactory *transaction.Factory, blockStore *bftstore.BlockStore, pubKey crypto.PubKey) (*blockChallengeCoordinator, error) { 69 70 c := &blockChallengeCoordinator{ 70 - runnerContext: runnerContext, 71 - txFactory: txFactory, 72 - nodeBlockStore: blockStore, 73 - validatorAddress: pubKey.Address(), 71 + runnerContext: runnerContext, 72 + txFactory: txFactory, 73 + nodeBlockStore: blockStore, 74 + isConfiguredToBeValidator: pubKey != nil, 75 + } 76 + if c.isConfiguredToBeValidator { 77 + c.validatorAddress = pubKey.Address() 74 78 } 75 79 76 80 var err error ··· 116 120 // the proofs should be stable regardless of how many proposal rounds we go through, because they only use data from C-1 117 121 118 122 func (c *blockChallengeCoordinator) notifyOfIncomingBlockHeight(height int64) { 123 + if !c.isConfiguredToBeValidator { 124 + return 125 + } 119 126 go func() { 120 127 _, err := c.loadOrComputeBlockChallengeProof(c.runnerContext, height) 121 128 if err != nil { ··· 125 132 } 126 133 127 134 func (c *blockChallengeCoordinator) loadOrComputeBlockChallengeProof(ctx context.Context, height int64) ([]byte, error) { 135 + if !c.isConfiguredToBeValidator { 136 + return nil, stacktrace.NewError("node is not configured to be a validator") 137 + } 128 138 proof, _, err := c.g.Do(ctx, height, func(ctx context.Context) ([]byte, error) { 129 139 // we need to read the tree as it was on the block prior. We assume that this method will only be called in the processing of the latest block 130 140 // and we validate this assumption ··· 242 252 if height <= 1 { 243 253 return make([]byte, proof.OperationDataLength), make([]byte, 32), nil 244 254 } 245 - lastBlock := c.nodeBlockStore.LoadBlock(height - 1) 246 - if lastBlock == nil { 255 + lastBlockMeta := c.nodeBlockStore.LoadBlockMeta(height - 1) 256 + if lastBlockMeta == nil { 247 257 return nil, nil, stacktrace.NewError("height not found") 248 258 } 249 259 250 - lastCommitHash := lastBlock.LastCommitHash 260 + lastCommitHash := lastBlockMeta.Header.LastCommitHash 251 261 lastCommitHashBigInt := big.NewInt(0).SetBytes(lastCommitHash) 252 262 253 263 highestOp, err := tx.CountOperations() ··· 276 286 } 277 287 } 278 288 279 - return operationData[lastCommitHash[0]:], lastBlock.LastCommitHash, nil 289 + return operationData[lastCommitHash[0]:], lastBlockMeta.Header.LastCommitHash, nil 280 290 } 281 291 282 292 func (c *blockChallengeCoordinator) fetchOrBuildBlockChallengeCircuitAssignmentShared(tx transaction.Read, height int64) (proof.BlockChallengeCircuit, error) {
+20 -18
abciapp/execution.go
··· 126 126 } 127 127 }() 128 128 129 - if d.privValidator != nil { 130 - d.blockChallengeCoordinator.notifyOfIncomingBlockHeight(req.Height) 131 - } 129 + d.blockChallengeCoordinator.notifyOfIncomingBlockHeight(req.Height) 132 130 133 131 txResults := make([]*processResult, len(req.Txs)) 134 132 for i, tx := range req.Txs { ··· 167 165 168 166 // ExtendVote implements [types.Application]. 169 167 func (d *DIDPLCApplication) ExtendVote(ctx context.Context, req *abcitypes.RequestExtendVote) (*abcitypes.ResponseExtendVote, error) { 170 - if d.privValidator == nil { 171 - // we are not meant to be performing validator duties... 172 - return nil, stacktrace.NewError("unexpected ExtendVote call: node is not configured to be a validator") 173 - } 174 - 175 168 proof, err := d.blockChallengeCoordinator.loadOrComputeBlockChallengeProof(ctx, req.Height) 176 169 if err != nil { 177 170 return nil, stacktrace.Propagate(err, "") ··· 255 248 } 256 249 257 250 func (d *DIDPLCApplication) transactionProcessorDependenciesForCommittedRead() TransactionProcessorDependencies { 251 + readTx := d.txFactory.ReadCommitted() 258 252 return TransactionProcessorDependencies{ 259 - runnerContext: d.runnerContext, 260 - plc: d.plc, 261 - readTx: d.txFactory.ReadCommitted(), 262 - writeTx: mo.None[transaction.Write](), 263 - aocsByPLC: d.aocsByPLC, 253 + runnerContext: d.runnerContext, 254 + workingHeight: readTx.Height() + 1, 255 + plc: d.plc, 256 + readTx: readTx, 257 + writeTx: mo.None[transaction.Write](), 258 + aocsByPLC: d.aocsByPLC, 259 + blockChallengeCoordinator: d.blockChallengeCoordinator, 260 + rangeChallengeCoordinator: d.rangeChallengeCoordinator, 261 + blockStore: d.blockStore, 264 262 } 265 263 } 266 264 ··· 274 272 } 275 273 276 274 return TransactionProcessorDependencies{ 277 - runnerContext: d.runnerContext, 278 - plc: d.plc, 279 - readTx: d.ongoingRead, 280 - writeTx: writeTx, 281 - aocsByPLC: d.aocsByPLC, 275 + runnerContext: d.runnerContext, 276 + workingHeight: d.ongoingRead.Height(), 277 + readTx: d.ongoingRead, 278 + writeTx: writeTx, 279 + plc: d.plc, 280 + aocsByPLC: d.aocsByPLC, 281 + blockChallengeCoordinator: d.blockChallengeCoordinator, 282 + rangeChallengeCoordinator: d.rangeChallengeCoordinator, 283 + blockStore: d.blockStore, 282 284 } 283 285 } 284 286
+30 -9
abciapp/range_challenge.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/binary" 7 + "math/big" 7 8 "slices" 8 9 9 10 "github.com/Yiling-J/theine-go" ··· 20 21 type rangeChallengeCoordinator struct { 21 22 runnerContext context.Context 22 23 23 - validatorAddress []byte 24 - txFactory *transaction.Factory 25 - nodeBlockStore *bftstore.BlockStore 24 + isConfiguredToBeValidator bool 25 + validatorPubKey crypto.PubKey 26 + validatorPrivKey crypto.PrivKey 27 + validatorAddress []byte 28 + txFactory *transaction.Factory 29 + nodeBlockStore *bftstore.BlockStore 26 30 27 31 treeCache *theine.LoadingCache[treeCacheKey, cachedTree] 28 32 } 29 33 30 - func newRangeChallengeCoordinator(runnerContext context.Context, txFactory *transaction.Factory, blockStore *bftstore.BlockStore, pubKey crypto.PubKey) (*rangeChallengeCoordinator, error) { 34 + func newRangeChallengeCoordinator(runnerContext context.Context, txFactory *transaction.Factory, blockStore *bftstore.BlockStore, pubKey crypto.PubKey, privKey crypto.PrivKey) (*rangeChallengeCoordinator, error) { 31 35 c := &rangeChallengeCoordinator{ 32 - txFactory: txFactory, 33 - runnerContext: runnerContext, 34 - nodeBlockStore: blockStore, 35 - validatorAddress: pubKey.Address(), 36 + txFactory: txFactory, 37 + runnerContext: runnerContext, 38 + nodeBlockStore: blockStore, 39 + isConfiguredToBeValidator: pubKey != nil, 40 + validatorPubKey: pubKey, 41 + validatorPrivKey: privKey, 42 + } 43 + if c.isConfiguredToBeValidator { 44 + c.validatorAddress = pubKey.Address() 36 45 } 37 46 38 47 var err error ··· 82 91 }, nil 83 92 } 84 93 85 - func verifyMembershipOfRangeChallengeProofs(ctx context.Context, proofs ...rangeChallengeProof) (bool, error) { 94 + func verifyMembershipOfRangeChallengeProofs(proofs ...rangeChallengeProof) (bool, error) { 86 95 if len(proofs) < 1 { 87 96 return false, stacktrace.NewError("insufficient proofs") 88 97 } ··· 106 115 } 107 116 108 117 return true, nil 118 + } 119 + 120 + func computeHeightToProveInRange(lastCommitHash, validatorAddress []byte, fromHeight, toHeight int64) uint64 { 121 + lastCommitHashBigInt := new(big.Int).SetBytes(lastCommitHash) 122 + validatorBigInt := new(big.Int).SetBytes(validatorAddress) 123 + seed := new(big.Int).Xor(lastCommitHashBigInt, validatorBigInt) 124 + 125 + numBlocks := toHeight - fromHeight + 1 126 + 127 + randOffset := new(big.Int).Mod(seed, big.NewInt(numBlocks)) 128 + 129 + return uint64(fromHeight) + randOffset.Uint64() 109 130 } 110 131 111 132 func (c *rangeChallengeCoordinator) proofTreeLoader(ctx context.Context, cacheKey treeCacheKey) (theine.Loaded[cachedTree], error) {
+50 -5
abciapp/tx.go
··· 6 6 7 7 abcitypes "github.com/cometbft/cometbft/abci/types" 8 8 "github.com/cometbft/cometbft/crypto" 9 + "github.com/cometbft/cometbft/crypto/ed25519" 10 + "github.com/cometbft/cometbft/crypto/secp256k1" 11 + bftstore "github.com/cometbft/cometbft/store" 12 + cmttypes "github.com/cometbft/cometbft/types" 9 13 cbornode "github.com/ipfs/go-ipld-cbor" 10 14 "github.com/palantir/stacktrace" 11 15 "github.com/samber/mo" ··· 20 24 type TransactionAction string 21 25 22 26 type TransactionProcessorDependencies struct { 23 - runnerContext context.Context 24 - readTx transaction.Read 25 - writeTx mo.Option[transaction.Write] 26 - plc plc.PLC 27 - aocsByPLC map[string]*authoritativeOperationsCache 27 + runnerContext context.Context 28 + workingHeight int64 29 + readTx transaction.Read 30 + writeTx mo.Option[transaction.Write] 31 + plc plc.PLC 32 + aocsByPLC map[string]*authoritativeOperationsCache 33 + blockChallengeCoordinator *blockChallengeCoordinator 34 + rangeChallengeCoordinator *rangeChallengeCoordinator 35 + blockStore *bftstore.BlockStore 28 36 } 29 37 30 38 type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) ··· 89 97 return bytes.Equal(txBytes, s) 90 98 } 91 99 100 + type PubKeyInArguments struct { 101 + Type string `json:"type" refmt:"type"` 102 + Key []byte `json:"key" refmt:"key"` 103 + } 104 + 92 105 func SignTransaction[ArgType ArgumentType](privKey crypto.PrivKey, tx Transaction[ArgType]) (Transaction[ArgType], error) { 93 106 var zeroValue Transaction[ArgType] 94 107 ··· 117 130 } 118 131 119 132 return publicKey.VerifySignature(bytesToSign, sig), nil 133 + } 134 + 135 + func VerifyTransactionSignatureWithMarshalledPubKey[ArgType ArgumentType](publicKey PubKeyInArguments, tx Transaction[ArgType]) (bool, crypto.PubKey, error) { 136 + sig := tx.Signature 137 + tx.Signature = nil 138 + 139 + bytesToSign, err := cbornode.DumpObject(tx) 140 + if err != nil { 141 + return false, nil, stacktrace.Propagate(err, "") 142 + } 143 + 144 + var pubKey crypto.PubKey 145 + switch publicKey.Type { 146 + case cmttypes.ABCIPubKeyTypeEd25519: 147 + pubKey = ed25519.PubKey(publicKey.Key) 148 + case cmttypes.ABCIPubKeyTypeSecp256k1: 149 + pubKey = secp256k1.PubKey(publicKey.Key) 150 + } 151 + 152 + return pubKey.VerifySignature(bytesToSign, sig), pubKey, nil 153 + } 154 + 155 + func MarshalPubKeyForArguments(pubKey crypto.PubKey) (PubKeyInArguments, error) { 156 + keyTypeName, ok := cmttypes.ABCIPubKeyTypesToNames[pubKey.Type()] 157 + if !ok { 158 + return PubKeyInArguments{}, stacktrace.NewError("unsupported key type") 159 + } 160 + 161 + return PubKeyInArguments{ 162 + Type: keyTypeName, 163 + Key: pubKey.Bytes(), 164 + }, nil 120 165 } 121 166 122 167 type processResult struct {
+121 -12
abciapp/tx_challenge.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/binary" 6 + "time" 5 7 6 - "github.com/cometbft/cometbft/crypto" 7 - cmtjson "github.com/cometbft/cometbft/libs/json" 8 + ics23 "github.com/cosmos/ics23/go" 8 9 cbornode "github.com/ipfs/go-ipld-cbor" 9 10 "github.com/palantir/stacktrace" 11 + "github.com/samber/lo" 10 12 ) 11 13 12 14 var TransactionActionCommitToChallenge = registerTransactionAction[CommitToChallengeArguments]("CommitToChallenge", processCommitToChallengeTx) 13 15 16 + const CommitToChallengeMaxAgeInBlocks = 3 17 + const CommitToChallengeMinRange = 1000 18 + const CommitToChallengeMaxRange = 10000 19 + 20 + var CommitToChallengeMaxAge = lo.Must(time.ParseDuration("10s")) 21 + 14 22 type CommitToChallengeArguments struct { 15 - ValidatorPubKey []byte `json:"validator" refmt:"validator"` 16 - FromHeight int64 `json:"fromHeight" refmt:"fromHeight"` 23 + ValidatorPubKey PubKeyInArguments `json:"validator" refmt:"validator"` 24 + FromHeight int64 `json:"fromHeight" refmt:"fromHeight"` 17 25 18 26 // ToHeight should not be more than N blocks behind the block where this transaction is included 19 27 // (i.e. this transaction "expires" once ToHeight is more than N blocks old, and shouldn't be included after that) 20 - // TODO determine N 21 28 ToHeight int64 `json:"toHeight" refmt:"toHeight"` 22 29 Root []byte `json:"root" refmt:"root"` 23 30 Proof []byte `json:"proof" refmt:"proof"` ··· 41 48 }, nil 42 49 } 43 50 44 - var pubKey crypto.PubKey 45 - err = cmtjson.Unmarshal(tx.Arguments.ValidatorPubKey, &pubKey) 46 - 47 - verified, err := VerifyTransactionSignature(pubKey, tx) 51 + verified, validatorPubKey, err := VerifyTransactionSignatureWithMarshalledPubKey(tx.Arguments.ValidatorPubKey, tx) 48 52 if err != nil { 49 53 return nil, stacktrace.Propagate(err, "") 50 54 } ··· 56 60 }, nil 57 61 } 58 62 59 - return nil, stacktrace.NewError("not implemented") // TODO 63 + validatorAddress := validatorPubKey.Address() 64 + 65 + if tx.Arguments.ToHeight < tx.Arguments.FromHeight { 66 + return &processResult{ 67 + Code: 4201, 68 + Info: "invalid challenge range", 69 + }, nil 70 + } 71 + 72 + rangeSize := tx.Arguments.ToHeight - tx.Arguments.FromHeight + 1 73 + if rangeSize < CommitToChallengeMinRange { 74 + return &processResult{ 75 + Code: 4202, 76 + Info: "insufficient challenge range", 77 + }, nil 78 + } 79 + if rangeSize > CommitToChallengeMaxRange { 80 + return &processResult{ 81 + Code: 4203, 82 + Info: "excessive challenge range", 83 + }, nil 84 + } 85 + 86 + if tx.Arguments.ToHeight+CommitToChallengeMaxAgeInBlocks < deps.workingHeight { 87 + return &processResult{ 88 + Code: 4204, 89 + Info: "outdated challenge range", 90 + }, nil 91 + } 92 + 93 + toHeightBlockMeta := deps.blockStore.LoadBlockMeta(tx.Arguments.ToHeight) 94 + if toHeightBlockMeta == nil { 95 + return &processResult{ 96 + Code: 4205, 97 + Info: "unknown block in challenge range", 98 + }, nil 99 + } 100 + 101 + if time.Since(toHeightBlockMeta.Header.Time) > CommitToChallengeMaxAge { 102 + return &processResult{ 103 + Code: 4206, 104 + Info: "outdated challenge range", 105 + }, nil 106 + } 107 + 108 + proof := new(ics23.CommitmentProof) 109 + err = proof.Unmarshal(tx.Arguments.Proof) 110 + if err != nil || proof.GetExist() == nil { 111 + return &processResult{ 112 + Code: 4207, 113 + Info: "invalid proof", 114 + }, nil 115 + } 116 + existenceProof := proof.GetExist() 117 + 118 + proofHeight := binary.BigEndian.Uint64(existenceProof.Key) 119 + 120 + expectedProofHeight := computeHeightToProveInRange( 121 + toHeightBlockMeta.Header.LastCommitHash.Bytes(), 122 + validatorAddress.Bytes(), 123 + tx.Arguments.FromHeight, 124 + tx.Arguments.ToHeight) 125 + 126 + if proofHeight != expectedProofHeight { 127 + return &processResult{ 128 + Code: 4208, 129 + Info: "invalid proof", 130 + }, nil 131 + } 132 + 133 + blockProofValid, err := deps.blockChallengeCoordinator.verifyBlockChallengeProof(int64(proofHeight), validatorAddress, existenceProof.Value) 134 + if err != nil { 135 + return nil, stacktrace.Propagate(err, "") 136 + } 137 + if !blockProofValid { 138 + return &processResult{ 139 + Code: 4209, 140 + Info: "invalid proof", 141 + }, nil 142 + } 143 + 144 + rangeProof := rangeChallengeProof{ 145 + treeRoot: tx.Arguments.Root, 146 + membershipProof: proof, 147 + } 148 + 149 + rangeProofValid, err := verifyMembershipOfRangeChallengeProofs(rangeProof) 150 + if err != nil { 151 + return nil, stacktrace.Propagate(err, "") 152 + } 153 + if !rangeProofValid { 154 + return &processResult{ 155 + Code: 4210, 156 + Info: "invalid proof", 157 + }, nil 158 + } 159 + 160 + if writeTx, ok := deps.writeTx.Get(); ok { 161 + // TODO set challenge commit state for this validator in tree 162 + _ = writeTx 163 + return nil, stacktrace.NewError("not implemented") // TODO 164 + } 165 + 166 + return &processResult{ 167 + Code: 0, 168 + }, nil 60 169 } 61 170 62 - var TransactionActionCompleteChallenge = registerTransactionAction[CommitToChallengeArguments]("CompleteChallenge", processCompleteChallengeTx) 171 + var TransactionActionCompleteChallenge = registerTransactionAction[CompleteChallengeArguments]("CompleteChallenge", processCompleteChallengeTx) 63 172 64 173 type CompleteChallengeArguments struct { 65 174 Validator []byte `json:"validator" refmt:"validator"` ··· 73 182 } 74 183 75 184 func (CompleteChallengeArguments) ForAction() TransactionAction { 76 - return TransactionActionCommitToChallenge 185 + return TransactionActionCompleteChallenge 77 186 } 78 187 79 188 func init() {