A very experimental PLC implementation which uses BFT consensus for decentralization
at main 250 lines 8.0 kB view raw
1package abciapp 2 3import ( 4 "context" 5 "os" 6 "sync" 7 "time" 8 9 dbm "github.com/cometbft/cometbft-db" 10 abcitypes "github.com/cometbft/cometbft/abci/types" 11 "github.com/cometbft/cometbft/consensus" 12 "github.com/cometbft/cometbft/crypto" 13 cmtlog "github.com/cometbft/cometbft/libs/log" 14 "github.com/cometbft/cometbft/privval" 15 cmttypes "github.com/cometbft/cometbft/types" 16 "github.com/cosmos/iavl" 17 "github.com/gbl08ma/stacktrace" 18 "github.com/klauspost/compress/zstd" 19 "github.com/samber/lo" 20 "tangled.org/gbl08ma.com/didplcbft/config" 21 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb" 22 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb/zstddict" 23 "tangled.org/gbl08ma.com/didplcbft/plc" 24 "tangled.org/gbl08ma.com/didplcbft/store" 25 "tangled.org/gbl08ma.com/didplcbft/transaction" 26 "tangled.org/gbl08ma.com/didplcbft/types" 27) 28 29type DIDPLCApplication struct { 30 runnerContext context.Context 31 logger cmtlog.Logger 32 plc plc.PLC 33 txFactory *transaction.Factory 34 indexDB transaction.ExtendedDB 35 tree *iavl.MutableTree 36 fullyClearApplicationData func() error 37 38 mempoolSubmitter types.MempoolSubmitter 39 40 validatorPubKey crypto.PubKey 41 validatorPrivKey crypto.PrivKey 42 43 ongoingRead transaction.Read 44 ongoingWrite transaction.Write 45 46 snapshotDirectory string 47 snapshotApplier *store.SnapshotApplier 48 stateSyncTempDir string 49 50 lastProcessedProposalHash []byte 51 lastProcessedProposalExecTxResults []*processResult 52 53 aoc *authoritativeOperationsFetcher 54 55 blockHeaderGetter store.BlockHeaderGetter 56 triggerBlockCreation func() 57 58 blockChallengeCoordinator *blockChallengeCoordinator 59 rangeChallengeCoordinator *rangeChallengeCoordinator 60 61 // for snapshot creation: 62 plcConfig *config.PLCConfig 63 snapshotManagerLatestHeightChan chan int64 64 lastSnapshotHeight int64 65} 66 67// store and plc must be able to share transaction objects 68func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter, blockHeaderGetter store.BlockHeaderGetter, plcConfig *config.PLCConfig) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 69 mkTree := func() *iavl.MutableTree { 70 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 71 // Using SpeedBetterCompression appears to cause the processing time to double again 72 // By using SpeedFastest we seem to give up on like 5% size reduction, it's not worth using the slower speeds 73 return iavl.NewMutableTree(dbmtoiavldb.AdaptWithCompression(treeDB, zstd.SpeedFastest, zstddict.PLCZstdDict), 500000, true, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 74 } 75 76 tree := mkTree() 77 78 _, err := tree.Load() 79 if err != nil { 80 return nil, nil, nil, func() {}, stacktrace.Propagate(err, "error loading latest version of the tree from storage") 81 } 82 83 if snapshotDirectory != "" { 84 err = os.MkdirAll(snapshotDirectory, os.FileMode(0755)) 85 if err != nil { 86 return nil, nil, nil, func() {}, stacktrace.Propagate(err) 87 } 88 } 89 90 if stateSyncTempDir != "" { 91 err = os.MkdirAll(stateSyncTempDir, os.FileMode(0755)) 92 if err != nil { 93 return nil, nil, nil, func() {}, stacktrace.Propagate(err) 94 } 95 } 96 97 runnerContext, cancelRunnerContext := context.WithCancel(appContext) 98 99 d := &DIDPLCApplication{ 100 runnerContext: runnerContext, 101 logger: logger.With("module", "plcapp"), 102 tree: tree, 103 indexDB: indexDB, 104 mempoolSubmitter: mempoolSubmitter, 105 snapshotDirectory: snapshotDirectory, 106 stateSyncTempDir: stateSyncTempDir, 107 blockHeaderGetter: blockHeaderGetter, 108 triggerBlockCreation: func() {}, 109 plcConfig: plcConfig, 110 snapshotManagerLatestHeightChan: make(chan int64, 1), 111 } 112 113 if pv != nil { 114 d.validatorPubKey = pv.Key.PubKey 115 d.validatorPrivKey = pv.Key.PrivKey 116 } 117 118 d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(d.logger, didBloomFilterPath)) 119 if err != nil { 120 return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err) 121 } 122 123 d.fullyClearApplicationData = func() error { 124 // we assume this is called in a single-threaded context, which should be a safe assumption since we'll only call this during snapshot import 125 // and CometBFT only calls one ABCI method at a time 126 err := d.tree.Close() 127 if err != nil { 128 return stacktrace.Propagate(err) 129 } 130 131 clearData() 132 133 *d.tree = *mkTree() 134 135 d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Consensus.CountOperations, store.NewDIDBloomFilterStore(d.logger, didBloomFilterPath)) 136 if err != nil { 137 return stacktrace.Propagate(err) 138 } 139 d.logger.Info("Recreating DID bloom filter") 140 err = d.txFactory.RecreateDIDBloomFilter() 141 return stacktrace.Propagate(err) 142 } 143 144 d.plc = plc.NewPLC() 145 146 d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.logger, d.txFactory, d.blockHeaderGetter, d.validatorPubKey) 147 if err != nil { 148 return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err) 149 } 150 151 var wg sync.WaitGroup 152 wg.Go(func() { 153 // periodically store bloom filter so we don't have to wait so long on the next startup 154 for { 155 select { 156 case <-runnerContext.Done(): 157 return 158 case <-time.After(5 * time.Minute): 159 } 160 161 st := time.Now() 162 err := d.txFactory.SaveDIDBloomFilter() 163 if err != nil { 164 d.logger.Error("failed to save bloom filter", "error", stacktrace.Propagate(err)) 165 } 166 d.logger.Debug("saved bloom filter", "took", time.Since(st)) 167 } 168 }) 169 170 if plcConfig != nil && plcConfig.SnapshotInterval > 0 && snapshotDirectory != "" { 171 h, err := store.Snapshot.MostRecentSnapshotHeight(snapshotDirectory) 172 if err != nil { 173 return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err) 174 } 175 d.lastSnapshotHeight = int64(h) 176 177 wg.Go(func() { 178 d.runSnapshotManager(runnerContext, snapshotDirectory) 179 }) 180 } 181 182 return d, d.txFactory, d.plc, func() { 183 cancelRunnerContext() 184 if d.rangeChallengeCoordinator != nil { 185 d.rangeChallengeCoordinator.Wait() 186 } 187 wg.Wait() 188 lo.Must0(d.tree.Close()) 189 }, nil 190} 191 192func (d *DIDPLCApplication) logMethod(method string, keyvals ...any) func(...any) { 193 st := time.Now() 194 d.logger.Debug(method+" start", keyvals...) 195 return func(extra ...any) { 196 args := make([]any, 0, len(keyvals)+len(extra)+2) 197 args = append(args, keyvals...) 198 args = append(args, extra...) 199 args = append(args, "took", time.Since(st)) 200 d.logger.Debug(method+" done", args...) 201 } 202} 203 204func (d *DIDPLCApplication) FinishInitializing(triggerBlockCreation func(), nodeEventBus *cmttypes.EventBus, nodeConsensusReactor *consensus.Reactor) error { 205 d.triggerBlockCreation = triggerBlockCreation 206 207 // ensure we resume importing even if there is no pending AuthoritativeImport tx 208 readTx := d.txFactory.ReadWorking(time.Now()) 209 210 plc, err := store.Consensus.AuthoritativePLC(readTx) 211 if err != nil { 212 return stacktrace.Propagate(err) 213 } 214 215 _ = d.buildAuthoritativeOperationsFetcher(plc) 216 217 d.rangeChallengeCoordinator, err = newRangeChallengeCoordinator( 218 d.runnerContext, 219 d.logger, 220 d.validatorPubKey, 221 d.validatorPrivKey, 222 d.txFactory, 223 d.blockChallengeCoordinator, 224 d.blockHeaderGetter, 225 nodeEventBus, 226 d.mempoolSubmitter, 227 nodeConsensusReactor) 228 if err != nil { 229 return stacktrace.Propagate(err) 230 } 231 232 if d.validatorPubKey != nil { 233 err := d.rangeChallengeCoordinator.Start() 234 if err != nil { 235 return stacktrace.Propagate(err) 236 } 237 } 238 239 return nil 240} 241 242var _ abcitypes.Application = (*DIDPLCApplication)(nil) 243 244func (d *DIDPLCApplication) DiscardChanges() { 245 if d.ongoingWrite != nil { 246 d.ongoingWrite.Rollback() 247 } 248 d.ongoingWrite = nil 249 d.ongoingRead = nil 250}