[DEPRECATED] Go implementation of plcbundle
at main 178 lines 5.2 kB view raw
1package bundle 2 3import ( 4 "bufio" 5 "encoding/json" 6 "fmt" 7 "os" 8 "time" 9 10 "tangled.org/atscan.net/plcbundle-go/internal/bundleindex" 11 "tangled.org/atscan.net/plcbundle-go/internal/plcclient" 12 "tangled.org/atscan.net/plcbundle-go/internal/storage" 13) 14 15// CalculateBundleMetadata calculates complete metadata for a bundle 16func (m *Manager) CalculateBundleMetadata(bundleNumber int, path string, operations []plcclient.PLCOperation, parent string, cursor string) (*bundleindex.BundleMetadata, error) { 17 if len(operations) == 0 { 18 return nil, fmt.Errorf("bundle is empty") 19 } 20 21 // Get file info 22 info, err := os.Stat(path) 23 if err != nil { 24 return nil, fmt.Errorf("failed to stat file: %w", err) 25 } 26 27 // Extract unique DIDs 28 dids := m.operations.ExtractUniqueDIDs(operations) 29 30 // Serialize to JSONL and calculate content hash 31 jsonlData := m.operations.SerializeJSONL(operations) 32 contentSize := int64(len(jsonlData)) 33 contentHash := m.operations.Hash(jsonlData) 34 35 // Read compressed file and calculate compressed hash 36 compressedData, err := os.ReadFile(path) 37 if err != nil { 38 return nil, fmt.Errorf("failed to read compressed file: %w", err) 39 } 40 compressedHash := m.operations.Hash(compressedData) 41 compressedSize := info.Size() 42 43 // Calculate chain hash 44 chainHash := m.operations.CalculateChainHash(parent, contentHash) 45 46 return &bundleindex.BundleMetadata{ 47 BundleNumber: bundleNumber, 48 StartTime: operations[0].CreatedAt, 49 EndTime: operations[len(operations)-1].CreatedAt, 50 OperationCount: len(operations), 51 DIDCount: len(dids), 52 Hash: chainHash, 53 ContentHash: contentHash, 54 Parent: parent, 55 CompressedHash: compressedHash, 56 CompressedSize: compressedSize, 57 UncompressedSize: contentSize, 58 Cursor: cursor, 59 CreatedAt: time.Now().UTC(), 60 }, nil 61} 62 63// CalculateBundleMetadataFast calculates metadata quickly without chain hash 64func (m *Manager) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plcclient.PLCOperation, cursor string) (*bundleindex.BundleMetadata, error) { 65 if len(operations) == 0 { 66 return nil, fmt.Errorf("bundle is empty") 67 } 68 69 // Calculate hashes efficiently 70 compressedHash, compressedSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path) 71 if err != nil { 72 return nil, err 73 } 74 75 // Extract unique DIDs 76 dids := m.operations.ExtractUniqueDIDs(operations) 77 78 return &bundleindex.BundleMetadata{ 79 BundleNumber: bundleNumber, 80 StartTime: operations[0].CreatedAt, 81 EndTime: operations[len(operations)-1].CreatedAt, 82 OperationCount: len(operations), 83 DIDCount: len(dids), 84 Hash: "", 85 ContentHash: contentHash, 86 Parent: "", 87 CompressedHash: compressedHash, 88 CompressedSize: compressedSize, 89 UncompressedSize: contentSize, 90 Cursor: cursor, 91 CreatedAt: time.Now().UTC(), 92 }, nil 93} 94 95// CalculateMetadataStreaming calculates metadata by streaming (NO full load) 96func (m *Manager) CalculateMetadataStreaming(bundleNumber int, path string) (*bundleindex.BundleMetadata, error) { 97 // STEP 1: Stream to get times + counts (minimal memory) 98 opCount, didCount, startTime, endTime, err := m.streamBundleInfo(path) 99 if err != nil { 100 return nil, err 101 } 102 103 // STEP 2: Calculate hashes from file 104 compressedHash, compressedSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path) 105 if err != nil { 106 return nil, err 107 } 108 109 return &bundleindex.BundleMetadata{ 110 BundleNumber: bundleNumber, 111 StartTime: startTime, 112 EndTime: endTime, 113 OperationCount: opCount, 114 DIDCount: didCount, 115 Hash: "", // Calculated later in sequential phase 116 ContentHash: contentHash, 117 Parent: "", // Calculated later 118 CompressedHash: compressedHash, 119 CompressedSize: compressedSize, 120 UncompressedSize: contentSize, 121 Cursor: "", 122 CreatedAt: time.Now().UTC(), 123 }, nil 124} 125 126// streamBundleInfo extracts metadata by streaming (minimal memory) 127func (m *Manager) streamBundleInfo(path string) (opCount, didCount int, startTime, endTime time.Time, err error) { 128 file, err := os.Open(path) 129 if err != nil { 130 return 0, 0, time.Time{}, time.Time{}, err 131 } 132 defer file.Close() 133 134 // Use abstracted reader from storage package 135 reader, err := storage.NewStreamingReader(file) 136 if err != nil { 137 return 0, 0, time.Time{}, time.Time{}, fmt.Errorf("failed to create reader: %w", err) 138 } 139 defer reader.Release() 140 141 scanner := bufio.NewScanner(reader) 142 buf := make([]byte, 64*1024) 143 scanner.Buffer(buf, 1024*1024) 144 145 didSet := make(map[string]bool) 146 lineNum := 0 147 148 for scanner.Scan() { 149 line := scanner.Bytes() 150 if len(line) == 0 { 151 continue 152 } 153 154 // Only parse minimal fields (DID + time) 155 var op struct { 156 DID string `json:"did"` 157 CreatedAt time.Time `json:"createdAt"` 158 } 159 160 if err := json.Unmarshal(line, &op); err != nil { 161 continue 162 } 163 164 if lineNum == 0 { 165 startTime = op.CreatedAt 166 } 167 endTime = op.CreatedAt 168 169 didSet[op.DID] = true 170 lineNum++ 171 } 172 173 if err := scanner.Err(); err != nil { 174 return 0, 0, time.Time{}, time.Time{}, err 175 } 176 177 return lineNum, len(didSet), startTime, endTime, nil 178}