A very experimental PLC implementation which uses BFT consensus for decentralization
1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "log"
8 "os"
9 "os/signal"
10 "path/filepath"
11 "reflect"
12 "syscall"
13 "time"
14 "unsafe"
15
16 "github.com/cometbft/cometbft/mempool"
17 "github.com/cometbft/cometbft/p2p"
18 "github.com/cometbft/cometbft/privval"
19 "github.com/cometbft/cometbft/proxy"
20 cmtstore "github.com/cometbft/cometbft/store"
21 cmttypes "github.com/cometbft/cometbft/types"
22 "github.com/gbl08ma/stacktrace"
23 "github.com/samber/lo"
24 "tangled.org/gbl08ma.com/didplcbft/abciapp"
25 "tangled.org/gbl08ma.com/didplcbft/badgertodbm"
26 "tangled.org/gbl08ma.com/didplcbft/config"
27 "tangled.org/gbl08ma.com/didplcbft/httpapi"
28 "tangled.org/gbl08ma.com/didplcbft/store"
29 "tangled.org/gbl08ma.com/didplcbft/transaction"
30
31 cmtconfig "github.com/cometbft/cometbft/config"
32 cmtflags "github.com/cometbft/cometbft/libs/cli/flags"
33 cmtlog "github.com/cometbft/cometbft/libs/log"
34 nm "github.com/cometbft/cometbft/node"
35 "github.com/spf13/viper"
36)
37
// homeDir is the node's home directory, filled in from the -data-dir flag.
// main falls back to ./didplcbft-data when it is left empty.
var homeDir string

