// A very experimental PLC implementation which uses BFT consensus for decentralization
1package store
2
3import (
4 "context"
5 _ "embed"
6 "encoding/base32"
7 "encoding/binary"
8 "errors"
9 "iter"
10 "math"
11 "math/big"
12 "slices"
13 "strings"
14 "time"
15
16 "github.com/OffchainLabs/go-bitfield"
17 "github.com/bluesky-social/indigo/atproto/syntax"
18 cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
19 cmttypes "github.com/cometbft/cometbft/types"
20 ics23 "github.com/cosmos/ics23/go"
21 "github.com/dgraph-io/badger/v4"
22 "github.com/did-method-plc/go-didplc"
23 "github.com/gbl08ma/stacktrace"
24 cbornode "github.com/ipfs/go-ipld-cbor"
25 "github.com/polydawn/refmt/obj/atlas"
26 "github.com/samber/lo"
27 "github.com/samber/mo"
28 "tangled.org/gbl08ma.com/didplcbft/transaction"
29 "tangled.org/gbl08ma.com/didplcbft/types"
30)
31
// Key prefixes and fixed key lengths for the two storage areas:
// "Tree…" keys live in the consensus-protected tree (provable via ICS23
// proofs), while "Index…" keys live in the badger-backed index DB.
const (
	PublicKeyLength = 32 // validator public key size in bytes
	AddressLength   = 20 // validator address size in bytes

	TreeOperationKeyPrefix                = 'o'
	TreeOperationKeyLength                = 1 + 8 // prefix + big-endian sequence
	TreeRangeChallengeCommitmentKeyPrefix = 'C'
	TreeRangeChallengeKeyLength           = 1 + PublicKeyLength
	TreeChallengeCompletionKeyPrefix      = 'p'
	TreeChallengeCompletionKeyLength      = 1 + PublicKeyLength
	TreeValidatorReputationKeyPrefix      = 'r'
	TreeValidatorReputationKeyLength      = 1 + PublicKeyLength

	IndexBlockChallengeKeyPrefix          = 'c'
	IndexBlockChallengeKeyLength          = 1 + 8 // prefix + big-endian block height
	IndexDIDLogKeyPrefix                  = 'l'
	IndexDIDLogKeyLength                  = 1 + 15 + 8 // prefix + compact DID + big-endian sequence
	IndexValidatorVotingActivityPrefix    = 'v'
	IndexValidatorVotingActivityKeyLength = 1 + AddressLength + 8
	IndexBlockHeaderKeyPrefix             = 'h'
	IndexBlockHeaderKeyLength             = 1 + 8

	// Singleton tree keys (string keys, no binary prefix).
	TreeAuthoritativePLCKey            = "aPLCURL"
	TreeAuthoritativeImportProgressKey = "aImportProgress"
)
57
// Consensus is the package-level singleton through which all
// consensus-protected state is accessed.
var Consensus ConsensusStore = &consensusStore{}

// Sentinel errors returned (wrapped) by the challenge/validator accessors;
// compare with errors.Is.
var ErrNoActiveChallengeCommitment = errors.New("the validator is currently not committed to a challenge")
var ErrNoRecentChallengeCompletion = errors.New("the validator has not completed a range challenge recently")
var ErrValidatorNotActive = errors.New("the validator is not active")
63
// ConsensusStore manages all information that is directly or indirectly protected by consensus
type ConsensusStore interface {
	// DID audit log access.
	AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error)
	AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, err *error) iter.Seq[types.SequencedLogEntry]

	// Bulk export of sequenced operations.
	ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited
	OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq2[types.SequencedLogEntry, []byte]

	// Operation storage and timestamp adjustment.
	StoreOperation(ctx context.Context, tx transaction.Write, entry didplc.LogEntry, nullifyWithSequenceEqualOrGreaterThan mo.Option[uint64]) error
	SetOperationCreatedAt(tx transaction.Write, seqID uint64, createdAt time.Time) error

	CountOperations(tx transaction.Read) (uint64, error)

	// Authoritative (upstream) PLC directory URL and import progress.
	AuthoritativePLC(tx transaction.Read) (string, error)
	SetAuthoritativePLC(tx transaction.Write, url string) error

	AuthoritativeImportProgress(tx transaction.Read) (uint64, error)
	SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error

	// Range challenge commitments made by validators (tree state).
	ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error)
	SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error
	ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error

	// Most recent range challenge completion per validator.
	ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error)
	SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error

	// Per-block challenge proofs, kept in the index DB and prunable via
	// DeleteBlockChallengeProofsBelowHeight.
	BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error)
	BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] // afterHeight is exclusive for consistency with OperationsIterator
	BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte]
	StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error
	DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error

	// Validator reputation bookkeeping (tree state).
	ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error)
	ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error
	ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error

	// Active validator set and voting activity per epoch.
	ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error)
	ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator]
	InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error
	MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error

	FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error)

	ConfigureEpochSize(epochSize uint64)
}
109
// ActiveValidator describes a validator active in a given epoch, together
// with the number of votes recorded for it.
type ActiveValidator struct {
	Address   []byte // validator address (AddressLength bytes expected)
	PublicKey []byte // validator public key (PublicKeyLength bytes expected)
	VoteCount uint64 // number of votes observed for this validator
}
115
// compile-time interface satisfaction check
var _ ConsensusStore = (*consensusStore)(nil)

