A very experimental PLC implementation which uses BFT consensus for decentralization
1package abciapp
2
3import (
4 "context"
5 "encoding/hex"
6 "net/url"
7
8 "github.com/gbl08ma/stacktrace"
9 cbornode "github.com/ipfs/go-ipld-cbor"
10 "tangled.org/gbl08ma.com/didplcbft/store"
11)
12
13var TransactionActionSetAuthoritativePlc = registerTransactionAction[SetAuthoritativePlcArguments]("SetAuthoritativePlc", processSetAuthoritativePlcTx)
14
15type SetAuthoritativePlcArguments struct {
16 PLCURL string `json:"plcURL" refmt:"plcURL"`
17 RestartImport bool `json:"restartImport" refmt:"restartImport"`
18}
19
20func (SetAuthoritativePlcArguments) ForAction() TransactionAction {
21 return TransactionActionSetAuthoritativePlc
22}
23
24func init() {
25 cbornode.RegisterCborType(SetAuthoritativePlcArguments{})
26 cbornode.RegisterCborType(Transaction[SetAuthoritativePlcArguments]{})
27}
28
29func processSetAuthoritativePlcTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) {
30 tx, err := UnmarshalTransaction[SetAuthoritativePlcArguments](txBytes)
31 if err != nil {
32 return &processResult{
33 Code: 4000,
34 Log: err.Error(),
35 }, nil
36 }
37
38 // TODO this transaction must somehow validate that whoever submitted it has the permission to change this
39 // Does it even make sense to keep this operation type as something submitted via the mempool in the long run,
40 // or would it be tied to some sort of proposal/participation system, where the validators submit this operation type in response to some on-chain trigger?
41
42 // A simple solution in the short term might be to just validate a "simple" public-private signature + an expiry timestamp (to prevent replay attacks)
43 // which would both be part of the SetAuthoritativePlcArguments. Very centralized, but very straightforward
44 // (the public key would be part of the config or even hardcoded for good measure)
45
46 if tx.Arguments.PLCURL != "" {
47 parsed, err := url.Parse(tx.Arguments.PLCURL)
48 if err != nil || parsed.Scheme != "https" {
49 return &processResult{
50 Code: 4100,
51 Log: "Malformed Authoritative PLC URL",
52 }, nil
53 }
54 }
55
56 if writeTx, ok := deps.writeTx.Get(); ok {
57 err = store.Consensus.SetAuthoritativePLC(writeTx, tx.Arguments.PLCURL)
58 if err != nil {
59 return nil, stacktrace.Propagate(err)
60 }
61
62 if tx.Arguments.RestartImport {
63 err = store.Consensus.SetAuthoritativeImportProgress(writeTx, 0)
64 if err != nil {
65 return nil, stacktrace.Propagate(err)
66 }
67 }
68 }
69
70 return &processResult{
71 Code: 0,
72 commitSideEffects: []func(){
73 deps.destroyAuthoritativeOperationsFetcher,
74 },
75 }, nil
76}
77
78var TransactionActionAuthoritativeImport = registerTransactionAction[AuthoritativeImportArguments]("AuthoritativeImport", processAuthoritativeImportTx)
79
80type AuthoritativeImportArguments struct {
81 PLCURL string `json:"plcURL" refmt:"plcURL"`
82 Cursor uint64 `json:"cursor" refmt:"cursor"`
83 Count uint64 `json:"count" refmt:"count"`
84 Hash string `json:"hash" refmt:"hash"`
85}
86
87func (AuthoritativeImportArguments) ForAction() TransactionAction {
88 return TransactionActionAuthoritativeImport
89}
90
91func init() {
92 cbornode.RegisterCborType(AuthoritativeImportArguments{})
93 cbornode.RegisterCborType(Transaction[AuthoritativeImportArguments]{})
94}
95
96func processAuthoritativeImportTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error) {
97 tx, err := UnmarshalTransaction[AuthoritativeImportArguments](txBytes)
98 if err != nil {
99 return &processResult{
100 Code: 4000,
101 Log: err.Error(),
102 }, nil
103 }
104
105 expectedPlcUrl, err := store.Consensus.AuthoritativePLC(deps.readTx)
106 if err != nil {
107 return nil, stacktrace.Propagate(err)
108 }
109
110 if expectedPlcUrl != tx.Arguments.PLCURL || expectedPlcUrl == "" {
111 return &processResult{
112 Code: 4110,
113 Log: "Unexpected Authoritative PLC URL",
114 }, nil
115 }
116
117 fetcher := deps.getAuthoritativeOperationsFetcher(expectedPlcUrl)
118
119 expectedCursor, err := store.Consensus.AuthoritativeImportProgress(deps.readTx)
120 if err != nil {
121 return nil, stacktrace.Propagate(err)
122 }
123
124 if expectedCursor != tx.Arguments.Cursor {
125 return &processResult{
126 Code: 4111,
127 Log: "Unexpected import cursor",
128 }, nil
129 }
130
131 if tx.Arguments.Count > MaxOpsPerImportTx || tx.Arguments.Count == 0 {
132 return &processResult{
133 Code: 4112,
134 Log: "Unexpected import count",
135 }, nil
136 }
137
138 operations, err := fetcher.get(ctx, expectedCursor, tx.Arguments.Count)
139 if err != nil {
140 return &processResult{
141 Code: 4113,
142 Log: "Failure to obtain authoritative operations",
143 }, nil
144 }
145
146 if uint64(len(operations)) < tx.Arguments.Count {
147 return &processResult{
148 Code: 4114,
149 Log: "Unexpected import count",
150 }, nil
151 }
152
153 expectedHashBytes, err := computeLogEntriesHash(operations)
154 if err != nil {
155 return nil, stacktrace.Propagate(err)
156 }
157
158 if hex.EncodeToString(expectedHashBytes) != tx.Arguments.Hash {
159 return &processResult{
160 Code: 4115,
161 Log: "Unexpected import hash",
162 }, nil
163 }
164
165 newCursor := expectedCursor
166 if len(operations) > 0 {
167 newCursor = operations[len(operations)-1].Seq
168 }
169
170 if writeTx, ok := deps.writeTx.Get(); ok {
171 for _, op := range operations {
172 err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, writeTx, op.LogEntry)
173 if err != nil {
174 return nil, stacktrace.Propagate(err)
175 }
176 }
177
178 err = store.Consensus.SetAuthoritativeImportProgress(writeTx, newCursor)
179 if err != nil {
180 return nil, stacktrace.Propagate(err)
181 }
182 }
183
184 return &processResult{
185 commitSideEffects: []func(){
186 func() {
187 fetcher.setImportProgress(newCursor)
188 },
189 },
190 Code: 0,
191 }, nil
192}