A very experimental PLC implementation which uses BFT consensus for decentralization
package abciapp

import (
    "context"
    "os"
    "sync"
    "time"

    dbm "github.com/cometbft/cometbft-db"
    abcitypes "github.com/cometbft/cometbft/abci/types"
    "github.com/cometbft/cometbft/consensus"
    "github.com/cometbft/cometbft/crypto"
    cmtlog "github.com/cometbft/cometbft/libs/log"
    "github.com/cometbft/cometbft/privval"
    cmttypes "github.com/cometbft/cometbft/types"
    "github.com/cosmos/iavl"
    "github.com/gbl08ma/stacktrace"
    "github.com/klauspost/compress/zstd"
    "github.com/samber/lo"
    "tangled.org/gbl08ma.com/didplcbft/config"
    "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb"
    "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb/zstddict"
    "tangled.org/gbl08ma.com/didplcbft/plc"
    "tangled.org/gbl08ma.com/didplcbft/store"
    "tangled.org/gbl08ma.com/didplcbft/transaction"
    "tangled.org/gbl08ma.com/didplcbft/types"
)

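// DIDPLCApplication is the ABCI application driven by CometBFT. It holds the IAVL
// state tree, the transaction factory and index database, the snapshot and state-sync
// paths, the validator keys (when running as a validator), and the block and range
// challenge coordinators.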
type DIDPLCApplication struct {
    runnerContext             context.Context
    logger                    cmtlog.Logger
    plc                       plc.PLC
    txFactory                 *transaction.Factory
    indexDB                   transaction.ExtendedDB
    tree                      *iavl.MutableTree
    fullyClearApplicationData func() error

    mempoolSubmitter types.MempoolSubmitter

    validatorPubKey  crypto.PubKey
    validatorPrivKey crypto.PrivKey

    ongoingRead  transaction.Read
    ongoingWrite transaction.Write

    snapshotDirectory string
    snapshotApplier   *store.SnapshotApplier
    stateSyncTempDir  string

    lastProcessedProposalHash          []byte
    lastProcessedProposalExecTxResults []*processResult

    aoc *authoritativeOperationsFetcher

    blockHeaderGetter    store.BlockHeaderGetter
    triggerBlockCreation func()

    blockChallengeCoordinator *blockChallengeCoordinator
    rangeChallengeCoordinator *rangeChallengeCoordinator

    // for snapshot creation:
    plcConfig                       *config.PLCConfig
    snapshotManagerLatestHeightChan chan int64
    lastSnapshotHeight              int64
}

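// NewDIDPLCApplication builds the ABCI application together with its transaction
// factory and PLC instance; the returned func() must be called on shutdown to stop
// background goroutines and close the tree.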
// store and plc must be able to share transaction objects
func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter, blockHeaderGetter store.BlockHeaderGetter, plcConfig *config.PLCConfig) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) {
    mkTree := func() *iavl.MutableTree {
        // Using SpeedDefault appears to double the average processing time of ExecuteOperation,
        // and SpeedBetterCompression appears to double it again.
        // SpeedFastest seems to give up only about 5% of size reduction, so the slower levels are not worth it.
        return iavl.NewMutableTree(dbmtoiavldb.AdaptWithCompression(treeDB, zstd.SpeedFastest, zstddict.PLCZstdDict), 500000, true, iavl.NewNopLogger(), iavl.AsyncPruningOption(false))
    }

    tree := mkTree()

    _, err := tree.Load()
    if err != nil {
        return nil, nil, nil, func() {}, stacktrace.Propagate(err, "error loading latest version of the tree from storage")
    }

    if snapshotDirectory != "" {
        err = os.MkdirAll(snapshotDirectory, os.FileMode(0755))
        if err != nil {
            return nil, nil, nil, func() {}, stacktrace.Propagate(err)
        }
    }

    if stateSyncTempDir != "" {
        err = os.MkdirAll(stateSyncTempDir, os.FileMode(0755))
        if err != nil {
            return nil, nil, nil, func() {}, stacktrace.Propagate(err)
        }
    }

    runnerContext, cancelRunnerContext := context.WithCancel(appContext)

    d := &DIDPLCApplication{
        runnerContext:                   runnerContext,
        logger:                          logger.With("module", "plcapp"),
        tree:                            tree,
        indexDB:                         indexDB,
        mempoolSubmitter:                mempoolSubmitter,
        snapshotDirectory:               snapshotDirectory,
        stateSyncTempDir:                stateSyncTempDir,
        blockHeaderGetter:               blockHeaderGetter,
        triggerBlockCreation:            func() {},
        plcConfig:                       plcConfig,
        snapshotManagerLatestHeightChan: make(chan int64, 1),
    }

    if pv != nil {
        d.validatorPubKey = pv.Key.PubKey
        d.validatorPrivKey = pv.Key.PrivKey
    }

    d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(d.logger, didBloomFilterPath))
    if err != nil {
        return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err)
    }

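    // fullyClearApplicationData wipes all application state (tree, index database and
    // DID bloom filter) so that a snapshot can be imported from scratch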
    d.fullyClearApplicationData = func() error {
        // we assume this is called in a single-threaded context, which should be a safe assumption since we'll only call this during snapshot import
        // and CometBFT only calls one ABCI method at a time
        err := d.tree.Close()
        if err != nil {
            return stacktrace.Propagate(err)
        }

        clearData()

        *d.tree = *mkTree()

        d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(d.logger, didBloomFilterPath))
        if err != nil {
            return stacktrace.Propagate(err)
        }
        d.logger.Info("Recreating DID bloom filter")
        err = d.txFactory.RecreateDIDBloomFilter()
        return stacktrace.Propagate(err)
    }

    d.plc = plc.NewPLC()

    d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.logger, d.txFactory, d.blockHeaderGetter, d.validatorPubKey)
    if err != nil {
        return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err)
    }

    var wg sync.WaitGroup
    wg.Go(func() {
        // periodically store bloom filter so we don't have to wait so long on the next startup
        for {
            select {
            case <-runnerContext.Done():
                return
            case <-time.After(5 * time.Minute):
            }

            st := time.Now()
            err := d.txFactory.SaveDIDBloomFilter()
            if err != nil {
                d.logger.Error("failed to save bloom filter", "error", stacktrace.Propagate(err))
            }
            d.logger.Debug("saved bloom filter", "took", time.Since(st))
        }
    })

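    // if snapshot creation is enabled, look up the most recent existing snapshot and
    // start the background snapshot manager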
    if plcConfig != nil && plcConfig.SnapshotInterval > 0 && snapshotDirectory != "" {
        h, err := store.Snapshot.MostRecentSnapshotHeight(snapshotDirectory)
        if err != nil {
            return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err)
        }
        d.lastSnapshotHeight = int64(h)

        wg.Go(func() {
            d.runSnapshotManager(runnerContext, snapshotDirectory)
        })
    }

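    // the returned cleanup function stops the background goroutines, waits for them
    // to finish, and closes the tree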
    return d, d.txFactory, d.plc, func() {
        cancelRunnerContext()
        if d.rangeChallengeCoordinator != nil {
            d.rangeChallengeCoordinator.Wait()
        }
        wg.Wait()
        lo.Must0(d.tree.Close())
    }, nil
}

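// logMethod logs the start of an ABCI method call and returns a function that, when
// called, logs its completion along with any extra key-value pairs and the elapsed time.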
func (d *DIDPLCApplication) logMethod(method string, keyvals ...any) func(...any) {
    st := time.Now()
    d.logger.Debug(method+" start", keyvals...)
    return func(extra ...any) {
        args := make([]any, 0, len(keyvals)+len(extra)+2)
        args = append(args, keyvals...)
        args = append(args, extra...)
        args = append(args, "took", time.Since(st))
        d.logger.Debug(method+" done", args...)
    }
}

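// FinishInitializing completes setup once the CometBFT node exists: it stores the
// block creation trigger, rebuilds the authoritative operations fetcher so importing
// resumes, and creates the range challenge coordinator, starting it when running as
// a validator.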
func (d *DIDPLCApplication) FinishInitializing(triggerBlockCreation func(), nodeEventBus *cmttypes.EventBus, nodeConsensusReactor *consensus.Reactor) error {
    d.triggerBlockCreation = triggerBlockCreation

    // ensure we resume importing even if there is no pending AuthoritativeImport tx
    readTx := d.txFactory.ReadWorking(time.Now())

    plc, err := store.Consensus.AuthoritativePLC(readTx)
    if err != nil {
        return stacktrace.Propagate(err)
    }

    _ = d.buildAuthoritativeOperationsFetcher(plc)

    d.rangeChallengeCoordinator, err = newRangeChallengeCoordinator(
        d.runnerContext,
        d.logger,
        d.validatorPubKey,
        d.validatorPrivKey,
        d.txFactory,
        d.blockChallengeCoordinator,
        d.blockHeaderGetter,
        nodeEventBus,
        d.mempoolSubmitter,
        nodeConsensusReactor)
    if err != nil {
        return stacktrace.Propagate(err)
    }

    if d.validatorPubKey != nil {
        err := d.rangeChallengeCoordinator.Start()
        if err != nil {
            return stacktrace.Propagate(err)
        }
    }

    return nil
}

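// compile-time check that *DIDPLCApplication implements the ABCI Application interface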
var _ abcitypes.Application = (*DIDPLCApplication)(nil)

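// DiscardChanges rolls back the ongoing write transaction, if any, and clears the
// in-progress read and write transaction handles.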
func (d *DIDPLCApplication) DiscardChanges() {
    if d.ongoingWrite != nil {
        d.ongoingWrite.Rollback()
    }
    d.ongoingWrite = nil
    d.ongoingRead = nil
}
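
// A rough sketch of how a caller is expected to sequence the functions above; the
// variables used here (filePV, treeDB, indexDB, node, consensusReactor, and the
// various callbacks) are illustrative placeholders, not identifiers defined in this
// package:
//
//	app, txFactory, plcInstance, cleanup, err := NewDIDPLCApplication(ctx, logger,
//		filePV, treeDB, indexDB, clearData, snapshotDir, stateSyncTempDir,
//		bloomFilterPath, mempoolSubmitter, blockHeaderGetter, plcConfig)
//	if err != nil {
//		return err
//	}
//	defer cleanup()
//
//	// ...construct the CometBFT node around app...
//
//	err = app.FinishInitializing(triggerBlockCreation, node.EventBus(), consensusReactor)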