A very experimental PLC implementation which uses BFT consensus for decentralization
at main 256 lines 6.7 kB view raw
1package store_test 2 3import ( 4 "bytes" 5 "errors" 6 "fmt" 7 "io" 8 "testing" 9 "time" 10 11 cmttypes "github.com/cometbft/cometbft/types" 12 "github.com/stretchr/testify/require" 13 "tangled.org/gbl08ma.com/didplcbft/store" 14 "tangled.org/gbl08ma.com/didplcbft/testutil" 15) 16 17func TestEmptySnapshot(t *testing.T) { 18 srcTxFactory, _, _ := testutil.NewTestTxFactory(t) 19 20 writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade() 21 require.NoError(t, err) 22 23 err = writeTx.Commit() 24 require.NoError(t, err) 25 26 blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 27 require.Fail(t, "should not get any block headers") 28 return cmttypes.Header{}, nil 29 } 30 31 readTx := srcTxFactory.ReadCommitted() 32 33 buf := NewFileBuffer(nil) 34 err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, 10, 0) 35 require.Error(t, err) 36 require.Contains(t, err.Error(), "unexpectedly missing index entries for validator voting participation") 37} 38 39func TestAlmostEmptySnapshot(t *testing.T) { 40 srcTxFactory, _, _ := testutil.NewTestTxFactory(t) 41 42 writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade() 43 require.NoError(t, err) 44 45 const epochSize = 10 46 47 store.Consensus.ConfigureEpochSize(epochSize) 48 49 err = store.Consensus.InitializeValidatorVotingActivity(writeTx, make([]byte, store.AddressLength), make([]byte, store.PublicKeyLength), 0) 50 require.NoError(t, err) 51 52 err = writeTx.Commit() 53 require.NoError(t, err) 54 55 treeVersion := writeTx.Tree().Version() 56 treeHash := writeTx.Tree().Hash() 57 58 blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 59 require.Fail(t, "should not get any block headers") 60 return cmttypes.Header{}, nil 61 } 62 63 readTx := srcTxFactory.ReadCommitted() 64 65 buf := NewFileBuffer(nil) 66 err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, epochSize, 0) 67 require.NoError(t, err) 68 69 chunkHashesBuf := new(bytes.Buffer) 70 71 err = store.Snapshot.WriteChunkHashes(buf, chunkHashesBuf) 72 require.NoError(t, err) 73 74 dstTxFactory, _, _ := testutil.NewTestTxFactory(t) 75 76 dstTx, err := dstTxFactory.ReadWorking(time.Now()).Upgrade() 77 require.NoError(t, err) 78 79 applier, err := store.Snapshot.CreateSnapshotApplier(dstTx.Tree(), dstTxFactory, treeVersion, treeHash, [][]byte{chunkHashesBuf.Bytes()}) 80 require.NoError(t, err) 81 82 err = applier.Apply(0, buf.Bytes()) 83 require.NoError(t, err) 84 85 require.True(t, applier.Done()) 86 87 err = dstTx.Commit() 88 require.NoError(t, err) 89} 90 91func TestSnapshot(t *testing.T) { 92 srcTxFactory, _, _ := testutil.NewTestTxFactory(t) 93 94 writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade() 95 require.NoError(t, err) 96 97 const epochSize = 10 98 99 store.Consensus.ConfigureEpochSize(epochSize) 100 101 valAddr := make([]byte, store.AddressLength) 102 copy(valAddr, []byte("valAddr")) 103 104 valPubKey := make([]byte, store.PublicKeyLength) 105 copy(valPubKey, []byte("valPubKey")) 106 107 authoritativePLC := "https://authoritative-plc.local" 108 109 err = store.Consensus.InitializeValidatorVotingActivity(writeTx, valAddr, valPubKey, 0) 110 require.NoError(t, err) 111 112 err = store.Consensus.MarkValidatorVote(writeTx, valAddr, 3) 113 require.NoError(t, err) 114 115 err = store.Consensus.SetAuthoritativePLC(writeTx, "to throw away") 116 require.NoError(t, err) 117 118 err = writeTx.Commit() 119 require.NoError(t, err) 120 121 writeTx, err = srcTxFactory.ReadWorking(time.Now()).Upgrade() 122 require.NoError(t, err) 123 124 err = store.Consensus.SetAuthoritativePLC(writeTx, authoritativePLC) 125 require.NoError(t, err) 126 127 err = store.Consensus.ChangeValidatorReputation(writeTx, valPubKey, func(reputation uint64) (uint64, error) { 128 return reputation + 100, nil 129 }) 130 require.NoError(t, err) 131 132 err = writeTx.Commit() 133 require.NoError(t, err) 134 135 treeVersion := writeTx.Tree().Version() 136 treeHash := writeTx.Tree().Hash() 137 138 blockHeaderGetter := func(height int64) (cmttypes.Header, error) { 139 return cmttypes.Header{ 140 LastCommitHash: []byte("abcdef"), 141 Time: time.Now(), 142 }, nil 143 } 144 145 readTx := srcTxFactory.ReadCommitted() 146 147 buf := NewFileBuffer(nil) 148 err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, epochSize, 1) 149 require.NoError(t, err) 150 151 chunkHashesBuf := new(bytes.Buffer) 152 153 err = store.Snapshot.WriteChunkHashes(buf, chunkHashesBuf) 154 require.NoError(t, err) 155 156 dstTxFactory, _, _ := testutil.NewTestTxFactory(t) 157 158 dstTx, err := dstTxFactory.ReadWorking(time.Now()).Upgrade() 159 require.NoError(t, err) 160 161 applier, err := store.Snapshot.CreateSnapshotApplier(dstTx.Tree(), dstTxFactory, treeVersion, treeHash, [][]byte{chunkHashesBuf.Bytes()}) 162 require.NoError(t, err) 163 164 err = applier.Apply(0, buf.Bytes()) 165 require.NoError(t, err) 166 167 require.True(t, applier.Done()) 168 169 readTx = dstTxFactory.ReadCommitted() 170 171 aplc, err := store.Consensus.AuthoritativePLC(readTx) 172 require.NoError(t, err) 173 require.Equal(t, authoritativePLC, aplc) 174 175 valReputation, err := store.Consensus.ValidatorReputation(readTx, valPubKey) 176 require.NoError(t, err) 177 require.Equal(t, uint64(100), valReputation) 178 179 require.Equal(t, treeVersion, readTx.Height()) 180 181 numVotes := 0 182 for activeValidator := range store.Consensus.ActiveValidatorsIterator(readTx, 0, &err) { 183 numVotes += int(activeValidator.VoteCount) 184 } 185 require.NoError(t, err) 186 require.Equal(t, 1, numVotes) 187} 188 189// Via https://stackoverflow.com/a/73679110 : 190 191// Implements io.ReadWriteSeeker for testing purposes. 192type FileBuffer struct { 193 buffer []byte 194 offset int64 195} 196 197// Creates new buffer that implements io.ReadWriteSeeker for testing purposes. 198func NewFileBuffer(initial []byte) *FileBuffer { 199 if initial == nil { 200 initial = make([]byte, 0, 100) 201 } 202 return &FileBuffer{ 203 buffer: initial, 204 offset: 0, 205 } 206} 207 208func (fb *FileBuffer) Bytes() []byte { 209 return fb.buffer 210} 211 212func (fb *FileBuffer) Len() int { 213 return len(fb.buffer) 214} 215 216func (fb *FileBuffer) Read(b []byte) (int, error) { 217 available := len(fb.buffer) - int(fb.offset) 218 if available == 0 { 219 return 0, io.EOF 220 } 221 size := len(b) 222 if size > available { 223 size = available 224 } 225 copy(b, fb.buffer[fb.offset:fb.offset+int64(size)]) 226 fb.offset += int64(size) 227 return size, nil 228} 229 230func (fb *FileBuffer) Write(b []byte) (int, error) { 231 copied := copy(fb.buffer[fb.offset:], b) 232 if copied < len(b) { 233 fb.buffer = append(fb.buffer, b[copied:]...) 234 } 235 fb.offset += int64(len(b)) 236 return len(b), nil 237} 238 239func (fb *FileBuffer) Seek(offset int64, whence int) (int64, error) { 240 var newOffset int64 241 switch whence { 242 case io.SeekStart: 243 newOffset = offset 244 case io.SeekCurrent: 245 newOffset = fb.offset + offset 246 case io.SeekEnd: 247 newOffset = int64(len(fb.buffer)) + offset 248 default: 249 return 0, errors.New("Unknown Seek Method") 250 } 251 if newOffset > int64(len(fb.buffer)) || newOffset < 0 { 252 return 0, fmt.Errorf("Invalid Offset %d", offset) 253 } 254 fb.offset = newOffset 255 return newOffset, nil 256}