tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
update fetcher
tree.fail
4 months ago
a187a685
c1d36a29
+182
-51
3 changed files
expand all
collapse all
unified
split
bundle
manager.go
internal
didindex
builder.go
sync
fetcher.go
+31
-22
bundle/manager.go
···
470
471
// Update DID index if enabled (ONLY when bundle is created)
472
if m.didIndex != nil && m.didIndex.Exists() {
0
0
473
if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil {
474
m.logger.Printf("Warning: failed to update DID index: %v", err)
0
0
0
0
0
0
475
}
476
}
477
···
1099
}
1100
}
1101
1102
-
// ✨ Use mempool's last time if available
1103
if m.mempool.Count() > 0 {
1104
mempoolLastTime := m.mempool.GetLastTime()
1105
if mempoolLastTime != "" {
···
1115
m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
1116
}
1117
1118
-
// ✨ NEW: Loop until we have enough OR catch up to latest
1119
-
maxAttempts := 50 // Safety limit
0
1120
attempt := 0
1121
caughtUp := false
0
1122
1123
for m.mempool.Count() < types.BUNDLE_SIZE && attempt < maxAttempts {
1124
attempt++
1125
-
1126
needed := types.BUNDLE_SIZE - m.mempool.Count()
1127
1128
if !quiet && attempt > 1 {
1129
m.logger.Printf(" Attempt %d: Need %d more ops...", attempt, needed)
1130
}
1131
1132
-
newOps, err := m.syncer.FetchToMempool(
1133
ctx,
1134
afterTime,
1135
prevBoundaryCIDs,
1136
needed,
1137
quiet,
1138
m.mempool.Count(),
0
1139
)
1140
0
0
0
0
0
0
1141
// Add operations if we got any
1142
if len(newOps) > 0 {
1143
added, addErr := m.mempool.Add(newOps)
···
1151
}
1152
1153
// Update cursor for next fetch
1154
-
if len(newOps) > 0 {
1155
-
afterTime = newOps[len(newOps)-1].CreatedAt.Format(time.RFC3339Nano)
1156
-
}
1157
}
1158
1159
-
// Check if we caught up (got incomplete batch)
1160
-
if err != nil || len(newOps) == 0 {
1161
caughtUp = true
1162
-
if !quiet {
1163
m.logger.Printf(" Caught up to latest PLC data")
1164
}
1165
break
1166
}
1167
1168
-
// If we got a full batch but still need more, continue looping
1169
-
if len(newOps) > 0 && m.mempool.Count() < types.BUNDLE_SIZE {
1170
-
if !quiet {
1171
-
m.logger.Printf(" Got full batch, fetching more...")
1172
-
}
1173
-
continue
1174
-
}
1175
-
1176
// If we have enough, break
1177
if m.mempool.Count() >= types.BUNDLE_SIZE {
1178
break
···
1182
// Save mempool state
1183
m.mempool.Save()
1184
0
0
1185
// Check if we have enough for a bundle
1186
if m.mempool.Count() < types.BUNDLE_SIZE {
1187
if caughtUp {
···
1202
// Create bundle structure
1203
syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)
1204
1205
-
// Convert to bundle.Bundle
1206
bundle := &Bundle{
1207
BundleNumber: syncBundle.BundleNumber,
1208
StartTime: syncBundle.StartTime,
···
1217
}
1218
1219
if !quiet {
1220
-
m.logger.Printf("✓ Bundle %06d ready (%d ops, %d DIDs)",
1221
-
bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount)
0
0
1222
}
1223
1224
return bundle, nil
···
470
471
// Update DID index if enabled (ONLY when bundle is created)
472
if m.didIndex != nil && m.didIndex.Exists() {
473
+
indexUpdateStart := time.Now()
474
+
475
if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil {
476
m.logger.Printf("Warning: failed to update DID index: %v", err)
477
+
} else {
478
+
indexUpdateDuration := time.Since(indexUpdateStart)
479
+
480
+
if !quiet {
481
+
m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration)
482
+
}
483
}
484
}
485
···
1107
}
1108
}
1109
1110
+
// Use mempool's last time if available
1111
if m.mempool.Count() > 0 {
1112
mempoolLastTime := m.mempool.GetLastTime()
1113
if mempoolLastTime != "" {
···
1123
m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
1124
}
1125
1126
+
// Track total fetches across all attempts
1127
+
totalFetches := 0
1128
+
maxAttempts := 50
1129
attempt := 0
1130
caughtUp := false
1131
+
attemptStart := time.Now()
1132
1133
for m.mempool.Count() < types.BUNDLE_SIZE && attempt < maxAttempts {
1134
attempt++
0
1135
needed := types.BUNDLE_SIZE - m.mempool.Count()
1136
1137
if !quiet && attempt > 1 {
1138
m.logger.Printf(" Attempt %d: Need %d more ops...", attempt, needed)
1139
}
1140
1141
+
newOps, fetchCount, err := m.syncer.FetchToMempool(
1142
ctx,
1143
afterTime,
1144
prevBoundaryCIDs,
1145
needed,
1146
quiet,
1147
m.mempool.Count(),
1148
+
totalFetches, // Pass current total
1149
)
1150
1151
+
// Update total fetch counter
1152
+
totalFetches += fetchCount
1153
+
1154
+
// Check if we got an incomplete batch
1155
+
gotIncompleteBatch := len(newOps) > 0 && len(newOps) < needed && err == nil
1156
+
1157
// Add operations if we got any
1158
if len(newOps) > 0 {
1159
added, addErr := m.mempool.Add(newOps)
···
1167
}
1168
1169
// Update cursor for next fetch
1170
+
afterTime = newOps[len(newOps)-1].CreatedAt.Format(time.RFC3339Nano)
0
0
1171
}
1172
1173
+
// Stop if caught up or error
1174
+
if err != nil || len(newOps) == 0 || gotIncompleteBatch {
1175
caughtUp = true
1176
+
if !quiet && totalFetches > 0 {
1177
m.logger.Printf(" Caught up to latest PLC data")
1178
}
1179
break
1180
}
1181
0
0
0
0
0
0
0
0
1182
// If we have enough, break
1183
if m.mempool.Count() >= types.BUNDLE_SIZE {
1184
break
···
1188
// Save mempool state
1189
m.mempool.Save()
1190
1191
+
totalDuration := time.Since(attemptStart)
1192
+
1193
// Check if we have enough for a bundle
1194
if m.mempool.Count() < types.BUNDLE_SIZE {
1195
if caughtUp {
···
1210
// Create bundle structure
1211
syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)
1212
0
1213
bundle := &Bundle{
1214
BundleNumber: syncBundle.BundleNumber,
1215
StartTime: syncBundle.StartTime,
···
1224
}
1225
1226
if !quiet {
1227
+
avgPerFetch := float64(types.BUNDLE_SIZE) / float64(totalFetches)
1228
+
m.logger.Printf("✓ Bundle %06d ready (%d ops, %d DIDs) - %d fetches in %s (avg %.0f ops/fetch)",
1229
+
bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount,
1230
+
totalFetches, totalDuration.Round(time.Millisecond), avgPerFetch)
1231
}
1232
1233
return bundle, nil
+109
-14
internal/didindex/builder.go
···
6
"fmt"
7
"os"
8
"path/filepath"
0
9
"sort"
0
0
10
)
11
12
// newShardBuilder creates a new shard builder
···
214
return int64(len(builder.entries)), nil
215
}
216
217
-
// UpdateIndexForBundle adds operations from a new bundle (incremental + ATOMIC)
218
func (dim *Manager) UpdateIndexForBundle(ctx context.Context, bundle *BundleData) error {
219
dim.indexMu.Lock()
220
defer dim.indexMu.Unlock()
221
222
-
// Group operations by shard
0
0
0
223
shardOps := make(map[uint8]map[string][]OpLocation)
224
225
for pos, op := range bundle.Operations {
···
241
})
242
}
243
244
-
// PHASE 1: Write ALL shards to .tmp files FIRST
0
0
0
0
0
0
245
tmpShards := make(map[uint8]string)
0
246
var deltaCount int64
0
247
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
248
for shardNum, newOps := range shardOps {
249
-
tmpPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx.tmp", shardNum))
0
0
250
251
-
addedCount, err := dim.updateShardToTemp(shardNum, newOps, tmpPath)
252
-
if err != nil {
253
-
dim.cleanupTempShards(tmpShards)
254
-
return fmt.Errorf("failed to prepare shard %02x: %w", shardNum, err)
255
-
}
256
257
-
tmpShards[shardNum] = tmpPath
258
-
deltaCount += addedCount
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
259
}
260
261
-
// PHASE 2: Atomically commit ALL shards
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
262
for shardNum, tmpPath := range tmpShards {
263
finalPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
264
···
271
dim.invalidateShard(shardNum)
272
}
273
274
-
// PHASE 3: Update config
0
0
0
0
275
dim.config.TotalDIDs += deltaCount
276
dim.config.LastBundle = bundle.BundleNumber
277
278
-
return dim.saveIndexConfig()
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
279
}
280
281
// updateShardToTemp updates a shard and writes to temp file
···
6
"fmt"
7
"os"
8
"path/filepath"
9
+
"runtime"
10
"sort"
11
+
"sync"
12
+
"time"
13
)
14
15
// newShardBuilder creates a new shard builder
···
217
return int64(len(builder.entries)), nil
218
}
219
220
+
// UpdateIndexForBundle adds operations from a new bundle (incremental + ATOMIC + PARALLEL)
221
func (dim *Manager) UpdateIndexForBundle(ctx context.Context, bundle *BundleData) error {
222
dim.indexMu.Lock()
223
defer dim.indexMu.Unlock()
224
225
+
totalStart := time.Now()
226
+
227
+
// STEP 1: Group operations by shard
228
+
groupStart := time.Now()
229
shardOps := make(map[uint8]map[string][]OpLocation)
230
231
for pos, op := range bundle.Operations {
···
247
})
248
}
249
250
+
groupDuration := time.Since(groupStart)
251
+
dim.logger.Printf(" [DID Index] Grouped operations into %d shards in %s",
252
+
len(shardOps), groupDuration)
253
+
254
+
// STEP 2: Write ALL shards to .tmp files FIRST (PARALLEL)
255
+
writeStart := time.Now()
256
+
257
tmpShards := make(map[uint8]string)
258
+
var tmpShardsMu sync.Mutex
259
var deltaCount int64
260
+
var deltaCountMu sync.Mutex
261
262
+
// Error handling
263
+
errChan := make(chan error, len(shardOps))
264
+
265
+
// Worker pool
266
+
workers := runtime.NumCPU()
267
+
if workers > len(shardOps) {
268
+
workers = len(shardOps)
269
+
}
270
+
if workers < 1 {
271
+
workers = 1
272
+
}
273
+
274
+
semaphore := make(chan struct{}, workers)
275
+
var wg sync.WaitGroup
276
+
277
+
dim.logger.Printf(" [DID Index] Updating %d shards in parallel (%d workers)...",
278
+
len(shardOps), workers)
279
+
280
+
// Process each shard in parallel
281
for shardNum, newOps := range shardOps {
282
+
wg.Add(1)
283
+
go func(sNum uint8, ops map[string][]OpLocation) {
284
+
defer wg.Done()
285
286
+
// Acquire semaphore (limit concurrency)
287
+
semaphore <- struct{}{}
288
+
defer func() { <-semaphore }()
0
0
289
290
+
shardStart := time.Now()
291
+
tmpPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx.tmp", sNum))
292
+
293
+
addedCount, err := dim.updateShardToTemp(sNum, ops, tmpPath)
294
+
if err != nil {
295
+
errChan <- fmt.Errorf("shard %02x: %w", sNum, err)
296
+
return
297
+
}
298
+
299
+
shardDuration := time.Since(shardStart)
300
+
301
+
// Update shared state
302
+
tmpShardsMu.Lock()
303
+
tmpShards[sNum] = tmpPath
304
+
tmpShardsMu.Unlock()
305
+
306
+
deltaCountMu.Lock()
307
+
deltaCount += addedCount
308
+
deltaCountMu.Unlock()
309
+
310
+
// Debug log for each shard
311
+
if dim.verbose {
312
+
dim.logger.Printf(" Shard %02x: +%d DIDs in %s (%d ops)",
313
+
sNum, addedCount, shardDuration, len(ops))
314
+
}
315
+
}(shardNum, newOps)
316
}
317
318
+
// Wait for all workers
319
+
wg.Wait()
320
+
close(errChan)
321
+
322
+
writeDuration := time.Since(writeStart)
323
+
dim.logger.Printf(" [DID Index] Wrote %d temp files in %s (%.1f shards/sec)",
324
+
len(tmpShards), writeDuration, float64(len(tmpShards))/writeDuration.Seconds())
325
+
326
+
// Check for errors
327
+
if err := <-errChan; err != nil {
328
+
dim.cleanupTempShards(tmpShards)
329
+
return err
330
+
}
331
+
332
+
// STEP 3: Atomically commit ALL shards
333
+
commitStart := time.Now()
334
+
335
for shardNum, tmpPath := range tmpShards {
336
finalPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
337
···
344
dim.invalidateShard(shardNum)
345
}
346
347
+
commitDuration := time.Since(commitStart)
348
+
349
+
// STEP 4: Update config
350
+
configStart := time.Now()
351
+
352
dim.config.TotalDIDs += deltaCount
353
dim.config.LastBundle = bundle.BundleNumber
354
355
+
if err := dim.saveIndexConfig(); err != nil {
356
+
return fmt.Errorf("failed to save config: %w", err)
357
+
}
358
+
359
+
configDuration := time.Since(configStart)
360
+
totalDuration := time.Since(totalStart)
361
+
362
+
// Summary log
363
+
dim.logger.Printf(" [DID Index] ✓ Bundle %06d indexed: +%d DIDs, %d shards updated in %s",
364
+
bundle.BundleNumber, deltaCount, len(tmpShards), totalDuration)
365
+
366
+
if dim.verbose {
367
+
dim.logger.Printf(" Breakdown: group=%s write=%s commit=%s config=%s",
368
+
groupDuration, writeDuration, commitDuration, configDuration)
369
+
dim.logger.Printf(" Throughput: %.0f ops/sec",
370
+
float64(len(bundle.Operations))/totalDuration.Seconds())
371
+
}
372
+
373
+
return nil
374
}
375
376
// updateShardToTemp updates a shard and writes to temp file
+42
-15
internal/sync/fetcher.go
···
35
target int,
36
quiet bool,
37
currentMempoolCount int,
38
-
) ([]plcclient.PLCOperation, error) {
0
39
40
seenCIDs := make(map[string]bool)
41
···
47
currentAfter := afterTime
48
maxFetches := 20
49
var allNewOps []plcclient.PLCOperation
0
50
51
for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
0
52
remaining := target - len(allNewOps)
53
if remaining <= 0 {
54
break
55
}
56
57
-
batchSize := 1000
58
-
if remaining < 500 {
0
0
0
0
0
0
59
batchSize = 200
0
0
60
}
0
0
61
62
if !quiet {
63
-
f.logger.Printf(" Fetch #%d: requesting %d operations (after: %s)",
64
-
fetchNum+1, batchSize, currentAfter[:19])
65
}
66
67
batch, err := f.plcClient.Export(ctx, plcclient.ExportOptions{
···
69
After: currentAfter,
70
})
71
0
0
72
if err != nil {
73
-
return allNewOps, fmt.Errorf("export failed: %w", err)
74
}
75
76
if len(batch) == 0 {
77
if !quiet {
78
-
f.logger.Printf(" No more operations available from PLC")
79
}
80
-
// Return what we have (might be incomplete)
81
-
return allNewOps, nil
82
}
83
84
// Deduplicate
···
90
}
91
}
92
93
-
if !quiet && len(batch) > 0 {
94
-
deduped := len(batch) - (len(allNewOps) - beforeDedup)
0
0
0
0
0
95
if deduped > 0 {
96
-
f.logger.Printf(" Received %d ops (%d duplicates filtered)", len(batch), deduped)
0
0
0
0
97
}
98
}
99
···
102
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
103
}
104
105
-
// ✨ KEY: Stop if we got incomplete batch (caught up!)
106
if len(batch) < batchSize {
107
if !quiet {
108
f.logger.Printf(" Received incomplete batch (%d/%d) → caught up to latest",
109
len(batch), batchSize)
110
}
111
-
return allNewOps, nil
112
}
113
114
// If we have enough, stop
115
if len(allNewOps) >= target {
0
0
0
0
116
break
117
}
118
}
119
120
-
return allNewOps, nil
121
}
···
35
target int,
36
quiet bool,
37
currentMempoolCount int,
38
+
totalFetchesSoFar int,
39
+
) ([]plcclient.PLCOperation, int, error) {
40
41
seenCIDs := make(map[string]bool)
42
···
48
currentAfter := afterTime
49
maxFetches := 20
50
var allNewOps []plcclient.PLCOperation
51
+
fetchesMade := 0
52
53
for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
54
+
fetchesMade++
55
remaining := target - len(allNewOps)
56
if remaining <= 0 {
57
break
58
}
59
60
+
// ✨ SMART BATCH SIZING
61
+
var batchSize int
62
+
switch {
63
+
case remaining <= 50:
64
+
batchSize = 50 // Fetch exactly what we need (with small buffer)
65
+
case remaining <= 100:
66
+
batchSize = 100
67
+
case remaining <= 500:
68
batchSize = 200
69
+
default:
70
+
batchSize = 1000
71
}
72
+
73
+
fetchStart := time.Now()
74
75
if !quiet {
76
+
f.logger.Printf(" Fetch #%d: requesting %d operations (need %d, after: %s)",
77
+
totalFetchesSoFar+fetchesMade, batchSize, remaining, currentAfter[:19])
78
}
79
80
batch, err := f.plcClient.Export(ctx, plcclient.ExportOptions{
···
82
After: currentAfter,
83
})
84
85
+
fetchDuration := time.Since(fetchStart)
86
+
87
if err != nil {
88
+
return allNewOps, fetchesMade, fmt.Errorf("export failed: %w", err)
89
}
90
91
if len(batch) == 0 {
92
if !quiet {
93
+
f.logger.Printf(" No more operations available from PLC (in %s)", fetchDuration)
94
}
95
+
return allNewOps, fetchesMade, nil
0
96
}
97
98
// Deduplicate
···
104
}
105
}
106
107
+
uniqueAdded := len(allNewOps) - beforeDedup
108
+
deduped := len(batch) - uniqueAdded
109
+
110
+
// ✨ DETAILED METRICS
111
+
if !quiet {
112
+
opsPerSec := float64(len(batch)) / fetchDuration.Seconds()
113
+
114
if deduped > 0 {
115
+
f.logger.Printf(" Received %d ops (%d unique, %d dupes) in %s (%.0f ops/sec)",
116
+
len(batch), uniqueAdded, deduped, fetchDuration, opsPerSec)
117
+
} else {
118
+
f.logger.Printf(" Received %d ops in %s (%.0f ops/sec)",
119
+
len(batch), fetchDuration, opsPerSec)
120
}
121
}
122
···
125
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
126
}
127
128
+
// Stop if we got incomplete batch (caught up!)
129
if len(batch) < batchSize {
130
if !quiet {
131
f.logger.Printf(" Received incomplete batch (%d/%d) → caught up to latest",
132
len(batch), batchSize)
133
}
134
+
return allNewOps, fetchesMade, nil
135
}
136
137
// If we have enough, stop
138
if len(allNewOps) >= target {
139
+
if !quiet {
140
+
f.logger.Printf(" ✓ Target reached (%d/%d unique ops collected)",
141
+
len(allNewOps), target)
142
+
}
143
break
144
}
145
}
146
147
+
return allNewOps, fetchesMade, nil
148
}