// consensusStore exists just to group methods nicely
type consensusStore struct {
	epochSize uint64 // number of blocks per epoch; set via ConfigureEpochSize
}
122
// ConfigureEpochSize implements [ConsensusStore].
// It records the epoch size for later use by epoch-based lookups
// (the consuming code is not visible in this chunk).
func (t *consensusStore) ConfigureEpochSize(epochSize uint64) {
	t.epochSize = epochSize
}
127
// AuditLog returns the DID's operations that are visible at the
// transaction's height, in ascending sequence order, optionally together
// with a combined ICS23 commitment proof covering all returned operations.
func (t *consensusStore) AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) {
	didBytes, err := DIDToBytes(did)
	if err != nil {
		return nil, nil, stacktrace.Propagate(err)
	}

	// the index range covering every sequence number for this DID
	didRangeStart := marshalDIDLogKey(didBytes, 0)
	didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64)

	didLogIterator, err := tx.IndexDB().Iterator(didRangeStart, didRangeEnd)
	if err != nil {
		return nil, nil, stacktrace.Propagate(err)
	}

	defer didLogIterator.Close()

	logEntries := make([]types.SequencedLogEntry, 0, 1) // most DIDs have very few operations
	proofs := []*ics23.CommitmentProof{}
	txHeight := uint64(tx.Height())
	for didLogIterator.Valid() {
		// allow cancellation between iterations
		select {
		case <-ctx.Done():
			return nil, nil, stacktrace.Propagate(ctx.Err())
		default:
		}

		sequence := unmarshalDIDLogSequence(didLogIterator.Key())
		validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value())

		opKey := marshalOperationKey(sequence)

		// only include operations whose visibility range covers this height
		if txHeight >= validFromHeight && txHeight <= validToHeight {
			operationValue, err := tx.Tree().Get(opKey)
			if err != nil {
				return nil, nil, stacktrace.Propagate(err)
			}

			if withProof {
				proof, err := tx.Tree().GetProof(opKey)
				if err != nil {
					return nil, nil, stacktrace.Propagate(err)
				}
				proofs = append(proofs, proof)
			}

			logEntry, err := unmarshalLogEntry(opKey, operationValue)
			if err != nil {
				return nil, nil, stacktrace.Propagate(err)
			}

			logEntries = append(logEntries, logEntry)
		}
		didLogIterator.Next()
	}

	err = didLogIterator.Error()
	if err != nil {
		return nil, nil, stacktrace.Propagate(err)
	}

	// merge the per-operation proofs into a single commitment proof
	var combinedProof *ics23.CommitmentProof
	if withProof {
		combinedProof, err = ics23.CombineProofs(proofs)
		if err != nil {
			return nil, nil, stacktrace.Propagate(err, "failed to combine proofs")
		}
	}
	return logEntries, combinedProof, nil
}
197
// AuditLogReverseIterator yields the DID's visible log entries in descending
// sequence order. Failures are reported through retErr once iteration stops;
// callers must check it after the loop ends.
func (t *consensusStore) AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, retErr *error) iter.Seq[types.SequencedLogEntry] {
	return func(yield func(types.SequencedLogEntry) bool) {
		*retErr = nil

		didBytes, err := DIDToBytes(did)
		if err != nil {
			*retErr = stacktrace.Propagate(err)
			return
		}

		// cheap negative lookup: skip the index scan entirely when the DID
		// definitely has no entries
		mayBePresent := tx.TestDIDBloomFilter(didBytes)
		if !mayBePresent {
			return
		}

		didRangeStart := marshalDIDLogKey(didBytes, 0)
		didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64)

		opts := badger.DefaultIteratorOptions
		// we rarely have more than one entry for a DID and we see great performance gains from not prefetching
		// (iterating on DIDs that don't exist goes from taking over 150us to taking around 35 us)
		opts.PrefetchValues = false
		opts.Reverse = true
		didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts)
		if err != nil {
			*retErr = stacktrace.Propagate(err)
			return
		}

		defer didLogIterator.Close()

		txHeight := uint64(tx.Height())

		for didLogIterator.Valid() {
			// allow cancellation between iterations
			select {
			case <-ctx.Done():
				*retErr = stacktrace.Propagate(ctx.Err())
				return
			default:
			}

			sequence := unmarshalDIDLogSequence(didLogIterator.Key())
			validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value())

			opKey := marshalOperationKey(sequence)

			// only yield operations whose visibility range covers this height
			if txHeight >= validFromHeight && txHeight <= validToHeight {
				operationValue, err := tx.Tree().Get(opKey)
				if err != nil {
					*retErr = stacktrace.Propagate(err)
					return
				}

				logEntry, err := unmarshalLogEntry(opKey, operationValue)
				if err != nil {
					*retErr = stacktrace.Propagate(err)
					return
				}

				if !yield(logEntry) {
					return
				}
			}
			didLogIterator.Next()
		}

		err = didLogIterator.Error()
		if err != nil {
			*retErr = stacktrace.Propagate(err)
		}
	}
}
270
271func (t *consensusStore) ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) {
272 entries := make([]types.SequencedLogEntry, 0, count)
273 var iterErr error
274 for logEntry := range t.OperationsIterator(tx, after, &iterErr) {
275 entries = append(entries, logEntry)
276
277 // this condition being checked here also makes it so that a count of zero means unlimited
278 if len(entries) == count {
279 break
280 }
281 }
282 if iterErr != nil {
283 return nil, stacktrace.Propagate(iterErr, "ran into an error while iterating")
284 }
285 return entries, nil
286}
287
// OperationsIterator yields all operations with sequence numbers strictly
// greater than after, in ascending order, together with their raw tree
// values. Failures are reported through retErr once iteration stops.
func (t *consensusStore) OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq2[types.SequencedLogEntry, []byte] {
	return func(yield func(types.SequencedLogEntry, []byte) bool) {
		*retErr = nil

		// as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds
		start := after + 1
		startKey := marshalOperationKey(start)
		endKey := maxOperationKey

		opIterator, err := tx.Tree().Iterator(startKey, endKey, true)
		if err != nil {
			*retErr = stacktrace.Propagate(err)
			return
		}

		defer opIterator.Close()

		for opIterator.Valid() {
			logEntry, err := unmarshalLogEntry(opIterator.Key(), opIterator.Value())
			if err != nil {
				*retErr = stacktrace.Propagate(err)
				return
			}

			if !yield(logEntry, opIterator.Value()) {
				return
			}

			opIterator.Next()
		}
		err = opIterator.Error()
		if err != nil {
			*retErr = stacktrace.Propagate(err)
		}
	}
}
324
325// StoreOperation stores an operation in the tree, nullifying existing operations whose index within the DID's history
326// is lower or equal to the specified optional integer.
327func (t *consensusStore) StoreOperation(ctx context.Context, tx transaction.Write, entry didplc.LogEntry, nullifyWithSequenceEqualOrGreaterThan mo.Option[uint64]) error {
328 didBytes, err := DIDToBytes(entry.DID)
329 if err != nil {
330 return stacktrace.Propagate(err)
331 }
332
333 txHeight := uint64(tx.Height())
334
335 if nullifyEGt, ok := nullifyWithSequenceEqualOrGreaterThan.Get(); ok {
336 didRangeStart := marshalDIDLogKey(didBytes, nullifyEGt)
337 didRangeEnd := marshalDIDLogKey(didBytes, math.MaxUint64)
338
339 opts := badger.DefaultIteratorOptions
340 opts.PrefetchValues = false
341 opts.Reverse = true
342 didLogIterator, err := tx.IndexDB().IteratorWithOptions(didRangeStart, didRangeEnd, opts)
343 if err != nil {
344 return stacktrace.Propagate(err)
345 }
346
347 defer didLogIterator.Close()
348
349 txHeight := uint64(tx.Height())
350
351 for didLogIterator.Valid() {
352 select {
353 case <-ctx.Done():
354 return stacktrace.Propagate(ctx.Err())
355 default:
356 }
357
358 sequence := unmarshalDIDLogSequence(didLogIterator.Key())
359 validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value())
360
361 opKey := marshalOperationKey(sequence)
362
363 if txHeight < validFromHeight || txHeight > validToHeight {
364 // ignore ops that are invisible at this height
365 didLogIterator.Next()
366 continue
367 }
368
369 operationValue, err := tx.Tree().Get(opKey)
370 if err != nil {
371 return stacktrace.Propagate(err)
372 }
373
374 updated, err := tx.Tree().Set(opKey, markOperationValueNullified(operationValue))
375 if err != nil {
376 return stacktrace.Propagate(err)
377 }
378 if !updated {
379 // if we get to this point we have a mistake in our program, and the data is now inconsistent
380 // we are not supposed to be able to recover from this error without rolling back the tree
381 return stacktrace.NewError("expected to be updating an existing operation key but wrote new one instead")
382 }
383
384 didLogIterator.Next()
385 }
386 err = didLogIterator.Error()
387 if err != nil {
388 return stacktrace.Propagate(err)
389 }
390 }
391
392 opDatetime, err := syntax.ParseDatetime(entry.CreatedAt)
393 if err != nil {
394 return stacktrace.Propagate(err, "invalid CreatedAt")
395 }
396
397 sequence, err := tx.NextSequence()
398 if err != nil {
399 return stacktrace.Propagate(err)
400 }
401
402 operation := entry.Operation.AsOperation()
403 opKey := marshalOperationKey(sequence)
404 opValue := marshalOperationValue(entry.Nullified, didBytes, opDatetime.Time(), operation)
405
406 updated, err := tx.Tree().Set(opKey, opValue)
407 if err != nil {
408 return stacktrace.Propagate(err)
409 }
410 if updated {
411 // if we get to this point we have a mistake in our program, and the data is now inconsistent
412 // we are not supposed to be able to recover from this error without rolling back the tree
413 return stacktrace.NewError("expected to be writing to a new operation key but updated instead")
414 }
415
416 logKey := marshalDIDLogKey(didBytes, sequence)
417 logValue := marshalDIDLogValue(txHeight, math.MaxUint64)
418
419 err = tx.IndexDB().Set(logKey, logValue)
420 if err != nil {
421 return stacktrace.Propagate(err)
422 }
423
424 tx.AddToDIDBloomFilter(didBytes, sequence)
425
426 return nil
427}
428
429func (t *consensusStore) SetOperationCreatedAt(tx transaction.Write, seqID uint64, createdAt time.Time) error {
430 opKey := marshalOperationKey(seqID)
431
432 opValue, err := tx.Tree().Get(opKey)
433 if err != nil {
434 return stacktrace.Propagate(err)
435 }
436 if len(opValue) == 0 {
437 return stacktrace.NewError("operation %d not found", seqID)
438 }
439
440 opValue = slices.Clone(opValue)
441
442 ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano())
443 binary.BigEndian.PutUint64(opValue[16:24], ts)
444
445 updated, err := tx.Tree().Set(opKey, opValue)
446 if !updated {
447 // if we get to this point we have a mistake in our program, and the data is now inconsistent
448 // we are not supposed to be able to recover from this error without rolling back the tree
449 return stacktrace.NewError("expected to be updating an existing operation key but wrote new one instead")
450 }
451 return stacktrace.Propagate(err)
452}
453
// Bounds for iterating over all operation keys in the tree.
// NOTE(review): the upper bound uses math.MaxInt64 rather than MaxUint64, so
// sequences above MaxInt64 would fall outside every iteration range —
// presumably sequences never get that large, but worth confirming.
var minOperationKey = marshalOperationKey(0)
var maxOperationKey = marshalOperationKey(math.MaxInt64)
456
// CountOperations returns the highest operation sequence number currently
// stored, which equals the number of operations assuming sequences are dense
// and start at 1 (see tx.NextSequence) — TODO confirm that assumption.
func (t *consensusStore) CountOperations(tx transaction.Read) (uint64, error) {
	seq := uint64(0)

	// the third argument presumably selects ascending order (mirroring the
	// true passed in OperationsIterator); false here makes the first valid
	// entry the one with the highest sequence
	itr, err := tx.Tree().Iterator(minOperationKey, maxOperationKey, false)
	if err != nil {
		return 0, stacktrace.Propagate(err)
	}

	defer itr.Close()

	if itr.Valid() {
		seq, err = unmarshalOperationKey(itr.Key())
		if err != nil {
			return 0, stacktrace.Propagate(err)
		}
	}
	// err is necessarily nil here; Propagate(nil) keeps the return uniform
	return seq, stacktrace.Propagate(err)
}
475
476func DIDToBytes(did string) ([]byte, error) {
477 if !strings.HasPrefix(did, "did:plc:") {
478 return nil, stacktrace.NewError("invalid did:plc")
479 }
480
481 didBytes := make([]byte, 15)
482
483 did = strings.ToUpper(did)
484
485 numWritten, err := base32.StdEncoding.Decode(didBytes, []byte(did[8:]))
486 if err != nil {
487 return nil, stacktrace.Propagate(err, "invalid did:plc")
488 }
489 if numWritten != 15 {
490 return nil, stacktrace.NewError("invalid did:plc")
491 }
492
493 return didBytes, nil
494}
495
496func bytesToDID(didBytes []byte) (string, error) {
497 did := "did:plc:" + strings.ToLower(base32.StdEncoding.EncodeToString(didBytes))
498 if len(did) != 8+24 {
499 return "", stacktrace.NewError("invalid did:plc")
500 }
501 return did, nil
502}
503
504func marshalDIDLogKey(didBytes []byte, sequence uint64) []byte {
505 key := make([]byte, IndexDIDLogKeyLength)
506 key[0] = IndexDIDLogKeyPrefix
507 copy(key[1:16], didBytes)
508 binary.BigEndian.PutUint64(key[16:], sequence)
509 return key
510}
511
// unmarshalDIDLogSequence extracts the big-endian sequence number stored in
// the last 8 bytes of a DID log index key.
func unmarshalDIDLogSequence(logKey []byte) uint64 {
	seqBytes := logKey[16:24]
	return binary.BigEndian.Uint64(seqBytes)
}
515
// marshalDIDLogValue encodes a visibility range for a DID log entry.
// validFromHeight and validToHeight are inclusive (i.e. if the former is 5
// and the latter is 10, the value was valid at height 5 and 10, but not at
// 4 or 11).
func marshalDIDLogValue(validFromHeight, validToHeight uint64) []byte {
	buf := binary.BigEndian.AppendUint64(make([]byte, 0, 16), validFromHeight)
	return binary.BigEndian.AppendUint64(buf, validToHeight)
}
523
// UnmarshalDIDLogValue decodes the inclusive visibility range stored in a
// DID log index value (the inverse of marshalDIDLogValue).
func UnmarshalDIDLogValue(value []byte) (validFromHeight, validToHeight uint64) {
	validFromHeight = binary.BigEndian.Uint64(value[:8])
	validToHeight = binary.BigEndian.Uint64(value[8:16])
	return validFromHeight, validToHeight
}
529
530func marshalOperationKey(sequence uint64) []byte {
531 key := make([]byte, TreeOperationKeyLength)
532 key[0] = TreeOperationKeyPrefix
533
534 binary.BigEndian.PutUint64(key[1:], sequence)
535
536 return key
537}
538
// unmarshalOperationKey extracts the sequence number from an operation key.
// The error return exists for interface symmetry; it is always nil.
func unmarshalOperationKey(key []byte) (uint64, error) {
	sequence := binary.BigEndian.Uint64(key[1:9])
	return sequence, nil
}
542
543func marshalOperationValue(nullified bool, didBytes []byte, createdAt time.Time, operation didplc.Operation) []byte {
544 opAsBytes := operation.SignedCBORBytes()
545 o := make([]byte, 1+15+8+len(opAsBytes))
546
547 o[0] = lo.Ternary[byte](nullified, 1, 0)
548
549 copy(o[1:16], didBytes)
550
551 ts := uint64(createdAt.Truncate(1 * time.Millisecond).UTC().UnixNano())
552 binary.BigEndian.PutUint64(o[16:24], ts)
553 copy(o[24:], opAsBytes)
554
555 return o
556}
557
558func unmarshalOperationValue(value []byte) (bool, string, time.Time, didplc.OpEnum, error) {
559 nullified := value[0] != 0
560
561 did, err := bytesToDID(value[1:16])
562 if err != nil {
563 return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err)
564 }
565
566 createdAtUnixNano := binary.BigEndian.Uint64(value[16:24])
567 createdAt := time.Unix(0, int64(createdAtUnixNano)).UTC()
568
569 var opEnum didplc.OpEnum
570 err = cbornode.DecodeInto(value[24:], &opEnum)
571 if err != nil {
572 return false, "", time.Time{}, didplc.OpEnum{}, stacktrace.Propagate(err)
573 }
574 return nullified, did, createdAt, opEnum, nil
575}
576
// markOperationValueNullified returns a copy of the operation value with its
// nullified flag set; the input slice is left untouched.
func markOperationValueNullified(value []byte) []byte {
	nullified := append([]byte{1}, value[1:]...)
	return nullified
}
582
583func unmarshalLogEntry(operationKey, operationValue []byte) (types.SequencedLogEntry, error) {
584 nullified, actualDID, timestamp, operation, err := unmarshalOperationValue(operationValue)
585 if err != nil {
586 return types.SequencedLogEntry{}, stacktrace.Propagate(err)
587 }
588
589 seq, err := unmarshalOperationKey(operationKey)
590 if err != nil {
591 return types.SequencedLogEntry{}, stacktrace.Propagate(err)
592 }
593
594 return types.SequencedLogEntry{
595 Seq: seq,
596 DID: actualDID,
597 Operation: operation,
598 CID: operation.AsOperation().CID(),
599 Nullified: nullified,
600 CreatedAt: timestamp,
601 }, nil
602}
603
// init registers a cbor atlas entry for didplc.OpEnum so operation values can
// round-trip through cbornode: marshalling flattens the enum to whichever
// variant is set, unmarshalling dispatches on the "type" field.
func init() {
	cbornode.RegisterCborType(atlas.BuildEntry(didplc.OpEnum{}).
		Transform().
		TransformMarshal(atlas.MakeMarshalTransformFunc(
			func(o didplc.OpEnum) (interface{}, error) {
				// exactly one variant is expected to be non-nil
				if o.Regular != nil {
					return o.Regular, nil
				} else if o.Legacy != nil {
					return o.Legacy, nil
				} else if o.Tombstone != nil {
					return o.Tombstone, nil
				}
				return nil, stacktrace.NewError("invalid OpEnum")
			})).
		TransformUnmarshal(atlas.MakeUnmarshalTransformFunc(
			func(x map[string]any) (didplc.OpEnum, error) {
				typ, ok := x["type"]
				if !ok {
					return didplc.OpEnum{}, stacktrace.NewError("did not find expected operation 'type' field")
				}

				// this is so stupid but oh well - maybe one day the entire thing will be upgraded to a less stupid cbor encoder
				// (re-encode the generic map so it can be decoded into the concrete operation type below)
				b, err := cbornode.DumpObject(x)
				if err != nil {
					return didplc.OpEnum{}, stacktrace.Propagate(err)
				}

				switch typ {
				case "plc_operation":
					o := &didplc.RegularOp{}
					err = cbornode.DecodeInto(b, &o)
					return didplc.OpEnum{
						Regular: o,
					}, stacktrace.Propagate(err)
				case "create":
					o := &didplc.LegacyOp{}
					err = cbornode.DecodeInto(b, &o)
					return didplc.OpEnum{
						Legacy: o,
					}, stacktrace.Propagate(err)
				case "plc_tombstone":
					o := &didplc.TombstoneOp{}
					err = cbornode.DecodeInto(b, &o)
					return didplc.OpEnum{
						Tombstone: o,
					}, stacktrace.Propagate(err)
				default:
					return didplc.OpEnum{}, stacktrace.NewError("unexpected operation type: %s", typ)
				}
			})).
		Complete())
}
656
657func (t *consensusStore) AuthoritativePLC(tx transaction.Read) (string, error) {
658 url, err := tx.Tree().Get([]byte(TreeAuthoritativePLCKey))
659 if err != nil {
660 return "", stacktrace.Propagate(err)
661 }
662 if url == nil {
663 return "", nil
664 }
665 return string(url), nil
666}
667
668func (t *consensusStore) SetAuthoritativePLC(tx transaction.Write, url string) error {
669 _, err := tx.Tree().Set([]byte(TreeAuthoritativePLCKey), []byte(url))
670 return stacktrace.Propagate(err)
671}
672
673func (t *consensusStore) AuthoritativeImportProgress(tx transaction.Read) (uint64, error) {
674 progBytes, err := tx.Tree().Get([]byte(TreeAuthoritativeImportProgressKey))
675 if err != nil {
676 return 0, stacktrace.Propagate(err)
677 }
678 if len(progBytes) != 8 {
679 return 0, nil
680 }
681 return binary.BigEndian.Uint64(progBytes), nil
682}
683
684func (t *consensusStore) SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error {
685 value := make([]byte, 8)
686 binary.BigEndian.PutUint64(value, nextCursor)
687
688 _, err := tx.Tree().Set([]byte(TreeAuthoritativeImportProgressKey), value)
689 return stacktrace.Propagate(err)
690}
691
692func marshalRangeChallengeCommitmentKey(validatorPubKey []byte) []byte {
693 key := make([]byte, TreeRangeChallengeKeyLength)
694 key[0] = TreeRangeChallengeCommitmentKeyPrefix
695 copy(key[1:], validatorPubKey)
696 return key
697}
698
// marshalRangeChallengeCommitmentValue encodes a commitment as four
// big-endian heights followed by the 32-byte tree root (64 bytes total).
func marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) []byte {
	value := make([]byte, 0, 8*4+32)
	for _, h := range [4]uint64{fromHeight, toHeight, provenHeight, includedOnHeight} {
		value = binary.BigEndian.AppendUint64(value, h)
	}
	return append(value, treeRoot[0:32]...)
}
708
// unmarshalRangeChallengeCommitmentValue decodes a value produced by
// marshalRangeChallengeCommitmentValue; the returned treeRoot is a copy.
func unmarshalRangeChallengeCommitmentValue(value []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) {
	fromHeight = binary.BigEndian.Uint64(value[0:8])
	toHeight = binary.BigEndian.Uint64(value[8:16])
	provenHeight = binary.BigEndian.Uint64(value[16:24])
	includedOnHeight = binary.BigEndian.Uint64(value[24:32])
	treeRoot = slices.Clone(value[32:])
	return fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot
}
717
718// ValidatorRangeChallengeCommitment implements [ConsensusStore].
719func (t *consensusStore) ValidatorRangeChallengeCommitment(tx transaction.Read, validatorPubKey []byte) (fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte, err error) {
720 key := marshalRangeChallengeCommitmentKey(validatorPubKey)
721 value, err := tx.Tree().Get(key)
722 if err != nil {
723 return 0, 0, 0, 0, nil, stacktrace.Propagate(err)
724 }
725 if value == nil {
726 return 0, 0, 0, 0, nil, stacktrace.Propagate(ErrNoActiveChallengeCommitment)
727 }
728 fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot = unmarshalRangeChallengeCommitmentValue(value)
729 return
730}
731
732// SetValidatorRangeChallengeCommitment implements [ConsensusStore].
733func (t *consensusStore) SetValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte, fromHeight, toHeight, provenHeight, includedOnHeight uint64, treeRoot []byte) error {
734 key := marshalRangeChallengeCommitmentKey(validatorPubKey)
735 value := marshalRangeChallengeCommitmentValue(fromHeight, toHeight, provenHeight, includedOnHeight, treeRoot)
736 // this may overwrite sometimes (e.g. if a previous commitment has expired and the validator needs to submit a new one)
737 _, err := tx.Tree().Set(key, value)
738 return stacktrace.Propagate(err)
739}
740
741// ClearValidatorRangeChallengeCommitment implements [ConsensusStore].
742func (t *consensusStore) ClearValidatorRangeChallengeCommitment(tx transaction.Write, validatorPubKey []byte) error {
743 _, removed, err := tx.Tree().Remove(marshalRangeChallengeCommitmentKey(validatorPubKey))
744 if err != nil {
745 return stacktrace.Propagate(err)
746 }
747 if !removed {
748 // we are only expecting to call this after completing a range challenge, so we're expecting that the key would still exist
749 // (we would have needed it to validate the challenge completion transaction)
750 return stacktrace.NewError("validator did not have a range challenge commitment")
751 }
752 return nil
753}
754
755func marshalRangeChallengeCompletionKey(validatorPubKey []byte) []byte {
756 key := make([]byte, TreeChallengeCompletionKeyLength)
757 key[0] = TreeChallengeCompletionKeyPrefix
758 copy(key[1:], validatorPubKey)
759 return key
760}
761
762// ValidatorRangeChallengeCompletion implements [ConsensusStore].
763func (t *consensusStore) ValidatorRangeChallengeCompletion(tx transaction.Read, validatorPubKey []byte) (uint64, error) {
764 key := marshalRangeChallengeCompletionKey(validatorPubKey)
765 value, err := tx.Tree().Get(key)
766 if err != nil {
767 return 0, stacktrace.Propagate(err)
768 }
769 if value == nil {
770 return 0, stacktrace.Propagate(ErrNoRecentChallengeCompletion)
771 }
772 return binary.BigEndian.Uint64(value), nil
773}
774
775// SetValidatorRangeChallengeCompletion implements [ConsensusStore].
776func (t *consensusStore) SetValidatorRangeChallengeCompletion(tx transaction.Write, validatorPubKey []byte, completedToHeight uint64) error {
777 key := marshalRangeChallengeCompletionKey(validatorPubKey)
778 value := binary.BigEndian.AppendUint64(nil, completedToHeight)
779 _, err := tx.Tree().Set(key, value)
780 return stacktrace.Propagate(err)
781}
782
783func marshalBlockChallengeProofKey(block uint64) []byte {
784 key := make([]byte, IndexBlockChallengeKeyLength)
785 key[0] = IndexBlockChallengeKeyPrefix
786
787 binary.BigEndian.PutUint64(key[1:], block)
788
789 return key
790}
791
// unmarshalBlockChallengeProofKey extracts the block height from a block
// challenge proof key (skipping the prefix byte).
func unmarshalBlockChallengeProofKey(key []byte) uint64 {
	const offset = 1 // skip the prefix byte
	return binary.BigEndian.Uint64(key[offset : offset+8])
}
795
// Bounds for iterating over all block challenge proof keys.
// NOTE(review): the upper bound uses math.MaxInt64, mirroring
// maxOperationKey — heights above MaxInt64 would be unreachable; presumably
// impossible in practice, but worth confirming.
var minBlockChallengeProofKey = marshalBlockChallengeProofKey(0)
var maxBlockChallengeProofKey = marshalBlockChallengeProofKey(math.MaxInt64)
798
799func (t *consensusStore) BlockChallengeProof(tx transaction.Read, height uint64) ([]byte, error) {
800 key := marshalBlockChallengeProofKey(height)
801 value, err := tx.IndexDB().Get(key)
802 return value, stacktrace.Propagate(err)
803}
804
// BlockChallengeProofsIterator yields (height, proof) pairs for all proofs
// with heights strictly greater than afterHeight, in ascending order.
// Failures are reported through retErr once iteration stops.
func (t *consensusStore) BlockChallengeProofsIterator(tx transaction.Read, afterHeight uint64, retErr *error) iter.Seq2[uint64, []byte] {
	return func(yield func(uint64, []byte) bool) {
		*retErr = nil
		// as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds
		start := afterHeight + 1
		startKey := marshalBlockChallengeProofKey(start)
		endKey := maxBlockChallengeProofKey

		proofsIterator, err := tx.IndexDB().Iterator(startKey, endKey)
		if err != nil {
			*retErr = stacktrace.Propagate(err)
			return
		}

		defer proofsIterator.Close()

		for proofsIterator.Valid() {
			blockHeight := unmarshalBlockChallengeProofKey(proofsIterator.Key())

			if !yield(blockHeight, proofsIterator.Value()) {
				return
			}

			proofsIterator.Next()
		}
		err = proofsIterator.Error()
		if err != nil {
			*retErr = stacktrace.Propagate(err)
		}
	}
}
836
// BlockChallengeProofsReverseIterator implements [ConsensusStore].
// It yields (height, proof) pairs in descending order for heights below
// beforeHeight (presumably exclusive, matching the end-key convention of the
// forward iterator — confirm against the ReverseIterator contract).
func (t *consensusStore) BlockChallengeProofsReverseIterator(tx transaction.Read, beforeHeight uint64, retErr *error) iter.Seq2[uint64, []byte] {
	return func(yield func(uint64, []byte) bool) {
		*retErr = nil
		startKey := minBlockChallengeProofKey
		endKey := marshalBlockChallengeProofKey(beforeHeight)

		proofsIterator, err := tx.IndexDB().ReverseIterator(startKey, endKey)
		if err != nil {
			*retErr = stacktrace.Propagate(err)
			return
		}

		defer proofsIterator.Close()

		for proofsIterator.Valid() {
			blockHeight := unmarshalBlockChallengeProofKey(proofsIterator.Key())

			if !yield(blockHeight, proofsIterator.Value()) {
				return
			}

			// Next advances the reverse iterator towards lower keys
			proofsIterator.Next()
		}
		err = proofsIterator.Error()
		if err != nil {
			*retErr = stacktrace.Propagate(err)
		}
	}
}
867
868func (t *consensusStore) StoreBlockChallengeProof(tx transaction.WriteIndex, blockHeight uint64, proof []byte) error {
869 err := tx.IndexDB().Set(marshalBlockChallengeProofKey(blockHeight), proof)
870 return stacktrace.Propagate(err)
871}
872
// DeleteBlockChallengeProofsBelowHeight prunes all block challenge proofs
// with heights below blockHeight from the index DB (the proof at blockHeight
// itself is kept, assuming the iterator's end key is exclusive).
func (t *consensusStore) DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error {
	// we should be good to delete as we iterate, as the writes within the transaction only hit the underlying DB implementation on commit
	startKey := marshalBlockChallengeProofKey(0)
	endKey := marshalBlockChallengeProofKey(blockHeight)

	proofsIterator, err := tx.IndexDB().Iterator(startKey, endKey)
	if err != nil {
		return stacktrace.Propagate(err)
	}

	defer proofsIterator.Close()

	for proofsIterator.Valid() {
		// allow cancellation between iterations
		select {
		case <-ctx.Done():
			return stacktrace.Propagate(ctx.Err())
		default:
		}

		// clone the key: the iterator may reuse its buffer on Next
		err = tx.IndexDB().Delete(slices.Clone(proofsIterator.Key()))
		if err != nil {
			return stacktrace.Propagate(err)
		}

		proofsIterator.Next()
	}
	return nil
}
901
902func marshalValidatorReputationKey(validatorPubKey []byte) []byte {
903 key := make([]byte, TreeValidatorReputationKeyLength)
904 key[0] = TreeValidatorReputationKeyPrefix
905 copy(key[1:], validatorPubKey)
906 return key
907}
908
909// ValidatorReputation implements [ConsensusStore].
910func (t *consensusStore) ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) {
911 key := marshalValidatorReputationKey(validatorPubKey)
912
913 value, err := tx.Tree().Get(key)
914 if err != nil {
915 return 0, stacktrace.Propagate(err)
916 }
917
918 // the following returns 0 if value is nil:
919 return new(big.Int).SetBytes(value).Uint64(), nil
920}
921
922// ChangeValidatorReputation implements [ConsensusStore].
923func (t *consensusStore) ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error {
924 key := marshalValidatorReputationKey(validatorPubKey)
925
926 value, err := tx.Tree().Get(key)
927 if err != nil {
928 return stacktrace.Propagate(err)
929 }
930
931 reputation := new(big.Int).SetBytes(value)
932 newValue, err := changer(reputation.Uint64())
933 if err != nil {
934 return stacktrace.Propagate(err)
935 }
936 reputation.SetUint64(newValue)
937 if reputation.Sign() <= 0 {
938 _, _, err := tx.Tree().Remove(key)
939 return stacktrace.Propagate(err)
940 }
941
942 _, err = tx.Tree().Set(key, reputation.Bytes())
943 return stacktrace.Propagate(err)
944}
945
946var minReputationKey = marshalValidatorReputationKey(make([]byte, 32))
947var maxReputationKey = marshalValidatorReputationKey(slices.Repeat([]byte{0xff}, 32))
948
949// ChangeAllNonZeroValidatorReputations implements [ConsensusStore].
950func (t *consensusStore) ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error {
951 // we are not allowed to make updates to the tree while an iterator is active
952 // process validators in batches of 100 to avoid loading too many key-value pairs into memory
953 const batchSize = 100
954
955 startingKey := slices.Clone(minReputationKey)
956 skipFirst := false
957
958 type kv struct {
959 key []byte
960 value []byte
961 }
962
963 batch := func() ([]kv, bool, error) {
964 toSet := make([]kv, 0, batchSize)
965 itr, err := tx.Tree().Iterator(startingKey, maxReputationKey, true)
966 if err != nil {
967 return nil, false, stacktrace.Propagate(err)
968 }
969 defer itr.Close()
970
971 if skipFirst && itr.Valid() {
972 itr.Next()
973 }
974
975 for i := 0; itr.Valid() && i < batchSize; i++ {
976 reputation := new(big.Int).SetBytes(itr.Value())
977
978 keyCopy := slices.Clone(itr.Key())
979 pubKey := slices.Clone(itr.Key()[1:])
980
981 newValue, err := changer(pubKey, reputation.Uint64())
982 if err != nil {
983 return nil, false, stacktrace.Propagate(err)
984 }
985 reputation.SetUint64(newValue)
986
987 if reputation.Sign() <= 0 {
988 toSet = append(toSet, kv{
989 key: keyCopy,
990 value: nil,
991 })
992 } else {
993 toSet = append(toSet, kv{
994 key: keyCopy,
995 value: reputation.Bytes(),
996 })
997 }
998
999 itr.Next()
1000 }
1001
1002 return toSet, itr.Valid(), nil
1003 }
1004
1005 for {
1006 toSet, goAgain, err := batch()
1007 if err != nil {
1008 return stacktrace.Propagate(err)
1009 }
1010 skipFirst = true
1011
1012 for _, s := range toSet {
1013 var updated bool
1014 if s.value == nil {
1015 _, updated, err = tx.Tree().Remove(s.key)
1016 if err != nil {
1017 return stacktrace.Propagate(err)
1018 }
1019 } else {
1020 updated, err = tx.Tree().Set(s.key, s.value)
1021 if err != nil {
1022 return stacktrace.Propagate(err)
1023 }
1024 }
1025
1026 if !updated {
1027 return stacktrace.NewError("expected to be updating an existing reputation key but didn't")
1028 }
1029 }
1030
1031 if !goAgain || len(toSet) == 0 {
1032 return nil
1033 }
1034
1035 startingKey = toSet[len(toSet)-1].key
1036 }
1037}
1038
1039func MarshalValidatorVotingActivityKey(epochHeight uint64, validatorAddress []byte) []byte {
1040 key := make([]byte, IndexValidatorVotingActivityKeyLength)
1041 key[0] = IndexValidatorVotingActivityPrefix
1042 binary.BigEndian.PutUint64(key[1:], epochHeight)
1043 copy(key[9:], validatorAddress)
1044 return key
1045}
1046
// unmarshalValidatorVotingActivityKey is the inverse of
// MarshalValidatorVotingActivityKey: it extracts the epoch height and a copy
// of the validator address from a voting-activity index key.
func unmarshalValidatorVotingActivityKey(key []byte) (epochHeight uint64, validatorAddress []byte) {
	return binary.BigEndian.Uint64(key[1:9]), slices.Clone(key[9:])
}
1052
1053func unmarshalValidatorVotingActivityValue(value []byte) (validatorPubKey []byte, bitlist *bitfield.Bitlist64, err error) {
1054 validatorPubKey = slices.Clone(value[:PublicKeyLength])
1055 bitlist, err = bitfield.Bitlist(value[PublicKeyLength:]).ToBitlist64()
1056 return validatorPubKey, bitlist, stacktrace.Propagate(err)
1057}
1058
1059// InitializeValidatorVotingActivity implements [ConsensusStore].
1060func (t *consensusStore) InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error {
1061 epochHeight = epochHeight - epochHeight%t.epochSize
1062
1063 key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress)
1064
1065 bitlist := bitfield.NewBitlist64(t.epochSize)
1066
1067 bitlistBytes := bitlist.ToBitlist()
1068 value := make([]byte, PublicKeyLength+len(bitlistBytes))
1069 copy(value, validatorPubKey)
1070 copy(value[PublicKeyLength:], bitlistBytes)
1071
1072 err := tx.IndexDB().Set(key, value)
1073 return stacktrace.Propagate(err)
1074}
1075
1076// MarkValidatorVote implements [ConsensusStore].
1077func (t *consensusStore) MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error {
1078 epochHeight := height - height%t.epochSize
1079
1080 key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress)
1081
1082 value, err := tx.IndexDB().Get(key)
1083 if err != nil {
1084 return stacktrace.Propagate(err)
1085 }
1086 if value == nil {
1087 return stacktrace.Propagate(ErrValidatorNotActive)
1088 }
1089
1090 bitfield := bitfield.Bitlist(value[PublicKeyLength:])
1091 bitfield.SetBitAt(height%t.epochSize, true)
1092
1093 copy(value[PublicKeyLength:], bitfield)
1094
1095 err = tx.IndexDB().Set(key, value)
1096 return stacktrace.Propagate(err)
1097}
1098
1099// ActiveValidatorPubKey implements [ConsensusStore].
1100func (t *consensusStore) ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error) {
1101 epochHeight := height - height%t.epochSize
1102
1103 key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress)
1104
1105 value, err := tx.IndexDB().Get(key)
1106 if err != nil {
1107 return nil, stacktrace.Propagate(err)
1108 }
1109 if value == nil {
1110 return nil, stacktrace.Propagate(ErrValidatorNotActive)
1111 }
1112
1113 pubkey, _, err := unmarshalValidatorVotingActivityValue(value)
1114 return pubkey, stacktrace.Propagate(err)
1115}
1116
1117// ActiveValidatorsIterator implements [ConsensusStore].
1118func (t *consensusStore) ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] {
1119 return func(yield func(ActiveValidator) bool) {
1120 *retErr = nil
1121
1122 epochHeight = epochHeight - epochHeight%t.epochSize
1123
1124 startKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), make([]byte, AddressLength))
1125 endKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), slices.Repeat([]byte{0xff}, AddressLength))
1126
1127 iterator, err := tx.IndexDB().Iterator(startKey, endKey)
1128 if err != nil {
1129 *retErr = stacktrace.Propagate(err)
1130 return
1131 }
1132
1133 defer iterator.Close()
1134
1135 for iterator.Valid() {
1136 _, validatorAddress := unmarshalValidatorVotingActivityKey(iterator.Key())
1137 validatorPubKey, bitlist, err := unmarshalValidatorVotingActivityValue(iterator.Value())
1138 if err != nil {
1139 *retErr = stacktrace.Propagate(err)
1140 return
1141 }
1142
1143 if !yield(ActiveValidator{
1144 Address: validatorAddress,
1145 PublicKey: validatorPubKey,
1146 VoteCount: bitlist.Count(),
1147 }) {
1148 return
1149 }
1150
1151 iterator.Next()
1152 }
1153 err = iterator.Error()
1154 if err != nil {
1155 *retErr = stacktrace.Propagate(err)
1156 }
1157 }
1158}
1159
1160// FallbackBlockHeader implements [ConsensusStore].
1161func (t *consensusStore) FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error) {
1162 key := make([]byte, IndexBlockHeaderKeyLength)
1163 key[0] = IndexBlockHeaderKeyPrefix
1164 binary.BigEndian.PutUint64(key[1:], height)
1165
1166 value, err := tx.IndexDB().Get(key)
1167 if err != nil {
1168 return cmttypes.Header{}, stacktrace.Propagate(err)
1169 }
1170 if value == nil {
1171 return cmttypes.Header{}, stacktrace.NewError("block header not found")
1172 }
1173
1174 var protoHeader cmtproto.Header
1175 err = protoHeader.Unmarshal(value)
1176 if err != nil {
1177 return cmttypes.Header{}, stacktrace.Propagate(err, "failed to unmarshal block header")
1178 }
1179 header, err := cmttypes.HeaderFromProto(&protoHeader)
1180 if err != nil {
1181 return cmttypes.Header{}, stacktrace.Propagate(err, "failed to convert proto header to header")
1182 }
1183 return header, nil
1184}