A very experimental PLC implementation which uses BFT consensus for decentralization

Fix multiple authoritative import bugs

gbl08ma.com 67bf9fda f1b03f77

verified
+19 -8
+10
abciapp/app.go
··· 219 return stacktrace.Propagate(err) 220 } 221 222 return nil 223 } 224
··· 219 return stacktrace.Propagate(err) 220 } 221 222 + // ensure we resume importing even if there is no pending AuthoritativeImport tx 223 + readTx := d.txFactory.ReadWorking(time.Now()) 224 + 225 + plc, err := store.Consensus.AuthoritativePLC(readTx) 226 + if err != nil { 227 + return stacktrace.Propagate(err) 228 + } 229 + 230 + _ = d.buildAuthoritativeOperationsFetcher(plc) 231 + 232 return nil 233 } 234
+1 -1
abciapp/execution.go
··· 318 d.destroyAuthoritativeOperationsFetcher() 319 } 320 321 - if d.aoc == nil { 322 d.aoc = newAuthoritativeOperationsFetcher(d.runnerContext, d.logger, plc, d.triggerBlockCreation) 323 } 324
··· 318 d.destroyAuthoritativeOperationsFetcher() 319 } 320 321 + if d.aoc == nil && plc != "" { 322 d.aoc = newAuthoritativeOperationsFetcher(d.runnerContext, d.logger, plc, d.triggerBlockCreation) 323 } 324
+6 -5
abciapp/import.go
··· 97 } 98 99 func (a *authoritativeOperationsFetcher) launchWebsocketClient(ctx context.Context, cursor uint64) { 100 - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) 101 - defer cancel() 102 - 103 url, err := url.Parse(a.plcURL) 104 if err != nil { 105 // this really shouldn't happen ··· 108 } 109 110 url.Scheme = "wss" 111 - url = url.JoinPath(a.plcURL, "/export/stream") 112 q := url.Query() 113 q.Add("cursor", fmt.Sprint(cursor)) 114 url.RawQuery = q.Encode() 115 116 header := http.Header{} 117 header.Set("User-Agent", SoftwareUserAgent) 118 - c, _, err := websocket.Dial(ctx, url.String(), &websocket.DialOptions{ 119 HTTPClient: a.client, 120 HTTPHeader: header, 121 })
··· 97 } 98 99 func (a *authoritativeOperationsFetcher) launchWebsocketClient(ctx context.Context, cursor uint64) { 100 url, err := url.Parse(a.plcURL) 101 if err != nil { 102 // this really shouldn't happen ··· 105 } 106 107 url.Scheme = "wss" 108 + url = url.JoinPath("/export/stream") 109 q := url.Query() 110 q.Add("cursor", fmt.Sprint(cursor)) 111 url.RawQuery = q.Encode() 112 113 header := http.Header{} 114 header.Set("User-Agent", SoftwareUserAgent) 115 + 116 + dialCtx, cancel := context.WithTimeout(ctx, 30*time.Second) 117 + defer cancel() 118 + 119 + c, _, err := websocket.Dial(dialCtx, url.String(), &websocket.DialOptions{ 120 HTTPClient: a.client, 121 HTTPHeader: header, 122 })
+2 -2
abciapp/tx_import.go
··· 107 return nil, stacktrace.Propagate(err) 108 } 109 110 - if expectedPlcUrl != tx.Arguments.PLCURL { 111 return &processResult{ 112 Code: 4110, 113 Info: "Unexpected Authoritative PLC URL", ··· 128 }, nil 129 } 130 131 - if tx.Arguments.Count > MaxOpsPerImportTx { 132 return &processResult{ 133 Code: 4112, 134 Info: "Unexpected import count",
··· 107 return nil, stacktrace.Propagate(err) 108 } 109 110 + if expectedPlcUrl != tx.Arguments.PLCURL || expectedPlcUrl == "" { 111 return &processResult{ 112 Code: 4110, 113 Info: "Unexpected Authoritative PLC URL", ··· 128 }, nil 129 } 130 131 + if tx.Arguments.Count > MaxOpsPerImportTx || tx.Arguments.Count == 0 { 132 return &processResult{ 133 Code: 4112, 134 Info: "Unexpected import count",