// [DEPRECATED] Go implementation of plcbundle
1package sync
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net/http"
8 "os"
9 "path/filepath"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/goccy/go-json"
15 "tangled.org/atscan.net/plcbundle/internal/bundleindex"
16 "tangled.org/atscan.net/plcbundle/internal/storage"
17 "tangled.org/atscan.net/plcbundle/internal/types"
18)
19
20// Cloner handles cloning bundles from remote endpoints
21type Cloner struct {
22 operations *storage.Operations
23 bundleDir string
24 logger types.Logger
25}
26
27// NewCloner creates a new cloner
28func NewCloner(operations *storage.Operations, bundleDir string, logger types.Logger) *Cloner {
29 return &Cloner{
30 operations: operations,
31 bundleDir: bundleDir,
32 logger: logger,
33 }
34}
35
36// CloneOptions configures cloning behavior
37type CloneOptions struct {
38 RemoteURL string
39 Workers int
40 SkipExisting bool
41 ProgressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64)
42 SaveInterval time.Duration
43 Verbose bool
44 Logger types.Logger
45}
46
// CloneResult summarizes the outcome of a Cloner.Clone call.
type CloneResult struct {
	// RemoteBundles is the number of bundles listed in the remote index.
	RemoteBundles int

	// Downloaded is the number of bundles downloaded successfully.
	Downloaded int

	// Failed is the number of bundles whose download attempt returned an error.
	Failed int

	// Skipped is the number of remote bundles skipped because they already
	// exist in the local index.
	Skipped int

	// TotalBytes is the number of bytes written for successful downloads.
	TotalBytes int64

	// Duration is the elapsed time measured from the start of Clone.
	Duration time.Duration

	// Interrupted reports whether the context had been cancelled or had
	// expired by the time downloading stopped.
	Interrupted bool

	// FailedBundles lists the bundle numbers counted in Failed.
	FailedBundles []int
}
58
59// Clone performs the cloning operation
60func (c *Cloner) Clone(
61 ctx context.Context,
62 opts CloneOptions,
63 localIndex *bundleindex.Index,
64 updateIndex func([]int, map[int]*bundleindex.BundleMetadata, bool) error,
65) (*CloneResult, error) {
66
67 if opts.Workers <= 0 {
68 opts.Workers = 4
69 }
70 if opts.SaveInterval <= 0 {
71 opts.SaveInterval = 5 * time.Second
72 }
73 if opts.Logger == nil {
74 opts.Logger = c.logger
75 }
76
77 result := &CloneResult{}
78 startTime := time.Now()
79
80 // Step 1: Fetch remote index
81 opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL)
82 remoteIndex, err := c.loadRemoteIndex(opts.RemoteURL)
83 if err != nil {
84 return nil, fmt.Errorf("failed to load remote index: %w", err)
85 }
86
87 remoteBundles := remoteIndex.GetBundles()
88 if len(remoteBundles) == 0 {
89 opts.Logger.Printf("Remote has no bundles")
90 return result, nil
91 }
92
93 result.RemoteBundles = len(remoteBundles)
94 opts.Logger.Printf("Remote has %d bundles", len(remoteBundles))
95
96 // Step 2: Determine bundles to download
97 localBundleMap := make(map[int]*bundleindex.BundleMetadata)
98 for _, meta := range localIndex.GetBundles() {
99 localBundleMap[meta.BundleNumber] = meta
100 }
101
102 remoteBundleMap := make(map[int]*bundleindex.BundleMetadata)
103 for _, meta := range remoteBundles {
104 remoteBundleMap[meta.BundleNumber] = meta
105 }
106
107 var bundlesToDownload []int
108 var totalBytes int64
109
110 for _, meta := range remoteBundles {
111 if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil {
112 result.Skipped++
113 if opts.Verbose {
114 opts.Logger.Printf("Skipping existing bundle %06d", meta.BundleNumber)
115 }
116 continue
117 }
118 bundlesToDownload = append(bundlesToDownload, meta.BundleNumber)
119 totalBytes += meta.CompressedSize
120 }
121
122 if len(bundlesToDownload) == 0 {
123 opts.Logger.Printf("All bundles already exist locally")
124 return result, nil
125 }
126
127 opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes)
128
129 // Step 3: Set up periodic index saving
130 saveCtx, saveCancel := context.WithCancel(ctx)
131 defer saveCancel()
132
133 var downloadedBundles []int
134 var downloadedMu sync.Mutex
135
136 saveDone := make(chan struct{})
137 go func() {
138 defer close(saveDone)
139 ticker := time.NewTicker(opts.SaveInterval)
140 defer ticker.Stop()
141
142 for {
143 select {
144 case <-saveCtx.Done():
145 return
146 case <-ticker.C:
147 downloadedMu.Lock()
148 bundles := make([]int, len(downloadedBundles))
149 copy(bundles, downloadedBundles)
150 downloadedMu.Unlock()
151
152 if opts.Verbose {
153 opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles))
154 }
155 updateIndex(bundles, remoteBundleMap, false)
156 }
157 }
158 }()
159
160 // Step 4: Download bundles
161 successList, failedList, bytes := c.downloadBundlesConcurrent(
162 ctx,
163 opts.RemoteURL,
164 bundlesToDownload,
165 remoteBundleMap,
166 totalBytes,
167 opts.Workers,
168 opts.ProgressFunc,
169 opts.Verbose,
170 &downloadedBundles,
171 &downloadedMu,
172 )
173
174 result.Downloaded = len(successList)
175 result.Failed = len(failedList)
176 result.TotalBytes = bytes
177 result.FailedBundles = failedList
178 result.Interrupted = ctx.Err() != nil
179
180 // Stop periodic saves
181 saveCancel()
182 <-saveDone
183
184 // Step 5: Final index update
185 opts.Logger.Printf("Updating local index...")
186 if err := updateIndex(successList, remoteBundleMap, opts.Verbose); err != nil {
187 return result, fmt.Errorf("failed to update index: %w", err)
188 }
189
190 result.Duration = time.Since(startTime)
191 return result, nil
192}
193
194// loadRemoteIndex loads index from remote URL
195func (c *Cloner) loadRemoteIndex(baseURL string) (*bundleindex.Index, error) {
196 indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json"
197
198 client := &http.Client{Timeout: 30 * time.Second}
199
200 resp, err := client.Get(indexURL)
201 if err != nil {
202 return nil, fmt.Errorf("failed to download: %w", err)
203 }
204 defer resp.Body.Close()
205
206 if resp.StatusCode != http.StatusOK {
207 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
208 }
209
210 data, err := io.ReadAll(resp.Body)
211 if err != nil {
212 return nil, fmt.Errorf("failed to read response: %w", err)
213 }
214
215 var idx bundleindex.Index
216 if err := json.Unmarshal(data, &idx); err != nil {
217 return nil, fmt.Errorf("failed to parse index: %w", err)
218 }
219
220 return &idx, nil
221}
222
223// downloadBundlesConcurrent downloads bundles using worker pool
224func (c *Cloner) downloadBundlesConcurrent(
225 ctx context.Context,
226 baseURL string,
227 bundleNumbers []int,
228 remoteBundleMap map[int]*bundleindex.BundleMetadata,
229 totalBytes int64,
230 workers int,
231 progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64),
232 verbose bool,
233 downloadedBundles *[]int,
234 downloadedMu *sync.Mutex,
235) (successList []int, failedList []int, downloadedBytes int64) {
236
237 type job struct {
238 bundleNum int
239 expectedHash string
240 }
241
242 type result struct {
243 bundleNum int
244 success bool
245 bytes int64
246 err error
247 }
248
249 jobs := make(chan job, len(bundleNumbers))
250 results := make(chan result, len(bundleNumbers))
251
252 // Shared state
253 var (
254 mu sync.Mutex
255 processedCount int
256 processedBytes int64
257 success []int
258 failed []int
259 )
260
261 // Start workers
262 var wg sync.WaitGroup
263 client := &http.Client{Timeout: 120 * time.Second}
264
265 for w := 0; w < workers; w++ {
266 wg.Add(1)
267 go func() {
268 defer wg.Done()
269 for j := range jobs {
270 // Check cancellation
271 select {
272 case <-ctx.Done():
273 results <- result{
274 bundleNum: j.bundleNum,
275 success: false,
276 err: ctx.Err(),
277 }
278 continue
279 default:
280 }
281
282 // Download bundle
283 bytes, err := c.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash)
284
285 // Update progress
286 mu.Lock()
287 processedCount++
288 if err == nil {
289 processedBytes += bytes
290 success = append(success, j.bundleNum)
291 if downloadedMu != nil && downloadedBundles != nil {
292 downloadedMu.Lock()
293 *downloadedBundles = append(*downloadedBundles, j.bundleNum)
294 downloadedMu.Unlock()
295 }
296 } else {
297 failed = append(failed, j.bundleNum)
298 }
299
300 if progressFunc != nil {
301 progressFunc(processedCount, len(bundleNumbers), processedBytes, totalBytes)
302 }
303 mu.Unlock()
304
305 results <- result{
306 bundleNum: j.bundleNum,
307 success: err == nil,
308 bytes: bytes,
309 err: err,
310 }
311 }
312 }()
313 }
314
315 // Send jobs
316 for _, num := range bundleNumbers {
317 expectedHash := ""
318 if meta, exists := remoteBundleMap[num]; exists {
319 expectedHash = meta.CompressedHash
320 }
321 jobs <- job{
322 bundleNum: num,
323 expectedHash: expectedHash,
324 }
325 }
326 close(jobs)
327
328 // Wait for completion
329 go func() {
330 wg.Wait()
331 close(results)
332 }()
333
334 // Collect results
335 for res := range results {
336 if res.err != nil && res.err != context.Canceled {
337 c.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err)
338 } else if res.success && verbose {
339 c.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes)
340 }
341 }
342
343 mu.Lock()
344 successList = success
345 failedList = failed
346 downloadedBytes = processedBytes
347 mu.Unlock()
348
349 return
350}
351
352// downloadBundle downloads a single bundle and verifies hash
353func (c *Cloner) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) {
354 url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum)
355 filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum)
356 filepath := filepath.Join(c.bundleDir, filename)
357
358 // Create request
359 req, err := http.NewRequest("GET", url, nil)
360 if err != nil {
361 return 0, err
362 }
363
364 // Download
365 resp, err := client.Do(req)
366 if err != nil {
367 return 0, err
368 }
369 defer resp.Body.Close()
370
371 if resp.StatusCode != http.StatusOK {
372 body, _ := io.ReadAll(resp.Body)
373 return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
374 }
375
376 // Write to temp file
377 tempPath := filepath + ".tmp"
378 outFile, err := os.Create(tempPath)
379 if err != nil {
380 return 0, err
381 }
382
383 written, err := io.Copy(outFile, resp.Body)
384 outFile.Close()
385
386 if err != nil {
387 os.Remove(tempPath)
388 return 0, err
389 }
390
391 // Verify hash
392 if expectedHash != "" {
393 valid, actualHash, err := c.operations.VerifyHash(tempPath, expectedHash)
394 if err != nil {
395 os.Remove(tempPath)
396 return 0, fmt.Errorf("hash verification failed: %w", err)
397 }
398 if !valid {
399 os.Remove(tempPath)
400 return 0, fmt.Errorf("hash mismatch: expected %s, got %s",
401 expectedHash[:16]+"...", actualHash[:16]+"...")
402 }
403 }
404
405 // Rename to final location
406 if err := os.Rename(tempPath, filepath); err != nil {
407 os.Remove(tempPath)
408 return 0, err
409 }
410
411 return written, nil
412}