[DEPRECATED] Go implementation of plcbundle
1package sync
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "tangled.org/atscan.net/plcbundle/internal/plcclient"
9 "tangled.org/atscan.net/plcbundle/internal/storage"
10 "tangled.org/atscan.net/plcbundle/internal/types"
11)
12
// Fetcher handles fetching operations from PLC directory.
//
// It requests pages of operations through plcClient and relies on the
// storage helpers to strip and recompute boundary CIDs between pages.
type Fetcher struct {
	// plcClient is the client whose Export method is called for every page.
	plcClient *plcclient.Client
	// operations provides StripBoundaryDuplicates and GetBoundaryCIDs.
	operations *storage.Operations
	// logger receives progress output and warnings via Printf.
	logger types.Logger
}
19
// MempoolInterface defines what we need from mempool.
//
// FetchToMempool calls Add, Save and SaveIfNeeded; Count and GetLastTime are
// part of the contract but are not called by the code visible in this file.
type MempoolInterface interface {
	// Add offers a batch of operations to the mempool. A non-nil error makes
	// FetchToMempool abort the fetch.
	// NOTE(review): the int result is presumably the number of operations
	// accepted — confirm against the implementation; FetchToMempool ignores it.
	Add(ops []plcclient.PLCOperation) (int, error)
	// Save is called by FetchToMempool after a failed Add, before returning.
	Save() error
	// SaveIfNeeded is called by FetchToMempool after each successful Add.
	// NOTE(review): presumably persists only once some threshold is reached —
	// verify in the implementation.
	SaveIfNeeded() error
	// Count reports an integer count.
	// NOTE(review): presumably the number of operations held — confirm.
	Count() int
	// GetLastTime returns a time as a string.
	// NOTE(review): presumably the timestamp of the newest operation held;
	// format not visible here — confirm.
	GetLastTime() string
}
28
29// NewFetcher creates a new fetcher
30func NewFetcher(plcClient *plcclient.Client, operations *storage.Operations, logger types.Logger) *Fetcher {
31 return &Fetcher{
32 plcClient: plcClient,
33 operations: operations,
34 logger: logger,
35 }
36}
37
38// FetchToMempool fetches operations and adds them to mempool (with auto-save)
39func (f *Fetcher) FetchToMempool(
40 ctx context.Context,
41 afterTime string,
42 prevBoundaryCIDs map[string]bool,
43 target int,
44 quiet bool,
45 mempool MempoolInterface,
46 totalFetchesSoFar int,
47) ([]plcclient.PLCOperation, int, error) {
48
49 seenCIDs := make(map[string]bool)
50
51 // Initialize current boundaries from previous bundle (or empty if first fetch)
52 currentBoundaryCIDs := prevBoundaryCIDs
53 if currentBoundaryCIDs == nil {
54 currentBoundaryCIDs = make(map[string]bool)
55 }
56
57 // Mark boundary CIDs as seen to prevent re-inclusion
58 for cid := range currentBoundaryCIDs {
59 seenCIDs[cid] = true
60 }
61
62 if !quiet && len(currentBoundaryCIDs) > 0 {
63 f.logger.Printf(" Starting with %d boundary CIDs from previous bundle", len(currentBoundaryCIDs))
64 }
65
66 currentAfter := afterTime
67 maxFetches := 20
68 var allNewOps []plcclient.PLCOperation
69 fetchesMade := 0
70 totalReceived := 0
71 totalDupes := 0
72
73 for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
74 fetchesMade++
75 remaining := target - len(allNewOps)
76 if remaining <= 0 {
77 break
78 }
79
80 // Smart batch sizing
81 var batchSize int
82 switch {
83 case remaining <= 50:
84 batchSize = 50
85 case remaining <= 100:
86 batchSize = 100
87 case remaining <= 500:
88 batchSize = 200
89 default:
90 batchSize = 1000
91 }
92
93 fetchStart := time.Now()
94
95 if !quiet {
96 f.logger.Printf(" Fetch #%d: requesting %d (need %d more, have %d/%d)",
97 totalFetchesSoFar+fetchesMade, batchSize, remaining, len(allNewOps), target)
98 }
99
100 batch, err := f.plcClient.Export(ctx, plcclient.ExportOptions{
101 Count: batchSize,
102 After: currentAfter,
103 })
104
105 fetchDuration := time.Since(fetchStart)
106
107 if err != nil {
108 return allNewOps, fetchesMade, fmt.Errorf("export failed: %w", err)
109 }
110
111 if len(batch) == 0 {
112 if !quiet {
113 f.logger.Printf(" No more operations available (in %s)", fetchDuration)
114 }
115 return allNewOps, fetchesMade, nil
116 }
117
118 originalBatchSize := len(batch)
119 totalReceived += originalBatchSize
120
121 // CRITICAL: Strip boundary duplicates using current boundaries
122 batch = f.operations.StripBoundaryDuplicates(
123 batch,
124 currentAfter,
125 currentBoundaryCIDs,
126 )
127
128 afterStripSize := len(batch)
129 strippedCount := originalBatchSize - afterStripSize
130
131 if !quiet && strippedCount > 0 {
132 f.logger.Printf(" Stripped %d boundary duplicates from fetch", strippedCount)
133 }
134
135 // Collect new ops (not in seenCIDs)
136 beforeDedup := len(allNewOps)
137 var batchNewOps []plcclient.PLCOperation
138
139 for _, op := range batch {
140 if !seenCIDs[op.CID] {
141 batchNewOps = append(batchNewOps, op)
142 }
143 }
144
145 uniqueInBatch := len(batchNewOps)
146 dupesFiltered := afterStripSize - uniqueInBatch
147 totalDupes += dupesFiltered + strippedCount
148
149 // Try to add to mempool
150 if uniqueInBatch > 0 && mempool != nil {
151 _, addErr := mempool.Add(batchNewOps)
152
153 if addErr != nil {
154 // Add failed - don't mark as seen
155 if !quiet {
156 f.logger.Printf(" ❌ Mempool add failed: %v", addErr)
157 }
158 mempool.Save()
159 return allNewOps, fetchesMade, fmt.Errorf("mempool add failed: %w", addErr)
160 }
161
162 // Success - mark as seen
163 for _, op := range batchNewOps {
164 seenCIDs[op.CID] = true
165 }
166 allNewOps = append(allNewOps, batchNewOps...)
167
168 uniqueAdded := len(allNewOps) - beforeDedup
169
170 if !quiet {
171 opsPerSec := float64(originalBatchSize) / fetchDuration.Seconds()
172 if dupesFiltered+strippedCount > 0 {
173 f.logger.Printf(" → +%d unique (%d dupes, %d boundary) in %s • Running: %d/%d (%.0f ops/sec)",
174 uniqueAdded, dupesFiltered, strippedCount, fetchDuration, len(allNewOps), target, opsPerSec)
175 } else {
176 f.logger.Printf(" → +%d unique in %s • Running: %d/%d (%.0f ops/sec)",
177 uniqueAdded, fetchDuration, len(allNewOps), target, opsPerSec)
178 }
179 }
180
181 // CRITICAL: Calculate NEW boundary CIDs from this fetch for next iteration
182 if len(batch) > 0 {
183 boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch)
184 currentBoundaryCIDs = newBoundaryCIDs
185 currentAfter = boundaryTime.Format(time.RFC3339Nano)
186
187 if !quiet && len(newBoundaryCIDs) > 1 {
188 f.logger.Printf(" Updated boundaries: %d CIDs at %s",
189 len(newBoundaryCIDs), currentAfter[:19])
190 }
191 }
192
193 // Save if threshold met
194 if err := mempool.SaveIfNeeded(); err != nil {
195 f.logger.Printf(" Warning: failed to save mempool: %v", err)
196 }
197
198 } else if uniqueInBatch > 0 {
199 // No mempool - just collect
200 for _, op := range batchNewOps {
201 seenCIDs[op.CID] = true
202 }
203 allNewOps = append(allNewOps, batchNewOps...)
204
205 // Still update boundaries even without mempool
206 if len(batch) > 0 {
207 boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch)
208 currentBoundaryCIDs = newBoundaryCIDs
209 currentAfter = boundaryTime.Format(time.RFC3339Nano)
210 }
211 }
212
213 // Check if incomplete batch (caught up)
214 if originalBatchSize < batchSize {
215 if !quiet {
216 f.logger.Printf(" Incomplete batch (%d/%d) → caught up", originalBatchSize, batchSize)
217 }
218 return allNewOps, fetchesMade, nil
219 }
220
221 if len(allNewOps) >= target {
222 break
223 }
224 }
225
226 // Summary
227 if !quiet && fetchesMade > 0 {
228 dedupRate := 0.0
229 if totalReceived > 0 {
230 dedupRate = float64(totalDupes) / float64(totalReceived) * 100
231 }
232 f.logger.Printf(" ✓ Collected %d unique ops from %d fetches (%.1f%% dedup)",
233 len(allNewOps), fetchesMade, dedupRate)
234 }
235
236 return allNewOps, fetchesMade, nil
237}