A very experimental PLC implementation which uses BFT consensus for decentralization

Finish most of the work on range challenges

gbl08ma.com 7a8e3ec7 34c54a43

verified
+274 -85
+2 -8
abciapp/app.go
··· 51 51 blockStore *bftstore.BlockStore 52 52 53 53 blockChallengeCoordinator *blockChallengeCoordinator 54 - rangeChallengeCoordinator *rangeChallengeCoordinator 55 54 } 56 55 57 56 // store and plc must be able to share transaction objects 58 - func NewDIDPLCApplication(pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 57 + func NewDIDPLCApplication(appContext context.Context, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 59 58 mkTree := func() *iavl.MutableTree { 60 59 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 61 60 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 77 76 } 78 77 } 79 78 80 - runnerContext, cancelRunnerContext := context.WithCancel(context.Background()) 79 + runnerContext, cancelRunnerContext := context.WithCancel(appContext) 81 80 82 81 d := &DIDPLCApplication{ 83 82 runnerContext: runnerContext, ··· 201 200 202 201 var err error 203 202 d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.txFactory, blockStore, d.validatorPubKey) 204 - if err != nil { 205 - return stacktrace.Propagate(err, "") 206 - } 207 - 208 - d.rangeChallengeCoordinator, err = newRangeChallengeCoordinator(d.runnerContext, d.txFactory, blockStore, d.mempoolSubmitter, d.validatorPubKey, d.validatorPrivKey) 209 203 if err != nil { 210 204 return stacktrace.Propagate(err, "") 211 205 }
+1 -1
abciapp/app_test.go
··· 22 22 } 23 23 24 24 func TestCheckTx(t *testing.T) { 25 - app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", nil) 25 + app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", nil) 26 26 require.NoError(t, err) 27 27 t.Cleanup(cleanup) 28 28
-4
abciapp/block_challenge.go
··· 176 176 } 177 177 178 178 func (c *blockChallengeCoordinator) computeBlockChallengeProof(tx transaction.Read, height int64) ([]byte, error) { 179 - st := time.Now() 180 - 181 179 witness, err := c.buildPrivateChallengeWitnessForHeight(tx, height) 182 180 if err != nil { 183 181 return nil, stacktrace.Propagate(err, "") ··· 196 194 if err != nil { 197 195 return nil, stacktrace.Propagate(err, "") 198 196 } 199 - 200 - fmt.Println("COMPUTED CHALLENGE FOR BLOCK", height, "IN", time.Since(st), "SIZE", buf.Len(), "BYTES") 201 197 202 198 return buf.Bytes(), nil 203 199 }
+7 -2
abciapp/execution.go
··· 176 176 177 177 // VerifyVoteExtension implements [types.Application]. 178 178 func (d *DIDPLCApplication) VerifyVoteExtension(_ context.Context, req *abcitypes.RequestVerifyVoteExtension) (*abcitypes.ResponseVerifyVoteExtension, error) { 179 + if len(req.VoteExtension) > 200 { 180 + // that definitely ain't right 181 + return &abcitypes.ResponseVerifyVoteExtension{ 182 + Status: abcitypes.ResponseVerifyVoteExtension_REJECT, 183 + }, nil 184 + } 185 + 179 186 proofOK, err := d.blockChallengeCoordinator.verifyBlockChallengeProof(req.Height, req.ValidatorAddress, req.VoteExtension) 180 187 if err != nil { 181 188 return nil, stacktrace.Propagate(err, "") ··· 257 264 writeTx: mo.None[transaction.Write](), 258 265 aocsByPLC: d.aocsByPLC, 259 266 blockChallengeCoordinator: d.blockChallengeCoordinator, 260 - rangeChallengeCoordinator: d.rangeChallengeCoordinator, 261 267 blockStore: d.blockStore, 262 268 } 263 269 } ··· 279 285 plc: d.plc, 280 286 aocsByPLC: d.aocsByPLC, 281 287 blockChallengeCoordinator: d.blockChallengeCoordinator, 282 - rangeChallengeCoordinator: d.rangeChallengeCoordinator, 283 288 blockStore: d.blockStore, 284 289 } 285 290 }
+186 -37
abciapp/range_challenge.go
··· 4 4 "context" 5 5 "encoding/binary" 6 6 "errors" 7 + "fmt" 7 8 "math/big" 8 9 "slices" 10 + "sync" 9 11 "time" 10 12 11 13 "github.com/Yiling-J/theine-go" 12 14 "github.com/cometbft/cometbft/crypto" 15 + "github.com/cometbft/cometbft/mempool" 16 + "github.com/cometbft/cometbft/privval" 17 + "github.com/cometbft/cometbft/rpc/core" 13 18 bftstore "github.com/cometbft/cometbft/store" 19 + cmttypes "github.com/cometbft/cometbft/types" 14 20 "github.com/cosmos/iavl" 15 21 "github.com/cosmos/iavl/db" 16 22 ics23 "github.com/cosmos/ics23/go" ··· 22 28 "tangled.org/gbl08ma.com/didplcbft/types" 23 29 ) 24 30 25 - type rangeChallengeCoordinator struct { 31 + type RangeChallengeCoordinator struct { 26 32 runnerContext context.Context 27 33 28 34 isConfiguredToBeValidator bool ··· 31 37 validatorAddress []byte 32 38 txFactory *transaction.Factory 33 39 nodeBlockStore *bftstore.BlockStore 40 + nodeEventBus *cmttypes.EventBus 34 41 mempoolSubmitter types.MempoolSubmitter 42 + consensusReactor consensusReactor 35 43 36 44 treeCache *theine.LoadingCache[treeCacheKey, cachedTree] 37 45 cachedNextProofFromHeight mo.Option[int64] 46 + 47 + startMu sync.Mutex 48 + started bool 49 + wg sync.WaitGroup 50 + newBlockCh chan int64 51 + 52 + hasSubmittedChallengeCompletion bool 38 53 } 39 54 40 - func newRangeChallengeCoordinator(runnerContext context.Context, txFactory *transaction.Factory, blockStore *bftstore.BlockStore, mempoolSubmitter types.MempoolSubmitter, pubKey crypto.PubKey, privKey crypto.PrivKey) (*rangeChallengeCoordinator, error) { 41 - c := &rangeChallengeCoordinator{ 55 + type consensusReactor interface { 56 + WaitSync() bool 57 + } 58 + 59 + func NewRangeChallengeCoordinator( 60 + runnerContext context.Context, 61 + txFactory *transaction.Factory, 62 + blockStore *bftstore.BlockStore, 63 + nodeEventBus *cmttypes.EventBus, 64 + mempoolSubmitter types.MempoolSubmitter, 65 + consensusReactor consensusReactor, 66 + pv *privval.FilePV) (*RangeChallengeCoordinator, error) { 67 + c := &RangeChallengeCoordinator{ 42 68 txFactory: txFactory, 43 69 runnerContext: runnerContext, 44 70 nodeBlockStore: blockStore, 71 + nodeEventBus: nodeEventBus, 45 72 mempoolSubmitter: mempoolSubmitter, 46 - isConfiguredToBeValidator: pubKey != nil, 47 - validatorPubKey: pubKey, 48 - validatorPrivKey: privKey, 73 + consensusReactor: consensusReactor, 74 + isConfiguredToBeValidator: pv != nil, 75 + newBlockCh: make(chan int64), 49 76 } 50 77 if c.isConfiguredToBeValidator { 51 - c.validatorAddress = pubKey.Address() 78 + c.validatorPubKey = pv.Key.PubKey 79 + c.validatorPrivKey = pv.Key.PrivKey 80 + c.validatorAddress = c.validatorPubKey.Address() 52 81 } 53 82 54 83 var err error ··· 60 89 return c, nil 61 90 } 62 91 92 + func (c *RangeChallengeCoordinator) Start() error { 93 + c.startMu.Lock() 94 + defer c.startMu.Unlock() 95 + if c.started { 96 + return stacktrace.NewError("already started") 97 + } 98 + c.wg.Go(func() { 99 + err := c.newBlocksSubscriber() 100 + if err != nil { 101 + fmt.Println("newBlocksSubscriber FAILED:", err) 102 + } 103 + }) 104 + c.wg.Go(func() { 105 + for { 106 + select { 107 + case <-c.runnerContext.Done(): 108 + return 109 + case newHeight := <-c.newBlockCh: 110 + func() { 111 + ctx, cancel := context.WithTimeout(c.runnerContext, CommitToChallengeMaxAge) 112 + defer cancel() 113 + err := c.onNewBlock(ctx, newHeight) 114 + if err != nil { 115 + // note: this is expected in certain circumstances, such as the proof for the toHeight block not being ready yet as the block was just finalized 116 + // (and the block may have been finalized without our votes) 117 + fmt.Println("onNewBlock FAILED:", err) 118 + } 119 + }() 120 + } 121 + } 122 + }) 123 + c.started = true 124 + return nil 125 + } 126 + 127 + func (c *RangeChallengeCoordinator) Wait() { 128 + c.startMu.Lock() 129 + started := c.started 130 + c.startMu.Unlock() 131 + if !started { 132 + return 133 + } 134 + c.wg.Wait() 135 + } 136 + 63 137 type treeCacheKey struct { 64 138 startHeight int64 65 139 endHeight int64 ··· 70 144 root []byte 71 145 } 72 146 73 - func (c *rangeChallengeCoordinator) getOrFetchNextProofFromHeight(tx transaction.Read) (int64, error) { 74 - if !c.isConfiguredToBeValidator { 75 - return 0, stacktrace.NewError("not configured to be a validator") 76 - } 147 + func (c *RangeChallengeCoordinator) getOrFetchNextProofFromHeight(tx transaction.Read) (int64, error) { 77 148 if completion, hasCache := c.cachedNextProofFromHeight.Get(); hasCache { 78 149 return completion, nil 79 150 } ··· 101 172 return minProvable, nil 102 173 } 103 174 104 - func (c *rangeChallengeCoordinator) onNewBlock(ctx context.Context, newBlockHeight int64) error { 175 + func (c *RangeChallengeCoordinator) newBlocksSubscriber() error { 176 + subscriber := "rangeChallengeCoordinator" 177 + 178 + subCtx, cancel := context.WithTimeout(c.runnerContext, core.SubscribeTimeout) 179 + defer cancel() 180 + blocksSub, err := c.nodeEventBus.Subscribe(subCtx, subscriber, cmttypes.EventQueryNewBlockHeader) 181 + if err != nil { 182 + return stacktrace.Propagate(err, "failed to subscribe to new blocks") 183 + } 184 + defer func() { 185 + err := c.nodeEventBus.Unsubscribe(context.Background(), subscriber, cmttypes.EventQueryNewBlockHeader) 186 + _ = err 187 + }() 188 + 189 + for { 190 + select { 191 + case <-c.runnerContext.Done(): 192 + return nil 193 + case msg := <-blocksSub.Out(): 194 + newBlockHeaderEvent := msg.Data().(cmttypes.EventDataNewBlockHeader) 195 + // We can't block here! Otherwise our subscription will be terminated for not consuming messages fast enough 196 + // It is fine if we skip some blocks (worst case, we might miss our challenge commitment window and have to commit again) 197 + select { 198 + case c.newBlockCh <- newBlockHeaderEvent.Header.Height: 199 + default: 200 + } 201 + case <-blocksSub.Canceled(): 202 + err := blocksSub.Err() 203 + if err == nil { 204 + err = stacktrace.NewError("CometBFT exited") 205 + } 206 + return stacktrace.Propagate(err, "blocksSub was canceled") 207 + } 208 + } 209 + } 210 + 211 + func (c *RangeChallengeCoordinator) onNewBlock(ctx context.Context, newBlockHeight int64) error { 105 212 if !c.isConfiguredToBeValidator { 106 213 return nil 107 214 } 108 - 109 - // TODO if ABCI is just replaying old blocks -> do nothing 110 - // TODO wait until block is actually committed 111 - // ^ solution to both of these problems: don't call this method from the ABCI app, instead, call it from an "outside subscriber" that listens for truly new blocks 215 + if c.consensusReactor.WaitSync() { 216 + // still catching up 217 + return nil 218 + } 112 219 113 220 tx := c.txFactory.ReadCommitted() 114 221 if tx.Height() < newBlockHeight { 115 - // the ABCI app is probably still catching up to new blocks 116 - return nil 222 + return stacktrace.NewError("read committed transaction height lower than expected: new block height %d, transaction height %d", newBlockHeight, tx.Height()) 117 223 } 118 224 119 225 shouldCommitToChallenge := false ··· 121 227 122 228 fromHeight, toHeight, provenHeight, includedOnHeight, _, err := store.Consensus.ValidatorRangeChallengeCommitment(tx, c.validatorAddress) 123 229 if errors.Is(err, store.ErrNoActiveChallengeCommitment) { 230 + deleteOldProofs := false 231 + if c.hasSubmittedChallengeCompletion { 232 + // this means our previous commitment just cleared due to being accepted 233 + // we can delete the proofs we don't need anymore 234 + deleteOldProofs = true 235 + c.hasSubmittedChallengeCompletion = false 236 + c.cachedNextProofFromHeight = mo.None[int64]() 237 + } 124 238 nextFromHeight, err := c.getOrFetchNextProofFromHeight(tx) 125 239 if err != nil { 126 240 return stacktrace.Propagate(err, "") 127 241 } 128 - shouldCommitToChallenge = nextFromHeight+CommitToChallengeMinRange+50 <= newBlockHeight 242 + 243 + if deleteOldProofs { 244 + writeTx, err := tx.UpgradeForIndexOnly() 245 + if err != nil { 246 + return stacktrace.Propagate(err, "") 247 + } 248 + defer writeTx.Rollback() 249 + 250 + err = store.Consensus.DeleteBlockChallengeProofsBelowHeight(ctx, writeTx, uint64(nextFromHeight)) 251 + if err != nil { 252 + return stacktrace.Propagate(err, "") 253 + } 254 + 255 + err = writeTx.Commit() 256 + if err != nil { 257 + return stacktrace.Propagate(err, "") 258 + } 259 + } 260 + 261 + shouldCommitToChallenge = nextFromHeight+CommitToChallengeTargetInterval-1 <= newBlockHeight 129 262 } else if err != nil { 130 263 return stacktrace.Propagate(err, "") 131 264 } else { ··· 144 277 } 145 278 } 146 279 280 + fmt.Println("RANGE CHALLENGE EVAL", shouldCommitToChallenge, shouldCompleteChallenge) 281 + 147 282 var transactionBytes []byte 148 283 if shouldCompleteChallenge { 149 284 transactionBytes, err = c.createCompleteChallengeTx(ctx, tx, int64(fromHeight), int64(toHeight), int64(provenHeight), int64(includedOnHeight)) ··· 153 288 } else if shouldCommitToChallenge { 154 289 transactionBytes, err = c.createCommitToChallengeTx(ctx, tx, newBlockHeight) 155 290 if err != nil { 291 + if errors.Is(err, errMissingProofs) { 292 + // this is expected on nodes that take longer to generate the proofs 293 + // shouldCommitToChallenge will be true again on the next block and we'll try again 294 + return nil 295 + } 156 296 return stacktrace.Propagate(err, "") 157 297 } 158 298 } 159 299 160 - _, err = c.mempoolSubmitter.BroadcastTxCommit(ctx, transactionBytes) 300 + if len(transactionBytes) == 0 { 301 + return nil 302 + } 303 + 304 + result, err := c.mempoolSubmitter.BroadcastTx(ctx, transactionBytes, true) 161 305 if err != nil { 306 + if errors.Is(err, mempool.ErrTxInCache) { 307 + // expected, as we don't wait for broadcast and therefore will try to repeatedly commit/complete 308 + return nil 309 + } 162 310 return stacktrace.Propagate(err, "") 163 311 } 312 + if result.CheckTx.Code == 0 && shouldCompleteChallenge { 313 + c.hasSubmittedChallengeCompletion = true 314 + } 164 315 c.cachedNextProofFromHeight = mo.None[int64]() 165 316 return nil 166 317 } 167 318 168 - func (c *rangeChallengeCoordinator) createCommitToChallengeTx(ctx context.Context, tx transaction.Read, toHeight int64) ([]byte, error) { 169 - if !c.isConfiguredToBeValidator { 170 - return nil, stacktrace.NewError("not configured to be validator") 171 - } 172 - 319 + func (c *RangeChallengeCoordinator) createCommitToChallengeTx(ctx context.Context, tx transaction.Read, toHeight int64) ([]byte, error) { 173 320 fromHeight, err := c.getOrFetchNextProofFromHeight(tx) 174 321 if err != nil { 175 322 return nil, stacktrace.Propagate(err, "") 176 323 } 177 324 178 - if toHeight < fromHeight+CommitToChallengeMinRange { 325 + rangeSize := toHeight - fromHeight + 1 326 + if rangeSize < CommitToChallengeMinRange { 179 327 return nil, stacktrace.NewError("insufficient blocks passed") 180 328 } 181 329 182 - if toHeight > fromHeight+CommitToChallengeMaxRange { 183 - fromHeight = toHeight - CommitToChallengeMaxRange 330 + if rangeSize > CommitToChallengeMaxRange { 331 + fromHeight = toHeight - CommitToChallengeMaxRange + 1 184 332 } 185 333 186 334 pubKeyArg, err := MarshalPubKeyForArguments(c.validatorPubKey) ··· 232 380 return out, nil 233 381 } 234 382 235 - func (c *rangeChallengeCoordinator) createCompleteChallengeTx(ctx context.Context, tx transaction.Read, fromHeight, toHeight, prevProvenHeight, commitmentIncludedOnHeight int64) ([]byte, error) { 236 - if !c.isConfiguredToBeValidator { 237 - return nil, stacktrace.NewError("not configured to be validator") 238 - } 239 - 383 + func (c *RangeChallengeCoordinator) createCompleteChallengeTx(ctx context.Context, tx transaction.Read, fromHeight, toHeight, prevProvenHeight, commitmentIncludedOnHeight int64) ([]byte, error) { 240 384 nextBlockMeta := c.nodeBlockStore.LoadBlockMeta(commitmentIncludedOnHeight + 1) 241 385 if nextBlockMeta == nil { 242 386 return nil, stacktrace.NewError("block not found at height") ··· 269 413 return out, nil 270 414 } 271 415 272 - func (c *rangeChallengeCoordinator) computeRangeChallengeProof(ctx context.Context, tx transaction.Read, startHeight, endHeight, proveHeight int64) ([]byte, *ics23.CommitmentProof, error) { 416 + func (c *RangeChallengeCoordinator) computeRangeChallengeProof(ctx context.Context, tx transaction.Read, startHeight, endHeight, proveHeight int64) ([]byte, *ics23.CommitmentProof, error) { 273 417 ctx = context.WithValue(ctx, contextTxKey{}, tx) 274 418 ct, err := c.treeCache.Get(ctx, treeCacheKey{ 275 419 startHeight: startHeight, ··· 300 444 301 445 candidateHeight := fromHeight + int64(randOffset.Uint64()) 302 446 if h, ok := avoidHeight.Get(); ok && candidateHeight == h { 303 - candidateHeight = (candidateHeight + 1) % int64(numBlocks) 447 + candidateHeight++ 448 + if candidateHeight > toHeight { 449 + candidateHeight = fromHeight 450 + } 304 451 } 305 452 return candidateHeight 306 453 } 307 454 308 - func (c *rangeChallengeCoordinator) proofTreeLoader(ctx context.Context, cacheKey treeCacheKey) (theine.Loaded[cachedTree], error) { 455 + var errMissingProofs = errors.New("missing block challenge proofs in requested range") 456 + 457 + func (c *RangeChallengeCoordinator) proofTreeLoader(ctx context.Context, cacheKey treeCacheKey) (theine.Loaded[cachedTree], error) { 309 458 var zeroValue theine.Loaded[cachedTree] 310 459 311 460 anyTx := ctx.Value(contextTxKey{}) ··· 345 494 } 346 495 347 496 if immutableTree.Size() != cacheKey.endHeight-cacheKey.startHeight+1 { 348 - return zeroValue, stacktrace.NewError("missing block challenge proofs in requested range") 497 + return zeroValue, stacktrace.Propagate(errMissingProofs, "") 349 498 } 350 499 351 500 return theine.Loaded[cachedTree]{
+6 -3
abciapp/tx.go
··· 31 31 plc plc.PLC 32 32 aocsByPLC map[string]*authoritativeOperationsCache 33 33 blockChallengeCoordinator *blockChallengeCoordinator 34 - rangeChallengeCoordinator *rangeChallengeCoordinator 35 34 blockStore *bftstore.BlockStore 36 35 } 37 36 ··· 102 101 Key []byte `json:"key" refmt:"key"` 103 102 } 104 103 104 + func init() { 105 + cbornode.RegisterCborType(PubKeyInArguments{}) 106 + } 107 + 105 108 func SignTransaction[ArgType ArgumentType](privKey crypto.PrivKey, tx Transaction[ArgType]) (Transaction[ArgType], error) { 106 109 var zeroValue Transaction[ArgType] 107 110 ··· 143 146 144 147 var pubKey crypto.PubKey 145 148 switch publicKey.Type { 146 - case cmttypes.ABCIPubKeyTypeEd25519: 149 + case ed25519.PubKeyName: 147 150 pubKey = ed25519.PubKey(publicKey.Key) 148 - case cmttypes.ABCIPubKeyTypeSecp256k1: 151 + case secp256k1.PubKeyName: 149 152 pubKey = secp256k1.PubKey(publicKey.Key) 150 153 } 151 154
+6 -5
abciapp/tx_challenge.go
··· 19 19 const CommitToChallengeMaxAgeInBlocks = 3 20 20 const CommitToChallengeMinRange = 1000 21 21 const CommitToChallengeMaxRange = 10000 22 + const CommitToChallengeTargetInterval = 5000 22 23 23 24 var CommitToChallengeMaxAge = lo.Must(time.ParseDuration("10s")) 24 25 25 - const CompleteChallengeMaxAgeInBlocks = 3 26 + const CompleteChallengeMaxAgeInBlocks = 4 26 27 27 28 var CompleteChallengeMaxAge = lo.Must(time.ParseDuration("10s")) 28 29 ··· 245 246 }, nil 246 247 } 247 248 248 - includedHeightBlockMeta := deps.blockStore.LoadBlockMeta(int64(includedOnHeight)) 249 - if includedHeightBlockMeta == nil { 249 + blockAfterMeta := deps.blockStore.LoadBlockMeta(int64(includedOnHeight + 1)) 250 + if blockAfterMeta == nil { 250 251 // this shouldn't happen unless the prover is submitting the completion on the same block as the commitment 251 252 return &processResult{ 252 253 Code: 4302, ··· 254 255 }, nil 255 256 } 256 257 257 - if time.Since(includedHeightBlockMeta.Header.Time) > CompleteChallengeMaxAge { 258 + if time.Since(blockAfterMeta.Header.Time) > CompleteChallengeMaxAge { 258 259 // validator must commit to a new challenge 259 260 return &processResult{ 260 261 Code: 4303, ··· 275 276 proofHeight := int64(binary.BigEndian.Uint64(existenceProof.Key)) 276 277 277 278 expectedProofHeight := computeHeightToProveInRange( 278 - includedHeightBlockMeta.Header.LastCommitHash.Bytes(), 279 + blockAfterMeta.Header.LastCommitHash.Bytes(), 279 280 tx.Arguments.Validator, 280 281 int64(fromHeight), 281 282 int64(toHeight),
+2 -2
httpapi/server.go
··· 228 228 return 229 229 } 230 230 231 - // broadcastTxCommit will wait for inclusion until the context deadline expires 231 + // we'll wait for inclusion until the context deadline expires 232 232 // in practice we expect operations to be included in about one second 233 - result, err := s.mempoolSubmitter.BroadcastTxCommit(r.Context(), txBytes) 233 + result, err := s.mempoolSubmitter.BroadcastTx(r.Context(), txBytes, true) 234 234 // TODO more robust error handling 235 235 if handlePLCError(w, err, "") { 236 236 return
+20 -7
main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "flag" 5 6 "fmt" 6 7 "log" 7 8 "os" 8 9 "os/signal" 9 10 "path/filepath" 10 - "sync" 11 11 "syscall" 12 12 "time" 13 13 ··· 52 52 log.Fatalf("Invalid configuration data: %v", err) 53 53 } 54 54 55 - var wg sync.WaitGroup 56 - closeGoroutinesCh := make(chan struct{}) 57 - 58 55 underlyingTreeDB, treeDB, err := badgertodbm.NewBadgerDB("apptree", config.Config.DBDir()) 59 56 if err != nil { 60 57 log.Fatalf("failed to create application tree database: %v", err) ··· 97 94 config.PrivValidatorStateFile(), 98 95 ) 99 96 100 - app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication(pv, treeDB, indexDB, recreateDatabases, filepath.Join(homeDir, "snapshots"), didBloomFilterPath, mempoolSubmitter) 97 + appContext, cancelAppContext := context.WithCancel(context.Background()) 98 + defer cancelAppContext() 99 + 100 + app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication(appContext, pv, treeDB, indexDB, recreateDatabases, filepath.Join(homeDir, "snapshots"), didBloomFilterPath, mempoolSubmitter) 101 101 if err != nil { 102 102 log.Fatalf("failed to create DIDPLC application: %v", err) 103 103 } ··· 137 137 log.Fatalf("Finishing ABCI app initialization: %v", err) 138 138 } 139 139 140 + rangeChallengeCoordinator, err := abciapp.NewRangeChallengeCoordinator(appContext, txFactory, node.BlockStore(), node.EventBus(), mempoolSubmitter, node.ConsensusReactor(), pv) 141 + if err != nil { 142 + log.Fatalf("Creating RangeChallengeCoordinator: %v", err) 143 + } 144 + 140 145 err = node.Start() 141 146 if err != nil { 142 147 log.Fatalf("Starting node: %v", err) ··· 162 167 }() 163 168 } 164 169 170 + if pv != nil { 171 + err := rangeChallengeCoordinator.Start() 172 + if err != nil { 173 + log.Fatalf("Starting RangeChallengeCoordinator: %v", err) 174 + } 175 + defer rangeChallengeCoordinator.Wait() 176 + } 177 + 178 + defer cancelAppContext() 179 + 165 180 c := make(chan os.Signal, 1) 166 181 signal.Notify(c, os.Interrupt, syscall.SIGTERM) 167 182 <-c 168 - close(closeGoroutinesCh) 169 - wg.Wait() 170 183 }
+26 -2
store/consensus.go
··· 76 76 BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) 77 77 BlockChalengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] // afterHeight is exclusive for consistency with OperationsIterator 78 78 StoreBlockChallengeProof(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64, proof []byte) error 79 + DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error 79 80 } 80 81 81 82 var _ ConsensusStore = (*consensusStore)(nil) ··· 655 656 } 656 657 657 658 func marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) []byte { 658 - value := make([]byte, 8+8+8+32) 659 + value := make([]byte, 8*4+32) 659 660 binary.BigEndian.PutUint64(value, fromHeight) 660 661 binary.BigEndian.PutUint64(value[8:], toHeight) 661 662 binary.BigEndian.PutUint64(value[16:], provenHeight) 662 663 binary.BigEndian.PutUint64(value[24:], includedOnHeight) 663 - copy(value[32:], treeRoot[0:24]) 664 + copy(value[32:], treeRoot[0:32]) 664 665 return value 665 666 } 666 667 ··· 795 796 err := tx.IndexDB().Set(marshalBlockChallengeProofKey(blockHeight), proof) 796 797 return stacktrace.Propagate(err, "") 797 798 } 799 + 800 + func (t *consensusStore) DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error { 801 + // we should be good to delete as we iterate, as the writes within the transaction only hit the underlying DB implementation on commit 802 + startKey := marshalBlockChallengeProofKey(0) 803 + endKey := marshalBlockChallengeProofKey(blockHeight) 804 + 805 + proofsIterator, err := tx.IndexDB().Iterator(startKey, endKey) 806 + if err != nil { 807 + return stacktrace.Propagate(err, "") 808 + } 809 + 810 + defer proofsIterator.Close() 811 + 812 + for proofsIterator.Valid() { 813 + err = tx.IndexDB().Delete(slices.Clone(proofsIterator.Key())) 814 + if err != nil { 815 + return stacktrace.Propagate(err, "") 816 + } 817 + 818 + proofsIterator.Next() 819 + } 820 + return nil 821 + }
+17 -13
tx_submitter.go
··· 20 20 21 21 var _ types.MempoolSubmitter = (*txSubmitter)(nil) 22 22 23 - // BroadcastTxCommit implements [types.MempoolSubmitter]. 24 - func (t *txSubmitter) BroadcastTxCommit(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTxCommit, error) { 23 + // BroadcastTx implements [types.MempoolSubmitter]. 24 + func (t *txSubmitter) BroadcastTx(ctx context.Context, tx cmttypes.Tx, waitForInclusion bool) (*coretypes.ResultBroadcastTxCommit, error) { 25 25 uuid, err := uuid.NewRandom() 26 26 if err != nil { 27 27 return nil, stacktrace.Propagate(err, "") ··· 30 30 eventBus := t.node.EventBus() 31 31 mempool := t.node.Mempool() 32 32 // Subscribe to tx being committed in block. 33 - subCtx, cancel := context.WithTimeout(ctx, core.SubscribeTimeout) 34 - defer cancel() 35 - q := cmttypes.EventQueryTxFor(tx) 36 - txSub, err := eventBus.Subscribe(subCtx, subscriber, q) 37 - if err != nil { 38 - return nil, stacktrace.Propagate(err, "failed to subscribe to tx") 33 + 34 + var txSub cmttypes.Subscription 35 + if waitForInclusion { 36 + subCtx, cancel := context.WithTimeout(ctx, core.SubscribeTimeout) 37 + defer cancel() 38 + q := cmttypes.EventQueryTxFor(tx) 39 + txSub, err = eventBus.Subscribe(subCtx, subscriber, q) 40 + if err != nil { 41 + return nil, stacktrace.Propagate(err, "failed to subscribe to tx") 42 + } 43 + defer func() { 44 + err := eventBus.Unsubscribe(context.Background(), subscriber, q) 45 + _ = err 46 + }() 39 47 } 40 - defer func() { 41 - err := eventBus.Unsubscribe(context.Background(), subscriber, q) 42 - _ = err 43 - }() 44 48 45 49 // Broadcast tx and wait for CheckTx result 46 50 checkTxResCh := make(chan *abci.ResponseCheckTx, 1) ··· 57 61 case <-ctx.Done(): 58 62 return nil, stacktrace.Propagate(ctx.Err(), "broadcast confirmation not received") 59 63 case checkTxRes := <-checkTxResCh: 60 - if checkTxRes.Code != abci.CodeTypeOK { 64 + if !waitForInclusion || checkTxRes.Code != abci.CodeTypeOK { 61 65 return &coretypes.ResultBroadcastTxCommit{ 62 66 CheckTx: *checkTxRes, 63 67 TxResult: abci.ExecTxResult{},
+1 -1
types/tx.go
··· 8 8 ) 9 9 10 10 type MempoolSubmitter interface { 11 - BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) 11 + BroadcastTx(ctx context.Context, tx types.Tx, waitForInclusion bool) (*ctypes.ResultBroadcastTxCommit, error) 12 12 }