tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
fix fetchnextbundle
tree.fail
4 months ago
c1d36a29
862ebb97
+90
-63
3 changed files
expand all
collapse all
unified
split
bundle
manager.go
cmd
plcbundle
commands
server.go
internal
sync
fetcher.go
+58
-17
bundle/manager.go
···
1076
return result, nil
1077
}
1078
1079
-
// FetchNextBundle delegates to sync.Fetcher
1080
func (m *Manager) FetchNextBundle(ctx context.Context, quiet bool) (*Bundle, error) {
1081
if m.plcClient == nil {
1082
return nil, fmt.Errorf("PLC client not configured")
···
1115
m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
1116
}
1117
1118
-
// ✨ Fetch operations if needed (FetchToMempool loops internally)
1119
-
if m.mempool.Count() < types.BUNDLE_SIZE {
0
0
0
0
0
0
0
0
0
0
0
0
1120
newOps, err := m.syncer.FetchToMempool(
1121
ctx,
1122
afterTime,
1123
prevBoundaryCIDs,
1124
-
types.BUNDLE_SIZE-m.mempool.Count(),
1125
quiet,
1126
m.mempool.Count(),
1127
)
···
1135
}
1136
1137
if !quiet && added > 0 {
1138
-
m.logger.Printf("Added %d new operations (mempool now: %d)", added, m.mempool.Count())
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1139
}
0
1140
}
1141
1142
-
// If fetch failed AND we don't have enough, return error
1143
-
if err != nil && m.mempool.Count() < types.BUNDLE_SIZE {
1144
-
m.mempool.Save()
1145
-
return nil, err
1146
}
1147
}
1148
0
0
0
1149
// Check if we have enough for a bundle
1150
if m.mempool.Count() < types.BUNDLE_SIZE {
1151
-
m.mempool.Save()
1152
-
return nil, fmt.Errorf("insufficient operations: have %d, need %d (no more available)",
1153
-
m.mempool.Count(), types.BUNDLE_SIZE)
0
0
0
0
1154
}
1155
1156
// Create bundle
1157
operations, err := m.mempool.Take(types.BUNDLE_SIZE)
1158
if err != nil {
1159
-
m.mempool.Save()
1160
return nil, err
1161
}
1162
1163
-
// Create bundle structure directly
1164
syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)
1165
1166
-
// Convert from sync.Bundle to bundle.Bundle inline
1167
bundle := &Bundle{
1168
BundleNumber: syncBundle.BundleNumber,
1169
StartTime: syncBundle.StartTime,
···
1175
BoundaryCIDs: syncBundle.BoundaryCIDs,
1176
Compressed: syncBundle.Compressed,
1177
CreatedAt: syncBundle.CreatedAt,
1178
-
// Note: Hash fields are empty here, will be calculated in SaveBundle
1179
}
1180
1181
-
m.mempool.Save()
0
0
0
1182
1183
return bundle, nil
1184
}
···
1076
return result, nil
1077
}
1078
1079
+
// FetchNextBundle fetches operations and creates a bundle, looping until caught up
1080
func (m *Manager) FetchNextBundle(ctx context.Context, quiet bool) (*Bundle, error) {
1081
if m.plcClient == nil {
1082
return nil, fmt.Errorf("PLC client not configured")
···
1115
m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
1116
}
1117
1118
+
// ✨ NEW: Loop until we have enough OR catch up to latest
1119
+
maxAttempts := 50 // Safety limit
1120
+
attempt := 0
1121
+
caughtUp := false
1122
+
1123
+
for m.mempool.Count() < types.BUNDLE_SIZE && attempt < maxAttempts {
1124
+
attempt++
1125
+
1126
+
needed := types.BUNDLE_SIZE - m.mempool.Count()
1127
+
1128
+
if !quiet && attempt > 1 {
1129
+
m.logger.Printf(" Attempt %d: Need %d more ops...", attempt, needed)
1130
+
}
1131
+
1132
newOps, err := m.syncer.FetchToMempool(
1133
ctx,
1134
afterTime,
1135
prevBoundaryCIDs,
1136
+
needed,
1137
quiet,
1138
m.mempool.Count(),
1139
)
···
1147
}
1148
1149
if !quiet && added > 0 {
1150
+
m.logger.Printf(" Added %d new operations (mempool now: %d)", added, m.mempool.Count())
1151
+
}
1152
+
1153
+
// Update cursor for next fetch
1154
+
if len(newOps) > 0 {
1155
+
afterTime = newOps[len(newOps)-1].CreatedAt.Format(time.RFC3339Nano)
1156
+
}
1157
+
}
1158
+
1159
+
// Check if we caught up (got incomplete batch)
1160
+
if err != nil || len(newOps) == 0 {
1161
+
caughtUp = true
1162
+
if !quiet {
1163
+
m.logger.Printf(" Caught up to latest PLC data")
1164
+
}
1165
+
break
1166
+
}
1167
+
1168
+
// If we got a full batch but still need more, continue looping
1169
+
if len(newOps) > 0 && m.mempool.Count() < types.BUNDLE_SIZE {
1170
+
if !quiet {
1171
+
m.logger.Printf(" Got full batch, fetching more...")
1172
}
1173
+
continue
1174
}
1175
1176
+
// If we have enough, break
1177
+
if m.mempool.Count() >= types.BUNDLE_SIZE {
1178
+
break
0
1179
}
1180
}
1181
1182
+
// Save mempool state
1183
+
m.mempool.Save()
1184
+
1185
// Check if we have enough for a bundle
1186
if m.mempool.Count() < types.BUNDLE_SIZE {
1187
+
if caughtUp {
1188
+
return nil, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)",
1189
+
m.mempool.Count(), types.BUNDLE_SIZE)
1190
+
} else {
1191
+
return nil, fmt.Errorf("insufficient operations: have %d, need %d (max attempts reached)",
1192
+
m.mempool.Count(), types.BUNDLE_SIZE)
1193
+
}
1194
}
1195
1196
// Create bundle
1197
operations, err := m.mempool.Take(types.BUNDLE_SIZE)
1198
if err != nil {
0
1199
return nil, err
1200
}
1201
1202
+
// Create bundle structure
1203
syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)
1204
1205
+
// Convert to bundle.Bundle
1206
bundle := &Bundle{
1207
BundleNumber: syncBundle.BundleNumber,
1208
StartTime: syncBundle.StartTime,
···
1214
BoundaryCIDs: syncBundle.BoundaryCIDs,
1215
Compressed: syncBundle.Compressed,
1216
CreatedAt: syncBundle.CreatedAt,
0
1217
}
1218
1219
+
if !quiet {
1220
+
m.logger.Printf("✓ Bundle %06d ready (%d ops, %d DIDs)",
1221
+
bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount)
1222
+
}
1223
1224
return bundle, nil
1225
}
+9
-33
cmd/plcbundle/commands/server.go
···
310
startBundle = lastBundle.BundleNumber + 1
311
}
312
313
-
isInitialSync := (lastBundle == nil || lastBundle.BundleNumber < 10)
314
-
315
-
if isInitialSync && !verbose {
316
-
fmt.Fprintf(os.Stderr, "[Sync] Initial sync - fast loading mode (bundle %06d → ...)\n", startBundle)
317
-
} else if verbose {
318
-
fmt.Fprintf(os.Stderr, "[Sync] Checking for new bundles (current: %06d)...\n", startBundle-1)
319
-
}
320
-
321
mempoolBefore := mgr.GetMempoolStats()["count"].(int)
322
fetchedCount := 0
323
-
consecutiveErrors := 0
324
0
325
for {
326
-
currentBundle := startBundle + fetchedCount
327
-
328
b, err := mgr.FetchNextBundle(ctx, !verbose)
329
if err != nil {
330
if isEndOfDataError(err) {
···
334
335
if fetchedCount > 0 {
336
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %dms\n",
337
-
currentBundle-1, fetchedCount, mempoolAfter, addedOps, duration.Milliseconds())
338
-
} else if !isInitialSync {
339
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | Up to date | Mempool: %d (+%d) | %dms\n",
340
startBundle-1, mempoolAfter, addedOps, duration.Milliseconds())
341
}
342
break
343
}
344
345
-
consecutiveErrors++
346
-
if verbose {
347
-
fmt.Fprintf(os.Stderr, "[Sync] Error fetching bundle %06d: %v\n", currentBundle, err)
348
-
}
349
-
350
-
if consecutiveErrors >= 3 {
351
-
fmt.Fprintf(os.Stderr, "[Sync] Too many errors, stopping\n")
352
-
break
353
-
}
354
-
355
-
time.Sleep(5 * time.Second)
356
-
continue
357
}
358
359
-
consecutiveErrors = 0
360
-
361
if err := mgr.SaveBundle(ctx, b, !verbose); err != nil {
362
fmt.Fprintf(os.Stderr, "[Sync] Error saving bundle %06d: %v\n", b.BundleNumber, err)
363
break
···
366
fetchedCount++
367
368
if !verbose {
369
-
fmt.Fprintf(os.Stderr, "[Sync] ✓ %06d | hash=%s | content=%s | %d ops, %d DIDs\n",
370
-
b.BundleNumber,
371
-
b.Hash[:16]+"...",
372
-
b.ContentHash[:16]+"...",
373
-
len(b.Operations),
374
-
b.DIDCount)
375
}
376
377
time.Sleep(500 * time.Millisecond)
···
310
startBundle = lastBundle.BundleNumber + 1
311
}
312
0
0
0
0
0
0
0
0
313
mempoolBefore := mgr.GetMempoolStats()["count"].(int)
314
fetchedCount := 0
0
315
316
+
// ✨ SIMPLIFIED: Just keep calling FetchNextBundle until it fails
317
for {
0
0
318
b, err := mgr.FetchNextBundle(ctx, !verbose)
319
if err != nil {
320
if isEndOfDataError(err) {
···
324
325
if fetchedCount > 0 {
326
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %dms\n",
327
+
startBundle+fetchedCount-1, fetchedCount, mempoolAfter, addedOps, duration.Milliseconds())
328
+
} else {
329
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | Up to date | Mempool: %d (+%d) | %dms\n",
330
startBundle-1, mempoolAfter, addedOps, duration.Milliseconds())
331
}
332
break
333
}
334
335
+
// Real error
336
+
fmt.Fprintf(os.Stderr, "[Sync] Error: %v\n", err)
337
+
break
0
0
0
0
0
0
0
0
0
338
}
339
340
+
// Save bundle
0
341
if err := mgr.SaveBundle(ctx, b, !verbose); err != nil {
342
fmt.Fprintf(os.Stderr, "[Sync] Error saving bundle %06d: %v\n", b.BundleNumber, err)
343
break
···
346
fetchedCount++
347
348
if !verbose {
349
+
fmt.Fprintf(os.Stderr, "[Sync] ✓ %06d | %d ops, %d DIDs\n",
350
+
b.BundleNumber, len(b.Operations), b.DIDCount)
0
0
0
0
351
}
352
353
time.Sleep(500 * time.Millisecond)
+23
-13
internal/sync/fetcher.go
···
27
}
28
29
// FetchToMempool fetches operations and returns them
0
30
func (f *Fetcher) FetchToMempool(
31
ctx context.Context,
32
afterTime string,
···
59
}
60
61
if !quiet {
62
-
f.logger.Printf(" Fetch #%d: requesting %d operations", fetchNum+1, batchSize)
0
63
}
64
65
batch, err := f.plcClient.Export(ctx, plcclient.ExportOptions{
66
Count: batchSize,
67
After: currentAfter,
68
})
0
69
if err != nil {
70
return allNewOps, fmt.Errorf("export failed: %w", err)
71
}
···
74
if !quiet {
75
f.logger.Printf(" No more operations available from PLC")
76
}
77
-
break
0
78
}
79
80
-
// Deduplicate against boundary CIDs only
81
-
// Mempool will handle deduplication of operations already in mempool
82
for _, op := range batch {
83
if !seenCIDs[op.CID] {
84
seenCIDs[op.CID] = true
···
86
}
87
}
88
0
0
0
0
0
0
0
89
// Update cursor
90
if len(batch) > 0 {
91
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
92
}
93
94
-
// Stop if we got less than requested (caught up)
95
if len(batch) < batchSize {
96
if !quiet {
97
-
f.logger.Printf(" Received incomplete batch (%d/%d), caught up to latest", len(batch), batchSize)
0
98
}
99
-
break
100
}
101
-
}
102
103
-
if len(allNewOps) > 0 {
104
-
if !quiet {
105
-
f.logger.Printf("✓ Fetch complete: %d operations", len(allNewOps))
106
}
107
-
return allNewOps, nil
108
}
109
110
-
return nil, fmt.Errorf("no operations available (reached latest data)")
111
}
···
27
}
28
29
// FetchToMempool fetches operations and returns them
30
+
// Returns empty slice + nil error when caught up (incomplete batch received)
31
func (f *Fetcher) FetchToMempool(
32
ctx context.Context,
33
afterTime string,
···
60
}
61
62
if !quiet {
63
+
f.logger.Printf(" Fetch #%d: requesting %d operations (after: %s)",
64
+
fetchNum+1, batchSize, currentAfter[:19])
65
}
66
67
batch, err := f.plcClient.Export(ctx, plcclient.ExportOptions{
68
Count: batchSize,
69
After: currentAfter,
70
})
71
+
72
if err != nil {
73
return allNewOps, fmt.Errorf("export failed: %w", err)
74
}
···
77
if !quiet {
78
f.logger.Printf(" No more operations available from PLC")
79
}
80
+
// Return what we have (might be incomplete)
81
+
return allNewOps, nil
82
}
83
84
+
// Deduplicate
85
+
beforeDedup := len(allNewOps)
86
for _, op := range batch {
87
if !seenCIDs[op.CID] {
88
seenCIDs[op.CID] = true
···
90
}
91
}
92
93
+
if !quiet && len(batch) > 0 {
94
+
deduped := len(batch) - (len(allNewOps) - beforeDedup)
95
+
if deduped > 0 {
96
+
f.logger.Printf(" Received %d ops (%d duplicates filtered)", len(batch), deduped)
97
+
}
98
+
}
99
+
100
// Update cursor
101
if len(batch) > 0 {
102
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
103
}
104
105
+
// ✨ KEY: Stop if we got incomplete batch (caught up!)
106
if len(batch) < batchSize {
107
if !quiet {
108
+
f.logger.Printf(" Received incomplete batch (%d/%d) → caught up to latest",
109
+
len(batch), batchSize)
110
}
111
+
return allNewOps, nil
112
}
0
113
114
+
// If we have enough, stop
115
+
if len(allNewOps) >= target {
116
+
break
117
}
0
118
}
119
120
+
return allNewOps, nil
121
}