[DEPRECATED] Go implementation of plcbundle — this implementation is no longer maintained.
1package didindex
2
3import (
4 "encoding/binary"
5 "fmt"
6 "hash/fnv"
7 "os"
8 "path/filepath"
9 "sort"
10 "sync/atomic"
11 "syscall"
12 "time"
13
14 "github.com/goccy/go-json"
15 "golang.org/x/sys/unix"
16 "tangled.org/atscan.net/plcbundle-go/internal/plcclient"
17)
18
19// NewManager creates a new DID index manager
20func NewManager(baseDir string, logger Logger) *Manager {
21 indexDir := filepath.Join(baseDir, DID_INDEX_DIR)
22 shardDir := filepath.Join(indexDir, DID_INDEX_SHARDS)
23 configPath := filepath.Join(indexDir, DID_INDEX_CONFIG)
24
25 // Load or create config
26 config, _ := loadIndexConfig(configPath)
27 if config == nil {
28 config = &Config{
29 Version: DIDINDEX_VERSION, // Will be 4
30 Format: "binary_v4", // Update format name
31 ShardCount: DID_SHARD_COUNT,
32 UpdatedAt: time.Now().UTC(),
33 }
34 } else if config.Version < DIDINDEX_VERSION {
35 // Auto-trigger rebuild on version mismatch
36 logger.Printf("DID index version outdated (v%d, need v%d) - rebuild required",
37 config.Version, DIDINDEX_VERSION)
38 }
39
40 return &Manager{
41 baseDir: baseDir,
42 indexDir: indexDir,
43 shardDir: shardDir,
44 configPath: configPath,
45 maxCache: 5,
46 evictionThreshold: 5,
47 config: config,
48 logger: logger,
49 recentLookupSize: 1000, // Track last 100 lookups
50 recentLookups: make([]int64, 1000),
51 }
52}
53
54// Close unmaps all shards and cleans up
55func (dim *Manager) Close() error {
56 // Mark all shards for eviction
57 var shards []*mmapShard
58
59 dim.shardCache.Range(func(key, value interface{}) bool {
60 shard := value.(*mmapShard)
61 shards = append(shards, shard)
62 dim.shardCache.Delete(key)
63 return true
64 })
65
66 // Wait for refcounts to drop to 0
67 for _, shard := range shards {
68 for atomic.LoadInt64(&shard.refCount) > 0 {
69 time.Sleep(1 * time.Millisecond)
70 }
71 dim.unmapShard(shard)
72 }
73
74 return nil
75}
76
// SetVerbose toggles debug logging for shard loads, searches, and
// madvise hints.
func (dim *Manager) SetVerbose(verbose bool) {
	dim.verbose = verbose
}
80
81// GetDIDLocations returns all bundle+position locations for a DID (with timing)
82func (dim *Manager) GetDIDLocations(did string) ([]OpLocation, error) {
83 // Start timing
84 lookupStart := time.Now()
85 defer func() {
86 dim.recordLookupTime(time.Since(lookupStart))
87 }()
88
89 identifier, err := extractDIDIdentifier(did)
90 if err != nil {
91 return nil, err
92 }
93
94 shardNum := dim.calculateShard(identifier)
95 if dim.verbose {
96 dim.logger.Printf("DEBUG: DID %s -> identifier '%s' -> shard %02x", did, identifier, shardNum)
97 }
98
99 shard, err := dim.loadShard(shardNum)
100 if err != nil {
101 if dim.verbose {
102 dim.logger.Printf("DEBUG: Failed to load shard: %v", err)
103 }
104 return nil, fmt.Errorf("failed to load shard %02x: %w", shardNum, err)
105 }
106
107 defer dim.releaseShard(shard)
108
109 if shard.data == nil {
110 if dim.verbose {
111 dim.logger.Printf("DEBUG: Shard %02x has no data (empty shard)", shardNum)
112 }
113 return nil, nil
114 }
115
116 if dim.verbose {
117 dim.logger.Printf("DEBUG: Shard %02x loaded, size: %d bytes", shardNum, len(shard.data))
118 }
119
120 locations := dim.searchShard(shard, identifier)
121
122 if dim.verbose {
123 dim.logger.Printf("DEBUG: Binary search found %d locations", len(locations))
124 if len(locations) > 0 {
125 dim.logger.Printf("DEBUG: Locations: %v", locations)
126 }
127 }
128
129 return locations, nil
130}
131
132// calculateShard determines which shard a DID belongs to
133func (dim *Manager) calculateShard(identifier string) uint8 {
134 h := fnv.New32a()
135 h.Write([]byte(identifier))
136 hash := h.Sum32()
137 return uint8(hash % DID_SHARD_COUNT)
138}
139
// loadShard loads a shard from cache or disk (with madvise optimization).
//
// On success the returned shard's refCount has been incremented on the
// caller's behalf; the caller must pair this with releaseShard. Shards
// returned for missing or zero-length files have nil data and are NOT
// placed in the cache (and carry no refcount).
func (dim *Manager) loadShard(shardNum uint8) (*mmapShard, error) {
	// Fast path: cache hit
	if val, ok := dim.shardCache.Load(shardNum); ok {
		shard := val.(*mmapShard)

		// Increment refcount BEFORE returning so the async evictor
		// sees this shard as in-use.
		atomic.AddInt64(&shard.refCount, 1)
		atomic.StoreInt64(&shard.lastUsed, time.Now().Unix())
		atomic.AddInt64(&shard.accessCount, 1)
		atomic.AddInt64(&dim.cacheHits, 1)

		return shard, nil
	}
	atomic.AddInt64(&dim.cacheMisses, 1)

	// Cache miss - load from disk
	shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))

	if _, err := os.Stat(shardPath); os.IsNotExist(err) {
		// Missing shard file: treat as an empty shard. No refcount
		// needed because it never enters the cache.
		return &mmapShard{
			shardNum: shardNum,
			data:     nil,
			lastUsed: time.Now().Unix(),
			refCount: 0, // Not in cache
		}, nil
	}

	file, err := os.Open(shardPath)
	if err != nil {
		return nil, err
	}

	info, err := file.Stat()
	if err != nil {
		file.Close()
		return nil, err
	}

	if info.Size() == 0 {
		// Zero-length file: same empty-shard treatment as a missing file.
		file.Close()
		return &mmapShard{
			shardNum: shardNum,
			data:     nil,
			lastUsed: time.Now().Unix(),
			refCount: 0,
		}, nil
	}

	// Memory-map the whole file read-only; MAP_SHARED lets the page
	// cache back all readers of the same shard.
	data, err := syscall.Mmap(int(file.Fd()), 0, int(info.Size()),
		syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		file.Close()
		return nil, fmt.Errorf("mmap failed: %w", err)
	}

	// madvise hints are an optimization only; failure is non-fatal.
	if err := dim.applyMadviseHints(data, info.Size()); err != nil {
		if dim.verbose {
			dim.logger.Printf("DEBUG: madvise failed (non-fatal): %v", err)
		}
	}

	shard := &mmapShard{
		shardNum:    shardNum,
		data:        data,
		file:        file,
		lastUsed:    time.Now().Unix(),
		accessCount: 1,
		refCount:    1, // the caller's reference
	}

	// Publish to the cache; LoadOrStore resolves the race where two
	// goroutines mapped the same shard concurrently.
	actual, loaded := dim.shardCache.LoadOrStore(shardNum, shard)

	if loaded {
		// Someone else won the race - discard our mapping and use theirs.
		dim.unmapShard(shard)

		actualShard := actual.(*mmapShard)
		atomic.AddInt64(&actualShard.refCount, 1) // Increment their refcount
		atomic.StoreInt64(&actualShard.lastUsed, time.Now().Unix())
		atomic.AddInt64(&actualShard.accessCount, 1)
		return actualShard, nil
	}

	// We stored it - maybe evict
	go dim.evictIfNeeded() // Run async to avoid blocking
	return shard, nil
}
232
233// applyMadviseHints applies OS-level memory hints for optimal performance
234func (dim *Manager) applyMadviseHints(data []byte, fileSize int64) error {
235 const headerPrefetchSize = 16 * 1024 // 16 KB for header + prefix + start of offset table
236
237 // 1. Prefetch critical header section (prefix index + offset table start)
238 if len(data) >= headerPrefetchSize {
239 if err := unix.Madvise(data[:headerPrefetchSize], unix.MADV_WILLNEED); err != nil {
240 if dim.verbose {
241 dim.logger.Printf("DEBUG: [madvise] Header prefetch failed: %v", err)
242 }
243 } else if dim.verbose {
244 dim.logger.Printf("DEBUG: [madvise] Prefetching header (%d KB) + marking rest as RANDOM",
245 headerPrefetchSize/1024)
246 }
247 }
248
249 // 2. Mark rest as random access (tells OS not to do read-ahead)
250 if err := unix.Madvise(data, unix.MADV_RANDOM); err != nil {
251 return err
252 }
253
254 if dim.verbose {
255 dim.logger.Printf("DEBUG: [madvise] Shard size: %.1f MB → RANDOM access pattern",
256 float64(fileSize)/(1024*1024))
257 }
258
259 return nil
260}
261
// searchShard performs an optimized binary search using the prefix index.
//
// Shard layout (v4, mirrored by writeShardToPath): 32-byte header,
// 1024-byte prefix index (256 x uint32 first-entry indices, 0xFFFFFFFF
// meaning "no entries with this first byte"), an offset table of
// entryCount x uint32 absolute offsets, then the sorted entry records.
func (dim *Manager) searchShard(shard *mmapShard, identifier string) []OpLocation {
	// Need at least header + prefix index (32 + 1024 = 1056 bytes).
	if len(shard.data) < 1056 {
		return nil
	}

	data := shard.data

	// Validate the magic before trusting any other header field.
	if string(data[0:4]) != DIDINDEX_MAGIC {
		dim.logger.Printf("Warning: invalid shard magic")
		return nil
	}

	version := binary.LittleEndian.Uint32(data[4:8])
	entryCount := binary.LittleEndian.Uint32(data[9:13])

	if entryCount == 0 {
		return nil
	}

	// Default: search the whole entry range [0, entryCount).
	left, right := 0, int(entryCount)

	// Use the prefix index to narrow the range (only for v3+ shards).
	if version >= 3 && len(identifier) > 0 {
		prefixByte := identifier[0]
		// Prefix slot i lives at byte 32 + i*4.
		prefixIndexPos := 32 + (int(prefixByte) * 4)

		if prefixIndexPos+4 <= len(data) {
			startIdx := binary.LittleEndian.Uint32(data[prefixIndexPos : prefixIndexPos+4])

			if startIdx != 0xFFFFFFFF {
				left = int(startIdx)

				// The right bound is the first populated prefix slot
				// after ours (entries are sorted, so prefixes are runs).
				for nextPrefix := int(prefixByte) + 1; nextPrefix < 256; nextPrefix++ {
					nextPos := 32 + (nextPrefix * 4)
					if nextPos+4 > len(data) {
						break
					}
					nextIdx := binary.LittleEndian.Uint32(data[nextPos : nextPos+4])
					if nextIdx != 0xFFFFFFFF {
						right = int(nextIdx)
						break
					}
				}

				if dim.verbose {
					dim.logger.Printf("DEBUG: Prefix index narrowed search: %d entries → %d entries (%.1f%% reduction)",
						entryCount, right-left, (1.0-float64(right-left)/float64(entryCount))*100)
				}
			} else {
				// 0xFFFFFFFF: no entries share this first byte.
				if dim.verbose {
					dim.logger.Printf("DEBUG: Prefix index: no entries with prefix 0x%02x", prefixByte)
				}
				return nil
			}
		}
	}

	if dim.verbose {
		dim.logger.Printf("DEBUG: Binary search range: [%d, %d) of %d entries", left, right, entryCount)
	}

	// Binary search within the narrowed range, resolving each probe
	// through the offset table.
	attempts := 0
	offsetTableStart := 1056 // After header + prefix index

	for left < right {
		attempts++
		mid := (left + right) / 2

		// Offset-table slot for entry `mid`.
		offsetPos := offsetTableStart + (mid * 4)
		if offsetPos+4 > len(data) {
			if dim.verbose {
				dim.logger.Printf("DEBUG: Offset position out of bounds")
			}
			return nil
		}

		entryOffset := int(binary.LittleEndian.Uint32(data[offsetPos : offsetPos+4]))

		if entryOffset+DID_IDENTIFIER_LEN > len(data) {
			if dim.verbose {
				dim.logger.Printf("DEBUG: Entry offset out of bounds: %d + %d > %d",
					entryOffset, DID_IDENTIFIER_LEN, len(data))
			}
			return nil
		}

		// Each record starts with the fixed-length identifier.
		entryID := string(data[entryOffset : entryOffset+DID_IDENTIFIER_LEN])

		if dim.verbose && attempts <= 5 {
			dim.logger.Printf("DEBUG: Attempt %d: mid=%d, comparing '%s' vs '%s'",
				attempts, mid, identifier, entryID)
		}

		// Lexicographic compare drives the bisection.
		if identifier == entryID {
			if dim.verbose {
				dim.logger.Printf("DEBUG: FOUND at mid=%d after %d attempts (vs ~%d without prefix index)",
					mid, attempts, logBase2(int(entryCount)))
			}
			return dim.readLocations(data, entryOffset)
		} else if identifier < entryID {
			right = mid
		} else {
			left = mid + 1
		}
	}

	if dim.verbose {
		dim.logger.Printf("DEBUG: NOT FOUND after %d attempts", attempts)
	}

	return nil
}
382
// logBase2 returns floor(log2(n)); non-positive inputs yield 0.
func logBase2(n int) int {
	if n <= 0 {
		return 0
	}
	bits := 0
	for ; n > 1; n >>= 1 {
		bits++
	}
	return bits
}
395
396// getEntryOffset reads entry offset from offset table - O(1) lookup
397func (dim *Manager) getEntryOffset(data []byte, entryIndex int) int {
398 if len(data) < 1056 {
399 return -1
400 }
401
402 entryCount := binary.LittleEndian.Uint32(data[9:13])
403 if entryIndex < 0 || entryIndex >= int(entryCount) {
404 return -1
405 }
406
407 // Offset table starts at 1056 (after header + prefix index)
408 offsetTableStart := 1056
409 offsetPos := offsetTableStart + (entryIndex * 4)
410
411 if offsetPos+4 > len(data) {
412 return -1
413 }
414
415 offset := int(binary.LittleEndian.Uint32(data[offsetPos : offsetPos+4]))
416
417 if offset < 0 || offset >= len(data) {
418 return -1
419 }
420
421 return offset
422}
423
424// readLocations reads location data at given offset
425func (dim *Manager) readLocations(data []byte, offset int) []OpLocation {
426 // Skip identifier
427 offset += DID_IDENTIFIER_LEN
428
429 // Read count
430 if offset+2 > len(data) {
431 return nil
432 }
433 count := binary.LittleEndian.Uint16(data[offset : offset+2])
434 offset += 2
435
436 // Read locations
437 locations := make([]OpLocation, count)
438 for i := 0; i < int(count); i++ {
439 if offset+4 > len(data) {
440 return locations[:i]
441 }
442
443 // Read packed uint32
444 packed := binary.LittleEndian.Uint32(data[offset : offset+4])
445 locations[i] = OpLocation(packed)
446
447 offset += 4
448 }
449
450 return locations
451}
452
453// unmapShard unmaps and closes a shard
454func (dim *Manager) unmapShard(shard *mmapShard) {
455 if shard.data != nil {
456 unix.Madvise(shard.data, unix.MADV_DONTNEED)
457
458 syscall.Munmap(shard.data)
459 }
460 if shard.file != nil {
461 if f, ok := shard.file.(*os.File); ok {
462 f.Close()
463 }
464 }
465}
466
467// GetStats returns index statistics (updated)
468func (dim *Manager) GetStats() map[string]interface{} {
469 cachedShards := make([]int, 0)
470
471 dim.shardCache.Range(func(key, value interface{}) bool {
472 cachedShards = append(cachedShards, int(key.(uint8)))
473 return true
474 })
475
476 sort.Ints(cachedShards)
477
478 // Calculate cache hit rate
479 hits := atomic.LoadInt64(&dim.cacheHits)
480 misses := atomic.LoadInt64(&dim.cacheMisses)
481 total := hits + misses
482
483 cacheHitRate := 0.0
484 if total > 0 {
485 cacheHitRate = float64(hits) / float64(total)
486 }
487
488 baseStats := map[string]interface{}{
489 "total_dids": dim.config.TotalDIDs,
490 "last_bundle": dim.config.LastBundle,
491 "shard_count": dim.config.ShardCount,
492 "cached_shards": len(cachedShards),
493 "cache_limit": dim.maxCache,
494 "cache_order": cachedShards,
495 "updated_at": dim.config.UpdatedAt,
496 "cache_hits": hits,
497 "cache_misses": misses,
498 "cache_hit_rate": cacheHitRate,
499 "total_lookups": total,
500 }
501
502 // Merge with performance stats
503 perfStats := dim.calculateLookupStats()
504 for k, v := range perfStats {
505 baseStats[k] = v
506 }
507
508 return baseStats
509}
510
511// Exists checks if index exists
512func (dim *Manager) Exists() bool {
513 _, err := os.Stat(dim.configPath)
514 return err == nil
515}
516
517// TrimCache trims cache to keep only most recent shard
518func (dim *Manager) TrimCache() {
519 // Count current size
520 size := 0
521 dim.shardCache.Range(func(k, v interface{}) bool {
522 size++
523 return true
524 })
525
526 if size <= 1 {
527 return
528 }
529
530 // Find most recent shard
531 var newestTime int64
532 var keepNum uint8
533
534 dim.shardCache.Range(func(key, value interface{}) bool {
535 shard := value.(*mmapShard)
536 lastUsed := atomic.LoadInt64(&shard.lastUsed)
537 if lastUsed > newestTime {
538 newestTime = lastUsed
539 keepNum = key.(uint8)
540 }
541 return true
542 })
543
544 // Evict all except newest
545 dim.shardCache.Range(func(key, value interface{}) bool {
546 num := key.(uint8)
547 if num != keepNum {
548 if val, ok := dim.shardCache.LoadAndDelete(key); ok {
549 shard := val.(*mmapShard)
550 dim.unmapShard(shard)
551 }
552 }
553 return true
554 })
555}
556
// GetConfig returns the index configuration.
// NOTE(review): this is the live config, not a copy — callers should
// treat it as read-only.
func (dim *Manager) GetConfig() *Config {
	return dim.config
}
561
562// DebugShard shows shard debugging information
563func (dim *Manager) DebugShard(shardNum uint8) error {
564 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
565 data, err := os.ReadFile(shardPath)
566 if err != nil {
567 return err
568 }
569
570 fmt.Printf("Shard %02x debug:\n", shardNum)
571 fmt.Printf(" File size: %d bytes\n", len(data))
572 fmt.Printf(" Magic: %s\n", string(data[0:4]))
573 fmt.Printf(" Version: %d\n", binary.LittleEndian.Uint32(data[4:8]))
574 fmt.Printf(" Shard num: %d\n", data[8])
575
576 entryCount := binary.LittleEndian.Uint32(data[9:13])
577 fmt.Printf(" Entry count: %d\n", entryCount)
578
579 // Show offset table info
580 offsetTableSize := int(entryCount) * 4
581 dataStartOffset := 32 + offsetTableSize
582 fmt.Printf(" Offset table size: %d bytes\n", offsetTableSize)
583 fmt.Printf(" Data starts at: %d\n", dataStartOffset)
584
585 // Show first few entries
586 fmt.Printf("\n First 5 entries:\n")
587
588 for i := 0; i < 5 && i < int(entryCount); i++ {
589 offset := dim.getEntryOffset(data, i)
590 if offset < 0 || offset+DID_IDENTIFIER_LEN+2 > len(data) {
591 break
592 }
593
594 identifier := string(data[offset : offset+DID_IDENTIFIER_LEN])
595 locCount := binary.LittleEndian.Uint16(data[offset+DID_IDENTIFIER_LEN : offset+DID_IDENTIFIER_LEN+2])
596
597 fmt.Printf(" %d. '%s' (%d locations) @ offset %d\n", i+1, identifier, locCount, offset)
598 }
599
600 return nil
601}
602
603func (dim *Manager) invalidateShard(shardNum uint8) {
604 if val, ok := dim.shardCache.LoadAndDelete(shardNum); ok {
605 shard := val.(*mmapShard)
606
607 for atomic.LoadInt64(&shard.refCount) > 0 {
608 time.Sleep(1 * time.Millisecond)
609 }
610
611 dim.unmapShard(shard)
612 }
613}
614
615// writeShard writes a shard to disk in binary format with offset table
616func (dim *Manager) writeShard(shardNum uint8, builder *ShardBuilder) error {
617 // Write to temp file first
618 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
619 tempPath := shardPath + ".tmp"
620
621 if err := dim.writeShardToPath(tempPath, shardNum, builder); err != nil {
622 return err
623 }
624
625 // Atomic rename
626 if err := os.Rename(tempPath, shardPath); err != nil {
627 os.Remove(tempPath)
628 return err
629 }
630
631 // Invalidate cache for this shard
632 dim.invalidateShard(shardNum)
633
634 return nil
635}
636
// writeShardToPath writes a shard to a specific path with prefix index.
//
// v4 layout produced here (read back by searchShard/getEntryOffset):
//
//	bytes 0..31     header: magic, version, shard num, entry count, reserved
//	bytes 32..1055  prefix index: 256 x uint32 "first entry index whose
//	                identifier starts with this byte" (0xFFFFFFFF = none)
//	bytes 1056..    offset table: entryCount x uint32 absolute offsets
//	then            entries: identifier + uint16 count + packed uint32 locations
func (dim *Manager) writeShardToPath(path string, shardNum uint8, builder *ShardBuilder) error {
	// Sort identifiers so the entry records support binary search.
	identifiers := make([]string, 0, len(builder.entries))
	for id := range builder.entries {
		identifiers = append(identifiers, id)
	}
	sort.Strings(identifiers)

	if len(identifiers) == 0 {
		// Write empty shard (loadShard treats a zero-length file as empty).
		return os.WriteFile(path, make([]byte, 0), 0644)
	}

	// Build prefix index: map first byte → first entry index with that prefix.
	prefixIndex := make([]uint32, 256)
	for i := range prefixIndex {
		prefixIndex[i] = 0xFFFFFFFF // Marker for "no entries"
	}

	for i, identifier := range identifiers {
		if len(identifier) == 0 {
			continue
		}
		prefixByte := identifier[0]

		// Identifiers are sorted, so record the first occurrence only;
		// the run extends to the next populated prefix slot.
		if prefixIndex[prefixByte] == 0xFFFFFFFF {
			prefixIndex[prefixByte] = uint32(i)
		}
	}

	// Calculate entry offsets.
	offsetTableStart := 1056 // After header (32) + prefix index (1024)
	dataStartOffset := offsetTableStart + (len(identifiers) * 4)

	offsetTable := make([]uint32, len(identifiers))
	currentOffset := dataStartOffset

	// Each entry occupies identifier + 2-byte count + 4 bytes per location.
	for i, id := range identifiers {
		offsetTable[i] = uint32(currentOffset)
		locations := builder.entries[id]
		entrySize := DID_IDENTIFIER_LEN + 2 + (len(locations) * 4)
		currentOffset += entrySize
	}

	totalSize := currentOffset

	// Allocate the whole shard image in one buffer.
	buf := make([]byte, totalSize)

	// Write header (32 bytes).
	copy(buf[0:4], DIDINDEX_MAGIC)
	binary.LittleEndian.PutUint32(buf[4:8], DIDINDEX_VERSION)
	buf[8] = shardNum
	binary.LittleEndian.PutUint32(buf[9:13], uint32(len(identifiers)))
	// bytes 13-31: reserved (zeros)

	// Write prefix index (bytes 32-1055: 256 × 4 bytes).
	for i, entryIdx := range prefixIndex {
		pos := 32 + (i * 4)
		binary.LittleEndian.PutUint32(buf[pos:pos+4], entryIdx)
	}

	// Write offset table (1056+).
	for i, offset := range offsetTable {
		pos := offsetTableStart + (i * 4)
		binary.LittleEndian.PutUint32(buf[pos:pos+4], offset)
	}

	// Write entry records at their precomputed offsets.
	for i, identifier := range identifiers {
		offset := int(offsetTable[i])
		locations := builder.entries[identifier]

		copy(buf[offset:offset+DID_IDENTIFIER_LEN], identifier)
		offset += DID_IDENTIFIER_LEN

		binary.LittleEndian.PutUint16(buf[offset:offset+2], uint16(len(locations)))
		offset += 2

		for _, loc := range locations {
			// Write packed uint32 (global position + nullified bit).
			binary.LittleEndian.PutUint32(buf[offset:offset+4], uint32(loc))
			offset += 4
		}
	}

	return os.WriteFile(path, buf, 0644)
}
727
728// parseShardData parses binary shard data into builder (supports v2 and v3)
729func (dim *Manager) parseShardData(data []byte, builder *ShardBuilder) error {
730 if len(data) < 32 {
731 return nil
732 }
733
734 entryCount := binary.LittleEndian.Uint32(data[9:13])
735
736 offsetTableStart := 1056
737
738 // Start reading entries after offset table
739 offset := offsetTableStart + (int(entryCount) * 4)
740
741 for i := 0; i < int(entryCount); i++ {
742 if offset+DID_IDENTIFIER_LEN+2 > len(data) {
743 break
744 }
745
746 // Read identifier
747 identifier := string(data[offset : offset+DID_IDENTIFIER_LEN])
748 offset += DID_IDENTIFIER_LEN
749
750 // Read location count
751 locCount := binary.LittleEndian.Uint16(data[offset : offset+2])
752 offset += 2
753
754 // Read locations
755 locations := make([]OpLocation, locCount)
756
757 // Check version to determine format
758 version := binary.LittleEndian.Uint32(data[4:8])
759
760 for j := 0; j < int(locCount); j++ {
761 if version >= 4 {
762 // New format: 4-byte packed uint32
763 if offset+4 > len(data) {
764 break
765 }
766 packed := binary.LittleEndian.Uint32(data[offset : offset+4])
767 locations[j] = OpLocation(packed)
768 offset += 4
769 } else {
770 // Old format: 5-byte separate fields (for migration)
771 if offset+5 > len(data) {
772 break
773 }
774 bundle := binary.LittleEndian.Uint16(data[offset : offset+2])
775 position := binary.LittleEndian.Uint16(data[offset+2 : offset+4])
776 nullified := data[offset+4] != 0
777
778 // Convert to new format
779 locations[j] = NewOpLocation(bundle, position, nullified)
780 offset += 5
781 }
782 }
783
784 builder.entries[identifier] = locations
785 }
786
787 return nil
788}
789
790// saveIndexConfig saves index configuration
791func (dim *Manager) saveIndexConfig() error {
792 dim.config.UpdatedAt = time.Now().UTC()
793
794 data, err := json.MarshalIndent(dim.config, "", " ")
795 if err != nil {
796 return err
797 }
798
799 return os.WriteFile(dim.configPath, data, 0644)
800}
801
802// extractDIDIdentifier extracts the 24-char identifier from full DID
803func extractDIDIdentifier(did string) (string, error) {
804 if err := plcclient.ValidateDIDFormat(did); err != nil {
805 return "", err
806 }
807
808 // Remove "did:plc:" prefix
809 identifier := did[8:]
810
811 if len(identifier) != DID_IDENTIFIER_LEN {
812 return "", fmt.Errorf("invalid identifier length: %d", len(identifier))
813 }
814
815 return identifier, nil
816}
817
818// loadIndexConfig loads index configuration
819func loadIndexConfig(path string) (*Config, error) {
820 data, err := os.ReadFile(path)
821 if err != nil {
822 return nil, err
823 }
824
825 var config Config
826 if err := json.Unmarshal(data, &config); err != nil {
827 return nil, err
828 }
829
830 return &config, nil
831}
832
// evictIfNeeded trims the shard cache back toward maxCache once it grows
// past evictionThreshold. It is launched asynchronously from loadShard.
//
// Only shards with refCount == 0 are eviction candidates, oldest
// (by lastUsed) first.
func (dim *Manager) evictIfNeeded() {
	// sync.Map has no Len(); count by ranging.
	size := 0
	dim.shardCache.Range(func(_, _ interface{}) bool {
		size++
		return true
	})

	if size <= dim.evictionThreshold {
		return
	}

	type entry struct {
		num      uint8
		lastUsed int64
		refCount int64
	}

	var entries []entry

	// Snapshot cache metadata; timestamps/refcounts may move afterwards,
	// hence the re-check before actually unmapping below.
	dim.shardCache.Range(func(key, value interface{}) bool {
		shard := value.(*mmapShard)
		entries = append(entries, entry{
			num:      key.(uint8),
			lastUsed: atomic.LoadInt64(&shard.lastUsed),
			refCount: atomic.LoadInt64(&shard.refCount),
		})
		return true
	})

	// Sort by lastUsed (oldest first).
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].lastUsed < entries[j].lastUsed
	})

	// Evict the oldest shards that are NOT in use.
	toEvict := size - dim.maxCache
	evicted := 0

	for i := 0; i < len(entries) && evicted < toEvict; i++ {
		// Only evict if refCount == 0 (not in use).
		if entries[i].refCount == 0 {
			if val, ok := dim.shardCache.LoadAndDelete(entries[i].num); ok {
				shard := val.(*mmapShard)

				// Double-check the refcount after removal: a reader may
				// have acquired the shard between snapshot and delete.
				if atomic.LoadInt64(&shard.refCount) == 0 {
					dim.unmapShard(shard)
					evicted++
				} else {
					// Someone started using it - put it back.
					dim.shardCache.Store(entries[i].num, shard)
				}
			}
		}
	}
}
889
890// releaseShard decrements reference count
891func (dim *Manager) releaseShard(shard *mmapShard) {
892 if shard == nil || shard.data == nil {
893 return
894 }
895
896 atomic.AddInt64(&shard.refCount, -1)
897}
898
// ResetCacheStats resets cache statistics (useful for monitoring).
// Only the hit/miss counters are cleared; lookup-latency stats are
// reset separately by ResetPerformanceStats.
func (dim *Manager) ResetCacheStats() {
	atomic.StoreInt64(&dim.cacheHits, 0)
	atomic.StoreInt64(&dim.cacheMisses, 0)
}
904
905// recordLookupTime records a lookup time (thread-safe)
906func (dim *Manager) recordLookupTime(duration time.Duration) {
907 micros := duration.Microseconds()
908
909 // Update totals (atomic)
910 atomic.AddInt64(&dim.totalLookups, 1)
911 atomic.AddInt64(&dim.totalLookupTime, micros)
912
913 // Update circular buffer (with lock)
914 dim.lookupTimeLock.Lock()
915 dim.recentLookups[dim.recentLookupIdx] = micros
916 dim.recentLookupIdx = (dim.recentLookupIdx + 1) % dim.recentLookupSize
917 dim.lookupTimeLock.Unlock()
918}
919
920// calculateLookupStats calculates performance statistics
921func (dim *Manager) calculateLookupStats() map[string]interface{} {
922 totalLookups := atomic.LoadInt64(&dim.totalLookups)
923 totalTime := atomic.LoadInt64(&dim.totalLookupTime)
924
925 stats := make(map[string]interface{})
926
927 if totalLookups == 0 {
928 return stats
929 }
930
931 // Overall average (all time)
932 avgMicros := float64(totalTime) / float64(totalLookups)
933 stats["avg_lookup_time_ms"] = avgMicros / 1000.0
934 stats["total_lookups"] = totalLookups
935
936 // Recent statistics (last N lookups)
937 dim.lookupTimeLock.Lock()
938 recentCopy := make([]int64, dim.recentLookupSize)
939 copy(recentCopy, dim.recentLookups)
940 dim.lookupTimeLock.Unlock()
941
942 // Find valid entries (non-zero)
943 validRecent := make([]int64, 0, dim.recentLookupSize)
944 for _, t := range recentCopy {
945 if t > 0 {
946 validRecent = append(validRecent, t)
947 }
948 }
949
950 if len(validRecent) > 0 {
951 // Sort for percentiles
952 sortedRecent := make([]int64, len(validRecent))
953 copy(sortedRecent, validRecent)
954 sort.Slice(sortedRecent, func(i, j int) bool {
955 return sortedRecent[i] < sortedRecent[j]
956 })
957
958 // Calculate recent average
959 var recentSum int64
960 for _, t := range validRecent {
961 recentSum += t
962 }
963 recentAvg := float64(recentSum) / float64(len(validRecent))
964 stats["recent_avg_lookup_time_ms"] = recentAvg / 1000.0
965 stats["recent_sample_size"] = len(validRecent)
966
967 // Min/Max
968 stats["min_lookup_time_ms"] = float64(sortedRecent[0]) / 1000.0
969 stats["max_lookup_time_ms"] = float64(sortedRecent[len(sortedRecent)-1]) / 1000.0
970
971 // Percentiles (p50, p95, p99)
972 p50idx := len(sortedRecent) * 50 / 100
973 p95idx := len(sortedRecent) * 95 / 100
974 p99idx := len(sortedRecent) * 99 / 100
975
976 if p50idx < len(sortedRecent) {
977 stats["p50_lookup_time_ms"] = float64(sortedRecent[p50idx]) / 1000.0
978 }
979 if p95idx < len(sortedRecent) {
980 stats["p95_lookup_time_ms"] = float64(sortedRecent[p95idx]) / 1000.0
981 }
982 if p99idx < len(sortedRecent) {
983 stats["p99_lookup_time_ms"] = float64(sortedRecent[p99idx]) / 1000.0
984 }
985 }
986
987 return stats
988}
989
990// ResetPerformanceStats resets performance statistics (useful for monitoring periods)
991func (dim *Manager) ResetPerformanceStats() {
992 atomic.StoreInt64(&dim.cacheHits, 0)
993 atomic.StoreInt64(&dim.cacheMisses, 0)
994 atomic.StoreInt64(&dim.totalLookups, 0)
995 atomic.StoreInt64(&dim.totalLookupTime, 0)
996
997 dim.lookupTimeLock.Lock()
998 dim.recentLookups = make([]int64, dim.recentLookupSize)
999 dim.recentLookupIdx = 0
1000 dim.lookupTimeLock.Unlock()
1001}
1002
1003// NeedsRebuild checks if index needs rebuilding and returns reason
1004func (dim *Manager) NeedsRebuild(bundleProvider BundleIndexProvider) (bool, string) {
1005 // Check if index exists
1006 if !dim.Exists() {
1007 return true, "index does not exist"
1008 }
1009
1010 // Get repository state
1011 bundles := bundleProvider.GetBundles()
1012 if len(bundles) == 0 {
1013 return false, "" // No bundles, no need to rebuild
1014 }
1015
1016 lastBundleInRepo := bundles[len(bundles)-1].BundleNumber
1017
1018 // Check version
1019 if dim.config.Version != DIDINDEX_VERSION {
1020 return true, fmt.Sprintf("index version outdated (v%d, need v%d)",
1021 dim.config.Version, DIDINDEX_VERSION)
1022 }
1023
1024 // Check if index is behind
1025 if dim.config.LastBundle < lastBundleInRepo {
1026 bundlesBehind := lastBundleInRepo - dim.config.LastBundle
1027
1028 // Smart logic: only rebuild if significantly behind
1029 // Otherwise can do incremental update
1030 if bundlesBehind > 100 {
1031 return true, fmt.Sprintf("index significantly behind (%d bundles)", bundlesBehind)
1032 }
1033
1034 return false, fmt.Sprintf("index slightly behind (%d bundles) - can update incrementally", bundlesBehind)
1035 }
1036
1037 // Check if index is ahead (corruption indicator)
1038 if dim.config.LastBundle > lastBundleInRepo {
1039 return true, fmt.Sprintf("index is ahead of repository (has %d, repo has %d) - likely corrupted",
1040 dim.config.LastBundle, lastBundleInRepo)
1041 }
1042
1043 // Index is up to date
1044 return false, ""
1045}
1046
1047// ShouldUpdateIncrementally checks if incremental update is appropriate
1048func (dim *Manager) ShouldUpdateIncrementally(bundleProvider BundleIndexProvider) (bool, int) {
1049 if !dim.Exists() {
1050 return false, 0
1051 }
1052
1053 bundles := bundleProvider.GetBundles()
1054 if len(bundles) == 0 {
1055 return false, 0
1056 }
1057
1058 lastBundleInRepo := bundles[len(bundles)-1].BundleNumber
1059 bundlesBehind := lastBundleInRepo - dim.config.LastBundle
1060
1061 // Only do incremental if behind by less than 100 bundles
1062 if bundlesBehind > 0 && bundlesBehind <= 100 {
1063 return true, bundlesBehind
1064 }
1065
1066 return false, 0
1067}