A very experimental PLC implementation which uses BFT consensus for decentralization

Begin work on authoritative import

gbl08ma.com 25f3bc2d f80970e7

verified
+452 -193
+1 -1
abciapp/app.go
··· 143 var _ plc.TreeProvider = (*DIDPLCApplication)(nil) 144 145 // ImmutableTree implements [plc.TreeProvider]. 146 - func (d *DIDPLCApplication) ImmutableTree(version plc.TreeVersion) (store.PossiblyMutableTree, error) { 147 if version.IsMutable() { 148 return store.AdaptMutableTree(d.tree), nil 149 }
··· 143 var _ plc.TreeProvider = (*DIDPLCApplication)(nil) 144 145 // ImmutableTree implements [plc.TreeProvider]. 146 + func (d *DIDPLCApplication) ImmutableTree(version plc.TreeVersion) (store.ReadOnlyTree, error) { 147 if version.IsMutable() { 148 return store.AdaptMutableTree(d.tree), nil 149 }
-12
abciapp/execution.go
··· 80 if err != nil { 81 return nil, stacktrace.Propagate(err, "") 82 } 83 - for _, c := range result.TreeChanges { 84 - _, err := d.tree.Set(c.Key, c.Value) 85 - if err != nil { 86 - return nil, stacktrace.Propagate(err, "") 87 - } 88 - } 89 // when preparing a proposal, invalid transactions should have been discarded 90 // so, if something doesn't succeed now, something has gone wrong and we should not vote in agreement of the proposal 91 if result.Code != 0 { ··· 141 result, err := processTx(ctx, d.plc, tx, req.Time, true) 142 if err != nil { 143 return nil, stacktrace.Propagate(err, "") 144 - } 145 - for _, c := range result.TreeChanges { 146 - _, err := d.tree.Set(c.Key, c.Value) 147 - if err != nil { 148 - return nil, stacktrace.Propagate(err, "") 149 - } 150 } 151 txResults[i] = &abcitypes.ExecTxResult{ 152 Code: result.Code,
··· 80 if err != nil { 81 return nil, stacktrace.Propagate(err, "") 82 } 83 // when preparing a proposal, invalid transactions should have been discarded 84 // so, if something doesn't succeed now, something has gone wrong and we should not vote in agreement of the proposal 85 if result.Code != 0 { ··· 135 result, err := processTx(ctx, d.plc, tx, req.Time, true) 136 if err != nil { 137 return nil, stacktrace.Propagate(err, "") 138 } 139 txResults[i] = &abcitypes.ExecTxResult{ 140 Code: result.Code,
+19 -19
abciapp/tx.go
··· 18 type TransactionAction string 19 20 var ( 21 - knownActions = map[TransactionAction]struct{}{} 22 - TransactionActionCreatePlcOp = registerTransactionAction[CreatePlcOpArguments]("CreatePlcOp") 23 ) 24 25 func registerTransactionAction[ArgType ArgumentType](action string) TransactionAction { ··· 77 return bytes.Equal(txBytes, s) 78 } 79 80 - type treeChange struct { 81 - Key []byte 82 - Value []byte 83 - } 84 type processResult struct { 85 - TreeChanges []treeChange 86 - Code uint32 87 - Data []byte 88 - Log string 89 - Info string 90 - GasWanted int64 91 - GasUsed int64 92 - Events []abcitypes.Event 93 - Codespace string 94 } 95 96 func processTx(ctx context.Context, p plc.PLC, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { ··· 123 }, nil 124 } 125 126 switch TransactionAction(action) { 127 case TransactionActionCreatePlcOp: 128 - result, err := processCreatePlcOpTx(ctx, p, txBytes, atTime, execute) 129 - return result, stacktrace.Propagate(err, "") 130 default: 131 - return &processResult{ 132 Code: 4001, 133 Info: "Unknown transaction action", 134 - }, nil 135 } 136 }
··· 18 type TransactionAction string 19 20 var ( 21 + knownActions = map[TransactionAction]struct{}{} 22 + TransactionActionCreatePlcOp = registerTransactionAction[CreatePlcOpArguments]("CreatePlcOp") 23 + TransactionActionAuthoritativeImport = registerTransactionAction[AuthoritativeImportArguments]("AuthoritativeImport") 24 ) 25 26 func registerTransactionAction[ArgType ArgumentType](action string) TransactionAction { ··· 78 return bytes.Equal(txBytes, s) 79 } 80 81 type processResult struct { 82 + Code uint32 83 + Data []byte 84 + Log string 85 + Info string 86 + GasWanted int64 87 + GasUsed int64 88 + Events []abcitypes.Event 89 + Codespace string 90 } 91 92 func processTx(ctx context.Context, p plc.PLC, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { ··· 119 }, nil 120 } 121 122 + var result *processResult 123 switch TransactionAction(action) { 124 case TransactionActionCreatePlcOp: 125 + result, err = processCreatePlcOpTx(ctx, p, txBytes, atTime, execute) 126 + case TransactionActionAuthoritativeImport: 127 + result, err = processAuthoritativeImportTx(ctx, p, txBytes, atTime, execute) 128 default: 129 + result = &processResult{ 130 Code: 4001, 131 Info: "Unknown transaction action", 132 + } 133 + 134 } 135 + return result, stacktrace.Propagate(err, "") 136 }
+1 -7
abciapp/tx_create_plc_op.go
··· 6 "time" 7 8 "github.com/did-method-plc/go-didplc" 9 - "github.com/ipfs/go-cid" 10 cbornode "github.com/ipfs/go-ipld-cbor" 11 "github.com/palantir/stacktrace" 12 "tangled.org/gbl08ma/didplcbft/plc" ··· 52 return nil, stacktrace.Propagate(err, "internal error") 53 } 54 55 - var cid cid.Cid 56 if execute { 57 - cid, err = p.ExecuteOperation(ctx, atTime, tx.Arguments.DID, opBytes) 58 } else { 59 err = p.ValidateOperation(ctx, plc.CommittedTreeVersion, atTime, tx.Arguments.DID, opBytes) 60 } ··· 69 } 70 71 return &processResult{ 72 - TreeChanges: []treeChange{{ 73 - Key: []byte(tx.Arguments.DID), 74 - Value: cid.Bytes(), 75 - }}, 76 Code: 0, 77 }, nil 78 }
··· 6 "time" 7 8 "github.com/did-method-plc/go-didplc" 9 cbornode "github.com/ipfs/go-ipld-cbor" 10 "github.com/palantir/stacktrace" 11 "tangled.org/gbl08ma/didplcbft/plc" ··· 51 return nil, stacktrace.Propagate(err, "internal error") 52 } 53 54 if execute { 55 + err = p.ExecuteOperation(ctx, atTime, tx.Arguments.DID, opBytes) 56 } else { 57 err = p.ValidateOperation(ctx, plc.CommittedTreeVersion, atTime, tx.Arguments.DID, opBytes) 58 } ··· 67 } 68 69 return &processResult{ 70 Code: 0, 71 }, nil 72 }
+29
abciapp/tx_import.go
···
··· 1 + package abciapp 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + cbornode "github.com/ipfs/go-ipld-cbor" 8 + "github.com/palantir/stacktrace" 9 + "tangled.org/gbl08ma/didplcbft/plc" 10 + ) 11 + 12 + type AuthoritativeImportArguments struct { 13 + Cursor string `json:"cursor" refmt:"cursor"` 14 + Hash string `json:"hash" refmt:"hash"` 15 + } 16 + 17 + func (AuthoritativeImportArguments) ForAction() TransactionAction { 18 + return TransactionActionAuthoritativeImport 19 + } 20 + 21 + func init() { 22 + cbornode.RegisterCborType(AuthoritativeImportArguments{}) 23 + cbornode.RegisterCborType(Transaction[AuthoritativeImportArguments]{}) 24 + } 25 + 26 + func processAuthoritativeImportTx(ctx context.Context, p plc.PLC, txBytes []byte, atTime time.Time, execute bool) (*processResult, error) { 27 + // TODO 28 + return nil, stacktrace.NewError("not implemented") 29 + }
+79 -10
plc/impl.go
··· 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/cosmos/iavl" 11 "github.com/did-method-plc/go-didplc" 12 - "github.com/ipfs/go-cid" 13 "github.com/palantir/stacktrace" 14 "github.com/samber/lo" 15 "tangled.org/gbl08ma/didplcbft/store" 16 ) 17 18 type TreeProvider interface { 19 MutableTree() (*iavl.MutableTree, error) 20 - ImmutableTree(version TreeVersion) (store.PossiblyMutableTree, error) 21 } 22 23 type plcImpl struct { ··· 43 plc.mu.Lock() 44 defer plc.mu.Unlock() 45 46 - timestamp := syntax.Datetime(at.Format(syntax.AtprotoDatetimeLayout)) 47 48 // TODO set true to false only while importing old ops 49 _, err := plc.validator.Validate(atHeight, timestamp, did, opBytes, true) ··· 54 return nil 55 } 56 57 - func (plc *plcImpl) ExecuteOperation(ctx context.Context, t time.Time, did string, opBytes []byte) (cid.Cid, error) { 58 plc.mu.Lock() 59 defer plc.mu.Unlock() 60 61 - timestamp := syntax.Datetime(t.Format(syntax.AtprotoDatetimeLayout)) 62 63 // TODO set true to false only while importing old ops 64 effects, err := plc.validator.Validate(WorkingTreeVersion, timestamp, did, opBytes, true) 65 if err != nil { 66 - return cid.Undef, stacktrace.Propagate(err, "operation failed validation") 67 } 68 69 tree, err := plc.treeProvider.MutableTree() 70 if err != nil { 71 - return cid.Undef, stacktrace.Propagate(err, "failed to obtain mutable tree") 72 } 73 74 - err = store.Tree.StoreOperation(tree, effects.NewLogEntry, effects.NewOperationIndex, effects.NullifiedEntriesStartingIndex) 75 if err != nil { 76 - return cid.Undef, stacktrace.Propagate(err, "failed to commit operation") 77 } 78 79 - return effects.NewOperationCID, nil 80 } 81 82 func (plc *plcImpl) Resolve(ctx context.Context, atHeight TreeVersion, did string) (didplc.Doc, error) {
··· 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/cosmos/iavl" 11 "github.com/did-method-plc/go-didplc" 12 "github.com/palantir/stacktrace" 13 "github.com/samber/lo" 14 + "github.com/samber/mo" 15 "tangled.org/gbl08ma/didplcbft/store" 16 ) 17 18 type TreeProvider interface { 19 MutableTree() (*iavl.MutableTree, error) 20 + ImmutableTree(version TreeVersion) (store.ReadOnlyTree, error) 21 } 22 23 type plcImpl struct { ··· 43 plc.mu.Lock() 44 defer plc.mu.Unlock() 45 46 + timestamp := syntax.Datetime(at.Format(store.ActualAtprotoDatetimeLayout)) 47 48 // TODO set true to false only while importing old ops 49 _, err := plc.validator.Validate(atHeight, timestamp, did, opBytes, true) ··· 54 return nil 55 } 56 57 + func (plc *plcImpl) ExecuteOperation(ctx context.Context, t time.Time, did string, opBytes []byte) error { 58 plc.mu.Lock() 59 defer plc.mu.Unlock() 60 61 + timestamp := syntax.Datetime(t.Format(store.ActualAtprotoDatetimeLayout)) 62 63 // TODO set true to false only while importing old ops 64 effects, err := plc.validator.Validate(WorkingTreeVersion, timestamp, did, opBytes, true) 65 if err != nil { 66 + return stacktrace.Propagate(err, "operation failed validation") 67 } 68 69 tree, err := plc.treeProvider.MutableTree() 70 if err != nil { 71 + return stacktrace.Propagate(err, "failed to obtain mutable tree") 72 + } 73 + 74 + err = store.Tree.StoreOperation(tree, effects.NewLogEntry, effects.NullifiedEntriesStartingIndex) 75 + if err != nil { 76 + return stacktrace.Propagate(err, "failed to commit operation") 77 + } 78 + 79 + return nil 80 + } 81 + 82 + func (plc *plcImpl) ImportOperationFromAuthoritativeSource(ctx context.Context, newEntry didplc.LogEntry, 83 + authoritativeAuditLogFetcher func() ([]didplc.LogEntry, error)) error { 84 + plc.mu.Lock() 85 + defer plc.mu.Unlock() 86 + 87 + tree, err := plc.treeProvider.MutableTree() 88 + if err != nil { 89 + return stacktrace.Propagate(err, "failed to obtain mutable tree") 90 + } 91 + 92 + l, _, err := store.Tree.AuditLog(tree, newEntry.DID, false) 93 + if err != nil { 94 + return stacktrace.Propagate(err, "") 95 + } 96 + 97 + newCID := newEntry.CID 98 + newPrev := newEntry.Operation.AsOperation().PrevCIDStr() 99 + 100 + // TODO avoid redundant CreatedAt formating and parsing by using a specialized LogEntry type internally (i.e. between us and the store) 101 + newCreatedAtDT, err := syntax.ParseDatetime(newEntry.CreatedAt) 102 + if err != nil { 103 + return stacktrace.Propagate(err, "") 104 } 105 + newCreatedAt := newCreatedAtDT.Time() 106 107 + mustFullyReplaceHistory := false 108 + for _, entry := range l { 109 + existingCreatedAt, err := syntax.ParseDatetime(entry.CreatedAt) 110 + if err != nil { 111 + return stacktrace.Propagate(err, "") 112 + } 113 + if existingCreatedAt.Time().After(newCreatedAt) { 114 + // We're trying to import an operation whose timestamp precedes one of the timestamps for operations we already know about 115 + // We'll need to discard all known history and import it anew using the authoritative source data (same as when dealing with sequence forks) 116 + mustFullyReplaceHistory = true 117 + break 118 + } 119 + 120 + if entry.CID == newCID { 121 + // If an operation with the same CID already exists -> easy-ish 122 + 123 + // this operation is already present, there is nothing to do 124 + // TODO re-evaluate whether we want to still update the timestamp on the existing operation, as not doing this will cause the export from our impl to definitely not match the authoritative source 125 + // (Though, the actually damaging cases of incorrect createdAt are already handled by the prior check) 126 + return nil 127 + } 128 + } 129 + 130 + if len(l) == 0 || (!mustFullyReplaceHistory && l[len(l)-1].CID == newPrev) { 131 + // If DID doesn't exist at all -> easy 132 + // If prev matches CID of latest operation, and resulting timestamp sequence monotonically increases -> easy 133 + err = store.Tree.StoreOperation(tree, newEntry, mo.None[int]()) 134 + return stacktrace.Propagate(err, "failed to commit operation") 135 + } 136 + 137 + // if we get here then we're dealing with a DID that has "complicated" history 138 + // to avoid dealing with nullification (which is made complicated here since we don't know which nullified ops are part of the "canonical audit log" 139 + // and which are caused by people purposefully submitting forking ops to the chain vs the authoritative source) 140 + // fetch audit log for DID and replace the entire history with the one from the authoritative source 141 + 142 + auditLog, err := authoritativeAuditLogFetcher() 143 if err != nil { 144 + return stacktrace.Propagate(err, "") 145 } 146 147 + err = store.Tree.ReplaceHistory(tree, auditLog) 148 + return stacktrace.Propagate(err, "") 149 } 150 151 func (plc *plcImpl) Resolve(ctx context.Context, atHeight TreeVersion, did string) (didplc.Doc, error) {
-5
plc/operation_validator.go
··· 9 "github.com/bluesky-social/indigo/atproto/atcrypto" 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/did-method-plc/go-didplc" 12 - "github.com/ipfs/go-cid" 13 "github.com/palantir/stacktrace" 14 "github.com/samber/mo" 15 ) ··· 32 type OperationEffects struct { 33 NullifiedEntriesStartingIndex mo.Option[int] 34 NewLogEntry didplc.LogEntry 35 - NewOperationCID cid.Cid // should be equivalent to the CID field inside NewLogEntry, but that's a string and we need the strongly typed Cid sometimes 36 - NewOperationIndex int 37 } 38 39 // Validate returns the new complete AuditLog that the DID history would assume if validation passes, and an error if it doesn't pass ··· 198 return OperationEffects{ 199 NullifiedEntriesStartingIndex: nullifiedEntriesStartingIndex, 200 NewLogEntry: newEntry, 201 - NewOperationCID: newOperationCID, 202 - NewOperationIndex: mostRecentOpIndex + 1, 203 }, nil 204 } 205
··· 9 "github.com/bluesky-social/indigo/atproto/atcrypto" 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/did-method-plc/go-didplc" 12 "github.com/palantir/stacktrace" 13 "github.com/samber/mo" 14 ) ··· 31 type OperationEffects struct { 32 NullifiedEntriesStartingIndex mo.Option[int] 33 NewLogEntry didplc.LogEntry 34 } 35 36 // Validate returns the new complete AuditLog that the DID history would assume if validation passes, and an error if it doesn't pass ··· 195 return OperationEffects{ 196 NullifiedEntriesStartingIndex: nullifiedEntriesStartingIndex, 197 NewLogEntry: newEntry, 198 }, nil 199 } 200
+2 -2
plc/plc.go
··· 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "github.com/did-method-plc/go-didplc" 10 - "github.com/ipfs/go-cid" 11 ) 12 13 var ErrDIDNotFound = errors.New("DID not found") ··· 65 } 66 67 type WritePLC interface { 68 - ExecuteOperation(ctx context.Context, timestamp time.Time, did string, opBytes []byte) (cid.Cid, error) 69 }
··· 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "github.com/did-method-plc/go-didplc" 10 ) 11 12 var ErrDIDNotFound = errors.New("DID not found") ··· 64 } 65 66 type WritePLC interface { 67 + ExecuteOperation(ctx context.Context, timestamp time.Time, did string, opBytes []byte) error 68 + ImportOperationFromAuthoritativeSource(ctx context.Context, entry didplc.LogEntry, authoritativeAuditLogFetcher func() ([]didplc.LogEntry, error)) error 69 }
+127 -3
plc/plc_test.go
··· 1 package plc_test 2 3 import ( 4 "encoding/json" 5 "testing" 6 "time" 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "github.com/did-method-plc/go-didplc" 10 "github.com/samber/lo" 11 "github.com/stretchr/testify/require" 12 "tangled.org/gbl08ma/didplcbft/plc" ··· 148 require.ErrorIs(t, err, plc.ErrDIDNotFound) 149 150 for _, c := range operations { 151 - _, err := testPLC.ExecuteOperation(ctx, c.ApplyAt.Time(), c.DID, []byte(c.Operation)) 152 if c.ExpectFailure { 153 require.Error(t, err) 154 } else { ··· 241 err = testPLC.ValidateOperation(ctx, plc.WorkingTreeVersion, at, logEntry.DID, b) 242 require.NoError(t, err) 243 244 - _, err = testPLC.ExecuteOperation(ctx, at, logEntry.DID, b) 245 require.NoError(t, err) 246 247 - _, err = testPLC.ExecuteOperation(ctx, at, logEntry.DID, b) 248 // committing the same operation twice should never work, 249 // as though even in non-genesis ops the referenced prev will exist, 250 // (and thus could seem like a recovery operation at first glance) ··· 391 require.NoError(t, err) 392 }) 393 }
··· 1 package plc_test 2 3 import ( 4 + "bufio" 5 + "context" 6 "encoding/json" 7 + "fmt" 8 + "iter" 9 + "net/http" 10 "testing" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/did-method-plc/go-didplc" 15 + "github.com/palantir/stacktrace" 16 "github.com/samber/lo" 17 "github.com/stretchr/testify/require" 18 "tangled.org/gbl08ma/didplcbft/plc" ··· 154 require.ErrorIs(t, err, plc.ErrDIDNotFound) 155 156 for _, c := range operations { 157 + err := testPLC.ExecuteOperation(ctx, c.ApplyAt.Time(), c.DID, []byte(c.Operation)) 158 if c.ExpectFailure { 159 require.Error(t, err) 160 } else { ··· 247 err = testPLC.ValidateOperation(ctx, plc.WorkingTreeVersion, at, logEntry.DID, b) 248 require.NoError(t, err) 249 250 + err = testPLC.ExecuteOperation(ctx, at, logEntry.DID, b) 251 require.NoError(t, err) 252 253 + err = testPLC.ExecuteOperation(ctx, at, logEntry.DID, b) 254 // committing the same operation twice should never work, 255 // as though even in non-genesis ops the referenced prev will exist, 256 // (and thus could seem like a recovery operation at first glance) ··· 397 require.NoError(t, err) 398 }) 399 } 400 + 401 + func TestImportOperationFromAuthoritativeSource(t *testing.T) { 402 + var client didplc.Client 403 + 404 + ctx := t.Context() 405 + 406 + treeProvider := NewTestTreeProvider() 407 + testPLC := plc.NewPLC(treeProvider) 408 + 409 + tree, err := treeProvider.MutableTree() 410 + require.NoError(t, err) 411 + _, _, err = tree.SaveVersion() 412 + require.NoError(t, err) 413 + 414 + seenCIDs := map[string]struct{}{} 415 + for entry := range iterateOverExport(ctx, "") { 416 + err := testPLC.ImportOperationFromAuthoritativeSource(ctx, entry, func() ([]didplc.LogEntry, error) { 417 + e, err := client.AuditLog(ctx, entry.DID) 418 + return e, stacktrace.Propagate(err, "") 419 + }) 420 + require.NoError(t, err) 421 + 422 + seenCIDs[entry.CID] = struct{}{} 423 + if len(seenCIDs) == 4000 { 424 + break 425 + } 426 + } 427 + 428 + _, _, err = tree.SaveVersion() 429 + require.NoError(t, err) 430 + 431 + exportedEntries, err := testPLC.Export(ctx, plc.CommittedTreeVersion, time.Time{}, len(seenCIDs)+1) 432 + require.NoError(t, err) 433 + 434 + require.Len(t, exportedEntries, len(seenCIDs)) 435 + 436 + for _, exportedEntry := range exportedEntries { 437 + delete(seenCIDs, exportedEntry.CID) 438 + } 439 + require.Empty(t, seenCIDs) 440 + } 441 + 442 + func iterateOverExport(ctx context.Context, startAt string) iter.Seq[didplc.LogEntry] { 443 + return func(yield func(didplc.LogEntry) bool) { 444 + const batchSize = 1000 445 + baseURL := didplc.DefaultDirectoryURL + "/export" 446 + client := &http.Client{Timeout: 30 * time.Second} 447 + 448 + // The /export seems to sometimes return outright duplicated entries :weary: 449 + seenCIDs := map[string]struct{}{} 450 + 451 + after := startAt 452 + for { 453 + req, err := http.NewRequestWithContext(ctx, "GET", baseURL, nil) 454 + if err != nil { 455 + return // Failed to create request 456 + } 457 + 458 + req.Header.Set("User-Agent", "go-did-method-plc") 459 + 460 + q := req.URL.Query() 461 + q.Add("count", fmt.Sprint(batchSize)) 462 + if after != "" { 463 + q.Add("after", after) 464 + } 465 + req.URL.RawQuery = q.Encode() 466 + 467 + resp, err := client.Do(req) 468 + if err != nil { 469 + return // Failed to make request 470 + } 471 + defer resp.Body.Close() 472 + 473 + if resp.StatusCode != http.StatusOK { 474 + return // Non-200 status code 475 + } 476 + 477 + entries := make([]didplc.LogEntry, 0, batchSize) 478 + 479 + // Read response body 480 + s := bufio.NewScanner(resp.Body) 481 + receivedEntries := 0 482 + for s.Scan() { 483 + var entry didplc.LogEntry 484 + if err := json.Unmarshal(s.Bytes(), &entry); err != nil { 485 + return // Failed to decode JSON 486 + } 487 + if _, present := seenCIDs[entry.CID]; !present { 488 + entries = append(entries, entry) 489 + seenCIDs[entry.CID] = struct{}{} 490 + } 491 + receivedEntries++ 492 + } 493 + if s.Err() != nil { 494 + return // handle scan error 495 + } 496 + 497 + if len(entries) == 0 { 498 + return 499 + } 500 + 501 + // Process each entry 502 + var lastCreatedAt string 503 + for _, entry := range entries { 504 + lastCreatedAt = entry.CreatedAt 505 + if !yield(entry) { 506 + return 507 + } 508 + } 509 + 510 + if receivedEntries < batchSize { 511 + return 512 + } 513 + 514 + after = lastCreatedAt 515 + } 516 + } 517 + }
+1 -1
plc/testutil_test.go
··· 18 } 19 } 20 21 - func (t *testTreeProvider) ImmutableTree(version plc.TreeVersion) (store.PossiblyMutableTree, error) { 22 if version.IsMutable() { 23 return store.AdaptMutableTree(t.tree), nil 24 }
··· 18 } 19 } 20 21 + func (t *testTreeProvider) ImmutableTree(version plc.TreeVersion) (store.ReadOnlyTree, error) { 22 if version.IsMutable() { 23 return store.AdaptMutableTree(t.tree), nil 24 }
+93
store/iavl_adapter.go
···
··· 1 + package store 2 + 3 + import ( 4 + "github.com/cosmos/iavl" 5 + ics23 "github.com/cosmos/ics23/go" 6 + "github.com/palantir/stacktrace" 7 + ) 8 + 9 + type ReadOnlyTree interface { 10 + Has(key []byte) (bool, error) 11 + Get(key []byte) ([]byte, error) 12 + GetProof(key []byte) (*ics23.CommitmentProof, error) // won't actually work on mutable trees, but we don't need it to 13 + IterateRange(start, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) 14 + } 15 + 16 + type mutableToUnifiedTree struct { 17 + tree *iavl.MutableTree 18 + } 19 + 20 + var _ ReadOnlyTree = (*mutableToUnifiedTree)(nil) 21 + 22 + func AdaptMutableTree(tree *iavl.MutableTree) ReadOnlyTree { 23 + return &mutableToUnifiedTree{ 24 + tree: tree, 25 + } 26 + } 27 + 28 + // Has implements [ReadOnlyTree]. 29 + func (m *mutableToUnifiedTree) Has(key []byte) (bool, error) { 30 + return m.tree.Has(key) 31 + } 32 + 33 + // Get implements [ReadOnlyTree]. 34 + func (m *mutableToUnifiedTree) Get(key []byte) ([]byte, error) { 35 + return m.tree.Get(key) 36 + } 37 + 38 + // GetProof implements [ReadOnlyTree]. 39 + func (m *mutableToUnifiedTree) GetProof(key []byte) (*ics23.CommitmentProof, error) { 40 + return nil, stacktrace.NewError("proof calculation not possible over mutable tree") 41 + } 42 + 43 + // IterateRange implements [ReadOnlyTree]. 44 + func (m *mutableToUnifiedTree) IterateRange(start []byte, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { 45 + // it might look like MutableTree implements IterateRange but it doesn't, 46 + // most iteration methods actually come from the embedded ImmutableTree we're not meant to use 47 + // (terrible API) 48 + itr, err := m.tree.Iterator(start, end, ascending) 49 + if err != nil { 50 + return false 51 + } 52 + 53 + defer itr.Close() 54 + 55 + for ; itr.Valid(); itr.Next() { 56 + if fn(itr.Key(), itr.Value()) { 57 + return true 58 + } 59 + } 60 + return false 61 + } 62 + 63 + type immutableToUnifiedTree struct { 64 + tree *iavl.ImmutableTree 65 + } 66 + 67 + var _ ReadOnlyTree = (*immutableToUnifiedTree)(nil) 68 + 69 + func AdaptImmutableTree(tree *iavl.ImmutableTree) ReadOnlyTree { 70 + return &immutableToUnifiedTree{ 71 + tree: tree, 72 + } 73 + } 74 + 75 + // Has implements [ReadOnlyTree]. 76 + func (i *immutableToUnifiedTree) Has(key []byte) (bool, error) { 77 + return i.tree.Has(key) 78 + } 79 + 80 + // Get implements [ReadOnlyTree]. 81 + func (i *immutableToUnifiedTree) Get(key []byte) ([]byte, error) { 82 + return i.tree.Get(key) 83 + } 84 + 85 + // GetProof implements [ReadOnlyTree]. 86 + func (i *immutableToUnifiedTree) GetProof(key []byte) (*ics23.CommitmentProof, error) { 87 + return i.tree.GetProof(key) 88 + } 89 + 90 + // IterateRange implements [ReadOnlyTree]. 91 + func (i *immutableToUnifiedTree) IterateRange(start []byte, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { 92 + return i.tree.IterateRange(start, end, ascending, fn) 93 + }
+100 -133
store/tree.go
··· 19 "github.com/samber/mo" 20 ) 21 22 var Tree PLCTreeStore = &TreeStore{} 23 24 type PLCTreeStore interface { 25 - AuditLog(tree PossiblyMutableTree, did string, withProof bool) ([]didplc.LogEntry, *ics23.CommitmentProof, error) 26 - AuditLogReverseIterator(tree PossiblyMutableTree, did string, err *error) iter.Seq2[int, didplc.LogEntry] 27 - ExportOperations(tree PossiblyMutableTree, after time.Time, count int) ([]didplc.LogEntry, error) // passing a count of zero means unlimited 28 - StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, newIndex int, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 29 } 30 31 var _ PLCTreeStore = (*TreeStore)(nil) ··· 33 // TreeStore exists just to groups methods nicely 34 type TreeStore struct{} 35 36 - type PossiblyMutableTree interface { 37 - IsMutable() bool 38 - Has(key []byte) (bool, error) 39 - Get(key []byte) ([]byte, error) 40 - GetProof(key []byte) (*ics23.CommitmentProof, error) // won't actually work on mutable trees, but we don't need it to 41 - IterateRange(start, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) 42 - Set(key []byte, value []byte) (bool, error) 43 - } 44 - 45 - type mutableToUnifiedTree struct { 46 - tree *iavl.MutableTree 47 - } 48 - 49 - var _ PossiblyMutableTree = (*mutableToUnifiedTree)(nil) 50 - 51 - func AdaptMutableTree(tree *iavl.MutableTree) PossiblyMutableTree { 52 - return &mutableToUnifiedTree{ 53 - tree: tree, 54 - } 55 - } 56 - 57 - // IsMutable implements [PossiblyMutableTree]. 58 - func (m *mutableToUnifiedTree) IsMutable() bool { 59 - return true 60 - } 61 - 62 - // Has implements [PossiblyMutableTree]. 63 - func (m *mutableToUnifiedTree) Has(key []byte) (bool, error) { 64 - return m.tree.Has(key) 65 - } 66 - 67 - // Get implements [PossiblyMutableTree]. 68 - func (m *mutableToUnifiedTree) Get(key []byte) ([]byte, error) { 69 - return m.tree.Get(key) 70 - } 71 - 72 - // GetProof implements [PossiblyMutableTree]. 73 - func (m *mutableToUnifiedTree) GetProof(key []byte) (*ics23.CommitmentProof, error) { 74 - return nil, stacktrace.NewError("proof calculation not possible over mutable tree") 75 - } 76 - 77 - // Set implements [PossiblyMutableTree]. 78 - func (m *mutableToUnifiedTree) Set(key []byte, value []byte) (bool, error) { 79 - return m.tree.Set(key, value) 80 - } 81 - 82 - // IterateRange implements [PossiblyMutableTree]. 83 - func (m *mutableToUnifiedTree) IterateRange(start []byte, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { 84 - // it might look like MutableTree implements IterateRange but it doesn't, 85 - // most iteration methods actually come from the embedded ImmutableTree we're not meant to use 86 - // (terrible API) 87 - itr, err := m.tree.Iterator(start, end, ascending) 88 - if err != nil { 89 - return false 90 - } 91 - 92 - defer itr.Close() 93 - 94 - for ; itr.Valid(); itr.Next() { 95 - if fn(itr.Key(), itr.Value()) { 96 - return true 97 - } 98 - } 99 - return false 100 - } 101 - 102 - type immutableToUnifiedTree struct { 103 - tree *iavl.ImmutableTree 104 - } 105 - 106 - var _ PossiblyMutableTree = (*immutableToUnifiedTree)(nil) 107 - 108 - func AdaptImmutableTree(tree *iavl.ImmutableTree) PossiblyMutableTree { 109 - return &immutableToUnifiedTree{ 110 - tree: tree, 111 - } 112 - } 113 - 114 - // IsMutable implements [PossiblyMutableTree]. 115 - func (m *immutableToUnifiedTree) IsMutable() bool { 116 - return false 117 - } 118 - 119 - // Has implements [PossiblyMutableTree]. 120 - func (i *immutableToUnifiedTree) Has(key []byte) (bool, error) { 121 - return i.tree.Has(key) 122 - } 123 - 124 - // Get implements [PossiblyMutableTree]. 125 - func (i *immutableToUnifiedTree) Get(key []byte) ([]byte, error) { 126 - return i.tree.Get(key) 127 - } 128 - 129 - // GetProof implements [PossiblyMutableTree]. 130 - func (i *immutableToUnifiedTree) GetProof(key []byte) (*ics23.CommitmentProof, error) { 131 - return i.tree.GetProof(key) 132 - } 133 - 134 - // IterateRange implements [PossiblyMutableTree]. 135 - func (i *immutableToUnifiedTree) IterateRange(start []byte, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { 136 - return i.tree.IterateRange(start, end, ascending, fn) 137 - } 138 - 139 - // Set implements [PossiblyMutableTree]. 140 - func (i *immutableToUnifiedTree) Set(key []byte, value []byte) (bool, error) { 141 - return false, stacktrace.NewError("set not possible over immutable tree") 142 - } 143 - 144 - func (t *TreeStore) AuditLog(tree PossiblyMutableTree, did string, withProof bool) ([]didplc.LogEntry, *ics23.CommitmentProof, error) { 145 proofs := []*ics23.CommitmentProof{} 146 147 didBytes, err := didToBytes(did) ··· 206 Operation: operation, 207 CID: operation.AsOperation().CID().String(), 208 Nullified: nullified, 209 - CreatedAt: timestamp.Format(syntax.AtprotoDatetimeLayout), 210 }) 211 } 212 ··· 220 return logEntries, combinedProof, nil 221 } 222 223 - func (t *TreeStore) AuditLogReverseIterator(tree PossiblyMutableTree, did string, retErr *error) iter.Seq2[int, didplc.LogEntry] { 224 return func(yield func(int, didplc.LogEntry) bool) { 225 didBytes, err := didToBytes(did) 226 if err != nil { ··· 274 Operation: operation, 275 CID: operation.AsOperation().CID().String(), 276 Nullified: nullified, 277 - CreatedAt: timestamp.Format(syntax.AtprotoDatetimeLayout), 278 }) { 279 return 280 } ··· 282 } 283 } 284 285 - func (t *TreeStore) ExportOperations(tree PossiblyMutableTree, after time.Time, count int) ([]didplc.LogEntry, error) { 286 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 287 start := after.Add(1 * time.Nanosecond) 288 startKey := marshalOperationKey(start, make([]byte, 15)) ··· 312 Operation: operation, 313 CID: operation.AsOperation().CID().String(), 314 Nullified: nullified, 315 - CreatedAt: timestamp.Format(syntax.AtprotoDatetimeLayout), 316 }) 317 return len(entries) == count // this condition being checked here also makes it so that a count of zero means unlimited 318 }) ··· 322 return entries, nil 323 } 324 325 - func (t *TreeStore) StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, newIndex int, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error { 326 didBytes, err := didToBytes(entry.DID) 327 if err != nil { 328 return stacktrace.Propagate(err, "") ··· 330 331 logKey := marshalDIDLogKey(didBytes) 332 333 - var operationKeys [][]byte 334 logOperations, err := tree.Get(logKey) 335 logOperations = slices.Clone(logOperations) 336 - if err != nil { 337 - operationKeys = [][]byte{} 338 - } else { 339 - operationKeys = make([][]byte, 0, len(logOperations)/8) 340 - for ts := range slices.Chunk(logOperations, 8) { 341 - operationKeys = append(operationKeys, timestampBytesToDIDOperationKey(ts, didBytes)) 342 - } 343 - } 344 345 if nullifyEGt, ok := nullifyWithIndexEqualOrGreaterThan.Get(); ok { 346 for _, opKey := range operationKeys[nullifyEGt:] { 347 operationValue, err := tree.Get(opKey) 348 if err != nil { ··· 363 return stacktrace.Propagate(err, "invalid CreatedAt") 364 } 365 366 - opKey := marshalOperationKey(opDatetime.Time().Truncate(1*time.Millisecond), didBytes) 367 - opValue := marshalOperationValue(entry.Nullified, entry.Operation) 368 369 _, err = tree.Set(opKey, opValue) 370 if err != nil { ··· 380 return nil 381 } 382 383 func didToBytes(did string) ([]byte, error) { 384 if !strings.HasPrefix(did, "did:plc:") { 385 return nil, stacktrace.NewError("invalid did:plc") ··· 426 func marshalOperationKey(createdAt time.Time, didBytes []byte) []byte { 427 key := make([]byte, 1+8+15) 428 key[0] = 'o' 429 - binary.BigEndian.PutUint64(key[1:], uint64(createdAt.UTC().UnixNano())) 430 copy(key[9:], didBytes) 431 return key 432 } ··· 438 return createdAt, did, stacktrace.Propagate(err, "") 439 } 440 441 - func marshalOperationValue(nullified bool, operation didplc.OpEnum) []byte { 442 o := []byte{lo.Ternary[byte](nullified, 1, 0)} 443 - o = append(o, operation.AsOperation().SignedCBORBytes()...) 444 return o 445 } 446
··· 19 "github.com/samber/mo" 20 ) 21 22 + // ActualAtprotoDatetimeLayout is the format for CreatedAt timestamps 23 + // AtprotoDatetimeLayout as defined by github.com/bluesky-social/indigo/atproto/syntax omits trailing zeros in the milliseconds 24 + // This doesn't match how the official plc.directory implementation formats them, so we define that format here with trailing zeros included 25 + const ActualAtprotoDatetimeLayout = "2006-01-02T15:04:05.000Z" 26 + 27 var Tree PLCTreeStore = &TreeStore{} 28 29 type PLCTreeStore interface { 30 + AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]didplc.LogEntry, *ics23.CommitmentProof, error) 31 + AuditLogReverseIterator(tree ReadOnlyTree, did string, err *error) iter.Seq2[int, didplc.LogEntry] 32 + ExportOperations(tree ReadOnlyTree, after time.Time, count int) ([]didplc.LogEntry, error) // passing a count of zero means unlimited 33 + StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error 34 + ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error 35 } 36 37 var _ PLCTreeStore = (*TreeStore)(nil) ··· 39 // TreeStore exists just to groups methods nicely 40 type TreeStore struct{} 41 42 + func (t *TreeStore) AuditLog(tree ReadOnlyTree, did string, withProof bool) ([]didplc.LogEntry, *ics23.CommitmentProof, error) { 43 proofs := []*ics23.CommitmentProof{} 44 45 didBytes, err := didToBytes(did) ··· 104 Operation: operation, 105 CID: operation.AsOperation().CID().String(), 106 Nullified: nullified, 107 + CreatedAt: timestamp.Format(ActualAtprotoDatetimeLayout), 108 }) 109 } 110 ··· 118 return logEntries, combinedProof, nil 119 } 120 121 + func (t *TreeStore) AuditLogReverseIterator(tree ReadOnlyTree, did string, retErr *error) iter.Seq2[int, didplc.LogEntry] { 122 return func(yield func(int, didplc.LogEntry) bool) { 123 didBytes, err := didToBytes(did) 124 if err != nil { ··· 172 Operation: operation, 173 CID: operation.AsOperation().CID().String(), 174 Nullified: nullified, 175 + CreatedAt: timestamp.Format(ActualAtprotoDatetimeLayout), 176 }) { 177 return 178 } ··· 180 } 181 } 182 183 + func (t *TreeStore) ExportOperations(tree ReadOnlyTree, after time.Time, count int) ([]didplc.LogEntry, error) { 184 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 185 start := after.Add(1 * time.Nanosecond) 186 startKey := marshalOperationKey(start, make([]byte, 15)) ··· 210 Operation: operation, 211 CID: operation.AsOperation().CID().String(), 212 Nullified: nullified, 213 + CreatedAt: timestamp.Format(ActualAtprotoDatetimeLayout), 214 }) 215 return len(entries) == count // this condition being checked here also makes it so that a count of zero means unlimited 216 }) ··· 220 return entries, nil 221 } 222 223 + func (t *TreeStore) StoreOperation(tree *iavl.MutableTree, entry didplc.LogEntry, nullifyWithIndexEqualOrGreaterThan mo.Option[int]) error { 224 didBytes, err := didToBytes(entry.DID) 225 if err != nil { 226 return stacktrace.Propagate(err, "") ··· 228 229 logKey := marshalDIDLogKey(didBytes) 230 231 logOperations, err := tree.Get(logKey) 232 logOperations = slices.Clone(logOperations) 233 234 if nullifyEGt, ok := nullifyWithIndexEqualOrGreaterThan.Get(); ok { 235 + var operationKeys [][]byte 236 + if err != nil { 237 + operationKeys = [][]byte{} 238 + } else { 239 + operationKeys = make([][]byte, 0, len(logOperations)/8) 240 + for ts := range slices.Chunk(logOperations, 8) { 241 + operationKeys = append(operationKeys, timestampBytesToDIDOperationKey(ts, didBytes)) 242 + } 243 + } 244 + 245 for _, opKey := range operationKeys[nullifyEGt:] { 246 operationValue, err := tree.Get(opKey) 247 if err != nil { ··· 262 return stacktrace.Propagate(err, "invalid CreatedAt") 263 } 264 265 + operation := entry.Operation.AsOperation() 266 + opKey := marshalOperationKey(opDatetime.Time(), didBytes) 267 + opValue := marshalOperationValue(entry.Nullified, operation) 268 269 _, err = tree.Set(opKey, opValue) 270 if err != nil { ··· 280 return nil 281 } 282 283 + func (t *TreeStore) ReplaceHistory(tree *iavl.MutableTree, history []didplc.LogEntry) error { 284 + if len(history) == 0 { 285 + // for now this isn't needed, if it's needed in the future we'll have to accept a DID as argument on this function 286 + return stacktrace.NewError("can't replace with empty history") 287 + } 288 + 289 + did := history[0].DID 290 + 291 + didBytes, err := didToBytes(did) 292 + if err != nil { 293 + return stacktrace.Propagate(err, "") 294 + } 295 + 296 + logKey := marshalDIDLogKey(didBytes) 297 + 298 + // identify keys of existing operations for this DID (if any) 299 + var prevOpKeys [][]byte 300 + logOperations, err := tree.Get(logKey) 301 + if err != nil { 302 + return stacktrace.Propagate(err, "") 303 + } 304 + prevOpKeys = make([][]byte, 0, len(logOperations)/8) 305 + for ts := range slices.Chunk(logOperations, 8) { 306 + prevOpKeys = append(prevOpKeys, timestampBytesToDIDOperationKey(ts, didBytes)) 307 + } 308 + 309 + // remove existing operations for this DID (if any) 310 + for _, key := range prevOpKeys { 311 + _, _, err = tree.Remove(key) 312 + if err != nil { 313 + return stacktrace.Propagate(err, "") 314 + } 315 + } 316 + 317 + // add new list of operations 318 + logOperations = make([]byte, 0, len(history)*8) 319 + for _, entry := range history { 320 + opDatetime, err := syntax.ParseDatetime(entry.CreatedAt) 321 + if err != nil { 322 + return stacktrace.Propagate(err, "invalid CreatedAt") 323 + } 324 + 325 + operation := entry.Operation.AsOperation() 326 + opKey := marshalOperationKey(opDatetime.Time(), didBytes) 327 + opValue := marshalOperationValue(entry.Nullified, operation) 328 + 329 + _, err = tree.Set(opKey, opValue) 330 + if err != nil { 331 + return stacktrace.Propagate(err, "") 332 + } 333 + 334 + // add to log for DID 335 + logOperations = append(logOperations, opKey[1:9]...) 336 + } 337 + 338 + // save updated log for DID 339 + _, err = tree.Set(logKey, logOperations) 340 + if err != nil { 341 + return stacktrace.Propagate(err, "") 342 + } 343 + 344 + return nil 345 + } 346 + 347 func didToBytes(did string) ([]byte, error) { 348 if !strings.HasPrefix(did, "did:plc:") { 349 return nil, stacktrace.NewError("invalid did:plc") ··· 390 func marshalOperationKey(createdAt time.Time, didBytes []byte) []byte { 391 key := make([]byte, 1+8+15) 392 key[0] = 'o' 393 + 394 + ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) 395 + binary.BigEndian.PutUint64(key[1:], ts) 396 + 397 copy(key[9:], didBytes) 398 return key 399 } ··· 405 return createdAt, did, stacktrace.Propagate(err, "") 406 } 407 408 + func marshalOperationValue(nullified bool, operation didplc.Operation) []byte { 409 o := []byte{lo.Ternary[byte](nullified, 1, 0)} 410 + o = append(o, operation.SignedCBORBytes()...) 411 return o 412 } 413