[DEPRECATED] Go implementation of plcbundle — package didindex (sharded, memory-mapped DID → operation-location index).
1package didindex 2 3import ( 4 "encoding/binary" 5 "fmt" 6 "hash/fnv" 7 "os" 8 "path/filepath" 9 "sort" 10 "sync/atomic" 11 "syscall" 12 "time" 13 14 "github.com/goccy/go-json" 15 "golang.org/x/sys/unix" 16 "tangled.org/atscan.net/plcbundle-go/internal/plcclient" 17) 18 19// NewManager creates a new DID index manager 20func NewManager(baseDir string, logger Logger) *Manager { 21 indexDir := filepath.Join(baseDir, DID_INDEX_DIR) 22 shardDir := filepath.Join(indexDir, DID_INDEX_SHARDS) 23 configPath := filepath.Join(indexDir, DID_INDEX_CONFIG) 24 25 // Load or create config 26 config, _ := loadIndexConfig(configPath) 27 if config == nil { 28 config = &Config{ 29 Version: DIDINDEX_VERSION, // Will be 4 30 Format: "binary_v4", // Update format name 31 ShardCount: DID_SHARD_COUNT, 32 UpdatedAt: time.Now().UTC(), 33 } 34 } else if config.Version < DIDINDEX_VERSION { 35 // Auto-trigger rebuild on version mismatch 36 logger.Printf("DID index version outdated (v%d, need v%d) - rebuild required", 37 config.Version, DIDINDEX_VERSION) 38 } 39 40 return &Manager{ 41 baseDir: baseDir, 42 indexDir: indexDir, 43 shardDir: shardDir, 44 configPath: configPath, 45 maxCache: 5, 46 evictionThreshold: 5, 47 config: config, 48 logger: logger, 49 recentLookupSize: 1000, // Track last 100 lookups 50 recentLookups: make([]int64, 1000), 51 } 52} 53 54// Close unmaps all shards and cleans up 55func (dim *Manager) Close() error { 56 // Mark all shards for eviction 57 var shards []*mmapShard 58 59 dim.shardCache.Range(func(key, value interface{}) bool { 60 shard := value.(*mmapShard) 61 shards = append(shards, shard) 62 dim.shardCache.Delete(key) 63 return true 64 }) 65 66 // Wait for refcounts to drop to 0 67 for _, shard := range shards { 68 for atomic.LoadInt64(&shard.refCount) > 0 { 69 time.Sleep(1 * time.Millisecond) 70 } 71 dim.unmapShard(shard) 72 } 73 74 return nil 75} 76 77func (dim *Manager) SetVerbose(verbose bool) { 78 dim.verbose = verbose 79} 80 81// GetDIDLocations returns all 
bundle+position locations for a DID (with timing) 82func (dim *Manager) GetDIDLocations(did string) ([]OpLocation, error) { 83 // Start timing 84 lookupStart := time.Now() 85 defer func() { 86 dim.recordLookupTime(time.Since(lookupStart)) 87 }() 88 89 identifier, err := extractDIDIdentifier(did) 90 if err != nil { 91 return nil, err 92 } 93 94 shardNum := dim.calculateShard(identifier) 95 if dim.verbose { 96 dim.logger.Printf("DEBUG: DID %s -> identifier '%s' -> shard %02x", did, identifier, shardNum) 97 } 98 99 shard, err := dim.loadShard(shardNum) 100 if err != nil { 101 if dim.verbose { 102 dim.logger.Printf("DEBUG: Failed to load shard: %v", err) 103 } 104 return nil, fmt.Errorf("failed to load shard %02x: %w", shardNum, err) 105 } 106 107 defer dim.releaseShard(shard) 108 109 if shard.data == nil { 110 if dim.verbose { 111 dim.logger.Printf("DEBUG: Shard %02x has no data (empty shard)", shardNum) 112 } 113 return nil, nil 114 } 115 116 if dim.verbose { 117 dim.logger.Printf("DEBUG: Shard %02x loaded, size: %d bytes", shardNum, len(shard.data)) 118 } 119 120 locations := dim.searchShard(shard, identifier) 121 122 if dim.verbose { 123 dim.logger.Printf("DEBUG: Binary search found %d locations", len(locations)) 124 if len(locations) > 0 { 125 dim.logger.Printf("DEBUG: Locations: %v", locations) 126 } 127 } 128 129 return locations, nil 130} 131 132// calculateShard determines which shard a DID belongs to 133func (dim *Manager) calculateShard(identifier string) uint8 { 134 h := fnv.New32a() 135 h.Write([]byte(identifier)) 136 hash := h.Sum32() 137 return uint8(hash % DID_SHARD_COUNT) 138} 139 140// loadShard loads a shard from cache or disk (with madvise optimization) 141func (dim *Manager) loadShard(shardNum uint8) (*mmapShard, error) { 142 // Fast path: cache hit 143 if val, ok := dim.shardCache.Load(shardNum); ok { 144 shard := val.(*mmapShard) 145 146 // Increment refcount BEFORE returning 147 atomic.AddInt64(&shard.refCount, 1) 148 
atomic.StoreInt64(&shard.lastUsed, time.Now().Unix()) 149 atomic.AddInt64(&shard.accessCount, 1) 150 atomic.AddInt64(&dim.cacheHits, 1) 151 152 return shard, nil 153 } 154 atomic.AddInt64(&dim.cacheMisses, 1) 155 156 // Cache miss - load from disk 157 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum)) 158 159 if _, err := os.Stat(shardPath); os.IsNotExist(err) { 160 // Empty shard - no refcount needed 161 return &mmapShard{ 162 shardNum: shardNum, 163 data: nil, 164 lastUsed: time.Now().Unix(), 165 refCount: 0, // Not in cache 166 }, nil 167 } 168 169 file, err := os.Open(shardPath) 170 if err != nil { 171 return nil, err 172 } 173 174 info, err := file.Stat() 175 if err != nil { 176 file.Close() 177 return nil, err 178 } 179 180 if info.Size() == 0 { 181 file.Close() 182 return &mmapShard{ 183 shardNum: shardNum, 184 data: nil, 185 lastUsed: time.Now().Unix(), 186 refCount: 0, 187 }, nil 188 } 189 190 // Memory-map the file 191 data, err := syscall.Mmap(int(file.Fd()), 0, int(info.Size()), 192 syscall.PROT_READ, syscall.MAP_SHARED) 193 if err != nil { 194 file.Close() 195 return nil, fmt.Errorf("mmap failed: %w", err) 196 } 197 198 if err := dim.applyMadviseHints(data, info.Size()); err != nil { 199 if dim.verbose { 200 dim.logger.Printf("DEBUG: madvise failed (non-fatal): %v", err) 201 } 202 } 203 204 shard := &mmapShard{ 205 shardNum: shardNum, 206 data: data, 207 file: file, 208 lastUsed: time.Now().Unix(), 209 accessCount: 1, 210 refCount: 1, 211 } 212 213 // Try to store 214 actual, loaded := dim.shardCache.LoadOrStore(shardNum, shard) 215 216 if loaded { 217 // Someone else loaded it - cleanup ours 218 dim.unmapShard(shard) 219 220 actualShard := actual.(*mmapShard) 221 atomic.AddInt64(&actualShard.refCount, 1) // Increment their refcount 222 atomic.StoreInt64(&actualShard.lastUsed, time.Now().Unix()) 223 atomic.AddInt64(&actualShard.accessCount, 1) 224 return actualShard, nil 225 } 226 227 // We stored it - maybe evict 228 go 
dim.evictIfNeeded() // Run async to avoid blocking 229 230 return shard, nil 231} 232 233// applyMadviseHints applies OS-level memory hints for optimal performance 234func (dim *Manager) applyMadviseHints(data []byte, fileSize int64) error { 235 const headerPrefetchSize = 16 * 1024 // 16 KB for header + prefix + start of offset table 236 237 // 1. Prefetch critical header section (prefix index + offset table start) 238 if len(data) >= headerPrefetchSize { 239 if err := unix.Madvise(data[:headerPrefetchSize], unix.MADV_WILLNEED); err != nil { 240 if dim.verbose { 241 dim.logger.Printf("DEBUG: [madvise] Header prefetch failed: %v", err) 242 } 243 } else if dim.verbose { 244 dim.logger.Printf("DEBUG: [madvise] Prefetching header (%d KB) + marking rest as RANDOM", 245 headerPrefetchSize/1024) 246 } 247 } 248 249 // 2. Mark rest as random access (tells OS not to do read-ahead) 250 if err := unix.Madvise(data, unix.MADV_RANDOM); err != nil { 251 return err 252 } 253 254 if dim.verbose { 255 dim.logger.Printf("DEBUG: [madvise] Shard size: %.1f MB → RANDOM access pattern", 256 float64(fileSize)/(1024*1024)) 257 } 258 259 return nil 260} 261 262// searchShard performs optimized binary search using prefix index 263func (dim *Manager) searchShard(shard *mmapShard, identifier string) []OpLocation { 264 if len(shard.data) < 1056 { 265 return nil 266 } 267 268 data := shard.data 269 270 // Read header 271 if string(data[0:4]) != DIDINDEX_MAGIC { 272 dim.logger.Printf("Warning: invalid shard magic") 273 return nil 274 } 275 276 version := binary.LittleEndian.Uint32(data[4:8]) 277 entryCount := binary.LittleEndian.Uint32(data[9:13]) 278 279 if entryCount == 0 { 280 return nil 281 } 282 283 // Determine search range using prefix index 284 left, right := 0, int(entryCount) 285 286 // Use prefix index to narrow range (only for v3+) 287 if version >= 3 && len(identifier) > 0 { 288 prefixByte := identifier[0] 289 prefixIndexPos := 32 + (int(prefixByte) * 4) 290 291 if prefixIndexPos+4 
<= len(data) { 292 startIdx := binary.LittleEndian.Uint32(data[prefixIndexPos : prefixIndexPos+4]) 293 294 if startIdx != 0xFFFFFFFF { 295 left = int(startIdx) 296 297 // Find end of this prefix range 298 for nextPrefix := int(prefixByte) + 1; nextPrefix < 256; nextPrefix++ { 299 nextPos := 32 + (nextPrefix * 4) 300 if nextPos+4 > len(data) { 301 break 302 } 303 nextIdx := binary.LittleEndian.Uint32(data[nextPos : nextPos+4]) 304 if nextIdx != 0xFFFFFFFF { 305 right = int(nextIdx) 306 break 307 } 308 } 309 310 if dim.verbose { 311 dim.logger.Printf("DEBUG: Prefix index narrowed search: %d entries → %d entries (%.1f%% reduction)", 312 entryCount, right-left, (1.0-float64(right-left)/float64(entryCount))*100) 313 } 314 } else { 315 // No entries with this prefix 316 if dim.verbose { 317 dim.logger.Printf("DEBUG: Prefix index: no entries with prefix 0x%02x", prefixByte) 318 } 319 return nil 320 } 321 } 322 } 323 324 if dim.verbose { 325 dim.logger.Printf("DEBUG: Binary search range: [%d, %d) of %d entries", left, right, entryCount) 326 } 327 328 // Binary search within narrowed range 329 attempts := 0 330 offsetTableStart := 1056 // After header + prefix index 331 332 for left < right { 333 attempts++ 334 mid := (left + right) / 2 335 336 // Get entry offset from offset table 337 offsetPos := offsetTableStart + (mid * 4) 338 if offsetPos+4 > len(data) { 339 if dim.verbose { 340 dim.logger.Printf("DEBUG: Offset position out of bounds") 341 } 342 return nil 343 } 344 345 entryOffset := int(binary.LittleEndian.Uint32(data[offsetPos : offsetPos+4])) 346 347 if entryOffset+DID_IDENTIFIER_LEN > len(data) { 348 if dim.verbose { 349 dim.logger.Printf("DEBUG: Entry offset out of bounds: %d + %d > %d", 350 entryOffset, DID_IDENTIFIER_LEN, len(data)) 351 } 352 return nil 353 } 354 355 entryID := string(data[entryOffset : entryOffset+DID_IDENTIFIER_LEN]) 356 357 if dim.verbose && attempts <= 5 { 358 dim.logger.Printf("DEBUG: Attempt %d: mid=%d, comparing '%s' vs '%s'", 359 
attempts, mid, identifier, entryID) 360 } 361 362 // Compare 363 if identifier == entryID { 364 if dim.verbose { 365 dim.logger.Printf("DEBUG: FOUND at mid=%d after %d attempts (vs ~%d without prefix index)", 366 mid, attempts, logBase2(int(entryCount))) 367 } 368 return dim.readLocations(data, entryOffset) 369 } else if identifier < entryID { 370 right = mid 371 } else { 372 left = mid + 1 373 } 374 } 375 376 if dim.verbose { 377 dim.logger.Printf("DEBUG: NOT FOUND after %d attempts", attempts) 378 } 379 380 return nil 381} 382 383// Helper function 384func logBase2(n int) int { 385 if n <= 0 { 386 return 0 387 } 388 count := 0 389 for n > 1 { 390 n >>= 1 391 count++ 392 } 393 return count 394} 395 396// getEntryOffset reads entry offset from offset table - O(1) lookup 397func (dim *Manager) getEntryOffset(data []byte, entryIndex int) int { 398 if len(data) < 1056 { 399 return -1 400 } 401 402 entryCount := binary.LittleEndian.Uint32(data[9:13]) 403 if entryIndex < 0 || entryIndex >= int(entryCount) { 404 return -1 405 } 406 407 // Offset table starts at 1056 (after header + prefix index) 408 offsetTableStart := 1056 409 offsetPos := offsetTableStart + (entryIndex * 4) 410 411 if offsetPos+4 > len(data) { 412 return -1 413 } 414 415 offset := int(binary.LittleEndian.Uint32(data[offsetPos : offsetPos+4])) 416 417 if offset < 0 || offset >= len(data) { 418 return -1 419 } 420 421 return offset 422} 423 424// readLocations reads location data at given offset 425func (dim *Manager) readLocations(data []byte, offset int) []OpLocation { 426 // Skip identifier 427 offset += DID_IDENTIFIER_LEN 428 429 // Read count 430 if offset+2 > len(data) { 431 return nil 432 } 433 count := binary.LittleEndian.Uint16(data[offset : offset+2]) 434 offset += 2 435 436 // Read locations 437 locations := make([]OpLocation, count) 438 for i := 0; i < int(count); i++ { 439 if offset+4 > len(data) { 440 return locations[:i] 441 } 442 443 // Read packed uint32 444 packed := 
binary.LittleEndian.Uint32(data[offset : offset+4]) 445 locations[i] = OpLocation(packed) 446 447 offset += 4 448 } 449 450 return locations 451} 452 453// unmapShard unmaps and closes a shard 454func (dim *Manager) unmapShard(shard *mmapShard) { 455 if shard.data != nil { 456 unix.Madvise(shard.data, unix.MADV_DONTNEED) 457 458 syscall.Munmap(shard.data) 459 } 460 if shard.file != nil { 461 if f, ok := shard.file.(*os.File); ok { 462 f.Close() 463 } 464 } 465} 466 467// GetStats returns index statistics (updated) 468func (dim *Manager) GetStats() map[string]interface{} { 469 cachedShards := make([]int, 0) 470 471 dim.shardCache.Range(func(key, value interface{}) bool { 472 cachedShards = append(cachedShards, int(key.(uint8))) 473 return true 474 }) 475 476 sort.Ints(cachedShards) 477 478 // Calculate cache hit rate 479 hits := atomic.LoadInt64(&dim.cacheHits) 480 misses := atomic.LoadInt64(&dim.cacheMisses) 481 total := hits + misses 482 483 cacheHitRate := 0.0 484 if total > 0 { 485 cacheHitRate = float64(hits) / float64(total) 486 } 487 488 baseStats := map[string]interface{}{ 489 "total_dids": dim.config.TotalDIDs, 490 "last_bundle": dim.config.LastBundle, 491 "shard_count": dim.config.ShardCount, 492 "cached_shards": len(cachedShards), 493 "cache_limit": dim.maxCache, 494 "cache_order": cachedShards, 495 "updated_at": dim.config.UpdatedAt, 496 "cache_hits": hits, 497 "cache_misses": misses, 498 "cache_hit_rate": cacheHitRate, 499 "total_lookups": total, 500 } 501 502 // Merge with performance stats 503 perfStats := dim.calculateLookupStats() 504 for k, v := range perfStats { 505 baseStats[k] = v 506 } 507 508 return baseStats 509} 510 511// Exists checks if index exists 512func (dim *Manager) Exists() bool { 513 _, err := os.Stat(dim.configPath) 514 return err == nil 515} 516 517// TrimCache trims cache to keep only most recent shard 518func (dim *Manager) TrimCache() { 519 // Count current size 520 size := 0 521 dim.shardCache.Range(func(k, v interface{}) 
bool { 522 size++ 523 return true 524 }) 525 526 if size <= 1 { 527 return 528 } 529 530 // Find most recent shard 531 var newestTime int64 532 var keepNum uint8 533 534 dim.shardCache.Range(func(key, value interface{}) bool { 535 shard := value.(*mmapShard) 536 lastUsed := atomic.LoadInt64(&shard.lastUsed) 537 if lastUsed > newestTime { 538 newestTime = lastUsed 539 keepNum = key.(uint8) 540 } 541 return true 542 }) 543 544 // Evict all except newest 545 dim.shardCache.Range(func(key, value interface{}) bool { 546 num := key.(uint8) 547 if num != keepNum { 548 if val, ok := dim.shardCache.LoadAndDelete(key); ok { 549 shard := val.(*mmapShard) 550 dim.unmapShard(shard) 551 } 552 } 553 return true 554 }) 555} 556 557// GetConfig returns the index configuration 558func (dim *Manager) GetConfig() *Config { 559 return dim.config 560} 561 562// DebugShard shows shard debugging information 563func (dim *Manager) DebugShard(shardNum uint8) error { 564 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum)) 565 data, err := os.ReadFile(shardPath) 566 if err != nil { 567 return err 568 } 569 570 fmt.Printf("Shard %02x debug:\n", shardNum) 571 fmt.Printf(" File size: %d bytes\n", len(data)) 572 fmt.Printf(" Magic: %s\n", string(data[0:4])) 573 fmt.Printf(" Version: %d\n", binary.LittleEndian.Uint32(data[4:8])) 574 fmt.Printf(" Shard num: %d\n", data[8]) 575 576 entryCount := binary.LittleEndian.Uint32(data[9:13]) 577 fmt.Printf(" Entry count: %d\n", entryCount) 578 579 // Show offset table info 580 offsetTableSize := int(entryCount) * 4 581 dataStartOffset := 32 + offsetTableSize 582 fmt.Printf(" Offset table size: %d bytes\n", offsetTableSize) 583 fmt.Printf(" Data starts at: %d\n", dataStartOffset) 584 585 // Show first few entries 586 fmt.Printf("\n First 5 entries:\n") 587 588 for i := 0; i < 5 && i < int(entryCount); i++ { 589 offset := dim.getEntryOffset(data, i) 590 if offset < 0 || offset+DID_IDENTIFIER_LEN+2 > len(data) { 591 break 592 } 593 594 
identifier := string(data[offset : offset+DID_IDENTIFIER_LEN]) 595 locCount := binary.LittleEndian.Uint16(data[offset+DID_IDENTIFIER_LEN : offset+DID_IDENTIFIER_LEN+2]) 596 597 fmt.Printf(" %d. '%s' (%d locations) @ offset %d\n", i+1, identifier, locCount, offset) 598 } 599 600 return nil 601} 602 603func (dim *Manager) invalidateShard(shardNum uint8) { 604 if val, ok := dim.shardCache.LoadAndDelete(shardNum); ok { 605 shard := val.(*mmapShard) 606 607 for atomic.LoadInt64(&shard.refCount) > 0 { 608 time.Sleep(1 * time.Millisecond) 609 } 610 611 dim.unmapShard(shard) 612 } 613} 614 615// writeShard writes a shard to disk in binary format with offset table 616func (dim *Manager) writeShard(shardNum uint8, builder *ShardBuilder) error { 617 // Write to temp file first 618 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum)) 619 tempPath := shardPath + ".tmp" 620 621 if err := dim.writeShardToPath(tempPath, shardNum, builder); err != nil { 622 return err 623 } 624 625 // Atomic rename 626 if err := os.Rename(tempPath, shardPath); err != nil { 627 os.Remove(tempPath) 628 return err 629 } 630 631 // Invalidate cache for this shard 632 dim.invalidateShard(shardNum) 633 634 return nil 635} 636 637// writeShardToPath writes shard to a specific path with prefix index 638func (dim *Manager) writeShardToPath(path string, shardNum uint8, builder *ShardBuilder) error { 639 // Sort identifiers for binary search 640 identifiers := make([]string, 0, len(builder.entries)) 641 for id := range builder.entries { 642 identifiers = append(identifiers, id) 643 } 644 sort.Strings(identifiers) 645 646 if len(identifiers) == 0 { 647 // Write empty shard 648 return os.WriteFile(path, make([]byte, 0), 0644) 649 } 650 651 // Build prefix index: map first byte → first entry index with that prefix 652 prefixIndex := make([]uint32, 256) 653 for i := range prefixIndex { 654 prefixIndex[i] = 0xFFFFFFFF // Marker for "no entries" 655 } 656 657 for i, identifier := range 
identifiers { 658 if len(identifier) == 0 { 659 continue 660 } 661 prefixByte := identifier[0] 662 663 // Set to first occurrence only 664 if prefixIndex[prefixByte] == 0xFFFFFFFF { 665 prefixIndex[prefixByte] = uint32(i) 666 } 667 } 668 669 // Calculate entry offsets 670 offsetTableStart := 1056 // After header (32) + prefix index (1024) 671 dataStartOffset := offsetTableStart + (len(identifiers) * 4) 672 673 offsetTable := make([]uint32, len(identifiers)) 674 currentOffset := dataStartOffset 675 676 for i, id := range identifiers { 677 offsetTable[i] = uint32(currentOffset) 678 locations := builder.entries[id] 679 entrySize := DID_IDENTIFIER_LEN + 2 + (len(locations) * 4) 680 currentOffset += entrySize 681 } 682 683 totalSize := currentOffset 684 685 // Allocate buffer 686 buf := make([]byte, totalSize) 687 688 // Write header (32 bytes) 689 copy(buf[0:4], DIDINDEX_MAGIC) 690 binary.LittleEndian.PutUint32(buf[4:8], DIDINDEX_VERSION) 691 buf[8] = shardNum 692 binary.LittleEndian.PutUint32(buf[9:13], uint32(len(identifiers))) 693 // bytes 13-31: reserved (zeros) 694 695 // Write prefix index (32-1055: 256 × 4 bytes) 696 for i, entryIdx := range prefixIndex { 697 pos := 32 + (i * 4) 698 binary.LittleEndian.PutUint32(buf[pos:pos+4], entryIdx) 699 } 700 701 // Write offset table (1056+) 702 for i, offset := range offsetTable { 703 pos := offsetTableStart + (i * 4) 704 binary.LittleEndian.PutUint32(buf[pos:pos+4], offset) 705 } 706 707 // Write entries (same as before) 708 for i, identifier := range identifiers { 709 offset := int(offsetTable[i]) 710 locations := builder.entries[identifier] 711 712 copy(buf[offset:offset+DID_IDENTIFIER_LEN], identifier) 713 offset += DID_IDENTIFIER_LEN 714 715 binary.LittleEndian.PutUint16(buf[offset:offset+2], uint16(len(locations))) 716 offset += 2 717 718 for _, loc := range locations { 719 // Write packed uint32 (global position + nullified bit) 720 binary.LittleEndian.PutUint32(buf[offset:offset+4], uint32(loc)) 721 offset += 4 
722 } 723 } 724 725 return os.WriteFile(path, buf, 0644) 726} 727 728// parseShardData parses binary shard data into builder (supports v2 and v3) 729func (dim *Manager) parseShardData(data []byte, builder *ShardBuilder) error { 730 if len(data) < 32 { 731 return nil 732 } 733 734 entryCount := binary.LittleEndian.Uint32(data[9:13]) 735 736 offsetTableStart := 1056 737 738 // Start reading entries after offset table 739 offset := offsetTableStart + (int(entryCount) * 4) 740 741 for i := 0; i < int(entryCount); i++ { 742 if offset+DID_IDENTIFIER_LEN+2 > len(data) { 743 break 744 } 745 746 // Read identifier 747 identifier := string(data[offset : offset+DID_IDENTIFIER_LEN]) 748 offset += DID_IDENTIFIER_LEN 749 750 // Read location count 751 locCount := binary.LittleEndian.Uint16(data[offset : offset+2]) 752 offset += 2 753 754 // Read locations 755 locations := make([]OpLocation, locCount) 756 757 // Check version to determine format 758 version := binary.LittleEndian.Uint32(data[4:8]) 759 760 for j := 0; j < int(locCount); j++ { 761 if version >= 4 { 762 // New format: 4-byte packed uint32 763 if offset+4 > len(data) { 764 break 765 } 766 packed := binary.LittleEndian.Uint32(data[offset : offset+4]) 767 locations[j] = OpLocation(packed) 768 offset += 4 769 } else { 770 // Old format: 5-byte separate fields (for migration) 771 if offset+5 > len(data) { 772 break 773 } 774 bundle := binary.LittleEndian.Uint16(data[offset : offset+2]) 775 position := binary.LittleEndian.Uint16(data[offset+2 : offset+4]) 776 nullified := data[offset+4] != 0 777 778 // Convert to new format 779 locations[j] = NewOpLocation(bundle, position, nullified) 780 offset += 5 781 } 782 } 783 784 builder.entries[identifier] = locations 785 } 786 787 return nil 788} 789 790// saveIndexConfig saves index configuration 791func (dim *Manager) saveIndexConfig() error { 792 dim.config.UpdatedAt = time.Now().UTC() 793 794 data, err := json.MarshalIndent(dim.config, "", " ") 795 if err != nil { 796 return 
err 797 } 798 799 return os.WriteFile(dim.configPath, data, 0644) 800} 801 802// extractDIDIdentifier extracts the 24-char identifier from full DID 803func extractDIDIdentifier(did string) (string, error) { 804 if err := plcclient.ValidateDIDFormat(did); err != nil { 805 return "", err 806 } 807 808 // Remove "did:plc:" prefix 809 identifier := did[8:] 810 811 if len(identifier) != DID_IDENTIFIER_LEN { 812 return "", fmt.Errorf("invalid identifier length: %d", len(identifier)) 813 } 814 815 return identifier, nil 816} 817 818// loadIndexConfig loads index configuration 819func loadIndexConfig(path string) (*Config, error) { 820 data, err := os.ReadFile(path) 821 if err != nil { 822 return nil, err 823 } 824 825 var config Config 826 if err := json.Unmarshal(data, &config); err != nil { 827 return nil, err 828 } 829 830 return &config, nil 831} 832 833func (dim *Manager) evictIfNeeded() { 834 size := 0 835 dim.shardCache.Range(func(_, _ interface{}) bool { 836 size++ 837 return true 838 }) 839 840 if size <= dim.evictionThreshold { 841 return 842 } 843 844 type entry struct { 845 num uint8 846 lastUsed int64 847 refCount int64 848 } 849 850 var entries []entry 851 852 dim.shardCache.Range(func(key, value interface{}) bool { 853 shard := value.(*mmapShard) 854 entries = append(entries, entry{ 855 num: key.(uint8), 856 lastUsed: atomic.LoadInt64(&shard.lastUsed), 857 refCount: atomic.LoadInt64(&shard.refCount), 858 }) 859 return true 860 }) 861 862 // Sort by lastUsed (oldest first) 863 sort.Slice(entries, func(i, j int) bool { 864 return entries[i].lastUsed < entries[j].lastUsed 865 }) 866 867 // Evict oldest shards that are NOT in use 868 toEvict := size - dim.maxCache 869 evicted := 0 870 871 for i := 0; i < len(entries) && evicted < toEvict; i++ { 872 // Only evict if refCount == 0 (not in use) 873 if entries[i].refCount == 0 { 874 if val, ok := dim.shardCache.LoadAndDelete(entries[i].num); ok { 875 shard := val.(*mmapShard) 876 877 // Double-check refcount (race 
protection) 878 if atomic.LoadInt64(&shard.refCount) == 0 { 879 dim.unmapShard(shard) 880 evicted++ 881 } else { 882 // Someone started using it - put it back 883 dim.shardCache.Store(entries[i].num, shard) 884 } 885 } 886 } 887 } 888} 889 890// releaseShard decrements reference count 891func (dim *Manager) releaseShard(shard *mmapShard) { 892 if shard == nil || shard.data == nil { 893 return 894 } 895 896 atomic.AddInt64(&shard.refCount, -1) 897} 898 899// ResetCacheStats resets cache statistics (useful for monitoring) 900func (dim *Manager) ResetCacheStats() { 901 atomic.StoreInt64(&dim.cacheHits, 0) 902 atomic.StoreInt64(&dim.cacheMisses, 0) 903} 904 905// recordLookupTime records a lookup time (thread-safe) 906func (dim *Manager) recordLookupTime(duration time.Duration) { 907 micros := duration.Microseconds() 908 909 // Update totals (atomic) 910 atomic.AddInt64(&dim.totalLookups, 1) 911 atomic.AddInt64(&dim.totalLookupTime, micros) 912 913 // Update circular buffer (with lock) 914 dim.lookupTimeLock.Lock() 915 dim.recentLookups[dim.recentLookupIdx] = micros 916 dim.recentLookupIdx = (dim.recentLookupIdx + 1) % dim.recentLookupSize 917 dim.lookupTimeLock.Unlock() 918} 919 920// calculateLookupStats calculates performance statistics 921func (dim *Manager) calculateLookupStats() map[string]interface{} { 922 totalLookups := atomic.LoadInt64(&dim.totalLookups) 923 totalTime := atomic.LoadInt64(&dim.totalLookupTime) 924 925 stats := make(map[string]interface{}) 926 927 if totalLookups == 0 { 928 return stats 929 } 930 931 // Overall average (all time) 932 avgMicros := float64(totalTime) / float64(totalLookups) 933 stats["avg_lookup_time_ms"] = avgMicros / 1000.0 934 stats["total_lookups"] = totalLookups 935 936 // Recent statistics (last N lookups) 937 dim.lookupTimeLock.Lock() 938 recentCopy := make([]int64, dim.recentLookupSize) 939 copy(recentCopy, dim.recentLookups) 940 dim.lookupTimeLock.Unlock() 941 942 // Find valid entries (non-zero) 943 validRecent := 
make([]int64, 0, dim.recentLookupSize) 944 for _, t := range recentCopy { 945 if t > 0 { 946 validRecent = append(validRecent, t) 947 } 948 } 949 950 if len(validRecent) > 0 { 951 // Sort for percentiles 952 sortedRecent := make([]int64, len(validRecent)) 953 copy(sortedRecent, validRecent) 954 sort.Slice(sortedRecent, func(i, j int) bool { 955 return sortedRecent[i] < sortedRecent[j] 956 }) 957 958 // Calculate recent average 959 var recentSum int64 960 for _, t := range validRecent { 961 recentSum += t 962 } 963 recentAvg := float64(recentSum) / float64(len(validRecent)) 964 stats["recent_avg_lookup_time_ms"] = recentAvg / 1000.0 965 stats["recent_sample_size"] = len(validRecent) 966 967 // Min/Max 968 stats["min_lookup_time_ms"] = float64(sortedRecent[0]) / 1000.0 969 stats["max_lookup_time_ms"] = float64(sortedRecent[len(sortedRecent)-1]) / 1000.0 970 971 // Percentiles (p50, p95, p99) 972 p50idx := len(sortedRecent) * 50 / 100 973 p95idx := len(sortedRecent) * 95 / 100 974 p99idx := len(sortedRecent) * 99 / 100 975 976 if p50idx < len(sortedRecent) { 977 stats["p50_lookup_time_ms"] = float64(sortedRecent[p50idx]) / 1000.0 978 } 979 if p95idx < len(sortedRecent) { 980 stats["p95_lookup_time_ms"] = float64(sortedRecent[p95idx]) / 1000.0 981 } 982 if p99idx < len(sortedRecent) { 983 stats["p99_lookup_time_ms"] = float64(sortedRecent[p99idx]) / 1000.0 984 } 985 } 986 987 return stats 988} 989 990// ResetPerformanceStats resets performance statistics (useful for monitoring periods) 991func (dim *Manager) ResetPerformanceStats() { 992 atomic.StoreInt64(&dim.cacheHits, 0) 993 atomic.StoreInt64(&dim.cacheMisses, 0) 994 atomic.StoreInt64(&dim.totalLookups, 0) 995 atomic.StoreInt64(&dim.totalLookupTime, 0) 996 997 dim.lookupTimeLock.Lock() 998 dim.recentLookups = make([]int64, dim.recentLookupSize) 999 dim.recentLookupIdx = 0 1000 dim.lookupTimeLock.Unlock() 1001} 1002 1003// NeedsRebuild checks if index needs rebuilding and returns reason 1004func (dim *Manager) 
NeedsRebuild(bundleProvider BundleIndexProvider) (bool, string) { 1005 // Check if index exists 1006 if !dim.Exists() { 1007 return true, "index does not exist" 1008 } 1009 1010 // Get repository state 1011 bundles := bundleProvider.GetBundles() 1012 if len(bundles) == 0 { 1013 return false, "" // No bundles, no need to rebuild 1014 } 1015 1016 lastBundleInRepo := bundles[len(bundles)-1].BundleNumber 1017 1018 // Check version 1019 if dim.config.Version != DIDINDEX_VERSION { 1020 return true, fmt.Sprintf("index version outdated (v%d, need v%d)", 1021 dim.config.Version, DIDINDEX_VERSION) 1022 } 1023 1024 // Check if index is behind 1025 if dim.config.LastBundle < lastBundleInRepo { 1026 bundlesBehind := lastBundleInRepo - dim.config.LastBundle 1027 1028 // Smart logic: only rebuild if significantly behind 1029 // Otherwise can do incremental update 1030 if bundlesBehind > 100 { 1031 return true, fmt.Sprintf("index significantly behind (%d bundles)", bundlesBehind) 1032 } 1033 1034 return false, fmt.Sprintf("index slightly behind (%d bundles) - can update incrementally", bundlesBehind) 1035 } 1036 1037 // Check if index is ahead (corruption indicator) 1038 if dim.config.LastBundle > lastBundleInRepo { 1039 return true, fmt.Sprintf("index is ahead of repository (has %d, repo has %d) - likely corrupted", 1040 dim.config.LastBundle, lastBundleInRepo) 1041 } 1042 1043 // Index is up to date 1044 return false, "" 1045} 1046 1047// ShouldUpdateIncrementally checks if incremental update is appropriate 1048func (dim *Manager) ShouldUpdateIncrementally(bundleProvider BundleIndexProvider) (bool, int) { 1049 if !dim.Exists() { 1050 return false, 0 1051 } 1052 1053 bundles := bundleProvider.GetBundles() 1054 if len(bundles) == 0 { 1055 return false, 0 1056 } 1057 1058 lastBundleInRepo := bundles[len(bundles)-1].BundleNumber 1059 bundlesBehind := lastBundleInRepo - dim.config.LastBundle 1060 1061 // Only do incremental if behind by less than 100 bundles 1062 if bundlesBehind > 0 
&& bundlesBehind <= 100 { 1063 return true, bundlesBehind 1064 } 1065 1066 return false, 0 1067}