A very experimental PLC implementation which uses BFT consensus for decentralization

Complete bulk of initial authoritative import logic

gbl08ma.com fb2d74d9 d984008e

verified
+372 -333
+4 -1
abciapp/app.go
··· 26 26 snapshotApplier *snapshotApplier 27 27 28 28 lastProcessedProposalHash []byte 29 - lastProcessedProposalExecTxResults []*abcitypes.ExecTxResult 29 + lastProcessedProposalExecTxResults []*processResult 30 + 31 + aocsByPLC map[string]*authoritativeOperationsCache 30 32 } 31 33 32 34 // store and plc must be able to share transaction objects ··· 51 53 d := &DIDPLCApplication{ 52 54 tree: tree, 53 55 snapshotDirectory: snapshotDirectory, 56 + aocsByPLC: make(map[string]*authoritativeOperationsCache), 54 57 } 55 58 d.fullyClearTree = func() error { 56 59 // we assume this is called in a single-threaded context, which should be a safe assumption since we'll only call this during snapshot import
+67 -50
abciapp/execution.go
··· 9 9 abcitypes "github.com/cometbft/cometbft/abci/types" 10 10 "github.com/palantir/stacktrace" 11 11 "github.com/samber/lo" 12 + "tangled.org/gbl08ma/didplcbft/store" 12 13 ) 13 14 14 15 // InitChain implements [types.Application]. ··· 21 22 func (d *DIDPLCApplication) PrepareProposal(ctx context.Context, req *abcitypes.RequestPrepareProposal) (*abcitypes.ResponsePrepareProposal, error) { 22 23 defer d.tree.Rollback() 23 24 24 - deps := TransactionProcessorDependencies{ 25 - plc: d.plc, 26 - tree: d, 27 - } 28 - 29 25 st := time.Now() 30 26 acceptedTx := make([][]byte, 0, len(req.Txs)) 31 27 toProcess := req.Txs 32 28 for { 33 29 toTryNext := [][]byte{} 34 30 for _, tx := range toProcess { 35 - result, err := processTx(ctx, deps, tx, req.Time, true) 31 + result, err := processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true) 36 32 if err != nil { 37 33 return nil, stacktrace.Propagate(err, "") 38 34 } 39 35 40 - if result.IsAuthoritativeImportTransaction { 36 + if result.isAuthoritativeImportTransaction { 41 37 // this type of transaction is not meant to appear in the mempool, 42 38 // but maybe it's not impossible that a non-compliant node could have gossiped it to us? 43 39 // (not sure if CometBFT checks transactions coming from other peers against CheckTx) ··· 65 61 toProcess = toTryNext 66 62 } 67 63 68 - totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) }) 69 - if totalSize < int(req.MaxTxBytes)-4096 { 70 - // we have space to fit an import transaction 71 - // TODO 64 + maybeTx, err := d.maybeCreateAuthoritativeImportTx(ctx) 65 + if err != nil { 66 + // TODO don't fail absolutely silently always, we should at least check what the error is 67 + //return nil, stacktrace.Propagate(err, "") 68 + } 69 + 70 + if err == nil && len(maybeTx) != 0 { 71 + totalSize := lo.SumBy(acceptedTx, func(tx []byte) int { return len(tx) }) 72 + // 4K safety margin 73 + if totalSize+len(maybeTx) < int(req.MaxTxBytes)-4096 { 74 + // we have space to fit the import transaction 75 + 76 + result, err := processTx(ctx, d.transactionProcessorDependencies(), maybeTx, req.Time, true) 77 + if err != nil { 78 + return nil, stacktrace.Propagate(err, "") 79 + } 80 + if result.Code == 0 { 81 + acceptedTx = append(acceptedTx, maybeTx) 82 + } 83 + } 72 84 } 73 85 74 86 return &abcitypes.ResponsePrepareProposal{Txs: acceptedTx}, nil ··· 82 94 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 83 95 } 84 96 97 + if req.Height == 1 { 98 + tree, err := d.MutableTree() 99 + if err != nil { 100 + return nil, stacktrace.Propagate(err, "") 101 + } 102 + 103 + err = store.Tree.SetAuthoritativePLC(tree, "https://plc.directory") 104 + if err != nil { 105 + return nil, stacktrace.Propagate(err, "") 106 + } 107 + } 108 + 85 109 // if we return early, ensure we don't use incomplete results where we haven't voted ACCEPT 86 110 d.lastProcessedProposalHash = nil 87 111 d.lastProcessedProposalExecTxResults = nil ··· 93 117 } 94 118 }() 95 119 96 - deps := TransactionProcessorDependencies{ 97 - plc: d.plc, 98 - tree: d, 99 - } 100 - 101 - txResults := make([]*abcitypes.ExecTxResult, len(req.Txs)) 120 + txResults := make([]*processResult, len(req.Txs)) 102 121 for i, tx := range req.Txs { 103 - result, err := processTx(ctx, deps, tx, req.Time, true) 122 + result, err := processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true) 104 123 if err != nil { 105 124 return nil, stacktrace.Propagate(err, "") 106 125 } ··· 110 129 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 111 130 } 112 131 113 - if result.IsAuthoritativeImportTransaction && i != len(req.Txs)-1 { 132 + if result.isAuthoritativeImportTransaction && i != len(req.Txs)-1 { 114 133 // if an Authoritative Import transaction is present on the block, it must be the last one 115 134 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 116 135 } 117 136 118 - txResults[i] = &abcitypes.ExecTxResult{ 119 - Code: result.Code, 120 - Data: result.Data, 121 - Log: result.Log, 122 - Info: result.Info, 123 - GasWanted: result.GasWanted, 124 - GasUsed: result.GasUsed, 125 - Events: result.Events, 126 - Codespace: result.Codespace, 127 - } 137 + txResults[i] = result 128 138 } 129 139 130 140 d.lastProcessedProposalHash = slices.Clone(req.Hash) ··· 151 161 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully 152 162 // reuse the uncommitted results 153 163 return &abcitypes.ResponseFinalizeBlock{ 154 - TxResults: d.lastProcessedProposalExecTxResults, 155 - AppHash: d.tree.WorkingHash(), 164 + TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 165 + return result.ToABCI() 166 + }), 167 + AppHash: d.tree.WorkingHash(), 156 168 }, nil 157 169 } 158 170 // a block other than the one we processed in ProcessProposal was decided 159 171 // discard the current modified state, and process the decided block 160 172 d.tree.Rollback() 161 173 162 - deps := TransactionProcessorDependencies{ 163 - plc: d.plc, 164 - tree: d, 165 - } 166 - 167 - txResults := make([]*abcitypes.ExecTxResult, len(req.Txs)) 174 + txResults := make([]*processResult, len(req.Txs)) 168 175 for i, tx := range req.Txs { 169 - result, err := processTx(ctx, deps, tx, req.Time, true) 176 + var err error 177 + txResults[i], err = processTx(ctx, d.transactionProcessorDependencies(), tx, req.Time, true) 170 178 if err != nil { 171 179 return nil, stacktrace.Propagate(err, "") 172 180 } 173 - txResults[i] = &abcitypes.ExecTxResult{ 174 - Code: result.Code, 175 - Data: result.Data, 176 - Log: result.Log, 177 - Info: result.Info, 178 - GasWanted: result.GasWanted, 179 - GasUsed: result.GasUsed, 180 - Events: result.Events, 181 - Codespace: result.Codespace, 182 - } 183 181 } 184 182 183 + d.lastProcessedProposalHash = slices.Clone(req.Hash) 184 + d.lastProcessedProposalExecTxResults = txResults 185 + 185 186 return &abcitypes.ResponseFinalizeBlock{ 186 - TxResults: txResults, 187 - AppHash: d.tree.WorkingHash(), 187 + TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 188 + return result.ToABCI() 189 + }), 190 + AppHash: d.tree.WorkingHash(), 188 191 }, nil 189 192 } 190 193 ··· 195 198 return nil, stacktrace.Propagate(err, "") 196 199 } 197 200 201 + for _, r := range d.lastProcessedProposalExecTxResults { 202 + for _, cb := range r.commitSideEffects { 203 + cb() 204 + } 205 + } 206 + 198 207 // TODO(later) consider whether we can set some RetainHeight in the response 199 208 return &abcitypes.ResponseCommit{}, nil 200 209 } 210 + 211 + func (d *DIDPLCApplication) transactionProcessorDependencies() TransactionProcessorDependencies { 212 + return TransactionProcessorDependencies{ 213 + plc: d.plc, 214 + tree: d, 215 + aocsByPLC: d.aocsByPLC, 216 + } 217 + }
+154 -16
abciapp/import.go
··· 5 5 "context" 6 6 "crypto/sha256" 7 7 "encoding/binary" 8 + "encoding/hex" 8 9 "encoding/json" 9 10 "fmt" 10 11 "net/http" 11 12 "net/url" 13 + "sync" 12 14 "time" 13 15 14 16 "github.com/bluesky-social/indigo/atproto/syntax" 15 17 "github.com/did-method-plc/go-didplc" 16 18 "github.com/ipfs/go-cid" 19 + cbornode "github.com/ipfs/go-ipld-cbor" 17 20 "github.com/palantir/stacktrace" 18 - "github.com/samber/lo" 21 + "tangled.org/gbl08ma/didplcbft/plc" 19 22 "tangled.org/gbl08ma/didplcbft/store" 20 23 ) 21 24 22 - func fetchExportedBatchFromAuthoritativeSource(ctx context.Context, plcURL string, startAt, maxCount uint64) ([]didplc.LogEntry, uint64, error) { 25 + type authoritativeOperationsCache struct { 26 + mu sync.Mutex 27 + 28 + plcURL string 29 + operations map[uint64]logEntryWithSeq 30 + } 31 + 32 + type logEntryWithSeq struct { 33 + didplc.LogEntry 34 + Seq uint64 `json:"seq"` 35 + } 36 + 37 + func newAuthoritativeOperationsCache(plc string) *authoritativeOperationsCache { 38 + return &authoritativeOperationsCache{ 39 + plcURL: plc, 40 + operations: make(map[uint64]logEntryWithSeq), 41 + } 42 + } 43 + 44 + func getOrCreateAuthoritativeOperationsCache(aocsByPLC map[string]*authoritativeOperationsCache, plc string) *authoritativeOperationsCache { 45 + aoc, ok := aocsByPLC[plc] 46 + if !ok { 47 + aoc = newAuthoritativeOperationsCache(plc) 48 + aocsByPLC[plc] = aoc 49 + } 50 + return aoc 51 + } 52 + 53 + func (a *authoritativeOperationsCache) dropSeqBelowOrEqual(highestCommittedSeq uint64) { 54 + a.mu.Lock() 55 + defer a.mu.Unlock() 56 + 57 + for i := range a.operations { 58 + if a.operations[i].Seq <= highestCommittedSeq { 59 + delete(a.operations, i) 60 + } 61 + } 62 + } 63 + 64 + func (a *authoritativeOperationsCache) fetchInMutex(ctx context.Context, after, count uint64) (bool, error) { 65 + entries, _, err := fetchExportedBatchFromAuthoritativeSource(ctx, a.plcURL, after, count) 66 + if err != nil { 67 + return false, stacktrace.Propagate(err, "") 68 + } 69 + 70 + for _, entry := range entries { 71 + a.operations[entry.Seq] = entry 72 + } 73 + return uint64(len(entries)) < count, nil 74 + } 75 + 76 + func (a *authoritativeOperationsCache) get(ctx context.Context, after, count uint64) ([]logEntryWithSeq, error) { 77 + a.mu.Lock() 78 + defer a.mu.Unlock() 79 + 80 + result := make([]logEntryWithSeq, 0, count) 81 + reachedEnd := false 82 + for i := uint64(0); uint64(len(result)) < count; i++ { 83 + opSeq := after + i + 1 84 + op, ok := a.operations[opSeq] 85 + if !ok { 86 + if reachedEnd { 87 + // it's because we are asking about ops that don't exist yet, return 88 + break 89 + } 90 + 91 + re, err := a.fetchInMutex(ctx, after+i, count) 92 + if err != nil { 93 + return nil, stacktrace.Propagate(err, "") 94 + } 95 + 96 + reachedEnd = reachedEnd || re 97 + 98 + op, ok = a.operations[opSeq] 99 + if !ok { 100 + // still not present even after fetching 101 + // the authoritative source probably skipped this seq? 102 + continue 103 + } 104 + } 105 + 106 + result = append(result, op) 107 + } 108 + 109 + return result, nil 110 + } 111 + 112 + func fetchExportedBatchFromAuthoritativeSource(ctx context.Context, plcURL string, startAt, maxCount uint64) ([]logEntryWithSeq, uint64, error) { 23 113 baseURL, err := url.JoinPath(plcURL, "/export") 24 114 if err != nil { 25 115 return nil, 0, stacktrace.Propagate(err, "") ··· 27 117 28 118 client := &http.Client{Timeout: 30 * time.Second} 29 119 30 - entries := make([]didplc.LogEntry, 0, maxCount) 120 + entries := make([]logEntryWithSeq, 0, maxCount) 31 121 for { 32 122 req, err := http.NewRequestWithContext(ctx, "GET", baseURL, nil) 33 123 if err != nil { 34 124 return nil, 0, stacktrace.Propagate(err, "") 35 125 } 36 126 37 - req.Header.Set("User-Agent", "go-did-method-plc") 127 + req.Header.Set("User-Agent", "didplcbft") 38 128 39 129 requestCount := min(1000, maxCount-uint64(len(entries))) 40 130 ··· 53 143 return nil, 0, stacktrace.NewError("non-200 status code") 54 144 } 55 145 56 - type logEntryWithSeq struct { 57 - didplc.LogEntry 58 - Seq uint64 `json:"seq"` 59 - } 60 - 61 146 // Read response body 62 147 s := bufio.NewScanner(resp.Body) 63 148 numEntriesThisResponse := 0 ··· 66 151 if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 67 152 return nil, 0, stacktrace.Propagate(err, "") 68 153 } 69 - entries = append(entries, entry.LogEntry) 154 + entries = append(entries, entry) 70 155 numEntriesThisResponse++ 71 156 startAt = entry.Seq 72 157 } ··· 82 167 return entries, startAt, nil 83 168 } 84 169 85 - func computeLogEntriesHash(logEntries []didplc.LogEntry) ([]byte, error) { 170 + func computeLogEntriesHash(logEntries []logEntryWithSeq) ([]byte, error) { 86 171 // let's _not_ rely on the specifics of the JSON representation 87 172 // (instead let's rely on specifics of our implementation, heh) 88 173 ··· 125 210 return nil, stacktrace.Propagate(err, "") 126 211 } 127 212 128 - // Write Nullified 129 - _, err = hash.Write([]byte{lo.Ternary[byte](entry.Nullified, 1, 0)}) 130 - if err != nil { 131 - return nil, stacktrace.Propagate(err, "") 132 - } 213 + // Nullified can't be part of the hash as it can change on the authoritative source at any moment, 214 + // we always import operations as if they weren't nullified and recompute the nullification status as needed 133 215 } 134 216 135 217 return hash.Sum(nil), nil 136 218 } 219 + 220 + func (d *DIDPLCApplication) maybeCreateAuthoritativeImportTx(ctx context.Context) ([]byte, error) { 221 + // use WorkingTreeVersion so we take into account any import operation that may have been processed in this block 222 + roTree, err := d.ImmutableTree(plc.WorkingTreeVersion) 223 + if err != nil { 224 + return nil, stacktrace.Propagate(err, "") 225 + } 226 + 227 + plcURL, err := store.Tree.AuthoritativePLC(roTree) 228 + if err != nil { 229 + return nil, stacktrace.Propagate(err, "") 230 + } 231 + 232 + if plcURL == "" { 233 + // we're not doing imports 234 + return nil, nil 235 + } 236 + 237 + cursor, err := store.Tree.AuthoritativeImportProgress(roTree) 238 + if err != nil { 239 + return nil, stacktrace.Propagate(err, "") 240 + } 241 + 242 + aoc := getOrCreateAuthoritativeOperationsCache(d.aocsByPLC, plcURL) 243 + 244 + entries, err := aoc.get(ctx, cursor, 1000) 245 + if err != nil { 246 + return nil, stacktrace.Propagate(err, "") 247 + } 248 + 249 + if len(entries) == 0 { 250 + // nothing to import at the moment 251 + return nil, nil 252 + } 253 + 254 + hashBytes, err := computeLogEntriesHash(entries) 255 + if err != nil { 256 + return nil, stacktrace.Propagate(err, "") 257 + } 258 + 259 + tx := Transaction[AuthoritativeImportArguments]{ 260 + Action: TransactionActionAuthoritativeImport, 261 + Arguments: AuthoritativeImportArguments{ 262 + PLCURL: plcURL, 263 + Hash: hex.EncodeToString(hashBytes), 264 + Cursor: cursor, 265 + Count: uint64(len(entries)), 266 + }, 267 + } 268 + 269 + out, err := cbornode.DumpObject(tx) 270 + if err != nil { 271 + return nil, stacktrace.Propagate(err, "") 272 + } 273 + return out, nil 274 + }
+2 -7
abciapp/mempool.go
··· 10 10 11 11 // CheckTx implements [types.Application]. 12 12 func (d *DIDPLCApplication) CheckTx(ctx context.Context, req *abcitypes.RequestCheckTx) (*abcitypes.ResponseCheckTx, error) { 13 - deps := TransactionProcessorDependencies{ 14 - plc: d.plc, 15 - tree: d, 16 - } 17 - 18 - result, err := processTx(ctx, deps, req.Tx, time.Now(), false) 13 + result, err := processTx(ctx, d.transactionProcessorDependencies(), req.Tx, time.Now(), false) 19 14 if err != nil { 20 15 return nil, stacktrace.Propagate(err, "") 21 16 } 22 - if result.IsAuthoritativeImportTransaction { 17 + if result.isAuthoritativeImportTransaction { 23 18 // this type of transaction is meant to be included only by validator nodes 24 19 return &abcitypes.ResponseCheckTx{ 25 20 Code: 4002,
+18 -3
abciapp/tx.go
··· 18 18 type TransactionAction string 19 19 20 20 type TransactionProcessorDependencies struct { 21 - plc plc.PLC 22 - tree plc.TreeProvider // TODO maybe we should move the TreeProvider definition out of the plc package then? 21 + plc plc.PLC 22 + tree plc.TreeProvider // TODO maybe we should move the TreeProvider definition out of the plc package then? 23 + aocsByPLC map[string]*authoritativeOperationsCache 23 24 } 24 25 25 26 type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) ··· 82 83 } 83 84 84 85 type processResult struct { 85 - IsAuthoritativeImportTransaction bool 86 + isAuthoritativeImportTransaction bool 87 + commitSideEffects []func() 86 88 87 89 Code uint32 88 90 Data []byte ··· 92 94 GasUsed int64 93 95 Events []abcitypes.Event 94 96 Codespace string 97 + } 98 + 99 + func (result processResult) ToABCI() *abcitypes.ExecTxResult { 100 + return &abcitypes.ExecTxResult{ 101 + Code: result.Code, 102 + Data: result.Data, 103 + Log: result.Log, 104 + Info: result.Info, 105 + GasWanted: result.GasWanted, 106 + GasUsed: result.GasUsed, 107 + Events: result.Events, 108 + Codespace: result.Codespace, 109 + } 95 110 } 96 111 97 112 func processTx(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) {
+29 -26
abciapp/tx_import.go
··· 6 6 "net/url" 7 7 "time" 8 8 9 - "github.com/did-method-plc/go-didplc" 10 9 cbornode "github.com/ipfs/go-ipld-cbor" 11 10 "github.com/palantir/stacktrace" 12 11 "tangled.org/gbl08ma/didplcbft/plc" ··· 123 122 }, nil 124 123 } 125 124 125 + aoc := getOrCreateAuthoritativeOperationsCache(deps.aocsByPLC, expectedPlcUrl) 126 + 126 127 expectedCursor, err := store.Tree.AuthoritativeImportProgress(roTree) 127 128 if err != nil { 128 129 return nil, stacktrace.Propagate(err, "") ··· 135 136 }, nil 136 137 } 137 138 138 - // TODO this shouldn't be happening synchronously! We should always be ahead of the next transaction! 139 - // or at the very least it should only happen once (e.g. when processing the proposal) and then we should cache until it expires or until we actually commit 140 - operations, newCursor, err := fetchExportedBatchFromAuthoritativeSource(ctx, expectedPlcUrl, expectedCursor, tx.Arguments.Count) 139 + operations, err := aoc.get(ctx, expectedCursor, tx.Arguments.Count) 141 140 if err != nil { 142 - // returning an actual error like this means "consensus failure". Probably not the best way to deal with this, we would rather drop the transaction if not all nodes can fetch the same thing 143 - // TODO investigate 144 - return nil, stacktrace.Propagate(err, "") 141 + return &processResult{ 142 + Code: 4112, 143 + Info: "Failure to obtain authoritative operations", 144 + }, nil 145 + } 146 + 147 + if uint64(len(operations)) < tx.Arguments.Count { 148 + return &processResult{ 149 + Code: 4113, 150 + Info: "Unexpected import count", 151 + }, nil 145 152 } 146 153 147 154 expectedHashBytes, err := computeLogEntriesHash(operations) ··· 151 158 152 159 if hex.EncodeToString(expectedHashBytes) != tx.Arguments.Hash { 153 160 return &processResult{ 154 - Code: 4112, 161 + Code: 4114, 155 162 Info: "Unexpected import hash", 156 163 }, nil 164 + } 165 + 166 + newCursor := expectedCursor 167 + if len(operations) > 0 { 168 + newCursor = operations[len(operations)-1].Seq 157 169 } 158 170 159 171 if execute { ··· 162 174 return nil, stacktrace.Propagate(err, "") 163 175 } 164 176 165 - var client didplc.Client 166 177 for _, entry := range operations { 167 - err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 168 - // TODO Oh NOOOOOOO! This is not deterministic 169 - // (fetched at different times, the audit log might grow, therefore we'll fetch and insert more ops, and change the apphash) 170 - // we need to either limit how much audit log we return (only doable if how much was fetched for each op was part of the tx, ugh) 171 - // or (probably preferred approach) make it so that the ImportOperationFromAuthoritativeSource / ReplaceHistory function only replaces up until the CID that's being imported, and no further ops 172 - // Even then there is a problem: what if the nullified status changes between imports :dizzy_face: 173 - // (can the nullified status change for the ops that are being imported only up until CID? Need to think) 174 - e, err := client.AuditLog(ctx, entry.DID) 175 - return e, stacktrace.Propagate(err, "") 176 - }) 178 + err := deps.plc.ImportOperationFromAuthoritativeSource(ctx, entry.LogEntry) 177 179 if err != nil { 178 180 return nil, stacktrace.Propagate(err, "") 179 181 } ··· 184 186 } 185 187 } 186 188 187 - // TODO finish implementation 188 - // 1. if execute is true: actually import the operations 189 - // 2. if execute is true: update AuthoritativeImportProgress 190 - 191 189 return &processResult{ 192 - IsAuthoritativeImportTransaction: true, 193 - Code: 0, 194 - }, stacktrace.NewError("not implemented") 190 + isAuthoritativeImportTransaction: true, 191 + commitSideEffects: []func(){ 192 + func() { 193 + aoc.dropSeqBelowOrEqual(newCursor) 194 + }, 195 + }, 196 + Code: 0, 197 + }, nil 195 198 }
+2
httpapi/server.go
··· 305 305 sendErrorResponse(w, http.StatusBadRequest, "Invalid count parameter") 306 306 return 307 307 } 308 + 309 + // TODO limit count to 1000 (for debugging it's more useful without limit) 308 310 } 309 311 310 312 afterStr := query.Get("after")
+1 -1
importer/importer_test.go
··· 197 197 return // Failed to create request 198 198 } 199 199 200 - req.Header.Set("User-Agent", "go-did-method-plc") 200 + req.Header.Set("User-Agent", "didplcbft") 201 201 202 202 q := req.URL.Query() 203 203 q.Add("count", fmt.Sprint(batchSize))
+68 -86
plc/impl.go
··· 80 80 return nil 81 81 } 82 82 83 - func (plc *plcImpl) ImportOperationFromAuthoritativeSource(ctx context.Context, newEntry didplc.LogEntry, 84 - authoritativeAuditLogFetcher func() ([]didplc.LogEntry, error)) error { 83 + func (plc *plcImpl) ImportOperationFromAuthoritativeSource(ctx context.Context, newEntry didplc.LogEntry) error { 85 84 plc.mu.Lock() 86 85 defer plc.mu.Unlock() 87 86 ··· 90 89 return stacktrace.Propagate(err, "failed to obtain mutable tree") 91 90 } 92 91 93 - l, _, err := store.Tree.AuditLog(tree, newEntry.DID, false) 94 - if err != nil { 95 - return stacktrace.Propagate(err, "") 96 - } 97 - 98 92 newCID := newEntry.CID 99 93 newPrev := newEntry.Operation.AsOperation().PrevCIDStr() 100 94 101 - newCreatedAtDT, err := syntax.ParseDatetime(newEntry.CreatedAt) 102 - if err != nil { 103 - return stacktrace.Propagate(err, "") 104 - } 105 - newCreatedAt := newCreatedAtDT.Time() 95 + mostRecentOpIndex := -1 96 + indexOfPrev := -1 97 + var iteratorErr error 98 + for entryIdx, entry := range store.Tree.AuditLogReverseIterator(tree, newEntry.DID, &iteratorErr) { 99 + entryCID := entry.CID.String() 100 + if mostRecentOpIndex == -1 { 101 + mostRecentOpIndex = entryIdx 102 + 103 + if newPrev == "" && entryCID != newCID { 104 + // this should never happen unless the authoritative source doesn't compute DIDs from genesis ops the way we do 105 + return stacktrace.NewError("invalid internal state reached") 106 + } 107 + } 108 + 109 + if entryCID == newCID { 110 + // should we already have an operation with the same CID, this condition should trigger before the next one 111 + // because this is a reverse iterator 112 + // looks like we already have the op we're trying to import. just need to update the timestamp 113 + newCreatedAtDT, err := syntax.ParseDatetime(newEntry.CreatedAt) 114 + if err != nil { 115 + return stacktrace.Propagate(err, "") 116 + } 117 + 118 + return stacktrace.Propagate( 119 + store.Tree.SetOperationCreatedAt(tree, entry.Seq, newCreatedAtDT.Time()), 120 + "") 121 + } 106 122 107 - mustFullyReplaceHistory := false 108 - for _, entry := range l { 109 - if entry.CreatedAt.After(newCreatedAt) { 110 - // We're trying to import an operation whose timestamp precedes one of the timestamps for operations we already know about 111 - // We'll need to discard all known history and import it anew using the authoritative source data (same as when dealing with sequence forks) 112 - mustFullyReplaceHistory = true 123 + if entryCID == newPrev { 124 + indexOfPrev = entryIdx 113 125 break 114 126 } 127 + } 128 + if iteratorErr != nil { 129 + return stacktrace.Propagate(iteratorErr, "") 130 + } 115 131 116 - if entry.CID.String() == newCID && entry.Nullified == newEntry.Nullified { 117 - // If an operation with the same CID already exists -> easy-ish 132 + nullifiedEntriesStartingIndex := mo.None[int]() 118 133 119 - // this operation is already present, there is nothing to do 120 - // TODO re-evaluate whether we want to still update the timestamp on the existing operation, as not doing this will cause the export from our impl to definitely not match the authoritative source 121 - // (Though, the actually damaging cases of incorrect createdAt are already handled by the prior check) 122 - return nil 134 + if mostRecentOpIndex < 0 { 135 + // we have nothing for this DID - this should be a creation op, if not, then we're not importing things in order 136 + if newPrev != "" { 137 + return stacktrace.NewError("invalid internal state reached") 123 138 } 124 - } 139 + 140 + // there's nothing to do but store the operation, no nullification involved 141 + newEntry.Nullified = false 125 142 126 - if len(l) == 0 || (!mustFullyReplaceHistory && l[len(l)-1].CID.String() == newPrev) { 127 - // If DID doesn't exist at all -> easy 128 - // If prev matches CID of latest operation, and resulting timestamp sequence monotonically increases -> easy 129 - err = store.Tree.StoreOperation(tree, newEntry, mo.None[int]()) 143 + err = store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 130 144 return stacktrace.Propagate(err, "failed to commit operation") 131 145 } 132 146 133 - // if we get here then we're dealing with a DID that has "complicated" history 134 - // to avoid dealing with nullification (which is made complicated here since we don't know which nullified ops are part of the "canonical audit log" 135 - // and which are caused by people purposefully submitting forking ops to the chain vs the authoritative source) 136 - // fetch audit log for DID and replace the entire history with the one from the authoritative source 147 + if indexOfPrev < 0 { 148 + // there are entries in the audit log but none of them has a CID matching prev 149 + // if this isn't a creation op, then this shouldn't happen 150 + // (even when history forks between us and the authoritative source, at least the initial op should be the same, otherwise the DIDs wouldn't match) 151 + // if this is a creation op, then this case should have been caught above 152 + return stacktrace.NewError("invalid internal state reached") 153 + } 137 154 138 - auditLog, err := authoritativeAuditLogFetcher() 139 - if err != nil { 140 - return stacktrace.Propagate(err, "") 155 + if indexOfPrev+1 <= mostRecentOpIndex { 156 + nullifiedEntriesStartingIndex = mo.Some(indexOfPrev + 1) 141 157 } 142 158 143 - err = store.Tree.ReplaceHistory(tree, auditLog) 144 - return stacktrace.Propagate(err, "") 159 + newEntry.Nullified = false 160 + err = store.Tree.StoreOperation(tree, newEntry, nullifiedEntriesStartingIndex) 161 + return stacktrace.Propagate(err, "failed to commit operation") 145 162 } 146 163 147 164 func (plc *plcImpl) Resolve(ctx context.Context, atHeight TreeVersion, did string) (didplc.Doc, error) { ··· 162 179 return didplc.Doc{}, stacktrace.Propagate(ErrDIDNotFound, "") 163 180 } 164 181 165 - // find most recent operation that isn't nullified (during authoritative import, the latest operation might be nullified) 166 - for i := len(l) - 1; i >= 0; i-- { 167 - opEnum := l[i].Operation 168 - if !l[i].Nullified { 169 - if opEnum.Tombstone != nil { 170 - return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 171 - } 172 - return opEnum.AsOperation().Doc(did) 173 - } 182 + opEnum := l[len(l)-1].Operation 183 + if opEnum.Tombstone != nil { 184 + return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 174 185 } 175 - // in the worst case all operations are somehow nullified and the loop ends with opEnum holding a nullified operation 176 - // that _shouldn't_ be possible (right?) but if it does happen, let's just behave as if the DID was tombstoned 177 - return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 186 + return opEnum.AsOperation().Doc(did) 178 187 } 179 188 180 189 func (plc *plcImpl) OperationLog(ctx context.Context, atHeight TreeVersion, did string) ([]didplc.OpEnum, error) { ··· 229 238 return nil, stacktrace.Propagate(ErrDIDNotFound, "") 230 239 } 231 240 232 - // if the latest operations are nullified (happens while authoritative import is in progress), just pretend we don't have them yet, 233 - // since a properly functioning PLC implementation could never have the latest operation for a DID be nullified 234 - dropAfterIdx := len(l) - 1 235 - for ; dropAfterIdx >= 0; dropAfterIdx-- { 236 - if !l[dropAfterIdx].Nullified { 237 - break 238 - } 239 - } 240 - l = l[0 : dropAfterIdx+1] 241 - 242 241 return lo.Map(l, func(logEntry types.SequencedLogEntry, _ int) didplc.LogEntry { 243 242 return logEntry.ToDIDPLCLogEntry() 244 243 }), nil 245 244 } 246 245 247 246 func (plc *plcImpl) LastOperation(ctx context.Context, atHeight TreeVersion, did string) (didplc.OpEnum, error) { 248 - // GetLastOp - /:did/log/last - latest op from audit log which isn't nullified 247 + // GetLastOp - /:did/log/last - latest op from audit log which isn't nullified (the latest op is guaranteed not to be nullified) 249 248 // if missing -> returns ErrDIDNotFound 250 249 // if tombstone -> returns tombstone op 251 250 plc.mu.Lock() ··· 265 264 return didplc.OpEnum{}, stacktrace.Propagate(ErrDIDNotFound, "") 266 265 } 267 266 268 - // find most recent operation that isn't nullified (during authoritative import, the latest operation might be nullified) 269 - for i := len(l) - 1; i >= 0; i-- { 270 - opEnum := l[i].Operation 271 - if !l[i].Nullified { 272 - return opEnum, nil 273 - } 274 - } 275 - // in the worst case all operations are somehow nullified and the loop ends with opEnum holding a nullified operation 276 - // that _shouldn't_ be possible (right?) but if it does happen, let's just behave as if the DID did not exist 277 - return didplc.OpEnum{}, stacktrace.Propagate(ErrDIDNotFound, "") 267 + return l[len(l)-1].Operation, nil 278 268 } 279 269 280 270 func (plc *plcImpl) Data(ctx context.Context, atHeight TreeVersion, did string) (didplc.RegularOp, error) { ··· 298 288 return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDNotFound, "") 299 289 } 300 290 301 - // find most recent operation that isn't nullified (during authoritative import, the latest operation might be nullified) 302 - for i := len(l) - 1; i >= 0; i-- { 303 - opEnum := l[i].Operation 304 - if !l[i].Nullified { 305 - if opEnum.Tombstone != nil { 306 - return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 307 - } 308 - if opEnum.Regular != nil { 309 - return *opEnum.Regular, nil 310 - } 311 - return *modernizeOp(opEnum.Legacy), nil 312 - } 291 + opEnum := l[len(l)-1].Operation 292 + if opEnum.Tombstone != nil { 293 + return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 294 + } 295 + if opEnum.Regular != nil { 296 + return *opEnum.Regular, nil 313 297 } 314 - // in the worst case all operations are somehow nullified and the loop ends with opEnum holding a nullified operation 315 - // that _shouldn't_ be possible (right?) but if it does happen, let's just behave as if the DID was tombstoned 316 - return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 298 + return *modernizeOp(opEnum.Legacy), nil 317 299 318 300 } 319 301
+1 -1
plc/plc.go
··· 66 66 67 67 type WritePLC interface { 68 68 ExecuteOperation(ctx context.Context, timestamp time.Time, did string, opBytes []byte) error 69 - ImportOperationFromAuthoritativeSource(ctx context.Context, entry didplc.LogEntry, authoritativeAuditLogFetcher func() ([]didplc.LogEntry, error)) error 69 + ImportOperationFromAuthoritativeSource(ctx context.Context, entry didplc.LogEntry) error 70 70 }
+15 -47
plc/plc_test.go
··· 12 12 13 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 14 "github.com/did-method-plc/go-didplc" 15 - "github.com/palantir/stacktrace" 16 15 "github.com/samber/lo" 17 16 "github.com/stretchr/testify/require" 18 17 "tangled.org/gbl08ma/didplcbft/plc" ··· 398 397 } 399 398 400 399 func TestImportOperationFromAuthoritativeSource(t *testing.T) { 401 - var client didplc.Client 402 - 403 400 ctx := t.Context() 404 401 405 402 treeProvider := NewTestTreeProvider() ··· 411 408 require.NoError(t, err) 412 409 413 410 seenCIDs := map[string]struct{}{} 411 + seenDIDs := map[string]struct{}{} 414 412 for entry := range iterateOverExport(ctx, 0) { 415 - err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 416 - e, err := client.AuditLog(ctx, entry.DID) 417 - return e, stacktrace.Propagate(err, "") 418 - }) 413 + err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry) 419 414 require.NoError(t, err) 420 415 421 416 seenCIDs[entry.CID] = struct{}{} 417 + seenDIDs[entry.DID] = struct{}{} 422 418 if len(seenCIDs) == 10000 { 423 419 break 424 420 } ··· 436 432 delete(seenCIDs, exportedEntry.CID.String()) 437 433 } 438 434 require.Empty(t, seenCIDs) 435 + 436 + for did := range seenDIDs { 437 + auditLog, err := testPLC.AuditLog(ctx, plc.CommittedTreeVersion, did) 438 + require.NoError(t, err) 439 + 440 + err = didplc.VerifyOpLog(auditLog) 441 + require.NoError(t, err) 442 + } 439 443 } 440 444 441 445 func TestImportOperationWithNullification(t *testing.T) { 442 - var client didplc.Client 443 - 444 446 ctx := t.Context() 445 447 446 448 testFn := func(toImport []didplc.LogEntry, mutate func(didplc.LogEntry) didplc.LogEntry) ([]types.SequencedLogEntry, []didplc.LogEntry) { ··· 454 456 455 457 for _, entry := range toImport { 456 458 entry = mutate(entry) 457 - err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 458 - e, err := client.AuditLog(ctx, entry.DID) 459 - return e, stacktrace.Propagate(err, "") 460 - }) 459 + err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry) 461 460 require.NoError(t, err) 462 461 } 463 462 ··· 475 474 return exportedEntries, auditLog 476 475 } 477 476 477 + var client didplc.Client 478 478 toImport, err := client.AuditLog(ctx, "did:plc:pkmfz5soq2swsvbhvjekb36g") 479 479 require.NoError(t, err) 480 480 ··· 491 491 require.Equal(t, toImport[i].Nullified, auditLog[i].Nullified) 492 492 } 493 493 494 - // ensure auditLog never returns nullified entries as the last entries 495 - exportedEntries, auditLog = testFn(toImport[0:5], func(le didplc.LogEntry) didplc.LogEntry { return le }) 496 - 497 - require.Len(t, exportedEntries, 5) 498 - require.Len(t, auditLog, 1) 499 - require.False(t, auditLog[0].Nullified) 500 - require.Equal(t, auditLog[0].CID, "bafyreid2tbopmtuguvuvij5kjcqo7rv7yvqza37uvfcvk5zdxyo57xlfdi") 501 - 502 494 // now pretend that at the time of import, no operations were nullified 503 495 exportedEntries, auditLog = testFn(toImport, func(le didplc.LogEntry) didplc.LogEntry { 504 496 le.Nullified = false ··· 507 499 require.Len(t, auditLog, len(toImport)) 508 500 509 501 for i, entry := range exportedEntries { 510 - if i < 1 { 511 - require.Equal(t, uint64(i+1), entry.Seq) 512 - } else { 513 - require.Equal(t, uint64(i+5), entry.Seq) 514 - } 515 - require.Equal(t, toImport[i].CID, entry.CID.String()) 516 - require.Equal(t, toImport[i].CID, auditLog[i].CID) 517 - require.Equal(t, toImport[i].CreatedAt, entry.CreatedAt.Format(types.ActualAtprotoDatetimeLayout)) 518 - require.Equal(t, toImport[i].CreatedAt, auditLog[i].CreatedAt) 519 - require.Equal(t, toImport[i].Nullified, entry.Nullified) 520 - require.Equal(t, toImport[i].Nullified, auditLog[i].Nullified) 521 - } 522 - 523 - // now manipulate the timestamp on the first operation just to see the first operation get rewritten 524 - exportedEntries, auditLog = testFn(toImport, func(le didplc.LogEntry) didplc.LogEntry { 525 - if le.CID == "bafyreid2tbopmtuguvuvij5kjcqo7rv7yvqza37uvfcvk5zdxyo57xlfdi" { 526 - // this should cause mustFullyReplaceHistory to become true 527 - le.CreatedAt = syntax.DatetimeNow().String() 528 - } 529 - return le 530 - }) 531 - require.Len(t, auditLog, len(toImport)) 532 - 533 - for i, entry := range exportedEntries { 534 - require.Equal(t, uint64(i+2), entry.Seq) 502 + require.Equal(t, uint64(i+1), entry.Seq) 535 503 require.Equal(t, toImport[i].CID, entry.CID.String()) 536 504 require.Equal(t, toImport[i].CID, auditLog[i].CID) 537 505 require.Equal(t, toImport[i].CreatedAt, entry.CreatedAt.Format(types.ActualAtprotoDatetimeLayout)) ··· 554 522 return // Failed to create request 555 523 } 556 524 557 - req.Header.Set("User-Agent", "go-did-method-plc") 525 + req.Header.Set("User-Agent", "didplcbft") 558 526 559 527 q := req.URL.Query() 560 528 q.Add("count", fmt.Sprint(batchSize))
+11 -95
store/tree.go
··· 28 28 AuditLogReverseIterator(tree ReadOnlyTree, did string, err *error) iter.Seq2[int, types.SequencedLogEntry] 29 29 ExportOperations(tree ReadOnlyTree, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 30 30 StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 31 - ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error 31 + SetOperationCreatedAt(tree *iavl.MutableTree, seqID uint64, createdAt time.Time) error 32 32 33 33 AuthoritativePLC(tree ReadOnlyTree) (string, error) 34 34 SetAuthoritativePLC(tree *iavl.MutableTree, url string) error ··· 249 249 return nil 250 250 } 251 251 252 - func (t *TreeStore) ReplaceHistory(tree *iavl.MutableTree, remoteHistory []didplc.LogEntry) error { 253 - if len(remoteHistory) == 0 { 254 - // for now this isn't needed, if it's needed in the future we'll have to accept a DID as argument on this function 255 - return stacktrace.NewError("can't replace with empty history") 256 - } 257 - 258 - did := remoteHistory[0].DID 259 - 260 - didBytes, err := DIDToBytes(did) 261 - if err != nil { 262 - return stacktrace.Propagate(err, "") 263 - } 264 - 265 - logKey := marshalDIDLogKey(didBytes) 266 - 267 - localHistory, _, err := t.AuditLog(tree, did, false) 268 - if err != nil { 269 - return stacktrace.Propagate(err, "") 270 - } 271 - 272 - // if the first operations are equal to what we already have, keep them untouched to minimize the turmoil 273 - keepLocalBeforeIdx := 0 274 - for i, localEntry := range localHistory { 275 - if i >= len(remoteHistory) { 276 - break 277 - } 278 - remoteEntry := remoteHistory[i] 279 - 280 - // stop looping once we find a difference 281 - // we trust that the authoritative source computes CIDs properly (i.e. that two operations having the same CID are indeed equal) 282 - if localEntry.Nullified != remoteEntry.Nullified || localEntry.CID.String() != remoteEntry.CID { 283 - break 284 - } 285 - 286 - remoteDatetime, err := syntax.ParseDatetime(remoteEntry.CreatedAt) 287 - if err != nil { 288 - return stacktrace.Propagate(err, "invalid CreatedAt") 289 - } 290 - 291 - if !localEntry.CreatedAt.Equal(remoteDatetime.Time()) { 292 - break 293 - } 294 - 295 - keepLocalBeforeIdx++ 296 - } 297 - 298 - // all replaced/added operations get new sequence IDs. 299 - // Get the highest sequence ID before removing any keys to ensure the sequence IDs actually change 300 - seq, err := getNextSeqID(tree) 301 - if err != nil { 302 - return stacktrace.Propagate(err, "") 303 - } 252 + func (t *TreeStore) SetOperationCreatedAt(tree *iavl.MutableTree, seqID uint64, createdAt time.Time) error { 253 + opKey := marshalOperationKey(seqID) 304 254 305 - // remove existing conflicting operations for this DID (if any) 306 - logOperations, err := tree.Get(logKey) 255 + opValue, err := tree.Get(opKey) 307 256 if err != nil { 308 257 return stacktrace.Propagate(err, "") 309 258 } 310 - logOperationsToDelete := logOperations[8*keepLocalBeforeIdx:] 311 - for seqBytes := range slices.Chunk(logOperationsToDelete, 8) { 312 - key := sequenceBytesToOperationKey(seqBytes) 313 - 314 - _, _, err = tree.Remove(key) 315 - if err != nil { 316 - return stacktrace.Propagate(err, "") 317 - } 259 + if len(opValue) == 0 { 260 + return stacktrace.NewError("operation %d not found", seqID) 318 261 } 319 262 320 - // add just the operations past the point they weren't kept 321 - remoteHistory = remoteHistory[keepLocalBeforeIdx:] 322 - 323 - // keep the operations log up until the point we've kept the history 324 - // clone just to make sure we avoid issues since we got this slice from the tree, it is not meant to be modified 325 - logOperations = slices.Clone(logOperations[0 : 8*keepLocalBeforeIdx]) 326 - 327 - for _, entry := range remoteHistory { 328 - opDatetime, err := syntax.ParseDatetime(entry.CreatedAt) 329 - if err != nil { 330 - return stacktrace.Propagate(err, "invalid CreatedAt") 331 - } 332 - 333 - operation := entry.Operation.AsOperation() 334 - opKey := marshalOperationKey(seq) 335 - seq++ 336 - opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation) 337 - 338 - _, err = tree.Set(opKey, opValue) 339 - if err != nil { 340 - return stacktrace.Propagate(err, "") 341 - } 342 - 343 - // add to log for DID 344 - logOperations = append(logOperations, opKey[1:9]...) 345 - } 263 + opValue = slices.Clone(opValue) 346 264 347 - // save updated log for DID 348 - _, err = tree.Set(logKey, logOperations) 349 - if err != nil { 350 - return stacktrace.Propagate(err, "") 351 - } 265 + ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) 266 + binary.BigEndian.PutUint64(opValue[16:24], ts) 352 267 353 - return nil 268 + _, err = tree.Set(opKey, opValue) 269 + return stacktrace.Propagate(err, "") 354 270 } 355 271 356 272 var minOperationKey = marshalOperationKey(0)