tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
update
tree.fail
4 months ago
3acc6c5a
d864b4af
+147
-16
4 changed files
expand all
collapse all
unified
split
bundle
metadata.go
scanner.go
internal
didindex
builder.go
storage
storage.go
+84
bundle/metadata.go
···
1
1
package bundle
2
2
3
3
import (
4
4
+
"bufio"
5
5
+
"encoding/json"
4
6
"fmt"
5
7
"os"
6
8
"time"
7
9
10
10
+
gozstd "github.com/DataDog/zstd"
8
11
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
9
12
"tangled.org/atscan.net/plcbundle/internal/plcclient"
10
13
)
···
88
91
CreatedAt: time.Now().UTC(),
89
92
}, nil
90
93
}
94
94
+
95
95
+
// CalculateMetadataStreaming calculates metadata by streaming (NO full load)
96
96
+
func (m *Manager) CalculateMetadataStreaming(bundleNumber int, path string) (*bundleindex.BundleMetadata, error) {
97
97
+
// STEP 1: Stream to get times + counts (minimal memory)
98
98
+
opCount, didCount, startTime, endTime, err := m.streamBundleInfo(path)
99
99
+
if err != nil {
100
100
+
return nil, err
101
101
+
}
102
102
+
103
103
+
// STEP 2: Calculate hashes from file
104
104
+
compressedHash, compressedSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path)
105
105
+
if err != nil {
106
106
+
return nil, err
107
107
+
}
108
108
+
109
109
+
return &bundleindex.BundleMetadata{
110
110
+
BundleNumber: bundleNumber,
111
111
+
StartTime: startTime,
112
112
+
EndTime: endTime,
113
113
+
OperationCount: opCount,
114
114
+
DIDCount: didCount,
115
115
+
Hash: "", // Calculated later in sequential phase
116
116
+
ContentHash: contentHash,
117
117
+
Parent: "", // Calculated later
118
118
+
CompressedHash: compressedHash,
119
119
+
CompressedSize: compressedSize,
120
120
+
UncompressedSize: contentSize,
121
121
+
Cursor: "",
122
122
+
CreatedAt: time.Now().UTC(),
123
123
+
}, nil
124
124
+
}
125
125
+
126
126
+
// streamBundleInfo extracts metadata by streaming (minimal memory)
127
127
+
func (m *Manager) streamBundleInfo(path string) (opCount, didCount int, startTime, endTime time.Time, err error) {
128
128
+
file, err := os.Open(path)
129
129
+
if err != nil {
130
130
+
return 0, 0, time.Time{}, time.Time{}, err
131
131
+
}
132
132
+
defer file.Close()
133
133
+
134
134
+
reader := gozstd.NewReader(file)
135
135
+
defer reader.Close()
136
136
+
137
137
+
scanner := bufio.NewScanner(reader)
138
138
+
buf := make([]byte, 64*1024)
139
139
+
scanner.Buffer(buf, 1024*1024)
140
140
+
141
141
+
didSet := make(map[string]bool)
142
142
+
lineNum := 0
143
143
+
144
144
+
for scanner.Scan() {
145
145
+
line := scanner.Bytes()
146
146
+
if len(line) == 0 {
147
147
+
continue
148
148
+
}
149
149
+
150
150
+
// Only parse minimal fields (DID + time)
151
151
+
var op struct {
152
152
+
DID string `json:"did"`
153
153
+
CreatedAt time.Time `json:"createdAt"`
154
154
+
}
155
155
+
156
156
+
if err := json.Unmarshal(line, &op); err != nil {
157
157
+
continue
158
158
+
}
159
159
+
160
160
+
if lineNum == 0 {
161
161
+
startTime = op.CreatedAt
162
162
+
}
163
163
+
endTime = op.CreatedAt
164
164
+
165
165
+
didSet[op.DID] = true
166
166
+
lineNum++
167
167
+
}
168
168
+
169
169
+
if err := scanner.Err(); err != nil {
170
170
+
return 0, 0, time.Time{}, time.Time{}, err
171
171
+
}
172
172
+
173
173
+
return lineNum, len(didSet), startTime, endTime, nil
174
174
+
}
+2
-9
bundle/scanner.go
···
205
205
for num := range jobs {
206
206
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
207
207
208
208
-
// Load and process bundle
209
209
-
ops, err := m.operations.LoadBundle(path)
210
210
-
if err != nil {
211
211
-
results <- bundleResult{index: num, err: err}
212
212
-
continue
213
213
-
}
214
214
-
215
215
-
// Use the FAST method (cursor will be set later in sequential phase)
216
216
-
meta, err := m.CalculateBundleMetadataFast(num, path, ops, "")
208
208
+
// ✅ NEW: Stream metadata WITHOUT loading all operations
209
209
+
meta, err := m.CalculateMetadataStreaming(num, path)
217
210
if err != nil {
218
211
results <- bundleResult{index: num, err: err}
219
212
continue
+4
internal/didindex/builder.go
···
50
50
return fmt.Errorf("no bundles to index")
51
51
}
52
52
53
53
+
if err := os.MkdirAll(dim.shardDir, 0755); err != nil {
54
54
+
return fmt.Errorf("failed to create shard directory: %w", err)
55
55
+
}
56
56
+
53
57
// Create temporary shard files
54
58
tempShards := make([]*os.File, DID_SHARD_COUNT)
55
59
for i := 0; i < DID_SHARD_COUNT; i++ {
+57
-7
internal/storage/storage.go
···
89
89
90
90
// LoadBundle loads a compressed bundle
91
91
func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) {
92
92
-
// 1. Read the entire compressed file into memory.
93
93
-
compressed, err := os.ReadFile(path)
92
92
+
// ✅ FIX: Use streaming reader instead of one-shot Decompress
93
93
+
file, err := os.Open(path)
94
94
if err != nil {
95
95
-
return nil, fmt.Errorf("failed to read file: %w", err)
95
95
+
return nil, fmt.Errorf("failed to open file: %w", err)
96
96
}
97
97
+
defer file.Close()
97
98
98
98
-
// This is the key: The one-shot Decompress function is designed to correctly
99
99
-
// handle a byte slice containing one or more concatenated frames.
100
100
-
decompressed, err := gozstd.Decompress(nil, compressed)
99
99
+
// NewReader properly handles multi-frame concatenated zstd
100
100
+
reader := gozstd.NewReader(file)
101
101
+
defer reader.Close()
102
102
+
103
103
+
// Read ALL decompressed data
104
104
+
decompressed, err := io.ReadAll(reader)
101
105
if err != nil {
102
106
return nil, fmt.Errorf("failed to decompress: %w", err)
103
107
}
104
108
105
105
-
// 3. Parse the fully decompressed JSONL data.
109
109
+
// Parse JSONL
106
110
return op.ParseJSONL(decompressed)
107
111
}
108
112
···
571
575
572
576
return results, nil
573
577
}
578
578
+
579
579
+
// CalculateMetadataWithoutLoading calculates metadata by streaming (no full load)
580
580
+
func (op *Operations) CalculateMetadataWithoutLoading(path string) (opCount int, didCount int, startTime, endTime time.Time, err error) {
581
581
+
file, err := os.Open(path)
582
582
+
if err != nil {
583
583
+
return 0, 0, time.Time{}, time.Time{}, err
584
584
+
}
585
585
+
defer file.Close()
586
586
+
587
587
+
reader := gozstd.NewReader(file)
588
588
+
defer reader.Close()
589
589
+
590
590
+
scanner := bufio.NewScanner(reader)
591
591
+
buf := make([]byte, 64*1024)
592
592
+
scanner.Buffer(buf, 1024*1024)
593
593
+
594
594
+
didSet := make(map[string]bool)
595
595
+
lineNum := 0
596
596
+
597
597
+
for scanner.Scan() {
598
598
+
line := scanner.Bytes()
599
599
+
if len(line) == 0 {
600
600
+
continue
601
601
+
}
602
602
+
603
603
+
// Only parse minimal fields needed for metadata
604
604
+
var op struct {
605
605
+
DID string `json:"did"`
606
606
+
CreatedAt time.Time `json:"createdAt"`
607
607
+
}
608
608
+
609
609
+
if err := json.Unmarshal(line, &op); err != nil {
610
610
+
continue
611
611
+
}
612
612
+
613
613
+
if lineNum == 0 {
614
614
+
startTime = op.CreatedAt
615
615
+
}
616
616
+
endTime = op.CreatedAt
617
617
+
618
618
+
didSet[op.DID] = true
619
619
+
lineNum++
620
620
+
}
621
621
+
622
622
+
return lineNum, len(didSet), startTime, endTime, scanner.Err()
623
623
+
}