tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
update structure (4)
tree.fail
4 months ago
aa875f24
dfd8f54f
+971
-811
16 changed files
expand all
collapse all
unified
split
cmd
plcbundle
compare.go
info.go
main.go
internal
bundle
bundle_test.go
bundler.go
manager.go
metadata.go
scanner.go
types.go
bundleindex
bundleindex.go
index.go
metadata.go
types.go
sync
bundler.go
cloner.go
fetcher.go
+8
-7
cmd/plcbundle/compare.go
···
12
12
13
13
"github.com/goccy/go-json"
14
14
"tangled.org/atscan.net/plcbundle/internal/bundle"
15
15
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
15
16
)
16
17
17
18
// IndexComparison holds comparison results
···
45
46
}
46
47
47
48
// loadTargetIndex loads an index from a file or URL
48
48
-
func loadTargetIndex(target string) (*bundle.Index, error) {
49
49
+
func loadTargetIndex(target string) (*bundleindex.Index, error) {
49
50
if strings.HasPrefix(target, "http://") || strings.HasPrefix(target, "https://") {
50
51
// Load from URL
51
52
return loadIndexFromURL(target)
52
53
}
53
54
54
55
// Load from file
55
55
-
return bundle.LoadIndex(target)
56
56
+
return bundleindex.LoadIndex(target)
56
57
}
57
58
58
59
// loadIndexFromURL downloads and parses an index from a URL
59
59
-
func loadIndexFromURL(url string) (*bundle.Index, error) {
60
60
+
func loadIndexFromURL(url string) (*bundleindex.Index, error) {
60
61
// Smart URL handling - if it doesn't end with .json, append /index.json
61
62
if !strings.HasSuffix(url, ".json") {
62
63
url = strings.TrimSuffix(url, "/") + "/index.json"
···
81
82
return nil, fmt.Errorf("failed to read response: %w", err)
82
83
}
83
84
84
84
-
var idx bundle.Index
85
85
+
var idx bundleindex.Index
85
86
if err := json.Unmarshal(data, &idx); err != nil {
86
87
return nil, fmt.Errorf("failed to parse index: %w", err)
87
88
}
···
90
91
}
91
92
92
93
// compareIndexes compares two indexes
93
93
-
func compareIndexes(local, target *bundle.Index) *IndexComparison {
94
94
+
func compareIndexes(local, target *bundleindex.Index) *IndexComparison {
94
95
localBundles := local.GetBundles()
95
96
targetBundles := target.GetBundles()
96
97
97
98
// Create maps for quick lookup
98
98
-
localMap := make(map[int]*bundle.BundleMetadata)
99
99
-
targetMap := make(map[int]*bundle.BundleMetadata)
99
99
+
localMap := make(map[int]*bundleindex.BundleMetadata)
100
100
+
targetMap := make(map[int]*bundleindex.BundleMetadata)
100
101
101
102
for _, b := range localBundles {
102
103
localMap[b.BundleNumber] = b
+2
-1
cmd/plcbundle/info.go
···
10
10
"time"
11
11
12
12
"tangled.org/atscan.net/plcbundle/internal/bundle"
13
13
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
13
14
"tangled.org/atscan.net/plcbundle/internal/types"
14
15
)
15
16
···
252
253
}
253
254
}
254
255
255
255
-
func visualizeTimeline(index *bundle.Index, verbose bool) {
256
256
+
func visualizeTimeline(index *bundleindex.Index, verbose bool) {
256
257
bundles := index.GetBundles()
257
258
if len(bundles) == 0 {
258
259
return
+11
-9
cmd/plcbundle/main.go
···
19
19
"github.com/goccy/go-json"
20
20
21
21
"tangled.org/atscan.net/plcbundle/internal/bundle"
22
22
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
22
23
"tangled.org/atscan.net/plcbundle/internal/didindex"
24
24
+
internalsync "tangled.org/atscan.net/plcbundle/internal/sync"
23
25
"tangled.org/atscan.net/plcbundle/internal/types"
24
26
"tangled.org/atscan.net/plcbundle/plcclient"
25
27
)
···
276
278
fs.PrintDefaults()
277
279
fmt.Fprintf(os.Stderr, "\nExample:\n")
278
280
fmt.Fprintf(os.Stderr, " plcbundle clone https://plc.example.com\n")
279
279
-
fmt.Fprintf(os.Stderr, " plcbundle clone https://plc.example.com --workers 8\n")
280
281
os.Exit(1)
281
282
}
282
283
···
302
303
sigChan := make(chan os.Signal, 1)
303
304
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
304
305
305
305
-
// Set up progress bar with interrupt tracking
306
306
+
// Set up progress bar
306
307
var progress *ProgressBar
307
308
var progressMu sync.Mutex
308
308
-
progressActive := true // Track if progress should be updated
309
309
+
progressActive := true
309
310
310
311
go func() {
311
312
<-sigChan
312
312
-
// Stop progress updates immediately
313
313
progressMu.Lock()
314
314
progressActive = false
315
315
if progress != nil {
316
316
-
fmt.Println() // Move to new line after progress bar
316
316
+
fmt.Println()
317
317
}
318
318
progressMu.Unlock()
319
319
···
322
322
}()
323
323
324
324
// Clone with library
325
325
-
result, err := mgr.CloneFromRemote(ctx, bundle.CloneOptions{
325
325
+
result, err := mgr.CloneFromRemote(ctx, internalsync.CloneOptions{
326
326
RemoteURL: remoteURL,
327
327
Workers: *workers,
328
328
SkipExisting: *skipExisting,
···
348
348
// Ensure progress is stopped
349
349
progressMu.Lock()
350
350
progressActive = false
351
351
+
if progress != nil {
352
352
+
progress.Finish()
353
353
+
}
351
354
progressMu.Unlock()
352
355
353
356
if err != nil {
···
391
394
}
392
395
fmt.Printf("%06d", num)
393
396
}
394
394
-
fmt.Printf("\n")
395
395
-
fmt.Printf("Re-run the clone command to retry failed bundles.\n")
397
397
+
fmt.Printf("\nRe-run the clone command to retry failed bundles.\n")
396
398
os.Exit(1)
397
399
}
398
400
···
513
515
fmt.Printf(" Throughput (uncompressed): %.1f MB/s\n", uncompressedThroughput)
514
516
}
515
517
516
516
-
fmt.Printf(" Index file: %s\n", filepath.Join(dir, bundle.INDEX_FILE))
518
518
+
fmt.Printf(" Index file: %s\n", filepath.Join(dir, bundleindex.INDEX_FILE))
517
519
518
520
if len(result.MissingGaps) > 0 {
519
521
fmt.Printf(" ⚠️ Missing gaps: %d bundles\n", len(result.MissingGaps))
+11
-10
internal/bundle/bundle_test.go
···
6
6
"time"
7
7
8
8
"tangled.org/atscan.net/plcbundle/internal/bundle"
9
9
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
9
10
"tangled.org/atscan.net/plcbundle/internal/mempool"
10
11
"tangled.org/atscan.net/plcbundle/internal/storage"
11
12
"tangled.org/atscan.net/plcbundle/internal/types"
···
15
16
// TestIndex tests index operations
16
17
func TestIndex(t *testing.T) {
17
18
t.Run("CreateNewIndex", func(t *testing.T) {
18
18
-
idx := bundle.NewIndex("test-origin")
19
19
+
idx := bundleindex.NewIndex("test-origin")
19
20
if idx == nil {
20
21
t.Fatal("NewIndex returned nil")
21
22
}
···
28
29
})
29
30
30
31
t.Run("AddBundle", func(t *testing.T) {
31
31
-
idx := bundle.NewIndex("test-origin")
32
32
-
meta := &bundle.BundleMetadata{
32
32
+
idx := bundleindex.NewIndex("test-origin")
33
33
+
meta := &bundleindex.BundleMetadata{
33
34
BundleNumber: 1,
34
35
StartTime: time.Now(),
35
36
EndTime: time.Now().Add(time.Hour),
···
59
60
indexPath := filepath.Join(tmpDir, "test_index.json")
60
61
61
62
// Create and save
62
62
-
idx := bundle.NewIndex("test-origin")
63
63
-
idx.AddBundle(&bundle.BundleMetadata{
63
63
+
idx := bundleindex.NewIndex("test-origin")
64
64
+
idx.AddBundle(&bundleindex.BundleMetadata{
64
65
BundleNumber: 1,
65
66
StartTime: time.Now(),
66
67
EndTime: time.Now().Add(time.Hour),
···
73
74
}
74
75
75
76
// Load
76
76
-
loaded, err := bundle.LoadIndex(indexPath)
77
77
+
loaded, err := bundleindex.LoadIndex(indexPath)
77
78
if err != nil {
78
79
t.Fatalf("LoadIndex failed: %v", err)
79
80
}
···
84
85
})
85
86
86
87
t.Run("GetBundleRange", func(t *testing.T) {
87
87
-
idx := bundle.NewIndex("test-origin")
88
88
+
idx := bundleindex.NewIndex("test-origin")
88
89
for i := 1; i <= 5; i++ {
89
89
-
idx.AddBundle(&bundle.BundleMetadata{
90
90
+
idx.AddBundle(&bundleindex.BundleMetadata{
90
91
BundleNumber: i,
91
92
StartTime: time.Now(),
92
93
EndTime: time.Now().Add(time.Hour),
···
104
105
})
105
106
106
107
t.Run("FindGaps", func(t *testing.T) {
107
107
-
idx := bundle.NewIndex("test-origin")
108
108
+
idx := bundleindex.NewIndex("test-origin")
108
109
// Add bundles 1, 2, 4, 5 (missing 3)
109
110
for _, num := range []int{1, 2, 4, 5} {
110
110
-
idx.AddBundle(&bundle.BundleMetadata{
111
111
+
idx.AddBundle(&bundleindex.BundleMetadata{
111
112
BundleNumber: num,
112
113
StartTime: time.Now(),
113
114
EndTime: time.Now().Add(time.Hour),
-39
internal/bundle/bundler.go
···
1
1
-
package bundle
2
2
-
3
3
-
import (
4
4
-
"time"
5
5
-
6
6
-
"tangled.org/atscan.net/plcbundle/internal/types"
7
7
-
"tangled.org/atscan.net/plcbundle/plcclient"
8
8
-
)
9
9
-
10
10
-
// CreateBundle creates a complete bundle structure from operations
11
11
-
func (m *Manager) CreateBundle(bundleNumber int, operations []plcclient.PLCOperation, cursor string, parent string) *Bundle {
12
12
-
if len(operations) != types.BUNDLE_SIZE {
13
13
-
m.logger.Printf("Warning: bundle has %d operations, expected %d", len(operations), types.BUNDLE_SIZE)
14
14
-
}
15
15
-
16
16
-
dids := m.operations.ExtractUniqueDIDs(operations)
17
17
-
_, boundaryCIDs := m.operations.GetBoundaryCIDs(operations)
18
18
-
19
19
-
// Convert boundary CIDs map to slice
20
20
-
cidSlice := make([]string, 0, len(boundaryCIDs))
21
21
-
for cid := range boundaryCIDs {
22
22
-
cidSlice = append(cidSlice, cid)
23
23
-
}
24
24
-
25
25
-
bundle := &Bundle{
26
26
-
BundleNumber: bundleNumber,
27
27
-
StartTime: operations[0].CreatedAt,
28
28
-
EndTime: operations[len(operations)-1].CreatedAt,
29
29
-
Operations: operations,
30
30
-
DIDCount: len(dids),
31
31
-
Cursor: cursor,
32
32
-
Parent: parent,
33
33
-
BoundaryCIDs: cidSlice,
34
34
-
Compressed: true,
35
35
-
CreatedAt: time.Now().UTC(),
36
36
-
}
37
37
-
38
38
-
return bundle
39
39
-
}
+109
-96
internal/bundle/clone.go
internal/sync/cloner.go
···
1
1
-
package bundle
1
1
+
package sync
2
2
3
3
import (
4
4
"context"
···
12
12
"time"
13
13
14
14
"github.com/goccy/go-json"
15
15
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
16
16
+
"tangled.org/atscan.net/plcbundle/internal/storage"
17
17
+
"tangled.org/atscan.net/plcbundle/internal/types"
15
18
)
16
19
17
17
-
// CloneFromRemote clones bundles from a remote HTTP endpoint
18
18
-
func (m *Manager) CloneFromRemote(ctx context.Context, opts CloneOptions) (*CloneResult, error) {
20
20
+
// Cloner handles cloning bundles from remote endpoints
21
21
+
type Cloner struct {
22
22
+
operations *storage.Operations
23
23
+
bundleDir string
24
24
+
logger types.Logger
25
25
+
}
26
26
+
27
27
+
// NewCloner creates a new cloner
28
28
+
func NewCloner(operations *storage.Operations, bundleDir string, logger types.Logger) *Cloner {
29
29
+
return &Cloner{
30
30
+
operations: operations,
31
31
+
bundleDir: bundleDir,
32
32
+
logger: logger,
33
33
+
}
34
34
+
}
35
35
+
36
36
+
// CloneOptions configures cloning behavior
37
37
+
type CloneOptions struct {
38
38
+
RemoteURL string
39
39
+
Workers int
40
40
+
SkipExisting bool
41
41
+
ProgressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64)
42
42
+
SaveInterval time.Duration
43
43
+
Verbose bool
44
44
+
Logger types.Logger
45
45
+
}
46
46
+
47
47
+
// CloneResult contains cloning results
48
48
+
type CloneResult struct {
49
49
+
RemoteBundles int
50
50
+
Downloaded int
51
51
+
Failed int
52
52
+
Skipped int
53
53
+
TotalBytes int64
54
54
+
Duration time.Duration
55
55
+
Interrupted bool
56
56
+
FailedBundles []int
57
57
+
}
58
58
+
59
59
+
// Clone performs the cloning operation
60
60
+
func (c *Cloner) Clone(
61
61
+
ctx context.Context,
62
62
+
opts CloneOptions,
63
63
+
localIndex *bundleindex.Index,
64
64
+
updateIndex func([]int, map[int]*bundleindex.BundleMetadata, bool) error,
65
65
+
) (*CloneResult, error) {
66
66
+
19
67
if opts.Workers <= 0 {
20
68
opts.Workers = 4
21
69
}
···
23
71
opts.SaveInterval = 5 * time.Second
24
72
}
25
73
if opts.Logger == nil {
26
26
-
opts.Logger = m.logger
74
74
+
opts.Logger = c.logger
27
75
}
28
76
29
77
result := &CloneResult{}
···
31
79
32
80
// Step 1: Fetch remote index
33
81
opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL)
34
34
-
remoteIndex, err := m.loadRemoteIndex(opts.RemoteURL)
82
82
+
remoteIndex, err := c.loadRemoteIndex(opts.RemoteURL)
35
83
if err != nil {
36
84
return nil, fmt.Errorf("failed to load remote index: %w", err)
37
85
}
···
45
93
result.RemoteBundles = len(remoteBundles)
46
94
opts.Logger.Printf("Remote has %d bundles", len(remoteBundles))
47
95
48
48
-
// Step 2: Determine which bundles to download
49
49
-
localBundleMap := make(map[int]*BundleMetadata)
50
50
-
for _, meta := range m.index.GetBundles() {
96
96
+
// Step 2: Determine bundles to download
97
97
+
localBundleMap := make(map[int]*bundleindex.BundleMetadata)
98
98
+
for _, meta := range localIndex.GetBundles() {
51
99
localBundleMap[meta.BundleNumber] = meta
52
100
}
53
101
54
54
-
// Create map of remote metadata for easy lookup
55
55
-
remoteBundleMap := make(map[int]*BundleMetadata)
102
102
+
remoteBundleMap := make(map[int]*bundleindex.BundleMetadata)
56
103
for _, meta := range remoteBundles {
57
104
remoteBundleMap[meta.BundleNumber] = meta
58
105
}
59
106
60
107
var bundlesToDownload []int
61
108
var totalBytes int64
109
109
+
62
110
for _, meta := range remoteBundles {
63
111
if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil {
64
112
result.Skipped++
···
78
126
79
127
opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes)
80
128
81
81
-
// Step 3: Set up periodic index saving (using remote metadata)
129
129
+
// Step 3: Set up periodic index saving
82
130
saveCtx, saveCancel := context.WithCancel(ctx)
83
131
defer saveCancel()
84
132
···
96
144
case <-saveCtx.Done():
97
145
return
98
146
case <-ticker.C:
99
99
-
// Save index using remote metadata for downloaded bundles
100
147
downloadedMu.Lock()
101
148
bundles := make([]int, len(downloadedBundles))
102
149
copy(bundles, downloadedBundles)
···
105
152
if opts.Verbose {
106
153
opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles))
107
154
}
108
108
-
m.updateIndexFromRemote(bundles, remoteBundleMap, false) // silent during periodic save
155
155
+
updateIndex(bundles, remoteBundleMap, false)
109
156
}
110
157
}
111
158
}()
112
159
113
113
-
// Step 4: Download bundles concurrently
114
114
-
successList, failedList, bytes := m.downloadBundlesConcurrent(
160
160
+
// Step 4: Download bundles
161
161
+
successList, failedList, bytes := c.downloadBundlesConcurrent(
115
162
ctx,
116
163
opts.RemoteURL,
117
164
bundlesToDownload,
118
118
-
remoteBundleMap, // Pass the metadata map for hash verification
165
165
+
remoteBundleMap,
119
166
totalBytes,
120
167
opts.Workers,
121
168
opts.ProgressFunc,
···
134
181
saveCancel()
135
182
<-saveDone
136
183
137
137
-
// Step 5: Final index update using remote metadata
184
184
+
// Step 5: Final index update
138
185
opts.Logger.Printf("Updating local index...")
139
139
-
if err := m.updateIndexFromRemote(successList, remoteBundleMap, opts.Verbose); err != nil {
186
186
+
if err := updateIndex(successList, remoteBundleMap, opts.Verbose); err != nil {
140
187
return result, fmt.Errorf("failed to update index: %w", err)
141
188
}
142
189
···
144
191
return result, nil
145
192
}
146
193
147
147
-
// downloadBundlesConcurrent downloads bundles using a worker pool
148
148
-
func (m *Manager) downloadBundlesConcurrent(
194
194
+
// loadRemoteIndex loads index from remote URL
195
195
+
func (c *Cloner) loadRemoteIndex(baseURL string) (*bundleindex.Index, error) {
196
196
+
indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json"
197
197
+
198
198
+
client := &http.Client{Timeout: 30 * time.Second}
199
199
+
200
200
+
resp, err := client.Get(indexURL)
201
201
+
if err != nil {
202
202
+
return nil, fmt.Errorf("failed to download: %w", err)
203
203
+
}
204
204
+
defer resp.Body.Close()
205
205
+
206
206
+
if resp.StatusCode != http.StatusOK {
207
207
+
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
208
208
+
}
209
209
+
210
210
+
data, err := io.ReadAll(resp.Body)
211
211
+
if err != nil {
212
212
+
return nil, fmt.Errorf("failed to read response: %w", err)
213
213
+
}
214
214
+
215
215
+
var idx bundleindex.Index
216
216
+
if err := json.Unmarshal(data, &idx); err != nil {
217
217
+
return nil, fmt.Errorf("failed to parse index: %w", err)
218
218
+
}
219
219
+
220
220
+
return &idx, nil
221
221
+
}
222
222
+
223
223
+
// downloadBundlesConcurrent downloads bundles using worker pool
224
224
+
func (c *Cloner) downloadBundlesConcurrent(
149
225
ctx context.Context,
150
226
baseURL string,
151
227
bundleNumbers []int,
152
152
-
remoteBundleMap map[int]*BundleMetadata,
228
228
+
remoteBundleMap map[int]*bundleindex.BundleMetadata,
153
229
totalBytes int64,
154
230
workers int,
155
231
progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64),
···
184
260
185
261
// Start workers
186
262
var wg sync.WaitGroup
187
187
-
client := &http.Client{
188
188
-
Timeout: 120 * time.Second,
189
189
-
}
263
263
+
client := &http.Client{Timeout: 120 * time.Second}
190
264
191
265
for w := 0; w < workers; w++ {
192
266
wg.Add(1)
···
205
279
default:
206
280
}
207
281
208
208
-
// Download bundle with hash verification
209
209
-
bytes, err := m.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash)
282
282
+
// Download bundle
283
283
+
bytes, err := c.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash)
210
284
211
285
// Update progress
212
286
mu.Lock()
···
238
312
}()
239
313
}
240
314
241
241
-
// Send jobs with expected hashes
315
315
+
// Send jobs
242
316
for _, num := range bundleNumbers {
243
317
expectedHash := ""
244
318
if meta, exists := remoteBundleMap[num]; exists {
···
260
334
// Collect results
261
335
for res := range results {
262
336
if res.err != nil && res.err != context.Canceled {
263
263
-
m.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err)
337
337
+
c.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err)
264
338
} else if res.success && verbose {
265
265
-
m.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes)
339
339
+
c.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes)
266
340
}
267
341
}
268
342
···
275
349
return
276
350
}
277
351
278
278
-
// updateIndexFromRemote updates local index with metadata from remote index
279
279
-
func (m *Manager) updateIndexFromRemote(bundleNumbers []int, remoteMeta map[int]*BundleMetadata, verbose bool) error {
280
280
-
if len(bundleNumbers) == 0 {
281
281
-
return nil
282
282
-
}
283
283
-
284
284
-
// Add/update bundles in local index using remote metadata
285
285
-
// Hash verification was already done during download
286
286
-
for _, num := range bundleNumbers {
287
287
-
if meta, exists := remoteMeta[num]; exists {
288
288
-
// Verify the file exists locally
289
289
-
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
290
290
-
if !m.operations.FileExists(path) {
291
291
-
m.logger.Printf("Warning: bundle %06d not found locally, skipping", num)
292
292
-
continue
293
293
-
}
294
294
-
295
295
-
// Add to index (no need to re-verify hash - already verified during download)
296
296
-
m.index.AddBundle(meta)
297
297
-
298
298
-
if verbose {
299
299
-
m.logger.Printf("Added bundle %06d to index", num)
300
300
-
}
301
301
-
}
302
302
-
}
303
303
-
304
304
-
// Save index
305
305
-
return m.SaveIndex()
306
306
-
}
307
307
-
308
308
-
// loadRemoteIndex loads an index from a remote URL
309
309
-
func (m *Manager) loadRemoteIndex(baseURL string) (*Index, error) {
310
310
-
indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json"
311
311
-
312
312
-
client := &http.Client{
313
313
-
Timeout: 30 * time.Second,
314
314
-
}
315
315
-
316
316
-
resp, err := client.Get(indexURL)
317
317
-
if err != nil {
318
318
-
return nil, fmt.Errorf("failed to download: %w", err)
319
319
-
}
320
320
-
defer resp.Body.Close()
321
321
-
322
322
-
if resp.StatusCode != http.StatusOK {
323
323
-
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
324
324
-
}
325
325
-
326
326
-
data, err := io.ReadAll(resp.Body)
327
327
-
if err != nil {
328
328
-
return nil, fmt.Errorf("failed to read response: %w", err)
329
329
-
}
330
330
-
331
331
-
var idx Index
332
332
-
if err := json.Unmarshal(data, &idx); err != nil {
333
333
-
return nil, fmt.Errorf("failed to parse index: %w", err)
334
334
-
}
335
335
-
336
336
-
return &idx, nil
337
337
-
}
338
338
-
339
339
-
// downloadBundle downloads a single bundle file and verifies its hash
340
340
-
func (m *Manager) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) {
352
352
+
// downloadBundle downloads a single bundle and verifies hash
353
353
+
func (c *Cloner) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) {
341
354
url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum)
342
355
filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum)
343
343
-
filepath := filepath.Join(m.config.BundleDir, filename)
356
356
+
filepath := filepath.Join(c.bundleDir, filename)
344
357
345
358
// Create request
346
359
req, err := http.NewRequest("GET", url, nil)
···
360
373
return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
361
374
}
362
375
363
363
-
// Write to temp file (atomic write)
376
376
+
// Write to temp file
364
377
tempPath := filepath + ".tmp"
365
378
outFile, err := os.Create(tempPath)
366
379
if err != nil {
···
375
388
return 0, err
376
389
}
377
390
378
378
-
// Verify hash before committing
391
391
+
// Verify hash
379
392
if expectedHash != "" {
380
380
-
valid, actualHash, err := m.operations.VerifyHash(tempPath, expectedHash)
393
393
+
valid, actualHash, err := c.operations.VerifyHash(tempPath, expectedHash)
381
394
if err != nil {
382
395
os.Remove(tempPath)
383
396
return 0, fmt.Errorf("hash verification failed: %w", err)
+28
-28
internal/bundle/index.go
internal/bundleindex/index.go
···
1
1
-
package bundle
1
1
+
package bundleindex
2
2
3
3
import (
4
4
"fmt"
···
169
169
return len(idx.Bundles)
170
170
}
171
171
172
172
-
// FindGaps finds missing bundle numbers in the sequence
173
173
-
func (idx *Index) FindGaps() []int {
174
174
-
idx.mu.RLock()
175
175
-
defer idx.mu.RUnlock()
176
176
-
177
177
-
if len(idx.Bundles) == 0 {
178
178
-
return nil
179
179
-
}
180
180
-
181
181
-
var gaps []int
182
182
-
first := idx.Bundles[0].BundleNumber
183
183
-
last := idx.Bundles[len(idx.Bundles)-1].BundleNumber
184
184
-
185
185
-
bundleMap := make(map[int]bool)
186
186
-
for _, meta := range idx.Bundles {
187
187
-
bundleMap[meta.BundleNumber] = true
188
188
-
}
189
189
-
190
190
-
for i := first; i <= last; i++ {
191
191
-
if !bundleMap[i] {
192
192
-
gaps = append(gaps, i)
193
193
-
}
194
194
-
}
195
195
-
196
196
-
return gaps
197
197
-
}
198
198
-
199
172
// GetStats returns statistics about the index
200
173
func (idx *Index) GetStats() map[string]interface{} {
201
174
idx.mu.RLock()
···
280
253
idx.TotalSize = 0
281
254
idx.UpdatedAt = time.Now().UTC()
282
255
}
256
256
+
257
257
+
// FindGaps finds missing bundle numbers in the sequence
258
258
+
func (idx *Index) FindGaps() []int {
259
259
+
idx.mu.RLock()
260
260
+
defer idx.mu.RUnlock()
261
261
+
262
262
+
if len(idx.Bundles) == 0 {
263
263
+
return nil
264
264
+
}
265
265
+
266
266
+
var gaps []int
267
267
+
first := idx.Bundles[0].BundleNumber
268
268
+
last := idx.Bundles[len(idx.Bundles)-1].BundleNumber
269
269
+
270
270
+
bundleMap := make(map[int]bool)
271
271
+
for _, meta := range idx.Bundles {
272
272
+
bundleMap[meta.BundleNumber] = true
273
273
+
}
274
274
+
275
275
+
for i := first; i <= last; i++ {
276
276
+
if !bundleMap[i] {
277
277
+
gaps = append(gaps, i)
278
278
+
}
279
279
+
}
280
280
+
281
281
+
return gaps
282
282
+
}
+204
-573
internal/bundle/manager.go
···
9
9
"path/filepath"
10
10
"runtime"
11
11
"sort"
12
12
-
"strconv"
13
13
-
"strings"
14
12
"sync"
15
13
"time"
16
14
15
15
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
17
16
"tangled.org/atscan.net/plcbundle/internal/didindex"
18
17
"tangled.org/atscan.net/plcbundle/internal/mempool"
19
18
"tangled.org/atscan.net/plcbundle/internal/storage"
19
19
+
internalsync "tangled.org/atscan.net/plcbundle/internal/sync"
20
20
"tangled.org/atscan.net/plcbundle/internal/types"
21
21
"tangled.org/atscan.net/plcbundle/plcclient"
22
22
)
···
36
36
type Manager struct {
37
37
config *Config
38
38
operations *storage.Operations
39
39
-
index *Index
39
39
+
index *bundleindex.Index
40
40
indexPath string
41
41
plcClient *plcclient.Client
42
42
logger types.Logger
43
43
mempool *mempool.Mempool
44
44
-
didIndex *didindex.Manager // Updated type
44
44
+
didIndex *didindex.Manager
45
45
+
46
46
+
syncer *internalsync.Fetcher
47
47
+
cloner *internalsync.Cloner
45
48
46
49
bundleCache map[int]*Bundle
47
50
cacheMu sync.RWMutex
···
76
79
}
77
80
78
81
// Load or create index
79
79
-
indexPath := filepath.Join(config.BundleDir, INDEX_FILE)
80
80
-
index, err := LoadIndex(indexPath)
82
82
+
indexPath := filepath.Join(config.BundleDir, bundleindex.INDEX_FILE)
83
83
+
index, err := bundleindex.LoadIndex(indexPath)
81
84
82
85
// Check for bundle files in directory
83
86
bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst"))
···
102
105
} else {
103
106
// No index and no bundles - create fresh index
104
107
config.Logger.Printf("Creating new index at %s", indexPath)
105
105
-
index = NewIndex(origin)
108
108
+
index = bundleindex.NewIndex(origin)
106
109
if err := index.Save(indexPath); err != nil {
107
110
return nil, fmt.Errorf("failed to save new index: %w", err)
108
111
}
···
174
177
tempMgr := &Manager{
175
178
config: config,
176
179
operations: ops,
177
177
-
index: NewIndex("test-origin"),
180
180
+
index: bundleindex.NewIndex("test-origin"),
178
181
indexPath: indexPath,
179
182
logger: config.Logger,
180
183
}
···
220
223
elapsed := time.Since(start)
221
224
222
225
// Reload the rebuilt index
223
223
-
index, err = LoadIndex(indexPath)
226
226
+
index, err = bundleindex.LoadIndex(indexPath)
224
227
if err != nil {
225
228
return nil, fmt.Errorf("failed to load rebuilt index: %w", err)
226
229
}
···
250
253
}
251
254
252
255
if index == nil {
253
253
-
index = NewIndex("test-origin")
256
256
+
index = bundleindex.NewIndex("test-origin")
254
257
}
255
258
256
259
// Initialize mempool for next bundle
···
271
274
// Initialize DID index manager
272
275
didIndex := didindex.NewManager(config.BundleDir, config.Logger)
273
276
277
277
+
// Initialize sync components
278
278
+
fetcher := internalsync.NewFetcher(plcClient, ops, config.Logger)
279
279
+
cloner := internalsync.NewCloner(ops, config.BundleDir, config.Logger)
280
280
+
274
281
return &Manager{
275
282
config: config,
276
283
operations: ops,
···
282
289
didIndex: didIndex, // Updated type
283
290
bundleCache: make(map[int]*Bundle),
284
291
maxCacheSize: 2,
292
292
+
syncer: fetcher,
293
293
+
cloner: cloner,
285
294
}, nil
286
295
}
287
296
···
304
313
}
305
314
306
315
// GetIndex returns the current index
307
307
-
func (m *Manager) GetIndex() *Index {
316
316
+
func (m *Manager) GetIndex() *bundleindex.Index {
308
317
return m.index
309
318
}
310
319
···
469
478
return nil
470
479
}
471
480
472
472
-
// FetchNextBundle fetches the next bundle from PLC directory
473
473
-
func (m *Manager) FetchNextBundle(ctx context.Context, quiet bool) (*Bundle, error) {
474
474
-
if m.plcClient == nil {
475
475
-
return nil, fmt.Errorf("PLC client not configured")
476
476
-
}
477
477
-
478
478
-
lastBundle := m.index.GetLastBundle()
479
479
-
nextBundleNum := 1
480
480
-
var afterTime string
481
481
-
var prevBoundaryCIDs map[string]bool
482
482
-
var prevBundleHash string
483
483
-
484
484
-
if lastBundle != nil {
485
485
-
nextBundleNum = lastBundle.BundleNumber + 1
486
486
-
afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
487
487
-
prevBundleHash = lastBundle.Hash
488
488
-
489
489
-
prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
490
490
-
if err == nil {
491
491
-
_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
492
492
-
}
493
493
-
}
494
494
-
495
495
-
if !quiet {
496
496
-
m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
497
497
-
}
498
498
-
499
499
-
for m.mempool.Count() < types.BUNDLE_SIZE {
500
500
-
if !quiet {
501
501
-
m.logger.Printf("Fetching more operations (have %d/%d)...", m.mempool.Count(), types.BUNDLE_SIZE)
502
502
-
}
503
503
-
504
504
-
err := m.fetchToMempool(ctx, afterTime, prevBoundaryCIDs, types.BUNDLE_SIZE-m.mempool.Count(), quiet)
505
505
-
if err != nil {
506
506
-
if m.mempool.Count() >= types.BUNDLE_SIZE {
507
507
-
break
508
508
-
}
509
509
-
m.mempool.Save()
510
510
-
return nil, fmt.Errorf("insufficient operations: have %d, need %d", m.mempool.Count(), types.BUNDLE_SIZE)
511
511
-
}
512
512
-
513
513
-
if m.mempool.Count() < types.BUNDLE_SIZE {
514
514
-
m.mempool.Save()
515
515
-
return nil, fmt.Errorf("insufficient operations: have %d, need %d (no more available)", m.mempool.Count(), types.BUNDLE_SIZE)
516
516
-
}
517
517
-
}
518
518
-
519
519
-
if !quiet {
520
520
-
m.logger.Printf("Creating bundle %06d from mempool", nextBundleNum)
521
521
-
}
522
522
-
operations, err := m.mempool.Take(types.BUNDLE_SIZE)
523
523
-
if err != nil {
524
524
-
m.mempool.Save()
525
525
-
return nil, fmt.Errorf("failed to take operations from mempool: %w", err)
526
526
-
}
527
527
-
528
528
-
bundle := m.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash)
529
529
-
530
530
-
if err := m.mempool.Save(); err != nil {
531
531
-
m.logger.Printf("Warning: failed to save mempool: %v", err)
532
532
-
}
533
533
-
534
534
-
if !quiet {
535
535
-
m.logger.Printf("✓ Bundle %06d ready (%d ops, mempool: %d remaining)",
536
536
-
nextBundleNum, len(operations), m.mempool.Count())
537
537
-
}
538
538
-
539
539
-
return bundle, nil
540
540
-
}
541
541
-
542
542
-
// fetchToMempool fetches operations and adds them to mempool
543
543
-
func (m *Manager) fetchToMempool(ctx context.Context, afterTime string, prevBoundaryCIDs map[string]bool, target int, quiet bool) error {
544
544
-
seenCIDs := make(map[string]bool)
545
545
-
546
546
-
// Mark previous boundary CIDs as seen
547
547
-
for cid := range prevBoundaryCIDs {
548
548
-
seenCIDs[cid] = true
549
549
-
}
550
550
-
551
551
-
// Use last mempool time if available
552
552
-
if m.mempool.Count() > 0 {
553
553
-
afterTime = m.mempool.GetLastTime()
554
554
-
if !quiet {
555
555
-
m.logger.Printf(" Continuing from mempool cursor: %s", afterTime)
556
556
-
}
557
557
-
}
558
558
-
559
559
-
currentAfter := afterTime
560
560
-
maxFetches := 20
561
561
-
totalAdded := 0
562
562
-
startingCount := m.mempool.Count()
563
563
-
564
564
-
for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
565
565
-
// Calculate batch size
566
566
-
remaining := target - (m.mempool.Count() - startingCount)
567
567
-
if remaining <= 0 {
568
568
-
break
569
569
-
}
570
570
-
571
571
-
batchSize := 1000
572
572
-
if remaining < 500 {
573
573
-
batchSize = 200
574
574
-
}
575
575
-
576
576
-
if !quiet {
577
577
-
m.logger.Printf(" Fetch #%d: requesting %d operations (mempool: %d)",
578
578
-
fetchNum+1, batchSize, m.mempool.Count())
579
579
-
}
580
580
-
581
581
-
batch, err := m.plcClient.Export(ctx, plcclient.ExportOptions{
582
582
-
Count: batchSize,
583
583
-
After: currentAfter,
584
584
-
})
585
585
-
if err != nil {
586
586
-
m.mempool.Save()
587
587
-
return fmt.Errorf("export failed: %w", err)
588
588
-
}
589
589
-
590
590
-
if len(batch) == 0 {
591
591
-
if !quiet {
592
592
-
m.logger.Printf(" No more operations available from PLC")
593
593
-
}
594
594
-
m.mempool.Save()
595
595
-
if totalAdded > 0 {
596
596
-
return nil
597
597
-
}
598
598
-
return fmt.Errorf("no operations available")
599
599
-
}
600
600
-
601
601
-
// Deduplicate
602
602
-
uniqueOps := make([]plcclient.PLCOperation, 0)
603
603
-
for _, op := range batch {
604
604
-
if !seenCIDs[op.CID] {
605
605
-
seenCIDs[op.CID] = true
606
606
-
uniqueOps = append(uniqueOps, op)
607
607
-
}
608
608
-
}
609
609
-
610
610
-
if len(uniqueOps) > 0 {
611
611
-
added, err := m.mempool.Add(uniqueOps)
612
612
-
if err != nil {
613
613
-
m.mempool.Save()
614
614
-
return fmt.Errorf("chronological validation failed: %w", err)
615
615
-
}
616
616
-
617
617
-
totalAdded += added
618
618
-
if !quiet {
619
619
-
m.logger.Printf(" Added %d new operations (mempool now: %d)", added, m.mempool.Count())
620
620
-
}
621
621
-
}
622
622
-
623
623
-
// Update cursor
624
624
-
if len(batch) > 0 {
625
625
-
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
626
626
-
}
627
627
-
628
628
-
// Stop if we got less than requested
629
629
-
if len(batch) < batchSize {
630
630
-
if !quiet {
631
631
-
m.logger.Printf(" Received incomplete batch (%d/%d), caught up to latest", len(batch), batchSize)
632
632
-
}
633
633
-
break
634
634
-
}
635
635
-
}
636
636
-
637
637
-
if totalAdded > 0 {
638
638
-
if !quiet {
639
639
-
m.logger.Printf("✓ Fetch complete: added %d operations (mempool: %d)", totalAdded, m.mempool.Count())
640
640
-
}
641
641
-
return nil
642
642
-
}
643
643
-
644
644
-
return fmt.Errorf("no new operations added")
645
645
-
}
646
646
-
647
481
// GetMempoolStats returns mempool statistics
648
482
func (m *Manager) GetMempoolStats() map[string]interface{} {
649
483
return m.mempool.Stats()
···
730
564
return result, nil
731
565
}
732
566
733
733
-
// VerifyChain verifies the entire bundle chain
734
734
-
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
735
735
-
result := &ChainVerificationResult{
736
736
-
VerifiedBundles: make([]int, 0),
737
737
-
}
738
738
-
739
739
-
bundles := m.index.GetBundles()
740
740
-
if len(bundles) == 0 {
741
741
-
result.Valid = true
742
742
-
return result, nil
743
743
-
}
744
744
-
745
745
-
result.ChainLength = len(bundles)
746
746
-
747
747
-
for i, meta := range bundles {
748
748
-
// Verify file hash
749
749
-
vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
750
750
-
if err != nil || !vr.Valid {
751
751
-
result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
752
752
-
result.BrokenAt = meta.BundleNumber
753
753
-
return result, nil
754
754
-
}
755
755
-
756
756
-
// Verify chain link
757
757
-
if i > 0 {
758
758
-
prevMeta := bundles[i-1]
759
759
-
760
760
-
// Check parent reference
761
761
-
if meta.Parent != prevMeta.Hash {
762
762
-
result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
763
763
-
result.BrokenAt = meta.BundleNumber
764
764
-
return result, nil
765
765
-
}
766
766
-
767
767
-
// Verify chain hash calculation
768
768
-
expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
769
769
-
if meta.Hash != expectedHash {
770
770
-
result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
771
771
-
result.BrokenAt = meta.BundleNumber
772
772
-
return result, nil
773
773
-
}
774
774
-
}
775
775
-
776
776
-
result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
777
777
-
}
778
778
-
779
779
-
result.Valid = true
780
780
-
return result, nil
781
781
-
}
782
782
-
783
783
-
// ScanDirectory scans the bundle directory and rebuilds the index
784
784
-
func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) {
785
785
-
result := &DirectoryScanResult{
786
786
-
BundleDir: m.config.BundleDir,
787
787
-
}
788
788
-
789
789
-
m.logger.Printf("Scanning directory: %s", m.config.BundleDir)
790
790
-
791
791
-
// Find all bundle files
792
792
-
files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
793
793
-
if err != nil {
794
794
-
return nil, fmt.Errorf("failed to scan directory: %w", err)
795
795
-
}
796
796
-
files = filterBundleFiles(files)
797
797
-
798
798
-
if len(files) == 0 {
799
799
-
m.logger.Printf("No bundle files found")
800
800
-
return result, nil
801
801
-
}
802
802
-
803
803
-
// Parse bundle numbers
804
804
-
var bundleNumbers []int
805
805
-
for _, file := range files {
806
806
-
base := filepath.Base(file)
807
807
-
numStr := strings.TrimSuffix(base, ".jsonl.zst")
808
808
-
num, err := strconv.Atoi(numStr)
809
809
-
if err != nil {
810
810
-
m.logger.Printf("Warning: skipping invalid filename: %s", base)
811
811
-
continue
812
812
-
}
813
813
-
bundleNumbers = append(bundleNumbers, num)
814
814
-
}
815
815
-
816
816
-
sort.Ints(bundleNumbers)
817
817
-
818
818
-
result.BundleCount = len(bundleNumbers)
819
819
-
if len(bundleNumbers) > 0 {
820
820
-
result.FirstBundle = bundleNumbers[0]
821
821
-
result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
822
822
-
}
823
823
-
824
824
-
// Find gaps
825
825
-
if len(bundleNumbers) > 1 {
826
826
-
for i := result.FirstBundle; i <= result.LastBundle; i++ {
827
827
-
found := false
828
828
-
for _, num := range bundleNumbers {
829
829
-
if num == i {
830
830
-
found = true
831
831
-
break
832
832
-
}
833
833
-
}
834
834
-
if !found {
835
835
-
result.MissingGaps = append(result.MissingGaps, i)
836
836
-
}
837
837
-
}
838
838
-
}
839
839
-
840
840
-
m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))
841
841
-
842
842
-
// Load each bundle and rebuild index
843
843
-
var newMetadata []*BundleMetadata
844
844
-
var totalSize int64
845
845
-
846
846
-
for _, num := range bundleNumbers {
847
847
-
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
848
848
-
849
849
-
// Load bundle
850
850
-
ops, err := m.operations.LoadBundle(path)
851
851
-
if err != nil {
852
852
-
m.logger.Printf("Warning: failed to load bundle %d: %v", num, err)
853
853
-
continue
854
854
-
}
855
855
-
856
856
-
// Get file size
857
857
-
size, _ := m.operations.GetFileSize(path)
858
858
-
totalSize += size
859
859
-
860
860
-
// Calculate parent and cursor from previous bundle
861
861
-
var parent string
862
862
-
var cursor string
863
863
-
if num > 1 && len(newMetadata) > 0 {
864
864
-
prevMeta := newMetadata[len(newMetadata)-1]
865
865
-
parent = prevMeta.Hash
866
866
-
cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
867
867
-
}
868
868
-
869
869
-
// Use the ONE method for metadata calculation
870
870
-
meta, err := m.CalculateBundleMetadata(num, path, ops, parent, cursor)
871
871
-
if err != nil {
872
872
-
m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err)
873
873
-
continue
874
874
-
}
875
875
-
876
876
-
newMetadata = append(newMetadata, meta)
877
877
-
878
878
-
m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount)
879
879
-
}
880
880
-
881
881
-
result.TotalSize = totalSize
882
882
-
883
883
-
// Rebuild index
884
884
-
m.index.Rebuild(newMetadata)
885
885
-
886
886
-
// Save index
887
887
-
if err := m.SaveIndex(); err != nil {
888
888
-
return nil, fmt.Errorf("failed to save index: %w", err)
889
889
-
}
890
890
-
891
891
-
result.IndexUpdated = true
892
892
-
893
893
-
m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))
894
894
-
895
895
-
return result, nil
896
896
-
}
897
897
-
898
898
-
// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index
899
899
-
func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) {
900
900
-
result := &DirectoryScanResult{
901
901
-
BundleDir: m.config.BundleDir,
902
902
-
}
903
903
-
904
904
-
m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir)
905
905
-
906
906
-
// Find all bundle files
907
907
-
files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
908
908
-
if err != nil {
909
909
-
return nil, fmt.Errorf("failed to scan directory: %w", err)
910
910
-
}
911
911
-
files = filterBundleFiles(files)
912
912
-
913
913
-
if len(files) == 0 {
914
914
-
m.logger.Printf("No bundle files found")
915
915
-
return result, nil
916
916
-
}
917
917
-
918
918
-
// Parse bundle numbers
919
919
-
var bundleNumbers []int
920
920
-
for _, file := range files {
921
921
-
base := filepath.Base(file)
922
922
-
numStr := strings.TrimSuffix(base, ".jsonl.zst")
923
923
-
num, err := strconv.Atoi(numStr)
924
924
-
if err != nil {
925
925
-
m.logger.Printf("Warning: skipping invalid filename: %s", base)
926
926
-
continue
927
927
-
}
928
928
-
bundleNumbers = append(bundleNumbers, num)
929
929
-
}
930
930
-
931
931
-
sort.Ints(bundleNumbers)
932
932
-
933
933
-
result.BundleCount = len(bundleNumbers)
934
934
-
if len(bundleNumbers) > 0 {
935
935
-
result.FirstBundle = bundleNumbers[0]
936
936
-
result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
937
937
-
}
938
938
-
939
939
-
// Find gaps
940
940
-
if len(bundleNumbers) > 1 {
941
941
-
for i := result.FirstBundle; i <= result.LastBundle; i++ {
942
942
-
found := false
943
943
-
for _, num := range bundleNumbers {
944
944
-
if num == i {
945
945
-
found = true
946
946
-
break
947
947
-
}
948
948
-
}
949
949
-
if !found {
950
950
-
result.MissingGaps = append(result.MissingGaps, i)
951
951
-
}
952
952
-
}
953
953
-
}
954
954
-
955
955
-
m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))
956
956
-
957
957
-
// Process bundles in parallel
958
958
-
type bundleResult struct {
959
959
-
index int
960
960
-
meta *BundleMetadata
961
961
-
err error
962
962
-
}
963
963
-
964
964
-
jobs := make(chan int, len(bundleNumbers))
965
965
-
results := make(chan bundleResult, len(bundleNumbers))
966
966
-
967
967
-
// Start workers
968
968
-
var wg sync.WaitGroup
969
969
-
for w := 0; w < workers; w++ {
970
970
-
wg.Add(1)
971
971
-
go func() {
972
972
-
defer wg.Done()
973
973
-
for num := range jobs {
974
974
-
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
975
975
-
976
976
-
// Load and process bundle
977
977
-
ops, err := m.operations.LoadBundle(path)
978
978
-
if err != nil {
979
979
-
results <- bundleResult{index: num, err: err}
980
980
-
continue
981
981
-
}
982
982
-
983
983
-
// Use the FAST method (cursor will be set later in sequential phase)
984
984
-
meta, err := m.CalculateBundleMetadataFast(num, path, ops, "")
985
985
-
if err != nil {
986
986
-
results <- bundleResult{index: num, err: err}
987
987
-
continue
988
988
-
}
989
989
-
990
990
-
results <- bundleResult{index: num, meta: meta}
991
991
-
}
992
992
-
}()
993
993
-
}
994
994
-
995
995
-
// Send jobs
996
996
-
for _, num := range bundleNumbers {
997
997
-
jobs <- num
998
998
-
}
999
999
-
close(jobs)
1000
1000
-
1001
1001
-
// Wait for all workers to finish
1002
1002
-
go func() {
1003
1003
-
wg.Wait()
1004
1004
-
close(results)
1005
1005
-
}()
1006
1006
-
1007
1007
-
// Collect results
1008
1008
-
metadataMap := make(map[int]*BundleMetadata)
1009
1009
-
var totalSize int64
1010
1010
-
var totalUncompressed int64
1011
1011
-
processed := 0
1012
1012
-
1013
1013
-
for result := range results {
1014
1014
-
processed++
1015
1015
-
1016
1016
-
// Update progress WITH bytes
1017
1017
-
if progressCallback != nil {
1018
1018
-
if result.meta != nil {
1019
1019
-
totalUncompressed += result.meta.UncompressedSize
1020
1020
-
}
1021
1021
-
progressCallback(processed, len(bundleNumbers), totalUncompressed)
1022
1022
-
}
1023
1023
-
1024
1024
-
if result.err != nil {
1025
1025
-
m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err)
1026
1026
-
continue
1027
1027
-
}
1028
1028
-
metadataMap[result.index] = result.meta
1029
1029
-
totalSize += result.meta.CompressedSize
1030
1030
-
}
1031
1031
-
1032
1032
-
// Build ordered metadata slice and calculate chain hashes
1033
1033
-
var newMetadata []*BundleMetadata
1034
1034
-
var parent string
1035
1035
-
1036
1036
-
for i, num := range bundleNumbers {
1037
1037
-
meta, ok := metadataMap[num]
1038
1038
-
if !ok {
1039
1039
-
continue
1040
1040
-
}
1041
1041
-
1042
1042
-
// Set cursor from previous bundle's EndTime
1043
1043
-
if i > 0 && len(newMetadata) > 0 {
1044
1044
-
prevMeta := newMetadata[len(newMetadata)-1]
1045
1045
-
meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
1046
1046
-
}
1047
1047
-
1048
1048
-
// Calculate chain hash (must be done sequentially)
1049
1049
-
meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash)
1050
1050
-
meta.Parent = parent
1051
1051
-
1052
1052
-
newMetadata = append(newMetadata, meta)
1053
1053
-
parent = meta.Hash
1054
1054
-
}
1055
1055
-
1056
1056
-
result.TotalSize = totalSize
1057
1057
-
result.TotalUncompressed = totalUncompressed
1058
1058
-
1059
1059
-
// Rebuild index
1060
1060
-
m.index.Rebuild(newMetadata)
1061
1061
-
1062
1062
-
// Save index
1063
1063
-
if err := m.SaveIndex(); err != nil {
1064
1064
-
return nil, fmt.Errorf("failed to save index: %w", err)
1065
1065
-
}
1066
1066
-
1067
1067
-
result.IndexUpdated = true
1068
1068
-
1069
1069
-
m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))
1070
1070
-
1071
1071
-
return result, nil
1072
1072
-
}
1073
1073
-
1074
567
// GetInfo returns information about the bundle manager
1075
568
func (m *Manager) GetInfo() map[string]interface{} {
1076
569
stats := m.index.GetStats()
···
1130
623
return result, nil
1131
624
}
1132
625
1133
1133
-
// ScanBundle scans a single bundle file and returns its metadata
1134
1134
-
func (m *Manager) ScanBundle(path string, bundleNumber int) (*BundleMetadata, error) {
1135
1135
-
// Load bundle file
1136
1136
-
operations, err := m.operations.LoadBundle(path)
1137
1137
-
if err != nil {
1138
1138
-
return nil, fmt.Errorf("failed to load bundle: %w", err)
1139
1139
-
}
1140
1140
-
1141
1141
-
if len(operations) == 0 {
1142
1142
-
return nil, fmt.Errorf("bundle is empty")
1143
1143
-
}
1144
1144
-
1145
1145
-
// Get parent chain hash and cursor from previous bundle
1146
1146
-
var parent string
1147
1147
-
var cursor string
1148
1148
-
if bundleNumber > 1 {
1149
1149
-
if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil {
1150
1150
-
parent = prevMeta.Hash
1151
1151
-
cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
1152
1152
-
}
1153
1153
-
}
1154
1154
-
1155
1155
-
// Use the ONE method
1156
1156
-
return m.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
1157
1157
-
}
1158
1158
-
1159
1159
-
// ScanAndIndexBundle scans a bundle file and adds it to the index
1160
1160
-
func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*BundleMetadata, error) {
1161
1161
-
meta, err := m.ScanBundle(path, bundleNumber)
1162
1162
-
if err != nil {
1163
1163
-
return nil, err
1164
1164
-
}
1165
1165
-
1166
1166
-
// Add to index
1167
1167
-
m.index.AddBundle(meta)
1168
1168
-
1169
1169
-
// Save index
1170
1170
-
if err := m.SaveIndex(); err != nil {
1171
1171
-
return nil, fmt.Errorf("failed to save index: %w", err)
1172
1172
-
}
1173
1173
-
1174
1174
-
return meta, nil
1175
1175
-
}
1176
1176
-
1177
626
// IsBundleIndexed checks if a bundle is already in the index
1178
627
func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
1179
628
_, err := m.index.GetBundle(bundleNumber)
···
1274
723
if info.ModTime().After(m.index.UpdatedAt) {
1275
724
m.logger.Printf("Index file modified, reloading...")
1276
725
1277
1277
-
newIndex, err := LoadIndex(m.indexPath)
726
726
+
newIndex, err := bundleindex.LoadIndex(m.indexPath)
1278
727
if err != nil {
1279
728
return fmt.Errorf("failed to reload index: %w", err)
1280
729
}
···
1381
830
1382
831
// bundleIndexAdapter adapts Index to BundleIndexProvider interface
1383
832
type bundleIndexAdapter struct {
1384
1384
-
index *Index
833
833
+
index *bundleindex.Index
1385
834
}
1386
835
1387
836
func (a *bundleIndexAdapter) GetBundles() []*didindex.BundleMetadata {
···
1844
1293
1845
1294
return results, nil
1846
1295
}
1296
1296
+
1297
1297
+
// VerifyChain verifies the entire bundle chain
1298
1298
+
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
1299
1299
+
result := &ChainVerificationResult{
1300
1300
+
VerifiedBundles: make([]int, 0),
1301
1301
+
}
1302
1302
+
1303
1303
+
bundles := m.index.GetBundles()
1304
1304
+
if len(bundles) == 0 {
1305
1305
+
result.Valid = true
1306
1306
+
return result, nil
1307
1307
+
}
1308
1308
+
1309
1309
+
result.ChainLength = len(bundles)
1310
1310
+
1311
1311
+
for i, meta := range bundles {
1312
1312
+
// Verify file hash
1313
1313
+
vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
1314
1314
+
if err != nil || !vr.Valid {
1315
1315
+
result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
1316
1316
+
result.BrokenAt = meta.BundleNumber
1317
1317
+
return result, nil
1318
1318
+
}
1319
1319
+
1320
1320
+
// Verify chain link
1321
1321
+
if i > 0 {
1322
1322
+
prevMeta := bundles[i-1]
1323
1323
+
1324
1324
+
// Check parent reference
1325
1325
+
if meta.Parent != prevMeta.Hash {
1326
1326
+
result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
1327
1327
+
result.BrokenAt = meta.BundleNumber
1328
1328
+
return result, nil
1329
1329
+
}
1330
1330
+
1331
1331
+
// Verify chain hash calculation
1332
1332
+
expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
1333
1333
+
if meta.Hash != expectedHash {
1334
1334
+
result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
1335
1335
+
result.BrokenAt = meta.BundleNumber
1336
1336
+
return result, nil
1337
1337
+
}
1338
1338
+
}
1339
1339
+
1340
1340
+
result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
1341
1341
+
}
1342
1342
+
1343
1343
+
result.Valid = true
1344
1344
+
return result, nil
1345
1345
+
}
1346
1346
+
1347
1347
+
// FetchNextBundle delegates to sync.Fetcher
1348
1348
+
func (m *Manager) FetchNextBundle(ctx context.Context, quiet bool) (*Bundle, error) {
1349
1349
+
if m.plcClient == nil {
1350
1350
+
return nil, fmt.Errorf("PLC client not configured")
1351
1351
+
}
1352
1352
+
1353
1353
+
lastBundle := m.index.GetLastBundle()
1354
1354
+
nextBundleNum := 1
1355
1355
+
var afterTime string
1356
1356
+
var prevBoundaryCIDs map[string]bool
1357
1357
+
var prevBundleHash string
1358
1358
+
1359
1359
+
if lastBundle != nil {
1360
1360
+
nextBundleNum = lastBundle.BundleNumber + 1
1361
1361
+
afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
1362
1362
+
prevBundleHash = lastBundle.Hash
1363
1363
+
1364
1364
+
prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
1365
1365
+
if err == nil {
1366
1366
+
_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
1367
1367
+
}
1368
1368
+
}
1369
1369
+
1370
1370
+
if !quiet {
1371
1371
+
m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
1372
1372
+
}
1373
1373
+
1374
1374
+
// Fetch operations using syncer
1375
1375
+
for m.mempool.Count() < types.BUNDLE_SIZE {
1376
1376
+
newOps, err := m.syncer.FetchToMempool(
1377
1377
+
ctx,
1378
1378
+
afterTime,
1379
1379
+
prevBoundaryCIDs,
1380
1380
+
types.BUNDLE_SIZE-m.mempool.Count(),
1381
1381
+
quiet,
1382
1382
+
m.mempool.Count(),
1383
1383
+
)
1384
1384
+
1385
1385
+
if err != nil {
1386
1386
+
m.mempool.Save()
1387
1387
+
return nil, err
1388
1388
+
}
1389
1389
+
1390
1390
+
// Add to mempool
1391
1391
+
added, err := m.mempool.Add(newOps)
1392
1392
+
if err != nil {
1393
1393
+
m.mempool.Save()
1394
1394
+
return nil, fmt.Errorf("chronological validation failed: %w", err)
1395
1395
+
}
1396
1396
+
1397
1397
+
if !quiet {
1398
1398
+
m.logger.Printf("Added %d new operations (mempool now: %d)", added, m.mempool.Count())
1399
1399
+
}
1400
1400
+
1401
1401
+
if len(newOps) == 0 {
1402
1402
+
break
1403
1403
+
}
1404
1404
+
}
1405
1405
+
1406
1406
+
if m.mempool.Count() < types.BUNDLE_SIZE {
1407
1407
+
m.mempool.Save()
1408
1408
+
return nil, fmt.Errorf("insufficient operations: have %d, need %d", m.mempool.Count(), types.BUNDLE_SIZE)
1409
1409
+
}
1410
1410
+
1411
1411
+
// Create bundle
1412
1412
+
operations, err := m.mempool.Take(types.BUNDLE_SIZE)
1413
1413
+
if err != nil {
1414
1414
+
m.mempool.Save()
1415
1415
+
return nil, err
1416
1416
+
}
1417
1417
+
1418
1418
+
bundle := m.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash)
1419
1419
+
m.mempool.Save()
1420
1420
+
1421
1421
+
return bundle, nil
1422
1422
+
}
1423
1423
+
1424
1424
+
// CloneFromRemote delegates to sync.Cloner
1425
1425
+
func (m *Manager) CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error) {
1426
1426
+
// Delegate to cloner with index update callback
1427
1427
+
return m.cloner.Clone(ctx, opts, m.index, m.updateIndexFromRemote)
1428
1428
+
}
1429
1429
+
1430
1430
+
// updateIndexFromRemote updates local index with metadata from remote index
1431
1431
+
func (m *Manager) updateIndexFromRemote(bundleNumbers []int, remoteMeta map[int]*bundleindex.BundleMetadata, verbose bool) error {
1432
1432
+
if len(bundleNumbers) == 0 {
1433
1433
+
return nil
1434
1434
+
}
1435
1435
+
1436
1436
+
// Add/update bundles in local index using remote metadata
1437
1437
+
// Hash verification was already done during download
1438
1438
+
for _, num := range bundleNumbers {
1439
1439
+
if meta, exists := remoteMeta[num]; exists {
1440
1440
+
// Verify the file exists locally
1441
1441
+
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
1442
1442
+
if !m.operations.FileExists(path) {
1443
1443
+
m.logger.Printf("Warning: bundle %06d not found locally, skipping", num)
1444
1444
+
continue
1445
1445
+
}
1446
1446
+
1447
1447
+
// Add to index (no need to re-verify hash - already verified during download)
1448
1448
+
m.index.AddBundle(meta)
1449
1449
+
1450
1450
+
if verbose {
1451
1451
+
m.logger.Printf("Added bundle %06d to index", num)
1452
1452
+
}
1453
1453
+
}
1454
1454
+
}
1455
1455
+
1456
1456
+
// Save index
1457
1457
+
return m.SaveIndex()
1458
1458
+
}
1459
1459
+
1460
1460
+
func (m *Manager) CreateBundle(bundleNumber int, operations []plcclient.PLCOperation, cursor string, parent string) *Bundle {
1461
1461
+
// Delegate to sync package
1462
1462
+
syncBundle := internalsync.CreateBundle(bundleNumber, operations, cursor, parent, m.operations)
1463
1463
+
1464
1464
+
// Convert if needed (or just return directly if types match)
1465
1465
+
return &Bundle{
1466
1466
+
BundleNumber: syncBundle.BundleNumber,
1467
1467
+
StartTime: syncBundle.StartTime,
1468
1468
+
EndTime: syncBundle.EndTime,
1469
1469
+
Operations: syncBundle.Operations,
1470
1470
+
DIDCount: syncBundle.DIDCount,
1471
1471
+
Cursor: syncBundle.Cursor,
1472
1472
+
Parent: syncBundle.Parent,
1473
1473
+
BoundaryCIDs: syncBundle.BoundaryCIDs,
1474
1474
+
Compressed: syncBundle.Compressed,
1475
1475
+
CreatedAt: syncBundle.CreatedAt,
1476
1476
+
}
1477
1477
+
}
+5
-4
internal/bundle/metadata.go
···
5
5
"os"
6
6
"time"
7
7
8
8
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
8
9
"tangled.org/atscan.net/plcbundle/plcclient"
9
10
)
10
11
11
12
// CalculateBundleMetadata calculates complete metadata for a bundle
12
12
-
func (m *Manager) CalculateBundleMetadata(bundleNumber int, path string, operations []plcclient.PLCOperation, parent string, cursor string) (*BundleMetadata, error) {
13
13
+
func (m *Manager) CalculateBundleMetadata(bundleNumber int, path string, operations []plcclient.PLCOperation, parent string, cursor string) (*bundleindex.BundleMetadata, error) {
13
14
if len(operations) == 0 {
14
15
return nil, fmt.Errorf("bundle is empty")
15
16
}
···
39
40
// Calculate chain hash
40
41
chainHash := m.operations.CalculateChainHash(parent, contentHash)
41
42
42
42
-
return &BundleMetadata{
43
43
+
return &bundleindex.BundleMetadata{
43
44
BundleNumber: bundleNumber,
44
45
StartTime: operations[0].CreatedAt,
45
46
EndTime: operations[len(operations)-1].CreatedAt,
···
57
58
}
58
59
59
60
// CalculateBundleMetadataFast calculates metadata quickly without chain hash
60
60
-
func (m *Manager) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plcclient.PLCOperation, cursor string) (*BundleMetadata, error) {
61
61
+
func (m *Manager) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plcclient.PLCOperation, cursor string) (*bundleindex.BundleMetadata, error) {
61
62
if len(operations) == 0 {
62
63
return nil, fmt.Errorf("bundle is empty")
63
64
}
···
71
72
// Extract unique DIDs
72
73
dids := m.operations.ExtractUniqueDIDs(operations)
73
74
74
74
-
return &BundleMetadata{
75
75
+
return &bundleindex.BundleMetadata{
75
76
BundleNumber: bundleNumber,
76
77
StartTime: operations[0].CreatedAt,
77
78
EndTime: operations[len(operations)-1].CreatedAt,
+348
internal/bundle/scanner.go
···
1
1
+
package bundle
2
2
+
3
3
+
import (
4
4
+
"fmt"
5
5
+
"path/filepath"
6
6
+
"sort"
7
7
+
"strconv"
8
8
+
"strings"
9
9
+
"sync"
10
10
+
"time"
11
11
+
12
12
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
13
13
+
)
14
14
+
15
15
+
// ScanDirectory scans the bundle directory and rebuilds the index
16
16
+
func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) {
17
17
+
result := &DirectoryScanResult{
18
18
+
BundleDir: m.config.BundleDir,
19
19
+
}
20
20
+
21
21
+
m.logger.Printf("Scanning directory: %s", m.config.BundleDir)
22
22
+
23
23
+
// Find all bundle files
24
24
+
files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
25
25
+
if err != nil {
26
26
+
return nil, fmt.Errorf("failed to scan directory: %w", err)
27
27
+
}
28
28
+
files = filterBundleFiles(files)
29
29
+
30
30
+
if len(files) == 0 {
31
31
+
m.logger.Printf("No bundle files found")
32
32
+
return result, nil
33
33
+
}
34
34
+
35
35
+
// Parse bundle numbers
36
36
+
var bundleNumbers []int
37
37
+
for _, file := range files {
38
38
+
base := filepath.Base(file)
39
39
+
numStr := strings.TrimSuffix(base, ".jsonl.zst")
40
40
+
num, err := strconv.Atoi(numStr)
41
41
+
if err != nil {
42
42
+
m.logger.Printf("Warning: skipping invalid filename: %s", base)
43
43
+
continue
44
44
+
}
45
45
+
bundleNumbers = append(bundleNumbers, num)
46
46
+
}
47
47
+
48
48
+
sort.Ints(bundleNumbers)
49
49
+
50
50
+
result.BundleCount = len(bundleNumbers)
51
51
+
if len(bundleNumbers) > 0 {
52
52
+
result.FirstBundle = bundleNumbers[0]
53
53
+
result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
54
54
+
}
55
55
+
56
56
+
// Find gaps
57
57
+
if len(bundleNumbers) > 1 {
58
58
+
for i := result.FirstBundle; i <= result.LastBundle; i++ {
59
59
+
found := false
60
60
+
for _, num := range bundleNumbers {
61
61
+
if num == i {
62
62
+
found = true
63
63
+
break
64
64
+
}
65
65
+
}
66
66
+
if !found {
67
67
+
result.MissingGaps = append(result.MissingGaps, i)
68
68
+
}
69
69
+
}
70
70
+
}
71
71
+
72
72
+
m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))
73
73
+
74
74
+
// Load each bundle and rebuild index
75
75
+
var newMetadata []*bundleindex.BundleMetadata
76
76
+
var totalSize int64
77
77
+
78
78
+
for _, num := range bundleNumbers {
79
79
+
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
80
80
+
81
81
+
// Load bundle
82
82
+
ops, err := m.operations.LoadBundle(path)
83
83
+
if err != nil {
84
84
+
m.logger.Printf("Warning: failed to load bundle %d: %v", num, err)
85
85
+
continue
86
86
+
}
87
87
+
88
88
+
// Get file size
89
89
+
size, _ := m.operations.GetFileSize(path)
90
90
+
totalSize += size
91
91
+
92
92
+
// Calculate parent and cursor from previous bundle
93
93
+
var parent string
94
94
+
var cursor string
95
95
+
if num > 1 && len(newMetadata) > 0 {
96
96
+
prevMeta := newMetadata[len(newMetadata)-1]
97
97
+
parent = prevMeta.Hash
98
98
+
cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
99
99
+
}
100
100
+
101
101
+
// Use the ONE method for metadata calculation
102
102
+
meta, err := m.CalculateBundleMetadata(num, path, ops, parent, cursor)
103
103
+
if err != nil {
104
104
+
m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err)
105
105
+
continue
106
106
+
}
107
107
+
108
108
+
newMetadata = append(newMetadata, meta)
109
109
+
110
110
+
m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount)
111
111
+
}
112
112
+
113
113
+
result.TotalSize = totalSize
114
114
+
115
115
+
// Rebuild index
116
116
+
m.index.Rebuild(newMetadata)
117
117
+
118
118
+
// Save index
119
119
+
if err := m.SaveIndex(); err != nil {
120
120
+
return nil, fmt.Errorf("failed to save index: %w", err)
121
121
+
}
122
122
+
123
123
+
result.IndexUpdated = true
124
124
+
125
125
+
m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))
126
126
+
127
127
+
return result, nil
128
128
+
}
129
129
+
130
130
+
// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index
131
131
+
func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) {
132
132
+
result := &DirectoryScanResult{
133
133
+
BundleDir: m.config.BundleDir,
134
134
+
}
135
135
+
136
136
+
m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir)
137
137
+
138
138
+
// Find all bundle files
139
139
+
files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
140
140
+
if err != nil {
141
141
+
return nil, fmt.Errorf("failed to scan directory: %w", err)
142
142
+
}
143
143
+
files = filterBundleFiles(files)
144
144
+
145
145
+
if len(files) == 0 {
146
146
+
m.logger.Printf("No bundle files found")
147
147
+
return result, nil
148
148
+
}
149
149
+
150
150
+
// Parse bundle numbers
151
151
+
var bundleNumbers []int
152
152
+
for _, file := range files {
153
153
+
base := filepath.Base(file)
154
154
+
numStr := strings.TrimSuffix(base, ".jsonl.zst")
155
155
+
num, err := strconv.Atoi(numStr)
156
156
+
if err != nil {
157
157
+
m.logger.Printf("Warning: skipping invalid filename: %s", base)
158
158
+
continue
159
159
+
}
160
160
+
bundleNumbers = append(bundleNumbers, num)
161
161
+
}
162
162
+
163
163
+
sort.Ints(bundleNumbers)
164
164
+
165
165
+
result.BundleCount = len(bundleNumbers)
166
166
+
if len(bundleNumbers) > 0 {
167
167
+
result.FirstBundle = bundleNumbers[0]
168
168
+
result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
169
169
+
}
170
170
+
171
171
+
// Find gaps
172
172
+
if len(bundleNumbers) > 1 {
173
173
+
for i := result.FirstBundle; i <= result.LastBundle; i++ {
174
174
+
found := false
175
175
+
for _, num := range bundleNumbers {
176
176
+
if num == i {
177
177
+
found = true
178
178
+
break
179
179
+
}
180
180
+
}
181
181
+
if !found {
182
182
+
result.MissingGaps = append(result.MissingGaps, i)
183
183
+
}
184
184
+
}
185
185
+
}
186
186
+
187
187
+
m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))
188
188
+
189
189
+
// Process bundles in parallel
190
190
+
type bundleResult struct {
191
191
+
index int
192
192
+
meta *bundleindex.BundleMetadata
193
193
+
err error
194
194
+
}
195
195
+
196
196
+
jobs := make(chan int, len(bundleNumbers))
197
197
+
results := make(chan bundleResult, len(bundleNumbers))
198
198
+
199
199
+
// Start workers
200
200
+
var wg sync.WaitGroup
201
201
+
for w := 0; w < workers; w++ {
202
202
+
wg.Add(1)
203
203
+
go func() {
204
204
+
defer wg.Done()
205
205
+
for num := range jobs {
206
206
+
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
207
207
+
208
208
+
// Load and process bundle
209
209
+
ops, err := m.operations.LoadBundle(path)
210
210
+
if err != nil {
211
211
+
results <- bundleResult{index: num, err: err}
212
212
+
continue
213
213
+
}
214
214
+
215
215
+
// Use the FAST method (cursor will be set later in sequential phase)
216
216
+
meta, err := m.CalculateBundleMetadataFast(num, path, ops, "")
217
217
+
if err != nil {
218
218
+
results <- bundleResult{index: num, err: err}
219
219
+
continue
220
220
+
}
221
221
+
222
222
+
results <- bundleResult{index: num, meta: meta}
223
223
+
}
224
224
+
}()
225
225
+
}
226
226
+
227
227
+
// Send jobs
228
228
+
for _, num := range bundleNumbers {
229
229
+
jobs <- num
230
230
+
}
231
231
+
close(jobs)
232
232
+
233
233
+
// Wait for all workers to finish
234
234
+
go func() {
235
235
+
wg.Wait()
236
236
+
close(results)
237
237
+
}()
238
238
+
239
239
+
// Collect results
240
240
+
metadataMap := make(map[int]*bundleindex.BundleMetadata)
241
241
+
var totalSize int64
242
242
+
var totalUncompressed int64
243
243
+
processed := 0
244
244
+
245
245
+
for result := range results {
246
246
+
processed++
247
247
+
248
248
+
// Update progress WITH bytes
249
249
+
if progressCallback != nil {
250
250
+
if result.meta != nil {
251
251
+
totalUncompressed += result.meta.UncompressedSize
252
252
+
}
253
253
+
progressCallback(processed, len(bundleNumbers), totalUncompressed)
254
254
+
}
255
255
+
256
256
+
if result.err != nil {
257
257
+
m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err)
258
258
+
continue
259
259
+
}
260
260
+
metadataMap[result.index] = result.meta
261
261
+
totalSize += result.meta.CompressedSize
262
262
+
}
263
263
+
264
264
+
// Build ordered metadata slice and calculate chain hashes
265
265
+
var newMetadata []*bundleindex.BundleMetadata
266
266
+
var parent string
267
267
+
268
268
+
for i, num := range bundleNumbers {
269
269
+
meta, ok := metadataMap[num]
270
270
+
if !ok {
271
271
+
continue
272
272
+
}
273
273
+
274
274
+
// Set cursor from previous bundle's EndTime
275
275
+
if i > 0 && len(newMetadata) > 0 {
276
276
+
prevMeta := newMetadata[len(newMetadata)-1]
277
277
+
meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
278
278
+
}
279
279
+
280
280
+
// Calculate chain hash (must be done sequentially)
281
281
+
meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash)
282
282
+
meta.Parent = parent
283
283
+
284
284
+
newMetadata = append(newMetadata, meta)
285
285
+
parent = meta.Hash
286
286
+
}
287
287
+
288
288
+
result.TotalSize = totalSize
289
289
+
result.TotalUncompressed = totalUncompressed
290
290
+
291
291
+
// Rebuild index
292
292
+
m.index.Rebuild(newMetadata)
293
293
+
294
294
+
// Save index
295
295
+
if err := m.SaveIndex(); err != nil {
296
296
+
return nil, fmt.Errorf("failed to save index: %w", err)
297
297
+
}
298
298
+
299
299
+
result.IndexUpdated = true
300
300
+
301
301
+
m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))
302
302
+
303
303
+
return result, nil
304
304
+
}
305
305
+
306
306
+
// ScanBundle scans a single bundle file and returns its metadata
307
307
+
func (m *Manager) ScanBundle(path string, bundleNumber int) (*bundleindex.BundleMetadata, error) {
308
308
+
// Load bundle file
309
309
+
operations, err := m.operations.LoadBundle(path)
310
310
+
if err != nil {
311
311
+
return nil, fmt.Errorf("failed to load bundle: %w", err)
312
312
+
}
313
313
+
314
314
+
if len(operations) == 0 {
315
315
+
return nil, fmt.Errorf("bundle is empty")
316
316
+
}
317
317
+
318
318
+
// Get parent chain hash and cursor from previous bundle
319
319
+
var parent string
320
320
+
var cursor string
321
321
+
if bundleNumber > 1 {
322
322
+
if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil {
323
323
+
parent = prevMeta.Hash
324
324
+
cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
325
325
+
}
326
326
+
}
327
327
+
328
328
+
// Use the ONE method
329
329
+
return m.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
330
330
+
}
331
331
+
332
332
+
// ScanAndIndexBundle scans a bundle file and adds it to the index
333
333
+
func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*bundleindex.BundleMetadata, error) {
334
334
+
meta, err := m.ScanBundle(path, bundleNumber)
335
335
+
if err != nil {
336
336
+
return nil, err
337
337
+
}
338
338
+
339
339
+
// Add to index
340
340
+
m.index.AddBundle(meta)
341
341
+
342
342
+
// Save index
343
343
+
if err := m.SaveIndex(); err != nil {
344
344
+
return nil, fmt.Errorf("failed to save index: %w", err)
345
345
+
}
346
346
+
347
347
+
return meta, nil
348
348
+
}
+19
-43
internal/bundle/types.go
···
5
5
"path/filepath"
6
6
"time"
7
7
8
8
+
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
8
9
"tangled.org/atscan.net/plcbundle/internal/types"
9
10
"tangled.org/atscan.net/plcbundle/plcclient"
10
11
)
···
84
85
return nil
85
86
}
86
87
87
87
-
// BundleMetadata represents metadata about a bundle
88
88
-
type BundleMetadata struct {
89
89
-
BundleNumber int `json:"bundle_number"`
90
90
-
StartTime time.Time `json:"start_time"`
91
91
-
EndTime time.Time `json:"end_time"`
92
92
-
OperationCount int `json:"operation_count"`
93
93
-
DIDCount int `json:"did_count"`
94
94
-
95
95
-
// Primary hash - cumulative chain hash (includes all history)
96
96
-
Hash string `json:"hash"`
97
97
-
98
98
-
// Content hash - SHA256 of bundle operations only
99
99
-
ContentHash string `json:"content_hash"`
100
100
-
101
101
-
// Parent chain hash - links to previous bundle
102
102
-
Parent string `json:"parent,omitempty"`
103
103
-
104
104
-
// File hashes and sizes
105
105
-
CompressedHash string `json:"compressed_hash"`
106
106
-
CompressedSize int64 `json:"compressed_size"`
107
107
-
UncompressedSize int64 `json:"uncompressed_size"`
108
108
-
Cursor string `json:"cursor"`
109
109
-
CreatedAt time.Time `json:"created_at"`
110
110
-
}
111
111
-
112
112
-
func (b *Bundle) ToMetadata() *BundleMetadata {
113
113
-
return &BundleMetadata{
114
114
-
BundleNumber: b.BundleNumber,
115
115
-
StartTime: b.StartTime,
116
116
-
EndTime: b.EndTime,
117
117
-
OperationCount: b.OperationCount(),
118
118
-
DIDCount: b.DIDCount,
119
119
-
Hash: b.Hash, // Chain hash
120
120
-
ContentHash: b.ContentHash, // Content hash
121
121
-
Parent: b.Parent,
122
122
-
CompressedHash: b.CompressedHash,
123
123
-
CompressedSize: b.CompressedSize,
124
124
-
UncompressedSize: b.UncompressedSize,
125
125
-
Cursor: b.Cursor,
126
126
-
CreatedAt: b.CreatedAt,
127
127
-
}
128
128
-
}
129
129
-
130
88
// VerificationResult contains the result of bundle verification
131
89
type VerificationResult struct {
132
90
BundleNumber int
···
210
168
Bundle int
211
169
Position int
212
170
}
171
171
+
172
172
+
func (b *Bundle) ToMetadata() *bundleindex.BundleMetadata {
173
173
+
return &bundleindex.BundleMetadata{
174
174
+
BundleNumber: b.BundleNumber,
175
175
+
StartTime: b.StartTime,
176
176
+
EndTime: b.EndTime,
177
177
+
OperationCount: b.OperationCount(),
178
178
+
DIDCount: b.DIDCount,
179
179
+
Hash: b.Hash, // Chain hash
180
180
+
ContentHash: b.ContentHash, // Content hash
181
181
+
Parent: b.Parent,
182
182
+
CompressedHash: b.CompressedHash,
183
183
+
CompressedSize: b.CompressedSize,
184
184
+
UncompressedSize: b.UncompressedSize,
185
185
+
Cursor: b.Cursor,
186
186
+
CreatedAt: b.CreatedAt,
187
187
+
}
188
188
+
}
-1
internal/bundleindex/bundleindex.go
···
1
1
-
package bundleindex
+28
internal/bundleindex/metadata.go
···
1
1
+
package bundleindex
2
2
+
3
3
+
import "time"
4
4
+
5
5
+
// BundleMetadata represents metadata about a bundle
type BundleMetadata struct {
	BundleNumber   int       `json:"bundle_number"`
	StartTime      time.Time `json:"start_time"`
	EndTime        time.Time `json:"end_time"`
	OperationCount int       `json:"operation_count"`
	DIDCount       int       `json:"did_count"`

	// Primary hash - cumulative chain hash (includes all history)
	Hash string `json:"hash"`

	// Content hash - SHA256 of bundle operations only
	ContentHash string `json:"content_hash"`

	// Parent chain hash - links to previous bundle
	Parent string `json:"parent,omitempty"`

	// File hashes and sizes
	CompressedHash   string `json:"compressed_hash"`
	CompressedSize   int64  `json:"compressed_size"`
	UncompressedSize int64  `json:"uncompressed_size"`
	// Cursor is typically the previous bundle's EndTime in RFC3339Nano
	// (see how the scanner populates it); used to resume fetching.
	Cursor    string    `json:"cursor"`
	CreatedAt time.Time `json:"created_at"`
}
+21
internal/bundleindex/types.go
···
1
1
+
package bundleindex
2
2
+
3
3
+
// ChainVerificationResult contains the result of chain verification
type ChainVerificationResult struct {
	Valid           bool   // overall outcome of the chain verification
	ChainLength     int    // number of bundles in the chain
	BrokenAt        int    // bundle number where verification broke, if any
	Error           string // human-readable failure description, if any
	VerifiedBundles []int  // bundle numbers that verified successfully
	// NOTE(review): field meanings inferred from names — confirm against
	// the chain-verification code that populates this struct.
}
11
11
+
12
12
+
// VerificationResult contains the result of bundle verification
type VerificationResult struct {
	BundleNumber int    // bundle this result refers to
	Valid        bool   // overall verification outcome
	HashMatch    bool   // whether LocalHash matched ExpectedHash
	FileExists   bool   // whether the bundle file was found
	Error        error  // error encountered during verification, if any
	LocalHash    string // hash computed locally
	ExpectedHash string // hash expected (e.g. from the index)
	// NOTE(review): field semantics inferred from names — confirm against
	// the verification code that fills this struct.
}
+61
internal/sync/bundler.go
···
1
1
+
package sync
2
2
+
3
3
+
import (
4
4
+
"time"
5
5
+
6
6
+
"tangled.org/atscan.net/plcbundle/internal/storage"
7
7
+
"tangled.org/atscan.net/plcbundle/plcclient"
8
8
+
)
9
9
+
10
10
+
// CreateBundle creates a bundle structure from operations
11
11
+
// Note: This doesn't do hashing - that's done by Manager.SaveBundle
12
12
+
func CreateBundle(
13
13
+
bundleNumber int,
14
14
+
operations []plcclient.PLCOperation,
15
15
+
cursor string,
16
16
+
parent string,
17
17
+
ops *storage.Operations,
18
18
+
) *Bundle {
19
19
+
20
20
+
dids := ops.ExtractUniqueDIDs(operations)
21
21
+
_, boundaryCIDs := ops.GetBoundaryCIDs(operations)
22
22
+
23
23
+
cidSlice := make([]string, 0, len(boundaryCIDs))
24
24
+
for cid := range boundaryCIDs {
25
25
+
cidSlice = append(cidSlice, cid)
26
26
+
}
27
27
+
28
28
+
return &Bundle{
29
29
+
BundleNumber: bundleNumber,
30
30
+
StartTime: operations[0].CreatedAt,
31
31
+
EndTime: operations[len(operations)-1].CreatedAt,
32
32
+
Operations: operations,
33
33
+
DIDCount: len(dids),
34
34
+
Cursor: cursor,
35
35
+
Parent: parent,
36
36
+
BoundaryCIDs: cidSlice,
37
37
+
Compressed: true,
38
38
+
CreatedAt: time.Now().UTC(),
39
39
+
}
40
40
+
}
41
41
+
42
42
+
// Bundle is defined here temporarily - move to parent package later
type Bundle struct {
	BundleNumber int
	StartTime    time.Time // CreatedAt of the first operation
	EndTime      time.Time // CreatedAt of the last operation
	Operations   []plcclient.PLCOperation
	DIDCount     int // number of unique DIDs across Operations

	// Hash is the cumulative chain hash; ContentHash covers only this
	// bundle's operations; Parent links to the previous bundle's chain hash.
	Hash        string
	ContentHash string
	Parent      string

	CompressedHash   string
	CompressedSize   int64
	UncompressedSize int64
	Cursor           string   // resume cursor this bundle was fetched after
	BoundaryCIDs     []string // CIDs at the time boundary, used for dedup on the next fetch
	Compressed       bool     // presumably marks on-disk compression — TODO confirm
	CreatedAt        time.Time
}
+116
internal/sync/fetcher.go
···
1
1
+
package sync
2
2
+
3
3
+
import (
4
4
+
"context"
5
5
+
"fmt"
6
6
+
"time"
7
7
+
8
8
+
"tangled.org/atscan.net/plcbundle/internal/storage"
9
9
+
"tangled.org/atscan.net/plcbundle/internal/types"
10
10
+
"tangled.org/atscan.net/plcbundle/plcclient"
11
11
+
)
12
12
+
13
13
+
// Fetcher handles fetching operations from PLC directory
type Fetcher struct {
	plcClient  *plcclient.Client   // client used for export calls
	operations *storage.Operations // operation storage helpers
	logger     types.Logger        // progress/diagnostic logging
}
19
19
+
20
20
+
// NewFetcher creates a new fetcher
21
21
+
func NewFetcher(plcClient *plcclient.Client, operations *storage.Operations, logger types.Logger) *Fetcher {
22
22
+
return &Fetcher{
23
23
+
plcClient: plcClient,
24
24
+
operations: operations,
25
25
+
logger: logger,
26
26
+
}
27
27
+
}
28
28
+
29
29
+
// FetchToMempool fetches operations and returns them
30
30
+
// Returns: operations, error
31
31
+
func (f *Fetcher) FetchToMempool(
32
32
+
ctx context.Context,
33
33
+
afterTime string,
34
34
+
prevBoundaryCIDs map[string]bool,
35
35
+
target int,
36
36
+
quiet bool,
37
37
+
currentMempoolCount int,
38
38
+
) ([]plcclient.PLCOperation, error) {
39
39
+
40
40
+
seenCIDs := make(map[string]bool)
41
41
+
42
42
+
// Mark previous boundary CIDs as seen
43
43
+
for cid := range prevBoundaryCIDs {
44
44
+
seenCIDs[cid] = true
45
45
+
}
46
46
+
47
47
+
currentAfter := afterTime
48
48
+
maxFetches := 20
49
49
+
var allNewOps []plcclient.PLCOperation
50
50
+
51
51
+
for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
52
52
+
// Calculate batch size
53
53
+
remaining := target - len(allNewOps)
54
54
+
if remaining <= 0 {
55
55
+
break
56
56
+
}
57
57
+
58
58
+
batchSize := 1000
59
59
+
if remaining < 500 {
60
60
+
batchSize = 200
61
61
+
}
62
62
+
63
63
+
if !quiet {
64
64
+
f.logger.Printf(" Fetch #%d: requesting %d operations",
65
65
+
fetchNum+1, batchSize)
66
66
+
}
67
67
+
68
68
+
batch, err := f.plcClient.Export(ctx, plcclient.ExportOptions{
69
69
+
Count: batchSize,
70
70
+
After: currentAfter,
71
71
+
})
72
72
+
if err != nil {
73
73
+
return allNewOps, fmt.Errorf("export failed: %w", err)
74
74
+
}
75
75
+
76
76
+
if len(batch) == 0 {
77
77
+
if !quiet {
78
78
+
f.logger.Printf(" No more operations available from PLC")
79
79
+
}
80
80
+
if len(allNewOps) > 0 {
81
81
+
return allNewOps, nil
82
82
+
}
83
83
+
return nil, fmt.Errorf("no operations available")
84
84
+
}
85
85
+
86
86
+
// Deduplicate
87
87
+
for _, op := range batch {
88
88
+
if !seenCIDs[op.CID] {
89
89
+
seenCIDs[op.CID] = true
90
90
+
allNewOps = append(allNewOps, op)
91
91
+
}
92
92
+
}
93
93
+
94
94
+
// Update cursor
95
95
+
if len(batch) > 0 {
96
96
+
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
97
97
+
}
98
98
+
99
99
+
// Stop if we got less than requested
100
100
+
if len(batch) < batchSize {
101
101
+
if !quiet {
102
102
+
f.logger.Printf(" Received incomplete batch (%d/%d), caught up to latest", len(batch), batchSize)
103
103
+
}
104
104
+
break
105
105
+
}
106
106
+
}
107
107
+
108
108
+
if len(allNewOps) > 0 {
109
109
+
if !quiet {
110
110
+
f.logger.Printf("✓ Fetch complete: %d operations", len(allNewOps))
111
111
+
}
112
112
+
return allNewOps, nil
113
113
+
}
114
114
+
115
115
+
return nil, fmt.Errorf("no new operations added")
116
116
+
}