A very experimental PLC implementation which uses BFT consensus for decentralization
at main 192 lines 5.5 kB view raw
1package abciapp 2 3import ( 4 "context" 5 "encoding/hex" 6 "net/url" 7 8 "github.com/gbl08ma/stacktrace" 9 cbornode "github.com/ipfs/go-ipld-cbor" 10 "tangled.org/gbl08ma.com/didplcbft/store" 11) 12 13var TransactionActionSetAuthoritativePlc = registerTransactionAction[SetAuthoritativePlcArguments]("SetAuthoritativePlc", processSetAuthoritativePlcTx) 14 15type SetAuthoritativePlcArguments struct { 16 PLCURL string `json:"plcURL" refmt:"plcURL"` 17 RestartImport bool `json:"restartImport" refmt:"restartImport"` 18} 19 20func (SetAuthoritativePlcArguments) ForAction() TransactionAction { 21 return TransactionActionSetAuthoritativePlc 22} 23 24func init() { 25 cbornode.RegisterCborType(SetAuthoritativePlcArguments{}) 26 cbornode.RegisterCborType(Transaction[SetAuthoritativePlcArguments]{}) 27} 28 29func processSetAuthoritativePlcTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) { 30 tx, err := UnmarshalTransaction[SetAuthoritativePlcArguments](txBytes) 31 if err != nil { 32 return &processResult{ 33 Code: 4000, 34 Log: err.Error(), 35 }, nil 36 } 37 38 // TODO this transaction must somehow validate that whoever submitted it has the permission to change this 39 // Does it even make sense to keep this operation type as something submitted via the mempool in the long run, 40 // or would it be tied to some sort of proposal/participation system, where the validators submit this operation type in response to some on-chain trigger? 41 42 // A simple solution in the short term might be to just validate a "simple" public-private signature + an expiry timestamp (to prevent replay attacks) 43 // which would both be part of the SetAuthoritativePlcArguments. Very centralized, but very straightforward 44 // (the public key would be part of the config or even hardcoded for good measure) 45 46 if tx.Arguments.PLCURL != "" { 47 parsed, err := url.Parse(tx.Arguments.PLCURL) 48 if err != nil || parsed.Scheme != "https" { 49 return &processResult{ 50 Code: 4100, 51 Log: "Malformed Authoritative PLC URL", 52 }, nil 53 } 54 } 55 56 if writeTx, ok := deps.writeTx.Get(); ok { 57 err = store.Consensus.SetAuthoritativePLC(writeTx, tx.Arguments.PLCURL) 58 if err != nil { 59 return nil, stacktrace.Propagate(err) 60 } 61 62 if tx.Arguments.RestartImport { 63 err = store.Consensus.SetAuthoritativeImportProgress(writeTx, 0) 64 if err != nil { 65 return nil, stacktrace.Propagate(err) 66 } 67 } 68 } 69 70 return &processResult{ 71 Code: 0, 72 commitSideEffects: []func(){ 73 deps.destroyAuthoritativeOperationsFetcher, 74 }, 75 }, nil 76} 77 78var TransactionActionAuthoritativeImport = registerTransactionAction[AuthoritativeImportArguments]("AuthoritativeImport", processAuthoritativeImportTx) 79 80type AuthoritativeImportArguments struct { 81 PLCURL string `json:"plcURL" refmt:"plcURL"` 82 Cursor uint64 `json:"cursor" refmt:"cursor"` 83 Count uint64 `json:"count" refmt:"count"` 84 Hash string `json:"hash" refmt:"hash"` 85} 86 87func (AuthoritativeImportArguments) ForAction() TransactionAction { 88 return TransactionActionAuthoritativeImport 89} 90 91func init() { 92 cbornode.RegisterCborType(AuthoritativeImportArguments{}) 93 cbornode.RegisterCborType(Transaction[AuthoritativeImportArguments]{}) 94} 95 96func processAuthoritativeImportTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) { 97 tx, err := UnmarshalTransaction[AuthoritativeImportArguments](txBytes) 98 if err != nil { 99 return &processResult{ 100 Code: 4000, 101 Log: err.Error(), 102 }, nil 103 } 104 105 expectedPlcUrl, err := store.Consensus.AuthoritativePLC(deps.readTx) 106 if err != nil { 107 return nil, stacktrace.Propagate(err) 108 } 109 110 if expectedPlcUrl != tx.Arguments.PLCURL || expectedPlcUrl == "" { 111 return &processResult{ 112 Code: 4110, 113 Log: "Unexpected Authoritative PLC URL", 114 }, nil 115 } 116 117 fetcher := deps.getAuthoritativeOperationsFetcher(expectedPlcUrl) 118 119 expectedCursor, err := store.Consensus.AuthoritativeImportProgress(deps.readTx) 120 if err != nil { 121 return nil, stacktrace.Propagate(err) 122 } 123 124 if expectedCursor != tx.Arguments.Cursor { 125 return &processResult{ 126 Code: 4111, 127 Log: "Unexpected import cursor", 128 }, nil 129 } 130 131 if tx.Arguments.Count > MaxOpsPerImportTx || tx.Arguments.Count == 0 { 132 return &processResult{ 133 Code: 4112, 134 Log: "Unexpected import count", 135 }, nil 136 } 137 138 operations, err := fetcher.get(ctx, expectedCursor, tx.Arguments.Count) 139 if err != nil { 140 return &processResult{ 141 Code: 4113, 142 Log: "Failure to obtain authoritative operations", 143 }, nil 144 } 145 146 if uint64(len(operations)) < tx.Arguments.Count { 147 return &processResult{ 148 Code: 4114, 149 Log: "Unexpected import count", 150 }, nil 151 } 152 153 expectedHashBytes, err := computeLogEntriesHash(operations) 154 if err != nil { 155 return nil, stacktrace.Propagate(err) 156 } 157 158 if hex.EncodeToString(expectedHashBytes) != tx.Arguments.Hash { 159 return &processResult{ 160 Code: 4115, 161 Log: "Unexpected import hash", 162 }, nil 163 } 164 165 newCursor := expectedCursor 166 if len(operations) > 0 { 167 newCursor = operations[len(operations)-1].Seq 168 } 169 170 if writeTx, ok := deps.writeTx.Get(); ok { 171 for _, op := range operations { 172 err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, writeTx, op.LogEntry) 173 if err != nil { 174 return nil, stacktrace.Propagate(err) 175 } 176 } 177 178 err = store.Consensus.SetAuthoritativeImportProgress(writeTx, newCursor) 179 if err != nil { 180 return nil, stacktrace.Propagate(err) 181 } 182 } 183 184 return &processResult{ 185 commitSideEffects: []func(){ 186 func() { 187 fetcher.setImportProgress(newCursor) 188 }, 189 }, 190 Code: 0, 191 }, nil 192}