A very experimental PLC implementation which uses BFT consensus for decentralization
at main 96 lines 2.9 kB view raw
1package main 2 3import ( 4 "context" 5 6 abci "github.com/cometbft/cometbft/abci/types" 7 mempl "github.com/cometbft/cometbft/mempool" 8 "github.com/cometbft/cometbft/node" 9 "github.com/cometbft/cometbft/rpc/core" 10 coretypes "github.com/cometbft/cometbft/rpc/core/types" 11 cmttypes "github.com/cometbft/cometbft/types" 12 "github.com/gbl08ma/stacktrace" 13 "github.com/google/uuid" 14 "tangled.org/gbl08ma.com/didplcbft/types" 15) 16 17type txSubmitter struct { 18 node *node.Node 19} 20 21var _ types.MempoolSubmitter = (*txSubmitter)(nil) 22 23// BroadcastTx implements [types.MempoolSubmitter]. 24func (t *txSubmitter) BroadcastTx(ctx context.Context, tx cmttypes.Tx, waitForInclusion bool) (*coretypes.ResultBroadcastTxCommit, error) { 25 uuid, err := uuid.NewRandom() 26 if err != nil { 27 return nil, stacktrace.Propagate(err) 28 } 29 subscriber := uuid.String() 30 eventBus := t.node.EventBus() 31 mempool := t.node.Mempool() 32 // Subscribe to tx being committed in block. 33 34 var txSub cmttypes.Subscription 35 if waitForInclusion { 36 subCtx, cancel := context.WithTimeout(ctx, core.SubscribeTimeout) 37 defer cancel() 38 q := cmttypes.EventQueryTxFor(tx) 39 txSub, err = eventBus.Subscribe(subCtx, subscriber, q) 40 if err != nil { 41 return nil, stacktrace.Propagate(err, "failed to subscribe to tx") 42 } 43 defer func() { 44 err := eventBus.Unsubscribe(context.Background(), subscriber, q) 45 _ = err 46 }() 47 } 48 49 // Broadcast tx and wait for CheckTx result 50 checkTxResCh := make(chan *abci.ResponseCheckTx, 1) 51 err = mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { 52 select { 53 case <-ctx.Done(): 54 case checkTxResCh <- res: 55 } 56 }, mempl.TxInfo{}) 57 if err != nil { 58 return nil, stacktrace.Propagate(err, "error on broadcastTxCommit") 59 } 60 select { 61 case <-ctx.Done(): 62 return nil, stacktrace.Propagate(ctx.Err(), "broadcast confirmation not received") 63 case checkTxRes := <-checkTxResCh: 64 if !waitForInclusion || checkTxRes.Code != abci.CodeTypeOK { 65 return &coretypes.ResultBroadcastTxCommit{ 66 CheckTx: *checkTxRes, 67 TxResult: abci.ExecTxResult{}, 68 Hash: tx.Hash(), 69 }, nil 70 } 71 72 // Wait for the tx to be included in a block or timeout. 73 select { 74 case <-ctx.Done(): 75 return nil, stacktrace.Propagate(ctx.Err(), "inclusion confirmation not received") 76 case msg := <-txSub.Out(): // The tx was included in a block. 77 txResultEvent := msg.Data().(cmttypes.EventDataTx) 78 return &coretypes.ResultBroadcastTxCommit{ 79 CheckTx: *checkTxRes, 80 TxResult: txResultEvent.Result, 81 Hash: tx.Hash(), 82 Height: txResultEvent.Height, 83 }, nil 84 case <-txSub.Canceled(): 85 err := txSub.Err() 86 if err == nil { 87 err = stacktrace.NewError("CometBFT exited") 88 } 89 return &coretypes.ResultBroadcastTxCommit{ 90 CheckTx: *checkTxRes, 91 TxResult: abci.ExecTxResult{}, 92 Hash: tx.Hash(), 93 }, stacktrace.Propagate(err, "txSub was canceled") 94 } 95 } 96}