A very experimental PLC implementation which uses BFT consensus for decentralization

Begin work on epoch transactions and automated validator set management

gbl08ma.com 244a26e5 d5732b12

verified
+1102 -250
+45 -18
abciapp/app.go
··· 8 9 dbm "github.com/cometbft/cometbft-db" 10 abcitypes "github.com/cometbft/cometbft/abci/types" 11 "github.com/cometbft/cometbft/crypto" 12 cmtlog "github.com/cometbft/cometbft/libs/log" 13 "github.com/cometbft/cometbft/privval" 14 - bftstore "github.com/cometbft/cometbft/store" 15 "github.com/cosmos/iavl" 16 "github.com/gbl08ma/stacktrace" 17 "github.com/klauspost/compress/zstd" ··· 50 51 aoc *authoritativeOperationsFetcher 52 53 - blockStore *bftstore.BlockStore 54 triggerBlockCreation func() 55 56 blockChallengeCoordinator *blockChallengeCoordinator 57 } 58 59 // store and plc must be able to share transaction objects 60 - func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 61 mkTree := func() *iavl.MutableTree { 62 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 63 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 89 runnerContext, cancelRunnerContext := context.WithCancel(appContext) 90 91 d := &DIDPLCApplication{ 92 - runnerContext: runnerContext, 93 - logger: logger.With("module", "plcapp"), 94 - tree: tree, 95 - indexDB: indexDB, 96 - mempoolSubmitter: mempoolSubmitter, 97 - snapshotDirectory: snapshotDirectory, 98 - stateSyncTempDir: stateSyncTempDir, 99 } 100 101 if pv != nil { ··· 130 } 131 132 d.plc = plc.NewPLC() 133 134 var wg sync.WaitGroup 135 wg.Go(func() { ··· 203 204 return d, d.txFactory, d.plc, func() { 205 cancelRunnerContext() 206 wg.Wait() 207 lo.Must0(d.tree.Close()) 208 }, nil ··· 220 } 221 } 222 223 - func (d *DIDPLCApplication) FinishInitializing(blockStore *bftstore.BlockStore, triggerBlockCreation func()) error { 224 - d.blockStore = blockStore 225 d.triggerBlockCreation = triggerBlockCreation 226 227 - var err error 228 - d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.logger, d.txFactory, blockStore, d.validatorPubKey) 229 if err != nil { 230 return stacktrace.Propagate(err) 231 } 232 233 - // ensure we resume importing even if there is no pending AuthoritativeImport tx 234 - readTx := d.txFactory.ReadWorking(time.Now()) 235 236 - plc, err := store.Consensus.AuthoritativePLC(readTx) 237 if err != nil { 238 return stacktrace.Propagate(err) 239 } 240 241 - _ = d.buildAuthoritativeOperationsFetcher(plc) 242 243 return nil 244 }
··· 8 9 dbm "github.com/cometbft/cometbft-db" 10 abcitypes "github.com/cometbft/cometbft/abci/types" 11 + "github.com/cometbft/cometbft/consensus" 12 "github.com/cometbft/cometbft/crypto" 13 cmtlog "github.com/cometbft/cometbft/libs/log" 14 "github.com/cometbft/cometbft/privval" 15 + cmttypes "github.com/cometbft/cometbft/types" 16 "github.com/cosmos/iavl" 17 "github.com/gbl08ma/stacktrace" 18 "github.com/klauspost/compress/zstd" ··· 51 52 aoc *authoritativeOperationsFetcher 53 54 + blockHeaderGetter BlockHeaderGetter 55 triggerBlockCreation func() 56 57 blockChallengeCoordinator *blockChallengeCoordinator 58 + rangeChallengeCoordinator *rangeChallengeCoordinator 59 } 60 61 // store and plc must be able to share transaction objects 62 + func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter, blockHeaderGetter BlockHeaderGetter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 63 mkTree := func() *iavl.MutableTree { 64 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 65 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 91 runnerContext, cancelRunnerContext := context.WithCancel(appContext) 92 93 d := &DIDPLCApplication{ 94 + runnerContext: runnerContext, 95 + logger: logger.With("module", "plcapp"), 96 + tree: tree, 97 + indexDB: indexDB, 98 + mempoolSubmitter: mempoolSubmitter, 99 + snapshotDirectory: snapshotDirectory, 100 + stateSyncTempDir: stateSyncTempDir, 101 + blockHeaderGetter: blockHeaderGetter, 102 + triggerBlockCreation: func() {}, 103 } 104 105 if pv != nil { ··· 134 } 135 136 d.plc = plc.NewPLC() 137 + 138 + d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.logger, d.txFactory, d.blockHeaderGetter, d.validatorPubKey) 139 + if err != nil { 140 + return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err) 141 + } 142 143 var wg sync.WaitGroup 144 wg.Go(func() { ··· 212 213 return d, d.txFactory, d.plc, func() { 214 cancelRunnerContext() 215 + if d.rangeChallengeCoordinator != nil { 216 + d.rangeChallengeCoordinator.Wait() 217 + } 218 wg.Wait() 219 lo.Must0(d.tree.Close()) 220 }, nil ··· 232 } 233 } 234 235 + func (d *DIDPLCApplication) FinishInitializing(triggerBlockCreation func(), nodeEventBus *cmttypes.EventBus, nodeConsensusReactor *consensus.Reactor) error { 236 d.triggerBlockCreation = triggerBlockCreation 237 238 + // ensure we resume importing even if there is no pending AuthoritativeImport tx 239 + readTx := d.txFactory.ReadWorking(time.Now()) 240 + 241 + plc, err := store.Consensus.AuthoritativePLC(readTx) 242 if err != nil { 243 return stacktrace.Propagate(err) 244 } 245 246 + _ = d.buildAuthoritativeOperationsFetcher(plc) 247 248 + d.rangeChallengeCoordinator, err = newRangeChallengeCoordinator( 249 + d.runnerContext, 250 + d.logger, 251 + d.validatorPubKey, 252 + d.validatorPrivKey, 253 + d.txFactory, 254 + d.blockChallengeCoordinator, 255 + d.blockHeaderGetter, 256 + nodeEventBus, 257 + d.mempoolSubmitter, 258 + nodeConsensusReactor) 259 if err != nil { 260 return stacktrace.Propagate(err) 261 } 262 263 + if d.validatorPubKey != nil { 264 + err := d.rangeChallengeCoordinator.Start() 265 + if err != nil { 266 + return stacktrace.Propagate(err) 267 + } 268 + } 269 270 return nil 271 }
+1 -1
abciapp/app_test.go
··· 24 25 func TestCheckTx(t *testing.T) { 26 logger := cmtlog.NewNopLogger() 27 - app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), logger, nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", "", nil) 28 require.NoError(t, err) 29 t.Cleanup(cleanup) 30
··· 24 25 func TestCheckTx(t *testing.T) { 26 logger := cmtlog.NewNopLogger() 27 + app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), logger, nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", "", nil, nil) 28 require.NoError(t, err) 29 t.Cleanup(cleanup) 30
+16 -15
abciapp/block_challenge.go
··· 10 "github.com/Yiling-J/theine-go" 11 "github.com/cometbft/cometbft/crypto" 12 cmtlog "github.com/cometbft/cometbft/libs/log" 13 - bftstore "github.com/cometbft/cometbft/store" 14 "github.com/consensys/gnark-crypto/ecc" 15 "github.com/consensys/gnark-crypto/ecc/bn254" 16 "github.com/consensys/gnark-crypto/ecc/bn254/fr/mimc" ··· 54 gnarklogger.Set(zerolog.Nop()) 55 } 56 57 type blockChallengeCoordinator struct { 58 g singleflight.Group[int64, []byte] 59 ··· 63 isConfiguredToBeValidator bool 64 validatorAddress []byte 65 txFactory *transaction.Factory 66 - nodeBlockStore *bftstore.BlockStore 67 68 sharedWitnessDataCache *theine.LoadingCache[int64, proof.BlockChallengeCircuit] 69 } 70 71 - func newBlockChallengeCoordinator(runnerContext context.Context, logger cmtlog.Logger, txFactory *transaction.Factory, blockStore *bftstore.BlockStore, pubKey crypto.PubKey) (*blockChallengeCoordinator, error) { 72 c := &blockChallengeCoordinator{ 73 runnerContext: runnerContext, 74 logger: logger, 75 txFactory: txFactory, 76 - nodeBlockStore: blockStore, 77 isConfiguredToBeValidator: pubKey != nil, 78 } 79 if c.isConfiguredToBeValidator { ··· 139 return nil, stacktrace.NewError("node is not configured to be a validator") 140 } 141 proof, _, err := c.g.Do(ctx, height, func(ctx context.Context) ([]byte, error) { 142 - // we need to read the tree as it was on the block prior. We assume that this method will only be called in the processing of the latest block 143 - // and we validate this assumption 144 - tx := c.txFactory.ReadCommitted() 145 - if tx.Height() != height-1 { 146 - return nil, stacktrace.NewError("challenge being loaded or computed for unexpected height %d, expected %d", height, tx.Height()+1) 147 } 148 149 proof, err := store.Consensus.BlockChallengeProof(tx, uint64(height)) ··· 254 if height <= 1 { 255 return make([]byte, proof.OperationDataLength), make([]byte, 32), nil 256 } 257 - lastBlockMeta := c.nodeBlockStore.LoadBlockMeta(height - 1) 258 - if lastBlockMeta == nil { 259 - return nil, nil, stacktrace.NewError("height not found") 260 - } 261 262 - lastCommitHash := lastBlockMeta.Header.LastCommitHash 263 lastCommitHashBigInt := big.NewInt(0).SetBytes(lastCommitHash) 264 265 highestOp, err := tx.CountOperations() ··· 288 } 289 } 290 291 - return operationData[lastCommitHash[0]:], lastBlockMeta.Header.LastCommitHash, nil 292 } 293 294 func (c *blockChallengeCoordinator) fetchOrBuildBlockChallengeCircuitAssignmentShared(tx transaction.Read, height int64) (proof.BlockChallengeCircuit, error) {
··· 10 "github.com/Yiling-J/theine-go" 11 "github.com/cometbft/cometbft/crypto" 12 cmtlog "github.com/cometbft/cometbft/libs/log" 13 + cmttypes "github.com/cometbft/cometbft/types" 14 "github.com/consensys/gnark-crypto/ecc" 15 "github.com/consensys/gnark-crypto/ecc/bn254" 16 "github.com/consensys/gnark-crypto/ecc/bn254/fr/mimc" ··· 54 gnarklogger.Set(zerolog.Nop()) 55 } 56 57 + type BlockHeaderGetter func(int64) (cmttypes.Header, error) 58 + 59 type blockChallengeCoordinator struct { 60 g singleflight.Group[int64, []byte] 61 ··· 65 isConfiguredToBeValidator bool 66 validatorAddress []byte 67 txFactory *transaction.Factory 68 + blockHeaderGetter BlockHeaderGetter 69 70 sharedWitnessDataCache *theine.LoadingCache[int64, proof.BlockChallengeCircuit] 71 } 72 73 + func newBlockChallengeCoordinator(runnerContext context.Context, logger cmtlog.Logger, txFactory *transaction.Factory, headerGetter BlockHeaderGetter, pubKey crypto.PubKey) (*blockChallengeCoordinator, error) { 74 c := &blockChallengeCoordinator{ 75 runnerContext: runnerContext, 76 logger: logger, 77 txFactory: txFactory, 78 + blockHeaderGetter: headerGetter, 79 isConfiguredToBeValidator: pubKey != nil, 80 } 81 if c.isConfiguredToBeValidator { ··· 141 return nil, stacktrace.NewError("node is not configured to be a validator") 142 } 143 proof, _, err := c.g.Do(ctx, height, func(ctx context.Context) ([]byte, error) { 144 + // we need to read the tree as it was on the block prior 145 + tx, err := c.txFactory.ReadHeight(time.Now(), height-1) 146 + if err != nil { 147 + return nil, stacktrace.Propagate(err) 148 } 149 150 proof, err := store.Consensus.BlockChallengeProof(tx, uint64(height)) ··· 255 if height <= 1 { 256 return make([]byte, proof.OperationDataLength), make([]byte, 32), nil 257 } 258 259 + blockHeader, err := c.blockHeaderGetter(height - 1) 260 + if err != nil { 261 + return nil, nil, stacktrace.Propagate(err) 262 + } 263 + lastCommitHash := blockHeader.LastCommitHash 264 lastCommitHashBigInt := big.NewInt(0).SetBytes(lastCommitHash) 265 266 highestOp, err := tx.CountOperations() ··· 289 } 290 } 291 292 + return operationData[lastCommitHash[0]:], lastCommitHash, nil 293 } 294 295 func (c *blockChallengeCoordinator) fetchOrBuildBlockChallengeCircuitAssignmentShared(tx transaction.Read, height int64) (proof.BlockChallengeCircuit, error) {
+120 -26
abciapp/execution.go
··· 4 "bytes" 5 "context" 6 "encoding/hex" 7 "slices" 8 "time" 9 10 abcitypes "github.com/cometbft/cometbft/abci/types" 11 "github.com/gbl08ma/stacktrace" 12 cbornode "github.com/ipfs/go-ipld-cbor" 13 "github.com/samber/lo" 14 "github.com/samber/mo" 15 "tangled.org/gbl08ma.com/didplcbft/transaction" 16 ) 17 18 // InitChain implements [types.Application]. 19 func (d *DIDPLCApplication) InitChain(_ context.Context, req *abcitypes.RequestInitChain) (*abcitypes.ResponseInitChain, error) { 20 req.ConsensusParams.Abci.VoteExtensionsEnableHeight = 1 21 return &abcitypes.ResponseInitChain{ 22 ConsensusParams: req.ConsensusParams, 23 }, nil ··· 78 toProcess = toTryNext 79 } 80 81 - maybeTx, err := d.maybeCreateAuthoritativeImportTx(ctx, deps.getAuthoritativeOperationsFetcher) 82 if err != nil { 83 - // TODO don't fail absolutely silently always, we should at least check what the error is 84 - d.logger.Error("failed to create authoritative import transaction", "error", stacktrace.Propagate(err)) 85 } 86 87 - if err == nil && len(maybeTx) != 0 { 88 - totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) }) 89 - // 4 KB safety margin 90 - if totalSize+len(maybeTx) < int(req.MaxTxBytes)-4096 { 91 - // we have space to fit the import transaction 92 93 - // set execute to false to save a lot of time 94 - // (we trust that running the import will succeed, so just do bare minimum checks here) 95 - result, err := processTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(false, req.Time), maybeTx) 96 - if err != nil { 97 - return nil, stacktrace.Propagate(err) 98 - } 99 - if result.Code == 0 { 100 - acceptedTx = append(acceptedTx, maybeTx) 101 - } 102 } 103 } 104 105 return &abcitypes.ResponsePrepareProposal{Txs: acceptedTx}, nil 106 } 107 108 // ProcessProposal implements [types.Application]. 109 func (d *DIDPLCApplication) ProcessProposal(ctx context.Context, req *abcitypes.RequestProcessProposal) (*abcitypes.ResponseProcessProposal, error) { 110 defer (d.logMethod("ProcessProposal", "height", req.Height, "hash", req.Hash, "txs", len(req.Txs)))() ··· 134 135 txResults := make([]*processResult, len(req.Txs)) 136 deps := d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time) 137 for i, tx := range req.Txs { 138 result, action, processor, err := beginProcessTx(tx) 139 if err != nil { 140 return nil, stacktrace.Propagate(err) 141 } 142 if result.Code == 0 { 143 - if action == TransactionActionAuthoritativeImport && i != len(req.Txs)-1 { 144 - // if an Authoritative Import transaction is present on the block, it must be the last one 145 - return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 146 - } 147 - 148 result, err = finishProcessTx(ctx, deps, processor, tx) 149 if err != nil { 150 return nil, stacktrace.Propagate(err) ··· 158 } 159 160 txResults[i] = result 161 } 162 163 d.lastProcessedProposalHash = slices.Clone(req.Hash) ··· 204 func (d *DIDPLCApplication) FinalizeBlock(ctx context.Context, req *abcitypes.RequestFinalizeBlock) (*abcitypes.ResponseFinalizeBlock, error) { 205 defer (d.logMethod("FinalizeBlock", "height", req.Height, "hash", req.Hash))() 206 207 if bytes.Equal(req.Hash, d.lastProcessedProposalHash) && d.lastProcessedProposalExecTxResults != nil { 208 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully 209 // reuse the uncommitted results 210 return &abcitypes.ResponseFinalizeBlock{ 211 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 212 return result.ToABCI() 213 }), 214 AppHash: d.tree.WorkingHash(), 215 }, nil 216 } ··· 219 d.DiscardChanges() 220 221 txResults := make([]*processResult, len(req.Txs)) 222 for i, tx := range req.Txs { 223 var err error 224 txResults[i], err = processTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), tx) 225 if err != nil { 226 return nil, stacktrace.Propagate(err) 227 } 228 } 229 230 d.lastProcessedProposalHash = slices.Clone(req.Hash) 231 d.lastProcessedProposalExecTxResults = txResults 232 233 return &abcitypes.ResponseFinalizeBlock{ 234 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 235 return result.ToABCI() 236 }), 237 - AppHash: d.tree.WorkingHash(), 238 }, nil 239 } 240 ··· 275 getAuthoritativeOperationsFetcher: d.buildAuthoritativeOperationsFetcher, 276 destroyAuthoritativeOperationsFetcher: d.destroyAuthoritativeOperationsFetcher, 277 blockChallengeCoordinator: d.blockChallengeCoordinator, 278 - blockStore: d.blockStore, 279 } 280 } 281 ··· 296 getAuthoritativeOperationsFetcher: d.buildAuthoritativeOperationsFetcher, 297 destroyAuthoritativeOperationsFetcher: d.destroyAuthoritativeOperationsFetcher, 298 blockChallengeCoordinator: d.blockChallengeCoordinator, 299 - blockStore: d.blockStore, 300 } 301 } 302 ··· 319 } 320 321 if d.aoc == nil && plc != "" { 322 - d.aoc = newAuthoritativeOperationsFetcher(d.runnerContext, d.logger, plc, d.triggerBlockCreation) 323 } 324 325 return d.aoc
··· 4 "bytes" 5 "context" 6 "encoding/hex" 7 + "errors" 8 "slices" 9 "time" 10 11 abcitypes "github.com/cometbft/cometbft/abci/types" 12 + "github.com/cometbft/cometbft/crypto/ed25519" 13 "github.com/gbl08ma/stacktrace" 14 cbornode "github.com/ipfs/go-ipld-cbor" 15 "github.com/samber/lo" 16 "github.com/samber/mo" 17 + "tangled.org/gbl08ma.com/didplcbft/store" 18 "tangled.org/gbl08ma.com/didplcbft/transaction" 19 ) 20 21 // InitChain implements [types.Application]. 22 func (d *DIDPLCApplication) InitChain(_ context.Context, req *abcitypes.RequestInitChain) (*abcitypes.ResponseInitChain, error) { 23 + writeTx, err := d.txFactory.ReadWorking(req.Time).UpgradeForIndexOnly() 24 + if err != nil { 25 + return nil, stacktrace.Propagate(err) 26 + } 27 + 28 + for _, validator := range req.Validators { 29 + pubKey := validator.PubKey.GetEd25519() 30 + if pubKey == nil { 31 + return nil, errors.New("only ed25519 keys are supported") 32 + } 33 + address := ed25519.PubKey(validator.PubKey.GetEd25519()).Address().Bytes() 34 + err := store.Consensus.InitializeValidatorVotingActivity(writeTx, address, pubKey, 0) 35 + if err != nil { 36 + return nil, stacktrace.Propagate(err) 37 + } 38 + } 39 + 40 + err = writeTx.Commit() 41 + if err != nil { 42 + return nil, stacktrace.Propagate(err) 43 + } 44 + 45 req.ConsensusParams.Abci.VoteExtensionsEnableHeight = 1 46 + 47 return &abcitypes.ResponseInitChain{ 48 ConsensusParams: req.ConsensusParams, 49 }, nil ··· 104 toProcess = toTryNext 105 } 106 107 + trailingMustIncludes, err := d.createTrailingTransactions(ctx, deps) 108 if err != nil { 109 + return nil, stacktrace.Propagate(err) 110 } 111 112 + // drop transactions if needed to make space for trailingMustIncludes 113 + totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { 114 + // account for 10 bytes of per-transaction overhead (protobuf field tags, length varints etc.) 115 + return len(tx) + 10 116 + }) 117 + sizeOfTrailingMustIncludes := lo.SumBy(trailingMustIncludes, func(tx []byte) int { return len(tx) + 10 }) 118 + for len(acceptedTx) != 0 && totalSize+sizeOfTrailingMustIncludes > int(req.MaxTxBytes) { 119 + totalSize -= len(acceptedTx[len(acceptedTx)-1]) + 10 120 + acceptedTx = acceptedTx[:len(acceptedTx)-1] 121 + } 122 123 + for _, maybeTx := range trailingMustIncludes { 124 + result, err := processTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(false, req.Time), maybeTx) 125 + if err != nil { 126 + return nil, stacktrace.Propagate(err) 127 + } 128 + if result.Code == 0 { 129 + acceptedTx = append(acceptedTx, maybeTx) 130 } 131 } 132 133 return &abcitypes.ResponsePrepareProposal{Txs: acceptedTx}, nil 134 } 135 136 + func (d *DIDPLCApplication) createTrailingTransactions(ctx context.Context, deps TransactionProcessorDependencies) ([][]byte, error) { 137 + trailingMustIncludes := [][]byte{} 138 + 139 + maybeTx, err := d.maybeCreateAuthoritativeImportTx(ctx, deps.getAuthoritativeOperationsFetcher) 140 + if err != nil { 141 + // TODO don't fail absolutely silently always, we should at least check what the error is 142 + d.logger.Error("failed to create authoritative import transaction", "error", stacktrace.Propagate(err)) 143 + } else if len(maybeTx) != 0 { 144 + trailingMustIncludes = append(trailingMustIncludes, maybeTx) 145 + } 146 + 147 + maybeTx, err = d.maybeCreateUpdateValidatorsTx(ctx) 148 + if err != nil { 149 + return nil, stacktrace.Propagate(err) 150 + } 151 + if len(maybeTx) != 0 { 152 + trailingMustIncludes = append(trailingMustIncludes, maybeTx) 153 + } 154 + 155 + return trailingMustIncludes, nil 156 + } 157 + 158 // ProcessProposal implements [types.Application]. 159 func (d *DIDPLCApplication) ProcessProposal(ctx context.Context, req *abcitypes.RequestProcessProposal) (*abcitypes.ResponseProcessProposal, error) { 160 defer (d.logMethod("ProcessProposal", "height", req.Height, "hash", req.Hash, "txs", len(req.Txs)))() ··· 184 185 txResults := make([]*processResult, len(req.Txs)) 186 deps := d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time) 187 + trailingOnly := false 188 + foundEpochTransaction := false 189 for i, tx := range req.Txs { 190 result, action, processor, err := beginProcessTx(tx) 191 if err != nil { 192 return nil, stacktrace.Propagate(err) 193 } 194 + if trailingOnly && !action.MustBeTrailingInBlock() { 195 + return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 196 + } 197 + if action.MustBeTrailingInBlock() { 198 + trailingOnly = true 199 + } 200 + if action == TransactionActionUpdateValidators { 201 + foundEpochTransaction = true 202 + } 203 + 204 if result.Code == 0 { 205 result, err = finishProcessTx(ctx, deps, processor, tx) 206 if err != nil { 207 return nil, stacktrace.Propagate(err) ··· 215 } 216 217 txResults[i] = result 218 + } 219 + 220 + if isUpdateValidatorsTxHeight(req.Height) && !foundEpochTransaction { 221 + return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 222 } 223 224 d.lastProcessedProposalHash = slices.Clone(req.Hash) ··· 265 func (d *DIDPLCApplication) FinalizeBlock(ctx context.Context, req *abcitypes.RequestFinalizeBlock) (*abcitypes.ResponseFinalizeBlock, error) { 266 defer (d.logMethod("FinalizeBlock", "height", req.Height, "hash", req.Hash))() 267 268 + markVotingParticipation := func() error { 269 + for _, vote := range req.DecidedLastCommit.Votes { 270 + err := store.Consensus.MarkValidatorVote(d.ongoingWrite, vote.GetValidator().Address, uint64(req.Height)) 271 + // we expect to attempt to store votes for validators that aren't active because validator_updates take a few blocks to fully take effect 272 + if err != nil && !errors.Is(err, store.ErrValidatorNotActive) { 273 + return stacktrace.Propagate(err) 274 + } 275 + } 276 + return nil 277 + } 278 + 279 if bytes.Equal(req.Hash, d.lastProcessedProposalHash) && d.lastProcessedProposalExecTxResults != nil { 280 + d.createOngoingTxIfNeeded(req.Time) 281 + 282 + err := markVotingParticipation() 283 + if err != nil { 284 + return nil, stacktrace.Propagate(err) 285 + } 286 + 287 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully 288 // reuse the uncommitted results 289 return &abcitypes.ResponseFinalizeBlock{ 290 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 291 return result.ToABCI() 292 }), 293 + ValidatorUpdates: lo.FlatMap(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) []abcitypes.ValidatorUpdate { 294 + return result.validatorUpdates 295 + }), 296 AppHash: d.tree.WorkingHash(), 297 }, nil 298 } ··· 301 d.DiscardChanges() 302 303 txResults := make([]*processResult, len(req.Txs)) 304 + validatorUpdates := []abcitypes.ValidatorUpdate{} 305 for i, tx := range req.Txs { 306 var err error 307 txResults[i], err = processTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), tx) 308 if err != nil { 309 return nil, stacktrace.Propagate(err) 310 } 311 + validatorUpdates = append(validatorUpdates, txResults[i].validatorUpdates...) 312 } 313 314 d.lastProcessedProposalHash = slices.Clone(req.Hash) 315 d.lastProcessedProposalExecTxResults = txResults 316 317 + err := markVotingParticipation() 318 + if err != nil { 319 + return nil, stacktrace.Propagate(err) 320 + } 321 + 322 return &abcitypes.ResponseFinalizeBlock{ 323 TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 324 return result.ToABCI() 325 }), 326 + ValidatorUpdates: validatorUpdates, 327 + AppHash: d.tree.WorkingHash(), 328 }, nil 329 } 330 ··· 365 getAuthoritativeOperationsFetcher: d.buildAuthoritativeOperationsFetcher, 366 destroyAuthoritativeOperationsFetcher: d.destroyAuthoritativeOperationsFetcher, 367 blockChallengeCoordinator: d.blockChallengeCoordinator, 368 + blockHeaderGetter: d.blockHeaderGetter, 369 } 370 } 371 ··· 386 getAuthoritativeOperationsFetcher: d.buildAuthoritativeOperationsFetcher, 387 destroyAuthoritativeOperationsFetcher: d.destroyAuthoritativeOperationsFetcher, 388 blockChallengeCoordinator: d.blockChallengeCoordinator, 389 + blockHeaderGetter: d.blockHeaderGetter, 390 } 391 } 392 ··· 409 } 410 411 if d.aoc == nil && plc != "" { 412 + d.aoc = newAuthoritativeOperationsFetcher(d.runnerContext, d.logger, plc, func() { 413 + // note that this is different from passing d.triggerBlockCreation directly 414 + // d.triggerBlockCreation is changed in FinishInitializing 415 + d.triggerBlockCreation() 416 + }) 417 } 418 419 return d.aoc
+13 -13
abciapp/info.go
··· 2 3 import ( 4 "context" 5 - "encoding/hex" 6 "encoding/json" 7 "errors" 8 "fmt" ··· 195 }, 196 }, 197 { 198 - matcher: urlpath.New("/validator/:address/reputation"), 199 handler: func(match urlpath.Match) (*abcitypes.ResponseQuery, error) { 200 - addressString := match.Params["address"] 201 - addressBytes, err := hex.DecodeString(addressString) 202 - if err != nil || len(addressBytes) != 20 { 203 return &abcitypes.ResponseQuery{ 204 - Key: []byte(addressString), 205 Code: 6100, 206 - Log: "Invalid address", 207 }, nil 208 } 209 - reputation, err := store.Consensus.ValidatorReputation(readTx, addressBytes) 210 if err != nil { 211 return nil, stacktrace.Propagate(err) 212 } 213 214 resp := struct { 215 - ValidatorAddress string `json:"validatorAddress"` 216 - Reputation string `json:"reputation"` 217 }{ 218 - ValidatorAddress: hex.EncodeToString(addressBytes), 219 - Reputation: strconv.FormatUint(reputation, 10), 220 } 221 222 respJSON, err := json.Marshal(&resp) ··· 225 } 226 227 return &abcitypes.ResponseQuery{ 228 - Key: []byte(resp.ValidatorAddress), 229 Value: []byte(respJSON), 230 Code: 0, 231 }, nil
··· 2 3 import ( 4 "context" 5 + "encoding/base64" 6 "encoding/json" 7 "errors" 8 "fmt" ··· 195 }, 196 }, 197 { 198 + matcher: urlpath.New("/validator/:pubkey/reputation"), 199 handler: func(match urlpath.Match) (*abcitypes.ResponseQuery, error) { 200 + pubkeyString := match.Params["pubkey"] 201 + pubkeyBytes, err := base64.StdEncoding.DecodeString(pubkeyString) 202 + if err != nil || len(pubkeyBytes) != 32 { 203 return &abcitypes.ResponseQuery{ 204 + Key: []byte(pubkeyString), 205 Code: 6100, 206 + Log: "Invalid public key", 207 }, nil 208 } 209 + reputation, err := store.Consensus.ValidatorReputation(readTx, pubkeyBytes) 210 if err != nil { 211 return nil, stacktrace.Propagate(err) 212 } 213 214 resp := struct { 215 + ValidatorPubKey string `json:"validatorPubKey"` 216 + Reputation string `json:"reputation"` 217 }{ 218 + ValidatorPubKey: base64.StdEncoding.EncodeToString(pubkeyBytes), 219 + Reputation: strconv.FormatUint(reputation, 10), 220 } 221 222 respJSON, err := json.Marshal(&resp) ··· 225 } 226 227 return &abcitypes.ResponseQuery{ 228 + Key: []byte(resp.ValidatorPubKey), 229 Value: []byte(respJSON), 230 Code: 0, 231 }, nil
+2 -2
abciapp/mempool.go
··· 14 return nil, stacktrace.Propagate(err) 15 } 16 if result.Code == 0 { 17 - if action == TransactionActionAuthoritativeImport { 18 // this type of transaction is meant to be included only by validator nodes 19 return &abcitypes.ResponseCheckTx{ 20 Code: 4002, 21 - Log: "AuthoritativeImport transactions can only be introduced by validator nodes", 22 }, nil 23 } 24
··· 14 return nil, stacktrace.Propagate(err) 15 } 16 if result.Code == 0 { 17 + if !action.SubmittableViaMempool() { 18 // this type of transaction is meant to be included only by validator nodes 19 return &abcitypes.ResponseCheckTx{ 20 Code: 4002, 21 + Log: "This action can only be introduced by validator nodes", 22 }, nil 23 } 24
+87 -56
abciapp/range_challenge.go
··· 13 "github.com/Yiling-J/theine-go" 14 "github.com/cometbft/cometbft/crypto" 15 cmtlog "github.com/cometbft/cometbft/libs/log" 16 - "github.com/cometbft/cometbft/privval" 17 "github.com/cometbft/cometbft/rpc/core" 18 - bftstore "github.com/cometbft/cometbft/store" 19 cmttypes "github.com/cometbft/cometbft/types" 20 "github.com/cosmos/iavl" 21 "github.com/cosmos/iavl/db" 22 ics23 "github.com/cosmos/ics23/go" 23 "github.com/gbl08ma/stacktrace" 24 cbornode "github.com/ipfs/go-ipld-cbor" 25 "github.com/samber/mo" 26 "tangled.org/gbl08ma.com/didplcbft/store" 27 "tangled.org/gbl08ma.com/didplcbft/transaction" 28 "tangled.org/gbl08ma.com/didplcbft/types" 29 ) 30 31 - type RangeChallengeCoordinator struct { 32 runnerContext context.Context 33 logger cmtlog.Logger 34 35 isConfiguredToBeValidator bool 36 validatorPubKey crypto.PubKey 37 validatorPrivKey crypto.PrivKey 38 - validatorAddress []byte 39 txFactory *transaction.Factory 40 - nodeBlockStore *bftstore.BlockStore 41 nodeEventBus *cmttypes.EventBus 42 mempoolSubmitter types.MempoolSubmitter 43 consensusReactor consensusReactor ··· 57 WaitSync() bool 58 } 59 60 - func NewRangeChallengeCoordinator( 61 runnerContext context.Context, 62 logger cmtlog.Logger, 63 - pv *privval.FilePV, 64 txFactory *transaction.Factory, 65 - blockStore *bftstore.BlockStore, 66 nodeEventBus *cmttypes.EventBus, 67 mempoolSubmitter types.MempoolSubmitter, 68 - consensusReactor consensusReactor) (*RangeChallengeCoordinator, error) { 69 - c := &RangeChallengeCoordinator{ 70 - txFactory: txFactory, 71 runnerContext: runnerContext, 72 logger: logger, 73 - nodeBlockStore: blockStore, 74 nodeEventBus: nodeEventBus, 75 mempoolSubmitter: mempoolSubmitter, 76 consensusReactor: consensusReactor, 77 - isConfiguredToBeValidator: pv != nil, 78 newBlockCh: make(chan int64), 79 } 80 if c.isConfiguredToBeValidator { 81 - c.validatorPubKey = pv.Key.PubKey 82 - c.validatorPrivKey = pv.Key.PrivKey 83 - c.validatorAddress = c.validatorPubKey.Address() 84 } 85 86 var err error ··· 92 return c, nil 93 } 94 95 - func (c *RangeChallengeCoordinator) Start() error { 96 c.startMu.Lock() 97 defer c.startMu.Unlock() 98 if c.started { ··· 115 defer cancel() 116 err := c.onNewBlock(ctx, newHeight) 117 if err != nil { 118 - // note: this is expected in certain circumstances, such as the proof for the toHeight block not being ready yet as the block was just finalized 119 - // (and the block may have been finalized without our votes) 120 c.logger.Error("range challenge block handler error", "error", stacktrace.Propagate(err)) 121 } 122 }() ··· 127 return nil 128 } 129 130 - func (c *RangeChallengeCoordinator) Wait() { 131 c.startMu.Lock() 132 started := c.started 133 c.startMu.Unlock() ··· 147 root []byte 148 } 149 150 - func (c *RangeChallengeCoordinator) getOrFetchNextProofFromHeight(tx transaction.Read) (int64, error) { 151 if completion, hasCache := c.cachedNextProofFromHeight.Get(); hasCache { 152 return completion, nil 153 } 154 155 - completion, err := store.Consensus.ValidatorRangeChallengeCompletion(tx, c.validatorAddress) 156 if err != nil { 157 if !errors.Is(err, store.ErrNoRecentChallengeCompletion) { 158 return 0, stacktrace.Propagate(err) ··· 160 completion = 0 161 } 162 163 - minProvenBlock := int64(0) 164 - for proofHeight := range store.Consensus.BlockChalengeProofsIterator(tx, 0, &err) { 165 - minProvenBlock = int64(proofHeight) 166 - break 167 } 168 if err != nil { 169 return 0, stacktrace.Propagate(err) 170 } 171 172 - minProvable := max(minProvenBlock, int64(completion+1)) 173 174 c.cachedNextProofFromHeight = mo.Some(minProvable) 175 return minProvable, nil 176 } 177 178 - func (c *RangeChallengeCoordinator) newBlocksSubscriber() error { 179 subscriber := "rangeChallengeCoordinator" 180 181 subCtx, cancel := context.WithTimeout(c.runnerContext, core.SubscribeTimeout) ··· 203 case c.newBlockCh <- newBlockHeaderEvent.Header.Height: 204 default: 205 } 206 case <-blocksSub.Canceled(): 207 err := blocksSub.Err() 208 if err == nil { ··· 213 } 214 } 215 216 - func (c *RangeChallengeCoordinator) onNewBlock(ctx context.Context, newBlockHeight int64) error { 217 if !c.isConfiguredToBeValidator { 218 return nil 219 } ··· 222 return nil 223 } 224 225 tx := c.txFactory.ReadCommitted() 226 if tx.Height() < newBlockHeight { 227 return stacktrace.NewError("read committed transaction height lower than expected: new block height %d, transaction height %d", newBlockHeight, tx.Height()) ··· 230 shouldCommitToChallenge := false 231 shouldCompleteChallenge := false 232 233 - fromHeight, toHeight, provenHeight, includedOnHeight, _, err := store.Consensus.ValidatorRangeChallengeCommitment(tx, c.validatorAddress) 234 if errors.Is(err, store.ErrNoActiveChallengeCommitment) { 235 deleteOldProofs := false 236 if c.hasSubmittedChallengeCompletion { ··· 240 c.hasSubmittedChallengeCompletion = false 241 c.cachedNextProofFromHeight = mo.None[int64]() 242 } 243 - nextFromHeight, err := c.getOrFetchNextProofFromHeight(tx) 244 if err != nil { 245 return stacktrace.Propagate(err) 246 } ··· 267 } else if err != nil { 268 return stacktrace.Propagate(err) 269 } else { 270 - commitmentBlockMeta := c.nodeBlockStore.LoadBlockMeta(int64(includedOnHeight)) 271 commitmentExpired := false 272 - if commitmentBlockMeta != nil { 273 commitmentExpired = uint64(newBlockHeight) >= includedOnHeight+CompleteChallengeMaxAgeInBlocks || 274 - time.Since(commitmentBlockMeta.Header.Time) >= CompleteChallengeMaxAge-1*time.Second 275 276 // shouldCompleteChallenge if not too many blocks have passed AND enough blocks have passed 277 shouldCompleteChallenge = !commitmentExpired && includedOnHeight+1 <= uint64(newBlockHeight) 278 } 279 280 - if !shouldCompleteChallenge { 281 - shouldCommitToChallenge = commitmentExpired 282 } 283 } 284 ··· 290 return stacktrace.Propagate(err) 291 } 292 } else if shouldCommitToChallenge { 293 - c.logger.Info("Creating challenge commitment transaction", "toHeight", toHeight) 294 transactionBytes, err = c.createCommitToChallengeTx(ctx, tx, newBlockHeight) 295 if err != nil { 296 if errors.Is(err, errMissingProofs) { 297 // this is expected on nodes that take longer to generate the proofs 298 // shouldCommitToChallenge will be true again on the next block and we'll try again 299 return nil ··· 320 return nil 321 } 322 323 - func (c *RangeChallengeCoordinator) createCommitToChallengeTx(ctx context.Context, tx transaction.Read, toHeight int64) ([]byte, error) { 324 - fromHeight, err := c.getOrFetchNextProofFromHeight(tx) 325 if err != nil { 326 return nil, stacktrace.Propagate(err) 327 } ··· 340 return nil, stacktrace.Propagate(err) 341 } 342 343 - toHeightBlockMeta := c.nodeBlockStore.LoadBlockMeta(toHeight) 344 - if toHeightBlockMeta == nil { 345 - return nil, stacktrace.NewError("block not found at height") 346 } 347 348 - if tx.Timestamp().Sub(toHeightBlockMeta.Header.Time) > CommitToChallengeMaxAge { 349 return nil, stacktrace.NewError("too much time passed since block at height") 350 } 351 352 - proveHeight := computeHeightToProveInRange(toHeightBlockMeta.Header.LastCommitHash, c.validatorAddress, int64(fromHeight), int64(toHeight), mo.None[int64]()) 353 354 commitToRoot, membershipProof, err := c.computeRangeChallengeProof(ctx, tx, fromHeight, toHeight, proveHeight) 355 if err != nil { ··· 384 return out, nil 385 } 386 387 - func (c *RangeChallengeCoordinator) createCompleteChallengeTx(ctx context.Context, tx transaction.Read, fromHeight, toHeight, prevProvenHeight, commitmentIncludedOnHeight int64) ([]byte, error) { 388 - nextBlockMeta := c.nodeBlockStore.LoadBlockMeta(commitmentIncludedOnHeight + 1) 389 - if nextBlockMeta == nil { 390 - return nil, stacktrace.NewError("block not found at height") 391 } 392 393 - proveHeight := computeHeightToProveInRange(nextBlockMeta.Header.LastCommitHash, c.validatorAddress, int64(fromHeight), int64(toHeight), mo.Some(prevProvenHeight)) 394 395 _, membershipProof, err := c.computeRangeChallengeProof(ctx, tx, fromHeight, toHeight, proveHeight) 396 if err != nil { ··· 405 transaction := Transaction[CompleteChallengeArguments]{ 406 Action: TransactionActionCompleteChallenge, 407 Arguments: CompleteChallengeArguments{ 408 - Validator: c.validatorAddress, 409 - Proof: proofBytes, 410 }, 411 } 412 ··· 417 return out, nil 418 } 419 420 - func (c *RangeChallengeCoordinator) computeRangeChallengeProof(ctx context.Context, tx transaction.Read, startHeight, endHeight, proveHeight int64) ([]byte, *ics23.CommitmentProof, error) { 421 ctx = context.WithValue(ctx, contextTxKey{}, tx) 422 ct, err := c.treeCache.Get(ctx, treeCacheKey{ 423 startHeight: startHeight, ··· 437 return ct.root, membershipProof, nil 438 } 439 440 - func computeHeightToProveInRange(lastCommitHash, validatorAddress []byte, fromHeight, toHeight int64, avoidHeight mo.Option[int64]) int64 { 441 lastCommitHashBigInt := new(big.Int).SetBytes(lastCommitHash) 442 - validatorBigInt := new(big.Int).SetBytes(validatorAddress) 443 seed := new(big.Int).Xor(lastCommitHashBigInt, validatorBigInt) 444 445 numBlocks := toHeight - fromHeight + 1 ··· 458 459 var errMissingProofs = errors.New("missing block challenge proofs in requested range") 460 461 - func (c *RangeChallengeCoordinator) proofTreeLoader(ctx context.Context, cacheKey treeCacheKey) (theine.Loaded[cachedTree], error) { 462 var zeroValue theine.Loaded[cachedTree] 463 464 anyTx := ctx.Value(contextTxKey{}) ··· 473 tree := iavl.NewMutableTree(db.NewMemDB(), 16, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 474 475 var err error 476 - for proofHeight, proof := range store.Consensus.BlockChalengeProofsIterator(tx, uint64(max(cacheKey.startHeight-1, 0)), &err) { 477 select { 478 case <-ctx.Done(): 479 return zeroValue, stacktrace.Propagate(ctx.Err()) ··· 483 if proofHeight > uint64(cacheKey.endHeight) { 484 break 485 } 486 487 _, err := tree.Set(binary.BigEndian.AppendUint64(nil, proofHeight), slices.Clone(proof)) 488 if err != nil {
··· 13 "github.com/Yiling-J/theine-go" 14 "github.com/cometbft/cometbft/crypto" 15 cmtlog "github.com/cometbft/cometbft/libs/log" 16 "github.com/cometbft/cometbft/rpc/core" 17 cmttypes "github.com/cometbft/cometbft/types" 18 "github.com/cosmos/iavl" 19 "github.com/cosmos/iavl/db" 20 ics23 "github.com/cosmos/ics23/go" 21 "github.com/gbl08ma/stacktrace" 22 cbornode "github.com/ipfs/go-ipld-cbor" 23 + "github.com/samber/lo" 24 "github.com/samber/mo" 25 "tangled.org/gbl08ma.com/didplcbft/store" 26 "tangled.org/gbl08ma.com/didplcbft/transaction" 27 "tangled.org/gbl08ma.com/didplcbft/types" 28 ) 29 30 + type rangeChallengeCoordinator struct { 31 runnerContext context.Context 32 logger cmtlog.Logger 33 34 isConfiguredToBeValidator bool 35 validatorPubKey crypto.PubKey 36 validatorPrivKey crypto.PrivKey 37 txFactory *transaction.Factory 38 + blockChallengeCoordinator *blockChallengeCoordinator 39 + blockHeaderGetter BlockHeaderGetter 40 nodeEventBus *cmttypes.EventBus 41 mempoolSubmitter types.MempoolSubmitter 42 consensusReactor consensusReactor ··· 56 WaitSync() bool 57 } 58 59 + func newRangeChallengeCoordinator( 60 runnerContext context.Context, 61 logger cmtlog.Logger, 62 + validatorPubKey crypto.PubKey, 63 + validatorPrivKey crypto.PrivKey, 64 txFactory *transaction.Factory, 65 + blockChallengeCoordinator *blockChallengeCoordinator, 66 + blockHeaderGetter BlockHeaderGetter, 67 nodeEventBus *cmttypes.EventBus, 68 mempoolSubmitter types.MempoolSubmitter, 69 + consensusReactor consensusReactor) (*rangeChallengeCoordinator, error) { 70 + c := &rangeChallengeCoordinator{ 71 runnerContext: runnerContext, 72 logger: logger, 73 + txFactory: txFactory, 74 + blockChallengeCoordinator: blockChallengeCoordinator, 75 + blockHeaderGetter: blockHeaderGetter, 76 nodeEventBus: nodeEventBus, 77 mempoolSubmitter: mempoolSubmitter, 78 consensusReactor: consensusReactor, 79 + isConfiguredToBeValidator: validatorPubKey != nil && validatorPrivKey != nil, 80 newBlockCh: make(chan int64), 81 } 82 if c.isConfiguredToBeValidator { 83 + c.validatorPubKey = validatorPubKey 84 + c.validatorPrivKey = validatorPrivKey 85 } 86 87 var err error ··· 93 return c, nil 94 } 95 96 + func (c *rangeChallengeCoordinator) Start() error { 97 c.startMu.Lock() 98 defer c.startMu.Unlock() 99 if c.started { ··· 116 defer cancel() 117 err := c.onNewBlock(ctx, newHeight) 118 if err != nil { 119 c.logger.Error("range challenge block handler error", "error", stacktrace.Propagate(err)) 120 } 121 }() ··· 126 return nil 127 } 128 129 + func (c *rangeChallengeCoordinator) Wait() { 130 c.startMu.Lock() 131 started := c.started 132 c.startMu.Unlock() ··· 146 root []byte 147 } 148 149 + func (c *rangeChallengeCoordinator) getOrFetchNextProofFromHeight(tx transaction.Read, toHeight int64) (int64, error) { 150 if completion, hasCache := c.cachedNextProofFromHeight.Get(); hasCache { 151 return completion, nil 152 } 153 154 + completion, err := store.Consensus.ValidatorRangeChallengeCompletion(tx, c.validatorPubKey.Bytes()) 155 if err != nil { 156 if !errors.Is(err, store.ErrNoRecentChallengeCompletion) { 157 return 0, stacktrace.Propagate(err) ··· 159 completion = 0 160 } 161 162 + minConsecutiveProvenBlock := uint64(toHeight) 163 + for proofHeight := range store.Consensus.BlockChallengeProofsReverseIterator(tx, minConsecutiveProvenBlock, &err) { 164 + if proofHeight+1 != minConsecutiveProvenBlock { 165 + break 166 + } 167 + minConsecutiveProvenBlock = proofHeight 168 } 169 if err != nil { 170 return 0, stacktrace.Propagate(err) 171 } 172 173 + if minConsecutiveProvenBlock == uint64(toHeight) { 174 + return 0, stacktrace.Propagate(errMissingProofs) 175 + } 176 + 177 + minProvable := int64(max(minConsecutiveProvenBlock, completion+1)) 178 179 c.cachedNextProofFromHeight = mo.Some(minProvable) 180 return minProvable, nil 181 } 182 183 + func (c *rangeChallengeCoordinator) newBlocksSubscriber() error { 184 subscriber := "rangeChallengeCoordinator" 185 186 subCtx, cancel := context.WithTimeout(c.runnerContext, core.SubscribeTimeout) ··· 208 case c.newBlockCh <- newBlockHeaderEvent.Header.Height: 209 default: 210 } 211 + 212 + if c.isConfiguredToBeValidator && !c.consensusReactor.WaitSync() { 213 + // ensure we don't skip creating any proof even if we are not an active validator 214 + // (i.e. ExtendVote won't trigger the computation of block challenge proofs, and/or we may be slightly behind, so notifyOfIncomingBlockHeight in ProcessProposal won't be called) 215 + c.blockChallengeCoordinator.notifyOfIncomingBlockHeight(newBlockHeaderEvent.Header.Height) 216 + } 217 case <-blocksSub.Canceled(): 218 err := blocksSub.Err() 219 if err == nil { ··· 224 } 225 } 226 227 + func (c *rangeChallengeCoordinator) onNewBlock(ctx context.Context, newBlockHeight int64) error { 228 if !c.isConfiguredToBeValidator { 229 return nil 230 } ··· 233 return nil 234 } 235 236 + // if we are not an active validator, when we get to this point, the block challenge for newBlockHeight might not be ready yet, so we need to wait for it 237 + // even if we are an active validator, the block may have been finalized without our votes 238 + _, err := c.blockChallengeCoordinator.loadOrComputeBlockChallengeProof(ctx, newBlockHeight) 239 + if err != nil { 240 + return stacktrace.Propagate(err) 241 + } 242 + 243 tx := c.txFactory.ReadCommitted() 244 if tx.Height() < newBlockHeight { 245 return stacktrace.NewError("read committed transaction height lower than expected: new block height %d, transaction height %d", newBlockHeight, tx.Height()) ··· 248 shouldCommitToChallenge := false 249 shouldCompleteChallenge := false 250 251 + fromHeight, toHeight, provenHeight, includedOnHeight, _, err := store.Consensus.ValidatorRangeChallengeCommitment(tx, c.validatorPubKey.Bytes()) 252 if errors.Is(err, store.ErrNoActiveChallengeCommitment) { 253 deleteOldProofs := false 254 if c.hasSubmittedChallengeCompletion { ··· 258 c.hasSubmittedChallengeCompletion = false 259 c.cachedNextProofFromHeight = mo.None[int64]() 260 } 261 + nextFromHeight, err := c.getOrFetchNextProofFromHeight(tx, newBlockHeight) 262 if err != nil { 263 return stacktrace.Propagate(err) 264 } ··· 285 } else if err != nil { 286 return stacktrace.Propagate(err) 287 } else { 288 + commitmentBlockHeader, err := c.blockHeaderGetter(int64(includedOnHeight)) 289 commitmentExpired := false 290 + if err == nil { 291 commitmentExpired = uint64(newBlockHeight) >= includedOnHeight+CompleteChallengeMaxAgeInBlocks || 292 + time.Since(commitmentBlockHeader.Time) >= CompleteChallengeMaxAge-1*time.Second 293 294 // shouldCompleteChallenge if not too many blocks have passed AND enough blocks have passed 295 shouldCompleteChallenge = !commitmentExpired && includedOnHeight+1 <= uint64(newBlockHeight) 296 } 297 298 + if !shouldCompleteChallenge && commitmentExpired { 299 + nextFromHeight, err := c.getOrFetchNextProofFromHeight(tx, newBlockHeight) 300 + if err != nil { 301 + return stacktrace.Propagate(err) 302 + } 303 + 304 + shouldCommitToChallenge = nextFromHeight+CommitToChallengeTargetInterval-1 <= newBlockHeight 305 } 306 } 307 ··· 313 return stacktrace.Propagate(err) 314 } 315 } else if shouldCommitToChallenge { 316 + c.logger.Info("Creating challenge commitment transaction", "toHeight", newBlockHeight) 317 transactionBytes, err = c.createCommitToChallengeTx(ctx, tx, newBlockHeight) 318 if err != nil { 319 if errors.Is(err, errMissingProofs) { 320 + c.cachedNextProofFromHeight = mo.None[int64]() 321 // this is expected on nodes that take longer to generate the proofs 322 // shouldCommitToChallenge will be true again on the next block and we'll try again 323 return nil ··· 344 return nil 345 } 346 347 + func (c *rangeChallengeCoordinator) createCommitToChallengeTx(ctx context.Context, tx transaction.Read, toHeight int64) ([]byte, error) { 348 + fromHeight, err := c.getOrFetchNextProofFromHeight(tx, toHeight) 349 if err != nil { 350 return nil, stacktrace.Propagate(err) 351 } ··· 364 return nil, stacktrace.Propagate(err) 365 } 366 367 + toHeightBlockHeader, err := c.blockHeaderGetter(toHeight) 368 + if err != nil { 369 + return nil, stacktrace.Propagate(err) 370 } 371 372 + if tx.Timestamp().Sub(toHeightBlockHeader.Time) > CommitToChallengeMaxAge { 373 return nil, stacktrace.NewError("too much time passed since block at height") 374 } 375 376 + proveHeight := computeHeightToProveInRange(toHeightBlockHeader.LastCommitHash, c.validatorPubKey, int64(fromHeight), int64(toHeight), mo.None[int64]()) 377 378 commitToRoot, membershipProof, err := c.computeRangeChallengeProof(ctx, tx, fromHeight, toHeight, proveHeight) 379 if err != nil { ··· 408 return out, nil 409 } 410 411 + func (c *rangeChallengeCoordinator) createCompleteChallengeTx(ctx context.Context, tx transaction.Read, fromHeight, toHeight, prevProvenHeight, commitmentIncludedOnHeight int64) ([]byte, error) { 412 + nextBlockHeader, err := c.blockHeaderGetter(commitmentIncludedOnHeight + 1) 413 + if err != nil { 414 + return nil, stacktrace.Propagate(err) 415 } 416 417 + proveHeight := computeHeightToProveInRange(nextBlockHeader.LastCommitHash, c.validatorPubKey, int64(fromHeight), int64(toHeight), mo.Some(prevProvenHeight)) 418 419 _, membershipProof, err := c.computeRangeChallengeProof(ctx, tx, fromHeight, toHeight, proveHeight) 420 if err != nil { ··· 429 transaction := Transaction[CompleteChallengeArguments]{ 430 Action: TransactionActionCompleteChallenge, 431 Arguments: CompleteChallengeArguments{ 432 + ValidatorPubKey: lo.Must(MarshalPubKeyForArguments(c.validatorPubKey)), 433 + Proof: proofBytes, 434 }, 435 } 436 ··· 441 return out, nil 442 } 443 444 + func (c *rangeChallengeCoordinator) computeRangeChallengeProof(ctx context.Context, tx transaction.Read, startHeight, endHeight, proveHeight int64) ([]byte, *ics23.CommitmentProof, error) { 445 ctx = context.WithValue(ctx, contextTxKey{}, tx) 446 ct, err := c.treeCache.Get(ctx, treeCacheKey{ 447 startHeight: startHeight, ··· 461 return ct.root, membershipProof, nil 462 } 463 464 + func computeHeightToProveInRange(lastCommitHash []byte, validatorPubKey crypto.PubKey, fromHeight, toHeight int64, avoidHeight mo.Option[int64]) int64 { 465 lastCommitHashBigInt := new(big.Int).SetBytes(lastCommitHash) 466 + validatorBigInt := new(big.Int).SetBytes(validatorPubKey.Bytes()) 467 seed := new(big.Int).Xor(lastCommitHashBigInt, validatorBigInt) 468 469 numBlocks := toHeight - fromHeight + 1 ··· 482 483 var errMissingProofs = errors.New("missing block challenge proofs in requested range") 484 485 + func (c *rangeChallengeCoordinator) proofTreeLoader(ctx context.Context, cacheKey treeCacheKey) (theine.Loaded[cachedTree], error) { 486 var zeroValue theine.Loaded[cachedTree] 487 488 anyTx := ctx.Value(contextTxKey{}) ··· 497 tree := iavl.NewMutableTree(db.NewMemDB(), 16, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 498 499 var err error 500 + 501 + afterHeight := uint64(max(cacheKey.startHeight-1, 0)) 502 + for proofHeight, proof := range store.Consensus.BlockChallengeProofsIterator(tx, afterHeight, &err) { 503 select { 504 case <-ctx.Done(): 505 return zeroValue, stacktrace.Propagate(ctx.Err()) ··· 509 if proofHeight > uint64(cacheKey.endHeight) { 510 break 511 } 512 + 513 + if proofHeight != afterHeight+1 { 514 + return zeroValue, stacktrace.Propagate(errMissingProofs) 515 + } 516 + afterHeight = proofHeight 517 518 _, err := tree.Set(binary.BigEndian.AppendUint64(nil, proofHeight), slices.Clone(proof)) 519 if err != nil {
+128 -4
abciapp/snapshots.go
··· 21 "github.com/cosmos/iavl" 22 "github.com/gbl08ma/stacktrace" 23 "github.com/klauspost/compress/zstd" 24 "tangled.org/gbl08ma.com/didplcbft/store" 25 ) 26 ··· 273 } 274 defer f.Close() 275 276 - err = writeSnapshot(f, d.indexDB, it) 277 if err != nil { 278 return stacktrace.Propagate(err) 279 } ··· 309 return nil 310 } 311 312 - func writeSnapshot(writerSeeker io.WriteSeeker, indexDB dbm.DB, it *iavl.ImmutableTree) error { 313 writtenUntilReservedFields := 0 314 315 bw := bufio.NewWriter(writerSeeker) ··· 357 return stacktrace.Propagate(err) 358 } 359 360 - numIndexEntries, err := exportIndexEntries(indexDB, it.Version(), zstdw) 361 if err != nil { 362 return stacktrace.Propagate(err) 363 } ··· 406 return nil 407 } 408 409 - func exportIndexEntries(indexDB dbm.DB, treeVersion int64, w io.Writer) (int64, error) { 410 didLogKeyStart := make([]byte, store.IndexDIDLogKeyLength) 411 didLogKeyStart[0] = store.IndexDIDLogKeyPrefix 412 didLogKeyEnd := slices.Repeat([]byte{0xff}, store.IndexDIDLogKeyLength) 413 didLogKeyEnd[0] = store.IndexDIDLogKeyPrefix 414 415 iterator, err := indexDB.Iterator(didLogKeyStart, didLogKeyEnd) 416 if err != nil { 417 return 0, stacktrace.Propagate(err) ··· 450 iterator.Next() 451 } 452 return numEntries, nil 453 } 454 455 func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) {
··· 21 "github.com/cosmos/iavl" 22 "github.com/gbl08ma/stacktrace" 23 "github.com/klauspost/compress/zstd" 24 + _ "tangled.org/gbl08ma.com/didplcbft/badgertodbm" // for reference in comment 25 "tangled.org/gbl08ma.com/didplcbft/store" 26 ) 27 ··· 274 } 275 defer f.Close() 276 277 + err = writeSnapshot(f, d.indexDB, d.blockHeaderGetter, it) 278 if err != nil { 279 return stacktrace.Propagate(err) 280 } ··· 310 return nil 311 } 312 313 + func writeSnapshot(writerSeeker io.WriteSeeker, indexDB dbm.DB, blockHeaderGetter BlockHeaderGetter, it *iavl.ImmutableTree) error { 314 writtenUntilReservedFields := 0 315 316 bw := bufio.NewWriter(writerSeeker) ··· 358 return stacktrace.Propagate(err) 359 } 360 361 + numIndexEntries, err := exportIndexEntries(indexDB, blockHeaderGetter, it.Version(), zstdw) 362 if err != nil { 363 return stacktrace.Propagate(err) 364 } ··· 407 return nil 408 } 409 410 + func exportIndexEntries(indexDB dbm.DB, blockHeaderGetter BlockHeaderGetter, treeVersion int64, w io.Writer) (int64, error) { 411 + numDIDEntries, err := exportIndexDIDEntries(indexDB, treeVersion, w) 412 + if err != nil { 413 + return 0, stacktrace.Propagate(err) 414 + } 415 + 416 + numValidatorParticipationEntries, err := exportIndexValidatorParticipation(indexDB, treeVersion, w) 417 + if err != nil { 418 + return 0, stacktrace.Propagate(err) 419 + } 420 + 421 + numRecentBlockHeaders, err := exportRecentBlockHeaders(blockHeaderGetter, treeVersion, w) 422 + if err != nil { 423 + return 0, stacktrace.Propagate(err) 424 + } 425 + 426 + return numDIDEntries + numValidatorParticipationEntries + numRecentBlockHeaders, nil 427 + } 428 + 429 + func exportIndexDIDEntries(indexDB dbm.DB, treeVersion int64, w io.Writer) (int64, error) { 430 didLogKeyStart := make([]byte, store.IndexDIDLogKeyLength) 431 didLogKeyStart[0] = store.IndexDIDLogKeyPrefix 432 didLogKeyEnd := slices.Repeat([]byte{0xff}, store.IndexDIDLogKeyLength) 433 didLogKeyEnd[0] = store.IndexDIDLogKeyPrefix 434 435 + // reading the index using an iterator while writes happen to its domain technically violates the documented contract for [dbm.Iterator] 436 + // in practice, we know this is safe because of how we implemented [badgertodbm.BadgerDB.IteratorWithOptions] - it uses a read-only transaction per iterator 437 iterator, err := indexDB.Iterator(didLogKeyStart, didLogKeyEnd) 438 if err != nil { 439 return 0, stacktrace.Propagate(err) ··· 472 iterator.Next() 473 } 474 return numEntries, nil 475 + } 476 + 477 + func exportIndexValidatorParticipation(indexDB dbm.DB, treeVersion int64, w io.Writer) (int64, error) { 478 + epochHeight := uint64(treeVersion) - uint64(treeVersion)%UpdateValidatorsBlockInterval 479 + startKey := store.MarshalValidatorVotingActivityKey(uint64(epochHeight), make([]byte, store.AddressLength)) 480 + endKey := store.MarshalValidatorVotingActivityKey(uint64(epochHeight), slices.Repeat([]byte{0xff}, store.AddressLength)) 481 + 482 + // reading the index using an iterator while writes happen to its domain technically violates the documented contract for [dbm.Iterator] 483 + // in practice, we know this is safe because of how we implemented [badgertodbm.BadgerDB.IteratorWithOptions] - it uses a read-only transaction per iterator 484 + iterator, err := indexDB.Iterator(startKey, endKey) 485 + if err != nil { 486 + return 0, stacktrace.Propagate(err) 487 + } 488 + defer iterator.Close() 489 + 490 + numEntries := int64(0) 491 + for iterator.Valid() { 492 + key := iterator.Key() 493 + value := iterator.Value() 494 + 495 + header := make([]byte, 4+4) 496 + binary.BigEndian.PutUint32(header, uint32(len(key))) 497 + binary.BigEndian.PutUint32(header[4:], uint32(len(value))) 498 + 499 + _, err = w.Write(header) 500 + if err != nil { 501 + return 0, stacktrace.Propagate(err) 502 + } 503 + 504 + _, err = w.Write(key) 505 + if err != nil { 506 + return 0, stacktrace.Propagate(err) 507 + } 508 + 509 + _, err = w.Write(value) 510 + if err != nil { 511 + return 0, stacktrace.Propagate(err) 512 + } 513 + 514 + numEntries++ 515 + 516 + iterator.Next() 517 + } 518 + 519 + if numEntries == 0 { 520 + // there should always be at least one active validator 521 + return 0, stacktrace.NewError("unexpectedly missing index entries for validator voting participation - treeVersion may be too old to export") 522 + } 523 + 524 + return numEntries, nil 525 + } 526 + 527 + func exportRecentBlockHeaders(blockHeaderGetter BlockHeaderGetter, treeVersion int64, w io.Writer) (int64, error) { 528 + // export sufficient block headers for the nodes resuming from this snapshot to be able to e.g. execute TransactionActionCompleteChallenge 529 + // (i.e. transactions which depend on recent block headers to determine end state) 530 + 531 + numBlockHeaders := max(CommitToChallengeMaxAgeInBlocks, CompleteChallengeMaxAgeInBlocks) + 5 // 5 blocks safety margin 532 + startHeight := treeVersion - int64(numBlockHeaders) 533 + if startHeight < 1 { 534 + startHeight = 1 535 + } 536 + 537 + numExportedBlockHeaders := int64(0) 538 + for height := startHeight; height <= treeVersion; height++ { 539 + blockHeader, err := blockHeaderGetter(height) 540 + if err != nil { 541 + return 0, stacktrace.Propagate(err) 542 + } 543 + 544 + blockHeaderProto := blockHeader.ToProto() 545 + blockHeaderBytes, err := blockHeaderProto.Marshal() 546 + if err != nil { 547 + return 0, stacktrace.Propagate(err) 548 + } 549 + 550 + key := make([]byte, 1+8) 551 + key[0] = store.IndexBlockHeaderKeyPrefix 552 + binary.BigEndian.PutUint64(key[1:], uint64(height)) 553 + 554 + header := make([]byte, 4+4) 555 + binary.BigEndian.PutUint32(header, uint32(len(key))) 556 + binary.BigEndian.PutUint32(header[4:], uint32(len(blockHeaderBytes))) 557 + 558 + _, err = w.Write(header) 559 + if err != nil { 560 + return 0, stacktrace.Propagate(err) 561 + } 562 + 563 + _, err = w.Write(key) 564 + if err != nil { 565 + return 0, stacktrace.Propagate(err) 566 + } 567 + 568 + _, err = w.Write(blockHeaderBytes) 569 + if err != nil { 570 + return 0, stacktrace.Propagate(err) 571 + } 572 + 573 + numExportedBlockHeaders++ 574 + } 575 + 576 + return numExportedBlockHeaders, nil 577 } 578 579 func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) {
+24 -10
abciapp/tx.go
··· 7 abcitypes "github.com/cometbft/cometbft/abci/types" 8 "github.com/cometbft/cometbft/crypto" 9 "github.com/cometbft/cometbft/crypto/ed25519" 10 - "github.com/cometbft/cometbft/crypto/secp256k1" 11 - bftstore "github.com/cometbft/cometbft/store" 12 cmttypes "github.com/cometbft/cometbft/types" 13 "github.com/gbl08ma/stacktrace" 14 cbornode "github.com/ipfs/go-ipld-cbor" ··· 23 24 type TransactionAction string 25 26 type TransactionProcessorDependencies struct { 27 workingHeight int64 28 readTx transaction.Read ··· 31 getAuthoritativeOperationsFetcher func(plc string) *authoritativeOperationsFetcher 32 destroyAuthoritativeOperationsFetcher func() 33 blockChallengeCoordinator *blockChallengeCoordinator 34 - blockStore *bftstore.BlockStore 35 } 36 37 type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) ··· 101 Key []byte `json:"key" refmt:"key"` 102 } 103 104 func init() { 105 cbornode.RegisterCborType(PubKeyInArguments{}) 106 } ··· 144 return false, nil, stacktrace.Propagate(err) 145 } 146 147 - var pubKey crypto.PubKey 148 - switch publicKey.Type { 149 - case ed25519.PubKeyName: 150 - pubKey = ed25519.PubKey(publicKey.Key) 151 - case secp256k1.PubKeyName: 152 - pubKey = secp256k1.PubKey(publicKey.Key) 153 } 154 155 return pubKey.VerifySignature(bytesToSign, sig), pubKey, nil ··· 168 } 169 170 type processResult struct { 171 - commitSideEffects []func() 172 173 Code uint32 174 Data []byte
··· 7 abcitypes "github.com/cometbft/cometbft/abci/types" 8 "github.com/cometbft/cometbft/crypto" 9 "github.com/cometbft/cometbft/crypto/ed25519" 10 cmttypes "github.com/cometbft/cometbft/types" 11 "github.com/gbl08ma/stacktrace" 12 cbornode "github.com/ipfs/go-ipld-cbor" ··· 21 22 type TransactionAction string 23 24 + func (a TransactionAction) SubmittableViaMempool() bool { 25 + return a != TransactionActionAuthoritativeImport && a != TransactionActionUpdateValidators 26 + } 27 + 28 + func (a TransactionAction) MustBeTrailingInBlock() bool { 29 + return a == TransactionActionAuthoritativeImport || a == TransactionActionUpdateValidators 30 + } 31 + 32 type TransactionProcessorDependencies struct { 33 workingHeight int64 34 readTx transaction.Read ··· 37 getAuthoritativeOperationsFetcher func(plc string) *authoritativeOperationsFetcher 38 destroyAuthoritativeOperationsFetcher func() 39 blockChallengeCoordinator *blockChallengeCoordinator 40 + blockHeaderGetter BlockHeaderGetter 41 } 42 43 type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) ··· 107 Key []byte `json:"key" refmt:"key"` 108 } 109 110 + func (p PubKeyInArguments) ToPubKey() (crypto.PubKey, error) { 111 + switch p.Type { 112 + case ed25519.PubKeyName: 113 + return ed25519.PubKey(p.Key), nil 114 + default: 115 + return nil, stacktrace.NewError("unknown or unsupported key type") 116 + } 117 + } 118 + 119 func init() { 120 cbornode.RegisterCborType(PubKeyInArguments{}) 121 } ··· 159 return false, nil, stacktrace.Propagate(err) 160 } 161 162 + pubKey, err := publicKey.ToPubKey() 163 + if err != nil { 164 + return false, nil, stacktrace.Propagate(err) 165 } 166 167 return pubKey.VerifySignature(bytesToSign, sig), pubKey, nil ··· 180 } 181 182 type processResult struct { 183 + commitSideEffects []func() 184 + validatorUpdates []abcitypes.ValidatorUpdate 185 + rejectEntireProposal bool 186 187 Code uint32 188 Data []byte
+32 -32
abciapp/tx_challenge.go
··· 18 19 const CommitToChallengeMaxAgeInBlocks = 3 20 const CommitToChallengeMinRange = 1000 21 - const CommitToChallengeMaxRange = 10000 22 - const CommitToChallengeTargetInterval = 5000 23 24 - // TODO adjust these depending on how fast we want inactive validators to lose reputation 25 - // TODO reputation loss (and gain?) should probably be based on a % of the current reputation 26 - // or loss should stack (multiplicatively?) if a validator remains inactive for too long 27 - // so that if a very reputable validator goes offline, it doesn't continue to have a lot of voting power for too long 28 - // perhaps simpler idea: in addition to entropy, apply additional (separate) penalty based on age of last proven height 29 - // (store.Consensus.ValidatorRangeChallengeCompletion) when age crosses some threshold 30 const ReputationGainPerProvenBlock = 100 31 const ReputationEntropyLossPerBlock = 90 32 ··· 77 }, nil 78 } 79 80 - validatorAddress := validatorPubKey.Address() 81 - 82 if tx.Arguments.ToHeight < tx.Arguments.FromHeight || tx.Arguments.ToHeight >= deps.workingHeight { 83 return &processResult{ 84 Code: 4201, ··· 107 }, nil 108 } 109 110 - toHeightBlockMeta := deps.blockStore.LoadBlockMeta(tx.Arguments.ToHeight) 111 - if toHeightBlockMeta == nil { 112 return &processResult{ 113 Code: 4205, 114 Log: "unknown block in challenge range", 115 }, nil 116 } 117 118 - if deps.readTx.Timestamp().Sub(toHeightBlockMeta.Header.Time) > CommitToChallengeMaxAge { 119 return &processResult{ 120 Code: 4206, 121 Log: "outdated challenge range", 122 }, nil 123 } 124 125 - currentCompletion, err := store.Consensus.ValidatorRangeChallengeCompletion(deps.readTx, validatorAddress) 126 if err == nil { 127 if tx.Arguments.FromHeight <= int64(currentCompletion) { 128 return &processResult{ ··· 154 proofHeight := int64(binary.BigEndian.Uint64(existenceProof.Key)) 155 156 expectedProofHeight := computeHeightToProveInRange( 157 - toHeightBlockMeta.Header.LastCommitHash.Bytes(), 158 - validatorAddress.Bytes(), 159 tx.Arguments.FromHeight, 160 tx.Arguments.ToHeight, 161 mo.None[int64]()) ··· 167 }, nil 168 } 169 170 - blockProofValid, err := deps.blockChallengeCoordinator.verifyBlockChallengeProof(int64(proofHeight), validatorAddress, existenceProof.Value) 171 if err != nil { 172 return nil, stacktrace.Propagate(err) 173 } ··· 188 if writeTx, ok := deps.writeTx.Get(); ok { 189 err = store.Consensus.SetValidatorRangeChallengeCommitment( 190 writeTx, 191 - validatorAddress, 192 uint64(tx.Arguments.FromHeight), 193 uint64(tx.Arguments.ToHeight), 194 uint64(proofHeight), ··· 209 type CompleteChallengeArguments struct { 210 // This transaction is not signed. It is a no-op if it isn't valid and we don't really care if an entity is able to complete a challenge on behalf of another validator 211 // (that would be quite an achievement, on the level of a validator being able to find a collision for the committed root in order to fake not doing all the work) 212 - Validator []byte `json:"validator" refmt:"validator"` 213 214 // this shall be a membership proof on the same tree the validator previously committed to, 215 // for the key deterministically-randomly determined by the last_commit_hash of the block _after_ the one ··· 236 }, nil 237 } 238 239 - fromHeight, toHeight, provenHeight, includedOnHeight, committedTreeRoot, err := store.Consensus.ValidatorRangeChallengeCommitment(deps.readTx, tx.Arguments.Validator) 240 if err != nil { 241 if errors.Is(err, store.ErrNoActiveChallengeCommitment) { 242 return &processResult{ ··· 255 }, nil 256 } 257 258 - blockAfterMeta := deps.blockStore.LoadBlockMeta(int64(includedOnHeight + 1)) 259 - if blockAfterMeta == nil { 260 // this shouldn't happen unless the prover is submitting the completion on the same block as the commitment 261 return &processResult{ 262 Code: 4302, ··· 264 }, nil 265 } 266 267 - if deps.readTx.Timestamp().Sub(blockAfterMeta.Header.Time) > CompleteChallengeMaxAge { 268 // validator must commit to a new challenge 269 return &processResult{ 270 Code: 4303, ··· 284 285 proofHeight := int64(binary.BigEndian.Uint64(existenceProof.Key)) 286 287 expectedProofHeight := computeHeightToProveInRange( 288 - blockAfterMeta.Header.LastCommitHash.Bytes(), 289 - tx.Arguments.Validator, 290 int64(fromHeight), 291 int64(toHeight), 292 mo.Some(int64(provenHeight))) 293 294 if proofHeight != expectedProofHeight { 295 return &processResult{ 296 - Code: 4305, 297 Log: "incorrect key proven", 298 }, nil 299 } 300 301 - blockProofValid, err := deps.blockChallengeCoordinator.verifyBlockChallengeProof(int64(proofHeight), tx.Arguments.Validator, existenceProof.Value) 302 if err != nil { 303 return nil, stacktrace.Propagate(err) 304 } 305 if !blockProofValid { 306 return &processResult{ 307 - Code: 4306, 308 Log: "invalid proof", 309 }, nil 310 } 311 312 if !ics23.VerifyMembership(ics23.IavlSpec, committedTreeRoot, proof, existenceProof.Key, existenceProof.Value) { 313 return &processResult{ 314 - Code: 4307, 315 Log: "invalid proof", 316 }, nil 317 } 318 319 if writeTx, ok := deps.writeTx.Get(); ok { 320 - err = store.Consensus.ClearValidatorRangeChallengeCommitment(writeTx, tx.Arguments.Validator) 321 if err != nil { 322 return nil, stacktrace.Propagate(err) 323 } 324 325 - err = store.Consensus.SetValidatorRangeChallengeCompletion(writeTx, tx.Arguments.Validator, toHeight) 326 if err != nil { 327 return nil, stacktrace.Propagate(err) 328 } ··· 330 numProvenBlocks := toHeight - fromHeight + 1 331 repGain := numProvenBlocks * ReputationGainPerProvenBlock 332 333 - err = store.Consensus.ChangeValidatorReputation(writeTx, tx.Arguments.Validator, int64(repGain)) 334 if err != nil { 335 return nil, stacktrace.Propagate(err) 336 }
··· 18 19 const CommitToChallengeMaxAgeInBlocks = 3 20 const CommitToChallengeMinRange = 1000 21 + const CommitToChallengeMaxRange = 5000 22 + const CommitToChallengeTargetInterval = 2000 23 24 const ReputationGainPerProvenBlock = 100 25 const ReputationEntropyLossPerBlock = 90 26 ··· 71 }, nil 72 } 73 74 if tx.Arguments.ToHeight < tx.Arguments.FromHeight || tx.Arguments.ToHeight >= deps.workingHeight { 75 return &processResult{ 76 Code: 4201, ··· 99 }, nil 100 } 101 102 + toHeightBlockHeader, err := deps.blockHeaderGetter(tx.Arguments.ToHeight) 103 + if err != nil { 104 return &processResult{ 105 Code: 4205, 106 Log: "unknown block in challenge range", 107 }, nil 108 } 109 110 + if deps.readTx.Timestamp().Sub(toHeightBlockHeader.Time) > CommitToChallengeMaxAge { 111 return &processResult{ 112 Code: 4206, 113 Log: "outdated challenge range", 114 }, nil 115 } 116 117 + currentCompletion, err := store.Consensus.ValidatorRangeChallengeCompletion(deps.readTx, validatorPubKey.Bytes()) 118 if err == nil { 119 if tx.Arguments.FromHeight <= int64(currentCompletion) { 120 return &processResult{ ··· 146 proofHeight := int64(binary.BigEndian.Uint64(existenceProof.Key)) 147 148 expectedProofHeight := computeHeightToProveInRange( 149 + toHeightBlockHeader.LastCommitHash.Bytes(), 150 + validatorPubKey, 151 tx.Arguments.FromHeight, 152 tx.Arguments.ToHeight, 153 mo.None[int64]()) ··· 159 }, nil 160 } 161 162 + blockProofValid, err := deps.blockChallengeCoordinator.verifyBlockChallengeProof(int64(proofHeight), validatorPubKey.Address(), existenceProof.Value) 163 if err != nil { 164 return nil, stacktrace.Propagate(err) 165 } ··· 180 if writeTx, ok := deps.writeTx.Get(); ok { 181 err = store.Consensus.SetValidatorRangeChallengeCommitment( 182 writeTx, 183 + validatorPubKey.Bytes(), 184 uint64(tx.Arguments.FromHeight), 185 uint64(tx.Arguments.ToHeight), 186 uint64(proofHeight), ··· 201 type CompleteChallengeArguments struct { 202 // This transaction is not signed. It is a no-op if it isn't valid and we don't really care if an entity is able to complete a challenge on behalf of another validator 203 // (that would be quite an achievement, on the level of a validator being able to find a collision for the committed root in order to fake not doing all the work) 204 + ValidatorPubKey PubKeyInArguments `json:"validator" refmt:"validator"` 205 206 // this shall be a membership proof on the same tree the validator previously committed to, 207 // for the key deterministically-randomly determined by the last_commit_hash of the block _after_ the one ··· 228 }, nil 229 } 230 231 + fromHeight, toHeight, provenHeight, includedOnHeight, committedTreeRoot, err := store.Consensus.ValidatorRangeChallengeCommitment(deps.readTx, tx.Arguments.ValidatorPubKey.Key) 232 if err != nil { 233 if errors.Is(err, store.ErrNoActiveChallengeCommitment) { 234 return &processResult{ ··· 247 }, nil 248 } 249 250 + blockAfterHeader, err := deps.blockHeaderGetter(int64(includedOnHeight + 1)) 251 + if err != nil { 252 // this shouldn't happen unless the prover is submitting the completion on the same block as the commitment 253 return &processResult{ 254 Code: 4302, ··· 256 }, nil 257 } 258 259 + if deps.readTx.Timestamp().Sub(blockAfterHeader.Time) > CompleteChallengeMaxAge { 260 // validator must commit to a new challenge 261 return &processResult{ 262 Code: 4303, ··· 276 277 proofHeight := int64(binary.BigEndian.Uint64(existenceProof.Key)) 278 279 + pubKey, err := tx.Arguments.ValidatorPubKey.ToPubKey() 280 + if err != nil || proof.GetExist() == nil { 281 + return &processResult{ 282 + Code: 4305, 283 + Log: "invalid public key", 284 + }, nil 285 + } 286 + 287 expectedProofHeight := computeHeightToProveInRange( 288 + blockAfterHeader.LastCommitHash.Bytes(), 289 + pubKey, 290 int64(fromHeight), 291 int64(toHeight), 292 mo.Some(int64(provenHeight))) 293 294 if proofHeight != expectedProofHeight { 295 return &processResult{ 296 + Code: 4306, 297 Log: "incorrect key proven", 298 }, nil 299 } 300 301 + blockProofValid, err := deps.blockChallengeCoordinator.verifyBlockChallengeProof(int64(proofHeight), pubKey.Address(), existenceProof.Value) 302 if err != nil { 303 return nil, stacktrace.Propagate(err) 304 } 305 if !blockProofValid { 306 return &processResult{ 307 + Code: 4307, 308 Log: "invalid proof", 309 }, nil 310 } 311 312 if !ics23.VerifyMembership(ics23.IavlSpec, committedTreeRoot, proof, existenceProof.Key, existenceProof.Value) { 313 return &processResult{ 314 + Code: 4308, 315 Log: "invalid proof", 316 }, nil 317 } 318 319 if writeTx, ok := deps.writeTx.Get(); ok { 320 + err = store.Consensus.ClearValidatorRangeChallengeCommitment(writeTx, pubKey.Bytes()) 321 if err != nil { 322 return nil, stacktrace.Propagate(err) 323 } 324 325 + err = store.Consensus.SetValidatorRangeChallengeCompletion(writeTx, pubKey.Bytes(), toHeight) 326 if err != nil { 327 return nil, stacktrace.Propagate(err) 328 } ··· 330 numProvenBlocks := toHeight - fromHeight + 1 331 repGain := numProvenBlocks * ReputationGainPerProvenBlock 332 333 + err = store.Consensus.ChangeValidatorReputation(writeTx, pubKey.Bytes(), int64(repGain)) 334 if err != nil { 335 return nil, stacktrace.Propagate(err) 336 }
+275
abciapp/tx_epoch.go
···
··· 1 + package abciapp 2 + 3 + import ( 4 + "bytes" 5 + "container/heap" 6 + "context" 7 + "errors" 8 + "fmt" 9 + "sort" 10 + 11 + abcitypes "github.com/cometbft/cometbft/abci/types" 12 + "github.com/cometbft/cometbft/crypto/ed25519" 13 + protocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto" 14 + "github.com/gbl08ma/stacktrace" 15 + cbornode "github.com/ipfs/go-ipld-cbor" 16 + "tangled.org/gbl08ma.com/didplcbft/store" 17 + ) 18 + 19 + const UpdateValidatorsBlockInterval = 10000 20 + const MaxActiveValidators = 50 21 + const MinReputationForBecomingValidator = 20000 22 + 23 + func init() { 24 + store.Consensus.ConfigureEpochSize(UpdateValidatorsBlockInterval) 25 + } 26 + 27 + func isUpdateValidatorsTxHeight(height int64) bool { 28 + return height%UpdateValidatorsBlockInterval == 0 29 + } 30 + 31 + var TransactionActionUpdateValidators = registerTransactionAction[UpdateValidatorsArguments]("UpdateValidators", processUpdateValidatorsTx) 32 + 33 + type UpdateValidatorsArguments struct{} 34 + 35 + func (UpdateValidatorsArguments) ForAction() TransactionAction { 36 + return TransactionActionUpdateValidators 37 + } 38 + 39 + func init() { 40 + cbornode.RegisterCborType(UpdateValidatorsArguments{}) 41 + cbornode.RegisterCborType(Transaction[UpdateValidatorsArguments]{}) 42 + } 43 + 44 + func computeVotingPowerFromReputation(reputation uint64) uint64 { 45 + if reputation < MinReputationForBecomingValidator { 46 + return 0 47 + } 48 + return reputation - MinReputationForBecomingValidator // TODO design and apply S-curve 49 + } 50 + 51 + func processUpdateValidatorsTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) { 52 + _, err := UnmarshalTransaction[UpdateValidatorsArguments](txBytes) 53 + if err != nil { 54 + return &processResult{ 55 + Code: 4000, 56 + Log: err.Error(), 57 + }, nil 58 + } 59 + 60 + if !isUpdateValidatorsTxHeight(deps.workingHeight) { 61 + return &processResult{ 62 + rejectEntireProposal: true, // only a misbehaving validator would cause this 63 + Code: 4400, 64 + Log: fmt.Sprintf("update validators transaction can only be executed on block heights that are multiples of %d", UpdateValidatorsBlockInterval), 65 + }, nil 66 + } 67 + 68 + writeTx, ok := deps.writeTx.Get() 69 + if !ok { 70 + return &processResult{}, nil 71 + } 72 + 73 + oldActiveValidatorSet := make(map[[store.PublicKeyLength]byte]uint64, MaxActiveValidators) 74 + // use previous epoch 75 + if deps.workingHeight > 0 { 76 + prevEpoch := deps.workingHeight - UpdateValidatorsBlockInterval 77 + for v := range store.Consensus.ActiveValidatorsIterator(deps.readTx, uint64(prevEpoch), &err) { 78 + oldActiveValidatorSet[[store.PublicKeyLength]byte(v.PublicKey)] = v.VoteCount 79 + } 80 + if err != nil { 81 + return nil, stacktrace.Propagate(err) 82 + } 83 + } 84 + 85 + valHeap := make(validatorHeap, 0, MaxActiveValidators) 86 + 87 + validatorUpdates := []abcitypes.ValidatorUpdate{} 88 + 89 + err = store.Consensus.ChangeAllNonZeroValidatorReputations(writeTx, func(validatorPubKey []byte, reputation uint64) (uint64, error) { 90 + rangeChallengeCompletion, err := store.Consensus.ValidatorRangeChallengeCompletion(writeTx.Downgrade(), validatorPubKey) 91 + if err != nil { 92 + if !errors.Is(err, store.ErrNoRecentChallengeCompletion) { 93 + return 0, stacktrace.Propagate(err) 94 + } 95 + rangeChallengeCompletion = 0 96 + } 97 + 98 + voteCount, hasVotingPower := oldActiveValidatorSet[[store.PublicKeyLength]byte(validatorPubKey)] 99 + votesIncludedInFraction := float64(voteCount) / float64(UpdateValidatorsBlockInterval) 100 + 101 + decrease := computeReputationDecrease(uint64(deps.workingHeight), reputation, rangeChallengeCompletion, votesIncludedInFraction, hasVotingPower) 102 + if decrease > reputation { 103 + reputation = 0 104 + } else { 105 + reputation -= decrease 106 + } 107 + 108 + votingPower := computeVotingPowerFromReputation(reputation) 109 + if votingPower > 0 { 110 + vwvp := validatorWithVotingPower{ 111 + validatorPubKey: validatorPubKey, 112 + votingPower: votingPower, 113 + } 114 + 115 + if valHeap.Len() < MaxActiveValidators { 116 + heap.Push(&valHeap, vwvp) 117 + } else if votingPower > valHeap[0].votingPower { 118 + heap.Pop(&valHeap) 119 + heap.Push(&valHeap, vwvp) 120 + } 121 + } 122 + 123 + return reputation, nil 124 + }) 125 + 126 + if err != nil { 127 + return nil, stacktrace.Propagate(err) 128 + } 129 + 130 + if valHeap.Len() == 0 { 131 + // we would end up without active validators, this will cause CometBFT to halt the chain 132 + // this is mostly expected when the chain has just started and validators are yet to meet MinReputationForBecomingValidator 133 + // keep the existing set of validators unchanged until on-chain conditions cause future newActiveValidatorSet to not be empty 134 + return &processResult{ 135 + Code: 0, 136 + }, nil 137 + } 138 + 139 + newActiveValidatorSet := make(map[[store.PublicKeyLength]byte]struct{}, valHeap.Len()) 140 + 141 + // iterate directly over heap storage as order doesn't matter (we'll sort the validator updates later) 142 + for _, vwvp := range valHeap { 143 + newActiveValidatorSet[[store.PublicKeyLength]byte(vwvp.validatorPubKey)] = struct{}{} 144 + 145 + validatorUpdates = append(validatorUpdates, abcitypes.ValidatorUpdate{ 146 + PubKey: protocrypto.PublicKey{ 147 + Sum: &protocrypto.PublicKey_Ed25519{ 148 + Ed25519: vwvp.validatorPubKey[:], 149 + }, 150 + }, 151 + Power: int64(vwvp.votingPower), 152 + }) 153 + 154 + // mark validators as active in the new epoch, initializing their participation bitfields and storing the relation between address and pubkey 155 + address := ed25519.PubKey(vwvp.validatorPubKey).Address().Bytes() 156 + err = store.Consensus.InitializeValidatorVotingActivity(writeTx, address, vwvp.validatorPubKey, uint64(deps.workingHeight)) 157 + if err != nil { 158 + return nil, stacktrace.Propagate(err) 159 + } 160 + } 161 + 162 + // figure out which validators are no longer active so we can set their voting power to 0 163 + for validatorPubKey := range oldActiveValidatorSet { 164 + if _, ok := newActiveValidatorSet[validatorPubKey]; !ok { 165 + validatorUpdates = append(validatorUpdates, abcitypes.ValidatorUpdate{ 166 + PubKey: protocrypto.PublicKey{ 167 + Sum: &protocrypto.PublicKey_Ed25519{ 168 + Ed25519: validatorPubKey[:], 169 + }, 170 + }, 171 + Power: 0, 172 + }) 173 + } 174 + } 175 + 176 + // sort validator updates by public key to ensure determinism 177 + sort.Slice(validatorUpdates, func(i, j int) bool { 178 + return bytes.Compare(validatorUpdates[i].PubKey.GetEd25519(), validatorUpdates[j].PubKey.GetEd25519()) < 0 179 + }) 180 + 181 + return &processResult{ 182 + commitSideEffects: []func(){ 183 + func() { 184 + // TODO delete old vote tallies 185 + }, 186 + }, 187 + validatorUpdates: validatorUpdates, 188 + Code: 0, 189 + }, nil 190 + } 191 + 192 + type validatorWithVotingPower struct { 193 + validatorPubKey []byte 194 + votingPower uint64 195 + } 196 + 197 + type validatorHeap []validatorWithVotingPower 198 + 199 + func (h validatorHeap) Len() int { return len(h) } 200 + func (h validatorHeap) Less(i, j int) bool { return h[i].votingPower < h[j].votingPower } 201 + func (h validatorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } 202 + 203 + func (h *validatorHeap) Push(x any) { 204 + *h = append(*h, x.(validatorWithVotingPower)) 205 + } 206 + 207 + func (h *validatorHeap) Pop() any { 208 + old := *h 209 + n := len(old) 210 + x := old[n-1] 211 + *h = old[0 : n-1] 212 + return x 213 + } 214 + 215 + func (d *DIDPLCApplication) maybeCreateUpdateValidatorsTx(_ context.Context) ([]byte, error) { 216 + if !isUpdateValidatorsTxHeight(d.tree.WorkingVersion()) { 217 + return nil, nil 218 + } 219 + 220 + tx := Transaction[UpdateValidatorsArguments]{ 221 + Action: TransactionActionUpdateValidators, 222 + Arguments: UpdateValidatorsArguments{}, 223 + } 224 + 225 + out, err := cbornode.DumpObject(tx) 226 + if err != nil { 227 + return nil, stacktrace.Propagate(err) 228 + } 229 + return out, nil 230 + } 231 + 232 + func computeReputationDecrease(workingHeight uint64, reputation uint64, rangeChallengeCompletion uint64, voteInclusionFrequency float64, validatorHasVotingPower bool) uint64 { 233 + const expectedGainForCompletelyActiveValidator = ReputationGainPerProvenBlock * UpdateValidatorsBlockInterval 234 + entropyLoss := ReputationEntropyLossPerBlock * UpdateValidatorsBlockInterval 235 + 236 + if reputation < expectedGainForCompletelyActiveValidator { 237 + // smooth things over for validators that are just starting, 238 + // while still allowing inactive validators to gradually lose reputation to the point of being forgotten 239 + entropyLoss /= 4 240 + } 241 + 242 + decrease := uint64(entropyLoss) 243 + 244 + rangeChallengeMissedEpochs := (workingHeight - rangeChallengeCompletion) / UpdateValidatorsBlockInterval 245 + // allow for missing one epoch without penalty 246 + missedRangeChallengePenalty := float64(max(0, int64(rangeChallengeMissedEpochs)-1)) * 0.03 247 + if missedRangeChallengePenalty > 0.15 { 248 + // avoid a too sharp drop off 249 + missedRangeChallengePenalty = 0.15 250 + } 251 + if missedRangeChallengePenalty > 0 { 252 + penaltyInt := uint64(float64(reputation) * missedRangeChallengePenalty) 253 + if reputation < penaltyInt { 254 + return 0 255 + } 256 + decrease += penaltyInt 257 + } 258 + 259 + // penalize active validators that haven't been voting 260 + // note: we expect even "perfect" validators to miss some votes because we align the participation bitmasks to the epoch, 261 + // but this doesn't perfectly align with what happens in practice: 262 + // 1. DecidedLastCommit.Votes, as the name implies, has the votes for the _previous_ height but we mark them as being associated with the current height 263 + // 2. MarkValidatorVote only runs on FinalizeBlock, which is called after the epoch transaction has been processed 264 + // 3. Validator updates take a few blocks to fully take effect, meaning validators might only become (in)active after the epoch transaction has been processed 265 + if validatorHasVotingPower { 266 + switch { 267 + case voteInclusionFrequency < 0.1: 268 + decrease += 5 * expectedGainForCompletelyActiveValidator 269 + case voteInclusionFrequency < 0.7: 270 + decrease += uint64(5 * float64(expectedGainForCompletelyActiveValidator) * (0.7 - voteInclusionFrequency) / 0.7) 271 + } 272 + } 273 + 274 + return decrease 275 + }
+1
go.mod
··· 35 36 require ( 37 github.com/DataDog/zstd v1.4.5 // indirect 38 github.com/beorn7/perks v1.0.1 // indirect 39 github.com/bits-and-blooms/bitset v1.24.2 // indirect 40 github.com/blang/semver/v4 v4.0.0 // indirect
··· 35 36 require ( 37 github.com/DataDog/zstd v1.4.5 // indirect 38 + github.com/OffchainLabs/go-bitfield v0.0.0-20251031151322-f427d04d8506 // indirect 39 github.com/beorn7/perks v1.0.1 // indirect 40 github.com/bits-and-blooms/bitset v1.24.2 // indirect 41 github.com/blang/semver/v4 v4.0.0 // indirect
+2
go.sum
··· 9 github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= 10 github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= 11 github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= 12 github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= 13 github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= 14 github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
··· 9 github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= 10 github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= 11 github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= 12 + github.com/OffchainLabs/go-bitfield v0.0.0-20251031151322-f427d04d8506 h1:d/SJkN8/9Ca+1YmuDiUJxAiV4w/a9S8NcsG7GMQSrVI= 13 + github.com/OffchainLabs/go-bitfield v0.0.0-20251031151322-f427d04d8506/go.mod h1:6TZI4FU6zT8x6ZfWa1J8YQ2NgW0wLV/W3fHRca8ISBo= 14 github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= 15 github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= 16 github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
+72 -27
main.go
··· 16 "github.com/cometbft/cometbft/mempool" 17 "github.com/cometbft/cometbft/p2p" 18 "github.com/cometbft/cometbft/privval" 19 "github.com/cometbft/cometbft/proxy" 20 "github.com/gbl08ma/stacktrace" 21 "github.com/samber/lo" 22 "tangled.org/gbl08ma.com/didplcbft/abciapp" 23 "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 24 "tangled.org/gbl08ma.com/didplcbft/httpapi" 25 26 - bftconfig "github.com/cometbft/cometbft/config" 27 cmtflags "github.com/cometbft/cometbft/libs/cli/flags" 28 cmtlog "github.com/cometbft/cometbft/libs/log" 29 nm "github.com/cometbft/cometbft/node" ··· 57 } 58 59 logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout)) 60 - logger, err := cmtflags.ParseLogLevel(config.LogLevel, logger, bftconfig.DefaultLogLevel) 61 if err != nil { 62 log.Fatalf("failed to parse log level: %v", err) 63 } ··· 105 appContext, cancelAppContext := context.WithCancel(context.Background()) 106 defer cancelAppContext() 107 108 - app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication( 109 appContext, 110 logger, 111 pv, ··· 115 filepath.Join(homeDir, "snapshots"), 116 config.StateSync.TempDir, 117 didBloomFilterPath, 118 - mempoolSubmitter) 119 if err != nil { 120 log.Fatalf("failed to create DIDPLC application: %v", err) 121 } 122 defer cleanup() 123 124 nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile()) 125 if err != nil { ··· 132 nodeKey, 133 proxy.NewLocalClientCreator(app), 134 nm.DefaultGenesisDocProviderFunc(config.Config), 135 - bftconfig.DefaultDBProvider, 136 nm.DefaultMetricsProvider(config.Config.Instrumentation), 137 logger, 138 ) ··· 141 log.Fatalf("Creating node: %v", err) 142 } 143 144 // workaround for CometBFT bug where the temp_dir config entry is not taken into account 145 err = fixStateSyncReactorTempDir(node, config.StateSync.TempDir) 146 if err != nil { ··· 151 152 mempoolSubmitter.node = node 153 154 - err = app.FinishInitializing(node.BlockStore(), blockCreationTrigger(txsAvailableChan)) 155 if err != nil { 156 log.Fatalf("Finishing ABCI app initialization: %v", err) 157 } 158 159 - rangeChallengeCoordinator, err := abciapp.NewRangeChallengeCoordinator( 160 - appContext, 161 - logger.With("module", "plcapp"), 162 - pv, 163 - txFactory, 164 - node.BlockStore(), 165 - node.EventBus(), 166 - mempoolSubmitter, 167 - node.ConsensusReactor()) 168 - if err != nil { 169 - log.Fatalf("Creating RangeChallengeCoordinator: %v", err) 170 - } 171 - 172 err = node.Start() 173 if err != nil { 174 log.Fatalf("Starting node: %v", err) ··· 194 }() 195 } 196 197 - if pv != nil { 198 - err := rangeChallengeCoordinator.Start() 199 - if err != nil { 200 - log.Fatalf("Starting RangeChallengeCoordinator: %v", err) 201 - } 202 - defer rangeChallengeCoordinator.Wait() 203 - } 204 - 205 defer cancelAppContext() 206 207 c := make(chan os.Signal, 1) ··· 267 268 return nil 269 }
··· 16 "github.com/cometbft/cometbft/mempool" 17 "github.com/cometbft/cometbft/p2p" 18 "github.com/cometbft/cometbft/privval" 19 + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" 20 "github.com/cometbft/cometbft/proxy" 21 + cmtstore "github.com/cometbft/cometbft/store" 22 + cmttypes "github.com/cometbft/cometbft/types" 23 "github.com/gbl08ma/stacktrace" 24 "github.com/samber/lo" 25 "tangled.org/gbl08ma.com/didplcbft/abciapp" 26 "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 27 "tangled.org/gbl08ma.com/didplcbft/httpapi" 28 + "tangled.org/gbl08ma.com/didplcbft/store" 29 + "tangled.org/gbl08ma.com/didplcbft/transaction" 30 31 + cmtconfig "github.com/cometbft/cometbft/config" 32 cmtflags "github.com/cometbft/cometbft/libs/cli/flags" 33 cmtlog "github.com/cometbft/cometbft/libs/log" 34 nm "github.com/cometbft/cometbft/node" ··· 62 } 63 64 logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout)) 65 + logger, err := cmtflags.ParseLogLevel(config.LogLevel, logger, cmtconfig.DefaultLogLevel) 66 if err != nil { 67 log.Fatalf("failed to parse log level: %v", err) 68 } ··· 110 appContext, cancelAppContext := context.WithCancel(context.Background()) 111 defer cancelAppContext() 112 113 + // this must be done before we call NewNode, otherwise it will get a hold of the leveldb block store 114 + nodeDBProvider := cmtconfig.DefaultDBProvider 115 + 116 + recentBlockHeaders, err := readRecentBlockHeaders(nodeDBProvider, config.Config, max(abciapp.CommitToChallengeMaxAgeInBlocks, abciapp.CompleteChallengeMaxAgeInBlocks)+5) // 5 blocks safety margin 117 + if err != nil { 118 + log.Fatalf("failed to read recent block headers: %v", err) 119 + } 120 + 121 + var blockStore *cmtstore.BlockStore 122 + var txFactory *transaction.Factory 123 + blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 124 + if blockStore != nil { 125 + blockMeta := blockStore.LoadBlockMeta(height) 126 + if blockMeta != nil { 127 + return blockMeta.Header, nil 128 + } 129 + } 130 + 131 + if header, ok := recentBlockHeaders[height]; ok { 132 + return header, nil 133 + } 134 + 135 + // if the headers indeed don't exist in the block store, hopefully they will have come in a state sync snapshot 136 + if txFactory != nil { 137 + readTx := txFactory.ReadWorking(time.Now()) 138 + blockHeader, err := store.Consensus.FallbackBlockHeader(readTx, uint64(height)) 139 + if err == nil { 140 + var protoHeader *cmtproto.Header 141 + err = protoHeader.Unmarshal(blockHeader) 142 + if err == nil { 143 + blockHeader, err := cmttypes.HeaderFromProto(protoHeader) 144 + return blockHeader, stacktrace.Propagate(err) 145 + } 146 + } 147 + } 148 + 149 + return cmttypes.Header{}, stacktrace.NewError("height not found") 150 + } 151 + 152 + app, txf, plc, cleanup, err := abciapp.NewDIDPLCApplication( 153 appContext, 154 logger, 155 pv, ··· 159 filepath.Join(homeDir, "snapshots"), 160 config.StateSync.TempDir, 161 didBloomFilterPath, 162 + mempoolSubmitter, 163 + blockHeaderGetter) 164 if err != nil { 165 log.Fatalf("failed to create DIDPLC application: %v", err) 166 } 167 defer cleanup() 168 + 169 + txFactory = txf 170 171 nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile()) 172 if err != nil { ··· 179 nodeKey, 180 proxy.NewLocalClientCreator(app), 181 nm.DefaultGenesisDocProviderFunc(config.Config), 182 + nodeDBProvider, 183 nm.DefaultMetricsProvider(config.Config.Instrumentation), 184 logger, 185 ) ··· 188 log.Fatalf("Creating node: %v", err) 189 } 190 191 + blockStore = node.BlockStore() 192 + 193 // workaround for CometBFT bug where the temp_dir config entry is not taken into account 194 err = fixStateSyncReactorTempDir(node, config.StateSync.TempDir) 195 if err != nil { ··· 200 201 mempoolSubmitter.node = node 202 203 + err = app.FinishInitializing(blockCreationTrigger(txsAvailableChan), node.EventBus(), node.ConsensusReactor()) 204 if err != nil { 205 log.Fatalf("Finishing ABCI app initialization: %v", err) 206 } 207 208 err = node.Start() 209 if err != nil { 210 log.Fatalf("Starting node: %v", err) ··· 230 }() 231 } 232 233 defer cancelAppContext() 234 235 c := make(chan os.Signal, 1) ··· 295 296 return nil 297 } 298 + 299 + func readRecentBlockHeaders(dbProvider cmtconfig.DBProvider, config *cmtconfig.Config, numHeaders int) (map[int64]cmttypes.Header, error) { 300 + blockStoreDB, err := dbProvider(&cmtconfig.DBContext{ID: "blockstore", Config: config}) 301 + if err != nil { 302 + return nil, stacktrace.Propagate(err) 303 + } 304 + blockStore := cmtstore.NewBlockStore(blockStoreDB) 305 + defer blockStore.Close() 306 + 307 + result := make(map[int64]cmttypes.Header) 308 + bottom := max(0, blockStore.Height()-int64(numHeaders)) 309 + for i := blockStore.Height(); i > bottom; i-- { 310 + blockMeta := blockStore.LoadBlockMeta(i) 311 + result[i] = blockMeta.Header 312 + } 313 + return result, nil 314 + }
+225 -45
store/consensus.go
··· 13 "strings" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 ics23 "github.com/cosmos/ics23/go" 18 "github.com/dgraph-io/badger/v4" ··· 27 ) 28 29 const ( 30 TreeOperationKeyPrefix = 'o' 31 TreeOperationKeyLength = 1 + 8 32 TreeRangeChallengeCommitmentKeyPrefix = 'C' 33 - TreeRangeChallengeKeyLength = 1 + 20 34 TreeChallengeCompletionKeyPrefix = 'p' 35 - TreeChallengeCompletionKeyLength = 1 + 20 36 TreeValidatorReputationKeyPrefix = 'r' 37 - TreeValidatorReputationKeyLength = 1 + 20 38 39 - IndexBlockChallengeKeyPrefix = 'c' 40 - IndexBlockChallengeKeyLength = 1 + 8 41 - IndexDIDLogKeyPrefix = 'l' 42 - IndexDIDLogKeyLength = 1 + 15 + 8 43 44 TreeAuthoritativePLCKey = "aPLCURL" 45 TreeAuthoritativeImportProgressKey = "aImportProgress" ··· 49 50 var ErrNoActiveChallengeCommitment = errors.New("the validator is currently not committed to a challenge") 51 var ErrNoRecentChallengeCompletion = errors.New("the validator has not completed a range challenge recently") 52 53 // ConsensusStore manages all information that is directly or indirectly protected by consensus 54 type ConsensusStore interface { ··· 69 AuthoritativeImportProgress(tx transaction.Read) (uint64, error) 70 SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error 71 72 - ValidatorRangeChallengeCommitment(tx transaction.Read, validatorAddress []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) 73 - SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorAddress []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error 74 - ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorAddress []byte) error 75 76 - ValidatorRangeChallengeCompletion(tx transaction.Read, validatorAddress []byte) (uint64, error) 77 - SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorAddress []byte, completedToHeight uint64) error 78 79 BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) 80 - BlockChalengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] // afterHeight is exclusive for consistency with OperationsIterator 81 StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error 82 DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error 83 84 - ValidatorReputation(tx transaction.Read, validatorAddress []byte) (uint64, error) 85 - ChangeValidatorReputation(tx transaction.Write, validatorAddress []byte, change int64) error 86 - ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorAddress []byte, reputation uint64) (uint64, error)) error 87 } 88 89 var _ ConsensusStore = (*consensusStore)(nil) 90 91 // consensusStore exists just to groups methods nicely 92 - type consensusStore struct{} 93 94 func (t *consensusStore) AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 95 didBytes, err := DIDToBytes(did) ··· 655 return stacktrace.Propagate(err) 656 } 657 658 - func marshalRangeChallengeCommitmentKey(validatorAddress []byte) []byte { 659 key := make([]byte, TreeRangeChallengeKeyLength) 660 key[0] = TreeRangeChallengeCommitmentKeyPrefix 661 - copy(key[1:], validatorAddress) 662 return key 663 } 664 ··· 682 } 683 684 // ValidatorRangeChallengeCommitment implements [ConsensusStore]. 685 - func (t *consensusStore) ValidatorRangeChallengeCommitment(tx transaction.Read, validatorAddress []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) { 686 - key := marshalRangeChallengeCommitmentKey(validatorAddress) 687 value, err := tx.Tree().Get(key) 688 if err != nil { 689 return 0, 0, 0, 0, nil, stacktrace.Propagate(err) ··· 696 } 697 698 // SetValidatorRangeChallengeCommitment implements [ConsensusStore]. 699 - func (t *consensusStore) SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorAddress []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error { 700 - key := marshalRangeChallengeCommitmentKey(validatorAddress) 701 value := marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot) 702 // this may overwrite sometimes (e.g. if a previous commitment has expired and the validator needs to submit a new one) 703 _, err := tx.Tree().Set(key, value) ··· 705 } 706 707 // ClearValidatorRangeChallengeCommitment implements [ConsensusStore]. 708 - func (t *consensusStore) ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorAddress []byte) error { 709 - _, removed, err := tx.Tree().Remove(marshalRangeChallengeCommitmentKey(validatorAddress)) 710 if err != nil { 711 return stacktrace.Propagate(err) 712 } ··· 718 return nil 719 } 720 721 - func marshalRangeChallengeCompletionKey(validatorAddress []byte) []byte { 722 key := make([]byte, TreeChallengeCompletionKeyLength) 723 key[0] = TreeChallengeCompletionKeyPrefix 724 - copy(key[1:], validatorAddress) 725 return key 726 } 727 728 // ValidatorRangeChallengeCompletion implements [ConsensusStore]. 729 - func (t *consensusStore) ValidatorRangeChallengeCompletion(tx transaction.Read, validatorAddress []byte) (uint64, error) { 730 - key := marshalRangeChallengeCompletionKey(validatorAddress) 731 value, err := tx.Tree().Get(key) 732 if err != nil { 733 return 0, stacktrace.Propagate(err) ··· 739 } 740 741 // SetValidatorRangeChallengeCompletion implements [ConsensusStore]. 742 - func (t *consensusStore) SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorAddress []byte, completedToHeight uint64) error { 743 - key := marshalRangeChallengeCompletionKey(validatorAddress) 744 value := binary.BigEndian.AppendUint64(nil, completedToHeight) 745 _, err := tx.Tree().Set(key, value) 746 return stacktrace.Propagate(err) ··· 759 return binary.BigEndian.Uint64(key[1:9]) 760 } 761 762 var maxBlockChallengeProofKey = marshalBlockChallengeProofKey(math.MaxInt64) 763 764 func (t *consensusStore) BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) { ··· 767 return value, stacktrace.Propagate(err) 768 } 769 770 - func (t *consensusStore) BlockChalengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] { 771 return func(yield func(uint64, []byte) bool) { 772 *retErr = nil 773 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds ··· 799 } 800 } 801 802 func (t *consensusStore) StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error { 803 err := tx.IndexDB().Set(marshalBlockChallengeProofKey(blockHeight), proof) 804 return stacktrace.Propagate(err) ··· 833 return nil 834 } 835 836 - func marshalValidatorReputationKey(validatorAddress []byte) []byte { 837 key := make([]byte, TreeValidatorReputationKeyLength) 838 key[0] = TreeValidatorReputationKeyPrefix 839 - copy(key[1:], validatorAddress) 840 return key 841 } 842 843 // ValidatorReputation implements [ConsensusStore]. 844 - func (t *consensusStore) ValidatorReputation(tx transaction.Read, validatorAddress []byte) (uint64, error) { 845 - key := marshalValidatorReputationKey(validatorAddress) 846 847 value, err := tx.Tree().Get(key) 848 if err != nil { ··· 854 } 855 856 // ChangeValidatorReputation implements [ConsensusStore]. 857 - func (t *consensusStore) ChangeValidatorReputation(tx transaction.Write, validatorAddress []byte, change int64) error { 858 - key := marshalValidatorReputationKey(validatorAddress) 859 860 value, err := tx.Tree().Get(key) 861 if err != nil { ··· 873 return stacktrace.Propagate(err) 874 } 875 876 - var minReputationKey = marshalValidatorReputationKey(make([]byte, 20)) 877 - var maxReputationKey = marshalValidatorReputationKey(slices.Repeat([]byte{0xff}, 20)) 878 879 // ChangeAllNonZeroValidatorReputations implements [ConsensusStore]. 880 - func (t *consensusStore) ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorAddress []byte, reputation uint64) (uint64, error)) error { 881 // we are not allowed to make updates to the tree while an iterator is active 882 // process validators in batches of 100 to avoid loading too many key-value pairs into memory 883 const batchSize = 100 ··· 902 itr.Next() 903 } 904 905 - validatorAddrTmp := make([]byte, 20) 906 - 907 for i := 0; itr.Valid() && i < batchSize; i++ { 908 reputation := new(big.Int).SetBytes(itr.Value()) 909 910 keyCopy := slices.Clone(itr.Key()) 911 - copy(validatorAddrTmp, keyCopy[1:21]) 912 913 - newValue, err := changer(validatorAddrTmp, reputation.Uint64()) 914 if err != nil { 915 return nil, false, stacktrace.Propagate(err) 916 } ··· 967 startingKey = toSet[len(toSet)-1].key 968 } 969 }
··· 13 "strings" 14 "time" 15 16 + "github.com/OffchainLabs/go-bitfield" 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 ics23 "github.com/cosmos/ics23/go" 19 "github.com/dgraph-io/badger/v4" ··· 28 ) 29 30 const ( 31 + PublicKeyLength = 32 32 + AddressLength = 20 33 + 34 TreeOperationKeyPrefix = 'o' 35 TreeOperationKeyLength = 1 + 8 36 TreeRangeChallengeCommitmentKeyPrefix = 'C' 37 + TreeRangeChallengeKeyLength = 1 + PublicKeyLength 38 TreeChallengeCompletionKeyPrefix = 'p' 39 + TreeChallengeCompletionKeyLength = 1 + PublicKeyLength 40 TreeValidatorReputationKeyPrefix = 'r' 41 + TreeValidatorReputationKeyLength = 1 + PublicKeyLength 42 43 + IndexBlockChallengeKeyPrefix = 'c' 44 + IndexBlockChallengeKeyLength = 1 + 8 45 + IndexDIDLogKeyPrefix = 'l' 46 + IndexDIDLogKeyLength = 1 + 15 + 8 47 + IndexValidatorVotingActivityPrefix = 'v' 48 + IndexValidatorVotingActivityKeyLength = 1 + AddressLength + 8 49 + IndexBlockHeaderKeyPrefix = 'h' 50 + IndexBlockHeaderKeyLength = 1 + 8 51 52 TreeAuthoritativePLCKey = "aPLCURL" 53 TreeAuthoritativeImportProgressKey = "aImportProgress" ··· 57 58 var ErrNoActiveChallengeCommitment = errors.New("the validator is currently not committed to a challenge") 59 var ErrNoRecentChallengeCompletion = errors.New("the validator has not completed a range challenge recently") 60 + var ErrValidatorNotActive = errors.New("the validator is not active") 61 62 // ConsensusStore manages all information that is directly or indirectly protected by consensus 63 type ConsensusStore interface { ··· 78 AuthoritativeImportProgress(tx transaction.Read) (uint64, error) 79 SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error 80 81 + ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) 82 + SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error 83 + ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error 84 85 + ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error) 86 + SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error 87 88 BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) 89 + BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] // afterHeight is exclusive for consistency with OperationsIterator 90 + BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte] 91 StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error 92 DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error 93 94 + ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) 95 + ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, change int64) error 96 + ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error 97 + 98 + ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] 99 + InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error 100 + MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error 101 + 102 + FallbackBlockHeader(tx transaction.Read, height uint64) ([]byte, error) 103 + 104 + ConfigureEpochSize(epochSize uint64) 105 + } 106 + 107 + type ActiveValidator struct { 108 + Address []byte 109 + PublicKey []byte 110 + VoteCount uint64 111 } 112 113 var _ ConsensusStore = (*consensusStore)(nil) 114 115 // consensusStore exists just to groups methods nicely 116 + type consensusStore struct { 117 + epochSize uint64 118 + } 119 + 120 + // ConfigureEpochSize implements [ConsensusStore]. 121 + func (t *consensusStore) ConfigureEpochSize(epochSize uint64) { 122 + t.epochSize = epochSize 123 + } 124 125 func (t *consensusStore) AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 126 didBytes, err := DIDToBytes(did) ··· 686 return stacktrace.Propagate(err) 687 } 688 689 + func marshalRangeChallengeCommitmentKey(validatorPubKey []byte) []byte { 690 key := make([]byte, TreeRangeChallengeKeyLength) 691 key[0] = TreeRangeChallengeCommitmentKeyPrefix 692 + copy(key[1:], validatorPubKey) 693 return key 694 } 695 ··· 713 } 714 715 // ValidatorRangeChallengeCommitment implements [ConsensusStore]. 716 + func (t *consensusStore) ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) { 717 + key := marshalRangeChallengeCommitmentKey(validatorPubKey) 718 value, err := tx.Tree().Get(key) 719 if err != nil { 720 return 0, 0, 0, 0, nil, stacktrace.Propagate(err) ··· 727 } 728 729 // SetValidatorRangeChallengeCommitment implements [ConsensusStore]. 730 + func (t *consensusStore) SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error { 731 + key := marshalRangeChallengeCommitmentKey(validatorPubKey) 732 value := marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot) 733 // this may overwrite sometimes (e.g. if a previous commitment has expired and the validator needs to submit a new one) 734 _, err := tx.Tree().Set(key, value) ··· 736 } 737 738 // ClearValidatorRangeChallengeCommitment implements [ConsensusStore]. 739 + func (t *consensusStore) ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error { 740 + _, removed, err := tx.Tree().Remove(marshalRangeChallengeCommitmentKey(validatorPubKey)) 741 if err != nil { 742 return stacktrace.Propagate(err) 743 } ··· 749 return nil 750 } 751 752 + func marshalRangeChallengeCompletionKey(validatorPubKey []byte) []byte { 753 key := make([]byte, TreeChallengeCompletionKeyLength) 754 key[0] = TreeChallengeCompletionKeyPrefix 755 + copy(key[1:], validatorPubKey) 756 return key 757 } 758 759 // ValidatorRangeChallengeCompletion implements [ConsensusStore]. 760 + func (t *consensusStore) ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error) { 761 + key := marshalRangeChallengeCompletionKey(validatorPubKey) 762 value, err := tx.Tree().Get(key) 763 if err != nil { 764 return 0, stacktrace.Propagate(err) ··· 770 } 771 772 // SetValidatorRangeChallengeCompletion implements [ConsensusStore]. 773 + func (t *consensusStore) SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error { 774 + key := marshalRangeChallengeCompletionKey(validatorPubKey) 775 value := binary.BigEndian.AppendUint64(nil, completedToHeight) 776 _, err := tx.Tree().Set(key, value) 777 return stacktrace.Propagate(err) ··· 790 return binary.BigEndian.Uint64(key[1:9]) 791 } 792 793 + var minBlockChallengeProofKey = marshalBlockChallengeProofKey(0) 794 var maxBlockChallengeProofKey = marshalBlockChallengeProofKey(math.MaxInt64) 795 796 func (t *consensusStore) BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) { ··· 799 return value, stacktrace.Propagate(err) 800 } 801 802 + func (t *consensusStore) BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] { 803 return func(yield func(uint64, []byte) bool) { 804 *retErr = nil 805 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds ··· 831 } 832 } 833 834 + // BlockChallengeProofsReverseIterator implements [ConsensusStore]. 835 + func (t *consensusStore) BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte] { 836 + return func(yield func(uint64, []byte) bool) { 837 + *retErr = nil 838 + startKey := minBlockChallengeProofKey 839 + endKey := marshalBlockChallengeProofKey(beforeHeight) 840 + 841 + proofsIterator, err := tx.IndexDB().ReverseIterator(startKey, endKey) 842 + if err != nil { 843 + *retErr = stacktrace.Propagate(err) 844 + return 845 + } 846 + 847 + defer proofsIterator.Close() 848 + 849 + for proofsIterator.Valid() { 850 + blockHeight := unmarshalBlockChallengeProofKey(proofsIterator.Key()) 851 + 852 + if !yield(blockHeight, proofsIterator.Value()) { 853 + return 854 + } 855 + 856 + proofsIterator.Next() 857 + } 858 + err = proofsIterator.Error() 859 + if err != nil { 860 + *retErr = stacktrace.Propagate(err) 861 + } 862 + } 863 + } 864 + 865 func (t *consensusStore) StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error { 866 err := tx.IndexDB().Set(marshalBlockChallengeProofKey(blockHeight), proof) 867 return stacktrace.Propagate(err) ··· 896 return nil 897 } 898 899 + func marshalValidatorReputationKey(validatorPubKey []byte) []byte { 900 key := make([]byte, TreeValidatorReputationKeyLength) 901 key[0] = TreeValidatorReputationKeyPrefix 902 + copy(key[1:], validatorPubKey) 903 return key 904 } 905 906 // ValidatorReputation implements [ConsensusStore]. 907 + func (t *consensusStore) ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) { 908 + key := marshalValidatorReputationKey(validatorPubKey) 909 910 value, err := tx.Tree().Get(key) 911 if err != nil { ··· 917 } 918 919 // ChangeValidatorReputation implements [ConsensusStore]. 920 + func (t *consensusStore) ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, change int64) error { 921 + key := marshalValidatorReputationKey(validatorPubKey) 922 923 value, err := tx.Tree().Get(key) 924 if err != nil { ··· 936 return stacktrace.Propagate(err) 937 } 938 939 + var minReputationKey = marshalValidatorReputationKey(make([]byte, 32)) 940 + var maxReputationKey = marshalValidatorReputationKey(slices.Repeat([]byte{0xff}, 32)) 941 942 // ChangeAllNonZeroValidatorReputations implements [ConsensusStore]. 943 + func (t *consensusStore) ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error { 944 // we are not allowed to make updates to the tree while an iterator is active 945 // process validators in batches of 100 to avoid loading too many key-value pairs into memory 946 const batchSize = 100 ··· 965 itr.Next() 966 } 967 968 for i := 0; itr.Valid() && i < batchSize; i++ { 969 reputation := new(big.Int).SetBytes(itr.Value()) 970 971 keyCopy := slices.Clone(itr.Key()) 972 + pubKey := slices.Clone(itr.Key()[1:]) 973 974 + newValue, err := changer(pubKey, reputation.Uint64()) 975 if err != nil { 976 return nil, false, stacktrace.Propagate(err) 977 } ··· 1028 startingKey = toSet[len(toSet)-1].key 1029 } 1030 } 1031 + 1032 + func MarshalValidatorVotingActivityKey(epochHeight uint64, validatorAddress []byte) []byte { 1033 + key := make([]byte, IndexValidatorVotingActivityKeyLength) 1034 + key[0] = IndexValidatorVotingActivityPrefix 1035 + binary.BigEndian.PutUint64(key[1:], epochHeight) 1036 + copy(key[9:], validatorAddress) 1037 + return key 1038 + } 1039 + 1040 + func unmarshalValidatorVotingActivityKey(key []byte) (epochHeight uint64, validatorAddress []byte) { 1041 + epochHeight = binary.BigEndian.Uint64(key[1:]) 1042 + validatorAddress = slices.Clone(key[9:]) 1043 + return 1044 + } 1045 + 1046 + func unmarshalValidatorVotingActivityValue(value []byte) (validatorPubKey []byte, bitlist *bitfield.Bitlist64, err error) { 1047 + validatorPubKey = slices.Clone(value[:PublicKeyLength]) 1048 + bitlist, err = bitfield.Bitlist(value[PublicKeyLength:]).ToBitlist64() 1049 + return validatorPubKey, bitlist, stacktrace.Propagate(err) 1050 + } 1051 + 1052 + // InitializeValidatorVotingActivity implements [ConsensusStore]. 1053 + func (t *consensusStore) InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error { 1054 + epochHeight = epochHeight - epochHeight%t.epochSize 1055 + 1056 + key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) 1057 + 1058 + bitlist := bitfield.NewBitlist64(t.epochSize) 1059 + 1060 + bitlistBytes := bitlist.ToBitlist() 1061 + value := make([]byte, PublicKeyLength+len(bitlistBytes)) 1062 + copy(value, validatorPubKey) 1063 + copy(value[PublicKeyLength:], bitlistBytes) 1064 + 1065 + err := tx.IndexDB().Set(key, value) 1066 + return stacktrace.Propagate(err) 1067 + } 1068 + 1069 + // MarkValidatorVote implements [ConsensusStore]. 1070 + func (t *consensusStore) MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error { 1071 + epochHeight := height - height%t.epochSize 1072 + 1073 + key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) 1074 + 1075 + value, err := tx.IndexDB().Get(key) 1076 + if err != nil { 1077 + return stacktrace.Propagate(err) 1078 + } 1079 + if value == nil { 1080 + return stacktrace.Propagate(ErrValidatorNotActive) 1081 + } 1082 + 1083 + bitfield := bitfield.Bitlist(value[PublicKeyLength:]) 1084 + bitfield.SetBitAt(height%t.epochSize, true) 1085 + 1086 + copy(value[PublicKeyLength:], bitfield) 1087 + 1088 + err = tx.IndexDB().Set(key, value) 1089 + return stacktrace.Propagate(err) 1090 + } 1091 + 1092 + // ActiveValidatorsIterator implements [ConsensusStore]. 1093 + func (t *consensusStore) ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] { 1094 + return func(yield func(ActiveValidator) bool) { 1095 + *retErr = nil 1096 + 1097 + epochHeight = epochHeight - epochHeight%t.epochSize 1098 + 1099 + startKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), make([]byte, AddressLength)) 1100 + endKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), slices.Repeat([]byte{0xff}, AddressLength)) 1101 + 1102 + iterator, err := tx.IndexDB().Iterator(startKey, endKey) 1103 + if err != nil { 1104 + *retErr = stacktrace.Propagate(err) 1105 + return 1106 + } 1107 + 1108 + defer iterator.Close() 1109 + 1110 + for iterator.Valid() { 1111 + _, validatorAddress := unmarshalValidatorVotingActivityKey(iterator.Key()) 1112 + validatorPubKey, bitlist, err := unmarshalValidatorVotingActivityValue(iterator.Value()) 1113 + if err != nil { 1114 + *retErr = stacktrace.Propagate(err) 1115 + return 1116 + } 1117 + 1118 + if !yield(ActiveValidator{ 1119 + Address: validatorAddress, 1120 + PublicKey: validatorPubKey, 1121 + VoteCount: bitlist.Count(), 1122 + }) { 1123 + return 1124 + } 1125 + 1126 + iterator.Next() 1127 + } 1128 + err = iterator.Error() 1129 + if err != nil { 1130 + *retErr = stacktrace.Propagate(err) 1131 + } 1132 + } 1133 + } 1134 + 1135 + // FallbackBlockHeader implements [ConsensusStore]. 1136 + func (t *consensusStore) FallbackBlockHeader(tx transaction.Read, height uint64) ([]byte, error) { 1137 + key := make([]byte, IndexBlockHeaderKeyLength) 1138 + key[0] = IndexBlockHeaderKeyPrefix 1139 + binary.BigEndian.PutUint64(key[1:], height) 1140 + 1141 + value, err := tx.IndexDB().Get(key) 1142 + if err != nil { 1143 + return nil, stacktrace.Propagate(err) 1144 + } 1145 + if value == nil { 1146 + return nil, stacktrace.NewError("block header not found") 1147 + } 1148 + return value, nil 1149 + }
+59 -1
store/store_test.go
··· 24 25 validators := make([][]byte, 10) 26 for i := range validators { 27 - validators[i] = make([]byte, 20) 28 rand.Read(validators[i]) 29 } 30 ··· 78 require.Zero(t, rep) 79 } 80 }
··· 24 25 validators := make([][]byte, 10) 26 for i := range validators { 27 + validators[i] = make([]byte, store.PublicKeyLength) 28 rand.Read(validators[i]) 29 } 30 ··· 78 require.Zero(t, rep) 79 } 80 } 81 + 82 + func TestValidatorVotingActivity(t *testing.T) { 83 + txFactory, _, _ := testutil.NewTestTxFactory(t) 84 + 85 + tx, err := txFactory.ReadWorking(time.Now()).Upgrade() 86 + require.NoError(t, err) 87 + 88 + store.Consensus.ConfigureEpochSize(100) 89 + 90 + validatorPubKey := make([]byte, store.PublicKeyLength) 91 + rand.Read(validatorPubKey) 92 + 93 + validatorAddress := make([]byte, store.AddressLength) 94 + rand.Read(validatorAddress) 95 + 96 + err = store.Consensus.InitializeValidatorVotingActivity(tx, validatorAddress, validatorPubKey, 100) 97 + require.NoError(t, err) 98 + 99 + err = store.Consensus.MarkValidatorVote(tx, validatorAddress, 100) 100 + require.NoError(t, err) 101 + 102 + err = store.Consensus.MarkValidatorVote(tx, validatorAddress, 101) 103 + require.NoError(t, err) 104 + 105 + err = store.Consensus.MarkValidatorVote(tx, validatorAddress, 123) 106 + require.NoError(t, err) 107 + 108 + err = store.Consensus.MarkValidatorVote(tx, validatorAddress, 199) 109 + require.NoError(t, err) 110 + 111 + err = store.Consensus.MarkValidatorVote(tx, validatorAddress, 321) 112 + require.Error(t, err) 113 + 114 + err = store.Consensus.InitializeValidatorVotingActivity(tx, validatorAddress, validatorPubKey, 300) 115 + require.NoError(t, err) 116 + 117 + err = store.Consensus.MarkValidatorVote(tx, validatorAddress, 321) 118 + require.NoError(t, err) 119 + 120 + for epoch := range []uint64{100, 200, 300} { 121 + iterated := false 122 + for v := range store.Consensus.ActiveValidatorsIterator(tx.Downgrade(), 100, &err) { 123 + iterated = true 124 + require.Equal(t, validatorAddress, v.Address) 125 + require.Equal(t, validatorPubKey, v.PublicKey) 126 + switch epoch { 127 + case 100: 128 + require.Equal(t, uint64(4), v.VoteCount) 129 + case 200: 130 + require.Zero(t, v.VoteCount) 131 + case 300: 132 + require.Equal(t, uint64(1), v.VoteCount) 133 + } 134 + } 135 + require.True(t, iterated) 136 + require.NoError(t, err) 137 + } 138 + }