tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
mempool saving
tree.fail
4 months ago
cf1155d7
f5294c05
+97
-123
6 changed files
expand all
collapse all
unified
split
bundle
manager.go
cmd
plcbundle
commands
backfill.go
common.go
fetch.go
server.go
internal
sync
fetcher.go
+16
-37
bundle/manager.go
···
408
}
409
410
// SaveBundle saves a bundle to disk and updates the index
411
-
func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, quiet bool) error {
0
412
if err := bundle.ValidateForSave(); err != nil {
413
-
return fmt.Errorf("bundle validation failed: %w", err)
414
}
415
416
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))
···
418
// Save to disk
419
uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations)
420
if err != nil {
421
-
return fmt.Errorf("failed to save bundle: %w", err)
422
}
423
424
bundle.ContentHash = uncompressedHash
···
448
449
// Save index
450
if err := m.SaveIndex(); err != nil {
451
-
return fmt.Errorf("failed to save index: %w", err)
452
}
453
454
-
// Clean up old mempool (silent unless verbose)
455
oldMempoolFile := m.mempool.GetFilename()
456
if err := m.mempool.Delete(); err != nil && !quiet {
457
m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
···
463
464
newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger)
465
if err != nil {
466
-
return fmt.Errorf("failed to create new mempool: %w", err)
467
}
468
469
m.mempool = newMempool
470
471
-
// Update DID index if enabled (ONLY when bundle is created)
0
472
if m.didIndex != nil && m.didIndex.Exists() {
473
indexUpdateStart := time.Now()
474
475
if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil {
476
m.logger.Printf("Warning: failed to update DID index: %v", err)
477
} else {
478
-
indexUpdateDuration := time.Since(indexUpdateStart)
479
480
if !quiet {
481
m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration)
···
483
}
484
}
485
486
-
return nil
487
}
488
489
// GetMempoolStats returns mempool statistics
···
1127
m.logger.Printf("Starting cursor: %s", afterTime)
1128
}
1129
1130
-
// Track total fetches and timing
1131
totalFetches := 0
1132
maxAttempts := 50
1133
attempt := 0
···
1149
prevBoundaryCIDs,
1150
needed,
1151
quiet,
1152
-
m.mempool.Count(),
1153
totalFetches,
1154
)
1155
···
1158
// Check if we got an incomplete batch
1159
gotIncompleteBatch := len(newOps) > 0 && len(newOps) < needed && err == nil
1160
1161
-
// Add operations if we got any
1162
-
if len(newOps) > 0 {
1163
-
added, addErr := m.mempool.Add(newOps)
1164
-
if addErr != nil {
1165
-
m.mempool.Save()
1166
-
return nil, fmt.Errorf("chronological validation failed: %w", addErr)
1167
-
}
1168
-
1169
-
// ✨ ALWAYS update cursor from mempool (source of truth)
1170
afterTime = m.mempool.GetLastTime()
1171
-
1172
-
if !quiet && added > 0 {
1173
-
addRejected := len(newOps) - added
1174
-
if addRejected > 0 {
1175
-
m.logger.Printf(" Added %d ops (mempool: %d, rejected: %d dupes, cursor: %s)",
1176
-
added, m.mempool.Count(), addRejected, afterTime[:19])
1177
-
} else {
1178
-
m.logger.Printf(" Added %d ops (mempool: %d, cursor: %s)",
1179
-
added, m.mempool.Count(), afterTime[:19])
1180
-
}
1181
-
}
1182
}
1183
1184
// Stop if caught up or error
···
1190
break
1191
}
1192
1193
-
// If we have enough, break
1194
if m.mempool.Count() >= types.BUNDLE_SIZE {
1195
break
1196
}
1197
}
1198
1199
-
// Save mempool state
1200
-
m.mempool.Save()
1201
1202
totalDuration := time.Since(attemptStart)
1203
1204
-
// Check if we have enough for a bundle
1205
if m.mempool.Count() < types.BUNDLE_SIZE {
1206
if caughtUp {
1207
return nil, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)",
···
1218
return nil, err
1219
}
1220
1221
-
// Create bundle structure
1222
syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)
1223
1224
bundle := &Bundle{
···
1237
if !quiet {
1238
avgPerFetch := float64(types.BUNDLE_SIZE) / float64(totalFetches)
1239
throughput := float64(types.BUNDLE_SIZE) / totalDuration.Seconds()
1240
-
m.logger.Printf("✓ Bundle %06d ready (%d ops, %d DIDs) - %d fetches in %s (avg %.0f unique/fetch, %.0f ops/sec)",
1241
bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount,
1242
totalFetches, totalDuration.Round(time.Millisecond), avgPerFetch, throughput)
1243
}
···
408
}
409
410
// SaveBundle saves a bundle to disk and updates the index
411
+
// Returns the DID index update duration
412
+
func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, quiet bool) (time.Duration, error) {
413
if err := bundle.ValidateForSave(); err != nil {
414
+
return 0, fmt.Errorf("bundle validation failed: %w", err)
415
}
416
417
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))
···
419
// Save to disk
420
uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations)
421
if err != nil {
422
+
return 0, fmt.Errorf("failed to save bundle: %w", err)
423
}
424
425
bundle.ContentHash = uncompressedHash
···
449
450
// Save index
451
if err := m.SaveIndex(); err != nil {
452
+
return 0, fmt.Errorf("failed to save index: %w", err)
453
}
454
455
+
// Clean up old mempool
456
oldMempoolFile := m.mempool.GetFilename()
457
if err := m.mempool.Delete(); err != nil && !quiet {
458
m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
···
464
465
newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger)
466
if err != nil {
467
+
return 0, fmt.Errorf("failed to create new mempool: %w", err)
468
}
469
470
m.mempool = newMempool
471
472
+
// ✨ Update DID index if enabled and track timing
473
+
var indexUpdateDuration time.Duration
474
if m.didIndex != nil && m.didIndex.Exists() {
475
indexUpdateStart := time.Now()
476
477
if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil {
478
m.logger.Printf("Warning: failed to update DID index: %v", err)
479
} else {
480
+
indexUpdateDuration = time.Since(indexUpdateStart)
481
482
if !quiet {
483
m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration)
···
485
}
486
}
487
488
+
return indexUpdateDuration, nil
489
}
490
491
// GetMempoolStats returns mempool statistics
···
1129
m.logger.Printf("Starting cursor: %s", afterTime)
1130
}
1131
0
1132
totalFetches := 0
1133
maxAttempts := 50
1134
attempt := 0
···
1150
prevBoundaryCIDs,
1151
needed,
1152
quiet,
1153
+
m.mempool,
1154
totalFetches,
1155
)
1156
···
1159
// Check if we got an incomplete batch
1160
gotIncompleteBatch := len(newOps) > 0 && len(newOps) < needed && err == nil
1161
1162
+
// Update cursor from mempool if we got new ops
1163
+
if len(newOps) > 0 && m.mempool.Count() > 0 {
0
0
0
0
0
0
0
1164
afterTime = m.mempool.GetLastTime()
0
0
0
0
0
0
0
0
0
0
0
1165
}
1166
1167
// Stop if caught up or error
···
1173
break
1174
}
1175
0
1176
if m.mempool.Count() >= types.BUNDLE_SIZE {
1177
break
1178
}
1179
}
1180
1181
+
// ✨ REMOVED: m.mempool.Save() - now handled by FetchToMempool
0
1182
1183
totalDuration := time.Since(attemptStart)
1184
0
1185
if m.mempool.Count() < types.BUNDLE_SIZE {
1186
if caughtUp {
1187
return nil, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)",
···
1198
return nil, err
1199
}
1200
0
1201
syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)
1202
1203
bundle := &Bundle{
···
1216
if !quiet {
1217
avgPerFetch := float64(types.BUNDLE_SIZE) / float64(totalFetches)
1218
throughput := float64(types.BUNDLE_SIZE) / totalDuration.Seconds()
1219
+
m.logger.Printf("✓ Bundle %06d ready (%d ops, %d DIDs) - %d fetches in %s (avg %.0f/fetch, %.0f ops/sec)",
1220
bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount,
1221
totalFetches, totalDuration.Round(time.Millisecond), avgPerFetch, throughput)
1222
}
+1
-1
cmd/plcbundle/commands/backfill.go
···
66
break
67
}
68
69
-
if err := mgr.SaveBundle(ctx, bundle, !*verbose); err != nil {
70
return fmt.Errorf("error saving: %w", err)
71
}
72
···
66
break
67
}
68
69
+
if _, err := mgr.SaveBundle(ctx, bundle, !*verbose); err != nil {
70
return fmt.Errorf("error saving: %w", err)
71
}
72
+2
-1
cmd/plcbundle/commands/common.go
···
28
RefreshMempool() error
29
ClearMempool() error
30
FetchNextBundle(ctx context.Context, quiet bool) (*bundle.Bundle, error)
31
-
SaveBundle(ctx context.Context, b *bundle.Bundle, quiet bool) error
32
GetDIDIndexStats() map[string]interface{}
33
GetDIDIndex() *didindex.Manager
34
BuildDIDIndex(ctx context.Context, progress func(int, int)) error
···
37
GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error)
38
LoadOperation(ctx context.Context, bundleNum, position int) (*plcclient.PLCOperation, error)
39
CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error)
0
40
}
41
42
// PLCOperationWithLocation wraps operation with location info
···
28
RefreshMempool() error
29
ClearMempool() error
30
FetchNextBundle(ctx context.Context, quiet bool) (*bundle.Bundle, error)
31
+
SaveBundle(ctx context.Context, b *bundle.Bundle, quiet bool) (time.Duration, error) // ✨ Updated signature
32
GetDIDIndexStats() map[string]interface{}
33
GetDIDIndex() *didindex.Manager
34
BuildDIDIndex(ctx context.Context, progress func(int, int)) error
···
37
GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error)
38
LoadOperation(ctx context.Context, bundleNum, position int) (*plcclient.PLCOperation, error)
39
CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error)
40
+
ResolveDID(ctx context.Context, did string) (*bundle.ResolveDIDResult, error)
41
}
42
43
// PLCOperationWithLocation wraps operation with location info
+2
-1
cmd/plcbundle/commands/fetch.go
···
88
// Reset error counter on success
89
consecutiveErrors = 0
90
91
-
if err := mgr.SaveBundle(ctx, b, !*verbose); err != nil {
0
92
return fmt.Errorf("error saving bundle %06d: %w", b.BundleNumber, err)
93
}
94
···
88
// Reset error counter on success
89
consecutiveErrors = 0
90
91
+
_, err = mgr.SaveBundle(ctx, b, !*verbose)
92
+
if err != nil {
93
return fmt.Errorf("error saving bundle %06d: %w", b.BundleNumber, err)
94
}
95
+35
-72
cmd/plcbundle/commands/server.go
···
266
267
// runSyncLoop runs the background sync loop
268
func runSyncLoop(ctx context.Context, mgr *bundle.Manager, interval time.Duration, verbose bool, resolverEnabled bool) {
269
-
// Initial sync
270
-
syncBundles(ctx, mgr, verbose, resolverEnabled)
0
271
272
-
fmt.Fprintf(os.Stderr, "[Sync] Starting sync loop (interval: %s)\n", interval)
273
274
ticker := time.NewTicker(interval)
275
defer ticker.Stop()
276
277
-
saveTicker := time.NewTicker(5 * time.Minute)
278
-
defer saveTicker.Stop()
279
-
280
for {
281
select {
282
case <-ctx.Done():
···
287
return
288
289
case <-ticker.C:
0
290
syncBundles(ctx, mgr, verbose, resolverEnabled)
291
-
292
-
case <-saveTicker.C:
293
-
stats := mgr.GetMempoolStats()
294
-
if stats["count"].(int) > 0 && verbose {
295
-
fmt.Fprintf(os.Stderr, "[Sync] Saving mempool (%d ops)\n", stats["count"])
296
-
mgr.SaveMempool()
297
-
}
298
}
299
}
300
}
301
302
-
// syncBundles performs a sync cycle with detailed progress
303
func syncBundles(ctx context.Context, mgr *bundle.Manager, verbose bool, resolverEnabled bool) {
304
cycleStart := time.Now()
305
···
312
313
mempoolBefore := mgr.GetMempoolStats()["count"].(int)
314
fetchedCount := 0
315
-
totalOps := 0
316
-
totalDIDs := 0
317
-
318
-
fmt.Fprintf(os.Stderr, "[Sync] Starting from bundle %06d (mempool: %d ops)\n",
319
-
startBundle, mempoolBefore)
320
321
// Keep fetching until caught up
322
for {
323
-
bundleStart := time.Now()
324
-
325
b, err := mgr.FetchNextBundle(ctx, !verbose)
326
if err != nil {
327
if isEndOfDataError(err) {
328
-
// Caught up - show summary
329
-
mempoolAfter := mgr.GetMempoolStats()["count"].(int)
330
-
addedOps := mempoolAfter - mempoolBefore
331
-
duration := time.Since(cycleStart)
332
-
333
-
if fetchedCount > 0 {
334
-
avgBundleTime := duration / time.Duration(fetchedCount)
335
-
opsPerSec := float64(totalOps) / duration.Seconds()
336
-
337
-
fmt.Fprintf(os.Stderr, "[Sync] ✓ Synced %d bundles in %s\n",
338
-
fetchedCount, duration.Round(time.Millisecond))
339
-
fmt.Fprintf(os.Stderr, "[Sync] Range: %06d → %06d\n",
340
-
startBundle, startBundle+fetchedCount-1)
341
-
fmt.Fprintf(os.Stderr, "[Sync] Total: %d ops, %d unique DIDs\n",
342
-
totalOps, totalDIDs)
343
-
fmt.Fprintf(os.Stderr, "[Sync] Speed: %.1f bundles/sec, %.0f ops/sec\n",
344
-
float64(fetchedCount)/duration.Seconds(), opsPerSec)
345
-
fmt.Fprintf(os.Stderr, "[Sync] Avg: %s per bundle\n",
346
-
avgBundleTime.Round(time.Millisecond))
347
-
fmt.Fprintf(os.Stderr, "[Sync] Mempool: %d ops (+%d)\n",
348
-
mempoolAfter, addedOps)
349
-
} else {
350
-
fmt.Fprintf(os.Stderr, "[Sync] ✓ Already up to date (mempool: %d, +%d ops in %s)\n",
351
-
mempoolAfter, addedOps, duration.Round(time.Millisecond))
352
-
}
353
break
354
}
355
-
356
-
// Real error
357
fmt.Fprintf(os.Stderr, "[Sync] ✗ Error: %v\n", err)
358
break
359
}
360
361
-
bundleDuration := time.Since(bundleStart)
362
-
363
-
// Save bundle
364
-
saveStart := time.Now()
365
-
if err := mgr.SaveBundle(ctx, b, !verbose); err != nil {
366
fmt.Fprintf(os.Stderr, "[Sync] ✗ Error saving bundle %06d: %v\n", b.BundleNumber, err)
367
break
368
}
369
-
saveDuration := time.Since(saveStart)
370
371
fetchedCount++
372
-
totalOps += len(b.Operations)
373
-
totalDIDs += b.DIDCount
0
0
0
0
0
0
0
374
375
-
// ✨ Enhanced progress log
376
-
totalElapsed := time.Since(cycleStart)
377
-
bundlesPerSec := float64(fetchedCount) / totalElapsed.Seconds()
378
-
opsPerSec := float64(totalOps) / totalElapsed.Seconds()
379
380
-
if verbose {
381
-
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | %d ops, %d DIDs | fetch: %s, save: %s, total: %s\n",
382
-
b.BundleNumber, len(b.Operations), b.DIDCount,
383
-
bundleDuration.Round(time.Millisecond),
384
-
saveDuration.Round(time.Millisecond),
385
-
totalElapsed.Round(time.Millisecond))
386
} else {
387
-
fmt.Fprintf(os.Stderr, "[Sync] ✓ %06d | %d ops, %d DIDs | %.1f b/s, %.0f ops/s | %s total\n",
388
-
b.BundleNumber, len(b.Operations), b.DIDCount,
389
-
bundlesPerSec, opsPerSec, totalElapsed.Round(time.Second))
390
}
391
-
392
-
// Small delay to prevent hammering
393
-
time.Sleep(500 * time.Millisecond)
394
}
395
}
396
···
266
267
// runSyncLoop runs the background sync loop
268
func runSyncLoop(ctx context.Context, mgr *bundle.Manager, interval time.Duration, verbose bool, resolverEnabled bool) {
269
+
// ✨ Initial sync - ALWAYS show detailed progress
270
+
fmt.Fprintf(os.Stderr, "[Sync] Initial sync starting...\n")
271
+
syncBundles(ctx, mgr, true, resolverEnabled) // Force verbose=true for initial sync
272
273
+
fmt.Fprintf(os.Stderr, "[Sync] Loop started (interval: %s)\n", interval)
274
275
ticker := time.NewTicker(interval)
276
defer ticker.Stop()
277
0
0
0
278
for {
279
select {
280
case <-ctx.Done():
···
285
return
286
287
case <-ticker.C:
288
+
// ✨ Loop iterations - respect user's verbose flag
289
syncBundles(ctx, mgr, verbose, resolverEnabled)
0
0
0
0
0
0
0
290
}
291
}
292
}
293
294
+
// syncBundles performs a sync cycle
295
func syncBundles(ctx context.Context, mgr *bundle.Manager, verbose bool, resolverEnabled bool) {
296
cycleStart := time.Now()
297
···
304
305
mempoolBefore := mgr.GetMempoolStats()["count"].(int)
306
fetchedCount := 0
307
+
var totalIndexTime time.Duration
0
0
0
0
308
309
// Keep fetching until caught up
310
for {
311
+
// quiet = !verbose (show details only if verbose)
0
312
b, err := mgr.FetchNextBundle(ctx, !verbose)
313
if err != nil {
314
if isEndOfDataError(err) {
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
315
break
316
}
0
0
317
fmt.Fprintf(os.Stderr, "[Sync] ✗ Error: %v\n", err)
318
break
319
}
320
321
+
// Save bundle and track index update time
322
+
indexTime, err := mgr.SaveBundle(ctx, b, !verbose)
323
+
if err != nil {
0
0
324
fmt.Fprintf(os.Stderr, "[Sync] ✗ Error saving bundle %06d: %v\n", b.BundleNumber, err)
325
break
326
}
0
327
328
fetchedCount++
329
+
totalIndexTime += indexTime
330
+
331
+
time.Sleep(500 * time.Millisecond)
332
+
}
333
+
334
+
// ✨ ONE LINE SUMMARY with optional index timing
335
+
mempoolAfter := mgr.GetMempoolStats()["count"].(int)
336
+
addedOps := mempoolAfter - mempoolBefore
337
+
duration := time.Since(cycleStart)
338
339
+
currentBundle := startBundle + fetchedCount - 1
340
+
if fetchedCount == 0 {
341
+
currentBundle = startBundle - 1
342
+
}
343
344
+
if fetchedCount > 0 {
345
+
// Show index time if it was significant (>10ms)
346
+
if totalIndexTime > 10*time.Millisecond {
347
+
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %s (index: %s)\n",
348
+
currentBundle, fetchedCount, mempoolAfter, addedOps,
349
+
duration.Round(time.Millisecond), totalIndexTime.Round(time.Millisecond))
350
} else {
351
+
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %s\n",
352
+
currentBundle, fetchedCount, mempoolAfter, addedOps, duration.Round(time.Millisecond))
0
353
}
354
+
} else {
355
+
fmt.Fprintf(os.Stderr, "[Sync] ✓ Bundle %06d | Up to date | Mempool: %d (+%d) | %s\n",
356
+
currentBundle, mempoolAfter, addedOps, duration.Round(time.Millisecond))
357
}
358
}
359
+41
-11
internal/sync/fetcher.go
···
17
logger types.Logger
18
}
19
0
0
0
0
0
0
0
0
20
// NewFetcher creates a new fetcher
21
func NewFetcher(plcClient *plcclient.Client, operations *storage.Operations, logger types.Logger) *Fetcher {
22
return &Fetcher{
···
26
}
27
}
28
29
-
// FetchToMempool fetches operations and returns them
30
func (f *Fetcher) FetchToMempool(
31
ctx context.Context,
32
afterTime string,
33
prevBoundaryCIDs map[string]bool,
34
target int,
35
quiet bool,
36
-
currentMempoolCount int,
37
totalFetchesSoFar int,
38
) ([]plcclient.PLCOperation, int, error) {
39
···
100
return allNewOps, fetchesMade, nil
101
}
102
103
-
// Store counts for metrics
104
originalBatchSize := len(batch)
105
totalReceived += originalBatchSize
106
···
117
dupesFiltered := originalBatchSize - uniqueAdded
118
totalDupes += dupesFiltered
119
120
-
// ✨ Show fetch result with running totals
121
if !quiet {
122
opsPerSec := float64(originalBatchSize) / fetchDuration.Seconds()
123
124
if dupesFiltered > 0 {
125
-
f.logger.Printf(" → +%d unique (%d dupes) in %s • Running: %d/%d unique (%.0f ops/sec)",
126
uniqueAdded, dupesFiltered, fetchDuration, len(allNewOps), target, opsPerSec)
127
} else {
128
f.logger.Printf(" → +%d unique in %s • Running: %d/%d (%.0f ops/sec)",
···
130
}
131
}
132
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
133
// Update cursor
134
if len(batch) > 0 {
135
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
136
}
137
138
-
// Check completeness using ORIGINAL batch size
139
if originalBatchSize < batchSize {
140
if !quiet {
141
f.logger.Printf(" Incomplete batch (%d/%d) → caught up", originalBatchSize, batchSize)
···
143
return allNewOps, fetchesMade, nil
144
}
145
146
-
// If we have enough unique ops, stop
147
if len(allNewOps) >= target {
148
break
149
}
150
}
151
152
-
// ✨ Summary at the end
153
if !quiet && fetchesMade > 0 {
154
-
dedupRate := float64(totalDupes) / float64(totalReceived) * 100
155
-
f.logger.Printf(" ✓ Fetched %d ops total, %d unique (%.1f%% dedup rate)",
156
-
totalReceived, len(allNewOps), dedupRate)
0
0
0
157
}
158
159
return allNewOps, fetchesMade, nil
···
17
logger types.Logger
18
}
19
20
+
// MempoolInterface defines what we need from mempool
21
+
type MempoolInterface interface {
22
+
Add(ops []plcclient.PLCOperation) (int, error)
23
+
Save() error
24
+
Count() int
25
+
GetLastTime() string
26
+
}
27
+
28
// NewFetcher creates a new fetcher
29
func NewFetcher(plcClient *plcclient.Client, operations *storage.Operations, logger types.Logger) *Fetcher {
30
return &Fetcher{
···
34
}
35
}
36
37
+
// FetchToMempool fetches operations and adds them to mempool (with auto-save)
38
func (f *Fetcher) FetchToMempool(
39
ctx context.Context,
40
afterTime string,
41
prevBoundaryCIDs map[string]bool,
42
target int,
43
quiet bool,
44
+
mempool MempoolInterface, // NEW: pass mempool directly
45
totalFetchesSoFar int,
46
) ([]plcclient.PLCOperation, int, error) {
47
···
108
return allNewOps, fetchesMade, nil
109
}
110
0
111
originalBatchSize := len(batch)
112
totalReceived += originalBatchSize
113
···
124
dupesFiltered := originalBatchSize - uniqueAdded
125
totalDupes += dupesFiltered
126
127
+
// Show fetch result with running totals
128
if !quiet {
129
opsPerSec := float64(originalBatchSize) / fetchDuration.Seconds()
130
131
if dupesFiltered > 0 {
132
+
f.logger.Printf(" → +%d unique (%d dupes) in %s • Running: %d/%d (%.0f ops/sec)",
133
uniqueAdded, dupesFiltered, fetchDuration, len(allNewOps), target, opsPerSec)
134
} else {
135
f.logger.Printf(" → +%d unique in %s • Running: %d/%d (%.0f ops/sec)",
···
137
}
138
}
139
140
+
// ✨ ADD TO MEMPOOL AND SAVE after each fetch
141
+
if uniqueAdded > 0 && mempool != nil {
142
+
added, addErr := mempool.Add(allNewOps[beforeDedup:])
143
+
if addErr != nil {
144
+
// Save before returning error
145
+
mempool.Save()
146
+
return allNewOps, fetchesMade, fmt.Errorf("mempool add failed: %w", addErr)
147
+
}
148
+
149
+
// ✨ Auto-save after each successful add
150
+
if err := mempool.Save(); err != nil {
151
+
f.logger.Printf(" Warning: failed to save mempool: %v", err)
152
+
}
153
+
154
+
if !quiet && added > 0 {
155
+
cursor := mempool.GetLastTime()
156
+
f.logger.Printf(" Saved to mempool: %d ops (total: %d, cursor: %s)",
157
+
added, mempool.Count(), cursor[:19])
158
+
}
159
+
}
160
+
161
// Update cursor
162
if len(batch) > 0 {
163
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
164
}
165
166
+
// Check completeness
167
if originalBatchSize < batchSize {
168
if !quiet {
169
f.logger.Printf(" Incomplete batch (%d/%d) → caught up", originalBatchSize, batchSize)
···
171
return allNewOps, fetchesMade, nil
172
}
173
0
174
if len(allNewOps) >= target {
175
break
176
}
177
}
178
179
+
// Summary
180
if !quiet && fetchesMade > 0 {
181
+
dedupRate := 0.0
182
+
if totalReceived > 0 {
183
+
dedupRate = float64(totalDupes) / float64(totalReceived) * 100
184
+
}
185
+
f.logger.Printf(" ✓ Collected %d unique ops from %d fetches (%.1f%% dedup)",
186
+
len(allNewOps), fetchesMade, dedupRate)
187
}
188
189
return allNewOps, fetchesMade, nil