A very experimental PLC implementation which uses BFT consensus for decentralization
at main 1184 lines 38 kB view raw
1package store 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/base32" 7 "encoding/binary" 8 "errors" 9 "iter" 10 "math" 11 "math/big" 12 "slices" 13 "strings" 14 "time" 15 16 "github.com/OffchainLabs/go-bitfield" 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" 19 cmttypes "github.com/cometbft/cometbft/types" 20 ics23 "github.com/cosmos/ics23/go" 21 "github.com/dgraph-io/badger/v4" 22 "github.com/did-method-plc/go-didplc" 23 "github.com/gbl08ma/stacktrace" 24 cbornode "github.com/ipfs/go-ipld-cbor" 25 "github.com/polydawn/refmt/obj/atlas" 26 "github.com/samber/lo" 27 "github.com/samber/mo" 28 "tangled.org/gbl08ma.com/didplcbft/transaction" 29 "tangled.org/gbl08ma.com/didplcbft/types" 30) 31 32const ( 33 PublicKeyLength = 32 34 AddressLength = 20 35 36 TreeOperationKeyPrefix = 'o' 37 TreeOperationKeyLength = 1 + 8 38 TreeRangeChallengeCommitmentKeyPrefix = 'C' 39 TreeRangeChallengeKeyLength = 1 + PublicKeyLength 40 TreeChallengeCompletionKeyPrefix = 'p' 41 TreeChallengeCompletionKeyLength = 1 + PublicKeyLength 42 TreeValidatorReputationKeyPrefix = 'r' 43 TreeValidatorReputationKeyLength = 1 + PublicKeyLength 44 45 IndexBlockChallengeKeyPrefix = 'c' 46 IndexBlockChallengeKeyLength = 1 + 8 47 IndexDIDLogKeyPrefix = 'l' 48 IndexDIDLogKeyLength = 1 + 15 + 8 49 IndexValidatorVotingActivityPrefix = 'v' 50 IndexValidatorVotingActivityKeyLength = 1 + AddressLength + 8 51 IndexBlockHeaderKeyPrefix = 'h' 52 IndexBlockHeaderKeyLength = 1 + 8 53 54 TreeAuthoritativePLCKey = "aPLCURL" 55 TreeAuthoritativeImportProgressKey = "aImportProgress" 56) 57 58var Consensus ConsensusStore = &consensusStore{} 59 60var ErrNoActiveChallengeCommitment = errors.New("the validator is currently not committed to a challenge") 61var ErrNoRecentChallengeCompletion = errors.New("the validator has not completed a range challenge recently") 62var ErrValidatorNotActive = errors.New("the validator is not active") 63 64// ConsensusStore manages all information that is directly or indirectly protected by consensus 65type ConsensusStore interface { 66 AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) 67 AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, err *error) iter.Seq[types.SequencedLogEntry] 68 69 ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 70 OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq2[types.SequencedLogEntry, []byte] 71 72 StoreOperation(ctx context.Context, tx transaction.Write, entry didplc.LogEntry, nullifyWithSequenceEqualOrGreaterThan mo.Option[uint64]) error 73 SetOperationCreatedAt(tx transaction.Write, seqID uint64, createdAt time.Time) error 74 75 CountOperations(tx transaction.Read) (uint64, error) 76 77 AuthoritativePLC(tx transaction.Read) (string, error) 78 SetAuthoritativePLC(tx transaction.Write, url string) error 79 80 AuthoritativeImportProgress(tx transaction.Read) (uint64, error) 81 SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error 82 83 ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) 84 SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error 85 ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error 86 87 ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error) 88 SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error 89 90 BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) 91 BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] // afterHeight is exclusive for consistency with OperationsIterator 92 BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte] 93 StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error 94 DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error 95 96 ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) 97 ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error 98 ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error 99 100 ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error) 101 ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] 102 InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error 103 MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error 104 105 FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error) 106 107 ConfigureEpochSize(epochSize uint64) 108} 109 110type ActiveValidator struct { 111 Address []byte 112 PublicKey []byte 113 VoteCount uint64 114} 115 116var _ ConsensusStore = (*consensusStore)(nil) 117 118// consensusStore exists just to groups methods nicely 119type consensusStore struct { 120 epochSize uint64 121} 122 123// ConfigureEpochSize implements [ConsensusStore]. 124func (t *consensusStore) ConfigureEpochSize(epochSize uint64) { 125 t.epochSize = epochSize 126} 127 128func (t *consensusStore) AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 129 didBytes, err := DIDToBytes(did) 130 if err != nil { 131 return nil, nil, stacktrace.Propagate(err) 132 } 133 134 didRangeStart := marshalDIDLogKey(didBytes, 0) 135 didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) 136 137 didLogIterator, err := tx.IndexDB().Iterator(didRangeStart, didRangeEnd) 138 if err != nil { 139 return nil, nil, stacktrace.Propagate(err) 140 } 141 142 defer didLogIterator.Close() 143 144 logEntries := make([]types.SequencedLogEntry, 0, 1) 145 proofs := []*ics23.CommitmentProof{} 146 txHeight := uint64(tx.Height()) 147 for didLogIterator.Valid() { 148 select { 149 case <-ctx.Done(): 150 return nil, nil, stacktrace.Propagate(ctx.Err()) 151 default: 152 } 153 154 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 155 validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) 156 157 opKey := marshalOperationKey(sequence) 158 159 if txHeight >= validFromHeight && txHeight <= validToHeight { 160 operationValue, err := tx.Tree().Get(opKey) 161 if err != nil { 162 return nil, nil, stacktrace.Propagate(err) 163 } 164 165 if withProof { 166 proof, err := tx.Tree().GetProof(opKey) 167 if err != nil { 168 return nil, nil, stacktrace.Propagate(err) 169 } 170 proofs = append(proofs, proof) 171 } 172 173 logEntry, err := unmarshalLogEntry(opKey, operationValue) 174 if err != nil { 175 return nil, nil, stacktrace.Propagate(err) 176 } 177 178 logEntries = append(logEntries, logEntry) 179 } 180 didLogIterator.Next() 181 } 182 183 err = didLogIterator.Error() 184 if err != nil { 185 return nil, nil, stacktrace.Propagate(err) 186 } 187 188 var combinedProof *ics23.CommitmentProof 189 if withProof { 190 combinedProof, err = ics23.CombineProofs(proofs) 191 if err != nil { 192 return nil, nil, stacktrace.Propagate(err, "failed to combine proofs") 193 } 194 } 195 return logEntries, combinedProof, nil 196} 197 198func (t *consensusStore) AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, retErr *error) iter.Seq[types.SequencedLogEntry] { 199 return func(yield func(types.SequencedLogEntry) bool) { 200 *retErr = nil 201 202 didBytes, err := DIDToBytes(did) 203 if err != nil { 204 *retErr = stacktrace.Propagate(err) 205 return 206 } 207 208 mayBePresent := tx.TestDIDBloomFilter(didBytes) 209 if !mayBePresent { 210 return 211 } 212 213 didRangeStart := marshalDIDLogKey(didBytes, 0) 214 didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) 215 216 opts := badger.DefaultIteratorOptions 217 // we rarely have more than one entry for a DID and we see great performance gains from not prefetching 218 // (iterating on DIDs that don't exist goes from taking over 150us to taking around 35 us) 219 opts.PrefetchValues = false 220 opts.Reverse = true 221 didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts) 222 if err != nil { 223 *retErr = stacktrace.Propagate(err) 224 return 225 } 226 227 defer didLogIterator.Close() 228 229 txHeight := uint64(tx.Height()) 230 231 for didLogIterator.Valid() { 232 select { 233 case <-ctx.Done(): 234 *retErr = stacktrace.Propagate(ctx.Err()) 235 return 236 default: 237 } 238 239 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 240 validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) 241 242 opKey := marshalOperationKey(sequence) 243 244 if txHeight >= validFromHeight && txHeight <= validToHeight { 245 operationValue, err := tx.Tree().Get(opKey) 246 if err != nil { 247 *retErr = stacktrace.Propagate(err) 248 return 249 } 250 251 logEntry, err := unmarshalLogEntry(opKey, operationValue) 252 if err != nil { 253 *retErr = stacktrace.Propagate(err) 254 return 255 } 256 257 if !yield(logEntry) { 258 return 259 } 260 } 261 didLogIterator.Next() 262 } 263 264 err = didLogIterator.Error() 265 if err != nil { 266 *retErr = stacktrace.Propagate(err) 267 } 268 } 269} 270 271func (t *consensusStore) ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) { 272 entries := make([]types.SequencedLogEntry, 0, count) 273 var iterErr error 274 for logEntry := range t.OperationsIterator(tx, after, &iterErr) { 275 entries = append(entries, logEntry) 276 277 // this condition being checked here also makes it so that a count of zero means unlimited 278 if len(entries) == count { 279 break 280 } 281 } 282 if iterErr != nil { 283 return nil, stacktrace.Propagate(iterErr, "ran into an error while iterating") 284 } 285 return entries, nil 286} 287 288func (t *consensusStore) OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq2[types.SequencedLogEntry, []byte] { 289 return func(yield func(types.SequencedLogEntry, []byte) bool) { 290 *retErr = nil 291 292 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 293 start := after + 1 294 startKey := marshalOperationKey(start) 295 endKey := maxOperationKey 296 297 opIterator, err := tx.Tree().Iterator(startKey, endKey, true) 298 if err != nil { 299 *retErr = stacktrace.Propagate(err) 300 return 301 } 302 303 defer opIterator.Close() 304 305 for opIterator.Valid() { 306 logEntry, err := unmarshalLogEntry(opIterator.Key(), opIterator.Value()) 307 if err != nil { 308 *retErr = stacktrace.Propagate(err) 309 return 310 } 311 312 if !yield(logEntry, opIterator.Value()) { 313 return 314 } 315 316 opIterator.Next() 317 } 318 err = opIterator.Error() 319 if err != nil { 320 *retErr = stacktrace.Propagate(err) 321 } 322 } 323} 324 325// StoreOperation stores an operation in the tree, nullifying existing operations whose index within the DID's history 326// is lower or equal to the specified optional integer. 327func (t *consensusStore) StoreOperation(ctx context.Context, tx transaction.Write, entry didplc.LogEntry, nullifyWithSequenceEqualOrGreaterThan mo.Option[uint64]) error { 328 didBytes, err := DIDToBytes(entry.DID) 329 if err != nil { 330 return stacktrace.Propagate(err) 331 } 332 333 txHeight := uint64(tx.Height()) 334 335 if nullifyEGt, ok := nullifyWithSequenceEqualOrGreaterThan.Get(); ok { 336 didRangeStart := marshalDIDLogKey(didBytes, nullifyEGt) 337 didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) 338 339 opts := badger.DefaultIteratorOptions 340 opts.PrefetchValues = false 341 opts.Reverse = true 342 didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts) 343 if err != nil { 344 return stacktrace.Propagate(err) 345 } 346 347 defer didLogIterator.Close() 348 349 txHeight := uint64(tx.Height()) 350 351 for didLogIterator.Valid() { 352 select { 353 case <-ctx.Done(): 354 return stacktrace.Propagate(ctx.Err()) 355 default: 356 } 357 358 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 359 validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) 360 361 opKey := marshalOperationKey(sequence) 362 363 if txHeight < validFromHeight || txHeight > validToHeight { 364 // ignore ops that are invisible at this height 365 didLogIterator.Next() 366 continue 367 } 368 369 operationValue, err := tx.Tree().Get(opKey) 370 if err != nil { 371 return stacktrace.Propagate(err) 372 } 373 374 updated, err := tx.Tree().Set(opKey, markOperationValueNullified(operationValue)) 375 if err != nil { 376 return stacktrace.Propagate(err) 377 } 378 if !updated { 379 // if we get to this point we have a mistake in our program, and the data is now inconsistent 380 // we are not supposed to be able to recover from this error without rolling back the tree 381 return stacktrace.NewError("expected to be updating an existing operation key but wrote new one instead") 382 } 383 384 didLogIterator.Next() 385 } 386 err = didLogIterator.Error() 387 if err != nil { 388 return stacktrace.Propagate(err) 389 } 390 } 391 392 opDatetime, err := syntax.ParseDatetime(entry.CreatedAt) 393 if err != nil { 394 return stacktrace.Propagate(err, "invalid CreatedAt") 395 } 396 397 sequence, err := tx.NextSequence() 398 if err != nil { 399 return stacktrace.Propagate(err) 400 } 401 402 operation := entry.Operation.AsOperation() 403 opKey := marshalOperationKey(sequence) 404 opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation) 405 406 updated, err := tx.Tree().Set(opKey, opValue) 407 if err != nil { 408 return stacktrace.Propagate(err) 409 } 410 if updated { 411 // if we get to this point we have a mistake in our program, and the data is now inconsistent 412 // we are not supposed to be able to recover from this error without rolling back the tree 413 return stacktrace.NewError("expected to be writing to a new operation key but updated instead") 414 } 415 416 logKey := marshalDIDLogKey(didBytes, sequence) 417 logValue := marshalDIDLogValue(txHeight, math.MaxUint64) 418 419 err = tx.IndexDB().Set(logKey, logValue) 420 if err != nil { 421 return stacktrace.Propagate(err) 422 } 423 424 tx.AddToDIDBloomFilter(didBytes, sequence) 425 426 return nil 427} 428 429func (t *consensusStore) SetOperationCreatedAt(tx transaction.Write, seqID uint64, createdAt time.Time) error { 430 opKey := marshalOperationKey(seqID) 431 432 opValue, err := tx.Tree().Get(opKey) 433 if err != nil { 434 return stacktrace.Propagate(err) 435 } 436 if len(opValue) == 0 { 437 return stacktrace.NewError("operation %d not found", seqID) 438 } 439 440 opValue = slices.Clone(opValue) 441 442 ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) 443 binary.BigEndian.PutUint64(opValue[16:24], ts) 444 445 updated, err := tx.Tree().Set(opKey, opValue) 446 if !updated { 447 // if we get to this point we have a mistake in our program, and the data is now inconsistent 448 // we are not supposed to be able to recover from this error without rolling back the tree 449 return stacktrace.NewError("expected to be updating an existing operation key but wrote new one instead") 450 } 451 return stacktrace.Propagate(err) 452} 453 454var minOperationKey = marshalOperationKey(0) 455var maxOperationKey = marshalOperationKey(math.MaxInt64) 456 457func (t *consensusStore) CountOperations(tx transaction.Read) (uint64, error) { 458 seq := uint64(0) 459 460 itr, err := tx.Tree().Iterator(minOperationKey, maxOperationKey, false) 461 if err != nil { 462 return 0, stacktrace.Propagate(err) 463 } 464 465 defer itr.Close() 466 467 if itr.Valid() { 468 seq, err = unmarshalOperationKey(itr.Key()) 469 if err != nil { 470 return 0, stacktrace.Propagate(err) 471 } 472 } 473 return seq, stacktrace.Propagate(err) 474} 475 476func DIDToBytes(did string) ([]byte, error) { 477 if !strings.HasPrefix(did, "did:plc:") { 478 return nil, stacktrace.NewError("invalid did:plc") 479 } 480 481 didBytes := make([]byte, 15) 482 483 did = strings.ToUpper(did) 484 485 numWritten, err := base32.StdEncoding.Decode(didBytes, []byte(did[8:])) 486 if err != nil { 487 return nil, stacktrace.Propagate(err, "invalid did:plc") 488 } 489 if numWritten != 15 { 490 return nil, stacktrace.NewError("invalid did:plc") 491 } 492 493 return didBytes, nil 494} 495 496func bytesToDID(didBytes []byte) (string, error) { 497 did := "did:plc:" + strings.ToLower(base32.StdEncoding.EncodeToString(didBytes)) 498 if len(did) != 8+24 { 499 return "", stacktrace.NewError("invalid did:plc") 500 } 501 return did, nil 502} 503 504func marshalDIDLogKey(didBytes []byte, sequence uint64) []byte { 505 key := make([]byte, IndexDIDLogKeyLength) 506 key[0] = IndexDIDLogKeyPrefix 507 copy(key[1:16], didBytes) 508 binary.BigEndian.PutUint64(key[16:], sequence) 509 return key 510} 511 512func unmarshalDIDLogSequence(logKey []byte) uint64 { 513 return binary.BigEndian.Uint64(logKey[16:24]) 514} 515 516// validFromHeight, validToHeight are inclusive (i.e. if the former is 5 and the latter is 10, the value was valid at height 5 and 10, but not at 4 or 11) 517func marshalDIDLogValue(validFromHeight, validToHeight uint64) []byte { 518 value := make([]byte, 8+8) 519 binary.BigEndian.PutUint64(value, validFromHeight) 520 binary.BigEndian.PutUint64(value[8:], validToHeight) 521 return value 522} 523 524func UnmarshalDIDLogValue(value []byte) (validFromHeight, validToHeight uint64) { 525 validFromHeight = binary.BigEndian.Uint64(value[0:8]) 526 validToHeight = binary.BigEndian.Uint64(value[8:16]) 527 return 528} 529 530func marshalOperationKey(sequence uint64) []byte { 531 key := make([]byte, TreeOperationKeyLength) 532 key[0] = TreeOperationKeyPrefix 533 534 binary.BigEndian.PutUint64(key[1:], sequence) 535 536 return key 537} 538 539func unmarshalOperationKey(key []byte) (uint64, error) { 540 return binary.BigEndian.Uint64(key[1:9]), nil 541} 542 543func marshalOperationValue(nullified bool, didBytes []byte, createdAt time.Time, operation didplc.Operation) []byte { 544 opAsBytes := operation.SignedCBORBytes() 545 o := make([]byte, 1+15+8+len(opAsBytes)) 546 547 o[0] = lo.Ternary[byte](nullified, 1, 0) 548 549 copy(o[1:16], didBytes) 550 551 ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) 552 binary.BigEndian.PutUint64(o[16:24], ts) 553 copy(o[24:], opAsBytes) 554 555 return o 556} 557 558func unmarshalOperationValue(value []byte) (bool, string, time.Time, didplc.OpEnum, error) { 559 nullified := value[0] != 0 560 561 did, err := bytesToDID(value[1:16]) 562 if err != nil { 563 return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err) 564 } 565 566 createdAtUnixNano := binary.BigEndian.Uint64(value[16:24]) 567 createdAt := time.Unix(0, int64(createdAtUnixNano)).UTC() 568 569 var opEnum didplc.OpEnum 570 err = cbornode.DecodeInto(value[24:], &opEnum) 571 if err != nil { 572 return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err) 573 } 574 return nullified, did, createdAt, opEnum, nil 575} 576 577func markOperationValueNullified(value []byte) []byte { 578 v := slices.Clone(value) 579 v[0] = 1 580 return v 581} 582 583func unmarshalLogEntry(operationKey, operationValue []byte) (types.SequencedLogEntry, error) { 584 nullified, actualDID, timestamp, operation, err := unmarshalOperationValue(operationValue) 585 if err != nil { 586 return types.SequencedLogEntry{}, stacktrace.Propagate(err) 587 } 588 589 seq, err := unmarshalOperationKey(operationKey) 590 if err != nil { 591 return types.SequencedLogEntry{}, stacktrace.Propagate(err) 592 } 593 594 return types.SequencedLogEntry{ 595 Seq: seq, 596 DID: actualDID, 597 Operation: operation, 598 CID: operation.AsOperation().CID(), 599 Nullified: nullified, 600 CreatedAt: timestamp, 601 }, nil 602} 603 604func init() { 605 cbornode.RegisterCborType(atlas.BuildEntry(didplc.OpEnum{}). 606 Transform(). 607 TransformMarshal(atlas.MakeMarshalTransformFunc( 608 func(o didplc.OpEnum) (interface{}, error) { 609 if o.Regular != nil { 610 return o.Regular, nil 611 } else if o.Legacy != nil { 612 return o.Legacy, nil 613 } else if o.Tombstone != nil { 614 return o.Tombstone, nil 615 } 616 return nil, stacktrace.NewError("invalid OpEnum") 617 })). 618 TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( 619 func(x map[string]any) (didplc.OpEnum, error) { 620 typ, ok := x["type"] 621 if !ok { 622 return didplc.OpEnum{}, stacktrace.NewError("did not find expected operation 'type' field") 623 } 624 625 // this is so stupid but oh well - maybe one day the entire thing will be upgraded to a less stupid cbor encoder 626 b, err := cbornode.DumpObject(x) 627 if err != nil { 628 return didplc.OpEnum{}, stacktrace.Propagate(err) 629 } 630 631 switch typ { 632 case "plc_operation": 633 o := &didplc.RegularOp{} 634 err = cbornode.DecodeInto(b, &o) 635 return didplc.OpEnum{ 636 Regular: o, 637 }, stacktrace.Propagate(err) 638 case "create": 639 o := &didplc.LegacyOp{} 640 err = cbornode.DecodeInto(b, &o) 641 return didplc.OpEnum{ 642 Legacy: o, 643 }, stacktrace.Propagate(err) 644 case "plc_tombstone": 645 o := &didplc.TombstoneOp{} 646 err = cbornode.DecodeInto(b, &o) 647 return didplc.OpEnum{ 648 Tombstone: o, 649 }, stacktrace.Propagate(err) 650 default: 651 return didplc.OpEnum{}, stacktrace.NewError("unexpected operation type: %s", typ) 652 } 653 })). 654 Complete()) 655} 656 657func (t *consensusStore) AuthoritativePLC(tx transaction.Read) (string, error) { 658 url, err := tx.Tree().Get([]byte(TreeAuthoritativePLCKey)) 659 if err != nil { 660 return "", stacktrace.Propagate(err) 661 } 662 if url == nil { 663 return "", nil 664 } 665 return string(url), nil 666} 667 668func (t *consensusStore) SetAuthoritativePLC(tx transaction.Write, url string) error { 669 _, err := tx.Tree().Set([]byte(TreeAuthoritativePLCKey), []byte(url)) 670 return stacktrace.Propagate(err) 671} 672 673func (t *consensusStore) AuthoritativeImportProgress(tx transaction.Read) (uint64, error) { 674 progBytes, err := tx.Tree().Get([]byte(TreeAuthoritativeImportProgressKey)) 675 if err != nil { 676 return 0, stacktrace.Propagate(err) 677 } 678 if len(progBytes) != 8 { 679 return 0, nil 680 } 681 return binary.BigEndian.Uint64(progBytes), nil 682} 683 684func (t *consensusStore) SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error { 685 value := make([]byte, 8) 686 binary.BigEndian.PutUint64(value, nextCursor) 687 688 _, err := tx.Tree().Set([]byte(TreeAuthoritativeImportProgressKey), value) 689 return stacktrace.Propagate(err) 690} 691 692func marshalRangeChallengeCommitmentKey(validatorPubKey []byte) []byte { 693 key := make([]byte, TreeRangeChallengeKeyLength) 694 key[0] = TreeRangeChallengeCommitmentKeyPrefix 695 copy(key[1:], validatorPubKey) 696 return key 697} 698 699func marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) []byte { 700 value := make([]byte, 8*4+32) 701 binary.BigEndian.PutUint64(value, fromHeight) 702 binary.BigEndian.PutUint64(value[8:], toHeight) 703 binary.BigEndian.PutUint64(value[16:], provenHeight) 704 binary.BigEndian.PutUint64(value[24:], includedOnHeight) 705 copy(value[32:], treeRoot[0:32]) 706 return value 707} 708 709func unmarshalRangeChallengeCommitmentValue(value []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) { 710 fromHeight = binary.BigEndian.Uint64(value) 711 toHeight = binary.BigEndian.Uint64(value[8:]) 712 provenHeight = binary.BigEndian.Uint64(value[16:]) 713 includedOnHeight = binary.BigEndian.Uint64(value[24:]) 714 treeRoot = slices.Clone(value[32:]) 715 return 716} 717 718// ValidatorRangeChallengeCommitment implements [ConsensusStore]. 719func (t *consensusStore) ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) { 720 key := marshalRangeChallengeCommitmentKey(validatorPubKey) 721 value, err := tx.Tree().Get(key) 722 if err != nil { 723 return 0, 0, 0, 0, nil, stacktrace.Propagate(err) 724 } 725 if value == nil { 726 return 0, 0, 0, 0, nil, stacktrace.Propagate(ErrNoActiveChallengeCommitment) 727 } 728 fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot = unmarshalRangeChallengeCommitmentValue(value) 729 return 730} 731 732// SetValidatorRangeChallengeCommitment implements [ConsensusStore]. 733func (t *consensusStore) SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error { 734 key := marshalRangeChallengeCommitmentKey(validatorPubKey) 735 value := marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot) 736 // this may overwrite sometimes (e.g. if a previous commitment has expired and the validator needs to submit a new one) 737 _, err := tx.Tree().Set(key, value) 738 return stacktrace.Propagate(err) 739} 740 741// ClearValidatorRangeChallengeCommitment implements [ConsensusStore]. 742func (t *consensusStore) ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error { 743 _, removed, err := tx.Tree().Remove(marshalRangeChallengeCommitmentKey(validatorPubKey)) 744 if err != nil { 745 return stacktrace.Propagate(err) 746 } 747 if !removed { 748 // we are only expecting to call this after completing a range challenge, so we're expecting that the key would still exist 749 // (we would have needed it to validate the challenge completion transaction) 750 return stacktrace.NewError("validator did not have a range challenge commitment") 751 } 752 return nil 753} 754 755func marshalRangeChallengeCompletionKey(validatorPubKey []byte) []byte { 756 key := make([]byte, TreeChallengeCompletionKeyLength) 757 key[0] = TreeChallengeCompletionKeyPrefix 758 copy(key[1:], validatorPubKey) 759 return key 760} 761 762// ValidatorRangeChallengeCompletion implements [ConsensusStore]. 763func (t *consensusStore) ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error) { 764 key := marshalRangeChallengeCompletionKey(validatorPubKey) 765 value, err := tx.Tree().Get(key) 766 if err != nil { 767 return 0, stacktrace.Propagate(err) 768 } 769 if value == nil { 770 return 0, stacktrace.Propagate(ErrNoRecentChallengeCompletion) 771 } 772 return binary.BigEndian.Uint64(value), nil 773} 774 775// SetValidatorRangeChallengeCompletion implements [ConsensusStore]. 776func (t *consensusStore) SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error { 777 key := marshalRangeChallengeCompletionKey(validatorPubKey) 778 value := binary.BigEndian.AppendUint64(nil, completedToHeight) 779 _, err := tx.Tree().Set(key, value) 780 return stacktrace.Propagate(err) 781} 782 783func marshalBlockChallengeProofKey(block uint64) []byte { 784 key := make([]byte, IndexBlockChallengeKeyLength) 785 key[0] = IndexBlockChallengeKeyPrefix 786 787 binary.BigEndian.PutUint64(key[1:], block) 788 789 return key 790} 791 792func unmarshalBlockChallengeProofKey(key []byte) uint64 { 793 return binary.BigEndian.Uint64(key[1:9]) 794} 795 796var minBlockChallengeProofKey = marshalBlockChallengeProofKey(0) 797var maxBlockChallengeProofKey = marshalBlockChallengeProofKey(math.MaxInt64) 798 799func (t *consensusStore) BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) { 800 key := marshalBlockChallengeProofKey(height) 801 value, err := tx.IndexDB().Get(key) 802 return value, stacktrace.Propagate(err) 803} 804 805func (t *consensusStore) BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] { 806 return func(yield func(uint64, []byte) bool) { 807 *retErr = nil 808 // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 809 start := afterHeight + 1 810 startKey := marshalBlockChallengeProofKey(start) 811 endKey := maxBlockChallengeProofKey 812 813 proofsIterator, err := tx.IndexDB().Iterator(startKey, endKey) 814 if err != nil { 815 *retErr = stacktrace.Propagate(err) 816 return 817 } 818 819 defer proofsIterator.Close() 820 821 for proofsIterator.Valid() { 822 blockHeight := unmarshalBlockChallengeProofKey(proofsIterator.Key()) 823 824 if !yield(blockHeight, proofsIterator.Value()) { 825 return 826 } 827 828 proofsIterator.Next() 829 } 830 err = proofsIterator.Error() 831 if err != nil { 832 *retErr = stacktrace.Propagate(err) 833 } 834 } 835} 836 837// BlockChallengeProofsReverseIterator implements [ConsensusStore]. 838func (t *consensusStore) BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte] { 839 return func(yield func(uint64, []byte) bool) { 840 *retErr = nil 841 startKey := minBlockChallengeProofKey 842 endKey := marshalBlockChallengeProofKey(beforeHeight) 843 844 proofsIterator, err := tx.IndexDB().ReverseIterator(startKey, endKey) 845 if err != nil { 846 *retErr = stacktrace.Propagate(err) 847 return 848 } 849 850 defer proofsIterator.Close() 851 852 for proofsIterator.Valid() { 853 blockHeight := unmarshalBlockChallengeProofKey(proofsIterator.Key()) 854 855 if !yield(blockHeight, proofsIterator.Value()) { 856 return 857 } 858 859 proofsIterator.Next() 860 } 861 err = proofsIterator.Error() 862 if err != nil { 863 *retErr = stacktrace.Propagate(err) 864 } 865 } 866} 867 868func (t *consensusStore) StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error { 869 err := tx.IndexDB().Set(marshalBlockChallengeProofKey(blockHeight), proof) 870 return stacktrace.Propagate(err) 871} 872 873func (t *consensusStore) DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error { 874 // we should be good to delete as we iterate, as the writes within the transaction only hit the underlying DB implementation on commit 875 startKey := marshalBlockChallengeProofKey(0) 876 endKey := marshalBlockChallengeProofKey(blockHeight) 877 878 proofsIterator, err := tx.IndexDB().Iterator(startKey, endKey) 879 if err != nil { 880 return stacktrace.Propagate(err) 881 } 882 883 defer proofsIterator.Close() 884 885 for proofsIterator.Valid() { 886 select { 887 case <-ctx.Done(): 888 return stacktrace.Propagate(ctx.Err()) 889 default: 890 } 891 892 err = tx.IndexDB().Delete(slices.Clone(proofsIterator.Key())) 893 if err != nil { 894 return stacktrace.Propagate(err) 895 } 896 897 proofsIterator.Next() 898 } 899 return nil 900} 901 902func marshalValidatorReputationKey(validatorPubKey []byte) []byte { 903 key := make([]byte, TreeValidatorReputationKeyLength) 904 key[0] = TreeValidatorReputationKeyPrefix 905 copy(key[1:], validatorPubKey) 906 return key 907} 908 909// ValidatorReputation implements [ConsensusStore]. 910func (t *consensusStore) ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) { 911 key := marshalValidatorReputationKey(validatorPubKey) 912 913 value, err := tx.Tree().Get(key) 914 if err != nil { 915 return 0, stacktrace.Propagate(err) 916 } 917 918 // the following returns 0 if value is nil: 919 return new(big.Int).SetBytes(value).Uint64(), nil 920} 921 922// ChangeValidatorReputation implements [ConsensusStore]. 923func (t *consensusStore) ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error { 924 key := marshalValidatorReputationKey(validatorPubKey) 925 926 value, err := tx.Tree().Get(key) 927 if err != nil { 928 return stacktrace.Propagate(err) 929 } 930 931 reputation := new(big.Int).SetBytes(value) 932 newValue, err := changer(reputation.Uint64()) 933 if err != nil { 934 return stacktrace.Propagate(err) 935 } 936 reputation.SetUint64(newValue) 937 if reputation.Sign() <= 0 { 938 _, _, err := tx.Tree().Remove(key) 939 return stacktrace.Propagate(err) 940 } 941 942 _, err = tx.Tree().Set(key, reputation.Bytes()) 943 return stacktrace.Propagate(err) 944} 945 946var minReputationKey = marshalValidatorReputationKey(make([]byte, 32)) 947var maxReputationKey = marshalValidatorReputationKey(slices.Repeat([]byte{0xff}, 32)) 948 949// ChangeAllNonZeroValidatorReputations implements [ConsensusStore]. 950func (t *consensusStore) ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error { 951 // we are not allowed to make updates to the tree while an iterator is active 952 // process validators in batches of 100 to avoid loading too many key-value pairs into memory 953 const batchSize = 100 954 955 startingKey := slices.Clone(minReputationKey) 956 skipFirst := false 957 958 type kv struct { 959 key []byte 960 value []byte 961 } 962 963 batch := func() ([]kv, bool, error) { 964 toSet := make([]kv, 0, batchSize) 965 itr, err := tx.Tree().Iterator(startingKey, maxReputationKey, true) 966 if err != nil { 967 return nil, false, stacktrace.Propagate(err) 968 } 969 defer itr.Close() 970 971 if skipFirst && itr.Valid() { 972 itr.Next() 973 } 974 975 for i := 0; itr.Valid() && i < batchSize; i++ { 976 reputation := new(big.Int).SetBytes(itr.Value()) 977 978 keyCopy := slices.Clone(itr.Key()) 979 pubKey := slices.Clone(itr.Key()[1:]) 980 981 newValue, err := changer(pubKey, reputation.Uint64()) 982 if err != nil { 983 return nil, false, stacktrace.Propagate(err) 984 } 985 reputation.SetUint64(newValue) 986 987 if reputation.Sign() <= 0 { 988 toSet = append(toSet, kv{ 989 key: keyCopy, 990 value: nil, 991 }) 992 } else { 993 toSet = append(toSet, kv{ 994 key: keyCopy, 995 value: reputation.Bytes(), 996 }) 997 } 998 999 itr.Next() 1000 } 1001 1002 return toSet, itr.Valid(), nil 1003 } 1004 1005 for { 1006 toSet, goAgain, err := batch() 1007 if err != nil { 1008 return stacktrace.Propagate(err) 1009 } 1010 skipFirst = true 1011 1012 for _, s := range toSet { 1013 var updated bool 1014 if s.value == nil { 1015 _, updated, err = tx.Tree().Remove(s.key) 1016 if err != nil { 1017 return stacktrace.Propagate(err) 1018 } 1019 } else { 1020 updated, err = tx.Tree().Set(s.key, s.value) 1021 if err != nil { 1022 return stacktrace.Propagate(err) 1023 } 1024 } 1025 1026 if !updated { 1027 return stacktrace.NewError("expected to be updating an existing reputation key but didn't") 1028 } 1029 } 1030 1031 if !goAgain || len(toSet) == 0 { 1032 return nil 1033 } 1034 1035 startingKey = toSet[len(toSet)-1].key 1036 } 1037} 1038 1039func MarshalValidatorVotingActivityKey(epochHeight uint64, validatorAddress []byte) []byte { 1040 key := make([]byte, IndexValidatorVotingActivityKeyLength) 1041 key[0] = IndexValidatorVotingActivityPrefix 1042 binary.BigEndian.PutUint64(key[1:], epochHeight) 1043 copy(key[9:], validatorAddress) 1044 return key 1045} 1046 1047func unmarshalValidatorVotingActivityKey(key []byte) (epochHeight uint64, validatorAddress []byte) { 1048 epochHeight = binary.BigEndian.Uint64(key[1:]) 1049 validatorAddress = slices.Clone(key[9:]) 1050 return 1051} 1052 1053func unmarshalValidatorVotingActivityValue(value []byte) (validatorPubKey []byte, bitlist *bitfield.Bitlist64, err error) { 1054 validatorPubKey = slices.Clone(value[:PublicKeyLength]) 1055 bitlist, err = bitfield.Bitlist(value[PublicKeyLength:]).ToBitlist64() 1056 return validatorPubKey, bitlist, stacktrace.Propagate(err) 1057} 1058 1059// InitializeValidatorVotingActivity implements [ConsensusStore]. 1060func (t *consensusStore) InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error { 1061 epochHeight = epochHeight - epochHeight%t.epochSize 1062 1063 key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) 1064 1065 bitlist := bitfield.NewBitlist64(t.epochSize) 1066 1067 bitlistBytes := bitlist.ToBitlist() 1068 value := make([]byte, PublicKeyLength+len(bitlistBytes)) 1069 copy(value, validatorPubKey) 1070 copy(value[PublicKeyLength:], bitlistBytes) 1071 1072 err := tx.IndexDB().Set(key, value) 1073 return stacktrace.Propagate(err) 1074} 1075 1076// MarkValidatorVote implements [ConsensusStore]. 1077func (t *consensusStore) MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error { 1078 epochHeight := height - height%t.epochSize 1079 1080 key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) 1081 1082 value, err := tx.IndexDB().Get(key) 1083 if err != nil { 1084 return stacktrace.Propagate(err) 1085 } 1086 if value == nil { 1087 return stacktrace.Propagate(ErrValidatorNotActive) 1088 } 1089 1090 bitfield := bitfield.Bitlist(value[PublicKeyLength:]) 1091 bitfield.SetBitAt(height%t.epochSize, true) 1092 1093 copy(value[PublicKeyLength:], bitfield) 1094 1095 err = tx.IndexDB().Set(key, value) 1096 return stacktrace.Propagate(err) 1097} 1098 1099// ActiveValidatorPubKey implements [ConsensusStore]. 1100func (t *consensusStore) ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error) { 1101 epochHeight := height - height%t.epochSize 1102 1103 key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) 1104 1105 value, err := tx.IndexDB().Get(key) 1106 if err != nil { 1107 return nil, stacktrace.Propagate(err) 1108 } 1109 if value == nil { 1110 return nil, stacktrace.Propagate(ErrValidatorNotActive) 1111 } 1112 1113 pubkey, _, err := unmarshalValidatorVotingActivityValue(value) 1114 return pubkey, stacktrace.Propagate(err) 1115} 1116 1117// ActiveValidatorsIterator implements [ConsensusStore]. 1118func (t *consensusStore) ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] { 1119 return func(yield func(ActiveValidator) bool) { 1120 *retErr = nil 1121 1122 epochHeight = epochHeight - epochHeight%t.epochSize 1123 1124 startKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), make([]byte, AddressLength)) 1125 endKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), slices.Repeat([]byte{0xff}, AddressLength)) 1126 1127 iterator, err := tx.IndexDB().Iterator(startKey, endKey) 1128 if err != nil { 1129 *retErr = stacktrace.Propagate(err) 1130 return 1131 } 1132 1133 defer iterator.Close() 1134 1135 for iterator.Valid() { 1136 _, validatorAddress := unmarshalValidatorVotingActivityKey(iterator.Key()) 1137 validatorPubKey, bitlist, err := unmarshalValidatorVotingActivityValue(iterator.Value()) 1138 if err != nil { 1139 *retErr = stacktrace.Propagate(err) 1140 return 1141 } 1142 1143 if !yield(ActiveValidator{ 1144 Address: validatorAddress, 1145 PublicKey: validatorPubKey, 1146 VoteCount: bitlist.Count(), 1147 }) { 1148 return 1149 } 1150 1151 iterator.Next() 1152 } 1153 err = iterator.Error() 1154 if err != nil { 1155 *retErr = stacktrace.Propagate(err) 1156 } 1157 } 1158} 1159 1160// FallbackBlockHeader implements [ConsensusStore]. 1161func (t *consensusStore) FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error) { 1162 key := make([]byte, IndexBlockHeaderKeyLength) 1163 key[0] = IndexBlockHeaderKeyPrefix 1164 binary.BigEndian.PutUint64(key[1:], height) 1165 1166 value, err := tx.IndexDB().Get(key) 1167 if err != nil { 1168 return cmttypes.Header{}, stacktrace.Propagate(err) 1169 } 1170 if value == nil { 1171 return cmttypes.Header{}, stacktrace.NewError("block header not found") 1172 } 1173 1174 var protoHeader cmtproto.Header 1175 err = protoHeader.Unmarshal(value) 1176 if err != nil { 1177 return cmttypes.Header{}, stacktrace.Propagate(err, "failed to unmarshal block header") 1178 } 1179 header, err := cmttypes.HeaderFromProto(&protoHeader) 1180 if err != nil { 1181 return cmttypes.Header{}, stacktrace.Propagate(err, "failed to convert proto header to header") 1182 } 1183 return header, nil 1184}