[DEPRECATED] Go implementation of plcbundle
1package storage
2
3import (
4 "bufio"
5 "bytes"
6 "crypto/sha256"
7 "encoding/hex"
8 "fmt"
9 "io"
10 "os"
11 "path/filepath"
12 "sync"
13 "time"
14
15 "github.com/goccy/go-json"
16 "tangled.org/atscan.net/plcbundle/internal/plcclient"
17)
18
// MetadataFormatVersion is the version number rendered into
// BundleMetadata.Format as "plcbundle-v<N>" when a bundle is saved.
const MetadataFormatVersion = 1
22
// BundleMetadata - Self-describing bundle (content-focused, not container)
//
// BundleMetadata is the header embedded at the start of every bundle written
// by SaveBundle. It records the bundle's identity, integrity hashes, content
// statistics and provenance, plus the layout of its compressed frames so a
// single operation can be located without decompressing the whole bundle.
//
// NOTE(review): the json tags define the stored field names; presumably
// WriteMetadataFrame (not shown here) JSON-encodes this struct, so renaming
// tags would make existing bundles unreadable — confirm before changing.
type BundleMetadata struct {
	// === Format Info ===
	Format string `json:"format"` // "plcbundle-v1" (built from MetadataFormatVersion)

	// === Bundle Identity ===
	Origin       string `json:"origin"`        // Source PLC directory URL
	BundleNumber int    `json:"bundle_number"` // Sequential bundle number

	// === Content Integrity ===
	ContentHash string `json:"content_hash"`          // SHA256 (hex) of uncompressed JSONL content
	ParentHash  string `json:"parent_hash,omitempty"` // Hash of previous bundle (chain); omitted when empty

	// === Content Description ===
	OperationCount int       `json:"operation_count"` // Always 10000 for complete bundles
	DIDCount       int       `json:"did_count"`       // Unique DIDs in this bundle
	StartTime      time.Time `json:"start_time"`      // CreatedAt of the first operation (zero if none)
	EndTime        time.Time `json:"end_time"`        // CreatedAt of the last operation (zero if none)

	// === Creation Provenance ===
	CreatedAt     time.Time `json:"created_at"`                // When bundle was created (UTC)
	CreatedBy     string    `json:"created_by"`                // "plcbundle/v1.2.3"
	CreatedByHost string    `json:"created_by_host,omitempty"` // Optional: first 16 hex chars of SHA256(hostname), not the raw hostname

	// === Optional Context ===
	Cursor string `json:"cursor,omitempty"` // PLC export cursor for this bundle
	Notes  string `json:"notes,omitempty"`  // Optional description

	// === Frame Structure (for random access) ===
	FrameCount int `json:"frame_count"` // Number of zstd frames (usually 100)
	FrameSize  int `json:"frame_size"`  // Operations per frame (100)
	// Byte offsets of each frame boundary, RELATIVE to the first data frame
	// (0 = first byte after the metadata frame). Entry i is where frame i
	// starts; the extra final entry is the end of the last frame, so SaveBundle
	// writes FrameCount+1 entries.
	FrameOffsets []int64 `json:"frame_offsets"`
}
55}
56
// Operations performs the low-level bundle file work: JSONL serialization,
// frame compression, hashing and positional reads. It holds no open
// resources between calls.
type Operations struct {
	logger  Logger // optional sink for diagnostics; nil disables logging
	verbose bool   // when true (and logger is set), emit extra debug output
}

