[DEPRECATED] Go implementation of plcbundle
at main 197 lines 5.5 kB view raw
1// internal/sync/syncer.go 2package sync 3 4import ( 5 "context" 6 "time" 7 8 "tangled.org/atscan.net/plcbundle-go/internal/types" 9) 10 11// SyncLoopConfig configures continuous syncing 12type SyncLoopConfig struct { 13 Interval time.Duration 14 MaxBundles int // 0 = unlimited 15 Verbose bool 16 Logger types.Logger 17 OnBundleSynced func(bundleNum int, fetchedCount int, mempoolCount int, duration time.Duration, indexTime time.Duration) 18 SkipDIDIndex bool 19 Quiet bool 20} 21 22// DefaultSyncLoopConfig returns default configuration 23func DefaultSyncLoopConfig() *SyncLoopConfig { 24 return &SyncLoopConfig{ 25 Interval: 1 * time.Minute, 26 MaxBundles: 0, 27 Verbose: false, 28 SkipDIDIndex: false, 29 Quiet: false, 30 } 31} 32 33// SyncManager is the minimal interface needed for syncing 34type SyncManager interface { 35 GetLastBundleNumber() int 36 GetMempoolCount() int 37 // Returns: bundleNumber, indexUpdateTime, error 38 FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) 39 SaveMempool() error 40 BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error 41 UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error 42} 43 44// SyncOnce performs a single sync cycle - fetches until caught up 45func SyncOnce(ctx context.Context, mgr SyncManager, config *SyncLoopConfig) (int, error) { 46 cycleStart := time.Now() 47 startMempool := mgr.GetMempoolCount() 48 49 fetchedCount := 0 50 var totalIndexTime time.Duration 51 52 // Keep fetching until caught up (detect by checking if state changes) 53 for { 54 // Track state before fetch 55 bundleBefore := mgr.GetLastBundleNumber() 56 mempoolBefore := mgr.GetMempoolCount() 57 58 // Attempt to fetch and save next bundle 59 bundleNum, stats, err := mgr.FetchAndSaveNextBundle(ctx, config.Verbose, config.Quiet, config.SkipDIDIndex) 60 61 // Check if we made any progress 62 bundleAfter := mgr.GetLastBundleNumber() 63 mempoolAfter := mgr.GetMempoolCount() 64 65 madeProgress := bundleAfter > bundleBefore || mempoolAfter > mempoolBefore 66 67 if err != nil { 68 // If no progress and got error → caught up 69 if !madeProgress { 70 break 71 } 72 73 // We added to mempool but couldn't complete bundle yet 74 // This is fine, just stop here 75 break 76 } 77 78 // Success 79 fetchedCount++ 80 if stats != nil { 81 totalIndexTime += stats.IndexTime 82 } 83 84 // Callback if provided 85 if config.OnBundleSynced != nil { 86 config.OnBundleSynced(bundleNum, fetchedCount, mempoolAfter, time.Since(cycleStart), totalIndexTime) 87 } 88 89 // Small delay between bundles 90 time.Sleep(500 * time.Millisecond) 91 92 // Check if we're still making progress 93 if !madeProgress { 94 break 95 } 96 } 97 98 // Summary output 99 if config.Logger != nil { 100 mempoolAfter := mgr.GetMempoolCount() 101 addedOps := mempoolAfter - startMempool 102 duration := time.Since(cycleStart) 103 currentBundle := mgr.GetLastBundleNumber() 104 105 if fetchedCount > 0 { 106 if totalIndexTime > 10*time.Millisecond { 107 config.Logger.Printf("[Sync] ✓ Bundle %06d | synced: %d | mempool: %d (+%d) | time: %s (index: %s)", 108 currentBundle, fetchedCount, mempoolAfter, addedOps, 109 duration.Round(time.Millisecond), totalIndexTime.Round(time.Millisecond)) 110 } else { 111 config.Logger.Printf("[Sync] ✓ Bundle %06d | synced: %d | mempool: %d (+%d) | time: %s", 112 currentBundle, fetchedCount, mempoolAfter, addedOps, duration.Round(time.Millisecond)) 113 } 114 } else if addedOps > 0 { 115 // No bundles but added to mempool 116 config.Logger.Printf("[Sync] ✓ Bundle %06d | mempool: %d (+%d) | time: %s", 117 currentBundle, mempoolAfter, addedOps, duration.Round(time.Millisecond)) 118 } else { 119 // Already up to date 120 config.Logger.Printf("[Sync] ✓ Bundle %06d | up to date | time: %s", 121 currentBundle, duration.Round(time.Millisecond)) 122 } 123 } 124 125 return fetchedCount, nil 126} 127 128// RunSyncLoop performs continuous syncing 129func RunSyncLoop(ctx context.Context, mgr SyncManager, config *SyncLoopConfig) error { 130 if config == nil { 131 config = DefaultSyncLoopConfig() 132 } 133 134 if config.Interval <= 0 { 135 config.Interval = 1 * time.Minute 136 } 137 138 bundlesSynced := 0 139 140 // Initial sync 141 if config.Logger != nil && config.MaxBundles != 1 { 142 config.Logger.Printf("[Sync] Initial sync starting...") 143 } 144 145 config.SkipDIDIndex = true 146 synced, err := SyncOnce(ctx, mgr, config) 147 if err != nil { 148 return err 149 } 150 bundlesSynced += synced 151 mgr.UpdateDIDIndexSmart(ctx, nil) 152 153 // Check if reached limit 154 if config.MaxBundles > 0 && bundlesSynced >= config.MaxBundles { 155 if config.Logger != nil { 156 config.Logger.Printf("[Sync] Reached max bundles limit (%d)", config.MaxBundles) 157 } 158 return nil 159 } 160 161 if config.Logger != nil { 162 config.Logger.Printf("[Sync] Loop started (interval: %s)", config.Interval) 163 } 164 165 ticker := time.NewTicker(config.Interval) 166 defer ticker.Stop() 167 168 for { 169 select { 170 case <-ctx.Done(): 171 if config.Logger != nil { 172 config.Logger.Printf("[Sync] Stopped (total synced: %d bundles)", bundlesSynced) 173 } 174 return ctx.Err() 175 176 case <-ticker.C: 177 // Each tick, do one sync cycle (which fetches until caught up) 178 synced, err := SyncOnce(ctx, mgr, config) 179 if err != nil { 180 if config.Logger != nil { 181 config.Logger.Printf("[Sync] Error: %v", err) 182 } 183 continue 184 } 185 186 bundlesSynced += synced 187 188 // Check if reached limit 189 if config.MaxBundles > 0 && bundlesSynced >= config.MaxBundles { 190 if config.Logger != nil { 191 config.Logger.Printf("[Sync] Reached max bundles limit (%d)", config.MaxBundles) 192 } 193 return nil 194 } 195 } 196 } 197}