package main import ( "context" "flag" "fmt" "log" "os" "os/signal" "path/filepath" "reflect" "syscall" "time" "unsafe" "github.com/cometbft/cometbft/mempool" "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/privval" "github.com/cometbft/cometbft/proxy" cmtstore "github.com/cometbft/cometbft/store" cmttypes "github.com/cometbft/cometbft/types" "github.com/gbl08ma/stacktrace" "github.com/samber/lo" "tangled.org/gbl08ma.com/didplcbft/abciapp" "tangled.org/gbl08ma.com/didplcbft/badgertodbm" "tangled.org/gbl08ma.com/didplcbft/config" "tangled.org/gbl08ma.com/didplcbft/httpapi" "tangled.org/gbl08ma.com/didplcbft/store" "tangled.org/gbl08ma.com/didplcbft/transaction" cmtconfig "github.com/cometbft/cometbft/config" cmtflags "github.com/cometbft/cometbft/libs/cli/flags" cmtlog "github.com/cometbft/cometbft/libs/log" nm "github.com/cometbft/cometbft/node" "github.com/spf13/viper" ) var homeDir string func init() { flag.StringVar(&homeDir, "data-dir", "", "Path to the CometBFT config directory (if empty, uses ./didplcbft-data)") } func main() { flag.Parse() if homeDir == "" { homeDir = filepath.Join(lo.Must(os.Getwd()), "didplcbft-data") } cfg := config.DefaultConfig() cfg.SetRoot(homeDir) viper.SetConfigFile(fmt.Sprintf("%s/%s", homeDir, "config/config.toml")) if err := viper.ReadInConfig(); err != nil { log.Fatalf("Reading config: %v", err) } if err := viper.Unmarshal(cfg); err != nil { log.Fatalf("Decoding config: %v", err) } if err := cfg.ValidateBasic(); err != nil { log.Fatalf("Invalid configuration data: %v", err) } logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout)) logger, err := cmtflags.ParseLogLevel(cfg.LogLevel, logger, cmtconfig.DefaultLogLevel) if err != nil { log.Fatalf("failed to parse log level: %v", err) } underlyingTreeDB, treeDB, err := badgertodbm.NewBadgerDB(logger.With("module", "treebadger"), "apptree", cfg.Config.DBDir()) if err != nil { log.Fatalf("failed to create application tree database: %v", err) } underlyingIndexDB, indexDB, err := badgertodbm.NewBadgerDB(logger.With("module", "indexbadger"), "appindex", cfg.Config.DBDir()) if err != nil { log.Fatalf("failed to create application index database: %v", err) } defer func() { if err := treeDB.Close(); err != nil { log.Printf("Closing application tree database: %v", err) } if err := indexDB.Close(); err != nil { log.Printf("Closing application index database: %v", err) } }() didBloomFilterPath := filepath.Join(homeDir, "data", "did.bloom") recreateDatabases := func() { err := underlyingTreeDB.DropAll() if err != nil { log.Fatalf("failed to drop application tree database: %v", err) } err = underlyingIndexDB.DropAll() if err != nil { log.Fatalf("failed to drop application index database: %v", err) } } mempoolSubmitter := &txSubmitter{} pv := privval.LoadFilePV( cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile(), ) appContext, cancelAppContext := context.WithCancel(context.Background()) defer cancelAppContext() // this must be done before we call NewNode, otherwise it will get a hold of the leveldb block store nodeDBProvider := cmtconfig.DefaultDBProvider recentBlockHeaders, err := readRecentBlockHeaders(nodeDBProvider, cfg.Config, max(abciapp.CommitToChallengeMaxAgeInBlocks, abciapp.CompleteChallengeMaxAgeInBlocks)+5) // 5 blocks safety margin if err != nil { log.Fatalf("failed to read recent block headers: %v", err) } var blockStore *cmtstore.BlockStore var txFactory *transaction.Factory blockHeaderGetter := func(height int64) (cmttypes.Header, error) { if blockStore != nil { blockMeta := blockStore.LoadBlockMeta(height) if blockMeta != nil { return blockMeta.Header, nil } } if header, ok := recentBlockHeaders[height]; ok { return header, nil } // if the headers indeed don't exist in the block store, hopefully they will have come in a state sync snapshot if txFactory != nil { readTx := txFactory.ReadWorking(time.Now()) blockHeader, err := store.Consensus.FallbackBlockHeader(readTx, uint64(height)) if err == nil { return blockHeader, nil } } return cmttypes.Header{}, stacktrace.NewError("height not found") } app, txf, plc, cleanup, err := abciapp.NewDIDPLCApplication( appContext, logger, pv, treeDB, indexDB, recreateDatabases, filepath.Join(homeDir, "snapshots"), cfg.StateSync.TempDir, didBloomFilterPath, mempoolSubmitter, blockHeaderGetter, cfg.PLC) if err != nil { log.Fatalf("failed to create DIDPLC application: %v", err) } defer cleanup() txFactory = txf nodeKey, err := p2p.LoadNodeKey(cfg.NodeKeyFile()) if err != nil { log.Fatalf("failed to load node's key: %v", err) } node, err := nm.NewNode( cfg.Config, pv, nodeKey, proxy.NewLocalClientCreator(app), nm.DefaultGenesisDocProviderFunc(cfg.Config), nodeDBProvider, nm.DefaultMetricsProvider(cfg.Config.Instrumentation), logger, ) if err != nil { log.Fatalf("Creating node: %v", err) } blockStore = node.BlockStore() // workaround for CometBFT bug where the temp_dir config entry is not taken into account err = fixStateSyncReactorTempDir(node, cfg.StateSync.TempDir) if err != nil { log.Fatalf("Creating node: %v", err) } txsAvailableChan := getMempoolTxsAvailableChan(node) mempoolSubmitter.node = node err = app.FinishInitializing(blockCreationTrigger(txsAvailableChan), node.EventBus(), node.ConsensusReactor()) if err != nil { log.Fatalf("Finishing ABCI app initialization: %v", err) } err = node.Start() if err != nil { log.Fatalf("Starting node: %v", err) } defer func() { node.Stop() node.Wait() }() if cfg.PLC.ListenAddress != "" { plcAPIServer, err := httpapi.NewServer(logger.With("module", "plcapi"), txFactory, plc, mempoolSubmitter, node.EventBus(), cfg.PLC) if err != nil { log.Fatalf("Creating PLC API server: %v", err) } err = plcAPIServer.Start() if err != nil { log.Fatalf("Starting PLC API server: %v", err) } defer func() { plcAPIServer.Stop() plcAPIServer.Wait() }() } defer cancelAppContext() c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) <-c } func getMempoolTxsAvailableChan(node *nm.Node) chan struct{} { clistMempool, ok := node.Mempool().(*mempool.CListMempool) if !ok { return nil } val := reflect.ValueOf(clistMempool) val = reflect.Indirect(val) field := val.FieldByName("txsAvailable") if field.IsZero() || !field.CanAddr() { return nil } chanIface := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface() c, ok := chanIface.(chan struct{}) if !ok { return nil } return c } func blockCreationTrigger(txsAvailable chan struct{}) func() { return func() { if txsAvailable == nil { return } select { case txsAvailable <- struct{}{}: default: } } } func fixStateSyncReactorTempDir(node *nm.Node, tempDir string) error { val := reflect.ValueOf(node) val = reflect.Indirect(val) field := val.FieldByName("stateSyncReactor") if !field.IsValid() { return stacktrace.NewError("stateSyncReactor field not valid") } if field.Kind() != reflect.Ptr { return stacktrace.NewError("stateSyncReactor field is not a pointer, got %v", field.Kind()) } if field.IsNil() { return stacktrace.NewError("stateSyncReactor field is nil") } reactorVal := reflect.Indirect(field) field = reactorVal.FieldByName("tempDir") if !field.IsValid() { return stacktrace.NewError("tempDir field not valid") } field = reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem() field.SetString(tempDir) return nil } func readRecentBlockHeaders(dbProvider cmtconfig.DBProvider, config *cmtconfig.Config, numHeaders int) (map[int64]cmttypes.Header, error) { blockStoreDB, err := dbProvider(&cmtconfig.DBContext{ID: "blockstore", Config: config}) if err != nil { return nil, stacktrace.Propagate(err) } blockStore := cmtstore.NewBlockStore(blockStoreDB) defer blockStore.Close() result := make(map[int64]cmttypes.Header) bottom := max(0, blockStore.Height()-int64(numHeaders)) for i := blockStore.Height(); i > bottom; i-- { blockMeta := blockStore.LoadBlockMeta(i) result[i] = blockMeta.Header } return result, nil }