A very experimental PLC implementation which uses BFT consensus for decentralization

Clean up logging situation

gbl08ma.com f1d776e6 0c201f68

verified
+107 -54
+21 -7
abciapp/app.go
··· 2 3 import ( 4 "context" 5 - "fmt" 6 "os" 7 "sync" 8 "time" ··· 10 dbm "github.com/cometbft/cometbft-db" 11 abcitypes "github.com/cometbft/cometbft/abci/types" 12 "github.com/cometbft/cometbft/crypto" 13 "github.com/cometbft/cometbft/privval" 14 bftstore "github.com/cometbft/cometbft/store" 15 "github.com/cosmos/iavl" ··· 26 27 type DIDPLCApplication struct { 28 runnerContext context.Context 29 plc plc.PLC 30 txFactory *transaction.Factory 31 indexDB dbm.DB ··· 54 } 55 56 // store and plc must be able to share transaction objects 57 - func NewDIDPLCApplication(appContext context.Context, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 58 mkTree := func() *iavl.MutableTree { 59 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 60 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 80 81 d := &DIDPLCApplication{ 82 runnerContext: runnerContext, 83 tree: tree, 84 indexDB: indexDB, 85 mempoolSubmitter: mempoolSubmitter, ··· 92 d.validatorPrivKey = pv.Key.PrivKey 93 } 94 95 - d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(didBloomFilterPath)) 96 if err != nil { 97 return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err, "") 98 } ··· 109 110 *d.tree = *mkTree() 111 112 - d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(didBloomFilterPath)) 113 if err != nil { 114 return stacktrace.Propagate(err, "") 115 } ··· 131 st := time.Now() 132 err := d.txFactory.SaveDIDBloomFilter() 133 if err != nil { 134 - fmt.Println("FAILED TO SAVE BLOOM FILTER:", stacktrace.Propagate(err, "")) 135 } 136 - fmt.Println("SAVED BLOOM FILTER IN", time.Since(st)) 137 } 138 }) 139 ··· 195 }, nil 196 } 197 198 func (d *DIDPLCApplication) FinishInitializing(blockStore *bftstore.BlockStore) error { 199 d.blockStore = blockStore 200 201 var err error 202 - d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.txFactory, blockStore, d.validatorPubKey) 203 if err != nil { 204 return stacktrace.Propagate(err, "") 205 }
··· 2 3 import ( 4 "context" 5 "os" 6 "sync" 7 "time" ··· 9 dbm "github.com/cometbft/cometbft-db" 10 abcitypes "github.com/cometbft/cometbft/abci/types" 11 "github.com/cometbft/cometbft/crypto" 12 + cmtlog "github.com/cometbft/cometbft/libs/log" 13 "github.com/cometbft/cometbft/privval" 14 bftstore "github.com/cometbft/cometbft/store" 15 "github.com/cosmos/iavl" ··· 26 27 type DIDPLCApplication struct { 28 runnerContext context.Context 29 + logger cmtlog.Logger 30 plc plc.PLC 31 txFactory *transaction.Factory 32 indexDB dbm.DB ··· 55 } 56 57 // store and plc must be able to share transaction objects 58 + func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 59 mkTree := func() *iavl.MutableTree { 60 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 61 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 81 82 d := &DIDPLCApplication{ 83 runnerContext: runnerContext, 84 + logger: logger.With("module", "plcapp"), 85 tree: tree, 86 indexDB: indexDB, 87 mempoolSubmitter: mempoolSubmitter, ··· 94 d.validatorPrivKey = pv.Key.PrivKey 95 } 96 97 + d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(d.logger, didBloomFilterPath)) 98 if err != nil { 99 return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err, "") 100 } ··· 111 112 *d.tree = *mkTree() 113 114 + d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(d.logger, didBloomFilterPath)) 115 if err != nil { 116 return stacktrace.Propagate(err, "") 117 } ··· 133 st := time.Now() 134 err := d.txFactory.SaveDIDBloomFilter() 135 if err != nil { 136 + d.logger.Error("failed to save bloom filter", "error", stacktrace.Propagate(err, "")) 137 } 138 + d.logger.Debug("saved bloom filter", "took", time.Since(st)) 139 } 140 }) 141 ··· 197 }, nil 198 } 199 200 + func (d *DIDPLCApplication) logMethod(method string, keyvals ...any) func(...any) { 201 + st := time.Now() 202 + d.logger.Debug(method+" start", keyvals...) 203 + return func(extra ...any) { 204 + args := make([]any, 0, len(keyvals)+len(extra)+2) 205 + args = append(args, keyvals...) 206 + args = append(args, extra...) 207 + args = append(args, "took", time.Since(st)) 208 + d.logger.Debug(method+" done", args...) 209 + } 210 + } 211 + 212 func (d *DIDPLCApplication) FinishInitializing(blockStore *bftstore.BlockStore) error { 213 d.blockStore = blockStore 214 215 var err error 216 + d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.logger, d.txFactory, blockStore, d.validatorPubKey) 217 if err != nil { 218 return stacktrace.Propagate(err, "") 219 }
+3 -1
abciapp/app_test.go
··· 6 7 dbm "github.com/cometbft/cometbft-db" 8 "github.com/cometbft/cometbft/abci/types" 9 "github.com/dgraph-io/badger/v4" 10 cbornode "github.com/ipfs/go-ipld-cbor" 11 "github.com/stretchr/testify/require" ··· 22 } 23 24 func TestCheckTx(t *testing.T) { 25 - app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", nil) 26 require.NoError(t, err) 27 t.Cleanup(cleanup) 28
··· 6 7 dbm "github.com/cometbft/cometbft-db" 8 "github.com/cometbft/cometbft/abci/types" 9 + cmtlog "github.com/cometbft/cometbft/libs/log" 10 "github.com/dgraph-io/badger/v4" 11 cbornode "github.com/ipfs/go-ipld-cbor" 12 "github.com/stretchr/testify/require" ··· 23 } 24 25 func TestCheckTx(t *testing.T) { 26 + logger := cmtlog.NewNopLogger() 27 + app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), logger, nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", nil) 28 require.NoError(t, err) 29 t.Cleanup(cleanup) 30
+12 -9
abciapp/block_challenge.go
··· 4 "bytes" 5 "context" 6 "embed" 7 - "fmt" 8 "math/big" 9 "time" 10 11 "github.com/Yiling-J/theine-go" 12 "github.com/cometbft/cometbft/crypto" 13 bftstore "github.com/cometbft/cometbft/store" 14 "github.com/consensys/gnark-crypto/ecc" 15 "github.com/consensys/gnark-crypto/ecc/bn254" 16 "github.com/consensys/gnark-crypto/ecc/bn254/fr/mimc" 17 - "github.com/consensys/gnark/backend" 18 "github.com/consensys/gnark/backend/groth16" 19 "github.com/consensys/gnark/backend/witness" 20 "github.com/consensys/gnark/constraint" 21 - "github.com/consensys/gnark/constraint/solver" 22 "github.com/consensys/gnark/frontend" 23 "github.com/palantir/stacktrace" 24 "github.com/rs/zerolog" 25 "github.com/samber/lo" ··· 51 vkFile := lo.Must(blockChallengeCircuitFS.Open("proofcircuit/BlockChallenge_VerifyingKey")) 52 defer vkFile.Close() 53 lo.Must(blockChallengeVerifyingKey.ReadFrom(vkFile)) 54 } 55 56 type blockChallengeCoordinator struct { 57 g singleflight.Group[int64, []byte] 58 59 runnerContext context.Context 60 61 isConfiguredToBeValidator bool 62 validatorAddress []byte ··· 66 sharedWitnessDataCache *theine.LoadingCache[int64, proof.BlockChallengeCircuit] 67 } 68 69 - func newBlockChallengeCoordinator(runnerContext context.Context, txFactory *transaction.Factory, blockStore *bftstore.BlockStore, pubKey crypto.PubKey) (*blockChallengeCoordinator, error) { 70 c := &blockChallengeCoordinator{ 71 runnerContext: runnerContext, 72 txFactory: txFactory, 73 nodeBlockStore: blockStore, 74 isConfiguredToBeValidator: pubKey != nil, ··· 126 go func() { 127 _, err := c.loadOrComputeBlockChallengeProof(c.runnerContext, height) 128 if err != nil { 129 - fmt.Printf("FAILED TO COMPUTE CHALLENGE FOR BLOCK %d: %v\n", height, stacktrace.Propagate(err, "")) 130 } 131 }() 132 } ··· 148 return nil, stacktrace.Propagate(err, "") 149 } 150 if proof == nil { 151 // compute and store 152 proof, err = c.computeBlockChallengeProof(tx, height) 153 if err != nil { ··· 169 if err != nil { 170 return nil, stacktrace.Propagate(err, "") 171 } 172 } 173 return proof, nil 174 }) ··· 180 if err != nil { 181 return nil, stacktrace.Propagate(err, "") 182 } 183 - 184 - // TODO consider using a different logger once we clean up our logging act 185 - // TODO open an issue in the gnark repo because backend.WithSolverOptions(solver.WithLogger(zerolog.Nop())) has no effect... 186 - proof, err := groth16.Prove(blockChallengeConstraintSystem, blockChallengeProvingKey, witness, backend.WithSolverOptions(solver.WithLogger(zerolog.Nop()))) 187 if err != nil { 188 return nil, stacktrace.Propagate(err, "") 189 }
··· 4 "bytes" 5 "context" 6 "embed" 7 "math/big" 8 "time" 9 10 "github.com/Yiling-J/theine-go" 11 "github.com/cometbft/cometbft/crypto" 12 + cmtlog "github.com/cometbft/cometbft/libs/log" 13 bftstore "github.com/cometbft/cometbft/store" 14 "github.com/consensys/gnark-crypto/ecc" 15 "github.com/consensys/gnark-crypto/ecc/bn254" 16 "github.com/consensys/gnark-crypto/ecc/bn254/fr/mimc" 17 "github.com/consensys/gnark/backend/groth16" 18 "github.com/consensys/gnark/backend/witness" 19 "github.com/consensys/gnark/constraint" 20 "github.com/consensys/gnark/frontend" 21 + gnarklogger "github.com/consensys/gnark/logger" 22 "github.com/palantir/stacktrace" 23 "github.com/rs/zerolog" 24 "github.com/samber/lo" ··· 50 vkFile := lo.Must(blockChallengeCircuitFS.Open("proofcircuit/BlockChallenge_VerifyingKey")) 51 defer vkFile.Close() 52 lo.Must(blockChallengeVerifyingKey.ReadFrom(vkFile)) 53 + 54 + gnarklogger.Set(zerolog.Nop()) 55 } 56 57 type blockChallengeCoordinator struct { 58 g singleflight.Group[int64, []byte] 59 60 runnerContext context.Context 61 + logger cmtlog.Logger 62 63 isConfiguredToBeValidator bool 64 validatorAddress []byte ··· 68 sharedWitnessDataCache *theine.LoadingCache[int64, proof.BlockChallengeCircuit] 69 } 70 71 + func newBlockChallengeCoordinator(runnerContext context.Context, logger cmtlog.Logger, txFactory *transaction.Factory, blockStore *bftstore.BlockStore, pubKey crypto.PubKey) (*blockChallengeCoordinator, error) { 72 c := &blockChallengeCoordinator{ 73 runnerContext: runnerContext, 74 + logger: logger, 75 txFactory: txFactory, 76 nodeBlockStore: blockStore, 77 isConfiguredToBeValidator: pubKey != nil, ··· 129 go func() { 130 _, err := c.loadOrComputeBlockChallengeProof(c.runnerContext, height) 131 if err != nil { 132 + c.logger.Error("failed to compute block challenge", "height", height, "error", stacktrace.Propagate(err, "")) 133 } 134 }() 135 } ··· 151 return nil, stacktrace.Propagate(err, "") 152 } 153 if proof == nil { 154 + st := time.Now() 155 // compute and store 156 proof, err = c.computeBlockChallengeProof(tx, height) 157 if err != nil { ··· 173 if err != nil { 174 return nil, stacktrace.Propagate(err, "") 175 } 176 + 177 + c.logger.Debug("computed and stored block challenge", "height", height, "took", time.Since(st)) 178 } 179 return proof, nil 180 }) ··· 186 if err != nil { 187 return nil, stacktrace.Propagate(err, "") 188 } 189 + proof, err := groth16.Prove(blockChallengeConstraintSystem, blockChallengeProvingKey, witness) 190 if err != nil { 191 return nil, stacktrace.Propagate(err, "") 192 }
+12 -3
abciapp/execution.go
··· 3 import ( 4 "bytes" 5 "context" 6 - "fmt" 7 "slices" 8 "time" 9 ··· 25 26 // PrepareProposal implements [types.Application]. 27 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 28 defer d.DiscardChanges() 29 30 if req.Height == 2 { ··· 105 106 // ProcessProposal implements [types.Application]. 107 func (d *DIDPLCApplication) ProcessProposal(ctx context.Context, req *abcitypes.RequestProcessProposal) (*abcitypes.ResponseProcessProposal, error) { 108 // always reset state before processing a new proposal 109 d.DiscardChanges() 110 // do not unconditionally defer DiscardChanges because we want to re-use the results in FinalizeBlock when we vote accept ··· 140 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 141 } 142 143 - st := time.Now() 144 result, err = finishProcessTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), processor, tx) 145 if err != nil { 146 return nil, stacktrace.Propagate(err, "") 147 } 148 - fmt.Println("FINISHPROCESSTX TOOK", time.Since(st)) 149 } 150 151 // when preparing a proposal, invalid transactions should have been discarded ··· 165 166 // ExtendVote implements [types.Application]. 167 func (d *DIDPLCApplication) ExtendVote(ctx context.Context, req *abcitypes.RequestExtendVote) (*abcitypes.ResponseExtendVote, error) { 168 proof, err := d.blockChallengeCoordinator.loadOrComputeBlockChallengeProof(ctx, req.Height) 169 if err != nil { 170 return nil, stacktrace.Propagate(err, "") ··· 176 177 // VerifyVoteExtension implements [types.Application]. 178 func (d *DIDPLCApplication) VerifyVoteExtension(_ context.Context, req *abcitypes.RequestVerifyVoteExtension) (*abcitypes.ResponseVerifyVoteExtension, error) { 179 if len(req.VoteExtension) > 200 { 180 // that definitely ain't right 181 return &abcitypes.ResponseVerifyVoteExtension{ ··· 195 196 // FinalizeBlock implements [types.Application]. 197 func (d *DIDPLCApplication) FinalizeBlock(ctx context.Context, req *abcitypes.RequestFinalizeBlock) (*abcitypes.ResponseFinalizeBlock, error) { 198 if bytes.Equal(req.Hash, d.lastProcessedProposalHash) && d.lastProcessedProposalExecTxResults != nil { 199 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully 200 // reuse the uncommitted results ··· 231 232 // Commit implements [types.Application]. 233 func (d *DIDPLCApplication) Commit(context.Context, *abcitypes.RequestCommit) (*abcitypes.ResponseCommit, error) { 234 // ensure we always advance tree version by creating ongoingWrite if it hasn't been created already 235 d.createOngoingTxIfNeeded(time.Now()) 236
··· 3 import ( 4 "bytes" 5 "context" 6 + "encoding/hex" 7 "slices" 8 "time" 9 ··· 25 26 // PrepareProposal implements [types.Application]. 27 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 28 + defer (d.logMethod("PrepareProposal", "height", req.Height, "txs", len(req.Txs)))() 29 defer d.DiscardChanges() 30 31 if req.Height == 2 { ··· 106 107 // ProcessProposal implements [types.Application]. 108 func (d *DIDPLCApplication) ProcessProposal(ctx context.Context, req *abcitypes.RequestProcessProposal) (*abcitypes.ResponseProcessProposal, error) { 109 + defer (d.logMethod("ProcessProposal", "height", req.Height, "hash", req.Hash, "txs", len(req.Txs)))() 110 + 111 // always reset state before processing a new proposal 112 d.DiscardChanges() 113 // do not unconditionally defer DiscardChanges because we want to re-use the results in FinalizeBlock when we vote accept ··· 143 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 144 } 145 146 result, err = finishProcessTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), processor, tx) 147 if err != nil { 148 return nil, stacktrace.Propagate(err, "") 149 } 150 } 151 152 // when preparing a proposal, invalid transactions should have been discarded ··· 166 167 // ExtendVote implements [types.Application]. 168 func (d *DIDPLCApplication) ExtendVote(ctx context.Context, req *abcitypes.RequestExtendVote) (*abcitypes.ResponseExtendVote, error) { 169 + defer (d.logMethod("ExtendVote", "height", req.Height, "hash", req.Hash))() 170 + 171 proof, err := d.blockChallengeCoordinator.loadOrComputeBlockChallengeProof(ctx, req.Height) 172 if err != nil { 173 return nil, stacktrace.Propagate(err, "") ··· 179 180 // VerifyVoteExtension implements [types.Application]. 181 func (d *DIDPLCApplication) VerifyVoteExtension(_ context.Context, req *abcitypes.RequestVerifyVoteExtension) (*abcitypes.ResponseVerifyVoteExtension, error) { 182 + defer (d.logMethod("VerifyVoteExtension", "height", req.Height, "hash", req.Hash, "validator", hex.EncodeToString(req.ValidatorAddress)))() 183 + 184 if len(req.VoteExtension) > 200 { 185 // that definitely ain't right 186 return &abcitypes.ResponseVerifyVoteExtension{ ··· 200 201 // FinalizeBlock implements [types.Application]. 202 func (d *DIDPLCApplication) FinalizeBlock(ctx context.Context, req *abcitypes.RequestFinalizeBlock) (*abcitypes.ResponseFinalizeBlock, error) { 203 + defer (d.logMethod("FinalizeBlock", "height", req.Height, "hash", req.Hash))() 204 + 205 if bytes.Equal(req.Hash, d.lastProcessedProposalHash) && d.lastProcessedProposalExecTxResults != nil { 206 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully 207 // reuse the uncommitted results ··· 238 239 // Commit implements [types.Application]. 240 func (d *DIDPLCApplication) Commit(context.Context, *abcitypes.RequestCommit) (*abcitypes.ResponseCommit, error) { 241 + defer (d.logMethod("Commit"))() 242 + 243 // ensure we always advance tree version by creating ongoingWrite if it hasn't been created already 244 d.createOngoingTxIfNeeded(time.Now()) 245
+15 -13
abciapp/range_challenge.go
··· 3 import ( 4 "context" 5 "encoding/binary" 6 "errors" 7 - "fmt" 8 "math/big" 9 "slices" 10 "sync" ··· 12 13 "github.com/Yiling-J/theine-go" 14 "github.com/cometbft/cometbft/crypto" 15 - "github.com/cometbft/cometbft/mempool" 16 "github.com/cometbft/cometbft/privval" 17 "github.com/cometbft/cometbft/rpc/core" 18 bftstore "github.com/cometbft/cometbft/store" ··· 30 31 type RangeChallengeCoordinator struct { 32 runnerContext context.Context 33 34 isConfiguredToBeValidator bool 35 validatorPubKey crypto.PubKey ··· 58 59 func NewRangeChallengeCoordinator( 60 runnerContext context.Context, 61 txFactory *transaction.Factory, 62 blockStore *bftstore.BlockStore, 63 nodeEventBus *cmttypes.EventBus, 64 mempoolSubmitter types.MempoolSubmitter, 65 - consensusReactor consensusReactor, 66 - pv *privval.FilePV) (*RangeChallengeCoordinator, error) { 67 c := &RangeChallengeCoordinator{ 68 txFactory: txFactory, 69 runnerContext: runnerContext, 70 nodeBlockStore: blockStore, 71 nodeEventBus: nodeEventBus, 72 mempoolSubmitter: mempoolSubmitter, ··· 98 c.wg.Go(func() { 99 err := c.newBlocksSubscriber() 100 if err != nil { 101 - fmt.Println("newBlocksSubscriber FAILED:", err) 102 } 103 }) 104 c.wg.Go(func() { ··· 114 if err != nil { 115 // note: this is expected in certain circumstances, such as the proof for the toHeight block not being ready yet as the block was just finalized 116 // (and the block may have been finalized without our votes) 117 - fmt.Println("onNewBlock FAILED:", err) 118 } 119 }() 120 } ··· 277 } 278 } 279 280 - fmt.Println("RANGE CHALLENGE EVAL", shouldCommitToChallenge, shouldCompleteChallenge) 281 - 282 - var transactionBytes []byte 283 if shouldCompleteChallenge { 284 transactionBytes, err = c.createCompleteChallengeTx(ctx, tx, int64(fromHeight), int64(toHeight), int64(provenHeight), int64(includedOnHeight)) 285 if err != nil { 286 return stacktrace.Propagate(err, "") 287 } 288 } else if shouldCommitToChallenge { 289 transactionBytes, err = c.createCommitToChallengeTx(ctx, tx, newBlockHeight) 290 if err != nil { 291 if errors.Is(err, errMissingProofs) { ··· 301 return nil 302 } 303 304 result, err := c.mempoolSubmitter.BroadcastTx(ctx, transactionBytes, true) 305 if err != nil { 306 - if errors.Is(err, mempool.ErrTxInCache) { 307 - // expected, as we don't wait for broadcast and therefore will try to repeatedly commit/complete 308 - return nil 309 - } 310 return stacktrace.Propagate(err, "") 311 } 312 if result.CheckTx.Code == 0 && shouldCompleteChallenge { 313 c.hasSubmittedChallengeCompletion = true 314 } 315 c.cachedNextProofFromHeight = mo.None[int64]() 316 return nil 317 }
··· 3 import ( 4 "context" 5 "encoding/binary" 6 + "encoding/hex" 7 "errors" 8 "math/big" 9 "slices" 10 "sync" ··· 12 13 "github.com/Yiling-J/theine-go" 14 "github.com/cometbft/cometbft/crypto" 15 + cmtlog "github.com/cometbft/cometbft/libs/log" 16 "github.com/cometbft/cometbft/privval" 17 "github.com/cometbft/cometbft/rpc/core" 18 bftstore "github.com/cometbft/cometbft/store" ··· 30 31 type RangeChallengeCoordinator struct { 32 runnerContext context.Context 33 + logger cmtlog.Logger 34 35 isConfiguredToBeValidator bool 36 validatorPubKey crypto.PubKey ··· 59 60 func NewRangeChallengeCoordinator( 61 runnerContext context.Context, 62 + logger cmtlog.Logger, 63 + pv *privval.FilePV, 64 txFactory *transaction.Factory, 65 blockStore *bftstore.BlockStore, 66 nodeEventBus *cmttypes.EventBus, 67 mempoolSubmitter types.MempoolSubmitter, 68 + consensusReactor consensusReactor) (*RangeChallengeCoordinator, error) { 69 c := &RangeChallengeCoordinator{ 70 txFactory: txFactory, 71 runnerContext: runnerContext, 72 + logger: logger, 73 nodeBlockStore: blockStore, 74 nodeEventBus: nodeEventBus, 75 mempoolSubmitter: mempoolSubmitter, ··· 101 c.wg.Go(func() { 102 err := c.newBlocksSubscriber() 103 if err != nil { 104 + c.logger.Error("blocks subscriber failed", "error", stacktrace.Propagate(err, "")) 105 } 106 }) 107 c.wg.Go(func() { ··· 117 if err != nil { 118 // note: this is expected in certain circumstances, such as the proof for the toHeight block not being ready yet as the block was just finalized 119 // (and the block may have been finalized without our votes) 120 + c.logger.Error("range challenge block handler error", "error", stacktrace.Propagate(err, "")) 121 } 122 }() 123 } ··· 280 } 281 } 282 283 + var transactionBytes cmttypes.Tx 284 if shouldCompleteChallenge { 285 + c.logger.Info("Creating challenge completion transaction", "fromHeight", fromHeight, "toHeight", toHeight, "provenHeight", provenHeight, "includedOnHeight", includedOnHeight) 286 transactionBytes, err = c.createCompleteChallengeTx(ctx, tx, int64(fromHeight), int64(toHeight), int64(provenHeight), int64(includedOnHeight)) 287 if err != nil { 288 return stacktrace.Propagate(err, "") 289 } 290 } else if shouldCommitToChallenge { 291 + c.logger.Info("Creating challenge commitment transaction", "toHeight", toHeight) 292 transactionBytes, err = c.createCommitToChallengeTx(ctx, tx, newBlockHeight) 293 if err != nil { 294 if errors.Is(err, errMissingProofs) { ··· 304 return nil 305 } 306 307 + txHashHex := hex.EncodeToString(transactionBytes.Hash()) 308 + c.logger.Debug("broadcasting range challenge transaction", "hash", txHashHex) 309 result, err := c.mempoolSubmitter.BroadcastTx(ctx, transactionBytes, true) 310 if err != nil { 311 return stacktrace.Propagate(err, "") 312 } 313 if result.CheckTx.Code == 0 && shouldCompleteChallenge { 314 c.hasSubmittedChallengeCompletion = true 315 } 316 + c.logger.Debug("range challenge transaction included", "hash", txHashHex, "txResult", result.TxResult.Code) 317 c.cachedNextProofFromHeight = mo.None[int64]() 318 return nil 319 }
+2 -5
abciapp/snapshots.go
··· 15 "strconv" 16 "strings" 17 "sync" 18 - "time" 19 20 dbm "github.com/cometbft/cometbft-db" 21 abcitypes "github.com/cometbft/cometbft/abci/types" ··· 230 } 231 232 func (d *DIDPLCApplication) createSnapshot(treeVersion int64, tempFilename string) error { 233 it, err := d.tree.GetImmutable(treeVersion) 234 if err != nil { 235 return stacktrace.Propagate(err, "") ··· 243 return stacktrace.Propagate(err, "") 244 } 245 defer f.Close() 246 - 247 - st := time.Now() 248 249 err = writeSnapshot(f, d.indexDB, it) 250 if err != nil { ··· 278 } 279 280 os.Rename(tempFilename, filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.snapshot", treeVersion))) 281 - 282 - fmt.Println("Took", time.Since(st), "to export") 283 284 return nil 285 }
··· 15 "strconv" 16 "strings" 17 "sync" 18 19 dbm "github.com/cometbft/cometbft-db" 20 abcitypes "github.com/cometbft/cometbft/abci/types" ··· 229 } 230 231 func (d *DIDPLCApplication) createSnapshot(treeVersion int64, tempFilename string) error { 232 + defer (d.logMethod("createSnapshot", "treeVersion", treeVersion, "tempFilename", tempFilename))() 233 + 234 it, err := d.tree.GetImmutable(treeVersion) 235 if err != nil { 236 return stacktrace.Propagate(err, "") ··· 244 return stacktrace.Propagate(err, "") 245 } 246 defer f.Close() 247 248 err = writeSnapshot(f, d.indexDB, it) 249 if err != nil { ··· 277 } 278 279 os.Rename(tempFilename, filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.snapshot", treeVersion))) 280 281 return nil 282 }
+25 -9
main.go
··· 97 appContext, cancelAppContext := context.WithCancel(context.Background()) 98 defer cancelAppContext() 99 100 - app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication(appContext, pv, treeDB, indexDB, recreateDatabases, filepath.Join(homeDir, "snapshots"), didBloomFilterPath, mempoolSubmitter) 101 if err != nil { 102 log.Fatalf("failed to create DIDPLC application: %v", err) 103 } ··· 108 log.Fatalf("failed to load node's key: %v", err) 109 } 110 111 - logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout)) 112 - logger, err = cmtflags.ParseLogLevel(config.LogLevel, logger, bftconfig.DefaultLogLevel) 113 - 114 - if err != nil { 115 - log.Fatalf("failed to parse log level: %v", err) 116 - } 117 - 118 node, err := nm.NewNode( 119 config.Config, 120 pv, ··· 137 log.Fatalf("Finishing ABCI app initialization: %v", err) 138 } 139 140 - rangeChallengeCoordinator, err := abciapp.NewRangeChallengeCoordinator(appContext, txFactory, node.BlockStore(), node.EventBus(), mempoolSubmitter, node.ConsensusReactor(), pv) 141 if err != nil { 142 log.Fatalf("Creating RangeChallengeCoordinator: %v", err) 143 }
··· 97 appContext, cancelAppContext := context.WithCancel(context.Background()) 98 defer cancelAppContext() 99 100 + logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout)) 101 + logger, err = cmtflags.ParseLogLevel(config.LogLevel, logger, bftconfig.DefaultLogLevel) 102 + if err != nil { 103 + log.Fatalf("failed to parse log level: %v", err) 104 + } 105 + 106 + app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication( 107 + appContext, 108 + logger, 109 + pv, 110 + treeDB, 111 + indexDB, 112 + recreateDatabases, 113 + filepath.Join(homeDir, "snapshots"), 114 + didBloomFilterPath, 115 + mempoolSubmitter) 116 if err != nil { 117 log.Fatalf("failed to create DIDPLC application: %v", err) 118 } ··· 123 log.Fatalf("failed to load node's key: %v", err) 124 } 125 126 node, err := nm.NewNode( 127 config.Config, 128 pv, ··· 145 log.Fatalf("Finishing ABCI app initialization: %v", err) 146 } 147 148 + rangeChallengeCoordinator, err := abciapp.NewRangeChallengeCoordinator( 149 + appContext, 150 + logger.With("module", "plcapp"), 151 + pv, 152 + txFactory, 153 + node.BlockStore(), 154 + node.EventBus(), 155 + mempoolSubmitter, 156 + node.ConsensusReactor()) 157 if err != nil { 158 log.Fatalf("Creating RangeChallengeCoordinator: %v", err) 159 }
+1
startfresh.sh
··· 3 go build -trimpath 4 go run github.com/cometbft/cometbft/cmd/cometbft@v0.38.19 init --home didplcbft-data 5 sed -i 's/^create_empty_blocks = true$/create_empty_blocks = false/g' didplcbft-data/config/config.toml 6 ./didplcbft
··· 3 go build -trimpath 4 go run github.com/cometbft/cometbft/cmd/cometbft@v0.38.19 init --home didplcbft-data 5 sed -i 's/^create_empty_blocks = true$/create_empty_blocks = false/g' didplcbft-data/config/config.toml 6 + sed -i 's/^log_level = "info"$/log_level = "plcapp:debug,*:info"/g' didplcbft-data/config/config.toml 7 ./didplcbft
+14 -6
store/did_bloom.go
··· 3 import ( 4 "encoding/binary" 5 "errors" 6 - "fmt" 7 "io" 8 "math" 9 "os" 10 "slices" 11 12 "github.com/bits-and-blooms/bloom/v3" 13 "github.com/palantir/stacktrace" 14 "tangled.org/gbl08ma.com/didplcbft/transaction" 15 ) 16 17 type DIDBloomFilterStore struct { 18 filePath string 19 } 20 21 - func NewInMemoryDIDBloomFilterStore() *DIDBloomFilterStore { 22 - return &DIDBloomFilterStore{} 23 } 24 25 - func NewDIDBloomFilterStore(filePath string) *DIDBloomFilterStore { 26 return &DIDBloomFilterStore{ 27 filePath: filePath, 28 } 29 } ··· 93 return filter, nil 94 } 95 96 - fmt.Println("(RE)BUILDING DID BLOOM FILTER") 97 - 98 filterEstimatedItems := uint(100000000) // we know there are like 80M DIDs at the time of writing 99 if estimatedDIDCount != 0 { 100 filterEstimatedItems = max(filterEstimatedItems, uint(estimatedDIDCount*3)) 101 } 102 103 filter = bloom.NewWithEstimates(filterEstimatedItems, 0.01) 104 ··· 112 113 defer iterator.Close() 114 115 for iterator.Valid() { 116 filter.Add(iterator.Key()[1:16]) 117 118 iterator.Next() 119 } 120 err = iterator.Error() 121 if err != nil { 122 return nil, stacktrace.Propagate(err, "") 123 } 124 125 return filter, nil 126 }
··· 3 import ( 4 "encoding/binary" 5 "errors" 6 "io" 7 "math" 8 "os" 9 "slices" 10 11 "github.com/bits-and-blooms/bloom/v3" 12 + cmtlog "github.com/cometbft/cometbft/libs/log" 13 "github.com/palantir/stacktrace" 14 "tangled.org/gbl08ma.com/didplcbft/transaction" 15 ) 16 17 type DIDBloomFilterStore struct { 18 + logger cmtlog.Logger 19 filePath string 20 } 21 22 + func NewInMemoryDIDBloomFilterStore(logger cmtlog.Logger) *DIDBloomFilterStore { 23 + return &DIDBloomFilterStore{ 24 + logger: logger, 25 + } 26 } 27 28 + func NewDIDBloomFilterStore(logger cmtlog.Logger, filePath string) *DIDBloomFilterStore { 29 return &DIDBloomFilterStore{ 30 + logger: logger, 31 filePath: filePath, 32 } 33 } ··· 97 return filter, nil 98 } 99 100 filterEstimatedItems := uint(100000000) // we know there are like 80M DIDs at the time of writing 101 if estimatedDIDCount != 0 { 102 filterEstimatedItems = max(filterEstimatedItems, uint(estimatedDIDCount*3)) 103 } 104 + 105 + s.logger.Info("Rebuilding DID bloom filter", "itemCapacity", filterEstimatedItems) 106 107 filter = bloom.NewWithEstimates(filterEstimatedItems, 0.01) 108 ··· 116 117 defer iterator.Close() 118 119 + itemCount := 0 120 for iterator.Valid() { 121 filter.Add(iterator.Key()[1:16]) 122 123 iterator.Next() 124 + itemCount++ 125 } 126 err = iterator.Error() 127 if err != nil { 128 return nil, stacktrace.Propagate(err, "") 129 } 130 + 131 + s.logger.Debug("rebuilt DID bloom filter", "itemCapacity", filterEstimatedItems, "itemCount", itemCount) 132 133 return filter, nil 134 }
+2 -1
testutil/testutil.go
··· 7 "github.com/klauspost/compress/zstd" 8 "github.com/stretchr/testify/require" 9 10 "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 11 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb" 12 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb/zstddict" ··· 23 _, indexDB, err := badgertodbm.NewBadgerInMemoryDB() 24 require.NoError(t, err) 25 26 - factory, err := transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewInMemoryDIDBloomFilterStore()) 27 require.NoError(t, err) 28 29 return factory, tree, indexDB
··· 7 "github.com/klauspost/compress/zstd" 8 "github.com/stretchr/testify/require" 9 10 + cmtlog "github.com/cometbft/cometbft/libs/log" 11 "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 12 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb" 13 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb/zstddict" ··· 24 _, indexDB, err := badgertodbm.NewBadgerInMemoryDB() 25 require.NoError(t, err) 26 27 + factory, err := transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewInMemoryDIDBloomFilterStore(cmtlog.NewNopLogger())) 28 require.NoError(t, err) 29 30 return factory, tree, indexDB