[DEPRECATED] Go implementation of plcbundle
at rust-test 237 lines 6.3 kB view raw
1package sync 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "tangled.org/atscan.net/plcbundle/internal/plcclient" 9 "tangled.org/atscan.net/plcbundle/internal/storage" 10 "tangled.org/atscan.net/plcbundle/internal/types" 11) 12 13// Fetcher handles fetching operations from PLC directory 14type Fetcher struct { 15 plcClient *plcclient.Client 16 operations *storage.Operations 17 logger types.Logger 18} 19 20// MempoolInterface defines what we need from mempool 21type MempoolInterface interface { 22 Add(ops []plcclient.PLCOperation) (int, error) 23 Save() error 24 SaveIfNeeded() error 25 Count() int 26 GetLastTime() string 27} 28 29// NewFetcher creates a new fetcher 30func NewFetcher(plcClient *plcclient.Client, operations *storage.Operations, logger types.Logger) *Fetcher { 31 return &Fetcher{ 32 plcClient: plcClient, 33 operations: operations, 34 logger: logger, 35 } 36} 37 38// FetchToMempool fetches operations and adds them to mempool (with auto-save) 39func (f *Fetcher) FetchToMempool( 40 ctx context.Context, 41 afterTime string, 42 prevBoundaryCIDs map[string]bool, 43 target int, 44 quiet bool, 45 mempool MempoolInterface, 46 totalFetchesSoFar int, 47) ([]plcclient.PLCOperation, int, error) { 48 49 seenCIDs := make(map[string]bool) 50 51 // Initialize current boundaries from previous bundle (or empty if first fetch) 52 currentBoundaryCIDs := prevBoundaryCIDs 53 if currentBoundaryCIDs == nil { 54 currentBoundaryCIDs = make(map[string]bool) 55 } 56 57 // Mark boundary CIDs as seen to prevent re-inclusion 58 for cid := range currentBoundaryCIDs { 59 seenCIDs[cid] = true 60 } 61 62 if !quiet && len(currentBoundaryCIDs) > 0 { 63 f.logger.Printf(" Starting with %d boundary CIDs from previous bundle", len(currentBoundaryCIDs)) 64 } 65 66 currentAfter := afterTime 67 maxFetches := 20 68 var allNewOps []plcclient.PLCOperation 69 fetchesMade := 0 70 totalReceived := 0 71 totalDupes := 0 72 73 for fetchNum := 0; fetchNum < maxFetches; fetchNum++ { 74 fetchesMade++ 75 remaining := target - len(allNewOps) 76 if remaining <= 0 { 77 break 78 } 79 80 // Smart batch sizing 81 var batchSize int 82 switch { 83 case remaining <= 50: 84 batchSize = 50 85 case remaining <= 100: 86 batchSize = 100 87 case remaining <= 500: 88 batchSize = 200 89 default: 90 batchSize = 1000 91 } 92 93 fetchStart := time.Now() 94 95 if !quiet { 96 f.logger.Printf(" Fetch #%d: requesting %d (need %d more, have %d/%d)", 97 totalFetchesSoFar+fetchesMade, batchSize, remaining, len(allNewOps), target) 98 } 99 100 batch, err := f.plcClient.Export(ctx, plcclient.ExportOptions{ 101 Count: batchSize, 102 After: currentAfter, 103 }) 104 105 fetchDuration := time.Since(fetchStart) 106 107 if err != nil { 108 return allNewOps, fetchesMade, fmt.Errorf("export failed: %w", err) 109 } 110 111 if len(batch) == 0 { 112 if !quiet { 113 f.logger.Printf(" No more operations available (in %s)", fetchDuration) 114 } 115 return allNewOps, fetchesMade, nil 116 } 117 118 originalBatchSize := len(batch) 119 totalReceived += originalBatchSize 120 121 // CRITICAL: Strip boundary duplicates using current boundaries 122 batch = f.operations.StripBoundaryDuplicates( 123 batch, 124 currentAfter, 125 currentBoundaryCIDs, 126 ) 127 128 afterStripSize := len(batch) 129 strippedCount := originalBatchSize - afterStripSize 130 131 if !quiet && strippedCount > 0 { 132 f.logger.Printf(" Stripped %d boundary duplicates from fetch", strippedCount) 133 } 134 135 // Collect new ops (not in seenCIDs) 136 beforeDedup := len(allNewOps) 137 var batchNewOps []plcclient.PLCOperation 138 139 for _, op := range batch { 140 if !seenCIDs[op.CID] { 141 batchNewOps = append(batchNewOps, op) 142 } 143 } 144 145 uniqueInBatch := len(batchNewOps) 146 dupesFiltered := afterStripSize - uniqueInBatch 147 totalDupes += dupesFiltered + strippedCount 148 149 // Try to add to mempool 150 if uniqueInBatch > 0 && mempool != nil { 151 _, addErr := mempool.Add(batchNewOps) 152 153 if addErr != nil { 154 // Add failed - don't mark as seen 155 if !quiet { 156 f.logger.Printf(" ❌ Mempool add failed: %v", addErr) 157 } 158 mempool.Save() 159 return allNewOps, fetchesMade, fmt.Errorf("mempool add failed: %w", addErr) 160 } 161 162 // Success - mark as seen 163 for _, op := range batchNewOps { 164 seenCIDs[op.CID] = true 165 } 166 allNewOps = append(allNewOps, batchNewOps...) 167 168 uniqueAdded := len(allNewOps) - beforeDedup 169 170 if !quiet { 171 opsPerSec := float64(originalBatchSize) / fetchDuration.Seconds() 172 if dupesFiltered+strippedCount > 0 { 173 f.logger.Printf(" → +%d unique (%d dupes, %d boundary) in %s • Running: %d/%d (%.0f ops/sec)", 174 uniqueAdded, dupesFiltered, strippedCount, fetchDuration, len(allNewOps), target, opsPerSec) 175 } else { 176 f.logger.Printf(" → +%d unique in %s • Running: %d/%d (%.0f ops/sec)", 177 uniqueAdded, fetchDuration, len(allNewOps), target, opsPerSec) 178 } 179 } 180 181 // CRITICAL: Calculate NEW boundary CIDs from this fetch for next iteration 182 if len(batch) > 0 { 183 boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch) 184 currentBoundaryCIDs = newBoundaryCIDs 185 currentAfter = boundaryTime.Format(time.RFC3339Nano) 186 187 if !quiet && len(newBoundaryCIDs) > 1 { 188 f.logger.Printf(" Updated boundaries: %d CIDs at %s", 189 len(newBoundaryCIDs), currentAfter[:19]) 190 } 191 } 192 193 // Save if threshold met 194 if err := mempool.SaveIfNeeded(); err != nil { 195 f.logger.Printf(" Warning: failed to save mempool: %v", err) 196 } 197 198 } else if uniqueInBatch > 0 { 199 // No mempool - just collect 200 for _, op := range batchNewOps { 201 seenCIDs[op.CID] = true 202 } 203 allNewOps = append(allNewOps, batchNewOps...) 204 205 // Still update boundaries even without mempool 206 if len(batch) > 0 { 207 boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch) 208 currentBoundaryCIDs = newBoundaryCIDs 209 currentAfter = boundaryTime.Format(time.RFC3339Nano) 210 } 211 } 212 213 // Check if incomplete batch (caught up) 214 if originalBatchSize < batchSize { 215 if !quiet { 216 f.logger.Printf(" Incomplete batch (%d/%d) → caught up", originalBatchSize, batchSize) 217 } 218 return allNewOps, fetchesMade, nil 219 } 220 221 if len(allNewOps) >= target { 222 break 223 } 224 } 225 226 // Summary 227 if !quiet && fetchesMade > 0 { 228 dedupRate := 0.0 229 if totalReceived > 0 { 230 dedupRate = float64(totalDupes) / float64(totalReceived) * 100 231 } 232 f.logger.Printf(" ✓ Collected %d unique ops from %d fetches (%.1f%% dedup)", 233 len(allNewOps), fetchesMade, dedupRate) 234 } 235 236 return allNewOps, fetchesMade, nil 237}