A very experimental PLC implementation which uses BFT consensus for decentralization
1package main
2
3import (
4 "context"
5
6 abci "github.com/cometbft/cometbft/abci/types"
7 mempl "github.com/cometbft/cometbft/mempool"
8 "github.com/cometbft/cometbft/node"
9 "github.com/cometbft/cometbft/rpc/core"
10 coretypes "github.com/cometbft/cometbft/rpc/core/types"
11 cmttypes "github.com/cometbft/cometbft/types"
12 "github.com/gbl08ma/stacktrace"
13 "github.com/google/uuid"
14 "tangled.org/gbl08ma.com/didplcbft/types"
15)
16
17type txSubmitter struct {
18 node *node.Node
19}
20
21var _ types.MempoolSubmitter = (*txSubmitter)(nil)
22
23// BroadcastTx implements [types.MempoolSubmitter].
24func (t *txSubmitter) BroadcastTx(ctx context.Context, tx cmttypes.Tx, waitForInclusion bool) (*coretypes.ResultBroadcastTxCommit, error) {
25 uuid, err := uuid.NewRandom()
26 if err != nil {
27 return nil, stacktrace.Propagate(err)
28 }
29 subscriber := uuid.String()
30 eventBus := t.node.EventBus()
31 mempool := t.node.Mempool()
32 // Subscribe to tx being committed in block.
33
34 var txSub cmttypes.Subscription
35 if waitForInclusion {
36 subCtx, cancel := context.WithTimeout(ctx, core.SubscribeTimeout)
37 defer cancel()
38 q := cmttypes.EventQueryTxFor(tx)
39 txSub, err = eventBus.Subscribe(subCtx, subscriber, q)
40 if err != nil {
41 return nil, stacktrace.Propagate(err, "failed to subscribe to tx")
42 }
43 defer func() {
44 err := eventBus.Unsubscribe(context.Background(), subscriber, q)
45 _ = err
46 }()
47 }
48
49 // Broadcast tx and wait for CheckTx result
50 checkTxResCh := make(chan *abci.ResponseCheckTx, 1)
51 err = mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) {
52 select {
53 case <-ctx.Done():
54 case checkTxResCh <- res:
55 }
56 }, mempl.TxInfo{})
57 if err != nil {
58 return nil, stacktrace.Propagate(err, "error on broadcastTxCommit")
59 }
60 select {
61 case <-ctx.Done():
62 return nil, stacktrace.Propagate(ctx.Err(), "broadcast confirmation not received")
63 case checkTxRes := <-checkTxResCh:
64 if !waitForInclusion || checkTxRes.Code != abci.CodeTypeOK {
65 return &coretypes.ResultBroadcastTxCommit{
66 CheckTx: *checkTxRes,
67 TxResult: abci.ExecTxResult{},
68 Hash: tx.Hash(),
69 }, nil
70 }
71
72 // Wait for the tx to be included in a block or timeout.
73 select {
74 case <-ctx.Done():
75 return nil, stacktrace.Propagate(ctx.Err(), "inclusion confirmation not received")
76 case msg := <-txSub.Out(): // The tx was included in a block.
77 txResultEvent := msg.Data().(cmttypes.EventDataTx)
78 return &coretypes.ResultBroadcastTxCommit{
79 CheckTx: *checkTxRes,
80 TxResult: txResultEvent.Result,
81 Hash: tx.Hash(),
82 Height: txResultEvent.Height,
83 }, nil
84 case <-txSub.Canceled():
85 err := txSub.Err()
86 if err == nil {
87 err = stacktrace.NewError("CometBFT exited")
88 }
89 return &coretypes.ResultBroadcastTxCommit{
90 CheckTx: *checkTxRes,
91 TxResult: abci.ExecTxResult{},
92 Hash: tx.Hash(),
93 }, stacktrace.Propagate(err, "txSub was canceled")
94 }
95 }
96}