A very experimental PLC implementation which uses BFT consensus for decentralization
at main 310 lines 8.3 kB view raw
1package main 2 3import ( 4 "context" 5 "flag" 6 "fmt" 7 "log" 8 "os" 9 "os/signal" 10 "path/filepath" 11 "reflect" 12 "syscall" 13 "time" 14 "unsafe" 15 16 "github.com/cometbft/cometbft/mempool" 17 "github.com/cometbft/cometbft/p2p" 18 "github.com/cometbft/cometbft/privval" 19 "github.com/cometbft/cometbft/proxy" 20 cmtstore "github.com/cometbft/cometbft/store" 21 cmttypes "github.com/cometbft/cometbft/types" 22 "github.com/gbl08ma/stacktrace" 23 "github.com/samber/lo" 24 "tangled.org/gbl08ma.com/didplcbft/abciapp" 25 "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 26 "tangled.org/gbl08ma.com/didplcbft/config" 27 "tangled.org/gbl08ma.com/didplcbft/httpapi" 28 "tangled.org/gbl08ma.com/didplcbft/store" 29 "tangled.org/gbl08ma.com/didplcbft/transaction" 30 31 cmtconfig "github.com/cometbft/cometbft/config" 32 cmtflags "github.com/cometbft/cometbft/libs/cli/flags" 33 cmtlog "github.com/cometbft/cometbft/libs/log" 34 nm "github.com/cometbft/cometbft/node" 35 "github.com/spf13/viper" 36) 37 38var homeDir string 39 40func init() { 41 flag.StringVar(&homeDir, "data-dir", "", "Path to the CometBFT config directory (if empty, uses ./didplcbft-data)") 42} 43 44func main() { 45 flag.Parse() 46 if homeDir == "" { 47 homeDir = filepath.Join(lo.Must(os.Getwd()), "didplcbft-data") 48 } 49 50 cfg := config.DefaultConfig() 51 cfg.SetRoot(homeDir) 52 viper.SetConfigFile(fmt.Sprintf("%s/%s", homeDir, "config/config.toml")) 53 54 if err := viper.ReadInConfig(); err != nil { 55 log.Fatalf("Reading config: %v", err) 56 } 57 if err := viper.Unmarshal(cfg); err != nil { 58 log.Fatalf("Decoding config: %v", err) 59 } 60 if err := cfg.ValidateBasic(); err != nil { 61 log.Fatalf("Invalid configuration data: %v", err) 62 } 63 64 logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout)) 65 logger, err := cmtflags.ParseLogLevel(cfg.LogLevel, logger, cmtconfig.DefaultLogLevel) 66 if err != nil { 67 log.Fatalf("failed to parse log level: %v", err) 68 } 69 70 underlyingTreeDB, treeDB, err := badgertodbm.NewBadgerDB(logger.With("module", "treebadger"), "apptree", cfg.Config.DBDir()) 71 if err != nil { 72 log.Fatalf("failed to create application tree database: %v", err) 73 } 74 75 underlyingIndexDB, indexDB, err := badgertodbm.NewBadgerDB(logger.With("module", "indexbadger"), "appindex", cfg.Config.DBDir()) 76 if err != nil { 77 log.Fatalf("failed to create application index database: %v", err) 78 } 79 80 defer func() { 81 if err := treeDB.Close(); err != nil { 82 log.Printf("Closing application tree database: %v", err) 83 } 84 if err := indexDB.Close(); err != nil { 85 log.Printf("Closing application index database: %v", err) 86 } 87 }() 88 89 didBloomFilterPath := filepath.Join(homeDir, "data", "did.bloom") 90 91 recreateDatabases := func() { 92 err := underlyingTreeDB.DropAll() 93 if err != nil { 94 log.Fatalf("failed to drop application tree database: %v", err) 95 } 96 97 err = underlyingIndexDB.DropAll() 98 if err != nil { 99 log.Fatalf("failed to drop application index database: %v", err) 100 } 101 } 102 103 mempoolSubmitter := &txSubmitter{} 104 105 pv := privval.LoadFilePV( 106 cfg.PrivValidatorKeyFile(), 107 cfg.PrivValidatorStateFile(), 108 ) 109 110 appContext, cancelAppContext := context.WithCancel(context.Background()) 111 defer cancelAppContext() 112 113 // this must be done before we call NewNode, otherwise it will get a hold of the leveldb block store 114 nodeDBProvider := cmtconfig.DefaultDBProvider 115 116 recentBlockHeaders, err := readRecentBlockHeaders(nodeDBProvider, cfg.Config, max(abciapp.CommitToChallengeMaxAgeInBlocks, abciapp.CompleteChallengeMaxAgeInBlocks)+5) // 5 blocks safety margin 117 if err != nil { 118 log.Fatalf("failed to read recent block headers: %v", err) 119 } 120 121 var blockStore *cmtstore.BlockStore 122 var txFactory *transaction.Factory 123 blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 124 if blockStore != nil { 125 blockMeta := blockStore.LoadBlockMeta(height) 126 if blockMeta != nil { 127 return blockMeta.Header, nil 128 } 129 } 130 131 if header, ok := recentBlockHeaders[height]; ok { 132 return header, nil 133 } 134 135 // if the headers indeed don't exist in the block store, hopefully they will have come in a state sync snapshot 136 if txFactory != nil { 137 readTx := txFactory.ReadWorking(time.Now()) 138 blockHeader, err := store.Consensus.FallbackBlockHeader(readTx, uint64(height)) 139 if err == nil { 140 return blockHeader, nil 141 } 142 } 143 144 return cmttypes.Header{}, stacktrace.NewError("height not found") 145 } 146 147 app, txf, plc, cleanup, err := abciapp.NewDIDPLCApplication( 148 appContext, 149 logger, 150 pv, 151 treeDB, 152 indexDB, 153 recreateDatabases, 154 filepath.Join(homeDir, "snapshots"), 155 cfg.StateSync.TempDir, 156 didBloomFilterPath, 157 mempoolSubmitter, 158 blockHeaderGetter, 159 cfg.PLC) 160 if err != nil { 161 log.Fatalf("failed to create DIDPLC application: %v", err) 162 } 163 defer cleanup() 164 165 txFactory = txf 166 167 nodeKey, err := p2p.LoadNodeKey(cfg.NodeKeyFile()) 168 if err != nil { 169 log.Fatalf("failed to load node's key: %v", err) 170 } 171 172 node, err := nm.NewNode( 173 cfg.Config, 174 pv, 175 nodeKey, 176 proxy.NewLocalClientCreator(app), 177 nm.DefaultGenesisDocProviderFunc(cfg.Config), 178 nodeDBProvider, 179 nm.DefaultMetricsProvider(cfg.Config.Instrumentation), 180 logger, 181 ) 182 183 if err != nil { 184 log.Fatalf("Creating node: %v", err) 185 } 186 187 blockStore = node.BlockStore() 188 189 // workaround for CometBFT bug where the temp_dir config entry is not taken into account 190 err = fixStateSyncReactorTempDir(node, cfg.StateSync.TempDir) 191 if err != nil { 192 log.Fatalf("Creating node: %v", err) 193 } 194 195 txsAvailableChan := getMempoolTxsAvailableChan(node) 196 197 mempoolSubmitter.node = node 198 199 err = app.FinishInitializing(blockCreationTrigger(txsAvailableChan), node.EventBus(), node.ConsensusReactor()) 200 if err != nil { 201 log.Fatalf("Finishing ABCI app initialization: %v", err) 202 } 203 204 err = node.Start() 205 if err != nil { 206 log.Fatalf("Starting node: %v", err) 207 } 208 defer func() { 209 node.Stop() 210 node.Wait() 211 }() 212 213 if cfg.PLC.ListenAddress != "" { 214 plcAPIServer, err := httpapi.NewServer(logger.With("module", "plcapi"), txFactory, plc, mempoolSubmitter, node.EventBus(), cfg.PLC) 215 if err != nil { 216 log.Fatalf("Creating PLC API server: %v", err) 217 } 218 219 err = plcAPIServer.Start() 220 if err != nil { 221 log.Fatalf("Starting PLC API server: %v", err) 222 } 223 defer func() { 224 plcAPIServer.Stop() 225 plcAPIServer.Wait() 226 }() 227 } 228 229 defer cancelAppContext() 230 231 c := make(chan os.Signal, 1) 232 signal.Notify(c, os.Interrupt, syscall.SIGTERM) 233 <-c 234} 235 236func getMempoolTxsAvailableChan(node *nm.Node) chan struct{} { 237 clistMempool, ok := node.Mempool().(*mempool.CListMempool) 238 if !ok { 239 return nil 240 } 241 242 val := reflect.ValueOf(clistMempool) 243 val = reflect.Indirect(val) 244 field := val.FieldByName("txsAvailable") 245 if field.IsZero() || !field.CanAddr() { 246 return nil 247 } 248 249 chanIface := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface() 250 c, ok := chanIface.(chan struct{}) 251 if !ok { 252 return nil 253 } 254 return c 255} 256 257func blockCreationTrigger(txsAvailable chan struct{}) func() { 258 return func() { 259 if txsAvailable == nil { 260 return 261 } 262 select { 263 case txsAvailable <- struct{}{}: 264 default: 265 } 266 } 267} 268 269func fixStateSyncReactorTempDir(node *nm.Node, tempDir string) error { 270 val := reflect.ValueOf(node) 271 val = reflect.Indirect(val) 272 field := val.FieldByName("stateSyncReactor") 273 if !field.IsValid() { 274 return stacktrace.NewError("stateSyncReactor field not valid") 275 } 276 if field.Kind() != reflect.Ptr { 277 return stacktrace.NewError("stateSyncReactor field is not a pointer, got %v", field.Kind()) 278 } 279 if field.IsNil() { 280 return stacktrace.NewError("stateSyncReactor field is nil") 281 } 282 283 reactorVal := reflect.Indirect(field) 284 field = reactorVal.FieldByName("tempDir") 285 if !field.IsValid() { 286 return stacktrace.NewError("tempDir field not valid") 287 } 288 289 field = reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem() 290 field.SetString(tempDir) 291 292 return nil 293} 294 295func readRecentBlockHeaders(dbProvider cmtconfig.DBProvider, config *cmtconfig.Config, numHeaders int) (map[int64]cmttypes.Header, error) { 296 blockStoreDB, err := dbProvider(&cmtconfig.DBContext{ID: "blockstore", Config: config}) 297 if err != nil { 298 return nil, stacktrace.Propagate(err) 299 } 300 blockStore := cmtstore.NewBlockStore(blockStoreDB) 301 defer blockStore.Close() 302 303 result := make(map[int64]cmttypes.Header) 304 bottom := max(0, blockStore.Height()-int64(numHeaders)) 305 for i := blockStore.Height(); i > bottom; i-- { 306 blockMeta := blockStore.LoadBlockMeta(i) 307 result[i] = blockMeta.Header 308 } 309 return result, nil 310}