// init registers the -data-dir command-line flag on the default flag set.
func init() {
	const (
		flagName  = "data-dir"
		flagUsage = "Path to the CometBFT config directory (if empty, uses ./didplcbft-data)"
	)
	flag.StringVar(&homeDir, flagName, "", flagUsage)
}
43
44func main() {
45 flag.Parse()
46 if homeDir == "" {
47 homeDir = filepath.Join(lo.Must(os.Getwd()), "didplcbft-data")
48 }
49
50 cfg := config.DefaultConfig()
51 cfg.SetRoot(homeDir)
52 viper.SetConfigFile(fmt.Sprintf("%s/%s", homeDir, "config/config.toml"))
53
54 if err := viper.ReadInConfig(); err != nil {
55 log.Fatalf("Reading config: %v", err)
56 }
57 if err := viper.Unmarshal(cfg); err != nil {
58 log.Fatalf("Decoding config: %v", err)
59 }
60 if err := cfg.ValidateBasic(); err != nil {
61 log.Fatalf("Invalid configuration data: %v", err)
62 }
63
64 logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout))
65 logger, err := cmtflags.ParseLogLevel(cfg.LogLevel, logger, cmtconfig.DefaultLogLevel)
66 if err != nil {
67 log.Fatalf("failed to parse log level: %v", err)
68 }
69
70 underlyingTreeDB, treeDB, err := badgertodbm.NewBadgerDB(logger.With("module", "treebadger"), "apptree", cfg.Config.DBDir())
71 if err != nil {
72 log.Fatalf("failed to create application tree database: %v", err)
73 }
74
75 underlyingIndexDB, indexDB, err := badgertodbm.NewBadgerDB(logger.With("module", "indexbadger"), "appindex", cfg.Config.DBDir())
76 if err != nil {
77 log.Fatalf("failed to create application index database: %v", err)
78 }
79
80 defer func() {
81 if err := treeDB.Close(); err != nil {
82 log.Printf("Closing application tree database: %v", err)
83 }
84 if err := indexDB.Close(); err != nil {
85 log.Printf("Closing application index database: %v", err)
86 }
87 }()
88
89 didBloomFilterPath := filepath.Join(homeDir, "data", "did.bloom")
90
91 recreateDatabases := func() {
92 err := underlyingTreeDB.DropAll()
93 if err != nil {
94 log.Fatalf("failed to drop application tree database: %v", err)
95 }
96
97 err = underlyingIndexDB.DropAll()
98 if err != nil {
99 log.Fatalf("failed to drop application index database: %v", err)
100 }
101 }
102
103 mempoolSubmitter := &txSubmitter{}
104
105 pv := privval.LoadFilePV(
106 cfg.PrivValidatorKeyFile(),
107 cfg.PrivValidatorStateFile(),
108 )
109
110 appContext, cancelAppContext := context.WithCancel(context.Background())
111 defer cancelAppContext()
112
113 // this must be done before we call NewNode, otherwise it will get a hold of the leveldb block store
114 nodeDBProvider := cmtconfig.DefaultDBProvider
115
116 recentBlockHeaders, err := readRecentBlockHeaders(nodeDBProvider, cfg.Config, max(abciapp.CommitToChallengeMaxAgeInBlocks, abciapp.CompleteChallengeMaxAgeInBlocks)+5) // 5 blocks safety margin
117 if err != nil {
118 log.Fatalf("failed to read recent block headers: %v", err)
119 }
120
121 var blockStore *cmtstore.BlockStore
122 var txFactory *transaction.Factory
123 blockHeaderGetter := func(height int64) (cmttypes.Header, error) {
124 if blockStore != nil {
125 blockMeta := blockStore.LoadBlockMeta(height)
126 if blockMeta != nil {
127 return blockMeta.Header, nil
128 }
129 }
130
131 if header, ok := recentBlockHeaders[height]; ok {
132 return header, nil
133 }
134
135 // if the headers indeed don't exist in the block store, hopefully they will have come in a state sync snapshot
136 if txFactory != nil {
137 readTx := txFactory.ReadWorking(time.Now())
138 blockHeader, err := store.Consensus.FallbackBlockHeader(readTx, uint64(height))
139 if err == nil {
140 return blockHeader, nil
141 }
142 }
143
144 return cmttypes.Header{}, stacktrace.NewError("height not found")
145 }
146
147 app, txf, plc, cleanup, err := abciapp.NewDIDPLCApplication(
148 appContext,
149 logger,
150 pv,
151 treeDB,
152 indexDB,
153 recreateDatabases,
154 filepath.Join(homeDir, "snapshots"),
155 cfg.StateSync.TempDir,
156 didBloomFilterPath,
157 mempoolSubmitter,
158 blockHeaderGetter,
159 cfg.PLC)
160 if err != nil {
161 log.Fatalf("failed to create DIDPLC application: %v", err)
162 }
163 defer cleanup()
164
165 txFactory = txf
166
167 nodeKey, err := p2p.LoadNodeKey(cfg.NodeKeyFile())
168 if err != nil {
169 log.Fatalf("failed to load node's key: %v", err)
170 }
171
172 node, err := nm.NewNode(
173 cfg.Config,
174 pv,
175 nodeKey,
176 proxy.NewLocalClientCreator(app),
177 nm.DefaultGenesisDocProviderFunc(cfg.Config),
178 nodeDBProvider,
179 nm.DefaultMetricsProvider(cfg.Config.Instrumentation),
180 logger,
181 )
182
183 if err != nil {
184 log.Fatalf("Creating node: %v", err)
185 }
186
187 blockStore = node.BlockStore()
188
189 // workaround for CometBFT bug where the temp_dir config entry is not taken into account
190 err = fixStateSyncReactorTempDir(node, cfg.StateSync.TempDir)
191 if err != nil {
192 log.Fatalf("Creating node: %v", err)
193 }
194
195 txsAvailableChan := getMempoolTxsAvailableChan(node)
196
197 mempoolSubmitter.node = node
198
199 err = app.FinishInitializing(blockCreationTrigger(txsAvailableChan), node.EventBus(), node.ConsensusReactor())
200 if err != nil {
201 log.Fatalf("Finishing ABCI app initialization: %v", err)
202 }
203
204 err = node.Start()
205 if err != nil {
206 log.Fatalf("Starting node: %v", err)
207 }
208 defer func() {
209 node.Stop()
210 node.Wait()
211 }()
212
213 if cfg.PLC.ListenAddress != "" {
214 plcAPIServer, err := httpapi.NewServer(logger.With("module", "plcapi"), txFactory, plc, mempoolSubmitter, node.EventBus(), cfg.PLC)
215 if err != nil {
216 log.Fatalf("Creating PLC API server: %v", err)
217 }
218
219 err = plcAPIServer.Start()
220 if err != nil {
221 log.Fatalf("Starting PLC API server: %v", err)
222 }
223 defer func() {
224 plcAPIServer.Stop()
225 plcAPIServer.Wait()
226 }()
227 }
228
229 defer cancelAppContext()
230
231 c := make(chan os.Signal, 1)
232 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
233 <-c
234}
235
236func getMempoolTxsAvailableChan(node *nm.Node) chan struct{} {
237 clistMempool, ok := node.Mempool().(*mempool.CListMempool)
238 if !ok {
239 return nil
240 }
241
242 val := reflect.ValueOf(clistMempool)
243 val = reflect.Indirect(val)
244 field := val.FieldByName("txsAvailable")
245 if field.IsZero() || !field.CanAddr() {
246 return nil
247 }
248
249 chanIface := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
250 c, ok := chanIface.(chan struct{})
251 if !ok {
252 return nil
253 }
254 return c
255}
256
// blockCreationTrigger returns a function that signals txsAvailable without
// ever blocking: the signal is dropped when nobody can receive it right away
// and the channel has no free buffer slot. With a nil channel the returned
// function does nothing.
func blockCreationTrigger(txsAvailable chan struct{}) func() {
	if txsAvailable == nil {
		return func() {}
	}

	notify := func() {
		select {
		case txsAvailable <- struct{}{}:
			// signal delivered
		default:
			// receiver busy or buffer full; skip this signal
		}
	}
	return notify
}
268
269func fixStateSyncReactorTempDir(node *nm.Node, tempDir string) error {
270 val := reflect.ValueOf(node)
271 val = reflect.Indirect(val)
272 field := val.FieldByName("stateSyncReactor")
273 if !field.IsValid() {
274 return stacktrace.NewError("stateSyncReactor field not valid")
275 }
276 if field.Kind() != reflect.Ptr {
277 return stacktrace.NewError("stateSyncReactor field is not a pointer, got %v", field.Kind())
278 }
279 if field.IsNil() {
280 return stacktrace.NewError("stateSyncReactor field is nil")
281 }
282
283 reactorVal := reflect.Indirect(field)
284 field = reactorVal.FieldByName("tempDir")
285 if !field.IsValid() {
286 return stacktrace.NewError("tempDir field not valid")
287 }
288
289 field = reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem()
290 field.SetString(tempDir)
291
292 return nil
293}
294
295func readRecentBlockHeaders(dbProvider cmtconfig.DBProvider, config *cmtconfig.Config, numHeaders int) (map[int64]cmttypes.Header, error) {
296 blockStoreDB, err := dbProvider(&cmtconfig.DBContext{ID: "blockstore", Config: config})
297 if err != nil {
298 return nil, stacktrace.Propagate(err)
299 }
300 blockStore := cmtstore.NewBlockStore(blockStoreDB)
301 defer blockStore.Close()
302
303 result := make(map[int64]cmttypes.Header)
304 bottom := max(0, blockStore.Height()-int64(numHeaders))
305 for i := blockStore.Height(); i > bottom; i-- {
306 blockMeta := blockStore.LoadBlockMeta(i)
307 result[i] = blockMeta.Header
308 }
309 return result, nil
310}