// Deprecated: Go implementation of plcbundle.
1package didindex
2
3import (
4 "context"
5 "encoding/binary"
6 "fmt"
7 "os"
8 "path/filepath"
9 "runtime"
10 "sort"
11 "sync"
12 "time"
13)
14
15// newShardBuilder creates a new shard builder
16func newShardBuilder() *ShardBuilder {
17 return &ShardBuilder{
18 entries: make(map[string][]OpLocation),
19 }
20}
21
22// add adds a location to the shard
23func (sb *ShardBuilder) add(identifier string, loc OpLocation) {
24 sb.mu.Lock()
25 defer sb.mu.Unlock()
26
27 sb.entries[identifier] = append(sb.entries[identifier], loc)
28}
29
30// updateAndSaveConfig updates config with new values and saves atomically
31func (dim *Manager) updateAndSaveConfig(totalDIDs int64, lastBundle int) error {
32 dim.config.TotalDIDs = totalDIDs
33 dim.config.LastBundle = lastBundle
34 dim.config.Version = DIDINDEX_VERSION
35 dim.config.Format = "binary_v4"
36 dim.config.UpdatedAt = time.Now().UTC()
37
38 return dim.saveIndexConfig()
39}
40
41// BuildIndexFromScratch builds index with controlled memory usage
42func (dim *Manager) BuildIndexFromScratch(ctx context.Context, mgr BundleProvider, progressCallback func(current, total int)) error {
43 dim.indexMu.Lock()
44 defer dim.indexMu.Unlock()
45
46 dim.logger.Printf("Building DID index from scratch (memory-efficient mode)...")
47
48 bundles := mgr.GetBundleIndex().GetBundles()
49 if len(bundles) == 0 {
50 return fmt.Errorf("no bundles to index")
51 }
52
53 if err := os.MkdirAll(dim.shardDir, 0755); err != nil {
54 return fmt.Errorf("failed to create shard directory: %w", err)
55 }
56
57 // Create temporary shard files
58 tempShards := make([]*os.File, DID_SHARD_COUNT)
59 for i := 0; i < DID_SHARD_COUNT; i++ {
60 tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", i))
61 f, err := os.Create(tempPath)
62 if err != nil {
63 for j := 0; j < i; j++ {
64 tempShards[j].Close()
65 os.Remove(filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", j)))
66 }
67 return fmt.Errorf("failed to create temp shard: %w", err)
68 }
69 tempShards[i] = f
70 }
71
72 dim.logger.Printf("Pass 1/2: Scanning %d bundles...", len(bundles))
73
74 // Stream all operations to temp files
75 for i, meta := range bundles {
76 select {
77 case <-ctx.Done():
78 for _, f := range tempShards {
79 f.Close()
80 os.Remove(f.Name())
81 }
82 return ctx.Err()
83 default:
84 }
85
86 if progressCallback != nil {
87 progressCallback(i+1, len(bundles))
88 }
89
90 // Load bundle
91 bundle, err := mgr.LoadBundleForDIDIndex(ctx, meta.BundleNumber)
92 if err != nil {
93 dim.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
94 continue
95 }
96
97 // Process each operation
98 for pos, op := range bundle.Operations {
99 identifier, err := extractDIDIdentifier(op.DID)
100 if err != nil {
101 continue
102 }
103
104 shardNum := dim.calculateShard(identifier)
105
106 // Write entry: [24 bytes ID][4 bytes packed OpLocation]
107 entry := make([]byte, 28)
108 copy(entry[0:24], identifier)
109
110 // Create packed OpLocation (includes nullified bit)
111 loc := NewOpLocation(uint16(meta.BundleNumber), uint16(pos), op.IsNullified())
112 binary.LittleEndian.PutUint32(entry[24:28], uint32(loc))
113
114 if _, err := tempShards[shardNum].Write(entry); err != nil {
115 dim.logger.Printf("Warning: failed to write to temp shard %02x: %v", shardNum, err)
116 }
117 }
118 }
119
120 // Close temp files
121 for _, f := range tempShards {
122 f.Close()
123 }
124
125 dim.logger.Printf("\n")
126 dim.logger.Printf("Pass 2/2: Consolidating %d shards...", DID_SHARD_COUNT)
127
128 // Consolidate shards
129 totalDIDs := int64(0)
130 for i := 0; i < DID_SHARD_COUNT; i++ {
131 // Log every 32 shards
132 if i%32 == 0 || i == DID_SHARD_COUNT-1 {
133 dim.logger.Printf(" Consolidating shards: %d/%d (%.1f%%)",
134 i+1, DID_SHARD_COUNT, float64(i+1)/float64(DID_SHARD_COUNT)*100)
135 }
136
137 count, err := dim.consolidateShard(uint8(i))
138 if err != nil {
139 return fmt.Errorf("failed to consolidate shard %02x: %w", i, err)
140 }
141 totalDIDs += count
142 }
143
144 if err := dim.updateAndSaveConfig(totalDIDs, bundles[len(bundles)-1].BundleNumber); err != nil {
145 return fmt.Errorf("failed to save config: %w", err)
146 }
147
148 dim.logger.Printf("✓ Index built: %d DIDs indexed", totalDIDs)
149
150 return nil
151}
152
153// consolidateShard reads temp file, sorts, and writes final shard
154func (dim *Manager) consolidateShard(shardNum uint8) (int64, error) {
155 tempPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.tmp", shardNum))
156
157 // Read all entries from temp file
158 data, err := os.ReadFile(tempPath)
159 if err != nil {
160 if os.IsNotExist(err) {
161 return 0, nil
162 }
163 return 0, err
164 }
165
166 if len(data) == 0 {
167 os.Remove(tempPath)
168 return 0, nil
169 }
170
171 // Parse entries (28 bytes each)
172 entryCount := len(data) / 28
173 if len(data)%28 != 0 {
174 return 0, fmt.Errorf("corrupted temp shard: size not multiple of 28")
175 }
176
177 type tempEntry struct {
178 identifier string
179 location OpLocation
180 }
181
182 entries := make([]tempEntry, entryCount)
183 for i := 0; i < entryCount; i++ {
184 offset := i * 28
185 entries[i] = tempEntry{
186 identifier: string(data[offset : offset+24]),
187 location: OpLocation(binary.LittleEndian.Uint32(data[offset+24 : offset+28])),
188 }
189 }
190
191 // Free the data slice
192 data = nil
193
194 // Sort by identifier
195 sort.Slice(entries, func(i, j int) bool {
196 return entries[i].identifier < entries[j].identifier
197 })
198
199 // Group by DID
200 builder := newShardBuilder()
201 for _, entry := range entries {
202 builder.add(entry.identifier, entry.location)
203 }
204
205 // Free entries
206 entries = nil
207
208 // Write final shard
209 if err := dim.writeShard(shardNum, builder); err != nil {
210 return 0, err
211 }
212
213 // Clean up temp file
214 os.Remove(tempPath)
215
216 return int64(len(builder.entries)), nil
217}
218
219// UpdateIndexForBundle adds operations from a new bundle (incremental + ATOMIC + PARALLEL)
220func (dim *Manager) UpdateIndexForBundle(ctx context.Context, bundle *BundleData) error {
221 dim.indexMu.Lock()
222 defer dim.indexMu.Unlock()
223
224 totalStart := time.Now()
225
226 // STEP 1: Group operations by shard
227 groupStart := time.Now()
228 shardOps := make(map[uint8]map[string][]OpLocation)
229
230 for pos, op := range bundle.Operations {
231 identifier, err := extractDIDIdentifier(op.DID)
232 if err != nil {
233 continue
234 }
235
236 shardNum := dim.calculateShard(identifier)
237
238 if shardOps[shardNum] == nil {
239 shardOps[shardNum] = make(map[string][]OpLocation)
240 }
241
242 loc := NewOpLocation(uint16(bundle.BundleNumber), uint16(pos), op.IsNullified())
243 shardOps[shardNum][identifier] = append(shardOps[shardNum][identifier], loc)
244 }
245
246 groupDuration := time.Since(groupStart)
247 if dim.verbose {
248 dim.logger.Printf(" [DID Index] Grouped operations into %d shards in %s",
249 len(shardOps), groupDuration)
250 }
251
252 // STEP 2: Write ALL shards to .tmp files FIRST (PARALLEL)
253 writeStart := time.Now()
254
255 tmpShards := make(map[uint8]string)
256 var tmpShardsMu sync.Mutex
257 var deltaCount int64
258 var deltaCountMu sync.Mutex
259
260 // Error handling
261 errChan := make(chan error, len(shardOps))
262
263 // Worker pool
264 workers := runtime.NumCPU()
265 if workers > len(shardOps) {
266 workers = len(shardOps)
267 }
268 if workers < 1 {
269 workers = 1
270 }
271
272 semaphore := make(chan struct{}, workers)
273 var wg sync.WaitGroup
274
275 if dim.verbose {
276 dim.logger.Printf(" [DID Index] Updating %d shards in parallel (%d workers)...",
277 len(shardOps), workers)
278 }
279
280 // Process each shard in parallel
281 for shardNum, newOps := range shardOps {
282 wg.Add(1)
283 go func(sNum uint8, ops map[string][]OpLocation) {
284 defer wg.Done()
285
286 // Acquire semaphore (limit concurrency)
287 semaphore <- struct{}{}
288 defer func() { <-semaphore }()
289
290 shardStart := time.Now()
291 tmpPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx.tmp", sNum))
292
293 addedCount, err := dim.updateShardToTemp(sNum, ops, tmpPath)
294 if err != nil {
295 errChan <- fmt.Errorf("shard %02x: %w", sNum, err)
296 return
297 }
298
299 shardDuration := time.Since(shardStart)
300
301 // Update shared state
302 tmpShardsMu.Lock()
303 tmpShards[sNum] = tmpPath
304 tmpShardsMu.Unlock()
305
306 deltaCountMu.Lock()
307 deltaCount += addedCount
308 deltaCountMu.Unlock()
309
310 // Debug log for each shard
311 if dim.verbose {
312 dim.logger.Printf(" Shard %02x: +%d DIDs in %s (%d ops)",
313 sNum, addedCount, shardDuration, len(ops))
314 }
315 }(shardNum, newOps)
316 }
317
318 // Wait for all workers
319 wg.Wait()
320 close(errChan)
321
322 writeDuration := time.Since(writeStart)
323 if dim.verbose {
324 dim.logger.Printf(" [DID Index] Wrote %d temp files in %s (%.1f shards/sec)",
325 len(tmpShards), writeDuration, float64(len(tmpShards))/writeDuration.Seconds())
326 }
327
328 // Check for errors
329 if err := <-errChan; err != nil {
330 dim.cleanupTempShards(tmpShards)
331 return err
332 }
333
334 // STEP 3: Atomically commit ALL shards
335 commitStart := time.Now()
336
337 for shardNum, tmpPath := range tmpShards {
338 finalPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
339
340 if err := os.Rename(tmpPath, finalPath); err != nil {
341 dim.logger.Printf("ERROR: Failed to commit shard %02x: %v", shardNum, err)
342 return fmt.Errorf("failed to commit shard %02x: %w", shardNum, err)
343 }
344
345 // Invalidate cache
346 dim.invalidateShard(shardNum)
347 }
348
349 commitDuration := time.Since(commitStart)
350
351 // STEP 4: Update config
352 configStart := time.Now()
353
354 newTotal := dim.config.TotalDIDs + deltaCount
355 if err := dim.updateAndSaveConfig(newTotal, bundle.BundleNumber); err != nil {
356 return fmt.Errorf("failed to save config: %w", err)
357 }
358
359 configDuration := time.Since(configStart)
360 totalDuration := time.Since(totalStart)
361
362 // Summary log
363 if dim.verbose {
364 dim.logger.Printf(" [DID Index] ✓ Bundle %06d indexed: +%d DIDs, %d shards updated in %s",
365 bundle.BundleNumber, deltaCount, len(tmpShards), totalDuration)
366 }
367
368 if dim.verbose {
369 dim.logger.Printf(" Breakdown: group=%s write=%s commit=%s config=%s",
370 groupDuration, writeDuration, commitDuration, configDuration)
371 dim.logger.Printf(" Throughput: %.0f ops/sec",
372 float64(len(bundle.Operations))/totalDuration.Seconds())
373 }
374
375 return nil
376}
377
378// updateShardToTemp updates a shard and writes to temp file
379func (dim *Manager) updateShardToTemp(shardNum uint8, newOps map[string][]OpLocation, tmpPath string) (int64, error) {
380 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum))
381
382 existingBuilder := newShardBuilder()
383
384 // Read existing shard if it exists
385 if data, err := os.ReadFile(shardPath); err == nil && len(data) > 32 {
386 if err := dim.parseShardData(data, existingBuilder); err != nil {
387 return 0, fmt.Errorf("failed to parse existing shard: %w", err)
388 }
389 }
390
391 beforeCount := len(existingBuilder.entries)
392
393 // Merge new operations
394 for identifier, locations := range newOps {
395 existingBuilder.entries[identifier] = append(existingBuilder.entries[identifier], locations...)
396 }
397
398 afterCount := len(existingBuilder.entries)
399 deltaCount := int64(afterCount - beforeCount)
400
401 // Write to TEMP file
402 if err := dim.writeShardToPath(tmpPath, shardNum, existingBuilder); err != nil {
403 return 0, err
404 }
405
406 return deltaCount, nil
407}
408
409// cleanupTempShards removes all temporary shard files
410func (dim *Manager) cleanupTempShards(tmpShards map[uint8]string) {
411 for shardNum, tmpPath := range tmpShards {
412 if err := os.Remove(tmpPath); err != nil && !os.IsNotExist(err) {
413 dim.logger.Printf("Warning: failed to cleanup temp shard %02x: %v", shardNum, err)
414 }
415 }
416}
417
418// VerifyAndRepairIndex checks if index is consistent with bundles and repairs if needed
419func (dim *Manager) VerifyAndRepairIndex(ctx context.Context, mgr BundleProvider) error {
420 bundles := mgr.GetBundleIndex().GetBundles()
421 if len(bundles) == 0 {
422 return nil
423 }
424
425 lastBundleInRepo := bundles[len(bundles)-1].BundleNumber
426 lastBundleInIndex := dim.config.LastBundle
427
428 if lastBundleInIndex == lastBundleInRepo {
429 return nil
430 }
431
432 if lastBundleInIndex > lastBundleInRepo {
433 dim.logger.Printf("⚠️ Warning: Index claims bundle %d but only %d bundles exist",
434 lastBundleInIndex, lastBundleInRepo)
435 dim.logger.Printf(" Rebuilding index...")
436 return dim.BuildIndexFromScratch(ctx, mgr, nil)
437 }
438
439 // Index is behind - update incrementally
440 dim.logger.Printf("Index is behind: has bundle %d, need %d",
441 lastBundleInIndex, lastBundleInRepo)
442 dim.logger.Printf("Updating index for %d missing bundles...",
443 lastBundleInRepo-lastBundleInIndex)
444
445 for bundleNum := lastBundleInIndex + 1; bundleNum <= lastBundleInRepo; bundleNum++ {
446 bundle, err := mgr.LoadBundleForDIDIndex(ctx, bundleNum)
447 if err != nil {
448 return fmt.Errorf("failed to load bundle %d: %w", bundleNum, err)
449 }
450
451 if err := dim.UpdateIndexForBundle(ctx, bundle); err != nil {
452 return fmt.Errorf("failed to update index for bundle %d: %w", bundleNum, err)
453 }
454
455 if bundleNum%100 == 0 {
456 dim.logger.Printf(" Updated through bundle %d...", bundleNum)
457 }
458 }
459
460 dim.logger.Printf("✓ Index repaired: now at bundle %d", lastBundleInRepo)
461 return nil
462}