package store import ( "context" _ "embed" "encoding/base32" "encoding/binary" "errors" "iter" "math" "math/big" "slices" "strings" "time" "github.com/OffchainLabs/go-bitfield" "github.com/bluesky-social/indigo/atproto/syntax" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" cmttypes "github.com/cometbft/cometbft/types" ics23 "github.com/cosmos/ics23/go" "github.com/dgraph-io/badger/v4" "github.com/did-method-plc/go-didplc" "github.com/gbl08ma/stacktrace" cbornode "github.com/ipfs/go-ipld-cbor" "github.com/polydawn/refmt/obj/atlas" "github.com/samber/lo" "github.com/samber/mo" "tangled.org/gbl08ma.com/didplcbft/transaction" "tangled.org/gbl08ma.com/didplcbft/types" ) const ( PublicKeyLength = 32 AddressLength = 20 TreeOperationKeyPrefix = 'o' TreeOperationKeyLength = 1 + 8 TreeRangeChallengeCommitmentKeyPrefix = 'C' TreeRangeChallengeKeyLength = 1 + PublicKeyLength TreeChallengeCompletionKeyPrefix = 'p' TreeChallengeCompletionKeyLength = 1 + PublicKeyLength TreeValidatorReputationKeyPrefix = 'r' TreeValidatorReputationKeyLength = 1 + PublicKeyLength IndexBlockChallengeKeyPrefix = 'c' IndexBlockChallengeKeyLength = 1 + 8 IndexDIDLogKeyPrefix = 'l' IndexDIDLogKeyLength = 1 + 15 + 8 IndexValidatorVotingActivityPrefix = 'v' IndexValidatorVotingActivityKeyLength = 1 + AddressLength + 8 IndexBlockHeaderKeyPrefix = 'h' IndexBlockHeaderKeyLength = 1 + 8 TreeAuthoritativePLCKey = "aPLCURL" TreeAuthoritativeImportProgressKey = "aImportProgress" ) var Consensus ConsensusStore = &consensusStore{} var ErrNoActiveChallengeCommitment = errors.New("the validator is currently not committed to a challenge") var ErrNoRecentChallengeCompletion = errors.New("the validator has not completed a range challenge recently") var ErrValidatorNotActive = errors.New("the validator is not active") // ConsensusStore manages all information that is directly or indirectly protected by consensus type ConsensusStore interface { AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, err *error) iter.Seq[types.SequencedLogEntry] ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq2[types.SequencedLogEntry, []byte] StoreOperation(ctx context.Context, tx transaction.Write, entry didplc.LogEntry, nullifyWithSequenceEqualOrGreaterThan mo.Option[uint64]) error SetOperationCreatedAt(tx transaction.Write, seqID uint64, createdAt time.Time) error CountOperations(tx transaction.Read) (uint64, error) AuthoritativePLC(tx transaction.Read) (string, error) SetAuthoritativePLC(tx transaction.Write, url string) error AuthoritativeImportProgress(tx transaction.Read) (uint64, error) SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error) SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] // afterHeight is exclusive for consistency with OperationsIterator BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte] StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error) ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error) ConfigureEpochSize(epochSize uint64) } type ActiveValidator struct { Address []byte PublicKey []byte VoteCount uint64 } var _ ConsensusStore = (*consensusStore)(nil) // consensusStore exists just to groups methods nicely type consensusStore struct { epochSize uint64 } // ConfigureEpochSize implements [ConsensusStore]. func (t *consensusStore) ConfigureEpochSize(epochSize uint64) { t.epochSize = epochSize } func (t *consensusStore) AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { didBytes, err := DIDToBytes(did) if err != nil { return nil, nil, stacktrace.Propagate(err) } didRangeStart := marshalDIDLogKey(didBytes, 0) didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) didLogIterator, err := tx.IndexDB().Iterator(didRangeStart, didRangeEnd) if err != nil { return nil, nil, stacktrace.Propagate(err) } defer didLogIterator.Close() logEntries := make([]types.SequencedLogEntry, 0, 1) proofs := []*ics23.CommitmentProof{} txHeight := uint64(tx.Height()) for didLogIterator.Valid() { select { case <-ctx.Done(): return nil, nil, stacktrace.Propagate(ctx.Err()) default: } sequence := unmarshalDIDLogSequence(didLogIterator.Key()) validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) opKey := marshalOperationKey(sequence) if txHeight >= validFromHeight && txHeight <= validToHeight { operationValue, err := tx.Tree().Get(opKey) if err != nil { return nil, nil, stacktrace.Propagate(err) } if withProof { proof, err := tx.Tree().GetProof(opKey) if err != nil { return nil, nil, stacktrace.Propagate(err) } proofs = append(proofs, proof) } logEntry, err := unmarshalLogEntry(opKey, operationValue) if err != nil { return nil, nil, stacktrace.Propagate(err) } logEntries = append(logEntries, logEntry) } didLogIterator.Next() } err = didLogIterator.Error() if err != nil { return nil, nil, stacktrace.Propagate(err) } var combinedProof *ics23.CommitmentProof if withProof { combinedProof, err = ics23.CombineProofs(proofs) if err != nil { return nil, nil, stacktrace.Propagate(err, "failed to combine proofs") } } return logEntries, combinedProof, nil } func (t *consensusStore) AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, retErr *error) iter.Seq[types.SequencedLogEntry] { return func(yield func(types.SequencedLogEntry) bool) { *retErr = nil didBytes, err := DIDToBytes(did) if err != nil { *retErr = stacktrace.Propagate(err) return } mayBePresent := tx.TestDIDBloomFilter(didBytes) if !mayBePresent { return } didRangeStart := marshalDIDLogKey(didBytes, 0) didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) opts := badger.DefaultIteratorOptions // we rarely have more than one entry for a DID and we see great performance gains from not prefetching // (iterating on DIDs that don't exist goes from taking over 150us to taking around 35 us) opts.PrefetchValues = false opts.Reverse = true didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts) if err != nil { *retErr = stacktrace.Propagate(err) return } defer didLogIterator.Close() txHeight := uint64(tx.Height()) for didLogIterator.Valid() { select { case <-ctx.Done(): *retErr = stacktrace.Propagate(ctx.Err()) return default: } sequence := unmarshalDIDLogSequence(didLogIterator.Key()) validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) opKey := marshalOperationKey(sequence) if txHeight >= validFromHeight && txHeight <= validToHeight { operationValue, err := tx.Tree().Get(opKey) if err != nil { *retErr = stacktrace.Propagate(err) return } logEntry, err := unmarshalLogEntry(opKey, operationValue) if err != nil { *retErr = stacktrace.Propagate(err) return } if !yield(logEntry) { return } } didLogIterator.Next() } err = didLogIterator.Error() if err != nil { *retErr = stacktrace.Propagate(err) } } } func (t *consensusStore) ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) { entries := make([]types.SequencedLogEntry, 0, count) var iterErr error for logEntry := range t.OperationsIterator(tx, after, &iterErr) { entries = append(entries, logEntry) // this condition being checked here also makes it so that a count of zero means unlimited if len(entries) == count { break } } if iterErr != nil { return nil, stacktrace.Propagate(iterErr, "ran into an error while iterating") } return entries, nil } func (t *consensusStore) OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq2[types.SequencedLogEntry, []byte] { return func(yield func(types.SequencedLogEntry, []byte) bool) { *retErr = nil // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds start := after + 1 startKey := marshalOperationKey(start) endKey := maxOperationKey opIterator, err := tx.Tree().Iterator(startKey, endKey, true) if err != nil { *retErr = stacktrace.Propagate(err) return } defer opIterator.Close() for opIterator.Valid() { logEntry, err := unmarshalLogEntry(opIterator.Key(), opIterator.Value()) if err != nil { *retErr = stacktrace.Propagate(err) return } if !yield(logEntry, opIterator.Value()) { return } opIterator.Next() } err = opIterator.Error() if err != nil { *retErr = stacktrace.Propagate(err) } } } // StoreOperation stores an operation in the tree, nullifying existing operations whose index within the DID's history // is lower or equal to the specified optional integer. func (t *consensusStore) StoreOperation(ctx context.Context, tx transaction.Write, entry didplc.LogEntry, nullifyWithSequenceEqualOrGreaterThan mo.Option[uint64]) error { didBytes, err := DIDToBytes(entry.DID) if err != nil { return stacktrace.Propagate(err) } txHeight := uint64(tx.Height()) if nullifyEGt, ok := nullifyWithSequenceEqualOrGreaterThan.Get(); ok { didRangeStart := marshalDIDLogKey(didBytes, nullifyEGt) didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64) opts := badger.DefaultIteratorOptions opts.PrefetchValues = false opts.Reverse = true didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts) if err != nil { return stacktrace.Propagate(err) } defer didLogIterator.Close() txHeight := uint64(tx.Height()) for didLogIterator.Valid() { select { case <-ctx.Done(): return stacktrace.Propagate(ctx.Err()) default: } sequence := unmarshalDIDLogSequence(didLogIterator.Key()) validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) opKey := marshalOperationKey(sequence) if txHeight < validFromHeight || txHeight > validToHeight { // ignore ops that are invisible at this height didLogIterator.Next() continue } operationValue, err := tx.Tree().Get(opKey) if err != nil { return stacktrace.Propagate(err) } updated, err := tx.Tree().Set(opKey, markOperationValueNullified(operationValue)) if err != nil { return stacktrace.Propagate(err) } if !updated { // if we get to this point we have a mistake in our program, and the data is now inconsistent // we are not supposed to be able to recover from this error without rolling back the tree return stacktrace.NewError("expected to be updating an existing operation key but wrote new one instead") } didLogIterator.Next() } err = didLogIterator.Error() if err != nil { return stacktrace.Propagate(err) } } opDatetime, err := syntax.ParseDatetime(entry.CreatedAt) if err != nil { return stacktrace.Propagate(err, "invalid CreatedAt") } sequence, err := tx.NextSequence() if err != nil { return stacktrace.Propagate(err) } operation := entry.Operation.AsOperation() opKey := marshalOperationKey(sequence) opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation) updated, err := tx.Tree().Set(opKey, opValue) if err != nil { return stacktrace.Propagate(err) } if updated { // if we get to this point we have a mistake in our program, and the data is now inconsistent // we are not supposed to be able to recover from this error without rolling back the tree return stacktrace.NewError("expected to be writing to a new operation key but updated instead") } logKey := marshalDIDLogKey(didBytes, sequence) logValue := marshalDIDLogValue(txHeight, math.MaxUint64) err = tx.IndexDB().Set(logKey, logValue) if err != nil { return stacktrace.Propagate(err) } tx.AddToDIDBloomFilter(didBytes, sequence) return nil } func (t *consensusStore) SetOperationCreatedAt(tx transaction.Write, seqID uint64, createdAt time.Time) error { opKey := marshalOperationKey(seqID) opValue, err := tx.Tree().Get(opKey) if err != nil { return stacktrace.Propagate(err) } if len(opValue) == 0 { return stacktrace.NewError("operation %d not found", seqID) } opValue = slices.Clone(opValue) ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) binary.BigEndian.PutUint64(opValue[16:24], ts) updated, err := tx.Tree().Set(opKey, opValue) if !updated { // if we get to this point we have a mistake in our program, and the data is now inconsistent // we are not supposed to be able to recover from this error without rolling back the tree return stacktrace.NewError("expected to be updating an existing operation key but wrote new one instead") } return stacktrace.Propagate(err) } var minOperationKey = marshalOperationKey(0) var maxOperationKey = marshalOperationKey(math.MaxInt64) func (t *consensusStore) CountOperations(tx transaction.Read) (uint64, error) { seq := uint64(0) itr, err := tx.Tree().Iterator(minOperationKey, maxOperationKey, false) if err != nil { return 0, stacktrace.Propagate(err) } defer itr.Close() if itr.Valid() { seq, err = unmarshalOperationKey(itr.Key()) if err != nil { return 0, stacktrace.Propagate(err) } } return seq, stacktrace.Propagate(err) } func DIDToBytes(did string) ([]byte, error) { if !strings.HasPrefix(did, "did:plc:") { return nil, stacktrace.NewError("invalid did:plc") } didBytes := make([]byte, 15) did = strings.ToUpper(did) numWritten, err := base32.StdEncoding.Decode(didBytes, []byte(did[8:])) if err != nil { return nil, stacktrace.Propagate(err, "invalid did:plc") } if numWritten != 15 { return nil, stacktrace.NewError("invalid did:plc") } return didBytes, nil } func bytesToDID(didBytes []byte) (string, error) { did := "did:plc:" + strings.ToLower(base32.StdEncoding.EncodeToString(didBytes)) if len(did) != 8+24 { return "", stacktrace.NewError("invalid did:plc") } return did, nil } func marshalDIDLogKey(didBytes []byte, sequence uint64) []byte { key := make([]byte, IndexDIDLogKeyLength) key[0] = IndexDIDLogKeyPrefix copy(key[1:16], didBytes) binary.BigEndian.PutUint64(key[16:], sequence) return key } func unmarshalDIDLogSequence(logKey []byte) uint64 { return binary.BigEndian.Uint64(logKey[16:24]) } // validFromHeight, validToHeight are inclusive (i.e. if the former is 5 and the latter is 10, the value was valid at height 5 and 10, but not at 4 or 11) func marshalDIDLogValue(validFromHeight, validToHeight uint64) []byte { value := make([]byte, 8+8) binary.BigEndian.PutUint64(value, validFromHeight) binary.BigEndian.PutUint64(value[8:], validToHeight) return value } func UnmarshalDIDLogValue(value []byte) (validFromHeight, validToHeight uint64) { validFromHeight = binary.BigEndian.Uint64(value[0:8]) validToHeight = binary.BigEndian.Uint64(value[8:16]) return } func marshalOperationKey(sequence uint64) []byte { key := make([]byte, TreeOperationKeyLength) key[0] = TreeOperationKeyPrefix binary.BigEndian.PutUint64(key[1:], sequence) return key } func unmarshalOperationKey(key []byte) (uint64, error) { return binary.BigEndian.Uint64(key[1:9]), nil } func marshalOperationValue(nullified bool, didBytes []byte, createdAt time.Time, operation didplc.Operation) []byte { opAsBytes := operation.SignedCBORBytes() o := make([]byte, 1+15+8+len(opAsBytes)) o[0] = lo.Ternary[byte](nullified, 1, 0) copy(o[1:16], didBytes) ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano()) binary.BigEndian.PutUint64(o[16:24], ts) copy(o[24:], opAsBytes) return o } func unmarshalOperationValue(value []byte) (bool, string, time.Time, didplc.OpEnum, error) { nullified := value[0] != 0 did, err := bytesToDID(value[1:16]) if err != nil { return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err) } createdAtUnixNano := binary.BigEndian.Uint64(value[16:24]) createdAt := time.Unix(0, int64(createdAtUnixNano)).UTC() var opEnum didplc.OpEnum err = cbornode.DecodeInto(value[24:], &opEnum) if err != nil { return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err) } return nullified, did, createdAt, opEnum, nil } func markOperationValueNullified(value []byte) []byte { v := slices.Clone(value) v[0] = 1 return v } func unmarshalLogEntry(operationKey, operationValue []byte) (types.SequencedLogEntry, error) { nullified, actualDID, timestamp, operation, err := unmarshalOperationValue(operationValue) if err != nil { return types.SequencedLogEntry{}, stacktrace.Propagate(err) } seq, err := unmarshalOperationKey(operationKey) if err != nil { return types.SequencedLogEntry{}, stacktrace.Propagate(err) } return types.SequencedLogEntry{ Seq: seq, DID: actualDID, Operation: operation, CID: operation.AsOperation().CID(), Nullified: nullified, CreatedAt: timestamp, }, nil } func init() { cbornode.RegisterCborType(atlas.BuildEntry(didplc.OpEnum{}). Transform(). TransformMarshal(atlas.MakeMarshalTransformFunc( func(o didplc.OpEnum) (interface{}, error) { if o.Regular != nil { return o.Regular, nil } else if o.Legacy != nil { return o.Legacy, nil } else if o.Tombstone != nil { return o.Tombstone, nil } return nil, stacktrace.NewError("invalid OpEnum") })). TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( func(x map[string]any) (didplc.OpEnum, error) { typ, ok := x["type"] if !ok { return didplc.OpEnum{}, stacktrace.NewError("did not find expected operation 'type' field") } // this is so stupid but oh well - maybe one day the entire thing will be upgraded to a less stupid cbor encoder b, err := cbornode.DumpObject(x) if err != nil { return didplc.OpEnum{}, stacktrace.Propagate(err) } switch typ { case "plc_operation": o := &didplc.RegularOp{} err = cbornode.DecodeInto(b, &o) return didplc.OpEnum{ Regular: o, }, stacktrace.Propagate(err) case "create": o := &didplc.LegacyOp{} err = cbornode.DecodeInto(b, &o) return didplc.OpEnum{ Legacy: o, }, stacktrace.Propagate(err) case "plc_tombstone": o := &didplc.TombstoneOp{} err = cbornode.DecodeInto(b, &o) return didplc.OpEnum{ Tombstone: o, }, stacktrace.Propagate(err) default: return didplc.OpEnum{}, stacktrace.NewError("unexpected operation type: %s", typ) } })). Complete()) } func (t *consensusStore) AuthoritativePLC(tx transaction.Read) (string, error) { url, err := tx.Tree().Get([]byte(TreeAuthoritativePLCKey)) if err != nil { return "", stacktrace.Propagate(err) } if url == nil { return "", nil } return string(url), nil } func (t *consensusStore) SetAuthoritativePLC(tx transaction.Write, url string) error { _, err := tx.Tree().Set([]byte(TreeAuthoritativePLCKey), []byte(url)) return stacktrace.Propagate(err) } func (t *consensusStore) AuthoritativeImportProgress(tx transaction.Read) (uint64, error) { progBytes, err := tx.Tree().Get([]byte(TreeAuthoritativeImportProgressKey)) if err != nil { return 0, stacktrace.Propagate(err) } if len(progBytes) != 8 { return 0, nil } return binary.BigEndian.Uint64(progBytes), nil } func (t *consensusStore) SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error { value := make([]byte, 8) binary.BigEndian.PutUint64(value, nextCursor) _, err := tx.Tree().Set([]byte(TreeAuthoritativeImportProgressKey), value) return stacktrace.Propagate(err) } func marshalRangeChallengeCommitmentKey(validatorPubKey []byte) []byte { key := make([]byte, TreeRangeChallengeKeyLength) key[0] = TreeRangeChallengeCommitmentKeyPrefix copy(key[1:], validatorPubKey) return key } func marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) []byte { value := make([]byte, 8*4+32) binary.BigEndian.PutUint64(value, fromHeight) binary.BigEndian.PutUint64(value[8:], toHeight) binary.BigEndian.PutUint64(value[16:], provenHeight) binary.BigEndian.PutUint64(value[24:], includedOnHeight) copy(value[32:], treeRoot[0:32]) return value } func unmarshalRangeChallengeCommitmentValue(value []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) { fromHeight = binary.BigEndian.Uint64(value) toHeight = binary.BigEndian.Uint64(value[8:]) provenHeight = binary.BigEndian.Uint64(value[16:]) includedOnHeight = binary.BigEndian.Uint64(value[24:]) treeRoot = slices.Clone(value[32:]) return } // ValidatorRangeChallengeCommitment implements [ConsensusStore]. func (t *consensusStore) ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) { key := marshalRangeChallengeCommitmentKey(validatorPubKey) value, err := tx.Tree().Get(key) if err != nil { return 0, 0, 0, 0, nil, stacktrace.Propagate(err) } if value == nil { return 0, 0, 0, 0, nil, stacktrace.Propagate(ErrNoActiveChallengeCommitment) } fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot = unmarshalRangeChallengeCommitmentValue(value) return } // SetValidatorRangeChallengeCommitment implements [ConsensusStore]. func (t *consensusStore) SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error { key := marshalRangeChallengeCommitmentKey(validatorPubKey) value := marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot) // this may overwrite sometimes (e.g. if a previous commitment has expired and the validator needs to submit a new one) _, err := tx.Tree().Set(key, value) return stacktrace.Propagate(err) } // ClearValidatorRangeChallengeCommitment implements [ConsensusStore]. func (t *consensusStore) ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error { _, removed, err := tx.Tree().Remove(marshalRangeChallengeCommitmentKey(validatorPubKey)) if err != nil { return stacktrace.Propagate(err) } if !removed { // we are only expecting to call this after completing a range challenge, so we're expecting that the key would still exist // (we would have needed it to validate the challenge completion transaction) return stacktrace.NewError("validator did not have a range challenge commitment") } return nil } func marshalRangeChallengeCompletionKey(validatorPubKey []byte) []byte { key := make([]byte, TreeChallengeCompletionKeyLength) key[0] = TreeChallengeCompletionKeyPrefix copy(key[1:], validatorPubKey) return key } // ValidatorRangeChallengeCompletion implements [ConsensusStore]. func (t *consensusStore) ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error) { key := marshalRangeChallengeCompletionKey(validatorPubKey) value, err := tx.Tree().Get(key) if err != nil { return 0, stacktrace.Propagate(err) } if value == nil { return 0, stacktrace.Propagate(ErrNoRecentChallengeCompletion) } return binary.BigEndian.Uint64(value), nil } // SetValidatorRangeChallengeCompletion implements [ConsensusStore]. func (t *consensusStore) SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error { key := marshalRangeChallengeCompletionKey(validatorPubKey) value := binary.BigEndian.AppendUint64(nil, completedToHeight) _, err := tx.Tree().Set(key, value) return stacktrace.Propagate(err) } func marshalBlockChallengeProofKey(block uint64) []byte { key := make([]byte, IndexBlockChallengeKeyLength) key[0] = IndexBlockChallengeKeyPrefix binary.BigEndian.PutUint64(key[1:], block) return key } func unmarshalBlockChallengeProofKey(key []byte) uint64 { return binary.BigEndian.Uint64(key[1:9]) } var minBlockChallengeProofKey = marshalBlockChallengeProofKey(0) var maxBlockChallengeProofKey = marshalBlockChallengeProofKey(math.MaxInt64) func (t *consensusStore) BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) { key := marshalBlockChallengeProofKey(height) value, err := tx.IndexDB().Get(key) return value, stacktrace.Propagate(err) } func (t *consensusStore) BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] { return func(yield func(uint64, []byte) bool) { *retErr = nil // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds start := afterHeight + 1 startKey := marshalBlockChallengeProofKey(start) endKey := maxBlockChallengeProofKey proofsIterator, err := tx.IndexDB().Iterator(startKey, endKey) if err != nil { *retErr = stacktrace.Propagate(err) return } defer proofsIterator.Close() for proofsIterator.Valid() { blockHeight := unmarshalBlockChallengeProofKey(proofsIterator.Key()) if !yield(blockHeight, proofsIterator.Value()) { return } proofsIterator.Next() } err = proofsIterator.Error() if err != nil { *retErr = stacktrace.Propagate(err) } } } // BlockChallengeProofsReverseIterator implements [ConsensusStore]. func (t *consensusStore) BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte] { return func(yield func(uint64, []byte) bool) { *retErr = nil startKey := minBlockChallengeProofKey endKey := marshalBlockChallengeProofKey(beforeHeight) proofsIterator, err := tx.IndexDB().ReverseIterator(startKey, endKey) if err != nil { *retErr = stacktrace.Propagate(err) return } defer proofsIterator.Close() for proofsIterator.Valid() { blockHeight := unmarshalBlockChallengeProofKey(proofsIterator.Key()) if !yield(blockHeight, proofsIterator.Value()) { return } proofsIterator.Next() } err = proofsIterator.Error() if err != nil { *retErr = stacktrace.Propagate(err) } } } func (t *consensusStore) StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error { err := tx.IndexDB().Set(marshalBlockChallengeProofKey(blockHeight), proof) return stacktrace.Propagate(err) } func (t *consensusStore) DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error { // we should be good to delete as we iterate, as the writes within the transaction only hit the underlying DB implementation on commit startKey := marshalBlockChallengeProofKey(0) endKey := marshalBlockChallengeProofKey(blockHeight) proofsIterator, err := tx.IndexDB().Iterator(startKey, endKey) if err != nil { return stacktrace.Propagate(err) } defer proofsIterator.Close() for proofsIterator.Valid() { select { case <-ctx.Done(): return stacktrace.Propagate(ctx.Err()) default: } err = tx.IndexDB().Delete(slices.Clone(proofsIterator.Key())) if err != nil { return stacktrace.Propagate(err) } proofsIterator.Next() } return nil } func marshalValidatorReputationKey(validatorPubKey []byte) []byte { key := make([]byte, TreeValidatorReputationKeyLength) key[0] = TreeValidatorReputationKeyPrefix copy(key[1:], validatorPubKey) return key } // ValidatorReputation implements [ConsensusStore]. func (t *consensusStore) ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) { key := marshalValidatorReputationKey(validatorPubKey) value, err := tx.Tree().Get(key) if err != nil { return 0, stacktrace.Propagate(err) } // the following returns 0 if value is nil: return new(big.Int).SetBytes(value).Uint64(), nil } // ChangeValidatorReputation implements [ConsensusStore]. func (t *consensusStore) ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error { key := marshalValidatorReputationKey(validatorPubKey) value, err := tx.Tree().Get(key) if err != nil { return stacktrace.Propagate(err) } reputation := new(big.Int).SetBytes(value) newValue, err := changer(reputation.Uint64()) if err != nil { return stacktrace.Propagate(err) } reputation.SetUint64(newValue) if reputation.Sign() <= 0 { _, _, err := tx.Tree().Remove(key) return stacktrace.Propagate(err) } _, err = tx.Tree().Set(key, reputation.Bytes()) return stacktrace.Propagate(err) } var minReputationKey = marshalValidatorReputationKey(make([]byte, 32)) var maxReputationKey = marshalValidatorReputationKey(slices.Repeat([]byte{0xff}, 32)) // ChangeAllNonZeroValidatorReputations implements [ConsensusStore]. func (t *consensusStore) ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error { // we are not allowed to make updates to the tree while an iterator is active // process validators in batches of 100 to avoid loading too many key-value pairs into memory const batchSize = 100 startingKey := slices.Clone(minReputationKey) skipFirst := false type kv struct { key []byte value []byte } batch := func() ([]kv, bool, error) { toSet := make([]kv, 0, batchSize) itr, err := tx.Tree().Iterator(startingKey, maxReputationKey, true) if err != nil { return nil, false, stacktrace.Propagate(err) } defer itr.Close() if skipFirst && itr.Valid() { itr.Next() } for i := 0; itr.Valid() && i < batchSize; i++ { reputation := new(big.Int).SetBytes(itr.Value()) keyCopy := slices.Clone(itr.Key()) pubKey := slices.Clone(itr.Key()[1:]) newValue, err := changer(pubKey, reputation.Uint64()) if err != nil { return nil, false, stacktrace.Propagate(err) } reputation.SetUint64(newValue) if reputation.Sign() <= 0 { toSet = append(toSet, kv{ key: keyCopy, value: nil, }) } else { toSet = append(toSet, kv{ key: keyCopy, value: reputation.Bytes(), }) } itr.Next() } return toSet, itr.Valid(), nil } for { toSet, goAgain, err := batch() if err != nil { return stacktrace.Propagate(err) } skipFirst = true for _, s := range toSet { var updated bool if s.value == nil { _, updated, err = tx.Tree().Remove(s.key) if err != nil { return stacktrace.Propagate(err) } } else { updated, err = tx.Tree().Set(s.key, s.value) if err != nil { return stacktrace.Propagate(err) } } if !updated { return stacktrace.NewError("expected to be updating an existing reputation key but didn't") } } if !goAgain || len(toSet) == 0 { return nil } startingKey = toSet[len(toSet)-1].key } } func MarshalValidatorVotingActivityKey(epochHeight uint64, validatorAddress []byte) []byte { key := make([]byte, IndexValidatorVotingActivityKeyLength) key[0] = IndexValidatorVotingActivityPrefix binary.BigEndian.PutUint64(key[1:], epochHeight) copy(key[9:], validatorAddress) return key } func unmarshalValidatorVotingActivityKey(key []byte) (epochHeight uint64, validatorAddress []byte) { epochHeight = binary.BigEndian.Uint64(key[1:]) validatorAddress = slices.Clone(key[9:]) return } func unmarshalValidatorVotingActivityValue(value []byte) (validatorPubKey []byte, bitlist *bitfield.Bitlist64, err error) { validatorPubKey = slices.Clone(value[:PublicKeyLength]) bitlist, err = bitfield.Bitlist(value[PublicKeyLength:]).ToBitlist64() return validatorPubKey, bitlist, stacktrace.Propagate(err) } // InitializeValidatorVotingActivity implements [ConsensusStore]. func (t *consensusStore) InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error { epochHeight = epochHeight - epochHeight%t.epochSize key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) bitlist := bitfield.NewBitlist64(t.epochSize) bitlistBytes := bitlist.ToBitlist() value := make([]byte, PublicKeyLength+len(bitlistBytes)) copy(value, validatorPubKey) copy(value[PublicKeyLength:], bitlistBytes) err := tx.IndexDB().Set(key, value) return stacktrace.Propagate(err) } // MarkValidatorVote implements [ConsensusStore]. func (t *consensusStore) MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error { epochHeight := height - height%t.epochSize key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) value, err := tx.IndexDB().Get(key) if err != nil { return stacktrace.Propagate(err) } if value == nil { return stacktrace.Propagate(ErrValidatorNotActive) } bitfield := bitfield.Bitlist(value[PublicKeyLength:]) bitfield.SetBitAt(height%t.epochSize, true) copy(value[PublicKeyLength:], bitfield) err = tx.IndexDB().Set(key, value) return stacktrace.Propagate(err) } // ActiveValidatorPubKey implements [ConsensusStore]. func (t *consensusStore) ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error) { epochHeight := height - height%t.epochSize key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) value, err := tx.IndexDB().Get(key) if err != nil { return nil, stacktrace.Propagate(err) } if value == nil { return nil, stacktrace.Propagate(ErrValidatorNotActive) } pubkey, _, err := unmarshalValidatorVotingActivityValue(value) return pubkey, stacktrace.Propagate(err) } // ActiveValidatorsIterator implements [ConsensusStore]. func (t *consensusStore) ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] { return func(yield func(ActiveValidator) bool) { *retErr = nil epochHeight = epochHeight - epochHeight%t.epochSize startKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), make([]byte, AddressLength)) endKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), slices.Repeat([]byte{0xff}, AddressLength)) iterator, err := tx.IndexDB().Iterator(startKey, endKey) if err != nil { *retErr = stacktrace.Propagate(err) return } defer iterator.Close() for iterator.Valid() { _, validatorAddress := unmarshalValidatorVotingActivityKey(iterator.Key()) validatorPubKey, bitlist, err := unmarshalValidatorVotingActivityValue(iterator.Value()) if err != nil { *retErr = stacktrace.Propagate(err) return } if !yield(ActiveValidator{ Address: validatorAddress, PublicKey: validatorPubKey, VoteCount: bitlist.Count(), }) { return } iterator.Next() } err = iterator.Error() if err != nil { *retErr = stacktrace.Propagate(err) } } } // FallbackBlockHeader implements [ConsensusStore]. func (t *consensusStore) FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error) { key := make([]byte, IndexBlockHeaderKeyLength) key[0] = IndexBlockHeaderKeyPrefix binary.BigEndian.PutUint64(key[1:], height) value, err := tx.IndexDB().Get(key) if err != nil { return cmttypes.Header{}, stacktrace.Propagate(err) } if value == nil { return cmttypes.Header{}, stacktrace.NewError("block header not found") } var protoHeader cmtproto.Header err = protoHeader.Unmarshal(value) if err != nil { return cmttypes.Header{}, stacktrace.Propagate(err, "failed to unmarshal block header") } header, err := cmttypes.HeaderFromProto(&protoHeader) if err != nil { return cmttypes.Header{}, stacktrace.Propagate(err, "failed to convert proto header to header") } return header, nil }