A very experimental PLC implementation that uses BFT consensus for decentralization

Update snapshot logic to include the index

The index sadly can't be rebuilt from just the tree nodes, mainly because of the validFrom-validTo logic. If we ever confirm that historical queries (at a specific tree height) are not worth supporting, we can revisit this.

gbl08ma.com 16db2c56 6554bd55

verified
+165 -61
+156 -55
abciapp/snapshots.go
··· 17 "sync" 18 "time" 19 20 abcitypes "github.com/cometbft/cometbft/abci/types" 21 "github.com/cosmos/iavl" 22 "github.com/klauspost/compress/zstd" 23 "github.com/palantir/stacktrace" 24 ) 25 26 const snapshotChunkSize = 10 * 1024 * 1024 // 10 MB ··· 124 125 // LoadSnapshotChunk implements [types.Application]. 126 func (d *DIDPLCApplication) LoadSnapshotChunk(_ context.Context, req *abcitypes.RequestLoadSnapshotChunk) (*abcitypes.ResponseLoadSnapshotChunk, error) { 127 - if req.Format != 1 { 128 // just in case CometBFT asks us to load a chunk of a format we didn't declare to support in ListSnapshots... 129 return nil, stacktrace.NewError("unsupported snapshot format") 130 } ··· 204 } 205 } 206 207 - if req.Snapshot.Format != 1 { 208 return &abcitypes.ResponseOfferSnapshot{ 209 Result: abcitypes.ResponseOfferSnapshot_REJECT_FORMAT, 210 }, nil ··· 244 245 st := time.Now() 246 247 - err = writeSnapshot(f, it) 248 if err != nil { 249 return stacktrace.Propagate(err, "") 250 } ··· 282 return nil 283 } 284 285 - func writeSnapshot(writerSeeker io.WriteSeeker, it *iavl.ImmutableTree) error { 286 writtenUntilReservedFields := 0 287 288 bw := bufio.NewWriter(writerSeeker) ··· 294 } 295 writtenUntilReservedFields += c 296 297 - c, err = bw.Write([]byte{0, 0, 0, 0, 0, 1}) 298 if err != nil { 299 return stacktrace.Propagate(err, "") 300 } ··· 314 } 315 writtenUntilReservedFields += c 316 317 - // reserve space for writing number of bytes, number of nodes 318 - // 8 bytes for node list size in bytes 319 - // 8 bytes for number of nodes 320 - sizeOfReservedFields := 8 + 8 321 b = make([]byte, sizeOfReservedFields) 322 _, err = bw.Write(b) 323 if err != nil { ··· 325 } 326 327 zstdw, err := zstd.NewWriter(bw, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) 328 if err != nil { 329 return stacktrace.Propagate(err, "") 330 } ··· 344 return stacktrace.Propagate(err, "") 345 } 346 347 - // find total compressed node list file size 348 offset, err := writerSeeker.Seek(0, 
io.SeekCurrent) 349 if err != nil { 350 return stacktrace.Propagate(err, "") 351 } 352 - compressedNodeListSize := offset - int64(writtenUntilReservedFields) - int64(sizeOfReservedFields) 353 354 // seek back and write empty header fields 355 ··· 362 } 363 364 b = make([]byte, sizeOfReservedFields) 365 - binary.BigEndian.PutUint64(b, uint64(compressedNodeListSize)) 366 - binary.BigEndian.PutUint64(b[8:], uint64(numNodes)) 367 _, err = writerSeeker.Write(b) 368 if err != nil { 369 return stacktrace.Propagate(err, "") ··· 372 return nil 373 } 374 375 func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) { 376 exporter, err := it.Export() 377 if err != nil { ··· 471 } 472 473 type snapshotApplier struct { 474 tree *iavl.MutableTree 475 treeVersion int64 476 expectedFinalHash []byte ··· 484 compressImporter iavl.NodeImporter 485 importerWg sync.WaitGroup 486 487 - numImportedNodes int 488 - claimedNodeCount int 489 - done bool 490 } 491 492 var errMalformedChunk = errors.New("malformed chunk") ··· 523 } 524 525 return &snapshotApplier{ 526 tree: d.tree, 527 treeVersion: treeVersion, 528 expectedFinalHash: expectedFinalHash, ··· 547 } 548 549 if chunkIndex == 0 { 550 - if len(chunkBytes) < 80 { 551 return stacktrace.Propagate(errMalformedChunk, "chunk too small") 552 } 553 ··· 555 return stacktrace.Propagate(errMalformedChunk, "invalid file magic") 556 } 557 558 - if binary.BigEndian.Uint32(chunkBytes[20:]) != 1 { 559 return stacktrace.Propagate(errMalformedChunk, "invalid snapshot format") 560 } 561 ··· 567 return stacktrace.Propagate(errMalformedChunk, "mismatched declared tree hash") 568 } 569 570 - declaredFileSize := 80 + binary.BigEndian.Uint64(chunkBytes[64:]) 571 minExpectedSize := uint64((len(a.expectedChunkHashes) - 1) * snapshotChunkSize) 572 maxExpectedSize := uint64(len(a.expectedChunkHashes) * snapshotChunkSize) 573 if declaredFileSize < minExpectedSize || 574 declaredFileSize > maxExpectedSize { 575 - return 
stacktrace.Propagate(errMalformedChunk, "unexpected compressed node list length") 576 } 577 578 - a.claimedNodeCount = int(binary.BigEndian.Uint64(chunkBytes[72:])) 579 580 // move to the start of the compressed portion 581 - chunkBytes = chunkBytes[80:] 582 583 a.importerWg.Go(a.streamingImporter) 584 } ··· 602 // wait for importer to finish reading and importing everything 603 a.importerWg.Wait() 604 605 if a.numImportedNodes != a.claimedNodeCount { 606 return stacktrace.Propagate(errTreeHashMismatch, "imported node count mismatch") 607 } 608 609 - err := a.importer.Commit() 610 if err != nil { 611 if strings.Contains(err.Error(), "invalid node structure") { 612 return stacktrace.Propagate(errors.Join(errMalformedChunk, err), "") ··· 627 628 func (a *snapshotApplier) streamingImporter() { 629 for { 630 - nodeHeader := make([]byte, 9+4+4) 631 - n, err := io.ReadFull(a.zstdReader, nodeHeader) 632 - if err != nil || n != 9+4+4 { 633 - // err may be EOF here, which is expected 634 - return 635 - } 636 637 - // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 638 - keyLength := binary.BigEndian.Uint32(nodeHeader[9:13]) 639 - var key []byte 640 - if keyLength != 0xffffffff { 641 - if keyLength > 1024*1024 { 642 return 643 } 644 - key = make([]byte, keyLength) 645 646 n, err = io.ReadFull(a.zstdReader, key) 647 if err != nil || n != len(key) { ··· 649 // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 650 return 651 } 652 - } 653 654 - valueLength := binary.BigEndian.Uint32(nodeHeader[13:17]) 655 - var value []byte 656 - if valueLength != 0xffffffff { 657 - if valueLength > 1024*1024 { 658 return 659 } 660 - value = make([]byte, valueLength) 661 - n, err = io.ReadFull(a.zstdReader, value) 662 - if err != nil || n != len(value) { 663 return 664 } 665 - } 666 667 - err = a.compressImporter.Add(&iavl.ExportNode{ 668 - Height: int8(nodeHeader[0]), 669 - Version: 
int64(binary.BigEndian.Uint64(nodeHeader[1:9])), 670 - Key: key, 671 - Value: value, 672 - }) 673 - if err != nil { 674 - // this shouldn't happen unless the data is corrupt 675 - // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 676 - return 677 } 678 - a.numImportedNodes++ 679 } 680 } 681 ··· 705 } 706 707 err = a.pipeWriter.Close() 708 if err != nil { 709 return stacktrace.Propagate(err, "") 710 }
··· 17 "sync" 18 "time" 19 20 + dbm "github.com/cometbft/cometbft-db" 21 abcitypes "github.com/cometbft/cometbft/abci/types" 22 "github.com/cosmos/iavl" 23 "github.com/klauspost/compress/zstd" 24 "github.com/palantir/stacktrace" 25 + "tangled.org/gbl08ma.com/didplcbft/store" 26 ) 27 28 const snapshotChunkSize = 10 * 1024 * 1024 // 10 MB ··· 126 127 // LoadSnapshotChunk implements [types.Application]. 128 func (d *DIDPLCApplication) LoadSnapshotChunk(_ context.Context, req *abcitypes.RequestLoadSnapshotChunk) (*abcitypes.ResponseLoadSnapshotChunk, error) { 129 + if req.Format != 2 { 130 // just in case CometBFT asks us to load a chunk of a format we didn't declare to support in ListSnapshots... 131 return nil, stacktrace.NewError("unsupported snapshot format") 132 } ··· 206 } 207 } 208 209 + if req.Snapshot.Format != 2 { 210 return &abcitypes.ResponseOfferSnapshot{ 211 Result: abcitypes.ResponseOfferSnapshot_REJECT_FORMAT, 212 }, nil ··· 246 247 st := time.Now() 248 249 + err = writeSnapshot(f, d.indexDB, it) 250 if err != nil { 251 return stacktrace.Propagate(err, "") 252 } ··· 284 return nil 285 } 286 287 + func writeSnapshot(writerSeeker io.WriteSeeker, indexDB dbm.DB, it *iavl.ImmutableTree) error { 288 writtenUntilReservedFields := 0 289 290 bw := bufio.NewWriter(writerSeeker) ··· 296 } 297 writtenUntilReservedFields += c 298 299 + c, err = bw.Write([]byte{0, 0, 0, 0, 0, 2}) 300 if err != nil { 301 return stacktrace.Propagate(err, "") 302 } ··· 316 } 317 writtenUntilReservedFields += c 318 319 + // reserve space for writing: 320 + // - 8 bytes for compressed section size in bytes 321 + // - 8 bytes for number of index entries 322 + // - 8 bytes for number of nodes 323 + sizeOfReservedFields := 8 * 3 324 b = make([]byte, sizeOfReservedFields) 325 _, err = bw.Write(b) 326 if err != nil { ··· 328 } 329 330 zstdw, err := zstd.NewWriter(bw, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) 331 + if err != nil { 332 + return stacktrace.Propagate(err, "") 333 + } 
334 + 335 + numIndexEntries, err := exportIndexEntries(indexDB, it.Version(), zstdw) 336 if err != nil { 337 return stacktrace.Propagate(err, "") 338 } ··· 352 return stacktrace.Propagate(err, "") 353 } 354 355 + // find total compressed section size 356 offset, err := writerSeeker.Seek(0, io.SeekCurrent) 357 if err != nil { 358 return stacktrace.Propagate(err, "") 359 } 360 + compressedSectionSize := offset - int64(writtenUntilReservedFields) - int64(sizeOfReservedFields) 361 362 // seek back and write empty header fields 363 ··· 370 } 371 372 b = make([]byte, sizeOfReservedFields) 373 + binary.BigEndian.PutUint64(b, uint64(compressedSectionSize)) 374 + binary.BigEndian.PutUint64(b[8:], uint64(numIndexEntries)) 375 + binary.BigEndian.PutUint64(b[16:], uint64(numNodes)) 376 _, err = writerSeeker.Write(b) 377 if err != nil { 378 return stacktrace.Propagate(err, "") ··· 381 return nil 382 } 383 384 + func exportIndexEntries(indexDB dbm.DB, treeVersion int64, w io.Writer) (int64, error) { 385 + didLogKeyStart := make([]byte, store.DIDLogKeySize) 386 + didLogKeyStart[0] = store.DIDLogKeyPrefix 387 + didLogKeyEnd := slices.Repeat([]byte{0xff}, store.DIDLogKeySize) 388 + didLogKeyEnd[0] = store.DIDLogKeyPrefix 389 + 390 + iterator, err := indexDB.Iterator(didLogKeyStart, didLogKeyEnd) 391 + if err != nil { 392 + return 0, stacktrace.Propagate(err, "") 393 + } 394 + defer iterator.Close() 395 + 396 + numEntries := int64(0) 397 + for iterator.Valid() { 398 + key := iterator.Key() 399 + value := iterator.Value() 400 + 401 + validFromHeight, validToHeight := store.UnmarshalDIDLogValue(value) 402 + if uint64(treeVersion) >= validFromHeight && uint64(treeVersion) <= validToHeight { 403 + header := make([]byte, 4+4) 404 + binary.BigEndian.PutUint32(header, uint32(len(key))) 405 + binary.BigEndian.PutUint32(header[4:], uint32(len(value))) 406 + w.Write(header) 407 + w.Write(key) 408 + w.Write(value) 409 + 410 + numEntries++ 411 + } 412 + 413 + iterator.Next() 414 + } 415 + 
return numEntries, nil 416 + } 417 + 418 func exportNodes(it *iavl.ImmutableTree, w io.Writer) (int64, error) { 419 exporter, err := it.Export() 420 if err != nil { ··· 514 } 515 516 type snapshotApplier struct { 517 + indexBatch dbm.Batch 518 tree *iavl.MutableTree 519 treeVersion int64 520 expectedFinalHash []byte ··· 528 compressImporter iavl.NodeImporter 529 importerWg sync.WaitGroup 530 531 + numImportedNodes int 532 + claimedNodeCount int 533 + numImportedIndexEntries int 534 + claimedIndexEntryCount int 535 + done bool 536 } 537 538 var errMalformedChunk = errors.New("malformed chunk") ··· 569 } 570 571 return &snapshotApplier{ 572 + indexBatch: d.indexDB.NewBatch(), 573 tree: d.tree, 574 treeVersion: treeVersion, 575 expectedFinalHash: expectedFinalHash, ··· 594 } 595 596 if chunkIndex == 0 { 597 + if len(chunkBytes) < 88 { 598 return stacktrace.Propagate(errMalformedChunk, "chunk too small") 599 } 600 ··· 602 return stacktrace.Propagate(errMalformedChunk, "invalid file magic") 603 } 604 605 + if binary.BigEndian.Uint32(chunkBytes[20:]) != 2 { 606 return stacktrace.Propagate(errMalformedChunk, "invalid snapshot format") 607 } 608 ··· 614 return stacktrace.Propagate(errMalformedChunk, "mismatched declared tree hash") 615 } 616 617 + declaredFileSize := 88 + binary.BigEndian.Uint64(chunkBytes[64:]) 618 minExpectedSize := uint64((len(a.expectedChunkHashes) - 1) * snapshotChunkSize) 619 maxExpectedSize := uint64(len(a.expectedChunkHashes) * snapshotChunkSize) 620 if declaredFileSize < minExpectedSize || 621 declaredFileSize > maxExpectedSize { 622 + return stacktrace.Propagate(errMalformedChunk, "unexpected compressed section length") 623 } 624 625 + a.claimedIndexEntryCount = int(binary.BigEndian.Uint64(chunkBytes[72:])) 626 + a.claimedNodeCount = int(binary.BigEndian.Uint64(chunkBytes[80:])) 627 628 // move to the start of the compressed portion 629 + chunkBytes = chunkBytes[88:] 630 631 a.importerWg.Go(a.streamingImporter) 632 } ··· 650 // wait for importer 
to finish reading and importing everything 651 a.importerWg.Wait() 652 653 + if a.numImportedIndexEntries != a.claimedIndexEntryCount { 654 + return stacktrace.Propagate(errTreeHashMismatch, "imported index entry count mismatch") 655 + } 656 + 657 if a.numImportedNodes != a.claimedNodeCount { 658 return stacktrace.Propagate(errTreeHashMismatch, "imported node count mismatch") 659 } 660 661 + err := a.indexBatch.Write() 662 + if err != nil { 663 + return stacktrace.Propagate(err, "") 664 + } 665 + 666 + err = a.importer.Commit() 667 if err != nil { 668 if strings.Contains(err.Error(), "invalid node structure") { 669 return stacktrace.Propagate(errors.Join(errMalformedChunk, err), "") ··· 684 685 func (a *snapshotApplier) streamingImporter() { 686 for { 687 + if a.numImportedIndexEntries < a.claimedIndexEntryCount { 688 + entryHeader := make([]byte, 4+4) 689 + n, err := io.ReadFull(a.zstdReader, entryHeader) 690 + if err != nil || n != 8 { 691 + // err may be EOF here, which is expected 692 + return 693 + } 694 695 + // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 696 + keyLength := binary.BigEndian.Uint32(entryHeader[0:4]) 697 + valueLength := binary.BigEndian.Uint32(entryHeader[4:8]) 698 + if keyLength > 1024*1024 || valueLength > 1024*1024 { 699 return 700 } 701 + 702 + key := make([]byte, keyLength) 703 704 n, err = io.ReadFull(a.zstdReader, key) 705 if err != nil || n != len(key) { ··· 707 // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 708 return 709 } 710 + 711 + value := make([]byte, valueLength) 712 + n, err = io.ReadFull(a.zstdReader, value) 713 + if err != nil || n != len(value) { 714 + return 715 + } 716 717 + err = a.indexBatch.Set(key, value) 718 + if err != nil { 719 + // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 720 return 721 } 722 + 723 + a.numImportedIndexEntries++ 724 + } else { 725 + 
nodeHeader := make([]byte, 9+4+4) 726 + n, err := io.ReadFull(a.zstdReader, nodeHeader) 727 + if err != nil || n != 9+4+4 { 728 + // err may be EOF here, which is expected 729 return 730 } 731 + 732 + // validate lengths against sensible limits to prevent OOM DoS by malicious third parties 733 + keyLength := binary.BigEndian.Uint32(nodeHeader[9:13]) 734 + var key []byte 735 + if keyLength != 0xffffffff { 736 + if keyLength > 1024*1024 { 737 + return 738 + } 739 + key = make([]byte, keyLength) 740 741 + n, err = io.ReadFull(a.zstdReader, key) 742 + if err != nil || n != len(key) { 743 + // this shouldn't happen unless the data is corrupt 744 + // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 745 + return 746 + } 747 + } 748 + 749 + valueLength := binary.BigEndian.Uint32(nodeHeader[13:17]) 750 + var value []byte 751 + if valueLength != 0xffffffff { 752 + if valueLength > 1024*1024 { 753 + return 754 + } 755 + value = make([]byte, valueLength) 756 + n, err = io.ReadFull(a.zstdReader, value) 757 + if err != nil || n != len(value) { 758 + return 759 + } 760 + } 761 + 762 + err = a.compressImporter.Add(&iavl.ExportNode{ 763 + Height: int8(nodeHeader[0]), 764 + Version: int64(binary.BigEndian.Uint64(nodeHeader[1:9])), 765 + Key: key, 766 + Value: value, 767 + }) 768 + if err != nil { 769 + // this shouldn't happen unless the data is corrupt 770 + // we can return silently here because since we didn't import all nodes, the tree hash won't match anyway 771 + return 772 + } 773 + a.numImportedNodes++ 774 } 775 } 776 } 777 ··· 801 } 802 803 err = a.pipeWriter.Close() 804 + if err != nil { 805 + return stacktrace.Propagate(err, "") 806 + } 807 + 808 + err = a.indexBatch.Close() 809 if err != nil { 810 return stacktrace.Propagate(err, "") 811 }
+9 -6
store/tree.go
··· 73 } 74 75 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 76 - validFromHeight, validToHeight := unmarshalDIDLogValue(didLogIterator.Value()) 77 78 opKey := marshalOperationKey(sequence) 79 ··· 146 } 147 148 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 149 - validFromHeight, validToHeight := unmarshalDIDLogValue(didLogIterator.Value()) 150 151 opKey := marshalOperationKey(sequence) 152 ··· 245 } 246 247 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 248 - validFromHeight, validToHeight := unmarshalDIDLogValue(didLogIterator.Value()) 249 250 opKey := marshalOperationKey(sequence) 251 ··· 390 return did, nil 391 } 392 393 func marshalDIDLogKey(didBytes []byte, sequence uint64) []byte { 394 - key := make([]byte, 1+15+8) 395 - key[0] = 'l' 396 copy(key[1:16], didBytes) 397 binary.BigEndian.PutUint64(key[16:], sequence) 398 return key ··· 410 return value 411 } 412 413 - func unmarshalDIDLogValue(value []byte) (validFromHeight, validToHeight uint64) { 414 validFromHeight = binary.BigEndian.Uint64(value[0:8]) 415 validToHeight = binary.BigEndian.Uint64(value[8:16]) 416 return
··· 73 } 74 75 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 76 + validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) 77 78 opKey := marshalOperationKey(sequence) 79 ··· 146 } 147 148 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 149 + validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) 150 151 opKey := marshalOperationKey(sequence) 152 ··· 245 } 246 247 sequence := unmarshalDIDLogSequence(didLogIterator.Key()) 248 + validFromHeight, validToHeight := UnmarshalDIDLogValue(didLogIterator.Value()) 249 250 opKey := marshalOperationKey(sequence) 251 ··· 390 return did, nil 391 } 392 393 + const DIDLogKeySize = 1 + 15 + 8 394 + const DIDLogKeyPrefix = 'l' 395 + 396 func marshalDIDLogKey(didBytes []byte, sequence uint64) []byte { 397 + key := make([]byte, DIDLogKeySize) 398 + key[0] = DIDLogKeyPrefix 399 copy(key[1:16], didBytes) 400 binary.BigEndian.PutUint64(key[16:], sequence) 401 return key ··· 413 return value 414 } 415 416 + func UnmarshalDIDLogValue(value []byte) (validFromHeight, validToHeight uint64) { 417 validFromHeight = binary.BigEndian.Uint64(value[0:8]) 418 validToHeight = binary.BigEndian.Uint64(value[8:16]) 419 return