[DEPRECATED] Go implementation of plcbundle — superseded by the Rust implementation (rust-test branch). Original file: 841 lines, 23 kB.
1package storage 2 3import ( 4 "bufio" 5 "bytes" 6 "crypto/sha256" 7 "encoding/hex" 8 "fmt" 9 "io" 10 "os" 11 "path/filepath" 12 "sync" 13 "time" 14 15 "github.com/goccy/go-json" 16 "tangled.org/atscan.net/plcbundle/internal/plcclient" 17) 18 19const ( 20 MetadataFormatVersion = 1 21) 22 23// BundleMetadata - Self-describing bundle (content-focused, not container) 24type BundleMetadata struct { 25 // === Format Info === 26 Format string `json:"format"` // "plcbundle-v1" 27 28 // === Bundle Identity === 29 Origin string `json:"origin"` // Source PLC directory URL 30 BundleNumber int `json:"bundle_number"` // Sequential bundle number 31 32 // === Content Integrity === 33 ContentHash string `json:"content_hash"` // SHA256 of uncompressed JSONL content 34 ParentHash string `json:"parent_hash,omitempty"` // Hash of previous bundle (chain) 35 36 // === Content Description === 37 OperationCount int `json:"operation_count"` // Always 10000 for complete bundles 38 DIDCount int `json:"did_count"` // Unique DIDs in this bundle 39 StartTime time.Time `json:"start_time"` // First operation timestamp 40 EndTime time.Time `json:"end_time"` // Last operation timestamp 41 42 // === Creation Provenance === 43 CreatedAt time.Time `json:"created_at"` // When bundle was created 44 CreatedBy string `json:"created_by"` // "plcbundle/v1.2.3" 45 CreatedByHost string `json:"created_by_host,omitempty"` // Optional: hostname that created it 46 47 // === Optional Context === 48 Cursor string `json:"cursor,omitempty"` // PLC export cursor for this bundle 49 Notes string `json:"notes,omitempty"` // Optional description 50 51 // === Frame Structure (for random access) === 52 FrameCount int `json:"frame_count"` // Number of zstd frames (usually 100) 53 FrameSize int `json:"frame_size"` // Operations per frame (100) 54 FrameOffsets []int64 `json:"frame_offsets"` // Byte offsets of each frame 55} 56 57// Operations handles low-level bundle file operations 58type Operations struct { 59 logger Logger 
60 verbose bool 61} 62 63// Logger interface 64type Logger interface { 65 Printf(format string, v ...interface{}) 66 Println(v ...interface{}) 67} 68 69func NewOperations(logger Logger, verbose bool) (*Operations, error) { 70 return &Operations{ 71 logger: logger, 72 verbose: verbose, 73 }, nil 74} 75 76func (op *Operations) Close() { 77 // Nothing to close 78} 79 80// BundleInfo contains info needed to create metadata 81type BundleInfo struct { 82 BundleNumber int 83 Origin string 84 ParentHash string 85 Cursor string 86 CreatedBy string // "plcbundle/v1.2.3" 87 Hostname string // Optional 88} 89 90// ======================================== 91// CORE SERIALIZATION (JSONL) 92// ======================================== 93 94// SerializeJSONL serializes operations to newline-delimited JSON 95func (op *Operations) SerializeJSONL(operations []plcclient.PLCOperation) []byte { 96 var buf bytes.Buffer 97 98 for _, operation := range operations { 99 if len(operation.RawJSON) > 0 { 100 buf.Write(operation.RawJSON) 101 } else { 102 data, _ := json.Marshal(operation) 103 buf.Write(data) 104 } 105 buf.WriteByte('\n') 106 } 107 108 return buf.Bytes() 109} 110 111// ParseJSONL parses newline-delimited JSON into operations 112func (op *Operations) ParseJSONL(data []byte) ([]plcclient.PLCOperation, error) { 113 var operations []plcclient.PLCOperation 114 scanner := bufio.NewScanner(bytes.NewReader(data)) 115 buf := make([]byte, 0, 64*1024) 116 scanner.Buffer(buf, 1024*1024) 117 118 for scanner.Scan() { 119 line := scanner.Bytes() 120 if len(line) == 0 { 121 continue 122 } 123 124 var operation plcclient.PLCOperation 125 if err := json.UnmarshalNoEscape(line, &operation); err != nil { 126 return nil, fmt.Errorf("failed to parse line: %w", err) 127 } 128 129 operation.RawJSON = make([]byte, len(line)) 130 copy(operation.RawJSON, line) 131 operations = append(operations, operation) 132 } 133 134 return operations, nil 135} 136 137// ======================================== 138// 
FILE OPERATIONS (using zstd abstraction) 139// ======================================== 140 141// SaveBundle saves operations with metadata containing RELATIVE frame offsets 142func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation, bundleInfo *BundleInfo) (string, string, int64, int64, error) { 143 if bundleInfo == nil { 144 return "", "", 0, 0, fmt.Errorf("bundleInfo cannot be nil") 145 } 146 147 // 1. Calculate content 148 jsonlData := op.SerializeJSONL(operations) 149 contentSize := int64(len(jsonlData)) 150 contentHash := op.Hash(jsonlData) 151 dids := op.ExtractUniqueDIDs(operations) 152 153 hostnameHash := "" 154 if bundleInfo.Hostname != "" { 155 hostnameHash = op.Hash([]byte(bundleInfo.Hostname))[:16] // First 16 chars (64 bits) 156 } 157 158 // 2. Compress all frames 159 compressedFrames := make([][]byte, 0) 160 161 for i := 0; i < len(operations); i += FrameSize { 162 end := i + FrameSize 163 if end > len(operations) { 164 end = len(operations) 165 } 166 opChunk := operations[i:end] 167 chunkJsonlData := op.SerializeJSONL(opChunk) 168 169 compressedChunk, err := CompressFrame(chunkJsonlData) 170 if err != nil { 171 return "", "", 0, 0, fmt.Errorf("failed to compress frame: %w", err) 172 } 173 174 compressedFrames = append(compressedFrames, compressedChunk) 175 } 176 177 // 3. Calculate RELATIVE offsets (relative to first data frame) 178 relativeOffsets := make([]int64, len(compressedFrames)+1) 179 relativeOffsets[0] = 0 180 181 cumulative := int64(0) 182 for i, frame := range compressedFrames { 183 cumulative += int64(len(frame)) 184 relativeOffsets[i+1] = cumulative 185 } 186 187 // 4. 
Build metadata with RELATIVE offsets 188 metadata := &BundleMetadata{ 189 Format: fmt.Sprintf("plcbundle-v%d", MetadataFormatVersion), 190 BundleNumber: bundleInfo.BundleNumber, 191 Origin: bundleInfo.Origin, 192 CreatedAt: time.Now().UTC(), 193 CreatedBy: bundleInfo.CreatedBy, 194 CreatedByHost: hostnameHash, 195 ContentHash: contentHash, 196 ParentHash: bundleInfo.ParentHash, 197 OperationCount: len(operations), 198 DIDCount: len(dids), 199 FrameCount: len(compressedFrames), 200 FrameSize: FrameSize, 201 Cursor: bundleInfo.Cursor, 202 FrameOffsets: relativeOffsets, // RELATIVE to data start! 203 } 204 205 if len(operations) > 0 { 206 metadata.StartTime = operations[0].CreatedAt 207 metadata.EndTime = operations[len(operations)-1].CreatedAt 208 } 209 210 // 5. Write final file 211 finalFile, err := os.Create(path) 212 if err != nil { 213 return "", "", 0, 0, fmt.Errorf("failed to create file: %w", err) 214 } 215 defer func() { 216 finalFile.Close() 217 if err != nil { 218 os.Remove(path) 219 } 220 }() 221 222 // Write metadata frame 223 if _, err := op.WriteMetadataFrame(finalFile, metadata); err != nil { 224 return "", "", 0, 0, fmt.Errorf("failed to write metadata: %w", err) 225 } 226 227 // Write all data frames 228 for _, frame := range compressedFrames { 229 if _, err := finalFile.Write(frame); err != nil { 230 return "", "", 0, 0, fmt.Errorf("failed to write frame: %w", err) 231 } 232 } 233 234 finalFile.Sync() 235 finalFile.Close() 236 237 // 6. 
Hash 238 compressedData, err := os.ReadFile(path) 239 if err != nil { 240 return "", "", 0, 0, err 241 } 242 compressedHash := op.Hash(compressedData) 243 244 os.Remove(path + ".idx") 245 246 return contentHash, compressedHash, contentSize, int64(len(compressedData)), nil 247} 248 249// LoadBundle loads a compressed bundle 250func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) { 251 file, err := os.Open(path) 252 if err != nil { 253 return nil, fmt.Errorf("failed to open file: %w", err) 254 } 255 defer file.Close() 256 257 // Use abstracted streaming reader 258 reader, err := NewStreamingReader(file) 259 if err != nil { 260 return nil, fmt.Errorf("failed to create reader: %w", err) 261 } 262 defer reader.Release() 263 264 // Read all decompressed data from all frames 265 decompressed, err := io.ReadAll(reader) 266 if err != nil { 267 return nil, fmt.Errorf("failed to decompress: %w", err) 268 } 269 270 // DEFENSIVE: Validate we got actual data 271 if len(decompressed) == 0 { 272 return nil, fmt.Errorf("decompression produced empty result") 273 } 274 275 // Parse JSONL 276 operations, err := op.ParseJSONL(decompressed) 277 if err != nil { 278 return nil, fmt.Errorf("failed to parse JSONL: %w", err) 279 } 280 281 // DEFENSIVE: Additional validation 282 if len(operations) == 0 { 283 return nil, fmt.Errorf("bundle contains no valid operations") 284 } 285 286 return operations, nil 287} 288 289// ======================================== 290// STREAMING 291// ======================================== 292 293// StreamRaw returns a reader for the raw compressed bundle file 294func (op *Operations) StreamRaw(path string) (io.ReadCloser, error) { 295 file, err := os.Open(path) 296 if err != nil { 297 return nil, fmt.Errorf("failed to open bundle: %w", err) 298 } 299 return file, nil 300} 301 302// StreamDecompressed returns a reader for decompressed bundle data 303func (op *Operations) StreamDecompressed(path string) (io.ReadCloser, error) { 304 
file, err := os.Open(path) 305 if err != nil { 306 return nil, fmt.Errorf("failed to open bundle: %w", err) 307 } 308 309 // Use abstracted reader 310 reader, err := NewStreamingReader(file) 311 if err != nil { 312 file.Close() 313 return nil, fmt.Errorf("failed to create reader: %w", err) 314 } 315 316 return &decompressedReader{ 317 reader: reader, 318 file: file, 319 }, nil 320} 321 322// decompressedReader wraps a zstd decoder and underlying file 323type decompressedReader struct { 324 reader StreamReader 325 file *os.File 326} 327 328func (dr *decompressedReader) Read(p []byte) (int, error) { 329 return dr.reader.Read(p) 330} 331 332func (dr *decompressedReader) Close() error { 333 dr.reader.Release() 334 return dr.file.Close() 335} 336 337// ======================================== 338// HASHING 339// ======================================== 340 341// Hash computes SHA256 hash of data 342func (op *Operations) Hash(data []byte) string { 343 h := sha256.Sum256(data) 344 return hex.EncodeToString(h[:]) 345} 346 347// CalculateChainHash calculates the cumulative chain hash 348func (op *Operations) CalculateChainHash(parent string, contentHash string) string { 349 var data string 350 if parent == "" { 351 data = "plcbundle:genesis:" + contentHash 352 } else { 353 data = parent + ":" + contentHash 354 } 355 return op.Hash([]byte(data)) 356} 357 358// CalculateFileHashes calculates both content and compressed hashes efficiently 359func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, contentHash string, contentSize int64, err error) { 360 // Read compressed file 361 compressedData, err := os.ReadFile(path) 362 if err != nil { 363 return "", 0, "", 0, fmt.Errorf("failed to read file: %w", err) 364 } 365 366 compressedHash = op.Hash(compressedData) 367 compressedSize = int64(len(compressedData)) 368 369 // Use abstracted decompression 370 decompressed, err := DecompressAll(compressedData) 371 if err != nil { 372 return 
"", 0, "", 0, fmt.Errorf("failed to decompress: %w", err) 373 } 374 375 contentHash = op.Hash(decompressed) 376 contentSize = int64(len(decompressed)) 377 378 return compressedHash, compressedSize, contentHash, contentSize, nil 379} 380 381// VerifyHash verifies the hash of a bundle file 382func (op *Operations) VerifyHash(path string, expectedHash string) (bool, string, error) { 383 data, err := os.ReadFile(path) 384 if err != nil { 385 return false, "", fmt.Errorf("failed to read file: %w", err) 386 } 387 388 actualHash := op.Hash(data) 389 return actualHash == expectedHash, actualHash, nil 390} 391 392// ======================================== 393// UTILITY FUNCTIONS 394// ======================================== 395 396// FileExists checks if a file exists 397func (op *Operations) FileExists(path string) bool { 398 _, err := os.Stat(path) 399 return err == nil 400} 401 402// GetFileSize returns the size of a file 403func (op *Operations) GetFileSize(path string) (int64, error) { 404 info, err := os.Stat(path) 405 if err != nil { 406 return 0, err 407 } 408 return info.Size(), nil 409} 410 411// ExtractUniqueDIDs extracts unique DIDs from operations 412func (op *Operations) ExtractUniqueDIDs(operations []plcclient.PLCOperation) []string { 413 didSet := make(map[string]bool) 414 for _, operation := range operations { 415 didSet[operation.DID] = true 416 } 417 418 dids := make([]string, 0, len(didSet)) 419 for did := range didSet { 420 dids = append(dids, did) 421 } 422 423 return dids 424} 425 426// GetBoundaryCIDs returns CIDs that share the same timestamp as the last operation 427func (op *Operations) GetBoundaryCIDs(operations []plcclient.PLCOperation) (time.Time, map[string]bool) { 428 if len(operations) == 0 { 429 return time.Time{}, nil 430 } 431 432 lastOp := operations[len(operations)-1] 433 boundaryTime := lastOp.CreatedAt 434 cidSet := make(map[string]bool) 435 436 // Walk backwards from the end 437 for i := len(operations) - 1; i >= 0; i-- { 438 op := 
operations[i] 439 if op.CreatedAt.Equal(boundaryTime) { 440 cidSet[op.CID] = true 441 } else { 442 break 443 } 444 } 445 446 return boundaryTime, cidSet 447} 448 449// StripBoundaryDuplicates removes operations that are in prevBoundaryCIDs 450func (op *Operations) StripBoundaryDuplicates(operations []plcclient.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plcclient.PLCOperation { 451 if len(operations) == 0 || len(prevBoundaryCIDs) == 0 { 452 return operations 453 } 454 455 boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp) 456 if err != nil { 457 return operations 458 } 459 460 startIdx := 0 461 for startIdx < len(operations) { 462 op := operations[startIdx] 463 464 if op.CreatedAt.After(boundaryTime) { 465 break 466 } 467 468 if op.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[op.CID] { 469 startIdx++ 470 continue 471 } 472 473 break 474 } 475 476 return operations[startIdx:] 477} 478 479// Pool for scanner buffers 480var scannerBufPool = sync.Pool{ 481 New: func() interface{} { 482 buf := make([]byte, 64*1024) 483 return &buf 484 }, 485} 486 487// ======================================== 488// POSITION-BASED LOADING (with frame index) 489// ======================================== 490 491// LoadOperationAtPosition loads a single operation from a bundle 492func (op *Operations) LoadOperationAtPosition(path string, position int) (*plcclient.PLCOperation, error) { 493 if position < 0 { 494 return nil, fmt.Errorf("invalid position: %d", position) 495 } 496 497 // Try multiple sources for frame index (no goto!) 
498 frameOffsets, err := op.loadFrameIndex(path) 499 if err != nil { 500 // No frame index available - use legacy full scan 501 if op.logger != nil { 502 op.logger.Printf("No frame index found for %s, using legacy scan", filepath.Base(path)) 503 } 504 return op.loadOperationAtPositionLegacy(path, position) 505 } 506 507 // We have frame index - use it for fast random access 508 return op.loadOperationFromFrame(path, position, frameOffsets) 509} 510 511// loadFrameIndex loads frame offsets and converts to absolute positions 512func (op *Operations) loadFrameIndex(path string) ([]int64, error) { 513 // Try embedded metadata first 514 meta, err := op.ExtractMetadataFromFile(path) 515 if err == nil && len(meta.FrameOffsets) > 0 { 516 // Convert relative offsets to absolute 517 // First, get metadata frame size by re-reading 518 file, _ := os.Open(path) 519 if file != nil { 520 defer file.Close() 521 522 // Read metadata frame to find where data starts 523 magic, data, readErr := op.ReadSkippableFrame(file) 524 if readErr == nil && magic == SkippableMagicMetadata { 525 // Metadata frame size = 4 (magic) + 4 (size) + len(data) 526 metadataFrameSize := int64(8 + len(data)) 527 528 // Convert relative to absolute 529 absoluteOffsets := make([]int64, len(meta.FrameOffsets)) 530 for i, relOffset := range meta.FrameOffsets { 531 absoluteOffsets[i] = metadataFrameSize + relOffset 532 } 533 534 return absoluteOffsets, nil 535 } 536 } 537 } 538 539 // Fallback to external .idx file 540 indexPath := path + ".idx" 541 indexData, err := os.ReadFile(indexPath) 542 if err != nil { 543 return nil, fmt.Errorf("no frame index available: %w", err) 544 } 545 546 var offsets []int64 547 if err := json.Unmarshal(indexData, &offsets); err != nil { 548 return nil, fmt.Errorf("invalid frame index: %w", err) 549 } 550 551 return offsets, nil 552} 553 554// loadOperationFromFrame loads operation using frame index 555func (op *Operations) loadOperationFromFrame(path string, position int, 
frameOffsets []int64) (*plcclient.PLCOperation, error) { 556 frameIndex := position / FrameSize 557 lineInFrame := position % FrameSize 558 559 if frameIndex >= len(frameOffsets)-1 { 560 return nil, fmt.Errorf("position %d out of bounds (frame %d, total frames %d)", 561 position, frameIndex, len(frameOffsets)-1) 562 } 563 564 startOffset := frameOffsets[frameIndex] 565 endOffset := frameOffsets[frameIndex+1] 566 frameLength := endOffset - startOffset 567 568 if frameLength <= 0 || frameLength > 10*1024*1024 { 569 return nil, fmt.Errorf("invalid frame length: %d (offsets: %d-%d)", 570 frameLength, startOffset, endOffset) 571 } 572 573 bundleFile, err := os.Open(path) 574 if err != nil { 575 return nil, fmt.Errorf("failed to open bundle: %w", err) 576 } 577 defer bundleFile.Close() 578 579 compressedFrame := make([]byte, frameLength) 580 _, err = bundleFile.ReadAt(compressedFrame, startOffset) 581 if err != nil { 582 return nil, fmt.Errorf("failed to read frame %d (offset %d, length %d): %w", 583 frameIndex, startOffset, frameLength, err) 584 } 585 586 // Decompress 587 decompressed, err := DecompressFrame(compressedFrame) 588 if err != nil { 589 if op.logger != nil { 590 preview := compressedFrame 591 if len(preview) > 16 { 592 preview = preview[:16] 593 } 594 if op.verbose { 595 op.logger.Printf("DEBUG: Failed frame data (first 16 bytes): % x", preview) 596 } 597 } 598 return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err) 599 } 600 601 // Scan to find the line 602 scanner := bufio.NewScanner(bytes.NewReader(decompressed)) 603 lineNum := 0 604 605 for scanner.Scan() { 606 if lineNum == lineInFrame { 607 line := scanner.Bytes() 608 var operation plcclient.PLCOperation 609 if err := json.UnmarshalNoEscape(line, &operation); err != nil { 610 return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err) 611 } 612 operation.RawJSON = make([]byte, len(line)) 613 copy(operation.RawJSON, line) 614 return &operation, nil 615 } 
616 lineNum++ 617 } 618 619 if err := scanner.Err(); err != nil { 620 return nil, fmt.Errorf("scanner error on frame %d: %w", frameIndex, err) 621 } 622 623 return nil, fmt.Errorf("position %d not found in frame %d", position, frameIndex) 624} 625 626// loadOperationAtPositionLegacy loads operation from old single-frame bundles 627func (op *Operations) loadOperationAtPositionLegacy(path string, position int) (*plcclient.PLCOperation, error) { 628 file, err := os.Open(path) 629 if err != nil { 630 return nil, fmt.Errorf("failed to open file: %w", err) 631 } 632 defer file.Close() 633 634 // Use abstracted streaming reader 635 reader, err := NewStreamingReader(file) 636 if err != nil { 637 return nil, fmt.Errorf("failed to create reader: %w", err) 638 } 639 defer reader.Release() 640 641 scanner := bufio.NewScanner(reader) 642 buf := make([]byte, 512*1024) 643 scanner.Buffer(buf, 1024*1024) 644 645 lineNum := 0 646 for scanner.Scan() { 647 if lineNum == position { 648 line := scanner.Bytes() 649 var operation plcclient.PLCOperation 650 if err := json.UnmarshalNoEscape(line, &operation); err != nil { 651 return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err) 652 } 653 operation.RawJSON = make([]byte, len(line)) 654 copy(operation.RawJSON, line) 655 return &operation, nil 656 } 657 lineNum++ 658 } 659 660 if err := scanner.Err(); err != nil { 661 return nil, fmt.Errorf("scanner error: %w", err) 662 } 663 664 return nil, fmt.Errorf("position %d not found in bundle", position) 665} 666 667// LoadOperationsAtPositions loads multiple operations from a bundle in one pass 668func (op *Operations) LoadOperationsAtPositions(path string, positions []int) (map[int]*plcclient.PLCOperation, error) { 669 if len(positions) == 0 { 670 return make(map[int]*plcclient.PLCOperation), nil 671 } 672 673 // Create position set for fast lookup 674 posSet := make(map[int]bool) 675 maxPos := 0 676 for _, pos := range positions { 677 if pos < 0 { 678 continue 679 
} 680 posSet[pos] = true 681 if pos > maxPos { 682 maxPos = pos 683 } 684 } 685 686 file, err := os.Open(path) 687 if err != nil { 688 return nil, fmt.Errorf("failed to open file: %w", err) 689 } 690 defer file.Close() 691 692 // Use abstracted streaming reader 693 reader, err := NewStreamingReader(file) 694 if err != nil { 695 return nil, fmt.Errorf("failed to create reader: %w", err) 696 } 697 defer reader.Release() 698 699 bufPtr := scannerBufPool.Get().(*[]byte) 700 defer scannerBufPool.Put(bufPtr) 701 702 scanner := bufio.NewScanner(reader) 703 scanner.Buffer(*bufPtr, 512*1024) 704 705 results := make(map[int]*plcclient.PLCOperation) 706 lineNum := 0 707 708 for scanner.Scan() { 709 // Early exit if we found everything 710 if len(results) == len(posSet) { 711 break 712 } 713 714 // Only parse if this position is requested 715 if posSet[lineNum] { 716 line := scanner.Bytes() 717 var operation plcclient.PLCOperation 718 if err := json.UnmarshalNoEscape(line, &operation); err != nil { 719 return nil, fmt.Errorf("failed to parse operation at position %d: %w", lineNum, err) 720 } 721 722 operation.RawJSON = make([]byte, len(line)) 723 copy(operation.RawJSON, line) 724 results[lineNum] = &operation 725 } 726 727 lineNum++ 728 729 // Early exit if we passed the max position 730 if lineNum > maxPos { 731 break 732 } 733 } 734 735 if err := scanner.Err(); err != nil { 736 return nil, fmt.Errorf("scanner error: %w", err) 737 } 738 739 return results, nil 740} 741 742// CalculateMetadataWithoutLoading calculates metadata by streaming (no full load) 743func (op *Operations) CalculateMetadataWithoutLoading(path string) (opCount int, didCount int, startTime, endTime time.Time, err error) { 744 file, err := os.Open(path) 745 if err != nil { 746 return 0, 0, time.Time{}, time.Time{}, err 747 } 748 defer file.Close() 749 750 // Use abstracted reader 751 reader, err := NewStreamingReader(file) 752 if err != nil { 753 return 0, 0, time.Time{}, time.Time{}, fmt.Errorf("failed to 
create reader: %w", err) 754 } 755 defer reader.Release() 756 757 scanner := bufio.NewScanner(reader) 758 buf := make([]byte, 64*1024) 759 scanner.Buffer(buf, 1024*1024) 760 761 didSet := make(map[string]bool) 762 lineNum := 0 763 764 for scanner.Scan() { 765 line := scanner.Bytes() 766 if len(line) == 0 { 767 continue 768 } 769 770 // Only parse minimal fields needed for metadata 771 var op struct { 772 DID string `json:"did"` 773 CreatedAt time.Time `json:"createdAt"` 774 } 775 776 if err := json.Unmarshal(line, &op); err != nil { 777 continue 778 } 779 780 if lineNum == 0 { 781 startTime = op.CreatedAt 782 } 783 endTime = op.CreatedAt 784 785 didSet[op.DID] = true 786 lineNum++ 787 } 788 789 return lineNum, len(didSet), startTime, endTime, scanner.Err() 790} 791 792// ExtractBundleMetadata extracts metadata from bundle file without decompressing 793func (op *Operations) ExtractBundleMetadata(path string) (*BundleMetadata, error) { 794 meta, err := op.ExtractMetadataFromFile(path) 795 if err != nil { 796 return nil, fmt.Errorf("failed to extract metadata: %w", err) 797 } 798 return meta, nil 799} 800 801// LoadBundleWithMetadata loads bundle and returns both data and embedded metadata 802func (op *Operations) LoadBundleWithMetadata(path string) ([]plcclient.PLCOperation, *BundleMetadata, error) { 803 file, err := os.Open(path) 804 if err != nil { 805 return nil, nil, fmt.Errorf("failed to open file: %w", err) 806 } 807 defer file.Close() 808 809 // 1. Try to read metadata frame first 810 meta, err := op.ReadMetadataFrame(file) 811 if err != nil { 812 // No metadata frame - fall back to regular load 813 file.Seek(0, io.SeekStart) // Reset to beginning 814 ops, err := op.loadFromReader(file) 815 return ops, nil, err 816 } 817 818 // 2. 
Read compressed data (file position is now after metadata frame) 819 ops, err := op.loadFromReader(file) 820 if err != nil { 821 return nil, nil, err 822 } 823 824 return ops, meta, nil 825} 826 827// loadFromReader loads operations from a reader (internal helper) 828func (op *Operations) loadFromReader(r io.Reader) ([]plcclient.PLCOperation, error) { 829 reader, err := NewStreamingReader(r) 830 if err != nil { 831 return nil, fmt.Errorf("failed to create reader: %w", err) 832 } 833 defer reader.Release() 834 835 decompressed, err := io.ReadAll(reader) 836 if err != nil { 837 return nil, fmt.Errorf("failed to decompress: %w", err) 838 } 839 840 return op.ParseJSONL(decompressed) 841}