A very experimental PLC implementation which uses BFT consensus for decentralization

Creative reduction of disk space use during snapshot application

- Delete files as chunks are applied, behind cometbft's back
- Use unsafe reflection to "fix" cometbft bug where temp_dir setting was ignored

Ideally both of these would result in PRs to cometbft (one for a feature to let the ABCI application specify snapshots to delete, another to fix the temp_dir bug) but ain't nobody got time for that

gbl08ma.com 0cf9deed 5cdbe9fc

verified
+69 -2
+10 -1
abciapp/app.go
··· 43 43 44 44 snapshotDirectory string 45 45 snapshotApplier *snapshotApplier 46 + stateSyncTempDir string 46 47 47 48 lastProcessedProposalHash []byte 48 49 lastProcessedProposalExecTxResults []*processResult ··· 56 57 } 57 58 58 59 // store and plc must be able to share transaction objects 59 - func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 60 + func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 60 61 mkTree := func() *iavl.MutableTree { 61 62 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 62 63 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 78 79 } 79 80 } 80 81 82 + if stateSyncTempDir != "" { 83 + err = os.MkdirAll(stateSyncTempDir, os.FileMode(0755)) 84 + if err != nil { 85 + return nil, nil, nil, func() {}, stacktrace.Propagate(err) 86 + } 87 + } 88 + 81 89 runnerContext, cancelRunnerContext := context.WithCancel(appContext) 82 90 83 91 d := &DIDPLCApplication{ ··· 87 95 indexDB: indexDB, 88 96 mempoolSubmitter: mempoolSubmitter, 89 97 snapshotDirectory: snapshotDirectory, 98 + stateSyncTempDir: stateSyncTempDir, 90 99 } 91 100 92 101 if pv != nil {
+1 -1
abciapp/app_test.go
··· 24 24 25 25 func TestCheckTx(t *testing.T) { 26 26 logger := cmtlog.NewNopLogger() 27 - app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), logger, nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", nil) 27 + app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), logger, nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", "", nil) 28 28 require.NoError(t, err) 29 29 t.Cleanup(cleanup) 30 30
+24
abciapp/snapshots.go
··· 191 191 d.snapshotApplier = nil 192 192 } 193 193 194 + // this is a giant hack but I found no other way to not make the machine run out of space as it applies a snapshot 195 + // because the cometbft statesync does not delete the snapshot chunks that were already applied as it goes 196 + // so we need to delete the files ourselves 197 + 198 + tempDir := d.stateSyncTempDir 199 + if tempDir == "" { 200 + tempDir = os.TempDir() 201 + } 202 + 203 + matches, err := filepath.Glob(filepath.Join(tempDir, "tm-statesync*")) 204 + if err != nil { 205 + return nil, stacktrace.Propagate(err) 206 + } 207 + 208 + if len(matches) != 1 { 209 + return nil, stacktrace.NewError("expected exactly one statesync temp directory, found %d", len(matches)) 210 + } 211 + 212 + // remove file for the index that was just applied 213 + err = os.Remove(filepath.Join(matches[0], strconv.Itoa(int(req.Index)))) 214 + if err != nil { 215 + return nil, stacktrace.Propagate(err) 216 + } 217 + 194 218 return &abcitypes.ResponseApplySnapshotChunk{ 195 219 Result: abcitypes.ResponseApplySnapshotChunk_ACCEPT, 196 220 }, nil
+34
main.go
··· 17 17 "github.com/cometbft/cometbft/p2p" 18 18 "github.com/cometbft/cometbft/privval" 19 19 "github.com/cometbft/cometbft/proxy" 20 + "github.com/gbl08ma/stacktrace" 20 21 "github.com/samber/lo" 21 22 "tangled.org/gbl08ma.com/didplcbft/abciapp" 22 23 "tangled.org/gbl08ma.com/didplcbft/badgertodbm" ··· 114 115 indexDB, 115 116 recreateDatabases, 116 117 filepath.Join(homeDir, "snapshots"), 118 + config.StateSync.TempDir, 117 119 didBloomFilterPath, 118 120 mempoolSubmitter) 119 121 if err != nil { ··· 141 143 log.Fatalf("Creating node: %v", err) 142 144 } 143 145 146 + // workaround for CometBFT bug where the temp_dir config entry is not taken into account 147 + err = fixStateSyncReactorTempDir(node, config.StateSync.TempDir) 148 + if err != nil { 149 + log.Fatalf("Creating node: %v", err) 150 + } 151 + 144 152 txsAvailableChan := getMempoolTxsAvailableChan(node) 145 153 146 154 mempoolSubmitter.node = node ··· 235 243 } 236 244 } 237 245 } 246 + 247 + func fixStateSyncReactorTempDir(node *nm.Node, tempDir string) error { 248 + val := reflect.ValueOf(node) 249 + val = reflect.Indirect(val) 250 + field := val.FieldByName("stateSyncReactor") 251 + if !field.IsValid() { 252 + return stacktrace.NewError("stateSyncReactor field not valid") 253 + } 254 + if field.Kind() != reflect.Ptr { 255 + return stacktrace.NewError("stateSyncReactor field is not a pointer, got %v", field.Kind()) 256 + } 257 + if field.IsNil() { 258 + return stacktrace.NewError("stateSyncReactor field is nil") 259 + } 260 + 261 + reactorVal := reflect.Indirect(field) 262 + field = reactorVal.FieldByName("tempDir") 263 + if !field.IsValid() { 264 + return stacktrace.NewError("tempDir field not valid") 265 + } 266 + 267 + field = reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem() 268 + field.SetString(tempDir) 269 + 270 + return nil 271 + }