tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
abstract zstd
tree.fail
4 months ago
53af3300
3acc6c5a
+331
-187
5 changed files
expand all
collapse all
unified
split
bundle
metadata.go
go.mod
go.sum
internal
storage
storage.go
zstd.go
+7
-3
bundle/metadata.go
···
7
7
"os"
8
8
"time"
9
9
10
10
-
gozstd "github.com/DataDog/zstd"
11
10
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
12
11
"tangled.org/atscan.net/plcbundle/internal/plcclient"
12
12
+
"tangled.org/atscan.net/plcbundle/internal/storage"
13
13
)
14
14
15
15
// CalculateBundleMetadata calculates complete metadata for a bundle
···
131
131
}
132
132
defer file.Close()
133
133
134
134
-
reader := gozstd.NewReader(file)
135
135
-
defer reader.Close()
134
134
+
// ✅ Use abstracted reader from storage package
135
135
+
reader, err := storage.NewStreamingReader(file)
136
136
+
if err != nil {
137
137
+
return 0, 0, time.Time{}, time.Time{}, fmt.Errorf("failed to create reader: %w", err)
138
138
+
}
139
139
+
defer reader.Release()
136
140
137
141
scanner := bufio.NewScanner(reader)
138
142
buf := make([]byte, 64*1024)
+1
go.mod
···
11
11
12
12
require (
13
13
github.com/spf13/cobra v1.10.1
14
14
+
github.com/valyala/gozstd v1.23.2
14
15
golang.org/x/term v0.36.0
15
16
)
16
17
+2
go.sum
···
12
12
github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
13
13
github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
14
14
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
15
15
+
github.com/valyala/gozstd v1.23.2 h1:S3rRsskaDvBCM2XJzQFYIDAO6txxmvTc1arA/9Wgi9o=
16
16
+
github.com/valyala/gozstd v1.23.2/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
15
17
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
16
18
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
17
19
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
+203
-184
internal/storage/storage.go
···
12
12
"sync"
13
13
"time"
14
14
15
15
-
gozstd "github.com/DataDog/zstd"
16
15
"github.com/goccy/go-json"
17
17
-
"tangled.org/atscan.net/plcbundle/internal/plcclient" // ONLY import plcclient, NOT bundle
16
16
+
"tangled.org/atscan.net/plcbundle/internal/plcclient"
18
17
)
19
18
20
19
// Operations handles low-level bundle file operations
···
84
83
}
85
84
86
85
// ========================================
87
87
-
// FILE OPERATIONS
86
86
+
// FILE OPERATIONS (using zstd abstraction)
88
87
// ========================================
89
88
90
90
-
// LoadBundle loads a compressed bundle
91
91
-
func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) {
92
92
-
// ✅ FIX: Use streaming reader instead of one-shot Decompress
93
93
-
file, err := os.Open(path)
94
94
-
if err != nil {
95
95
-
return nil, fmt.Errorf("failed to open file: %w", err)
96
96
-
}
97
97
-
defer file.Close()
98
98
-
99
99
-
// NewReader properly handles multi-frame concatenated zstd
100
100
-
reader := gozstd.NewReader(file)
101
101
-
defer reader.Close()
102
102
-
103
103
-
// Read ALL decompressed data
104
104
-
decompressed, err := io.ReadAll(reader)
105
105
-
if err != nil {
106
106
-
return nil, fmt.Errorf("failed to decompress: %w", err)
107
107
-
}
108
108
-
109
109
-
// Parse JSONL
110
110
-
return op.ParseJSONL(decompressed)
111
111
-
}
112
112
-
113
113
-
// SaveBundle saves operations to disk (compressed)
89
89
+
// SaveBundle saves operations to disk (compressed with multi-frame support)
114
90
func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation) (string, string, int64, int64, error) {
115
115
-
// 1. Serialize all operations once to get a single, consistent content hash.
116
116
-
// This is critical for preserving chain hash integrity.
91
91
+
// 1. Serialize all operations once
117
92
jsonlData := op.SerializeJSONL(operations)
118
93
contentSize := int64(len(jsonlData))
119
94
contentHash := op.Hash(jsonlData)
120
95
121
121
-
// --- Correct Multi-Frame Streaming Logic ---
122
122
-
123
123
-
// 2. Create the destination file.
96
96
+
// 2. Create the destination file
124
97
bundleFile, err := os.Create(path)
125
98
if err != nil {
126
99
return "", "", 0, 0, fmt.Errorf("could not create bundle file: %w", err)
127
100
}
128
128
-
defer bundleFile.Close() // Ensure the file is closed on exit.
101
101
+
defer bundleFile.Close()
129
102
130
130
-
frameSize := 100 // Each frame will contain 100 operations.
131
131
-
frameOffsets := []int64{0} // The first frame always starts at offset 0.
103
103
+
frameOffsets := []int64{0}
132
104
133
133
-
// 3. Loop through operations in chunks.
134
134
-
for i := 0; i < len(operations); i += frameSize {
135
135
-
end := i + frameSize
105
105
+
// 3. Loop through operations in chunks
106
106
+
for i := 0; i < len(operations); i += FrameSize {
107
107
+
end := i + FrameSize
136
108
if end > len(operations) {
137
109
end = len(operations)
138
110
}
139
111
opChunk := operations[i:end]
140
112
chunkJsonlData := op.SerializeJSONL(opChunk)
141
113
142
142
-
// a. Create a NEW zstd writer FOR EACH CHUNK. This is the key.
143
143
-
zstdWriter := gozstd.NewWriter(bundleFile)
144
144
-
145
145
-
// b. Write the uncompressed chunk to the zstd writer.
146
146
-
_, err := zstdWriter.Write(chunkJsonlData)
114
114
+
// ✅ Use abstracted compression
115
115
+
compressedChunk, err := CompressFrame(chunkJsonlData)
147
116
if err != nil {
148
148
-
zstdWriter.Close() // Attempt to clean up
149
149
-
return "", "", 0, 0, fmt.Errorf("failed to write frame data: %w", err)
117
117
+
return "", "", 0, 0, fmt.Errorf("failed to compress frame: %w", err)
150
118
}
151
119
152
152
-
// c. Close the zstd writer. This finalizes the frame and flushes it
153
153
-
// to the underlying file. It does NOT close the bundleFile itself.
154
154
-
if err := zstdWriter.Close(); err != nil {
155
155
-
return "", "", 0, 0, fmt.Errorf("failed to close/finalize frame: %w", err)
120
120
+
// Write frame to file
121
121
+
_, err = bundleFile.Write(compressedChunk)
122
122
+
if err != nil {
123
123
+
return "", "", 0, 0, fmt.Errorf("failed to write frame: %w", err)
156
124
}
157
125
158
158
-
// d. After closing the frame, get the file's new total size.
126
126
+
// Get current offset for next frame
159
127
currentOffset, err := bundleFile.Seek(0, io.SeekCurrent)
160
128
if err != nil {
161
129
return "", "", 0, 0, fmt.Errorf("failed to get file offset: %w", err)
162
130
}
163
131
164
164
-
// e. Record this offset as the start of the next frame.
165
132
if end < len(operations) {
166
133
frameOffsets = append(frameOffsets, currentOffset)
167
134
}
168
135
}
169
136
170
170
-
// 4. Get the final total file size. This is the end of the last frame.
137
137
+
// 4. Get final file size
171
138
finalSize, _ := bundleFile.Seek(0, io.SeekCurrent)
172
139
frameOffsets = append(frameOffsets, finalSize)
173
140
174
174
-
// 5. Save the companion frame-offset index file.
141
141
+
// 5. Sync to disk
142
142
+
if err := bundleFile.Sync(); err != nil {
143
143
+
return "", "", 0, 0, fmt.Errorf("failed to sync file: %w", err)
144
144
+
}
145
145
+
146
146
+
// 6. Save frame index
175
147
indexPath := path + ".idx"
176
148
indexData, _ := json.Marshal(frameOffsets)
177
149
if err := os.WriteFile(indexPath, indexData, 0644); err != nil {
178
178
-
os.Remove(path) // Clean up to avoid inconsistent state.
150
150
+
os.Remove(path)
179
151
return "", "", 0, 0, fmt.Errorf("failed to write frame index: %w", err)
180
152
}
181
153
182
182
-
// 6. Re-read the full compressed file to get its final hash for the main index.
154
154
+
// 7. Calculate compressed hash
183
155
compressedData, err := os.ReadFile(path)
184
156
if err != nil {
185
157
return "", "", 0, 0, fmt.Errorf("failed to re-read bundle for hashing: %w", err)
···
189
161
return contentHash, compressedHash, contentSize, finalSize, nil
190
162
}
191
163
192
192
-
// Pool for scanner buffers
193
193
-
var scannerBufPool = sync.Pool{
194
194
-
New: func() interface{} {
195
195
-
buf := make([]byte, 64*1024)
196
196
-
return &buf
197
197
-
},
198
198
-
}
199
199
-
200
200
-
// LoadOperationAtPosition loads a single operation from a bundle
201
201
-
func (op *Operations) LoadOperationAtPosition(path string, position int) (*plcclient.PLCOperation, error) {
202
202
-
if position < 0 {
203
203
-
return nil, fmt.Errorf("invalid position: %d", position)
204
204
-
}
205
205
-
206
206
-
frameSize := 100 // Must match the frame size used in SaveBundle
207
207
-
indexPath := path + ".idx"
208
208
-
209
209
-
// 1. Load the frame offset index.
210
210
-
indexData, err := os.ReadFile(indexPath)
211
211
-
if err != nil {
212
212
-
// If the frame index doesn't exist, fall back to the legacy full-scan method.
213
213
-
// This ensures backward compatibility with your old bundle files during migration.
214
214
-
if os.IsNotExist(err) {
215
215
-
op.logger.Printf("DEBUG: Frame index not found for %s, falling back to legacy full scan.", filepath.Base(path))
216
216
-
return op.loadOperationAtPositionLegacy(path, position)
217
217
-
}
218
218
-
return nil, fmt.Errorf("could not read frame index %s: %w", indexPath, err)
219
219
-
}
220
220
-
221
221
-
var frameOffsets []int64
222
222
-
if err := json.Unmarshal(indexData, &frameOffsets); err != nil {
223
223
-
return nil, fmt.Errorf("could not parse frame index %s: %w", indexPath, err)
224
224
-
}
225
225
-
226
226
-
// 2. Calculate target frame and the line number within that frame.
227
227
-
frameIndex := position / frameSize
228
228
-
lineInFrame := position % frameSize
229
229
-
230
230
-
if frameIndex >= len(frameOffsets)-1 {
231
231
-
return nil, fmt.Errorf("position %d is out of bounds for bundle with %d frames", position, len(frameOffsets)-1)
232
232
-
}
233
233
-
234
234
-
// 3. Get frame boundaries from the index.
235
235
-
startOffset := frameOffsets[frameIndex]
236
236
-
endOffset := frameOffsets[frameIndex+1]
237
237
-
frameLength := endOffset - startOffset
238
238
-
239
239
-
if frameLength <= 0 {
240
240
-
return nil, fmt.Errorf("invalid frame length calculated for position %d", position)
241
241
-
}
242
242
-
243
243
-
// 4. Open the bundle file.
244
244
-
bundleFile, err := os.Open(path)
245
245
-
if err != nil {
246
246
-
return nil, err
247
247
-
}
248
248
-
defer bundleFile.Close()
249
249
-
250
250
-
// 5. Read ONLY the bytes for that single frame from the correct offset.
251
251
-
compressedFrame := make([]byte, frameLength)
252
252
-
_, err = bundleFile.ReadAt(compressedFrame, startOffset)
253
253
-
if err != nil {
254
254
-
return nil, fmt.Errorf("failed to read frame %d from bundle: %w", frameIndex, err)
255
255
-
}
256
256
-
257
257
-
// 6. Decompress just that small frame.
258
258
-
decompressed, err := gozstd.Decompress(nil, compressedFrame)
259
259
-
if err != nil {
260
260
-
return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err)
261
261
-
}
262
262
-
263
263
-
// 7. Scan the ~100 lines to get the target operation.
264
264
-
scanner := bufio.NewScanner(bytes.NewReader(decompressed))
265
265
-
lineNum := 0
266
266
-
for scanner.Scan() {
267
267
-
if lineNum == lineInFrame {
268
268
-
line := scanner.Bytes()
269
269
-
var operation plcclient.PLCOperation
270
270
-
if err := json.UnmarshalNoEscape(line, &operation); err != nil {
271
271
-
return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
272
272
-
}
273
273
-
operation.RawJSON = make([]byte, len(line))
274
274
-
copy(operation.RawJSON, line)
275
275
-
return &operation, nil
276
276
-
}
277
277
-
lineNum++
278
278
-
}
279
279
-
280
280
-
if err := scanner.Err(); err != nil {
281
281
-
return nil, fmt.Errorf("scanner error on frame %d: %w", frameIndex, err)
282
282
-
}
283
283
-
284
284
-
return nil, fmt.Errorf("operation at position %d not found", position)
285
285
-
}
286
286
-
287
287
-
func (op *Operations) loadOperationAtPositionLegacy(path string, position int) (*plcclient.PLCOperation, error) {
164
164
+
// LoadBundle loads a compressed bundle
165
165
+
func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) {
288
166
file, err := os.Open(path)
289
167
if err != nil {
290
168
return nil, fmt.Errorf("failed to open file: %w", err)
291
169
}
292
170
defer file.Close()
293
171
294
294
-
reader := gozstd.NewReader(file)
295
295
-
defer reader.Close()
296
296
-
297
297
-
scanner := bufio.NewScanner(reader)
298
298
-
// Use a larger buffer for potentially large lines
299
299
-
buf := make([]byte, 512*1024)
300
300
-
scanner.Buffer(buf, 1024*1024)
301
301
-
302
302
-
lineNum := 0
303
303
-
for scanner.Scan() {
304
304
-
if lineNum == position {
305
305
-
line := scanner.Bytes()
306
306
-
var operation plcclient.PLCOperation
307
307
-
if err := json.UnmarshalNoEscape(line, &operation); err != nil {
308
308
-
return nil, fmt.Errorf("failed to parse legacy operation at position %d: %w", position, err)
309
309
-
}
310
310
-
operation.RawJSON = make([]byte, len(line))
311
311
-
copy(operation.RawJSON, line)
312
312
-
return &operation, nil
313
313
-
}
314
314
-
lineNum++
172
172
+
// ✅ Use abstracted streaming reader
173
173
+
reader, err := NewStreamingReader(file)
174
174
+
if err != nil {
175
175
+
return nil, fmt.Errorf("failed to create reader: %w", err)
315
176
}
177
177
+
defer reader.Release()
316
178
317
317
-
if err := scanner.Err(); err != nil {
318
318
-
return nil, fmt.Errorf("legacy scanner error: %w", err)
179
179
+
// Read all decompressed data from all frames
180
180
+
decompressed, err := io.ReadAll(reader)
181
181
+
if err != nil {
182
182
+
return nil, fmt.Errorf("failed to decompress: %w", err)
319
183
}
320
184
321
321
-
return nil, fmt.Errorf("position %d not found in legacy bundle", position)
185
185
+
// Parse JSONL
186
186
+
return op.ParseJSONL(decompressed)
322
187
}
323
188
324
189
// ========================================
···
341
206
return nil, fmt.Errorf("failed to open bundle: %w", err)
342
207
}
343
208
344
344
-
reader := gozstd.NewReader(file)
209
209
+
// ✅ Use abstracted reader
210
210
+
reader, err := NewStreamingReader(file)
211
211
+
if err != nil {
212
212
+
file.Close()
213
213
+
return nil, fmt.Errorf("failed to create reader: %w", err)
214
214
+
}
345
215
346
216
return &decompressedReader{
347
217
reader: reader,
···
351
221
352
222
// decompressedReader wraps a zstd decoder and underlying file
353
223
type decompressedReader struct {
354
354
-
reader io.ReadCloser
224
224
+
reader StreamReader
355
225
file *os.File
356
226
}
357
227
···
360
230
}
361
231
362
232
func (dr *decompressedReader) Close() error {
363
363
-
dr.reader.Close()
233
233
+
dr.reader.Release()
364
234
return dr.file.Close()
365
235
}
366
236
···
396
266
compressedHash = op.Hash(compressedData)
397
267
compressedSize = int64(len(compressedData))
398
268
399
399
-
decompressed, err := gozstd.Decompress(nil, compressedData)
269
269
+
// ✅ Use abstracted decompression
270
270
+
decompressed, err := DecompressAll(compressedData)
400
271
if err != nil {
401
272
return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err)
402
273
}
···
505
376
return operations[startIdx:]
506
377
}
507
378
379
379
+
// Pool for scanner buffers
380
380
+
var scannerBufPool = sync.Pool{
381
381
+
New: func() interface{} {
382
382
+
buf := make([]byte, 64*1024)
383
383
+
return &buf
384
384
+
},
385
385
+
}
386
386
+
387
387
+
// ========================================
388
388
+
// POSITION-BASED LOADING (with frame index)
389
389
+
// ========================================
390
390
+
391
391
+
// LoadOperationAtPosition loads a single operation from a bundle
392
392
+
func (op *Operations) LoadOperationAtPosition(path string, position int) (*plcclient.PLCOperation, error) {
393
393
+
if position < 0 {
394
394
+
return nil, fmt.Errorf("invalid position: %d", position)
395
395
+
}
396
396
+
397
397
+
indexPath := path + ".idx"
398
398
+
399
399
+
// 1. Try to load frame index
400
400
+
indexData, err := os.ReadFile(indexPath)
401
401
+
if err != nil {
402
402
+
if os.IsNotExist(err) {
403
403
+
// Fallback to legacy full scan
404
404
+
if op.logger != nil {
405
405
+
op.logger.Printf("Frame index not found for %s, using legacy scan", filepath.Base(path))
406
406
+
}
407
407
+
return op.loadOperationAtPositionLegacy(path, position)
408
408
+
}
409
409
+
return nil, fmt.Errorf("could not read frame index: %w", err)
410
410
+
}
411
411
+
412
412
+
var frameOffsets []int64
413
413
+
if err := json.Unmarshal(indexData, &frameOffsets); err != nil {
414
414
+
return nil, fmt.Errorf("could not parse frame index: %w", err)
415
415
+
}
416
416
+
417
417
+
// 2. Calculate target frame
418
418
+
frameIndex := position / FrameSize
419
419
+
lineInFrame := position % FrameSize
420
420
+
421
421
+
if frameIndex >= len(frameOffsets)-1 {
422
422
+
return nil, fmt.Errorf("position %d out of bounds (frame %d, total frames %d)",
423
423
+
position, frameIndex, len(frameOffsets)-1)
424
424
+
}
425
425
+
426
426
+
// 3. Read the specific frame from file
427
427
+
startOffset := frameOffsets[frameIndex]
428
428
+
endOffset := frameOffsets[frameIndex+1]
429
429
+
frameLength := endOffset - startOffset
430
430
+
431
431
+
if frameLength <= 0 {
432
432
+
return nil, fmt.Errorf("invalid frame length: %d", frameLength)
433
433
+
}
434
434
+
435
435
+
bundleFile, err := os.Open(path)
436
436
+
if err != nil {
437
437
+
return nil, fmt.Errorf("failed to open bundle: %w", err)
438
438
+
}
439
439
+
defer bundleFile.Close()
440
440
+
441
441
+
compressedFrame := make([]byte, frameLength)
442
442
+
_, err = bundleFile.ReadAt(compressedFrame, startOffset)
443
443
+
if err != nil {
444
444
+
return nil, fmt.Errorf("failed to read frame %d: %w", frameIndex, err)
445
445
+
}
446
446
+
447
447
+
// 4. ✅ Decompress this single frame
448
448
+
decompressed, err := DecompressFrame(compressedFrame)
449
449
+
if err != nil {
450
450
+
return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err)
451
451
+
}
452
452
+
453
453
+
// 5. Scan the decompressed data to find the target line
454
454
+
scanner := bufio.NewScanner(bytes.NewReader(decompressed))
455
455
+
lineNum := 0
456
456
+
457
457
+
for scanner.Scan() {
458
458
+
if lineNum == lineInFrame {
459
459
+
line := scanner.Bytes()
460
460
+
var operation plcclient.PLCOperation
461
461
+
if err := json.UnmarshalNoEscape(line, &operation); err != nil {
462
462
+
return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
463
463
+
}
464
464
+
operation.RawJSON = make([]byte, len(line))
465
465
+
copy(operation.RawJSON, line)
466
466
+
return &operation, nil
467
467
+
}
468
468
+
lineNum++
469
469
+
}
470
470
+
471
471
+
if err := scanner.Err(); err != nil {
472
472
+
return nil, fmt.Errorf("scanner error on frame %d: %w", frameIndex, err)
473
473
+
}
474
474
+
475
475
+
return nil, fmt.Errorf("position %d not found in frame %d", position, frameIndex)
476
476
+
}
477
477
+
478
478
+
// loadOperationAtPositionLegacy loads an operation from old single-frame bundles (no .idx frame index) by scanning the whole decompressed stream
479
479
+
func (op *Operations) loadOperationAtPositionLegacy(path string, position int) (*plcclient.PLCOperation, error) {
480
480
+
file, err := os.Open(path)
481
481
+
if err != nil {
482
482
+
return nil, fmt.Errorf("failed to open file: %w", err)
483
483
+
}
484
484
+
defer file.Close()
485
485
+
486
486
+
// ✅ Use abstracted streaming reader
487
487
+
reader, err := NewStreamingReader(file)
488
488
+
if err != nil {
489
489
+
return nil, fmt.Errorf("failed to create reader: %w", err)
490
490
+
}
491
491
+
defer reader.Release()
492
492
+
493
493
+
scanner := bufio.NewScanner(reader)
494
494
+
buf := make([]byte, 512*1024)
495
495
+
scanner.Buffer(buf, 1024*1024)
496
496
+
497
497
+
lineNum := 0
498
498
+
for scanner.Scan() {
499
499
+
if lineNum == position {
500
500
+
line := scanner.Bytes()
501
501
+
var operation plcclient.PLCOperation
502
502
+
if err := json.UnmarshalNoEscape(line, &operation); err != nil {
503
503
+
return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
504
504
+
}
505
505
+
operation.RawJSON = make([]byte, len(line))
506
506
+
copy(operation.RawJSON, line)
507
507
+
return &operation, nil
508
508
+
}
509
509
+
lineNum++
510
510
+
}
511
511
+
512
512
+
if err := scanner.Err(); err != nil {
513
513
+
return nil, fmt.Errorf("scanner error: %w", err)
514
514
+
}
515
515
+
516
516
+
return nil, fmt.Errorf("position %d not found in bundle", position)
517
517
+
}
518
518
+
508
519
// LoadOperationsAtPositions loads multiple operations from a bundle in one pass
509
520
func (op *Operations) LoadOperationsAtPositions(path string, positions []int) (map[int]*plcclient.PLCOperation, error) {
510
521
if len(positions) == 0 {
···
530
541
}
531
542
defer file.Close()
532
543
533
533
-
reader := gozstd.NewReader(file)
534
534
-
defer reader.Close()
544
544
+
// ✅ Use abstracted streaming reader
545
545
+
reader, err := NewStreamingReader(file)
546
546
+
if err != nil {
547
547
+
return nil, fmt.Errorf("failed to create reader: %w", err)
548
548
+
}
549
549
+
defer reader.Release()
535
550
536
551
bufPtr := scannerBufPool.Get().(*[]byte)
537
552
defer scannerBufPool.Put(bufPtr)
···
563
578
564
579
lineNum++
565
580
566
566
-
// Early exit if we passed the max position we need
581
581
+
// Early exit if we passed the max position
567
582
if lineNum > maxPos {
568
583
break
569
584
}
···
584
599
}
585
600
defer file.Close()
586
601
587
587
-
reader := gozstd.NewReader(file)
588
588
-
defer reader.Close()
602
602
+
// ✅ Use abstracted reader
603
603
+
reader, err := NewStreamingReader(file)
604
604
+
if err != nil {
605
605
+
return 0, 0, time.Time{}, time.Time{}, fmt.Errorf("failed to create reader: %w", err)
606
606
+
}
607
607
+
defer reader.Release()
589
608
590
609
scanner := bufio.NewScanner(reader)
591
610
buf := make([]byte, 64*1024)
+118
internal/storage/zstd.go
···
1
1
+
package storage
2
2
+
3
3
+
import (
4
4
+
"fmt"
5
5
+
"io"
6
6
+
7
7
+
"github.com/valyala/gozstd"
8
8
+
)
9
9
+
10
10
+
// ============================================================================
11
11
+
// ZSTD COMPRESSION ABSTRACTION LAYER
12
12
+
// ============================================================================
13
13
+
// This file provides a clean interface for zstd operations.
14
14
+
// Swap implementations by changing the functions in this file.
15
15
+
16
16
+
const (
17
17
+
// CompressionLevel is the compression level passed to NewStreamingWriter (CompressFrame does not pass a level)
18
18
+
CompressionLevel = 2 // Default from zstd
19
19
+
20
20
+
// FrameSize is the number of operations per frame
21
21
+
FrameSize = 100
22
22
+
)
23
23
+
24
24
+
// CompressFrame compresses a single chunk of data into a zstd frame
25
25
+
// with proper content size headers for multi-frame concatenation
26
26
+
func CompressFrame(data []byte) ([]byte, error) {
27
27
+
// ✅ valyala/gozstd.Compress creates proper frames with content size
28
28
+
compressed := gozstd.Compress(nil, data)
29
29
+
return compressed, nil
30
30
+
}
31
31
+
32
32
+
// DecompressAll decompresses all frames in the compressed data
33
33
+
func DecompressAll(compressed []byte) ([]byte, error) {
34
34
+
// ✅ valyala/gozstd.Decompress handles multi-frame
35
35
+
decompressed, err := gozstd.Decompress(nil, compressed)
36
36
+
if err != nil {
37
37
+
return nil, fmt.Errorf("decompression failed: %w", err)
38
38
+
}
39
39
+
return decompressed, nil
40
40
+
}
41
41
+
42
42
+
// DecompressFrame decompresses a single frame
43
43
+
func DecompressFrame(compressedFrame []byte) ([]byte, error) {
44
44
+
return gozstd.Decompress(nil, compressedFrame)
45
45
+
}
46
46
+
47
47
+
// NewStreamingReader creates a streaming decompressor
48
48
+
// Returns a reader that must be released with Release()
49
49
+
func NewStreamingReader(r io.Reader) (StreamReader, error) {
50
50
+
reader := gozstd.NewReader(r)
51
51
+
return &gozstdReader{reader: reader}, nil
52
52
+
}
53
53
+
54
54
+
// NewStreamingWriter creates a streaming compressor that uses CompressionLevel
55
55
+
// Returns a writer that must be closed with Close() then released with Release()
56
56
+
func NewStreamingWriter(w io.Writer) (StreamWriter, error) {
57
57
+
writer := gozstd.NewWriterLevel(w, CompressionLevel)
58
58
+
return &gozstdWriter{writer: writer}, nil
59
59
+
}
60
60
+
61
61
+
// ============================================================================
62
62
+
// INTERFACES (for abstraction)
63
63
+
// ============================================================================
64
64
+
65
65
+
// StreamReader is a streaming decompression reader
66
66
+
type StreamReader interface {
67
67
+
io.Reader
68
68
+
io.WriterTo
69
69
+
Release()
70
70
+
}
71
71
+
72
72
+
// StreamWriter is a streaming compression writer
73
73
+
type StreamWriter interface {
74
74
+
io.Writer
75
75
+
io.Closer
76
76
+
Flush() error
77
77
+
Release()
78
78
+
}
79
79
+
80
80
+
// ============================================================================
81
81
+
// WRAPPER TYPES (valyala/gozstd specific)
82
82
+
// ============================================================================
83
83
+
84
84
+
type gozstdReader struct {
85
85
+
reader *gozstd.Reader
86
86
+
}
87
87
+
88
88
+
func (r *gozstdReader) Read(p []byte) (int, error) {
89
89
+
return r.reader.Read(p)
90
90
+
}
91
91
+
92
92
+
func (r *gozstdReader) WriteTo(w io.Writer) (int64, error) {
93
93
+
return r.reader.WriteTo(w)
94
94
+
}
95
95
+
96
96
+
func (r *gozstdReader) Release() {
97
97
+
r.reader.Release()
98
98
+
}
99
99
+
100
100
+
type gozstdWriter struct {
101
101
+
writer *gozstd.Writer
102
102
+
}
103
103
+
104
104
+
func (w *gozstdWriter) Write(p []byte) (int, error) {
105
105
+
return w.writer.Write(p)
106
106
+
}
107
107
+
108
108
+
func (w *gozstdWriter) Close() error {
109
109
+
return w.writer.Close()
110
110
+
}
111
111
+
112
112
+
func (w *gozstdWriter) Flush() error {
113
113
+
return w.writer.Flush()
114
114
+
}
115
115
+
116
116
+
func (w *gozstdWriter) Release() {
117
117
+
w.writer.Release()
118
118
+
}