A very experimental PLC implementation which uses BFT consensus for decentralization

Refactor snapshot logic

gbl08ma.com 3d5476e8 a30cbf5e

verified
+1156 -802
+4 -4
abciapp/app.go
··· 30 30 logger cmtlog.Logger 31 31 plc plc.PLC 32 32 txFactory *transaction.Factory 33 - indexDB dbm.DB 33 + indexDB transaction.ExtendedDB 34 34 tree *iavl.MutableTree 35 35 fullyClearApplicationData func() error 36 36 ··· 43 43 ongoingWrite transaction.Write 44 44 45 45 snapshotDirectory string 46 - snapshotApplier *snapshotApplier 46 + snapshotApplier *store.SnapshotApplier 47 47 stateSyncTempDir string 48 48 49 49 lastProcessedProposalHash []byte ··· 51 51 52 52 aoc *authoritativeOperationsFetcher 53 53 54 - blockHeaderGetter BlockHeaderGetter 54 + blockHeaderGetter store.BlockHeaderGetter 55 55 triggerBlockCreation func() 56 56 57 57 blockChallengeCoordinator *blockChallengeCoordinator ··· 59 59 } 60 60 61 61 // store and plc must be able to share transaction objects 62 - func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter, blockHeaderGetter BlockHeaderGetter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 62 + func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter, blockHeaderGetter store.BlockHeaderGetter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 63 63 mkTree := func() *iavl.MutableTree { 64 64 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 65 65 // Using SpeedBetterCompression appears to cause the processing time to double again
+2 -5
abciapp/block_challenge.go
··· 10 10 "github.com/Yiling-J/theine-go" 11 11 "github.com/cometbft/cometbft/crypto" 12 12 cmtlog "github.com/cometbft/cometbft/libs/log" 13 - cmttypes "github.com/cometbft/cometbft/types" 14 13 "github.com/consensys/gnark-crypto/ecc" 15 14 "github.com/consensys/gnark-crypto/ecc/bn254" 16 15 "github.com/consensys/gnark-crypto/ecc/bn254/fr/mimc" ··· 54 53 gnarklogger.Set(zerolog.Nop()) 55 54 } 56 55 57 - type BlockHeaderGetter func(int64) (cmttypes.Header, error) 58 - 59 56 type blockChallengeCoordinator struct { 60 57 g singleflight.Group[int64, []byte] 61 58 ··· 65 62 isConfiguredToBeValidator bool 66 63 validatorAddress []byte 67 64 txFactory *transaction.Factory 68 - blockHeaderGetter BlockHeaderGetter 65 + blockHeaderGetter store.BlockHeaderGetter 69 66 70 67 sharedWitnessDataCache *theine.LoadingCache[int64, proof.BlockChallengeCircuit] 71 68 } 72 69 73 - func newBlockChallengeCoordinator(runnerContext context.Context, logger cmtlog.Logger, txFactory *transaction.Factory, headerGetter BlockHeaderGetter, pubKey crypto.PubKey) (*blockChallengeCoordinator, error) { 70 + func newBlockChallengeCoordinator(runnerContext context.Context, logger cmtlog.Logger, txFactory *transaction.Factory, headerGetter store.BlockHeaderGetter, pubKey crypto.PubKey) (*blockChallengeCoordinator, error) { 74 71 c := &blockChallengeCoordinator{ 75 72 runnerContext: runnerContext, 76 73 logger: logger,
+2 -2
abciapp/range_challenge.go
··· 36 36 validatorPrivKey crypto.PrivKey 37 37 txFactory *transaction.Factory 38 38 blockChallengeCoordinator *blockChallengeCoordinator 39 - blockHeaderGetter BlockHeaderGetter 39 + blockHeaderGetter store.BlockHeaderGetter 40 40 nodeEventBus *cmttypes.EventBus 41 41 mempoolSubmitter types.MempoolSubmitter 42 42 consensusReactor consensusReactor ··· 63 63 validatorPrivKey crypto.PrivKey, 64 64 txFactory *transaction.Factory, 65 65 blockChallengeCoordinator *blockChallengeCoordinator, 66 - blockHeaderGetter BlockHeaderGetter, 66 + blockHeaderGetter store.BlockHeaderGetter, 67 67 nodeEventBus *cmttypes.EventBus, 68 68 mempoolSubmitter types.MempoolSubmitter, 69 69 consensusReactor consensusReactor) (*rangeChallengeCoordinator, error) {
+30 -770
abciapp/snapshots.go
··· 1 1 package abciapp 2 2 3 3 import ( 4 - "bufio" 5 - "bytes" 6 4 "context" 7 - "crypto/sha256" 8 - "encoding/binary" 9 5 "errors" 10 6 "fmt" 11 - "io" 12 7 "os" 13 8 "path/filepath" 14 9 "slices" 15 10 "strconv" 16 - "strings" 17 - "sync" 11 + "time" 18 12 19 - dbm "github.com/cometbft/cometbft-db" 20 13 abcitypes "github.com/cometbft/cometbft/abci/types" 21 - "github.com/cosmos/iavl" 22 14 "github.com/gbl08ma/stacktrace" 23 - "github.com/klauspost/compress/zstd" 24 - _ "tangled.org/gbl08ma.com/didplcbft/badgertodbm" // for reference in comment 25 15 "tangled.org/gbl08ma.com/didplcbft/store" 26 16 ) 27 17 28 - const snapshotChunkSize = 10 * 1024 * 1024 // 10 MB 29 - const snapshotChunkHashSize = 32 18 + // snapshotNumRecentBlockHeaders is the number of recent block headers to include in snapshots 19 + // It should be enough to handle challenges that depend on recent block headers 20 + const snapshotNumRecentBlockHeaders = max(CommitToChallengeMaxAgeInBlocks, CompleteChallengeMaxAgeInBlocks) + 5 30 21 31 22 // ListSnapshots implements [types.Application]. 32 23 func (d *DIDPLCApplication) ListSnapshots(context.Context, *abcitypes.RequestListSnapshots) (*abcitypes.ResponseListSnapshots, error) { ··· 37 28 38 29 snapshots := make([]*abcitypes.Snapshot, 0, len(files)) 39 30 for _, filename := range files { 40 - s, err := readSnapshotMetadata(filename) 31 + height, format, chunks, hash, chunksumsData, err := store.Snapshot.ReadSnapshotMetadata(filename) 41 32 if err != nil { 42 33 return nil, stacktrace.Propagate(err) 43 34 } 44 35 45 - snapshots = append(snapshots, s) 36 + snapshots = append(snapshots, &abcitypes.Snapshot{ 37 + Height: height, 38 + Format: format, 39 + Chunks: chunks, 40 + Hash: hash, 41 + Metadata: chunksumsData, 42 + }) 46 43 } 47 44 48 45 return &abcitypes.ResponseListSnapshots{ ··· 50 47 }, nil 51 48 } 52 49 53 - func readSnapshotMetadata(filename string) (*abcitypes.Snapshot, error) { 54 - // Extract height from filename pattern: %020d.snapshot 55 - base := filepath.Base(filename) 56 - if !strings.HasSuffix(base, ".snapshot") { 57 - return nil, stacktrace.NewError("invalid snapshot filename format: %s", filename) 58 - } 59 - heightStr := strings.TrimSuffix(base, ".snapshot") 60 - height, err := strconv.ParseInt(heightStr, 10, 64) 61 - if err != nil { 62 - return nil, stacktrace.Propagate(err, "failed to parse height from filename: %s", filename) 63 - } 64 - 65 - // Open and read snapshot file header 66 - f, err := os.Open(filename) 67 - if err != nil { 68 - return nil, stacktrace.Propagate(err, "failed to open snapshot file: %s", filename) 69 - } 70 - defer f.Close() 71 - 72 - // Read file magic (18 bytes) 73 - magic := make([]byte, 18) 74 - _, err = io.ReadFull(f, magic) 75 - if err != nil { 76 - return nil, stacktrace.Propagate(err, "failed to read file magic") 77 - } 78 - if string(magic) != "didplcbft-snapshot" { 79 - return nil, stacktrace.NewError("invalid file magic: expected 'didplcbft-snapshot', got '%s'", string(magic)) 80 - } 81 - 82 - // Read version bytes (6 bytes) 83 - versionBytes := make([]byte, 6) 84 - _, err = io.ReadFull(f, versionBytes) 85 - if err != nil { 86 - return nil, stacktrace.Propagate(err, "failed to read version bytes") 87 - } 88 - format := binary.BigEndian.Uint32(versionBytes[2:]) 89 - 90 - // Read height (8 bytes, big-endian) 91 - heightBytes := make([]byte, 8) 92 - _, err = io.ReadFull(f, heightBytes) 93 - if err != nil { 94 - return nil, stacktrace.Propagate(err, "failed to read height") 95 - } 96 - fileHeight := int64(binary.BigEndian.Uint64(heightBytes)) 97 - if fileHeight != height { 98 - return nil, stacktrace.NewError("height mismatch: filename indicates %d, file header contains %d", height, fileHeight) 99 - } 100 - 101 - // Read tree hash (32 bytes) 102 - hash := make([]byte, 32) 103 - _, err = io.ReadFull(f, hash) 104 - if err != nil { 105 - return nil, stacktrace.Propagate(err, "failed to read tree hash") 106 - } 107 - 108 - // Read corresponding chunksums file 109 - chunksumsFilename := strings.TrimSuffix(filename, ".snapshot") + ".chunksums" 110 - chunksumsData, err := os.ReadFile(chunksumsFilename) 111 - if err != nil { 112 - return nil, stacktrace.Propagate(err, "failed to read chunksums file: %s", chunksumsFilename) 113 - } 114 - 115 - // Calculate number of chunks (each chunk hash is 32 bytes) 116 - chunks := int64(len(chunksumsData)) / snapshotChunkHashSize 117 - 118 - return &abcitypes.Snapshot{ 119 - Height: uint64(height), 120 - Format: format, 121 - Chunks: uint32(chunks), 122 - Hash: hash, 123 - Metadata: chunksumsData, 124 - }, nil 125 - } 126 - 127 50 // LoadSnapshotChunk implements [types.Application]. 128 51 func (d *DIDPLCApplication) LoadSnapshotChunk(_ context.Context, req *abcitypes.RequestLoadSnapshotChunk) (*abcitypes.ResponseLoadSnapshotChunk, error) { 129 - if req.Format != 2 { 52 + if req.Format != store.SnapshotFormatVersion { 130 53 // just in case CometBFT asks us to load a chunk of a format we didn't declare to support in ListSnapshots... 131 54 return nil, stacktrace.NewError("unsupported snapshot format") 132 55 } 133 56 134 - // Construct filename from height using the same pattern as createSnapshot 135 - snapshotFilename := filepath.Join(d.snapshotDirectory, fmt.Sprintf("%020d.snapshot", req.Height)) 136 - 137 - // Open the snapshot file 138 - f, err := os.Open(snapshotFilename) 57 + chunkData, err := store.Snapshot.LoadSnapshotChunk(d.snapshotDirectory, req.Height, int(req.Chunk)) 139 58 if err != nil { 140 - return nil, stacktrace.Propagate(err, "failed to open snapshot file: %s", snapshotFilename) 141 - } 142 - defer f.Close() 143 - 144 - // Calculate the offset for the requested chunk (start from beginning of file, including header) 145 - offset := int64(req.Chunk) * snapshotChunkSize 146 - _, err = f.Seek(offset, io.SeekStart) 147 - if err != nil { 148 - return nil, stacktrace.Propagate(err, "failed to seek to chunk offset") 149 - } 150 - 151 - // Read up to snapshotChunkSize bytes 152 - chunkData := make([]byte, snapshotChunkSize) 153 - n, err := f.Read(chunkData) 154 - if err != nil && err != io.EOF { 155 - return nil, stacktrace.Propagate(err, "failed to read chunk data") 156 - } 157 - 158 - // If we read less than snapshotChunkSize, trim the slice 159 - if n < snapshotChunkSize { 160 - chunkData = chunkData[:n] 59 + return nil, err 161 60 } 162 61 163 62 return &abcitypes.ResponseLoadSnapshotChunk{ ··· 173 72 174 73 err := d.snapshotApplier.Apply(int(req.Index), req.Chunk) 175 74 if err != nil { 176 - if errors.Is(err, errMalformedChunk) { 75 + if errors.Is(err, store.ErrMalformedChunk) { 177 76 return &abcitypes.ResponseApplySnapshotChunk{ 178 77 Result: abcitypes.ResponseApplySnapshotChunk_RETRY, 179 78 RefetchChunks: []uint32{req.Index}, 180 79 RejectSenders: []string{req.Sender}, 181 80 }, nil 182 - } else if errors.Is(err, errTreeHashMismatch) { 81 + } else if errors.Is(err, store.ErrTreeHashMismatch) || errors.Is(err, store.ErrIndexEntryCountMismatch) { 183 82 return &abcitypes.ResponseApplySnapshotChunk{ 184 83 Result: abcitypes.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, 185 84 RejectSenders: []string{req.Sender}, 186 85 }, nil 187 86 } 188 - return nil, stacktrace.NewError("failed to apply") 87 + return nil, stacktrace.Propagate(err, "failed to apply") 189 88 } 190 89 191 90 if d.snapshotApplier.Done() { ··· 234 133 } 235 134 } 236 135 237 - if req.Snapshot.Format != 2 { 136 + if req.Snapshot.Format != store.SnapshotFormatVersion { 238 137 return &abcitypes.ResponseOfferSnapshot{ 239 138 Result: abcitypes.ResponseOfferSnapshot_REJECT_FORMAT, 240 139 }, nil ··· 244 143 d.snapshotApplier, err = d.beginApplyingSnapshot(int64(req.Snapshot.Height), req.AppHash, int(req.Snapshot.Chunks), req.Snapshot.Metadata) 245 144 if err != nil { 246 145 d.snapshotApplier = nil 247 - if errors.Is(err, errInvalidMetadata) { 146 + if errors.Is(err, store.ErrInvalidMetadata) { 248 147 return &abcitypes.ResponseOfferSnapshot{ 249 148 Result: abcitypes.ResponseOfferSnapshot_REJECT_SENDER, 250 149 }, nil ··· 260 159 func (d *DIDPLCApplication) createSnapshot(treeVersion int64, tempFilename string) error { 261 160 defer (d.logMethod("createSnapshot", "treeVersion", treeVersion, "tempFilename", tempFilename))() 262 161 263 - it, err := d.tree.GetImmutable(treeVersion) 162 + readTx, err := d.txFactory.ReadHeight(time.Now(), treeVersion) 264 163 if err != nil { 265 164 return stacktrace.Propagate(err) 266 165 } ··· 274 173 } 275 174 defer f.Close() 276 175 277 - err = writeSnapshot(f, d.indexDB, d.blockHeaderGetter, it) 176 + err = store.Snapshot.Export(f, readTx, d.blockHeaderGetter, UpdateValidatorsBlockInterval, snapshotNumRecentBlockHeaders) 278 177 if err != nil { 279 178 return stacktrace.Propagate(err) 280 179 } ··· 290 189 } 291 190 defer hf.Close() 292 191 293 - err = writeChunkHashes(f, hf) 192 + err = store.Snapshot.WriteChunkHashes(f, hf) 294 193 if err != nil { 295 194 return stacktrace.Propagate(err) 296 195 } ··· 310 209 return nil 311 210 } 312 211 313 - func writeSnapshot(writerSeeker io.WriteSeeker, indexDB dbm.DB, blockHeaderGetter BlockHeaderGetter, it *iavl.ImmutableTree) error { 314 - writtenUntilReservedFields := 0 315 - 316 - bw := bufio.NewWriter(writerSeeker) 317 - 318 - // file magic and version 319 - c, err := bw.Write([]byte("didplcbft-snapshot")) 320 - if err != nil { 321 - return stacktrace.Propagate(err) 322 - } 323 - writtenUntilReservedFields += c 324 - 325 - c, err = bw.Write([]byte{0, 0, 0, 0, 0, 2}) 326 - if err != nil { 327 - return stacktrace.Propagate(err) 328 - } 329 - writtenUntilReservedFields += c 330 - 331 - b := make([]byte, 8) 332 - binary.BigEndian.PutUint64(b, uint64(it.Version())) 333 - c, err = bw.Write(b) 334 - if err != nil { 335 - return stacktrace.Propagate(err) 336 - } 337 - writtenUntilReservedFields += c 338 - 339 - c, err = bw.Write(it.Hash()) 340 - if err != nil { 341 - return stacktrace.Propagate(err) 342 - } 343 - writtenUntilReservedFields += c 344 - 345 - // reserve space for writing: 346 - // - 8 bytes for compressed section size in bytes 347 - // - 8 bytes for number of index entries 348 - // - 8 bytes for number of nodes 349 - sizeOfReservedFields := 8 * 3 350 - b = make([]byte, sizeOfReservedFields) 351 - _, err = bw.Write(b) 352 - if err != nil { 353 - return stacktrace.Propagate(err) 354 - } 355 - 356 - zstdw, err := zstd.NewWriter(bw, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) 357 - if err != nil { 358 - return stacktrace.Propagate(err) 359 - } 360 - 361 - numIndexEntries, err := exportIndexEntries(indexDB, blockHeaderGetter, it.Version(), zstdw) 362 - if err != nil { 363 - return stacktrace.Propagate(err) 364 - } 365 - 366 - numNodes, err := exportNodes(it, zstdw) 367 - if err != nil { 368 - return stacktrace.Propagate(err) 369 - } 370 - 371 - err = zstdw.Close() 372 - if err != nil { 373 - return stacktrace.Propagate(err) 374 - } 375 - 376 - err = bw.Flush() 377 - if err != nil { 378 - return stacktrace.Propagate(err) 379 - } 380 - 381 - // find total compressed section size 382 - offset, err := writerSeeker.Seek(0, io.SeekCurrent) 383 - if err != nil { 384 - return stacktrace.Propagate(err) 385 - } 386 - compressedSectionSize := offset - int64(writtenUntilReservedFields) - int64(sizeOfReservedFields) 387 - 388 - // seek back and write empty header fields 389 - 390 - offset, err = writerSeeker.Seek(int64(writtenUntilReservedFields), io.SeekStart) 391 - if err != nil { 392 - return stacktrace.Propagate(err) 393 - } 394 - if offset != int64(writtenUntilReservedFields) { 395 - return stacktrace.NewError("unexpected seek result") 396 - } 397 - 398 - b = make([]byte, sizeOfReservedFields) 399 - binary.BigEndian.PutUint64(b, uint64(compressedSectionSize)) 400 - binary.BigEndian.PutUint64(b[8:], uint64(numIndexEntries)) 401 - binary.BigEndian.PutUint64(b[16:], uint64(numNodes)) 402 - _, err = writerSeeker.Write(b) 403 - if err != nil { 404 - return stacktrace.Propagate(err) 405 - } 406 - 407 - return nil 408 - } 409 - 410 - func exportIndexEntries(indexDB dbm.DB, blockHeaderGetter BlockHeaderGetter, treeVersion int64, w io.Writer) (int64, error) { 411 - numDIDEntries, err := exportIndexDIDEntries(indexDB, treeVersion, w) 412 - if err != nil { 413 - return 0, stacktrace.Propagate(err) 414 - } 415 - 416 - numValidatorParticipationEntries, err := exportIndexValidatorParticipation(indexDB, treeVersion, w) 417 - if err != nil { 418 - return 0, stacktrace.Propagate(err) 419 - } 420 - 421 - numRecentBlockHeaders, err := exportRecentBlockHeaders(blockHeaderGetter, treeVersion, w) 422 - if err != nil { 423 - return 0, stacktrace.Propagate(err) 424 - } 425 - 426 - return numDIDEntries + numValidatorParticipationEntries + numRecentBlockHeaders, nil 427 - } 428 - 429 - func exportIndexDIDEntries(indexDB dbm.DB, treeVersion int64, w io.Writer) (int64, error) { 430 - didLogKeyStart := make([]byte, store.IndexDIDLogKeyLength) 431 - didLogKeyStart[0] = store.IndexDIDLogKeyPrefix 432 - didLogKeyEnd := slices.Repeat([]byte{0xff}, store.IndexDIDLogKeyLength) 433 - didLogKeyEnd[0] = store.IndexDIDLogKeyPrefix 434 - 435 - // reading the index using an iterator while writes happen to its domain technically violates the documented contract for [dbm.Iterator] 436 - // in practice, we know this is safe because of how we implemented [badgertodbm.BadgerDB.IteratorWithOptions] - it uses a read-only transaction per iterator 437 - iterator, err := indexDB.Iterator(didLogKeyStart, didLogKeyEnd) 438 - if err != nil { 439 - return 0, stacktrace.Propagate(err) 440 - } 441 - defer iterator.Close() 442 - 443 - numEntries := int64(0) 444 - for iterator.Valid() { 445 - key := iterator.Key() 446 - value := iterator.Value() 447 - 448 - validFromHeight, validToHeight := store.UnmarshalDIDLogValue(value) 449 - if uint64(treeVersion) >= validFromHeight && uint64(treeVersion) <= validToHeight { 450 - header := make([]byte, 4+4) 451 - binary.BigEndian.PutUint32(header, uint32(len(key))) 452 - binary.BigEndian.PutUint32(header[4:], uint32(len(value))) 453 - 454 - _, err = w.Write(header) 455 - if err != nil { 456 - return 0, stacktrace.Propagate(err) 457 - } 458 - 459 - _, err = w.Write(key) 460 - if err != nil { 461 - return 0, stacktrace.Propagate(err) 462 - } 463 - 464 - _, err = w.Write(value) 465 - if err != nil { 466 - return 0, stacktrace.Propagate(err) 467 - } 468 - 469 - numEntries++ 470 - } 471 - 472 - iterator.Next() 473 - } 474 - return numEntries, nil 475 - } 476 - 477 - func exportIndexValidatorParticipation(indexDB dbm.DB, treeVersion int64, w io.Writer) (int64, error) { 478 - epochHeight := uint64(treeVersion) - uint64(treeVersion)%UpdateValidatorsBlockInterval 479 - startKey := store.MarshalValidatorVotingActivityKey(uint64(epochHeight), make([]byte, store.AddressLength)) 480 - endKey := store.MarshalValidatorVotingActivityKey(uint64(epochHeight), slices.Repeat([]byte{0xff}, store.AddressLength)) 481 - 482 - // reading the index using an iterator while writes happen to its domain technically violates the documented contract for [dbm.Iterator] 483 - // in practice, we know this is safe because of how we implemented [badgertodbm.BadgerDB.IteratorWithOptions] - it uses a read-only transaction per iterator 484 - iterator, err := indexDB.Iterator(startKey, endKey) 485 - if err != nil { 486 - return 0, stacktrace.Propagate(err) 487 - } 488 - defer iterator.Close() 489 - 490 - numEntries := int64(0) 491 - for iterator.Valid() { 492 - key := iterator.Key() 493 - value := iterator.Value() 494 - 495 - header := make([]byte, 4+4) 496 - binary.BigEndian.PutUint32(header, uint32(len(key))) 497 - binary.BigEndian.PutUint32(header[4:], uint32(len(value))) 498 - 499 - _, err = w.Write(header) 500 - if err != nil { 501 - return 0, stacktrace.Propagate(err) 502 - } 503 - 504 - _, err = w.Write(key) 505 - if err != nil { 506 - return 0, stacktrace.Propagate(err) 507 - } 508 - 509 - _, err = w.Write(value) 510 - if err != nil { 511 - return 0, stacktrace.Propagate(err) 512 - } 513 - 514 - numEntries++ 515 - 516 - iterator.Next() 517 - } 518 - 519 - if numEntries == 0 { 520 - // there should always be at least one active validator 521 - return 0, stacktrace.NewError("unexpectedly missing index entries for validator voting participation - treeVersion may be too old to export") 522 - } 523 - 524 - return numEntries, nil 525 - } 526 - 527 - func exportRecentBlockHeaders(blockHeaderGetter BlockHeaderGetter, treeVersion int64, w io.Writer) (int64, error) { 528 - // export sufficient block headers for the nodes resuming from this snapshot to be able to e.g. execute TransactionActionCompleteChallenge 529 - // (i.e. transactions which depend on recent block headers to determine end state) 530 - 531 - numBlockHeaders := max(CommitToChallengeMaxAgeInBlocks, CompleteChallengeMaxAgeInBlocks) + 5 // 5 blocks safety margin 532 - startHeight := treeVersion - int64(numBlockHeaders) 533 - if startHeight < 1 { 534 - startHeight = 1 535 - } 536 - 537 - numExportedBlockHeaders := int64(0) 538 - for height := startHeight; height <= treeVersion; height++ { 539 - blockHeader, err := blockHeaderGetter(height) 540 - if err != nil { 541 - return 0, stacktrace.Propagate(err) 542 - } 543 - 544 - blockHeaderProto := blockHeader.ToProto() 545 - blockHeaderBytes, err := blockHeaderProto.Marshal() 546 - if err != nil { 547 - return 0, stacktrace.Propagate(err) 548 - } 549 - 550 - key := make([]byte, 1+8) 551 - key[0] = store.IndexBlockHeaderKeyPrefix 552 - binary.BigEndian.PutUint64(key[1:], uint64(height)) 553 - 554 - header := make([]byte, 4+4) 555 - binary.BigEndian.PutUint32(header, uint32(len(key))) 556 - binary.BigEndian.PutUint32(header[4:], uint32(len(blockHeaderBytes))) 557 - 558 - _, err = w.Write(header) 559 - if err != nil { 560 - return 0, stacktrace.Propagate(err) 561 - } 562 - 563 - _, err = w.Write(key) 564 - if err != nil { 565 - return 0, stacktrace.Propagate(err) 566 - } 567 - 568 - _, err = w.Write(blockHeaderBytes) 569 - if err != nil { 570 - return 0, stacktrace.Propagate(err) 571 - } 572 - 573 - numExportedBlockHeaders++ 574 - } 575 - 576 - return numExportedBlockHeaders, nil 577 - } 578 - 579 - func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) { 580 - exporter, err := it.Export() 581 - if err != nil { 582 - return 0, stacktrace.Propagate(err) 583 - } 584 - defer exporter.Close() 585 - cexporter := iavl.NewCompressExporter(exporter) 586 - 587 - // 1 byte for node height 588 - // 8 bytes for node version 589 - // 4 bytes for node key length (0xffffffff if node key is nil) 590 - // 4 bytes for node value length (0xffffffff if node value is nil) 591 - // this buffer is completely rewritten on every iteration 592 - nodeHeaderBuffer := make([]byte, 1+8+4+4) 593 - 594 - numNodes := int64(0) 595 - for { 596 - node, err := cexporter.Next() 597 - if errors.Is(err, iavl.ErrorExportDone) { 598 - break 599 - } 600 - if err != nil { 601 - return 0, stacktrace.Propagate(err) 602 - } 603 - 604 - nodeHeaderBuffer[0] = byte(node.Height) 605 - 606 - binary.BigEndian.PutUint64(nodeHeaderBuffer[1:], uint64(node.Version)) 607 - 608 - // nil node values are different from 0-byte values 609 - if node.Key != nil { 610 - binary.BigEndian.PutUint32(nodeHeaderBuffer[9:13], uint32(len(node.Key))) 611 - } else { 612 - copy(nodeHeaderBuffer[9:13], []byte{0xff, 0xff, 0xff, 0xff}) 613 - } 614 - 615 - if node.Value != nil { 616 - binary.BigEndian.PutUint32(nodeHeaderBuffer[13:17], uint32(len(node.Value))) 617 - } else { 618 - copy(nodeHeaderBuffer[13:17], []byte{0xff, 0xff, 0xff, 0xff}) 619 - } 620 - 621 - _, err = w.Write(nodeHeaderBuffer) 622 - if err != nil { 623 - return 0, stacktrace.Propagate(err) 624 - } 625 - 626 - _, err = w.Write(node.Key) 627 - if err != nil { 628 - return 0, stacktrace.Propagate(err) 629 - } 630 - 631 - _, err = w.Write(node.Value) 632 - if err != nil { 633 - return 0, stacktrace.Propagate(err) 634 - } 635 - numNodes++ 636 - } 637 - 638 - return numNodes, nil 639 - } 640 - 641 - func writeChunkHashes(snapshotFile io.ReadSeeker, w io.Writer) error { 642 - bw := bufio.NewWriter(w) 643 - defer bw.Flush() 644 - 645 - _, err := snapshotFile.Seek(0, io.SeekStart) 646 - if err != nil { 647 - return stacktrace.Propagate(err) 648 - } 649 - 650 - buf := make([]byte, snapshotChunkSize) 651 - for { 652 - n, err := io.ReadFull(snapshotFile, buf) 653 - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { 654 - return stacktrace.Propagate(err) 655 - } 656 - if n == 0 { 657 - break 658 - } 659 - 660 - hash := sha256.Sum256(buf[:n]) 661 - c, err := w.Write(hash[:]) 662 - if err != nil { 663 - return stacktrace.Propagate(err) 664 - } 665 - 666 - if c != snapshotChunkHashSize { 667 - return stacktrace.NewError("unexpected chunk hash size") 668 - } 669 - 670 - if n < snapshotChunkSize { 671 - break 672 - } 673 - } 674 - 675 - return nil 676 - } 677 - 678 - type snapshotApplier struct { 679 - indexBatch dbm.Batch 680 - tree *iavl.MutableTree 681 - treeVersion int64 682 - expectedFinalHash []byte 683 - expectedChunkHashes [][]byte 684 - 685 - pipeWriter *io.PipeWriter 686 - pipeReader *io.PipeReader 687 - zstdReader io.ReadCloser 688 - 689 - importer *iavl.Importer 690 - compressImporter iavl.NodeImporter 691 - importerWg sync.WaitGroup 692 - 693 - numImportedNodes int 694 - claimedNodeCount int 695 - numImportedIndexEntries int 696 - claimedIndexEntryCount int 697 - done bool 698 - } 699 - 700 - var errMalformedChunk = errors.New("malformed chunk") 701 - var errInvalidMetadata = errors.New("invalid metadata") 702 - var errTreeHashMismatch = errors.New("tree hash mismatch") 703 - 704 - func (d *DIDPLCApplication) beginApplyingSnapshot(treeVersion int64, expectedFinalHash []byte, expectedNumChunks int, chunksums []byte) (*snapshotApplier, error) { 705 - if len(chunksums)%snapshotChunkHashSize != 0 || len(chunksums)/snapshotChunkHashSize != expectedNumChunks { 706 - return nil, stacktrace.Propagate(errInvalidMetadata) 212 + func (d *DIDPLCApplication) beginApplyingSnapshot(treeVersion int64, expectedFinalHash []byte, expectedNumChunks int, chunksums []byte) (*store.SnapshotApplier, error) { 213 + if len(chunksums)%store.SnapshotChunkHashSize != 0 || len(chunksums)/store.SnapshotChunkHashSize != expectedNumChunks { 214 + return nil, stacktrace.Propagate(store.ErrInvalidMetadata) 707 215 } 708 216 709 217 if !d.tree.IsEmpty() { ··· 713 221 } 714 222 } 715 223 716 - importer, err := d.tree.Import(treeVersion) 717 - if err != nil { 718 - return nil, stacktrace.Propagate(err) 719 - } 720 - 721 - pipeReader, pipeWriter := io.Pipe() 722 - 723 - zstdReader, err := zstd.NewReader(pipeReader) 724 - if err != nil { 725 - return nil, stacktrace.Propagate(err) 726 - } 727 - 728 224 chunkHashes := make([][]byte, 0, expectedNumChunks) 729 - for hash := range slices.Chunk(chunksums, snapshotChunkHashSize) { 225 + for hash := range slices.Chunk(chunksums, store.SnapshotChunkHashSize) { 730 226 chunkHashes = append(chunkHashes, hash) 731 227 } 732 228 733 - return &snapshotApplier{ 734 - indexBatch: d.indexDB.NewBatch(), 735 - tree: d.tree, 736 - treeVersion: treeVersion, 737 - expectedFinalHash: expectedFinalHash, 738 - expectedChunkHashes: chunkHashes, 739 - 740 - pipeWriter: pipeWriter, 741 - pipeReader: pipeReader, 742 - zstdReader: zstdReader.IOReadCloser(), 743 - 744 - importer: importer, 745 - compressImporter: iavl.NewCompressImporter(importer), 746 - }, nil 747 - } 748 - 749 - func (a *snapshotApplier) Apply(chunkIndex int, chunkBytes []byte) error { 750 - if len(chunkBytes) > snapshotChunkSize { 751 - return stacktrace.Propagate(errMalformedChunk, "chunk too large") 752 - } 753 - hash := sha256.Sum256(chunkBytes) 754 - if !bytes.Equal(a.expectedChunkHashes[chunkIndex], hash[:]) { 755 - return stacktrace.Propagate(errMalformedChunk, "hash mismatch") 756 - } 757 - 758 - if chunkIndex == 0 { 759 - if len(chunkBytes) < 88 { 760 - return stacktrace.Propagate(errMalformedChunk, "chunk too small") 761 - } 762 - 763 - if string(chunkBytes[0:18]) != "didplcbft-snapshot" { 764 - return stacktrace.Propagate(errMalformedChunk, "invalid file magic") 765 - } 766 - 767 - if binary.BigEndian.Uint32(chunkBytes[20:]) != 2 { 768 - return stacktrace.Propagate(errMalformedChunk, "invalid snapshot format") 769 - } 770 - 771 - if binary.BigEndian.Uint64(chunkBytes[24:]) != uint64(a.treeVersion) { 772 - return stacktrace.Propagate(errMalformedChunk, "mismatched tree version") 773 - } 774 - 775 - if !bytes.Equal(chunkBytes[32:64], a.expectedFinalHash) { 776 - return stacktrace.Propagate(errMalformedChunk, "mismatched declared tree hash") 777 - } 778 - 779 - declaredFileSize := 88 + binary.BigEndian.Uint64(chunkBytes[64:]) 780 - minExpectedSize := uint64((len(a.expectedChunkHashes) - 1) * snapshotChunkSize) 781 - maxExpectedSize := uint64(len(a.expectedChunkHashes) * snapshotChunkSize) 782 - if declaredFileSize < minExpectedSize || 783 - declaredFileSize > maxExpectedSize { 784 - return stacktrace.Propagate(errMalformedChunk, "unexpected compressed section length") 785 - } 786 - 787 - a.claimedIndexEntryCount = int(binary.BigEndian.Uint64(chunkBytes[72:])) 788 - a.claimedNodeCount = int(binary.BigEndian.Uint64(chunkBytes[80:])) 789 - 790 - // move to the start of the compressed portion 791 - chunkBytes = chunkBytes[88:] 792 - 793 - a.importerWg.Go(a.streamingImporter) 794 - } 795 - 796 - _, err := a.pipeWriter.Write(chunkBytes) 797 - if err != nil { 798 - return stacktrace.Propagate(err) 799 - } 800 - 801 - isLastChunk := chunkIndex == len(a.expectedChunkHashes)-1 802 - if isLastChunk { 803 - _ = a.pipeWriter.Close() 804 - // wait for importer to finish reading and importing everything 805 - a.importerWg.Wait() 806 - 807 - if a.numImportedIndexEntries != a.claimedIndexEntryCount { 808 - return stacktrace.Propagate(errTreeHashMismatch, "imported index entry count mismatch") 809 - } 810 - 811 - if a.numImportedNodes != a.claimedNodeCount { 812 - return stacktrace.Propagate(errTreeHashMismatch, "imported node count mismatch") 813 - } 814 - 815 - err := a.indexBatch.Write() 816 - if err != nil { 817 - return stacktrace.Propagate(err) 818 - } 819 - 820 - err = a.importer.Commit() 821 - if err != nil { 822 - if strings.Contains(err.Error(), "invalid node structure") { 823 - return stacktrace.Propagate(errors.Join(errMalformedChunk, err)) 824 - } 825 - return stacktrace.Propagate(err) 826 - } 827 - 828 - a.closeCommons() 829 - a.done = true 830 - 831 - if !bytes.Equal(a.tree.Hash(), a.expectedFinalHash) { 832 - return stacktrace.Propagate(errTreeHashMismatch) 833 - } 834 - } 835 - 836 - return nil 837 - } 838 - 839 - func (a *snapshotApplier) streamingImporter() { 840 - for { 841 - if a.numImportedIndexEntries < a.claimedIndexEntryCount { 842 - entryHeader := make([]byte, 4+4) 843 - n, err := io.ReadFull(a.zstdReader, entryHeader) 844 - if err != nil || n != 8 { 845 - // err may be EOF here, which is expected 846 - return 847 - } 848 - 849 - // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 850 - keyLength := binary.BigEndian.Uint32(entryHeader[0:4]) 851 - valueLength := binary.BigEndian.Uint32(entryHeader[4:8]) 852 - if keyLength > 1024*1024 || valueLength > 1024*1024 { 853 - return 854 - } 855 - 856 - key := make([]byte, keyLength) 857 - 858 - n, err = io.ReadFull(a.zstdReader, key) 859 - if err != nil || n != len(key) { 860 - // this shouldn't happen unless the data is corrupt 861 - // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 862 - return 863 - } 864 - 865 - value := make([]byte, valueLength) 866 - n, err = io.ReadFull(a.zstdReader, value) 867 - if err != nil || n != len(value) { 868 - return 869 - } 870 - 871 - err = a.indexBatch.Set(key, value) 872 - if err != nil { 873 - // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 874 - return 875 - } 876 - 877 - a.numImportedIndexEntries++ 878 - } else { 879 - nodeHeader := make([]byte, 9+4+4) 880 - n, err := io.ReadFull(a.zstdReader, nodeHeader) 881 - if err != nil || n != 9+4+4 { 882 - // err may be EOF here, which is expected 883 - return 884 - } 885 - 886 - // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 887 - keyLength := binary.BigEndian.Uint32(nodeHeader[9:13]) 888 - var key []byte 889 - if keyLength != 0xffffffff { 890 - if keyLength > 1024*1024 { 891 - return 892 - } 893 - key = make([]byte, keyLength) 894 - 895 - n, err = io.ReadFull(a.zstdReader, key) 896 - if err != nil || n != len(key) { 897 - // this shouldn't happen unless the data is corrupt 898 - // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 899 - return 900 - } 901 - } 902 - 903 - valueLength := binary.BigEndian.Uint32(nodeHeader[13:17]) 904 - var value []byte 905 - if valueLength != 0xffffffff { 906 - if valueLength > 1024*1024 { 907 - return 908 - } 909 - value = make([]byte, valueLength) 910 - n, err = io.ReadFull(a.zstdReader, value) 911 - if err != nil || n != len(value) { 912 - return 913 - } 914 - } 915 - 916 - err = a.compressImporter.Add(&iavl.ExportNode{ 917 - Height: int8(nodeHeader[0]), 918 - Version: int64(binary.BigEndian.Uint64(nodeHeader[1:9])), 919 - Key: key, 920 - Value: value, 921 - }) 922 - if err != nil { 923 - // this shouldn't happen unless the data is corrupt 924 - // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 925 - return 926 - } 927 - a.numImportedNodes++ 928 - } 929 - } 930 - } 931 - 932 - func (a *snapshotApplier) Abort() error { 933 - err := a.closeCommons() 934 - if err != nil { 935 - return stacktrace.Propagate(err) 936 - } 937 - 938 - err = a.tree.DeleteVersionsFrom(0) 939 - if err != nil { 940 - return stacktrace.Propagate(err) 941 - } 942 - 943 - return nil 944 - } 945 - 946 - func (a *snapshotApplier) closeCommons() error { 947 - err := a.zstdReader.Close() 948 - if err != nil { 949 - return stacktrace.Propagate(err) 950 - } 951 - 952 - err = a.pipeReader.Close() 229 + applier, err := store.Snapshot.CreateSnapshotApplier(d.tree, d.txFactory, treeVersion, expectedFinalHash, chunkHashes) 953 230 if err != nil { 954 - return stacktrace.Propagate(err) 231 + return nil, stacktrace.Propagate(err) 955 232 } 956 233 957 - err = a.pipeWriter.Close() 958 - if err != nil { 959 - return stacktrace.Propagate(err) 960 - } 961 - 962 - err = a.indexBatch.Close() 963 - if err != nil { 964 - return stacktrace.Propagate(err) 965 - } 966 - 967 - a.importerWg.Wait() 968 - a.importer.Close() 969 - 970 - return nil 971 - } 972 - 973 - func (a *snapshotApplier) Done() bool { 974 - return a.done 234 + return applier, nil 975 235 }
+2 -1
abciapp/tx.go
··· 12 12 cbornode "github.com/ipfs/go-ipld-cbor" 13 13 "github.com/samber/mo" 14 14 "tangled.org/gbl08ma.com/didplcbft/plc" 15 + "tangled.org/gbl08ma.com/didplcbft/store" 15 16 "tangled.org/gbl08ma.com/didplcbft/transaction" 16 17 ) 17 18 ··· 37 38 getAuthoritativeOperationsFetcher func(plc string) *authoritativeOperationsFetcher 38 39 destroyAuthoritativeOperationsFetcher func() 39 40 blockChallengeCoordinator *blockChallengeCoordinator 40 - blockHeaderGetter BlockHeaderGetter 41 + blockHeaderGetter store.BlockHeaderGetter 41 42 } 42 43 43 44 type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error)
+4
badgertodbm/badger.go
··· 128 128 if err != nil { 129 129 return err 130 130 } 131 + if db.Opts().InMemory { 132 + // calling Sync in memory mode will cause panic 133 + return nil 134 + } 131 135 return db.Sync() 132 136 } 133 137
+1 -7
main.go
··· 16 16 "github.com/cometbft/cometbft/mempool" 17 17 "github.com/cometbft/cometbft/p2p" 18 18 "github.com/cometbft/cometbft/privval" 19 - cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" 20 19 "github.com/cometbft/cometbft/proxy" 21 20 cmtstore "github.com/cometbft/cometbft/store" 22 21 cmttypes "github.com/cometbft/cometbft/types" ··· 138 137 readTx := txFactory.ReadWorking(time.Now()) 139 138 blockHeader, err := store.Consensus.FallbackBlockHeader(readTx, uint64(height)) 140 139 if err == nil { 141 - var protoHeader *cmtproto.Header 142 - err = protoHeader.Unmarshal(blockHeader) 143 - if err == nil { 144 - blockHeader, err := cmttypes.HeaderFromProto(protoHeader) 145 - return blockHeader, stacktrace.Propagate(err) 146 - } 140 + return blockHeader, nil 147 141 } 148 142 } 149 143
+17 -5
store/consensus.go
··· 15 15 16 16 "github.com/OffchainLabs/go-bitfield" 17 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" 19 + cmttypes "github.com/cometbft/cometbft/types" 18 20 ics23 "github.com/cosmos/ics23/go" 19 21 "github.com/dgraph-io/badger/v4" 20 22 "github.com/did-method-plc/go-didplc" ··· 99 101 InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error 100 102 MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error 101 103 102 - FallbackBlockHeader(tx transaction.Read, height uint64) ([]byte, error) 104 + FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error) 103 105 104 106 ConfigureEpochSize(epochSize uint64) 105 107 } ··· 1133 1135 } 1134 1136 1135 1137 // FallbackBlockHeader implements [ConsensusStore]. 1136 - func (t *consensusStore) FallbackBlockHeader(tx transaction.Read, height uint64) ([]byte, error) { 1138 + func (t *consensusStore) FallbackBlockHeader(tx transaction.Read, height uint64) (cmttypes.Header, error) { 1137 1139 key := make([]byte, IndexBlockHeaderKeyLength) 1138 1140 key[0] = IndexBlockHeaderKeyPrefix 1139 1141 binary.BigEndian.PutUint64(key[1:], height) 1140 1142 1141 1143 value, err := tx.IndexDB().Get(key) 1142 1144 if err != nil { 1143 - return nil, stacktrace.Propagate(err) 1145 + return cmttypes.Header{}, stacktrace.Propagate(err) 1144 1146 } 1145 1147 if value == nil { 1146 - return nil, stacktrace.NewError("block header not found") 1148 + return cmttypes.Header{}, stacktrace.NewError("block header not found") 1147 1149 } 1148 - return value, nil 1150 + 1151 + var protoHeader cmtproto.Header 1152 + err = protoHeader.Unmarshal(value) 1153 + if err != nil { 1154 + return cmttypes.Header{}, stacktrace.Propagate(err, "failed to unmarshal block header") 1155 + } 1156 + header, err := cmttypes.HeaderFromProto(&protoHeader) 1157 + if err != nil { 1158 + return cmttypes.Header{}, stacktrace.Propagate(err, "failed to convert proto header to header") 1159 + } 1160 + return header, nil 1149 1161 }
+830
store/snapshot.go
··· 1 + package store 2 + 3 + import ( 4 + "bufio" 5 + "bytes" 6 + "crypto/sha256" 7 + "encoding/binary" 8 + "errors" 9 + "fmt" 10 + "io" 11 + "math" 12 + "os" 13 + "path/filepath" 14 + "slices" 15 + "strconv" 16 + "strings" 17 + "sync" 18 + "time" 19 + 20 + cmttypes "github.com/cometbft/cometbft/types" 21 + "github.com/cosmos/iavl" 22 + "github.com/gbl08ma/stacktrace" 23 + "github.com/klauspost/compress/zstd" 24 + "github.com/samber/lo" 25 + "tangled.org/gbl08ma.com/didplcbft/transaction" 26 + ) 27 + 28 + // Snapshot format constants 29 + const ( 30 + SnapshotChunkSize = 10 * 1024 * 1024 // 10 MB 31 + SnapshotChunkHashSize = 32 32 + SnapshotFormatVersion = 2 33 + SnapshotFileMagic = "didplcbft-snapshot" 34 + ) 35 + 36 + // BlockHeaderGetter is a function type for retrieving block headers 37 + type BlockHeaderGetter func(height int64) (cmttypes.Header, error) 38 + 39 + // ErrMalformedChunk is returned when a snapshot chunk is invalid 40 + var ErrMalformedChunk = errors.New("malformed chunk") 41 + 42 + // ErrInvalidMetadata is returned when snapshot metadata is invalid 43 + var ErrInvalidMetadata = errors.New("invalid metadata") 44 + 45 + // ErrTreeHashMismatch is returned when the imported tree hash doesn't match expected 46 + var ErrTreeHashMismatch = errors.New("tree hash mismatch") 47 + 48 + // ErrIndexEntryCountMismatch is returned when the imported index entry count doesn't match expected 49 + var ErrIndexEntryCountMismatch = errors.New("index entry count mismatch") 50 + 51 + // SnapshotStore provides snapshot creation and import functionality 52 + type SnapshotStore struct{} 53 + 54 + var Snapshot = &SnapshotStore{} 55 + 56 + // Export creates a snapshot of the current state and writes it to the provided writer 57 + func (s *SnapshotStore) Export(writerSeeker io.WriteSeeker, tx transaction.Read, blockHeaderGetter BlockHeaderGetter, updateValidatorsBlockInterval uint64, numRecentBlockHeaders int64) error { 58 + return writeSnapshot(writerSeeker, tx, blockHeaderGetter, updateValidatorsBlockInterval, numRecentBlockHeaders) 59 + } 60 + 61 + // WriteChunkHashes calculates and writes chunk hashes for a snapshot file 62 + func (s *SnapshotStore) WriteChunkHashes(snapshotFile io.ReadSeeker, w io.Writer) error { 63 + return writeChunkHashes(snapshotFile, w) 64 + } 65 + 66 + // CreateSnapshotApplier creates a new SnapshotApplier for applying a snapshot 67 + func (s *SnapshotStore) CreateSnapshotApplier(tree *iavl.MutableTree, txFactory *transaction.Factory, treeVersion int64, expectedFinalHash []byte, expectedChunkHashes [][]byte) (*SnapshotApplier, error) { 68 + writeTx, err := txFactory.ReadWorking(time.Now()).UpgradeForIndexOnly() 69 + if err != nil { 70 + return nil, stacktrace.Propagate(err) 71 + } 72 + 73 + sa := &SnapshotApplier{ 74 + writeIndex: writeTx, 75 + tree: tree, 76 + treeVersion: treeVersion, 77 + expectedFinalHash: expectedFinalHash, 78 + expectedChunkHashes: expectedChunkHashes, 79 + } 80 + 81 + sa.importer, err = sa.tree.Import(treeVersion) 82 + if err != nil { 83 + return nil, stacktrace.Propagate(err) 84 + } 85 + 86 + sa.compressImporter = iavl.NewCompressImporter(sa.importer) 87 + 88 + return sa, nil 89 + } 90 + 91 + func writeSnapshot(writerSeeker io.WriteSeeker, tx transaction.Read, blockHeaderGetter BlockHeaderGetter, updateValidatorsBlockInterval uint64, numRecentBlockHeaders int64) error { 92 + it, ok := transaction.UnderlyingImmutableTree(tx.Tree()) 93 + if !ok { 94 + return stacktrace.NewError("expected immutable tree") 95 + } 96 + 97 + writtenUntilReservedFields := 0 98 + 99 + bw := bufio.NewWriter(writerSeeker) 100 + 101 + // file magic and version 102 + c, err := bw.Write([]byte(SnapshotFileMagic)) 103 + if err != nil { 104 + return stacktrace.Propagate(err) 105 + } 106 + writtenUntilReservedFields += c 107 + 108 + c, err = bw.Write([]byte{0, 0, 0, 0, 0, byte(SnapshotFormatVersion)}) 109 + if err != nil { 110 + return stacktrace.Propagate(err) 111 + } 112 + writtenUntilReservedFields += c 113 + 114 + b := make([]byte, 8) 115 + binary.BigEndian.PutUint64(b, uint64(it.Version())) 116 + c, err = bw.Write(b) 117 + if err != nil { 118 + return stacktrace.Propagate(err) 119 + } 120 + writtenUntilReservedFields += c 121 + 122 + c, err = bw.Write(it.Hash()) 123 + if err != nil { 124 + return stacktrace.Propagate(err) 125 + } 126 + writtenUntilReservedFields += c 127 + 128 + // reserve space for writing: 129 + // - 8 bytes for compressed section size in bytes 130 + // - 8 bytes for number of index entries 131 + // - 8 bytes for number of nodes 132 + sizeOfReservedFields := 8 * 3 133 + b = make([]byte, sizeOfReservedFields) 134 + _, err = bw.Write(b) 135 + if err != nil { 136 + return stacktrace.Propagate(err) 137 + } 138 + 139 + zstdw, err := zstd.NewWriter(bw, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) 140 + if err != nil { 141 + return stacktrace.Propagate(err) 142 + } 143 + 144 + numIndexEntries, err := exportIndexEntries(tx, blockHeaderGetter, updateValidatorsBlockInterval, numRecentBlockHeaders, it.Version(), zstdw) 145 + if err != nil { 146 + return stacktrace.Propagate(err) 147 + } 148 + 149 + numNodes, err := exportNodes(it, zstdw) 150 + if err != nil { 151 + return stacktrace.Propagate(err) 152 + } 153 + 154 + err = zstdw.Close() 155 + if err != nil { 156 + return stacktrace.Propagate(err) 157 + } 158 + 159 + err = bw.Flush() 160 + if err != nil { 161 + return stacktrace.Propagate(err) 162 + } 163 + 164 + // find total compressed section size 165 + offset, err := writerSeeker.Seek(0, io.SeekCurrent) 166 + if err != nil { 167 + return stacktrace.Propagate(err) 168 + } 169 + compressedSectionSize := offset - int64(writtenUntilReservedFields) - int64(sizeOfReservedFields) 170 + 171 + // seek back and write empty header fields 172 + 173 + offset, err = writerSeeker.Seek(int64(writtenUntilReservedFields), io.SeekStart) 174 + if err != nil { 175 + return stacktrace.Propagate(err) 176 + } 177 + if offset != int64(writtenUntilReservedFields) { 178 + return stacktrace.NewError("unexpected seek result") 179 + } 180 + 181 + b = make([]byte, sizeOfReservedFields) 182 + binary.BigEndian.PutUint64(b, uint64(compressedSectionSize)) 183 + binary.BigEndian.PutUint64(b[8:], uint64(numIndexEntries)) 184 + binary.BigEndian.PutUint64(b[16:], uint64(numNodes)) 185 + _, err = writerSeeker.Write(b) 186 + if err != nil { 187 + return stacktrace.Propagate(err) 188 + } 189 + 190 + return nil 191 + } 192 + 193 + func exportIndexEntries(tx transaction.Read, blockHeaderGetter BlockHeaderGetter, updateValidatorsBlockInterval uint64, numRecentBlockHeaders int64, treeVersion int64, w io.Writer) (int64, error) { 194 + indexDB := tx.IndexDB() 195 + numDIDEntries, err := exportIndexDIDEntries(indexDB, treeVersion, w) 196 + if err != nil { 197 + return 0, stacktrace.Propagate(err) 198 + } 199 + 200 + numValidatorParticipationEntries, err := exportIndexValidatorParticipation(indexDB, updateValidatorsBlockInterval, treeVersion, w) 201 + if err != nil { 202 + return 0, stacktrace.Propagate(err) 203 + } 204 + 205 + numRecentBlockHeadersExported, err := exportRecentBlockHeaders(blockHeaderGetter, numRecentBlockHeaders, treeVersion, w) 206 + if err != nil { 207 + return 0, stacktrace.Propagate(err) 208 + } 209 + 210 + return numDIDEntries + numValidatorParticipationEntries + numRecentBlockHeadersExported, nil 211 + } 212 + 213 + func exportIndexDIDEntries(indexDB transaction.IndexReader, treeVersion int64, w io.Writer) (int64, error) { 214 + didLogKeyStart := make([]byte, IndexDIDLogKeyLength) 215 + didLogKeyStart[0] = IndexDIDLogKeyPrefix 216 + didLogKeyEnd := slices.Repeat([]byte{0xff}, IndexDIDLogKeyLength) 217 + didLogKeyEnd[0] = IndexDIDLogKeyPrefix 218 + 219 + iterator, err := indexDB.Iterator(didLogKeyStart, didLogKeyEnd) 220 + if err != nil { 221 + return 0, stacktrace.Propagate(err) 222 + } 223 + defer iterator.Close() 224 + 225 + numEntries := int64(0) 226 + for iterator.Valid() { 227 + key := iterator.Key() 228 + value := iterator.Value() 229 + 230 + validFromHeight, validToHeight := UnmarshalDIDLogValue(value) 231 + if uint64(treeVersion) >= validFromHeight && uint64(treeVersion) <= validToHeight { 232 + header := make([]byte, 4+4) 233 + binary.BigEndian.PutUint32(header, uint32(len(key))) 234 + binary.BigEndian.PutUint32(header[4:], uint32(len(value))) 235 + 236 + _, err = w.Write(header) 237 + if err != nil { 238 + return 0, stacktrace.Propagate(err) 239 + } 240 + 241 + _, err = w.Write(key) 242 + if err != nil { 243 + return 0, stacktrace.Propagate(err) 244 + } 245 + 246 + _, err = w.Write(value) 247 + if err != nil { 248 + return 0, stacktrace.Propagate(err) 249 + } 250 + 251 + numEntries++ 252 + } 253 + 254 + iterator.Next() 255 + } 256 + return numEntries, nil 257 + } 258 + 259 + func exportIndexValidatorParticipation(indexDB transaction.IndexReader, updateValidatorsBlockInterval uint64, treeVersion int64, w io.Writer) (int64, error) { 260 + epochHeight := uint64(treeVersion) - uint64(treeVersion)%updateValidatorsBlockInterval 261 + startKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), make([]byte, AddressLength)) 262 + endKey := MarshalValidatorVotingActivityKey(uint64(epochHeight), slices.Repeat([]byte{0xff}, AddressLength)) 263 + 264 + iterator, err := indexDB.Iterator(startKey, endKey) 265 + if err != nil { 266 + return 0, stacktrace.Propagate(err) 267 + } 268 + defer iterator.Close() 269 + 270 + numEntries := int64(0) 271 + for iterator.Valid() { 272 + key := iterator.Key() 273 + value := iterator.Value() 274 + 275 + header := make([]byte, 4+4) 276 + binary.BigEndian.PutUint32(header, uint32(len(key))) 277 + binary.BigEndian.PutUint32(header[4:], uint32(len(value))) 278 + 279 + _, err = w.Write(header) 280 + if err != nil { 281 + return 0, stacktrace.Propagate(err) 282 + } 283 + 284 + _, err = w.Write(key) 285 + if err != nil { 286 + return 0, stacktrace.Propagate(err) 287 + } 288 + 289 + _, err = w.Write(value) 290 + if err != nil { 291 + return 0, stacktrace.Propagate(err) 292 + } 293 + 294 + numEntries++ 295 + 296 + iterator.Next() 297 + } 298 + 299 + if numEntries == 0 { 300 + // there should always be at least one active validator 301 + return 0, stacktrace.NewError("unexpectedly missing index entries for validator voting participation - treeVersion may be too old to export") 302 + } 303 + 304 + return numEntries, nil 305 + } 306 + 307 + func exportRecentBlockHeaders(blockHeaderGetter BlockHeaderGetter, numRecentBlockHeaders int64, treeVersion int64, w io.Writer) (int64, error) { 308 + startHeight := treeVersion - numRecentBlockHeaders + 1 // plus one because we want to include the block at treeVersion 309 + if startHeight < 1 { 310 + startHeight = 1 311 + } 312 + 313 + numExportedBlockHeaders := int64(0) 314 + for height := startHeight; height <= treeVersion; height++ { 315 + blockHeader, err := blockHeaderGetter(height) 316 + if err != nil { 317 + return 0, stacktrace.Propagate(err) 318 + } 319 + 320 + blockHeaderProto := blockHeader.ToProto() 321 + blockHeaderBytes, err := blockHeaderProto.Marshal() 322 + if err != nil { 323 + return 0, stacktrace.Propagate(err) 324 + } 325 + 326 + key := make([]byte, 1+8) 327 + key[0] = IndexBlockHeaderKeyPrefix 328 + binary.BigEndian.PutUint64(key[1:], uint64(height)) 329 + 330 + header := make([]byte, 4+4) 331 + binary.BigEndian.PutUint32(header, uint32(len(key))) 332 + binary.BigEndian.PutUint32(header[4:], uint32(len(blockHeaderBytes))) 333 + 334 + _, err = w.Write(header) 335 + if err != nil { 336 + return 0, stacktrace.Propagate(err) 337 + } 338 + 339 + _, err = w.Write(key) 340 + if err != nil { 341 + return 0, stacktrace.Propagate(err) 342 + } 343 + 344 + _, err = w.Write(blockHeaderBytes) 345 + if err != nil { 346 + return 0, stacktrace.Propagate(err) 347 + } 348 + 349 + numExportedBlockHeaders++ 350 + } 351 + 352 + return numExportedBlockHeaders, nil 353 + } 354 + 355 + func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) { 356 + exporter, err := it.Export() 357 + if err != nil { 358 + return 0, stacktrace.Propagate(err) 359 + } 360 + defer exporter.Close() 361 + cexporter := iavl.NewCompressExporter(exporter) 362 + 363 + // 1 byte for node height 364 + // 8 bytes for node version 365 + // 4 bytes for node key length (0xffffffff if node key is nil) 366 + // 4 bytes for node value length (0xffffffff if node value is nil) 367 + nodeHeaderBuffer := make([]byte, 1+8+4+4) 368 + 369 + numNodes := int64(0) 370 + for { 371 + node, err := cexporter.Next() 372 + if errors.Is(err, iavl.ErrorExportDone) { 373 + break 374 + } 375 + if err != nil { 376 + return 0, stacktrace.Propagate(err) 377 + } 378 + 379 + nodeHeaderBuffer[0] = byte(node.Height) 380 + 381 + binary.BigEndian.PutUint64(nodeHeaderBuffer[1:], uint64(node.Version)) 382 + 383 + // nil node values are different from 0-byte values 384 + binary.BigEndian.PutUint32(nodeHeaderBuffer[9:13], lo.Ternary(node.Key != nil, uint32(len(node.Key)), math.MaxUint32)) 385 + binary.BigEndian.PutUint32(nodeHeaderBuffer[13:17], lo.Ternary(node.Value != nil, uint32(len(node.Value)), math.MaxUint32)) 386 + 387 + _, err = w.Write(nodeHeaderBuffer) 388 + if err != nil { 389 + return 0, stacktrace.Propagate(err) 390 + } 391 + 392 + _, err = w.Write(node.Key) 393 + if err != nil { 394 + return 0, stacktrace.Propagate(err) 395 + } 396 + 397 + _, err = w.Write(node.Value) 398 + if err != nil { 399 + return 0, stacktrace.Propagate(err) 400 + } 401 + numNodes++ 402 + } 403 + 404 + return numNodes, nil 405 + } 406 + 407 + func writeChunkHashes(snapshotFile io.ReadSeeker, w io.Writer) error { 408 + bw := bufio.NewWriter(w) 409 + defer bw.Flush() 410 + 411 + _, err := snapshotFile.Seek(0, io.SeekStart) 412 + if err != nil { 413 + return stacktrace.Propagate(err) 414 + } 415 + 416 + buf := make([]byte, SnapshotChunkSize) 417 + for { 418 + n, err := io.ReadFull(snapshotFile, buf) 419 + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { 420 + return stacktrace.Propagate(err) 421 + } 422 + if n == 0 { 423 + break 424 + } 425 + 426 + hash := sha256.Sum256(buf[:n]) 427 + c, err := w.Write(hash[:]) 428 + if err != nil { 429 + return stacktrace.Propagate(err) 430 + } 431 + 432 + if c != SnapshotChunkHashSize { 433 + return stacktrace.NewError("unexpected chunk hash size") 434 + } 435 + 436 + if n < SnapshotChunkSize { 437 + break 438 + } 439 + } 440 + 441 + return nil 442 + } 443 + 444 + // SnapshotApplier handles applying snapshot chunks 445 + type SnapshotApplier struct { 446 + writeIndex transaction.WriteIndex 447 + tree *iavl.MutableTree 448 + treeVersion int64 449 + expectedFinalHash []byte 450 + expectedChunkHashes [][]byte 451 + 452 + pipeWriter *io.PipeWriter 453 + pipeReader *io.PipeReader 454 + zstdReader io.ReadCloser 455 + 456 + importer *iavl.Importer 457 + compressImporter iavl.NodeImporter 458 + importerWg sync.WaitGroup 459 + 460 + numImportedNodes int 461 + claimedNodeCount int 462 + numImportedIndexEntries int 463 + claimedIndexEntryCount int 464 + done bool 465 + } 466 + 467 + func (a *SnapshotApplier) initApplier() { 468 + pipeReader, pipeWriter := io.Pipe() 469 + 470 + zstdReader, err := zstd.NewReader(pipeReader) 471 + if err != nil { 472 + _ = pipeReader.Close() 473 + _ = pipeWriter.Close() 474 + return 475 + } 476 + 477 + a.pipeWriter = pipeWriter 478 + a.pipeReader = pipeReader 479 + a.zstdReader = zstdReader.IOReadCloser() 480 + 481 + a.importerWg.Go(a.streamingImporter) 482 + } 483 + 484 + func (a *SnapshotApplier) Apply(chunkIndex int, chunkBytes []byte) error { 485 + if len(chunkBytes) > SnapshotChunkSize { 486 + return stacktrace.Propagate(ErrMalformedChunk, "chunk too large") 487 + } 488 + hash := sha256.Sum256(chunkBytes) 489 + if !bytes.Equal(a.expectedChunkHashes[chunkIndex], hash[:]) { 490 + return stacktrace.Propagate(ErrMalformedChunk, "hash mismatch") 491 + } 492 + 493 + if chunkIndex == 0 { 494 + if len(chunkBytes) < 88 { 495 + return stacktrace.Propagate(ErrMalformedChunk, "chunk too small") 496 + } 497 + 498 + if string(chunkBytes[0:18]) != SnapshotFileMagic { 499 + return stacktrace.Propagate(ErrMalformedChunk, "invalid file magic") 500 + } 501 + 502 + if binary.BigEndian.Uint32(chunkBytes[20:]) != SnapshotFormatVersion { 503 + return stacktrace.Propagate(ErrMalformedChunk, "invalid snapshot format") 504 + } 505 + 506 + if binary.BigEndian.Uint64(chunkBytes[24:]) != uint64(a.treeVersion) { 507 + return stacktrace.Propagate(ErrMalformedChunk, "mismatched tree version") 508 + } 509 + 510 + if !bytes.Equal(chunkBytes[32:64], a.expectedFinalHash) { 511 + return stacktrace.Propagate(ErrMalformedChunk, "mismatched declared tree hash") 512 + } 513 + 514 + declaredFileSize := 88 + binary.BigEndian.Uint64(chunkBytes[64:]) 515 + minExpectedSize := uint64((len(a.expectedChunkHashes) - 1) * SnapshotChunkSize) 516 + maxExpectedSize := uint64(len(a.expectedChunkHashes) * SnapshotChunkSize) 517 + if declaredFileSize < minExpectedSize || 518 + declaredFileSize > maxExpectedSize { 519 + return stacktrace.Propagate(ErrMalformedChunk, "unexpected compressed section length") 520 + } 521 + 522 + a.claimedIndexEntryCount = int(binary.BigEndian.Uint64(chunkBytes[72:])) 523 + a.claimedNodeCount = int(binary.BigEndian.Uint64(chunkBytes[80:])) 524 + 525 + // move to the start of the compressed portion 526 + chunkBytes = chunkBytes[88:] 527 + 528 + a.initApplier() 529 + } 530 + 531 + _, err := a.pipeWriter.Write(chunkBytes) 532 + if err != nil { 533 + return stacktrace.Propagate(err) 534 + } 535 + 536 + isLastChunk := chunkIndex == len(a.expectedChunkHashes)-1 537 + if isLastChunk { 538 + _ = a.pipeWriter.Close() 539 + // wait for importer to finish reading and importing everything 540 + a.importerWg.Wait() 541 + 542 + if a.numImportedIndexEntries != a.claimedIndexEntryCount { 543 + return stacktrace.Propagate(ErrIndexEntryCountMismatch, "imported index entry count mismatch") 544 + } 545 + 546 + if a.numImportedNodes != a.claimedNodeCount { 547 + return stacktrace.Propagate(ErrTreeHashMismatch, "imported node count mismatch") 548 + } 549 + 550 + err := a.writeIndex.Commit() 551 + if err != nil { 552 + return stacktrace.Propagate(err) 553 + } 554 + 555 + err = a.importer.Commit() 556 + if err != nil { 557 + if strings.Contains(err.Error(), "invalid node structure") { 558 + return stacktrace.Propagate(errors.Join(ErrMalformedChunk, err)) 559 + } 560 + return stacktrace.Propagate(err) 561 + } 562 + 563 + err = a.closeCommons() 564 + if err != nil { 565 + return stacktrace.Propagate(err) 566 + } 567 + a.done = true 568 + 569 + if !bytes.Equal(a.tree.Hash(), a.expectedFinalHash) { 570 + return stacktrace.Propagate(ErrTreeHashMismatch) 571 + } 572 + } 573 + 574 + return nil 575 + } 576 + 577 + func (a *SnapshotApplier) streamingImporter() { 578 + for { 579 + if a.numImportedIndexEntries < a.claimedIndexEntryCount { 580 + entryHeader := make([]byte, 4+4) 581 + n, err := io.ReadFull(a.zstdReader, entryHeader) 582 + if err != nil || n != 8 { 583 + // err may be EOF here, which is expected 584 + return 585 + } 586 + 587 + // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 588 + keyLength := binary.BigEndian.Uint32(entryHeader[0:4]) 589 + valueLength := binary.BigEndian.Uint32(entryHeader[4:8]) 590 + if keyLength > 1024*1024 || valueLength > 1024*1024 { 591 + return 592 + } 593 + 594 + key := make([]byte, keyLength) 595 + 596 + n, err = io.ReadFull(a.zstdReader, key) 597 + if err != nil || n != len(key) { 598 + // this shouldn't happen unless the data is corrupt 599 + // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 600 + return 601 + } 602 + 603 + value := make([]byte, valueLength) 604 + n, err = io.ReadFull(a.zstdReader, value) 605 + if err != nil || n != len(value) { 606 + return 607 + } 608 + 609 + err = a.writeIndex.IndexDB().Set(key, value) 610 + if err != nil { 611 + // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 612 + return 613 + } 614 + 615 + a.numImportedIndexEntries++ 616 + } else { 617 + nodeHeader := make([]byte, 9+4+4) 618 + n, err := io.ReadFull(a.zstdReader, nodeHeader) 619 + if err != nil || n != 9+4+4 { 620 + // err may be EOF here, which is expected 621 + return 622 + } 623 + 624 + // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 625 + keyLength := binary.BigEndian.Uint32(nodeHeader[9:13]) 626 + var key []byte 627 + if keyLength != 0xffffffff { 628 + if keyLength > 1024*1024 { 629 + return 630 + } 631 + key = make([]byte, keyLength) 632 + 633 + n, err = io.ReadFull(a.zstdReader, key) 634 + if err != nil || n != len(key) { 635 + // this shouldn't happen unless the data is corrupt 636 + // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 637 + return 638 + } 639 + } 640 + 641 + valueLength := binary.BigEndian.Uint32(nodeHeader[13:17]) 642 + var value []byte 643 + if valueLength != 0xffffffff { 644 + if valueLength > 1024*1024 { 645 + return 646 + } 647 + value = make([]byte, valueLength) 648 + n, err = io.ReadFull(a.zstdReader, value) 649 + if err != nil || n != len(value) { 650 + return 651 + } 652 + } 653 + 654 + err = a.compressImporter.Add(&iavl.ExportNode{ 655 + Height: int8(nodeHeader[0]), 656 + Version: int64(binary.BigEndian.Uint64(nodeHeader[1:9])), 657 + Key: key, 658 + Value: value, 659 + }) 660 + if err != nil { 661 + // this shouldn't happen unless the data is corrupt 662 + // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 663 + return 664 + } 665 + a.numImportedNodes++ 666 + } 667 + } 668 + } 669 + 670 + // Abort cancels the snapshot application 671 + func (a *SnapshotApplier) Abort() error { 672 + err := a.closeCommons() 673 + if err != nil { 674 + return stacktrace.Propagate(err) 675 + } 676 + 677 + if a.tree != nil { 678 + err = a.tree.DeleteVersionsFrom(0) 679 + if err != nil { 680 + return stacktrace.Propagate(err) 681 + } 682 + } 683 + 684 + return nil 685 + } 686 + 687 + func (a *SnapshotApplier) closeCommons() error { 688 + if a.zstdReader != nil { 689 + err := a.zstdReader.Close() 690 + if err != nil { 691 + return stacktrace.Propagate(err) 692 + } 693 + } 694 + 695 + if a.pipeReader != nil { 696 + err := a.pipeReader.Close() 697 + if err != nil { 698 + return stacktrace.Propagate(err) 699 + } 700 + } 701 + 702 + if a.pipeWriter != nil { 703 + err := a.pipeWriter.Close() 704 + if err != nil { 705 + return stacktrace.Propagate(err) 706 + } 707 + } 708 + 709 + if a.writeIndex != nil { 710 + err := a.writeIndex.Rollback() 711 + if err != nil { 712 + return stacktrace.Propagate(err) 713 + } 714 + } 715 + 716 + if a.importerWg != (sync.WaitGroup{}) { 717 + a.importerWg.Wait() 718 + } 719 + 720 + a.importer.Close() 721 + 722 + return nil 723 + } 724 + 725 + // Done returns true if the snapshot has been fully applied 726 + func (a *SnapshotApplier) Done() bool { 727 + return a.done 728 + } 729 + 730 + // ReadSnapshotMetadata reads metadata from a snapshot file 731 + func (s *SnapshotStore) ReadSnapshotMetadata(filename string) (height uint64, format uint32, chunks uint32, hash []byte, chunksumsData []byte, err error) { 732 + // Extract height from filename pattern: %020d.snapshot 733 + base := filepath.Base(filename) 734 + if !strings.HasSuffix(base, ".snapshot") { 735 + return 0, 0, 0, nil, nil, stacktrace.NewError("invalid snapshot filename format: %s", filename) 736 + } 737 + heightStr := strings.TrimSuffix(base, ".snapshot") 738 + height, err = strconv.ParseUint(heightStr, 10, 64) 739 + if err != nil { 740 + return 0, 0, 0, nil, nil, stacktrace.Propagate(err, "failed to parse height from filename: %s", filename) 741 + } 742 + 743 + // Open and read snapshot file header 744 + f, err := os.Open(filename) 745 + if err != nil { 746 + return 0, 0, 0, nil, nil, stacktrace.Propagate(err, "failed to open snapshot file") 747 + } 748 + defer f.Close() 749 + 750 + // Read file magic (18 bytes) 751 + magic := make([]byte, 18) 752 + _, err = io.ReadFull(f, magic) 753 + if err != nil { 754 + return 0, 0, 0, nil, nil, stacktrace.Propagate(err, "failed to read file magic") 755 + } 756 + if string(magic) != SnapshotFileMagic { 757 + return 0, 0, 0, nil, nil, stacktrace.NewError("invalid file magic") 758 + } 759 + 760 + // Read version bytes (6 bytes) 761 + versionBytes := make([]byte, 6) 762 + _, err = io.ReadFull(f, versionBytes) 763 + if err != nil { 764 + return 0, 0, 0, nil, nil, stacktrace.Propagate(err, "failed to read version bytes") 765 + } 766 + format = binary.BigEndian.Uint32(versionBytes[2:]) 767 + 768 + // Read height (8 bytes, big-endian) 769 + heightBytes := make([]byte, 8) 770 + _, err = io.ReadFull(f, heightBytes) 771 + if err != nil { 772 + return 0, 0, 0, nil, nil, stacktrace.Propagate(err, "failed to read height") 773 + } 774 + fileHeight := int64(binary.BigEndian.Uint64(heightBytes)) 775 + if fileHeight != int64(height) { 776 + return 0, 0, 0, nil, nil, stacktrace.NewError("height mismatch") 777 + } 778 + 779 + // Read tree hash (32 bytes) 780 + hash = make([]byte, 32) 781 + _, err = io.ReadFull(f, hash) 782 + if err != nil { 783 + return 0, 0, 0, nil, nil, stacktrace.Propagate(err, "failed to read tree hash") 784 + } 785 + 786 + // Read corresponding chunksums file 787 + chunksumsFilename := strings.TrimSuffix(filename, ".snapshot") + ".chunksums" 788 + chunksumsData, err = os.ReadFile(chunksumsFilename) 789 + if err != nil { 790 + return 0, 0, 0, nil, nil, stacktrace.Propagate(err, "failed to read chunksums file") 791 + } 792 + 793 + // Calculate number of chunks 794 + chunks = uint32(len(chunksumsData) / SnapshotChunkHashSize) 795 + 796 + return height, format, chunks, hash, chunksumsData, nil 797 + } 798 + 799 + // LoadSnapshotChunk reads a chunk from a snapshot file at the given height and chunk index 800 + func (s *SnapshotStore) LoadSnapshotChunk(snapshotDirectory string, height uint64, chunkIndex int) ([]byte, error) { 801 + snapshotFilename := filepath.Join(snapshotDirectory, fmt.Sprintf("%020d.snapshot", height)) 802 + 803 + // Open the snapshot file 804 + f, err := os.Open(snapshotFilename) 805 + if err != nil { 806 + return nil, stacktrace.Propagate(err, "failed to open snapshot file: %s", snapshotFilename) 807 + } 808 + defer f.Close() 809 + 810 + // Calculate the offset for the requested chunk (start from beginning of file, including header) 811 + offset := int64(chunkIndex) * int64(SnapshotChunkSize) 812 + _, err = f.Seek(offset, io.SeekStart) 813 + if err != nil { 814 + return nil, stacktrace.Propagate(err, "failed to seek to chunk offset") 815 + } 816 + 817 + // Read up to SnapshotChunkSize bytes 818 + chunkData := make([]byte, SnapshotChunkSize) 819 + n, err := f.Read(chunkData) 820 + if err != nil && err != io.EOF { 821 + return nil, stacktrace.Propagate(err, "failed to read chunk data") 822 + } 823 + 824 + // If we read less than SnapshotChunkSize, trim the slice 825 + if n < SnapshotChunkSize { 826 + chunkData = chunkData[:n] 827 + } 828 + 829 + return chunkData, nil 830 + }
+254
store/snapshot_test.go
··· 1 + package store_test 2 + 3 + import ( 4 + "bytes" 5 + "errors" 6 + "fmt" 7 + "io" 8 + "testing" 9 + "time" 10 + 11 + cmttypes "github.com/cometbft/cometbft/types" 12 + "github.com/stretchr/testify/require" 13 + "tangled.org/gbl08ma.com/didplcbft/store" 14 + "tangled.org/gbl08ma.com/didplcbft/testutil" 15 + ) 16 + 17 + func TestEmptySnapshot(t *testing.T) { 18 + srcTxFactory, _, _ := testutil.NewTestTxFactory(t) 19 + 20 + writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade() 21 + require.NoError(t, err) 22 + 23 + err = writeTx.Commit() 24 + require.NoError(t, err) 25 + 26 + blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 27 + require.Fail(t, "should not get any block headers") 28 + return cmttypes.Header{}, nil 29 + } 30 + 31 + readTx := srcTxFactory.ReadCommitted() 32 + 33 + buf := NewFileBuffer(nil) 34 + err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, 10, 0) 35 + require.Error(t, err) 36 + require.Contains(t, err.Error(), "unexpectedly missing index entries for validator voting participation") 37 + } 38 + 39 + func TestAlmostEmptySnapshot(t *testing.T) { 40 + srcTxFactory, _, _ := testutil.NewTestTxFactory(t) 41 + 42 + writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade() 43 + require.NoError(t, err) 44 + 45 + const epochSize = 10 46 + 47 + store.Consensus.ConfigureEpochSize(epochSize) 48 + 49 + err = store.Consensus.InitializeValidatorVotingActivity(writeTx, make([]byte, store.AddressLength), make([]byte, store.PublicKeyLength), 0) 50 + require.NoError(t, err) 51 + 52 + err = writeTx.Commit() 53 + require.NoError(t, err) 54 + 55 + treeVersion := writeTx.Tree().Version() 56 + treeHash := writeTx.Tree().Hash() 57 + 58 + blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 59 + require.Fail(t, "should not get any block headers") 60 + return cmttypes.Header{}, nil 61 + } 62 + 63 + readTx := srcTxFactory.ReadCommitted() 64 + 65 + buf := NewFileBuffer(nil) 66 + err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, epochSize, 0) 67 + require.NoError(t, err) 68 + 69 + chunkHashesBuf := new(bytes.Buffer) 70 + 71 + err = store.Snapshot.WriteChunkHashes(buf, chunkHashesBuf) 72 + require.NoError(t, err) 73 + 74 + dstTxFactory, _, _ := testutil.NewTestTxFactory(t) 75 + 76 + dstTx, err := dstTxFactory.ReadWorking(time.Now()).Upgrade() 77 + require.NoError(t, err) 78 + 79 + applier, err := store.Snapshot.CreateSnapshotApplier(dstTx.Tree(), dstTxFactory, treeVersion, treeHash, [][]byte{chunkHashesBuf.Bytes()}) 80 + require.NoError(t, err) 81 + 82 + err = applier.Apply(0, buf.Bytes()) 83 + require.NoError(t, err) 84 + 85 + require.True(t, applier.Done()) 86 + 87 + err = dstTx.Commit() 88 + require.NoError(t, err) 89 + } 90 + 91 + func TestSnapshot(t *testing.T) { 92 + srcTxFactory, _, _ := testutil.NewTestTxFactory(t) 93 + 94 + writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade() 95 + require.NoError(t, err) 96 + 97 + const epochSize = 10 98 + 99 + store.Consensus.ConfigureEpochSize(epochSize) 100 + 101 + valAddr := make([]byte, store.AddressLength) 102 + copy(valAddr, []byte("valAddr")) 103 + 104 + valPubKey := make([]byte, store.PublicKeyLength) 105 + copy(valPubKey, []byte("valPubKey")) 106 + 107 + authoritativePLC := "https://authoritative-plc.local" 108 + 109 + err = store.Consensus.InitializeValidatorVotingActivity(writeTx, valAddr, valPubKey, 0) 110 + require.NoError(t, err) 111 + 112 + err = store.Consensus.MarkValidatorVote(writeTx, valAddr, 3) 113 + require.NoError(t, err) 114 + 115 + err = store.Consensus.SetAuthoritativePLC(writeTx, "to throw away") 116 + require.NoError(t, err) 117 + 118 + err = writeTx.Commit() 119 + require.NoError(t, err) 120 + 121 + writeTx, err = srcTxFactory.ReadWorking(time.Now()).Upgrade() 122 + require.NoError(t, err) 123 + 124 + err = store.Consensus.SetAuthoritativePLC(writeTx, authoritativePLC) 125 + require.NoError(t, err) 126 + 127 + err = store.Consensus.ChangeValidatorReputation(writeTx, valPubKey, 100) 128 + require.NoError(t, err) 129 + 130 + err = writeTx.Commit() 131 + require.NoError(t, err) 132 + 133 + treeVersion := writeTx.Tree().Version() 134 + treeHash := writeTx.Tree().Hash() 135 + 136 + blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 137 + return cmttypes.Header{ 138 + LastCommitHash: []byte("abcdef"), 139 + Time: time.Now(), 140 + }, nil 141 + } 142 + 143 + readTx := srcTxFactory.ReadCommitted() 144 + 145 + buf := NewFileBuffer(nil) 146 + err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, epochSize, 1) 147 + require.NoError(t, err) 148 + 149 + chunkHashesBuf := new(bytes.Buffer) 150 + 151 + err = store.Snapshot.WriteChunkHashes(buf, chunkHashesBuf) 152 + require.NoError(t, err) 153 + 154 + dstTxFactory, _, _ := testutil.NewTestTxFactory(t) 155 + 156 + dstTx, err := dstTxFactory.ReadWorking(time.Now()).Upgrade() 157 + require.NoError(t, err) 158 + 159 + applier, err := store.Snapshot.CreateSnapshotApplier(dstTx.Tree(), dstTxFactory, treeVersion, treeHash, [][]byte{chunkHashesBuf.Bytes()}) 160 + require.NoError(t, err) 161 + 162 + err = applier.Apply(0, buf.Bytes()) 163 + require.NoError(t, err) 164 + 165 + require.True(t, applier.Done()) 166 + 167 + readTx = dstTxFactory.ReadCommitted() 168 + 169 + aplc, err := store.Consensus.AuthoritativePLC(readTx) 170 + require.NoError(t, err) 171 + require.Equal(t, authoritativePLC, aplc) 172 + 173 + valReputation, err := store.Consensus.ValidatorReputation(readTx, valPubKey) 174 + require.NoError(t, err) 175 + require.Equal(t, uint64(100), valReputation) 176 + 177 + require.Equal(t, treeVersion, readTx.Height()) 178 + 179 + numVotes := 0 180 + for activeValidator := range store.Consensus.ActiveValidatorsIterator(readTx, 0, &err) { 181 + numVotes += int(activeValidator.VoteCount) 182 + } 183 + require.NoError(t, err) 184 + require.Equal(t, 1, numVotes) 185 + } 186 + 187 + // Via https://stackoverflow.com/a/73679110 : 188 + 189 + // Implements io.ReadWriteSeeker for testing purposes. 190 + type FileBuffer struct { 191 + buffer []byte 192 + offset int64 193 + } 194 + 195 + // Creates new buffer that implements io.ReadWriteSeeker for testing purposes. 196 + func NewFileBuffer(initial []byte) *FileBuffer { 197 + if initial == nil { 198 + initial = make([]byte, 0, 100) 199 + } 200 + return &FileBuffer{ 201 + buffer: initial, 202 + offset: 0, 203 + } 204 + } 205 + 206 + func (fb *FileBuffer) Bytes() []byte { 207 + return fb.buffer 208 + } 209 + 210 + func (fb *FileBuffer) Len() int { 211 + return len(fb.buffer) 212 + } 213 + 214 + func (fb *FileBuffer) Read(b []byte) (int, error) { 215 + available := len(fb.buffer) - int(fb.offset) 216 + if available == 0 { 217 + return 0, io.EOF 218 + } 219 + size := len(b) 220 + if size > available { 221 + size = available 222 + } 223 + copy(b, fb.buffer[fb.offset:fb.offset+int64(size)]) 224 + fb.offset += int64(size) 225 + return size, nil 226 + } 227 + 228 + func (fb *FileBuffer) Write(b []byte) (int, error) { 229 + copied := copy(fb.buffer[fb.offset:], b) 230 + if copied < len(b) { 231 + fb.buffer = append(fb.buffer, b[copied:]...) 232 + } 233 + fb.offset += int64(len(b)) 234 + return len(b), nil 235 + } 236 + 237 + func (fb *FileBuffer) Seek(offset int64, whence int) (int64, error) { 238 + var newOffset int64 239 + switch whence { 240 + case io.SeekStart: 241 + newOffset = offset 242 + case io.SeekCurrent: 243 + newOffset = fb.offset + offset 244 + case io.SeekEnd: 245 + newOffset = int64(len(fb.buffer)) + offset 246 + default: 247 + return 0, errors.New("Unknown Seek Method") 248 + } 249 + if newOffset > int64(len(fb.buffer)) || newOffset < 0 { 250 + return 0, fmt.Errorf("Invalid Offset %d", offset) 251 + } 252 + fb.offset = newOffset 253 + return newOffset, nil 254 + }
+6 -6
transaction/iavl_adapter.go
··· 14 14 IterateRange(start, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) 15 15 } 16 16 17 - type UnifiedTree interface { 18 - ReadTree 19 - Set(key, value []byte) (bool, error) 20 - Remove(key []byte) ([]byte, bool, error) 21 - } 22 - 23 17 type immutableToReadOnlyTree struct { 24 18 tree *iavl.ImmutableTree 25 19 } ··· 56 50 func (m *immutableToReadOnlyTree) Iterator(start, end []byte, ascending bool) (corestore.Iterator, error) { 57 51 return m.tree.Iterator(start, end, ascending) 58 52 } 53 + 54 + // UnderlyingImmutableTree is only meant to be used in very specific scenarios 55 + func UnderlyingImmutableTree(tree ReadTree) (*iavl.ImmutableTree, bool) { 56 + t, ok := tree.(*immutableToReadOnlyTree) 57 + return t.tree, ok 58 + }
+2 -1
transaction/interface.go
··· 4 4 "time" 5 5 6 6 dbm "github.com/cometbft/cometbft-db" 7 + "github.com/cosmos/iavl" 7 8 "github.com/dgraph-io/badger/v4" 8 9 ) 9 10 ··· 25 26 Timestamp() time.Time 26 27 27 28 NextSequence() (uint64, error) 28 - Tree() UnifiedTree 29 + Tree() *iavl.MutableTree 29 30 IndexDB() IndexWriter 30 31 TestDIDBloomFilter(did []byte) bool 31 32 // sequence is only used to track how up-to-date the bloom filter is. it is not added to the bloom filter
+2 -1
transaction/write_tx.go
··· 3 3 import ( 4 4 "time" 5 5 6 + "github.com/cosmos/iavl" 6 7 "github.com/gbl08ma/stacktrace" 7 8 ) 8 9 ··· 74 75 } 75 76 76 77 // Tree implements [Write]. 77 - func (w *writeTx) Tree() UnifiedTree { 78 + func (w *writeTx) Tree() *iavl.MutableTree { 78 79 return w.readTx.mutableTree 79 80 } 80 81