A very experimental PLC implementation which uses BFT consensus for decentralization
1package store_test
2
3import (
4 "bytes"
5 "errors"
6 "fmt"
7 "io"
8 "testing"
9 "time"
10
11 cmttypes "github.com/cometbft/cometbft/types"
12 "github.com/stretchr/testify/require"
13 "tangled.org/gbl08ma.com/didplcbft/store"
14 "tangled.org/gbl08ma.com/didplcbft/testutil"
15)
16
17func TestEmptySnapshot(t *testing.T) {
18 srcTxFactory, _, _ := testutil.NewTestTxFactory(t)
19
20 writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade()
21 require.NoError(t, err)
22
23 err = writeTx.Commit()
24 require.NoError(t, err)
25
26 blockHeaderGetter := func(height int64) (cmttypes.Header, error) {
27 require.Fail(t, "should not get any block headers")
28 return cmttypes.Header{}, nil
29 }
30
31 readTx := srcTxFactory.ReadCommitted()
32
33 buf := NewFileBuffer(nil)
34 err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, 10, 0)
35 require.Error(t, err)
36 require.Contains(t, err.Error(), "unexpectedly missing index entries for validator voting participation")
37}
38
39func TestAlmostEmptySnapshot(t *testing.T) {
40 srcTxFactory, _, _ := testutil.NewTestTxFactory(t)
41
42 writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade()
43 require.NoError(t, err)
44
45 const epochSize = 10
46
47 store.Consensus.ConfigureEpochSize(epochSize)
48
49 err = store.Consensus.InitializeValidatorVotingActivity(writeTx, make([]byte, store.AddressLength), make([]byte, store.PublicKeyLength), 0)
50 require.NoError(t, err)
51
52 err = writeTx.Commit()
53 require.NoError(t, err)
54
55 treeVersion := writeTx.Tree().Version()
56 treeHash := writeTx.Tree().Hash()
57
58 blockHeaderGetter := func(height int64) (cmttypes.Header, error) {
59 require.Fail(t, "should not get any block headers")
60 return cmttypes.Header{}, nil
61 }
62
63 readTx := srcTxFactory.ReadCommitted()
64
65 buf := NewFileBuffer(nil)
66 err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, epochSize, 0)
67 require.NoError(t, err)
68
69 chunkHashesBuf := new(bytes.Buffer)
70
71 err = store.Snapshot.WriteChunkHashes(buf, chunkHashesBuf)
72 require.NoError(t, err)
73
74 dstTxFactory, _, _ := testutil.NewTestTxFactory(t)
75
76 dstTx, err := dstTxFactory.ReadWorking(time.Now()).Upgrade()
77 require.NoError(t, err)
78
79 applier, err := store.Snapshot.CreateSnapshotApplier(dstTx.Tree(), dstTxFactory, treeVersion, treeHash, [][]byte{chunkHashesBuf.Bytes()})
80 require.NoError(t, err)
81
82 err = applier.Apply(0, buf.Bytes())
83 require.NoError(t, err)
84
85 require.True(t, applier.Done())
86
87 err = dstTx.Commit()
88 require.NoError(t, err)
89}
90
91func TestSnapshot(t *testing.T) {
92 srcTxFactory, _, _ := testutil.NewTestTxFactory(t)
93
94 writeTx, err := srcTxFactory.ReadWorking(time.Now()).Upgrade()
95 require.NoError(t, err)
96
97 const epochSize = 10
98
99 store.Consensus.ConfigureEpochSize(epochSize)
100
101 valAddr := make([]byte, store.AddressLength)
102 copy(valAddr, []byte("valAddr"))
103
104 valPubKey := make([]byte, store.PublicKeyLength)
105 copy(valPubKey, []byte("valPubKey"))
106
107 authoritativePLC := "https://authoritative-plc.local"
108
109 err = store.Consensus.InitializeValidatorVotingActivity(writeTx, valAddr, valPubKey, 0)
110 require.NoError(t, err)
111
112 err = store.Consensus.MarkValidatorVote(writeTx, valAddr, 3)
113 require.NoError(t, err)
114
115 err = store.Consensus.SetAuthoritativePLC(writeTx, "to throw away")
116 require.NoError(t, err)
117
118 err = writeTx.Commit()
119 require.NoError(t, err)
120
121 writeTx, err = srcTxFactory.ReadWorking(time.Now()).Upgrade()
122 require.NoError(t, err)
123
124 err = store.Consensus.SetAuthoritativePLC(writeTx, authoritativePLC)
125 require.NoError(t, err)
126
127 err = store.Consensus.ChangeValidatorReputation(writeTx, valPubKey, func(reputation uint64) (uint64, error) {
128 return reputation + 100, nil
129 })
130 require.NoError(t, err)
131
132 err = writeTx.Commit()
133 require.NoError(t, err)
134
135 treeVersion := writeTx.Tree().Version()
136 treeHash := writeTx.Tree().Hash()
137
138 blockHeaderGetter := func(height int64) (cmttypes.Header, error) {
139 return cmttypes.Header{
140 LastCommitHash: []byte("abcdef"),
141 Time: time.Now(),
142 }, nil
143 }
144
145 readTx := srcTxFactory.ReadCommitted()
146
147 buf := NewFileBuffer(nil)
148 err = store.Snapshot.Export(buf, readTx, blockHeaderGetter, epochSize, 1)
149 require.NoError(t, err)
150
151 chunkHashesBuf := new(bytes.Buffer)
152
153 err = store.Snapshot.WriteChunkHashes(buf, chunkHashesBuf)
154 require.NoError(t, err)
155
156 dstTxFactory, _, _ := testutil.NewTestTxFactory(t)
157
158 dstTx, err := dstTxFactory.ReadWorking(time.Now()).Upgrade()
159 require.NoError(t, err)
160
161 applier, err := store.Snapshot.CreateSnapshotApplier(dstTx.Tree(), dstTxFactory, treeVersion, treeHash, [][]byte{chunkHashesBuf.Bytes()})
162 require.NoError(t, err)
163
164 err = applier.Apply(0, buf.Bytes())
165 require.NoError(t, err)
166
167 require.True(t, applier.Done())
168
169 readTx = dstTxFactory.ReadCommitted()
170
171 aplc, err := store.Consensus.AuthoritativePLC(readTx)
172 require.NoError(t, err)
173 require.Equal(t, authoritativePLC, aplc)
174
175 valReputation, err := store.Consensus.ValidatorReputation(readTx, valPubKey)
176 require.NoError(t, err)
177 require.Equal(t, uint64(100), valReputation)
178
179 require.Equal(t, treeVersion, readTx.Height())
180
181 numVotes := 0
182 for activeValidator := range store.Consensus.ActiveValidatorsIterator(readTx, 0, &err) {
183 numVotes += int(activeValidator.VoteCount)
184 }
185 require.NoError(t, err)
186 require.Equal(t, 1, numVotes)
187}
188
// The FileBuffer helper below is adapted from https://stackoverflow.com/a/73679110.
190
// FileBuffer is an in-memory implementation of io.ReadWriteSeeker for testing
// purposes. Reads and writes both operate at a single shared offset, the way
// they would on a file.
type FileBuffer struct {
	buffer []byte // everything written so far
	offset int64  // current position; every method keeps it within [0, len(buffer)]
}

