package main import ( "context" abci "github.com/cometbft/cometbft/abci/types" mempl "github.com/cometbft/cometbft/mempool" "github.com/cometbft/cometbft/node" "github.com/cometbft/cometbft/rpc/core" coretypes "github.com/cometbft/cometbft/rpc/core/types" cmttypes "github.com/cometbft/cometbft/types" "github.com/gbl08ma/stacktrace" "github.com/google/uuid" "tangled.org/gbl08ma.com/didplcbft/types" ) type txSubmitter struct { node *node.Node } var _ types.MempoolSubmitter = (*txSubmitter)(nil) // BroadcastTx implements [types.MempoolSubmitter]. func (t *txSubmitter) BroadcastTx(ctx context.Context, tx cmttypes.Tx, waitForInclusion bool) (*coretypes.ResultBroadcastTxCommit, error) { uuid, err := uuid.NewRandom() if err != nil { return nil, stacktrace.Propagate(err) } subscriber := uuid.String() eventBus := t.node.EventBus() mempool := t.node.Mempool() // Subscribe to tx being committed in block. var txSub cmttypes.Subscription if waitForInclusion { subCtx, cancel := context.WithTimeout(ctx, core.SubscribeTimeout) defer cancel() q := cmttypes.EventQueryTxFor(tx) txSub, err = eventBus.Subscribe(subCtx, subscriber, q) if err != nil { return nil, stacktrace.Propagate(err, "failed to subscribe to tx") } defer func() { err := eventBus.Unsubscribe(context.Background(), subscriber, q) _ = err }() } // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.ResponseCheckTx, 1) err = mempool.CheckTx(tx, func(res *abci.ResponseCheckTx) { select { case <-ctx.Done(): case checkTxResCh <- res: } }, mempl.TxInfo{}) if err != nil { return nil, stacktrace.Propagate(err, "error on broadcastTxCommit") } select { case <-ctx.Done(): return nil, stacktrace.Propagate(ctx.Err(), "broadcast confirmation not received") case checkTxRes := <-checkTxResCh: if !waitForInclusion || checkTxRes.Code != abci.CodeTypeOK { return &coretypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, Hash: tx.Hash(), }, nil } // Wait for the tx to be included in a block or timeout. select { case <-ctx.Done(): return nil, stacktrace.Propagate(ctx.Err(), "inclusion confirmation not received") case msg := <-txSub.Out(): // The tx was included in a block. txResultEvent := msg.Data().(cmttypes.EventDataTx) return &coretypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: txResultEvent.Result, Hash: tx.Hash(), Height: txResultEvent.Height, }, nil case <-txSub.Canceled(): err := txSub.Err() if err == nil { err = stacktrace.NewError("CometBFT exited") } return &coretypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, TxResult: abci.ExecTxResult{}, Hash: tx.Hash(), }, stacktrace.Propagate(err, "txSub was canceled") } } }