[DEPRECATED] Go implementation of plcbundle — DID index manager
(branch: rust-test · 462 lines · 13 kB)
1package didindex 2 3import ( 4 "context" 5 "encoding/binary" 6 "fmt" 7 "os" 8 "path/filepath" 9 "runtime" 10 "sort" 11 "sync" 12 "time" 13) 14 15// newShardBuilder creates a new shard builder 16func newShardBuilder() *ShardBuilder { 17 return &ShardBuilder{ 18 entries: make(map[string][]OpLocation), 19 } 20} 21 22// add adds a location to the shard 23func (sb *ShardBuilder) add(identifier string, loc OpLocation) { 24 sb.mu.Lock() 25 defer sb.mu.Unlock() 26 27 sb.entries[identifier] = append(sb.entries[identifier], loc) 28} 29 30// updateAndSaveConfig updates config with new values and saves atomically 31func (dim *Manager) updateAndSaveConfig(totalDIDs int64, lastBundle int) error { 32 dim.config.TotalDIDs = totalDIDs 33 dim.config.LastBundle = lastBundle 34 dim.config.Version = DIDINDEX_VERSION 35 dim.config.Format = "binary_v4" 36 dim.config.UpdatedAt = time.Now().UTC() 37 38 return dim.saveIndexConfig() 39} 40 41// BuildIndexFromScratch builds index with controlled memory usage 42func (dim *Manager) BuildIndexFromScratch(ctx context.Context, mgr BundleProvider, progressCallback func(current, total int)) error { 43 dim.indexMu.Lock() 44 defer dim.indexMu.Unlock() 45 46 dim.logger.Printf("Building DID index from scratch (memory-efficient mode)...") 47 48 bundles := mgr.GetBundleIndex().GetBundles() 49 if len(bundles) == 0 { 50 return fmt.Errorf("no bundles to index") 51 } 52 53 if err := os.MkdirAll(dim.shardDir, 0755); err != nil { 54 return fmt.Errorf("failed to create shard directory: %w", err) 55 } 56 57 // Create temporary shard files 58 tempShards := make([]*os.File, DID_SHARD_COUNT) 59 for i := 0; i < DID_SHARD_COUNT; i++ { 60 tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", i)) 61 f, err := os.Create(tempPath) 62 if err != nil { 63 for j := 0; j < i; j++ { 64 tempShards[j].Close() 65 os.Remove(filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", j))) 66 } 67 return fmt.Errorf("failed to create temp shard: %w", err) 68 } 69 tempShards[i] = f 
70 } 71 72 dim.logger.Printf("Pass 1/2: Scanning %d bundles...", len(bundles)) 73 74 // Stream all operations to temp files 75 for i, meta := range bundles { 76 select { 77 case <-ctx.Done(): 78 for _, f := range tempShards { 79 f.Close() 80 os.Remove(f.Name()) 81 } 82 return ctx.Err() 83 default: 84 } 85 86 if progressCallback != nil { 87 progressCallback(i+1, len(bundles)) 88 } 89 90 // Load bundle 91 bundle, err := mgr.LoadBundleForDIDIndex(ctx, meta.BundleNumber) 92 if err != nil { 93 dim.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err) 94 continue 95 } 96 97 // Process each operation 98 for pos, op := range bundle.Operations { 99 identifier, err := extractDIDIdentifier(op.DID) 100 if err != nil { 101 continue 102 } 103 104 shardNum := dim.calculateShard(identifier) 105 106 // Write entry: [24 bytes ID][4 bytes packed OpLocation] 107 entry := make([]byte, 28) 108 copy(entry[0:24], identifier) 109 110 // Create packed OpLocation (includes nullified bit) 111 loc := NewOpLocation(uint16(meta.BundleNumber), uint16(pos), op.IsNullified()) 112 binary.LittleEndian.PutUint32(entry[24:28], uint32(loc)) 113 114 if _, err := tempShards[shardNum].Write(entry); err != nil { 115 dim.logger.Printf("Warning: failed to write to temp shard %02x: %v", shardNum, err) 116 } 117 } 118 } 119 120 // Close temp files 121 for _, f := range tempShards { 122 f.Close() 123 } 124 125 dim.logger.Printf("\n") 126 dim.logger.Printf("Pass 2/2: Consolidating %d shards...", DID_SHARD_COUNT) 127 128 // Consolidate shards 129 totalDIDs := int64(0) 130 for i := 0; i < DID_SHARD_COUNT; i++ { 131 // Log every 32 shards 132 if i%32 == 0 || i == DID_SHARD_COUNT-1 { 133 dim.logger.Printf(" Consolidating shards: %d/%d (%.1f%%)", 134 i+1, DID_SHARD_COUNT, float64(i+1)/float64(DID_SHARD_COUNT)*100) 135 } 136 137 count, err := dim.consolidateShard(uint8(i)) 138 if err != nil { 139 return fmt.Errorf("failed to consolidate shard %02x: %w", i, err) 140 } 141 totalDIDs += count 142 
} 143 144 if err := dim.updateAndSaveConfig(totalDIDs, bundles[len(bundles)-1].BundleNumber); err != nil { 145 return fmt.Errorf("failed to save config: %w", err) 146 } 147 148 dim.logger.Printf("✓ Index built: %d DIDs indexed", totalDIDs) 149 150 return nil 151} 152 153// consolidateShard reads temp file, sorts, and writes final shard 154func (dim *Manager) consolidateShard(shardNum uint8) (int64, error) { 155 tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", shardNum)) 156 157 // Read all entries from temp file 158 data, err := os.ReadFile(tempPath) 159 if err != nil { 160 if os.IsNotExist(err) { 161 return 0, nil 162 } 163 return 0, err 164 } 165 166 if len(data) == 0 { 167 os.Remove(tempPath) 168 return 0, nil 169 } 170 171 // Parse entries (28 bytes each) 172 entryCount := len(data) / 28 173 if len(data)%28 != 0 { 174 return 0, fmt.Errorf("corrupted temp shard: size not multiple of 28") 175 } 176 177 type tempEntry struct { 178 identifier string 179 location OpLocation 180 } 181 182 entries := make([]tempEntry, entryCount) 183 for i := 0; i < entryCount; i++ { 184 offset := i * 28 185 entries[i] = tempEntry{ 186 identifier: string(data[offset : offset+24]), 187 location: OpLocation(binary.LittleEndian.Uint32(data[offset+24 : offset+28])), 188 } 189 } 190 191 // Free the data slice 192 data = nil 193 194 // Sort by identifier 195 sort.Slice(entries, func(i, j int) bool { 196 return entries[i].identifier < entries[j].identifier 197 }) 198 199 // Group by DID 200 builder := newShardBuilder() 201 for _, entry := range entries { 202 builder.add(entry.identifier, entry.location) 203 } 204 205 // Free entries 206 entries = nil 207 208 // Write final shard 209 if err := dim.writeShard(shardNum, builder); err != nil { 210 return 0, err 211 } 212 213 // Clean up temp file 214 os.Remove(tempPath) 215 216 return int64(len(builder.entries)), nil 217} 218 219// UpdateIndexForBundle adds operations from a new bundle (incremental + ATOMIC + PARALLEL) 220func 
(dim *Manager) UpdateIndexForBundle(ctx context.Context, bundle *BundleData) error { 221 dim.indexMu.Lock() 222 defer dim.indexMu.Unlock() 223 224 totalStart := time.Now() 225 226 // STEP 1: Group operations by shard 227 groupStart := time.Now() 228 shardOps := make(map[uint8]map[string][]OpLocation) 229 230 for pos, op := range bundle.Operations { 231 identifier, err := extractDIDIdentifier(op.DID) 232 if err != nil { 233 continue 234 } 235 236 shardNum := dim.calculateShard(identifier) 237 238 if shardOps[shardNum] == nil { 239 shardOps[shardNum] = make(map[string][]OpLocation) 240 } 241 242 loc := NewOpLocation(uint16(bundle.BundleNumber), uint16(pos), op.IsNullified()) 243 shardOps[shardNum][identifier] = append(shardOps[shardNum][identifier], loc) 244 } 245 246 groupDuration := time.Since(groupStart) 247 if dim.verbose { 248 dim.logger.Printf(" [DID Index] Grouped operations into %d shards in %s", 249 len(shardOps), groupDuration) 250 } 251 252 // STEP 2: Write ALL shards to .tmp files FIRST (PARALLEL) 253 writeStart := time.Now() 254 255 tmpShards := make(map[uint8]string) 256 var tmpShardsMu sync.Mutex 257 var deltaCount int64 258 var deltaCountMu sync.Mutex 259 260 // Error handling 261 errChan := make(chan error, len(shardOps)) 262 263 // Worker pool 264 workers := runtime.NumCPU() 265 if workers > len(shardOps) { 266 workers = len(shardOps) 267 } 268 if workers < 1 { 269 workers = 1 270 } 271 272 semaphore := make(chan struct{}, workers) 273 var wg sync.WaitGroup 274 275 if dim.verbose { 276 dim.logger.Printf(" [DID Index] Updating %d shards in parallel (%d workers)...", 277 len(shardOps), workers) 278 } 279 280 // Process each shard in parallel 281 for shardNum, newOps := range shardOps { 282 wg.Add(1) 283 go func(sNum uint8, ops map[string][]OpLocation) { 284 defer wg.Done() 285 286 // Acquire semaphore (limit concurrency) 287 semaphore <- struct{}{} 288 defer func() { <-semaphore }() 289 290 shardStart := time.Now() 291 tmpPath := 
filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx.tmp", sNum)) 292 293 addedCount, err := dim.updateShardToTemp(sNum, ops, tmpPath) 294 if err != nil { 295 errChan <- fmt.Errorf("shard %02x: %w", sNum, err) 296 return 297 } 298 299 shardDuration := time.Since(shardStart) 300 301 // Update shared state 302 tmpShardsMu.Lock() 303 tmpShards[sNum] = tmpPath 304 tmpShardsMu.Unlock() 305 306 deltaCountMu.Lock() 307 deltaCount += addedCount 308 deltaCountMu.Unlock() 309 310 // Debug log for each shard 311 if dim.verbose { 312 dim.logger.Printf(" Shard %02x: +%d DIDs in %s (%d ops)", 313 sNum, addedCount, shardDuration, len(ops)) 314 } 315 }(shardNum, newOps) 316 } 317 318 // Wait for all workers 319 wg.Wait() 320 close(errChan) 321 322 writeDuration := time.Since(writeStart) 323 if dim.verbose { 324 dim.logger.Printf(" [DID Index] Wrote %d temp files in %s (%.1f shards/sec)", 325 len(tmpShards), writeDuration, float64(len(tmpShards))/writeDuration.Seconds()) 326 } 327 328 // Check for errors 329 if err := <-errChan; err != nil { 330 dim.cleanupTempShards(tmpShards) 331 return err 332 } 333 334 // STEP 3: Atomically commit ALL shards 335 commitStart := time.Now() 336 337 for shardNum, tmpPath := range tmpShards { 338 finalPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum)) 339 340 if err := os.Rename(tmpPath, finalPath); err != nil { 341 dim.logger.Printf("ERROR: Failed to commit shard %02x: %v", shardNum, err) 342 return fmt.Errorf("failed to commit shard %02x: %w", shardNum, err) 343 } 344 345 // Invalidate cache 346 dim.invalidateShard(shardNum) 347 } 348 349 commitDuration := time.Since(commitStart) 350 351 // STEP 4: Update config 352 configStart := time.Now() 353 354 newTotal := dim.config.TotalDIDs + deltaCount 355 if err := dim.updateAndSaveConfig(newTotal, bundle.BundleNumber); err != nil { 356 return fmt.Errorf("failed to save config: %w", err) 357 } 358 359 configDuration := time.Since(configStart) 360 totalDuration := time.Since(totalStart) 
361 362 // Summary log 363 if dim.verbose { 364 dim.logger.Printf(" [DID Index] ✓ Bundle %06d indexed: +%d DIDs, %d shards updated in %s", 365 bundle.BundleNumber, deltaCount, len(tmpShards), totalDuration) 366 } 367 368 if dim.verbose { 369 dim.logger.Printf(" Breakdown: group=%s write=%s commit=%s config=%s", 370 groupDuration, writeDuration, commitDuration, configDuration) 371 dim.logger.Printf(" Throughput: %.0f ops/sec", 372 float64(len(bundle.Operations))/totalDuration.Seconds()) 373 } 374 375 return nil 376} 377 378// updateShardToTemp updates a shard and writes to temp file 379func (dim *Manager) updateShardToTemp(shardNum uint8, newOps map[string][]OpLocation, tmpPath string) (int64, error) { 380 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum)) 381 382 existingBuilder := newShardBuilder() 383 384 // Read existing shard if it exists 385 if data, err := os.ReadFile(shardPath); err == nil && len(data) > 32 { 386 if err := dim.parseShardData(data, existingBuilder); err != nil { 387 return 0, fmt.Errorf("failed to parse existing shard: %w", err) 388 } 389 } 390 391 beforeCount := len(existingBuilder.entries) 392 393 // Merge new operations 394 for identifier, locations := range newOps { 395 existingBuilder.entries[identifier] = append(existingBuilder.entries[identifier], locations...) 
396 } 397 398 afterCount := len(existingBuilder.entries) 399 deltaCount := int64(afterCount - beforeCount) 400 401 // Write to TEMP file 402 if err := dim.writeShardToPath(tmpPath, shardNum, existingBuilder); err != nil { 403 return 0, err 404 } 405 406 return deltaCount, nil 407} 408 409// cleanupTempShards removes all temporary shard files 410func (dim *Manager) cleanupTempShards(tmpShards map[uint8]string) { 411 for shardNum, tmpPath := range tmpShards { 412 if err := os.Remove(tmpPath); err != nil && !os.IsNotExist(err) { 413 dim.logger.Printf("Warning: failed to cleanup temp shard %02x: %v", shardNum, err) 414 } 415 } 416} 417 418// VerifyAndRepairIndex checks if index is consistent with bundles and repairs if needed 419func (dim *Manager) VerifyAndRepairIndex(ctx context.Context, mgr BundleProvider) error { 420 bundles := mgr.GetBundleIndex().GetBundles() 421 if len(bundles) == 0 { 422 return nil 423 } 424 425 lastBundleInRepo := bundles[len(bundles)-1].BundleNumber 426 lastBundleInIndex := dim.config.LastBundle 427 428 if lastBundleInIndex == lastBundleInRepo { 429 return nil 430 } 431 432 if lastBundleInIndex > lastBundleInRepo { 433 dim.logger.Printf("⚠️ Warning: Index claims bundle %d but only %d bundles exist", 434 lastBundleInIndex, lastBundleInRepo) 435 dim.logger.Printf(" Rebuilding index...") 436 return dim.BuildIndexFromScratch(ctx, mgr, nil) 437 } 438 439 // Index is behind - update incrementally 440 dim.logger.Printf("Index is behind: has bundle %d, need %d", 441 lastBundleInIndex, lastBundleInRepo) 442 dim.logger.Printf("Updating index for %d missing bundles...", 443 lastBundleInRepo-lastBundleInIndex) 444 445 for bundleNum := lastBundleInIndex + 1; bundleNum <= lastBundleInRepo; bundleNum++ { 446 bundle, err := mgr.LoadBundleForDIDIndex(ctx, bundleNum) 447 if err != nil { 448 return fmt.Errorf("failed to load bundle %d: %w", bundleNum, err) 449 } 450 451 if err := dim.UpdateIndexForBundle(ctx, bundle); err != nil { 452 return fmt.Errorf("failed 
to update index for bundle %d: %w", bundleNum, err) 453 } 454 455 if bundleNum%100 == 0 { 456 dim.logger.Printf(" Updated through bundle %d...", bundleNum) 457 } 458 } 459 460 dim.logger.Printf("✓ Index repaired: now at bundle %d", lastBundleInRepo) 461 return nil 462}