[DEPRECATED] Go implementation of plcbundle
1// internal/sync/syncer.go
2package sync
3
4import (
5 "context"
6 "time"
7
8 "tangled.org/atscan.net/plcbundle-go/internal/types"
9)
10
11// SyncLoopConfig configures continuous syncing
12type SyncLoopConfig struct {
13 Interval time.Duration
14 MaxBundles int // 0 = unlimited
15 Verbose bool
16 Logger types.Logger
17 OnBundleSynced func(bundleNum int, fetchedCount int, mempoolCount int, duration time.Duration, indexTime time.Duration)
18 SkipDIDIndex bool
19 Quiet bool
20}
21
22// DefaultSyncLoopConfig returns default configuration
23func DefaultSyncLoopConfig() *SyncLoopConfig {
24 return &SyncLoopConfig{
25 Interval: 1 * time.Minute,
26 MaxBundles: 0,
27 Verbose: false,
28 SkipDIDIndex: false,
29 Quiet: false,
30 }
31}
32
33// SyncManager is the minimal interface needed for syncing
34type SyncManager interface {
35 GetLastBundleNumber() int
36 GetMempoolCount() int
37 // Returns: bundleNumber, indexUpdateTime, error
38 FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error)
39 SaveMempool() error
40 BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error
41 UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error
42}
43
44// SyncOnce performs a single sync cycle - fetches until caught up
45func SyncOnce(ctx context.Context, mgr SyncManager, config *SyncLoopConfig) (int, error) {
46 cycleStart := time.Now()
47 startMempool := mgr.GetMempoolCount()
48
49 fetchedCount := 0
50 var totalIndexTime time.Duration
51
52 // Keep fetching until caught up (detect by checking if state changes)
53 for {
54 // Track state before fetch
55 bundleBefore := mgr.GetLastBundleNumber()
56 mempoolBefore := mgr.GetMempoolCount()
57
58 // Attempt to fetch and save next bundle
59 bundleNum, stats, err := mgr.FetchAndSaveNextBundle(ctx, config.Verbose, config.Quiet, config.SkipDIDIndex)
60
61 // Check if we made any progress
62 bundleAfter := mgr.GetLastBundleNumber()
63 mempoolAfter := mgr.GetMempoolCount()
64
65 madeProgress := bundleAfter > bundleBefore || mempoolAfter > mempoolBefore
66
67 if err != nil {
68 // If no progress and got error → caught up
69 if !madeProgress {
70 break
71 }
72
73 // We added to mempool but couldn't complete bundle yet
74 // This is fine, just stop here
75 break
76 }
77
78 // Success
79 fetchedCount++
80 if stats != nil {
81 totalIndexTime += stats.IndexTime
82 }
83
84 // Callback if provided
85 if config.OnBundleSynced != nil {
86 config.OnBundleSynced(bundleNum, fetchedCount, mempoolAfter, time.Since(cycleStart), totalIndexTime)
87 }
88
89 // Small delay between bundles
90 time.Sleep(500 * time.Millisecond)
91
92 // Check if we're still making progress
93 if !madeProgress {
94 break
95 }
96 }
97
98 // Summary output
99 if config.Logger != nil {
100 mempoolAfter := mgr.GetMempoolCount()
101 addedOps := mempoolAfter - startMempool
102 duration := time.Since(cycleStart)
103 currentBundle := mgr.GetLastBundleNumber()
104
105 if fetchedCount > 0 {
106 if totalIndexTime > 10*time.Millisecond {
107 config.Logger.Printf("[Sync] ✓ Bundle %06d | synced: %d | mempool: %d (+%d) | time: %s (index: %s)",
108 currentBundle, fetchedCount, mempoolAfter, addedOps,
109 duration.Round(time.Millisecond), totalIndexTime.Round(time.Millisecond))
110 } else {
111 config.Logger.Printf("[Sync] ✓ Bundle %06d | synced: %d | mempool: %d (+%d) | time: %s",
112 currentBundle, fetchedCount, mempoolAfter, addedOps, duration.Round(time.Millisecond))
113 }
114 } else if addedOps > 0 {
115 // No bundles but added to mempool
116 config.Logger.Printf("[Sync] ✓ Bundle %06d | mempool: %d (+%d) | time: %s",
117 currentBundle, mempoolAfter, addedOps, duration.Round(time.Millisecond))
118 } else {
119 // Already up to date
120 config.Logger.Printf("[Sync] ✓ Bundle %06d | up to date | time: %s",
121 currentBundle, duration.Round(time.Millisecond))
122 }
123 }
124
125 return fetchedCount, nil
126}
127
128// RunSyncLoop performs continuous syncing
129func RunSyncLoop(ctx context.Context, mgr SyncManager, config *SyncLoopConfig) error {
130 if config == nil {
131 config = DefaultSyncLoopConfig()
132 }
133
134 if config.Interval <= 0 {
135 config.Interval = 1 * time.Minute
136 }
137
138 bundlesSynced := 0
139
140 // Initial sync
141 if config.Logger != nil && config.MaxBundles != 1 {
142 config.Logger.Printf("[Sync] Initial sync starting...")
143 }
144
145 config.SkipDIDIndex = true
146 synced, err := SyncOnce(ctx, mgr, config)
147 if err != nil {
148 return err
149 }
150 bundlesSynced += synced
151 mgr.UpdateDIDIndexSmart(ctx, nil)
152
153 // Check if reached limit
154 if config.MaxBundles > 0 && bundlesSynced >= config.MaxBundles {
155 if config.Logger != nil {
156 config.Logger.Printf("[Sync] Reached max bundles limit (%d)", config.MaxBundles)
157 }
158 return nil
159 }
160
161 if config.Logger != nil {
162 config.Logger.Printf("[Sync] Loop started (interval: %s)", config.Interval)
163 }
164
165 ticker := time.NewTicker(config.Interval)
166 defer ticker.Stop()
167
168 for {
169 select {
170 case <-ctx.Done():
171 if config.Logger != nil {
172 config.Logger.Printf("[Sync] Stopped (total synced: %d bundles)", bundlesSynced)
173 }
174 return ctx.Err()
175
176 case <-ticker.C:
177 // Each tick, do one sync cycle (which fetches until caught up)
178 synced, err := SyncOnce(ctx, mgr, config)
179 if err != nil {
180 if config.Logger != nil {
181 config.Logger.Printf("[Sync] Error: %v", err)
182 }
183 continue
184 }
185
186 bundlesSynced += synced
187
188 // Check if reached limit
189 if config.MaxBundles > 0 && bundlesSynced >= config.MaxBundles {
190 if config.Logger != nil {
191 config.Logger.Printf("[Sync] Reached max bundles limit (%d)", config.MaxBundles)
192 }
193 return nil
194 }
195 }
196 }
197}