[DEPRECATED] Go implementation of plcbundle — from the rust-test branch (412 lines, 9.7 kB)
1package sync 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "net/http" 8 "os" 9 "path/filepath" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/goccy/go-json" 15 "tangled.org/atscan.net/plcbundle/internal/bundleindex" 16 "tangled.org/atscan.net/plcbundle/internal/storage" 17 "tangled.org/atscan.net/plcbundle/internal/types" 18) 19 20// Cloner handles cloning bundles from remote endpoints 21type Cloner struct { 22 operations *storage.Operations 23 bundleDir string 24 logger types.Logger 25} 26 27// NewCloner creates a new cloner 28func NewCloner(operations *storage.Operations, bundleDir string, logger types.Logger) *Cloner { 29 return &Cloner{ 30 operations: operations, 31 bundleDir: bundleDir, 32 logger: logger, 33 } 34} 35 36// CloneOptions configures cloning behavior 37type CloneOptions struct { 38 RemoteURL string 39 Workers int 40 SkipExisting bool 41 ProgressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64) 42 SaveInterval time.Duration 43 Verbose bool 44 Logger types.Logger 45} 46 47// CloneResult contains cloning results 48type CloneResult struct { 49 RemoteBundles int 50 Downloaded int 51 Failed int 52 Skipped int 53 TotalBytes int64 54 Duration time.Duration 55 Interrupted bool 56 FailedBundles []int 57} 58 59// Clone performs the cloning operation 60func (c *Cloner) Clone( 61 ctx context.Context, 62 opts CloneOptions, 63 localIndex *bundleindex.Index, 64 updateIndex func([]int, map[int]*bundleindex.BundleMetadata, bool) error, 65) (*CloneResult, error) { 66 67 if opts.Workers <= 0 { 68 opts.Workers = 4 69 } 70 if opts.SaveInterval <= 0 { 71 opts.SaveInterval = 5 * time.Second 72 } 73 if opts.Logger == nil { 74 opts.Logger = c.logger 75 } 76 77 result := &CloneResult{} 78 startTime := time.Now() 79 80 // Step 1: Fetch remote index 81 opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL) 82 remoteIndex, err := c.loadRemoteIndex(opts.RemoteURL) 83 if err != nil { 84 return nil, fmt.Errorf("failed to load remote index: %w", err) 
85 } 86 87 remoteBundles := remoteIndex.GetBundles() 88 if len(remoteBundles) == 0 { 89 opts.Logger.Printf("Remote has no bundles") 90 return result, nil 91 } 92 93 result.RemoteBundles = len(remoteBundles) 94 opts.Logger.Printf("Remote has %d bundles", len(remoteBundles)) 95 96 // Step 2: Determine bundles to download 97 localBundleMap := make(map[int]*bundleindex.BundleMetadata) 98 for _, meta := range localIndex.GetBundles() { 99 localBundleMap[meta.BundleNumber] = meta 100 } 101 102 remoteBundleMap := make(map[int]*bundleindex.BundleMetadata) 103 for _, meta := range remoteBundles { 104 remoteBundleMap[meta.BundleNumber] = meta 105 } 106 107 var bundlesToDownload []int 108 var totalBytes int64 109 110 for _, meta := range remoteBundles { 111 if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil { 112 result.Skipped++ 113 if opts.Verbose { 114 opts.Logger.Printf("Skipping existing bundle %06d", meta.BundleNumber) 115 } 116 continue 117 } 118 bundlesToDownload = append(bundlesToDownload, meta.BundleNumber) 119 totalBytes += meta.CompressedSize 120 } 121 122 if len(bundlesToDownload) == 0 { 123 opts.Logger.Printf("All bundles already exist locally") 124 return result, nil 125 } 126 127 opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes) 128 129 // Step 3: Set up periodic index saving 130 saveCtx, saveCancel := context.WithCancel(ctx) 131 defer saveCancel() 132 133 var downloadedBundles []int 134 var downloadedMu sync.Mutex 135 136 saveDone := make(chan struct{}) 137 go func() { 138 defer close(saveDone) 139 ticker := time.NewTicker(opts.SaveInterval) 140 defer ticker.Stop() 141 142 for { 143 select { 144 case <-saveCtx.Done(): 145 return 146 case <-ticker.C: 147 downloadedMu.Lock() 148 bundles := make([]int, len(downloadedBundles)) 149 copy(bundles, downloadedBundles) 150 downloadedMu.Unlock() 151 152 if opts.Verbose { 153 opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles)) 154 } 155 
updateIndex(bundles, remoteBundleMap, false) 156 } 157 } 158 }() 159 160 // Step 4: Download bundles 161 successList, failedList, bytes := c.downloadBundlesConcurrent( 162 ctx, 163 opts.RemoteURL, 164 bundlesToDownload, 165 remoteBundleMap, 166 totalBytes, 167 opts.Workers, 168 opts.ProgressFunc, 169 opts.Verbose, 170 &downloadedBundles, 171 &downloadedMu, 172 ) 173 174 result.Downloaded = len(successList) 175 result.Failed = len(failedList) 176 result.TotalBytes = bytes 177 result.FailedBundles = failedList 178 result.Interrupted = ctx.Err() != nil 179 180 // Stop periodic saves 181 saveCancel() 182 <-saveDone 183 184 // Step 5: Final index update 185 opts.Logger.Printf("Updating local index...") 186 if err := updateIndex(successList, remoteBundleMap, opts.Verbose); err != nil { 187 return result, fmt.Errorf("failed to update index: %w", err) 188 } 189 190 result.Duration = time.Since(startTime) 191 return result, nil 192} 193 194// loadRemoteIndex loads index from remote URL 195func (c *Cloner) loadRemoteIndex(baseURL string) (*bundleindex.Index, error) { 196 indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json" 197 198 client := &http.Client{Timeout: 30 * time.Second} 199 200 resp, err := client.Get(indexURL) 201 if err != nil { 202 return nil, fmt.Errorf("failed to download: %w", err) 203 } 204 defer resp.Body.Close() 205 206 if resp.StatusCode != http.StatusOK { 207 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 208 } 209 210 data, err := io.ReadAll(resp.Body) 211 if err != nil { 212 return nil, fmt.Errorf("failed to read response: %w", err) 213 } 214 215 var idx bundleindex.Index 216 if err := json.Unmarshal(data, &idx); err != nil { 217 return nil, fmt.Errorf("failed to parse index: %w", err) 218 } 219 220 return &idx, nil 221} 222 223// downloadBundlesConcurrent downloads bundles using worker pool 224func (c *Cloner) downloadBundlesConcurrent( 225 ctx context.Context, 226 baseURL string, 227 bundleNumbers []int, 228 
remoteBundleMap map[int]*bundleindex.BundleMetadata, 229 totalBytes int64, 230 workers int, 231 progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64), 232 verbose bool, 233 downloadedBundles *[]int, 234 downloadedMu *sync.Mutex, 235) (successList []int, failedList []int, downloadedBytes int64) { 236 237 type job struct { 238 bundleNum int 239 expectedHash string 240 } 241 242 type result struct { 243 bundleNum int 244 success bool 245 bytes int64 246 err error 247 } 248 249 jobs := make(chan job, len(bundleNumbers)) 250 results := make(chan result, len(bundleNumbers)) 251 252 // Shared state 253 var ( 254 mu sync.Mutex 255 processedCount int 256 processedBytes int64 257 success []int 258 failed []int 259 ) 260 261 // Start workers 262 var wg sync.WaitGroup 263 client := &http.Client{Timeout: 120 * time.Second} 264 265 for w := 0; w < workers; w++ { 266 wg.Add(1) 267 go func() { 268 defer wg.Done() 269 for j := range jobs { 270 // Check cancellation 271 select { 272 case <-ctx.Done(): 273 results <- result{ 274 bundleNum: j.bundleNum, 275 success: false, 276 err: ctx.Err(), 277 } 278 continue 279 default: 280 } 281 282 // Download bundle 283 bytes, err := c.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash) 284 285 // Update progress 286 mu.Lock() 287 processedCount++ 288 if err == nil { 289 processedBytes += bytes 290 success = append(success, j.bundleNum) 291 if downloadedMu != nil && downloadedBundles != nil { 292 downloadedMu.Lock() 293 *downloadedBundles = append(*downloadedBundles, j.bundleNum) 294 downloadedMu.Unlock() 295 } 296 } else { 297 failed = append(failed, j.bundleNum) 298 } 299 300 if progressFunc != nil { 301 progressFunc(processedCount, len(bundleNumbers), processedBytes, totalBytes) 302 } 303 mu.Unlock() 304 305 results <- result{ 306 bundleNum: j.bundleNum, 307 success: err == nil, 308 bytes: bytes, 309 err: err, 310 } 311 } 312 }() 313 } 314 315 // Send jobs 316 for _, num := range bundleNumbers { 317 
expectedHash := "" 318 if meta, exists := remoteBundleMap[num]; exists { 319 expectedHash = meta.CompressedHash 320 } 321 jobs <- job{ 322 bundleNum: num, 323 expectedHash: expectedHash, 324 } 325 } 326 close(jobs) 327 328 // Wait for completion 329 go func() { 330 wg.Wait() 331 close(results) 332 }() 333 334 // Collect results 335 for res := range results { 336 if res.err != nil && res.err != context.Canceled { 337 c.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err) 338 } else if res.success && verbose { 339 c.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes) 340 } 341 } 342 343 mu.Lock() 344 successList = success 345 failedList = failed 346 downloadedBytes = processedBytes 347 mu.Unlock() 348 349 return 350} 351 352// downloadBundle downloads a single bundle and verifies hash 353func (c *Cloner) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) { 354 url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum) 355 filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum) 356 filepath := filepath.Join(c.bundleDir, filename) 357 358 // Create request 359 req, err := http.NewRequest("GET", url, nil) 360 if err != nil { 361 return 0, err 362 } 363 364 // Download 365 resp, err := client.Do(req) 366 if err != nil { 367 return 0, err 368 } 369 defer resp.Body.Close() 370 371 if resp.StatusCode != http.StatusOK { 372 body, _ := io.ReadAll(resp.Body) 373 return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) 374 } 375 376 // Write to temp file 377 tempPath := filepath + ".tmp" 378 outFile, err := os.Create(tempPath) 379 if err != nil { 380 return 0, err 381 } 382 383 written, err := io.Copy(outFile, resp.Body) 384 outFile.Close() 385 386 if err != nil { 387 os.Remove(tempPath) 388 return 0, err 389 } 390 391 // Verify hash 392 if expectedHash != "" { 393 valid, actualHash, err := c.operations.VerifyHash(tempPath, expectedHash) 
394 if err != nil { 395 os.Remove(tempPath) 396 return 0, fmt.Errorf("hash verification failed: %w", err) 397 } 398 if !valid { 399 os.Remove(tempPath) 400 return 0, fmt.Errorf("hash mismatch: expected %s, got %s", 401 expectedHash[:16]+"...", actualHash[:16]+"...") 402 } 403 } 404 405 // Rename to final location 406 if err := os.Rename(tempPath, filepath); err != nil { 407 os.Remove(tempPath) 408 return 0, err 409 } 410 411 return written, nil 412}