// Compile-time check that *FileBuffer really satisfies io.ReadWriteSeeker.
var _ io.ReadWriteSeeker = (*FileBuffer)(nil)

// NewFileBuffer creates a new buffer that implements io.ReadWriteSeeker for
// testing purposes. The buffer starts positioned at offset 0. When initial is
// non-nil it is used directly as the backing storage (it is not copied); when
// it is nil an empty buffer with a small preallocated capacity is used.
func NewFileBuffer(initial []byte) *FileBuffer {
	if initial == nil {
		initial = make([]byte, 0, 100)
	}
	return &FileBuffer{buffer: initial}
}

// Bytes returns the buffer's full contents, regardless of the current offset.
// The returned slice is the internal storage, not a copy.
func (fb *FileBuffer) Bytes() []byte {
	return fb.buffer
}

// Len returns the total number of bytes held by the buffer, regardless of the
// current offset.
func (fb *FileBuffer) Len() int {
	return len(fb.buffer)
}

// Read copies up to len(b) bytes starting at the current offset into b and
// advances the offset by the number of bytes copied. It returns io.EOF when
// the offset is already at the end of the buffer.
func (fb *FileBuffer) Read(b []byte) (int, error) {
	if fb.offset >= int64(len(fb.buffer)) {
		return 0, io.EOF
	}
	// copy stops at the shorter of the two slices, so it never reads past the end.
	n := copy(b, fb.buffer[fb.offset:])
	fb.offset += int64(n)
	return n, nil
}

// Write stores b at the current offset, overwriting existing bytes and growing
// the buffer when b extends past its end, then advances the offset by len(b).
// It always reports len(b) bytes written and a nil error.
func (fb *FileBuffer) Write(b []byte) (int, error) {
	// Overwrite whatever already exists at the offset, then append the remainder.
	overwritten := copy(fb.buffer[fb.offset:], b)
	fb.buffer = append(fb.buffer, b[overwritten:]...)
	fb.offset += int64(len(b))
	return len(b), nil
}

// Seek sets the offset for the next Read or Write, interpreting offset
// according to whence (io.SeekStart, io.SeekCurrent or io.SeekEnd), and
// returns the new absolute offset. It returns an error, leaving the current
// offset untouched, when whence is not one of those values or when the
// resolved position falls outside [0, Len()].
func (fb *FileBuffer) Seek(offset int64, whence int) (int64, error) {
	var newOffset int64
	switch whence {
	case io.SeekStart:
		newOffset = offset
	case io.SeekCurrent:
		newOffset = fb.offset + offset
	case io.SeekEnd:
		newOffset = int64(len(fb.buffer)) + offset
	default:
		return 0, errors.New("unknown seek method")
	}
	if newOffset < 0 || newOffset > int64(len(fb.buffer)) {
		// Report the resolved position too: for relative seeks the requested
		// offset alone does not explain why the seek was rejected.
		return 0, fmt.Errorf("invalid offset %d: resolved position %d is outside [0, %d]", offset, newOffset, len(fb.buffer))
	}
	fb.offset = newOffset
	return newOffset, nil
}