// Logger is the minimal printf-style logging interface used by Operations.
// *log.Logger from the standard library satisfies it.
type Logger interface {
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

// NewOperations returns an Operations that reports through logger, which may
// be nil. The returned error is currently always nil.
func NewOperations(logger Logger, verbose bool) (*Operations, error) {
	ops := new(Operations)
	ops.logger = logger
	ops.verbose = verbose
	return ops, nil
}

// Close is a no-op: Operations owns nothing that needs releasing.
func (op *Operations) Close() {}
78}
79
// BundleInfo contains info needed to create metadata
//
// BundleInfo carries the caller-supplied context that SaveBundle copies into a
// bundle's BundleMetadata; everything else (hashes, counts, timestamps, frame
// layout) is derived from the operations themselves. SaveBundle rejects a nil
// *BundleInfo.
type BundleInfo struct {
	BundleNumber int    // copied to BundleMetadata.BundleNumber
	Origin       string // source PLC directory URL; copied to BundleMetadata.Origin
	ParentHash   string // previous bundle's hash; presumably empty for the first bundle — confirm with callers
	Cursor       string // PLC export cursor; copied to BundleMetadata.Cursor
	CreatedBy    string // "plcbundle/v1.2.3"
	Hostname     string // Optional; only a truncated SHA-256 of it is stored (CreatedByHost)
}
88}
89
90// ========================================
91// CORE SERIALIZATION (JSONL)
92// ========================================
93
// SerializeJSONL encodes operations as newline-delimited JSON: one operation
// per line, every line (including the last) terminated by '\n'.
//
// An operation that carries its original bytes in RawJSON (as ParseJSONL
// records them) is written verbatim; otherwise it is re-marshaled. A marshal
// failure is ignored — the signature has no error return — and whatever bytes
// Marshal returned (presumably none) are written before the newline.
func (op *Operations) SerializeJSONL(operations []plcclient.PLCOperation) []byte {
	var out bytes.Buffer

	for i := range operations {
		line := operations[i].RawJSON
		if len(line) == 0 {
			line, _ = json.Marshal(operations[i])
		}
		out.Write(line)
		out.WriteByte('\n')
	}

	return out.Bytes()
}
109}
110
// ParseJSONL parses newline-delimited JSON into operations.
//
// Empty lines are skipped. Each returned operation's RawJSON holds a private
// copy of its source line, because the scanner reuses its buffer between
// lines. Lines may be up to 1 MiB long. Any scanner failure (for example a
// longer line) is returned as an error instead of silently yielding a
// truncated slice.
func (op *Operations) ParseJSONL(data []byte) ([]plcclient.PLCOperation, error) {
	var operations []plcclient.PLCOperation
	scanner := bufio.NewScanner(bytes.NewReader(data))
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		var operation plcclient.PLCOperation
		if err := json.UnmarshalNoEscape(line, &operation); err != nil {
			return nil, fmt.Errorf("failed to parse line: %w", err)
		}

		operation.RawJSON = make([]byte, len(line))
		copy(operation.RawJSON, line)
		operations = append(operations, operation)
	}

	// Scan returns false both at EOF and on error (e.g. bufio.ErrTooLong);
	// only Err distinguishes them.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("failed to scan JSONL: %w", err)
	}

	return operations, nil
}
135}
136
137// ========================================
138// FILE OPERATIONS (using zstd abstraction)
139// ========================================
140
// SaveBundle writes operations to path as a multi-frame compressed bundle and
// returns (contentHash, compressedHash, contentSize, compressedSize, err).
//
// File layout: one metadata frame (BundleMetadata) followed by one compressed
// data frame per FrameSize operations. The metadata's FrameOffsets are
// RELATIVE to the first data frame: entry i is where frame i starts and the
// extra final entry is the end of the last frame.
//
//   - contentHash / contentSize describe the uncompressed JSONL produced by
//     SerializeJSONL for all operations.
//   - compressedHash / compressedSize describe the file bytes on disk.
//
// If any step after the file is created fails, the partial file is removed.
// On success, any "<path>.idx" external frame index is deleted.
func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation, bundleInfo *BundleInfo) (string, string, int64, int64, error) {
	if bundleInfo == nil {
		return "", "", 0, 0, fmt.Errorf("bundleInfo cannot be nil")
	}

	// 1. Describe the uncompressed content.
	jsonlData := op.SerializeJSONL(operations)
	contentSize := int64(len(jsonlData))
	contentHash := op.Hash(jsonlData)
	dids := op.ExtractUniqueDIDs(operations)

	hostnameHash := ""
	if bundleInfo.Hostname != "" {
		// Only a 64-bit prefix (first 16 hex chars) of the hostname's hash is stored.
		hostnameHash = op.Hash([]byte(bundleInfo.Hostname))[:16]
	}

	// 2. Compress each chunk of FrameSize operations as its own frame so a
	// reader can later decompress just one frame for random access.
	compressedFrames := make([][]byte, 0, (len(operations)+FrameSize-1)/FrameSize)
	for i := 0; i < len(operations); i += FrameSize {
		end := i + FrameSize
		if end > len(operations) {
			end = len(operations)
		}
		compressedChunk, err := CompressFrame(op.SerializeJSONL(operations[i:end]))
		if err != nil {
			return "", "", 0, 0, fmt.Errorf("failed to compress frame: %w", err)
		}
		compressedFrames = append(compressedFrames, compressedChunk)
	}

	// 3. Frame boundary offsets, relative to the start of the first data frame.
	relativeOffsets := make([]int64, len(compressedFrames)+1)
	for i, frame := range compressedFrames {
		relativeOffsets[i+1] = relativeOffsets[i] + int64(len(frame))
	}

	// 4. Build metadata with RELATIVE offsets.
	metadata := &BundleMetadata{
		Format:         fmt.Sprintf("plcbundle-v%d", MetadataFormatVersion),
		BundleNumber:   bundleInfo.BundleNumber,
		Origin:         bundleInfo.Origin,
		CreatedAt:      time.Now().UTC(),
		CreatedBy:      bundleInfo.CreatedBy,
		CreatedByHost:  hostnameHash,
		ContentHash:    contentHash,
		ParentHash:     bundleInfo.ParentHash,
		OperationCount: len(operations),
		DIDCount:       len(dids),
		FrameCount:     len(compressedFrames),
		FrameSize:      FrameSize,
		Cursor:         bundleInfo.Cursor,
		FrameOffsets:   relativeOffsets, // RELATIVE to data start!
	}

	if len(operations) > 0 {
		metadata.StartTime = operations[0].CreatedAt
		metadata.EndTime = operations[len(operations)-1].CreatedAt
	}

	// 5. Write the file: metadata frame first, then the data frames in order.
	finalFile, err := os.Create(path)
	if err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to create file: %w", err)
	}
	// Cleanup keys off an explicit flag rather than the outer err: the write,
	// sync and close errors below live in their own `if` scopes, so checking
	// err would miss them and leave a truncated bundle on disk.
	succeeded := false
	defer func() {
		if !succeeded {
			finalFile.Close() // may already be closed; irrelevant during cleanup
			os.Remove(path)
		}
	}()

	if _, err := op.WriteMetadataFrame(finalFile, metadata); err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to write metadata: %w", err)
	}

	for _, frame := range compressedFrames {
		if _, err := finalFile.Write(frame); err != nil {
			return "", "", 0, 0, fmt.Errorf("failed to write frame: %w", err)
		}
	}

	// A failed Sync or Close can mean the data never reached disk.
	if err := finalFile.Sync(); err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to sync file: %w", err)
	}
	if err := finalFile.Close(); err != nil {
		return "", "", 0, 0, fmt.Errorf("failed to close file: %w", err)
	}

	// 6. Hash the bytes actually written.
	compressedData, err := os.ReadFile(path)
	if err != nil {
		return "", "", 0, 0, err
	}
	compressedHash := op.Hash(compressedData)
	succeeded = true

	// Drop any external frame index for this path; offsets are now embedded.
	os.Remove(path + ".idx")

	return contentHash, compressedHash, contentSize, int64(len(compressedData)), nil
}
247}
248
// LoadBundle decompresses the whole bundle at path and parses every
// operation in it. An empty decompressed payload, or one containing no
// parsable operations, is reported as an error rather than returned as an
// empty slice.
func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer f.Close()

	stream, err := NewStreamingReader(f)
	if err != nil {
		return nil, fmt.Errorf("failed to create reader: %w", err)
	}
	defer stream.Release()

	// Pull every frame's decompressed bytes into memory.
	payload, err := io.ReadAll(stream)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to decompress: %w", err)
	case len(payload) == 0:
		return nil, fmt.Errorf("decompression produced empty result")
	}

	ops, err := op.ParseJSONL(payload)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to parse JSONL: %w", err)
	case len(ops) == 0:
		return nil, fmt.Errorf("bundle contains no valid operations")
	}

	return ops, nil
}
287}
288
289// ========================================
290// STREAMING
291// ========================================
292
// StreamRaw opens the bundle at path and hands back the file itself, so reads
// yield the raw (still compressed) bytes. The caller must Close it.
func (op *Operations) StreamRaw(path string) (io.ReadCloser, error) {
	bundle, openErr := os.Open(path)
	if openErr != nil {
		// Return a literal nil so callers never see a non-nil interface
		// wrapping a nil *os.File.
		return nil, fmt.Errorf("failed to open bundle: %w", openErr)
	}
	return bundle, nil
}
300}
301
// StreamDecompressed opens the bundle at path and returns a reader that yields
// its decompressed JSONL bytes. Closing the returned reader releases the
// decoder and then closes the file.
func (op *Operations) StreamDecompressed(path string) (io.ReadCloser, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}

	sr, err := NewStreamingReader(f)
	if err != nil {
		// No wrapper will own the file on this path, so close it here.
		f.Close()
		return nil, fmt.Errorf("failed to create reader: %w", err)
	}

	return &decompressedReader{reader: sr, file: f}, nil
}
320}
321
322// decompressedReader wraps a zstd decoder and underlying file
323type decompressedReader struct {
324 reader StreamReader
325 file *os.File
326}
327
328func (dr *decompressedReader) Read(p []byte) (int, error) {
329 return dr.reader.Read(p)
330}
331
332func (dr *decompressedReader) Close() error {
333 dr.reader.Release()
334 return dr.file.Close()
335}
336
337// ========================================
338// HASHING
339// ========================================
340
// Hash returns the SHA-256 digest of data as a 64-character lowercase hex
// string.
func (op *Operations) Hash(data []byte) string {
	digest := sha256.Sum256(data)
	return hex.EncodeToString(digest[:])
}
345}
346
347// CalculateChainHash calculates the cumulative chain hash
348func (op *Operations) CalculateChainHash(parent string, contentHash string) string {
349 var data string
350 if parent == "" {
351 data = "plcbundle:genesis:" + contentHash
352 } else {
353 data = parent + ":" + contentHash
354 }
355 return op.Hash([]byte(data))
356}
357
// CalculateFileHashes reads the bundle at path into memory and returns the
// SHA-256 and size of both the file bytes (compressed) and their full
// decompression via DecompressAll (content). Either failure yields zero
// values and a wrapped error.
func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, contentHash string, contentSize int64, err error) {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return "", 0, "", 0, fmt.Errorf("failed to read file: %w", readErr)
	}

	plain, decErr := DecompressAll(raw)
	if decErr != nil {
		return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", decErr)
	}

	return op.Hash(raw), int64(len(raw)), op.Hash(plain), int64(len(plain)), nil
}
379}
380
// VerifyHash hashes the raw bytes of the file at path and reports whether the
// result equals expectedHash. It also returns the computed hash so callers can
// report mismatches. Only a read failure produces an error.
func (op *Operations) VerifyHash(path string, expectedHash string) (bool, string, error) {
	contents, err := os.ReadFile(path)
	if err != nil {
		return false, "", fmt.Errorf("failed to read file: %w", err)
	}

	got := op.Hash(contents)
	matches := got == expectedHash
	return matches, got, nil
}
390}
391
392// ========================================
393// UTILITY FUNCTIONS
394// ========================================
395
// FileExists reports whether os.Stat succeeds for path (files and directories
// both count). Any Stat error — not only "does not exist", but also e.g.
// permission denied — yields false.
func (op *Operations) FileExists(path string) bool {
	if _, statErr := os.Stat(path); statErr != nil {
		return false
	}
	return true
}
400}
401
// GetFileSize returns the size in bytes reported by os.Stat for path, or 0
// and the Stat error if the file cannot be stat'ed.
func (op *Operations) GetFileSize(path string) (int64, error) {
	fi, err := os.Stat(path)
	if err == nil {
		return fi.Size(), nil
	}
	return 0, err
}
409}
410
411// ExtractUniqueDIDs extracts unique DIDs from operations
412func (op *Operations) ExtractUniqueDIDs(operations []plcclient.PLCOperation) []string {
413 didSet := make(map[string]bool)
414 for _, operation := range operations {
415 didSet[operation.DID] = true
416 }
417
418 dids := make([]string, 0, len(didSet))
419 for did := range didSet {
420 dids = append(dids, did)
421 }
422
423 return dids
424}
425
// GetBoundaryCIDs returns the CreatedAt of the last operation together with
// the set of CIDs of the trailing run of operations that share exactly that
// timestamp. For an empty slice it returns the zero time and a nil map.
func (op *Operations) GetBoundaryCIDs(operations []plcclient.PLCOperation) (time.Time, map[string]bool) {
	n := len(operations)
	if n == 0 {
		return time.Time{}, nil
	}

	boundaryTime := operations[n-1].CreatedAt
	cids := make(map[string]bool)

	// Walk backwards while timestamps still equal the boundary.
	for i := n - 1; i >= 0 && operations[i].CreatedAt.Equal(boundaryTime); i-- {
		cids[operations[i].CID] = true
	}

	return boundaryTime, cids
}
447}
448
// StripBoundaryDuplicates drops the leading operations that repeat the
// previous bundle's boundary. An operation is dropped when its CreatedAt
// equals boundaryTimestamp (parsed with time.RFC3339Nano) and its CID is in
// prevBoundaryCIDs; stripping stops at the first operation that does not
// match. The input is returned unchanged if either collection is empty or the
// timestamp cannot be parsed.
func (op *Operations) StripBoundaryDuplicates(operations []plcclient.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plcclient.PLCOperation {
	if len(operations) == 0 || len(prevBoundaryCIDs) == 0 {
		return operations
	}

	boundaryTime, parseErr := time.Parse(time.RFC3339Nano, boundaryTimestamp)
	if parseErr != nil {
		// Can't identify duplicates without a boundary; keep everything.
		return operations
	}

	skip := 0
	for skip < len(operations) &&
		operations[skip].CreatedAt.Equal(boundaryTime) &&
		prevBoundaryCIDs[operations[skip].CID] {
		skip++
	}

	return operations[skip:]
}
477}
478
479// Pool for scanner buffers
480var scannerBufPool = sync.Pool{
481 New: func() interface{} {
482 buf := make([]byte, 64*1024)
483 return &buf
484 },
485}
486
487// ========================================
488// POSITION-BASED LOADING (with frame index)
489// ========================================
490
// LoadOperationAtPosition returns the operation at zero-based line position in
// the bundle at path.
//
// When a frame index is available (embedded metadata or "<path>.idx"), only
// the containing frame is read. Otherwise the bundle is decompressed
// sequentially from the start, and the fallback is logged if a logger is set.
func (op *Operations) LoadOperationAtPosition(path string, position int) (*plcclient.PLCOperation, error) {
	if position < 0 {
		return nil, fmt.Errorf("invalid position: %d", position)
	}

	offsets, indexErr := op.loadFrameIndex(path)
	if indexErr == nil {
		// Fast path: random access into a single frame.
		return op.loadOperationFromFrame(path, position, offsets)
	}

	// Slow path: no frame index, scan the whole decompressed stream.
	if op.logger != nil {
		op.logger.Printf("No frame index found for %s, using legacy scan", filepath.Base(path))
	}
	return op.loadOperationAtPositionLegacy(path, position)
}
509}
510
// loadFrameIndex returns ABSOLUTE file offsets of the frame boundaries in the
// bundle at path.
//
// It first tries the offsets embedded in the bundle's metadata. Those are
// stored relative to the first data frame, so they are rebased by the size of
// the metadata skippable frame (4-byte magic + 4-byte length + payload). If
// that is unavailable, it falls back to an external "<path>.idx" file holding
// a JSON array of offsets, which is returned as-is.
func (op *Operations) loadFrameIndex(path string) ([]int64, error) {
	meta, err := op.ExtractMetadataFromFile(path)
	if err == nil && len(meta.FrameOffsets) > 0 {
		// Re-read the metadata frame header to learn where the data frames begin.
		if f, openErr := os.Open(path); openErr == nil {
			magic, payload, readErr := op.ReadSkippableFrame(f)
			f.Close()

			if readErr == nil && magic == SkippableMagicMetadata {
				dataStart := int64(8 + len(payload))
				absolute := make([]int64, len(meta.FrameOffsets))
				for i, rel := range meta.FrameOffsets {
					absolute[i] = dataStart + rel
				}
				return absolute, nil
			}
		}
	}

	// Fallback: external JSON index next to the bundle.
	indexData, err := os.ReadFile(path + ".idx")
	if err != nil {
		return nil, fmt.Errorf("no frame index available: %w", err)
	}

	var offsets []int64
	if err := json.Unmarshal(indexData, &offsets); err != nil {
		return nil, fmt.Errorf("invalid frame index: %w", err)
	}

	return offsets, nil
}
552}
553
// loadOperationFromFrame reads the single operation at position using
// ABSOLUTE frame boundary offsets (as returned by loadFrameIndex).
//
// frameOffsets holds one more entry than there are frames: frame i spans the
// bytes [frameOffsets[i], frameOffsets[i+1]). The operation is line
// position%FrameSize of frame position/FrameSize. Only that frame is read
// (via ReadAt) and decompressed. Lines may be up to 1 MiB, matching
// ParseJSONL.
func (op *Operations) loadOperationFromFrame(path string, position int, frameOffsets []int64) (*plcclient.PLCOperation, error) {
	if position < 0 {
		return nil, fmt.Errorf("invalid position: %d", position)
	}

	frameIndex := position / FrameSize
	lineInFrame := position % FrameSize

	if frameIndex >= len(frameOffsets)-1 {
		return nil, fmt.Errorf("position %d out of bounds (frame %d, total frames %d)",
			position, frameIndex, len(frameOffsets)-1)
	}

	startOffset := frameOffsets[frameIndex]
	endOffset := frameOffsets[frameIndex+1]
	frameLength := endOffset - startOffset

	// Reject a corrupt or implausible index before allocating a buffer for it.
	if frameLength <= 0 || frameLength > 10*1024*1024 {
		return nil, fmt.Errorf("invalid frame length: %d (offsets: %d-%d)",
			frameLength, startOffset, endOffset)
	}

	bundleFile, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open bundle: %w", err)
	}
	defer bundleFile.Close()

	compressedFrame := make([]byte, frameLength)
	if _, err := bundleFile.ReadAt(compressedFrame, startOffset); err != nil {
		return nil, fmt.Errorf("failed to read frame %d (offset %d, length %d): %w",
			frameIndex, startOffset, frameLength, err)
	}

	decompressed, err := DecompressFrame(compressedFrame)
	if err != nil {
		if op.logger != nil && op.verbose {
			preview := compressedFrame
			if len(preview) > 16 {
				preview = preview[:16]
			}
			op.logger.Printf("DEBUG: Failed frame data (first 16 bytes): % x", preview)
		}
		return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err)
	}

	// The default bufio.Scanner limit is 64 KiB; raise it to the same 1 MiB
	// used by ParseJSONL so any operation LoadBundle can read is reachable here.
	scanner := bufio.NewScanner(bytes.NewReader(decompressed))
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for lineNum := 0; scanner.Scan(); lineNum++ {
		if lineNum != lineInFrame {
			continue
		}
		line := scanner.Bytes()
		var operation plcclient.PLCOperation
		if err := json.UnmarshalNoEscape(line, &operation); err != nil {
			return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
		}
		// Copy: scanner.Bytes() is overwritten by subsequent scans.
		operation.RawJSON = make([]byte, len(line))
		copy(operation.RawJSON, line)
		return &operation, nil
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error on frame %d: %w", frameIndex, err)
	}

	return nil, fmt.Errorf("position %d not found in frame %d", position, frameIndex)
}
624}
625
// loadOperationAtPositionLegacy finds the operation at zero-based line
// position by decompressing the bundle sequentially from the start. It is the
// fallback for bundles without a frame index. Lines may be up to 1 MiB.
func (op *Operations) loadOperationAtPositionLegacy(path string, position int) (*plcclient.PLCOperation, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer f.Close()

	stream, err := NewStreamingReader(f)
	if err != nil {
		return nil, fmt.Errorf("failed to create reader: %w", err)
	}
	defer stream.Release()

	lines := bufio.NewScanner(stream)
	lines.Buffer(make([]byte, 512*1024), 1024*1024)

	for current := 0; lines.Scan(); current++ {
		if current != position {
			continue
		}
		raw := lines.Bytes()
		var parsed plcclient.PLCOperation
		if err := json.UnmarshalNoEscape(raw, &parsed); err != nil {
			return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err)
		}
		// Keep a private copy; the scanner reuses its buffer.
		parsed.RawJSON = make([]byte, len(raw))
		copy(parsed.RawJSON, raw)
		return &parsed, nil
	}

	if err := lines.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return nil, fmt.Errorf("position %d not found in bundle", position)
}
665}
666
// LoadOperationsAtPositions loads the operations at the given zero-based line
// positions in one sequential pass over the decompressed bundle.
//
// Negative positions are ignored. The result maps each found position to its
// operation; positions past the end of the bundle are simply absent. Scanning
// stops once the highest requested position has been read. Lines may be up
// to 1 MiB, the same limit as ParseJSONL and the single-position loaders.
func (op *Operations) LoadOperationsAtPositions(path string, positions []int) (map[int]*plcclient.PLCOperation, error) {
	if len(positions) == 0 {
		return make(map[int]*plcclient.PLCOperation), nil
	}

	// Requested positions as a set, plus the highest one for early exit.
	wanted := make(map[int]struct{}, len(positions))
	maxPos := 0
	for _, pos := range positions {
		if pos < 0 {
			continue
		}
		wanted[pos] = struct{}{}
		if pos > maxPos {
			maxPos = pos
		}
	}

	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	reader, err := NewStreamingReader(file)
	if err != nil {
		return nil, fmt.Errorf("failed to create reader: %w", err)
	}
	defer reader.Release()

	bufPtr := scannerBufPool.Get().(*[]byte)
	defer scannerBufPool.Put(bufPtr)

	// Start from the pooled buffer; the scanner allocates its own larger
	// buffer only if a line needs it, up to the 1 MiB cap.
	scanner := bufio.NewScanner(reader)
	scanner.Buffer(*bufPtr, 1024*1024)

	results := make(map[int]*plcclient.PLCOperation, len(wanted))
	lineNum := 0

	for scanner.Scan() {
		// Nothing (left) to find, e.g. every requested position was negative.
		if len(results) == len(wanted) {
			break
		}

		// Only parse lines that were actually requested.
		if _, ok := wanted[lineNum]; ok {
			line := scanner.Bytes()
			var operation plcclient.PLCOperation
			if err := json.UnmarshalNoEscape(line, &operation); err != nil {
				return nil, fmt.Errorf("failed to parse operation at position %d: %w", lineNum, err)
			}
			// Copy: scanner.Bytes() is overwritten by subsequent scans.
			operation.RawJSON = make([]byte, len(line))
			copy(operation.RawJSON, line)
			results[lineNum] = &operation
		}

		lineNum++
		if lineNum > maxPos {
			break
		}
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanner error: %w", err)
	}

	return results, nil
}
740}
741
// CalculateMetadataWithoutLoading streams the decompressed bundle line by
// line, decoding only each operation's DID and createdAt, and returns the
// number of decodable operations, the number of distinct DIDs, and the
// CreatedAt of the first and last decodable lines in file order.
//
// Blank lines and lines that fail to decode are skipped and not counted. On a
// scanner error, the partial counts are returned alongside the error.
func (op *Operations) CalculateMetadataWithoutLoading(path string) (opCount int, didCount int, startTime, endTime time.Time, err error) {
	file, err := os.Open(path)
	if err != nil {
		return 0, 0, time.Time{}, time.Time{}, err
	}
	defer file.Close()

	reader, err := NewStreamingReader(file)
	if err != nil {
		return 0, 0, time.Time{}, time.Time{}, fmt.Errorf("failed to create reader: %w", err)
	}
	defer reader.Release()

	scanner := bufio.NewScanner(reader)
	scanner.Buffer(make([]byte, 64*1024), 1024*1024)

	// Minimal projection of an operation: just the fields the summary needs.
	type opSummary struct {
		DID       string    `json:"did"`
		CreatedAt time.Time `json:"createdAt"`
	}

	uniqueDIDs := make(map[string]bool)

	for scanner.Scan() {
		raw := scanner.Bytes()
		if len(raw) == 0 {
			continue
		}

		// Fresh zero value per line so fields never leak between records.
		var rec opSummary
		if json.Unmarshal(raw, &rec) != nil {
			continue // best effort: undecodable lines are not counted
		}

		if opCount == 0 {
			startTime = rec.CreatedAt
		}
		endTime = rec.CreatedAt
		uniqueDIDs[rec.DID] = true
		opCount++
	}

	return opCount, len(uniqueDIDs), startTime, endTime, scanner.Err()
}
790}
791
// ExtractBundleMetadata returns the metadata embedded in the bundle at path by
// delegating to ExtractMetadataFromFile (presumably without decompressing the
// data frames — see that function), wrapping any error.
func (op *Operations) ExtractBundleMetadata(path string) (*BundleMetadata, error) {
	meta, err := op.ExtractMetadataFromFile(path)
	if err == nil {
		return meta, nil
	}
	return nil, fmt.Errorf("failed to extract metadata: %w", err)
}
799}
800
// LoadBundleWithMetadata loads every operation in the bundle at path and, when
// present, its embedded metadata.
//
// If the metadata frame cannot be read (for example a bundle written without
// one), the file is rewound and decoded from the start, and the returned
// metadata is nil. Otherwise decoding continues from just after the metadata
// frame.
func (op *Operations) LoadBundleWithMetadata(path string) ([]plcclient.PLCOperation, *BundleMetadata, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// 1. Try to read the metadata frame first.
	meta, err := op.ReadMetadataFrame(file)
	if err != nil {
		// ReadMetadataFrame may have consumed bytes. If the rewind fails, the
		// decoder would start mid-stream, so report it instead of ignoring it.
		if _, seekErr := file.Seek(0, io.SeekStart); seekErr != nil {
			return nil, nil, fmt.Errorf("failed to rewind file: %w", seekErr)
		}
		ops, loadErr := op.loadFromReader(file)
		return ops, nil, loadErr
	}

	// 2. Decode the data frames (the file position is now after the metadata frame).
	ops, err := op.loadFromReader(file)
	if err != nil {
		return nil, nil, err
	}

	return ops, meta, nil
}
825}
826
// loadFromReader decompresses everything remaining in r and parses it as
// JSONL. Errors from ParseJSONL are returned unwrapped.
func (op *Operations) loadFromReader(r io.Reader) ([]plcclient.PLCOperation, error) {
	stream, err := NewStreamingReader(r)
	if err != nil {
		return nil, fmt.Errorf("failed to create reader: %w", err)
	}
	defer stream.Release()

	payload, err := io.ReadAll(stream)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress: %w", err)
	}

	return op.ParseJSONL(payload)
}
841}