// [DEPRECATED] Go implementation of plcbundle.
1package bundle
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "log"
8 "os"
9 "path/filepath"
10 "runtime"
11 "runtime/debug"
12 "sort"
13 "strings"
14 "sync"
15 "sync/atomic"
16 "time"
17
18 plcbundle "tangled.org/atscan.net/plcbundle-rs/bindings/go"
19 "tangled.org/atscan.net/plcbundle/internal/bundleindex"
20 "tangled.org/atscan.net/plcbundle/internal/didindex"
21 "tangled.org/atscan.net/plcbundle/internal/handleresolver"
22 "tangled.org/atscan.net/plcbundle/internal/mempool"
23 "tangled.org/atscan.net/plcbundle/internal/plcclient"
24 "tangled.org/atscan.net/plcbundle/internal/storage"
25 internalsync "tangled.org/atscan.net/plcbundle/internal/sync"
26 "tangled.org/atscan.net/plcbundle/internal/types"
27)
28
// defaultLogger is the fallback types.Logger used when Config.Logger is nil.
// It simply forwards to the standard library's log package.
type defaultLogger struct{}

// Printf delegates to log.Printf.
func (defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

// Println delegates to log.Println.
func (defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}
39
// Manager handles bundle operations: loading, saving, verifying and
// streaming bundles, plus mempool and DID-index maintenance.
// It is the central coordinator tying together storage, the bundle index,
// the mempool, the PLC client and the sync components.
type Manager struct {
	config     *Config             // behavioral configuration (dirs, verbosity, verify flags, ...)
	operations *storage.Operations // low-level compressed-file I/O and hashing
	index      *bundleindex.Index  // in-memory bundle index (persisted at indexPath)
	indexPath  string              // path of the index file on disk
	logger     types.Logger        // never nil after NewManager
	mempool    *mempool.Mempool    // pending operations for the next bundle
	didIndex   *didindex.Manager   // DID -> (bundle, position) lookup index

	plcClient      *plcclient.Client      // upstream PLC directory client (may be nil)
	handleResolver *handleresolver.Client // optional handle resolver (nil unless configured)

	syncer *internalsync.Fetcher // fetches new operations from the PLC directory
	cloner *internalsync.Cloner  // clones bundles from a remote repository

	// Bundle cache guarded by cacheMu; bounded by maxCacheSize with a
	// simple (non-LRU) eviction strategy — see LoadBundle.
	bundleCache  map[int]*Bundle
	cacheMu      sync.RWMutex
	maxCacheSize int

	// Resolver performance tracking. The embedded mutex guards all fields
	// of this struct; counters are cumulative since startup.
	resolverStats struct {
		sync.Mutex
		totalResolutions int64
		mempoolHits      int64
		bundleHits       int64
		errors           int64

		// Timing (in microseconds)
		totalTime        int64
		totalMempoolTime int64
		totalIndexTime   int64
		totalLoadOpTime  int64

		// Recent timings (circular buffer of recentSize entries;
		// recentIdx is the next write position)
		recentTimes []resolverTiming
		recentIdx   int
		recentSize  int
	}

	// Rust-based bundle manager for high-performance operations.
	// Lazily created via getRSManager; rsManagerOnce guarantees a single
	// initialization attempt, rsManagerErr records a failed attempt.
	rsManager     *plcbundle.BundleManager
	rsManagerOnce sync.Once
	rsManagerErr  error
}
85
86// NewManager creates a new bundle manager
87func NewManager(config *Config, plcClient *plcclient.Client) (*Manager, error) {
88 if config == nil {
89 config = DefaultConfig("./plc_bundles")
90 }
91
92 if config.Logger == nil {
93 config.Logger = defaultLogger{}
94 }
95
96 // CHECK: Don't auto-create if repository doesn't exist
97 repoExists := repositoryExists(config.BundleDir)
98
99 if !repoExists && !config.AutoInit {
100 return nil, fmt.Errorf(
101 "no plcbundle repository found in: %s\n\n"+
102 "Initialize a new repository with:\n"+
103 " plcbundle clone <url> # Clone from remote\n"+
104 " plcbundle sync # Fetch from PLC directory",
105 config.BundleDir,
106 )
107 }
108
109 // Ensure directory exists (only if repo exists OR AutoInit is enabled)
110 if err := os.MkdirAll(config.BundleDir, 0755); err != nil {
111 return nil, fmt.Errorf("failed to create bundle directory: %w", err)
112 }
113
114 // Initialize operations handler
115 ops, err := storage.NewOperations(config.Logger, config.Verbose)
116 if err != nil {
117 return nil, fmt.Errorf("failed to initialize operations: %w", err)
118 }
119
120 // Determine origin
121 var origin string
122 if plcClient != nil {
123 origin = plcClient.GetBaseURL()
124 }
125
126 // Load or create index
127 indexPath := filepath.Join(config.BundleDir, bundleindex.INDEX_FILE)
128 index, err := bundleindex.LoadIndex(indexPath)
129
130 // Check for bundle files in directory
131 bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst"))
132 bundleFiles = filterBundleFiles(bundleFiles)
133 hasBundleFiles := len(bundleFiles) > 0
134
135 // Check if clone/download is in progress (look for .tmp files)
136 tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp"))
137 cloneInProgress := len(tmpFiles) > 0
138
139 needsRebuild := false
140
141 if err != nil {
142 // Index doesn't exist or is invalid
143 if hasBundleFiles {
144 if cloneInProgress {
145 config.Logger.Printf("Clone/download in progress, skipping auto-rebuild")
146 } else {
147 config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles))
148 needsRebuild = true
149 }
150 } else {
151 // No index and no bundles - create fresh index
152 config.Logger.Printf("Creating new index at %s", indexPath)
153 index = bundleindex.NewIndex(origin)
154 if err := index.Save(indexPath); err != nil {
155 return nil, fmt.Errorf("failed to save new index: %w", err)
156 }
157 }
158 } else {
159 // Index exists - auto-populate origin if missing
160 if index.Origin == "" {
161 if origin != "" {
162 config.Logger.Printf("⚠️ Upgrading old index: setting origin to %s", origin)
163 index.Origin = origin
164 if err := index.Save(indexPath); err != nil {
165 return nil, fmt.Errorf("failed to update index with origin: %w", err)
166 }
167 } else {
168 config.Logger.Printf("⚠️ Warning: index has no origin and no PLC client configured")
169 }
170 }
171
172 // Validate origin matches if both are set
173 if index.Origin != "" && origin != "" && index.Origin != origin {
174 return nil, fmt.Errorf(
175 "origin mismatch: index has origin %q but PLC client points to %q\n"+
176 "Cannot mix bundles from different sources. Use a different directory or reconfigure PLC client",
177 index.Origin, origin,
178 )
179 }
180
181 if config.Verbose {
182 config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin)
183 }
184
185 // Check if there are bundle files not in the index
186 if hasBundleFiles && len(bundleFiles) > index.Count() {
187 if cloneInProgress {
188 config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles))
189 } else {
190 config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding",
191 len(bundleFiles), index.Count())
192 needsRebuild = true
193 }
194 }
195 }
196
197 if index != nil && plcClient != nil {
198 currentOrigin := plcClient.GetBaseURL()
199
200 // Check if origins match
201 if index.Origin != "" && index.Origin != currentOrigin {
202 return nil, fmt.Errorf(
203 "origin mismatch: index has origin %q but PLC client points to %q. "+
204 "Cannot mix bundles from different sources",
205 index.Origin, currentOrigin,
206 )
207 }
208
209 // Set origin if not set (for backward compatibility with old indexes)
210 if index.Origin == "" && currentOrigin != "" {
211 index.Origin = currentOrigin
212 config.Logger.Printf("Setting origin for existing index: %s", currentOrigin)
213 if err := index.Save(indexPath); err != nil {
214 return nil, fmt.Errorf("failed to update index with origin: %w", err)
215 }
216 }
217 }
218
219 // Perform rebuild if needed (using parallel scan)
220 if needsRebuild && config.AutoRebuild {
221 config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles))
222
223 // Create temporary manager for scanning
224 tempMgr := &Manager{
225 config: config,
226 operations: ops,
227 index: bundleindex.NewIndex("test-origin"),
228 indexPath: indexPath,
229 logger: config.Logger,
230 }
231
232 // Use parallel scan with auto-detected CPU count
233 workers := config.RebuildWorkers
234 if workers <= 0 {
235 workers = runtime.NumCPU()
236 if workers < 1 {
237 workers = 1
238 }
239 }
240
241 config.Logger.Printf("Using %d workers for parallel scan", workers)
242
243 // Create progress callback wrapper with new signature
244 var progressCallback func(current, total int, bytesProcessed int64)
245 if config.RebuildProgress != nil {
246 // Wrap the old-style callback to work with new signature
247 oldCallback := config.RebuildProgress
248 progressCallback = func(current, total int, bytesProcessed int64) {
249 oldCallback(current, total)
250 }
251 } else {
252 // Default: log every 100 bundles
253 progressCallback = func(current, total int, bytesProcessed int64) {
254 if current%100 == 0 || current == total {
255 mbProcessed := float64(bytesProcessed) / (1024 * 1024)
256 config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed",
257 current, total, float64(current)/float64(total)*100, mbProcessed)
258 }
259 }
260 }
261
262 start := time.Now()
263
264 // Scan directory to rebuild index (parallel)
265 result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback)
266 if err != nil {
267 return nil, fmt.Errorf("failed to rebuild index: %w", err)
268 }
269
270 elapsed := time.Since(start)
271
272 // Reload the rebuilt index
273 index, err = bundleindex.LoadIndex(indexPath)
274 if err != nil {
275 return nil, fmt.Errorf("failed to load rebuilt index: %w", err)
276 }
277
278 // Calculate throughput
279 mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024)
280
281 config.Logger.Printf("✓ Index rebuilt with %d bundles in %s",
282 index.Count(), elapsed.Round(time.Millisecond))
283 config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)",
284 float64(result.BundleCount)/elapsed.Seconds(), mbPerSec)
285
286 // Verify all chain hashes are present
287 bundles := index.GetBundles()
288 missingHashes := 0
289 for i, meta := range bundles {
290 if meta.ContentHash == "" {
291 missingHashes++
292 }
293 if i > 0 && meta.Hash == "" {
294 missingHashes++
295 }
296 }
297 if missingHashes > 0 {
298 config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes)
299 }
300 }
301
302 if index == nil {
303 index = bundleindex.NewIndex("test-origin")
304 }
305
306 // Initialize mempool for next bundle
307 lastBundle := index.GetLastBundle()
308 nextBundleNum := 1
309 var minTimestamp time.Time
310
311 if lastBundle != nil {
312 nextBundleNum = lastBundle.BundleNumber + 1
313 minTimestamp = lastBundle.EndTime
314 }
315
316 mempool, err := mempool.NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger, config.Verbose)
317 if err != nil {
318 return nil, fmt.Errorf("failed to initialize mempool: %w", err)
319 }
320
321 // Initialize DID index manager
322 didIndex := didindex.NewManager(config.BundleDir, config.Logger)
323
324 // Initialize sync components
325 fetcher := internalsync.NewFetcher(plcClient, ops, config.Logger)
326 cloner := internalsync.NewCloner(ops, config.BundleDir, config.Logger)
327
328 // Initialize handle resolver if configured
329 var handleResolver *handleresolver.Client
330 if config.HandleResolverURL != "" {
331 handleResolver = handleresolver.NewClient(config.HandleResolverURL)
332 }
333
334 m := &Manager{
335 config: config,
336 operations: ops,
337 index: index,
338 indexPath: indexPath,
339 logger: config.Logger,
340 mempool: mempool,
341 didIndex: didIndex, // Updated type
342 bundleCache: make(map[int]*Bundle),
343 maxCacheSize: 10,
344 syncer: fetcher,
345 cloner: cloner,
346 plcClient: plcClient,
347 handleResolver: handleResolver,
348 }
349 // Initialize resolver stats
350 m.resolverStats.recentSize = 1000
351 m.resolverStats.recentTimes = make([]resolverTiming, 1000)
352
353 return m, nil
354}
355
356// getRSManager lazily initializes the Rust bundle manager
357func (m *Manager) getRSManager() (*plcbundle.BundleManager, error) {
358 m.rsManagerOnce.Do(func() {
359 rsMgr, err := plcbundle.NewBundleManager(m.config.BundleDir)
360 if err != nil {
361 m.rsManagerErr = fmt.Errorf("failed to create Rust bundle manager: %w", err)
362 return
363 }
364 m.rsManager = rsMgr
365 })
366 return m.rsManager, m.rsManagerErr
367}
368
// Close cleans up resources: the Rust manager, storage operations, the PLC
// client, and the DID index. The mempool is saved (not deleted) so pending
// operations survive a restart; a save failure is logged, not returned.
// NOTE(review): rsManager is read without the rsManagerOnce guard — safe
// only if Close is not raced with getRSManager; confirm caller discipline.
func (m *Manager) Close() {
	if m.rsManager != nil {
		m.rsManager.Close()
	}
	if m.operations != nil {
		m.operations.Close()
	}
	if m.plcClient != nil {
		m.plcClient.Close()
	}
	if m.mempool != nil {
		if err := m.mempool.Save(); err != nil {
			m.logger.Printf("Warning: failed to save mempool: %v", err)
		}
	}
	if m.didIndex != nil {
		m.didIndex.Close()
	}
}
389
// GetIndex returns the current in-memory bundle index.
// The returned pointer is shared, not a copy.
func (m *Manager) GetIndex() *bundleindex.Index {
	return m.index
}
394
// SaveIndex persists the in-memory index to its on-disk location (indexPath).
func (m *Manager) SaveIndex() error {
	return m.index.Save(m.indexPath)
}
399
400// LoadBundle with caching
401func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) {
402 // Check cache first
403 m.cacheMu.RLock()
404 if cached, ok := m.bundleCache[bundleNumber]; ok {
405 m.cacheMu.RUnlock()
406 return cached, nil
407 }
408 m.cacheMu.RUnlock()
409
410 // Load from disk (existing code)
411 bundle, err := m.loadBundleFromDisk(ctx, bundleNumber)
412 if err != nil {
413 return nil, err
414 }
415
416 // Add to cache
417 m.cacheMu.Lock()
418 m.bundleCache[bundleNumber] = bundle
419
420 // Simple LRU: if cache too big, remove oldest
421 if len(m.bundleCache) > m.maxCacheSize {
422 // Remove a random one (or implement proper LRU)
423 for k := range m.bundleCache {
424 delete(m.bundleCache, k)
425 break
426 }
427 }
428 m.cacheMu.Unlock()
429
430 return bundle, nil
431}
432
433// loadBundleFromDisk loads a bundle from disk
434func (m *Manager) loadBundleFromDisk(_ context.Context, bundleNumber int) (*Bundle, error) {
435 // Get metadata from index
436 meta, err := m.index.GetBundle(bundleNumber)
437 if err != nil {
438 return nil, fmt.Errorf("bundle not in index: %w", err)
439 }
440
441 // Load file
442 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
443 if !m.operations.FileExists(path) {
444 return nil, fmt.Errorf("bundle file not found: %s", path)
445 }
446
447 // Verify hash if enabled
448 if m.config.VerifyOnLoad {
449 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
450 if err != nil {
451 return nil, fmt.Errorf("failed to verify hash: %w", err)
452 }
453 if !valid {
454 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
455 }
456 }
457
458 // Load operations
459 operations, err := m.operations.LoadBundle(path)
460 if err != nil {
461 return nil, fmt.Errorf("failed to load bundle: %w", err)
462 }
463
464 // Create bundle struct
465 bundle := &Bundle{
466 BundleNumber: meta.BundleNumber,
467 StartTime: meta.StartTime,
468 EndTime: meta.EndTime,
469 Operations: operations,
470 DIDCount: meta.DIDCount,
471 Hash: meta.Hash,
472 ContentHash: meta.ContentHash,
473 Parent: meta.Parent,
474 CompressedHash: meta.CompressedHash,
475 CompressedSize: meta.CompressedSize,
476 UncompressedSize: meta.UncompressedSize,
477 Cursor: meta.Cursor,
478 Compressed: true,
479 CreatedAt: meta.CreatedAt,
480 }
481
482 return bundle, nil
483}
484
485// SaveBundle saves a bundle to disk and updates the index
486func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, verbose bool, quiet bool, stats types.BundleProductionStats, skipDIDIndex bool) (time.Duration, error) {
487
488 totalStart := time.Now()
489 if err := bundle.ValidateForSave(); err != nil {
490 return 0, fmt.Errorf("bundle validation failed: %w", err)
491 }
492
493 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))
494
495 // Get parent
496 var parent string
497 if bundle.BundleNumber > 1 {
498 prevBundle := m.index.GetLastBundle()
499 if prevBundle != nil {
500 parent = prevBundle.Hash
501 } else {
502 if prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil {
503 parent = prevMeta.Hash
504 }
505 }
506 }
507 bundle.Parent = parent
508
509 // Get origin
510 origin := m.index.Origin
511 if m.plcClient != nil {
512 origin = m.plcClient.GetBaseURL()
513 }
514
515 // Get version
516 version := "dev"
517 if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "" && info.Main.Version != "(devel)" {
518 version = info.Main.Version
519 }
520
521 // Get hostname
522 hostname, _ := os.Hostname()
523
524 // Create BundleInfo
525 bundleInfo := &storage.BundleInfo{
526 BundleNumber: bundle.BundleNumber,
527 Origin: origin,
528 ParentHash: parent,
529 Cursor: bundle.Cursor,
530 CreatedBy: fmt.Sprintf("plcbundle/%s", version),
531 Hostname: hostname,
532 }
533
534 if m.config.Verbose {
535 m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber)
536 }
537
538 // Save to disk with 3 parameters
539 uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations, bundleInfo)
540 if err != nil {
541 m.logger.Printf("DEBUG: SaveBundle FAILED: %v", err)
542 return 0, fmt.Errorf("failed to save bundle: %w", err)
543 }
544
545 if m.config.Verbose {
546 m.logger.Printf("DEBUG: SaveBundle SUCCESS, setting bundle fields")
547 }
548
549 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)
550 bundle.ContentHash = uncompressedHash
551 bundle.CompressedHash = compressedHash
552 bundle.UncompressedSize = uncompressedSize
553 bundle.CompressedSize = compressedSize
554 bundle.CreatedAt = time.Now().UTC()
555 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)
556
557 if m.config.Verbose {
558 m.logger.Printf("DEBUG: Adding bundle %d to index", bundle.BundleNumber)
559 }
560
561 // Add to index
562 m.index.AddBundle(bundle.ToMetadata())
563
564 if m.config.Verbose {
565 m.logger.Printf("DEBUG: Index now has %d bundles", m.index.Count())
566 }
567
568 // Save index
569 if err := m.SaveIndex(); err != nil {
570 m.logger.Printf("DEBUG: SaveIndex FAILED: %v", err)
571 return 0, fmt.Errorf("failed to save index: %w", err)
572 }
573
574 if m.config.Verbose {
575 m.logger.Printf("DEBUG: Index saved, last bundle = %d", m.index.GetLastBundle().BundleNumber)
576 }
577
578 saveDuration := time.Since(totalStart)
579
580 // Clean up old mempool
581 oldMempoolFile := m.mempool.GetFilename()
582 if err := m.mempool.Delete(); err != nil && !quiet {
583 m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
584 }
585
586 // Create new mempool
587 nextBundle := bundle.BundleNumber + 1
588 minTimestamp := bundle.EndTime
589
590 newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger, m.config.Verbose)
591 if err != nil {
592 return 0, fmt.Errorf("failed to create new mempool: %w", err)
593 }
594
595 m.mempool = newMempool
596
597 // DID index update (if enabled)
598 var indexUpdateDuration time.Duration
599 if !skipDIDIndex && m.didIndex != nil && m.didIndex.Exists() {
600 indexUpdateStart := time.Now()
601 if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil {
602 m.logger.Printf("Warning: failed to update DID index: %v", err)
603 } else {
604 indexUpdateDuration = time.Since(indexUpdateStart)
605 if !quiet && m.config.Verbose {
606 m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration)
607 }
608 }
609 }
610
611 if !quiet {
612 msg := fmt.Sprintf("→ Bundle %06d | %s | fetch: %s (%d reqs)",
613 bundle.BundleNumber,
614 bundle.Hash[0:7],
615 stats.TotalDuration.Round(time.Millisecond),
616 stats.TotalFetches,
617 )
618 if indexUpdateDuration > 0 {
619 msg += fmt.Sprintf(" | index: %s", indexUpdateDuration.Round(time.Millisecond))
620 }
621 msg += fmt.Sprintf(" | %s", formatTimeDistance(time.Since(bundle.EndTime)))
622 m.logger.Println(msg)
623 }
624
625 if m.config.Verbose {
626 m.logger.Printf("DEBUG: Bundle done = %d, finish duration = %s",
627 m.index.GetLastBundle().BundleNumber,
628 saveDuration.Round(time.Millisecond))
629 }
630
631 return indexUpdateDuration, nil
632}
633
634// GetMempoolStats returns mempool statistics
635func (m *Manager) GetMempoolStats() map[string]interface{} {
636 return m.mempool.Stats()
637}
638
639// GetMempoolOperations returns all operations currently in mempool
640func (m *Manager) GetMempoolOperations() ([]plcclient.PLCOperation, error) {
641 if m.mempool == nil {
642 return nil, fmt.Errorf("mempool not initialized")
643 }
644
645 count := m.mempool.Count()
646 if count == 0 {
647 return []plcclient.PLCOperation{}, nil
648 }
649
650 return m.mempool.Peek(count), nil
651}
652
// GetBundleNumber returns the bundle's sequence number.
// It exists so that *Bundle satisfies the BundleData interface.
func (b *Bundle) GetBundleNumber() int {
	return b.BundleNumber
}
657
658// VerifyBundle verifies a bundle's integrity
659func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
660 result := &VerificationResult{
661 BundleNumber: bundleNumber,
662 }
663
664 // Get from index
665 meta, err := m.index.GetBundle(bundleNumber)
666 if err != nil {
667 result.Error = err
668 return result, nil
669 }
670
671 result.ExpectedHash = meta.CompressedHash
672
673 // Check file exists
674 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
675 result.FileExists = m.operations.FileExists(path)
676 if !result.FileExists {
677 result.Error = fmt.Errorf("file not found")
678 return result, nil
679 }
680
681 // Verify BOTH compressed and content hashes
682 compHash, compSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path)
683 if err != nil {
684 result.Error = err
685 return result, nil
686 }
687
688 result.LocalHash = compHash
689
690 // Verify compressed hash
691 if compHash != meta.CompressedHash {
692 result.HashMatch = false
693 result.Valid = false
694 result.Error = fmt.Errorf("compressed hash mismatch: expected %s, got %s", meta.CompressedHash, compHash)
695 return result, nil
696 }
697
698 // Verify content hash
699 if contentHash != meta.ContentHash {
700 result.HashMatch = false
701 result.Valid = false
702 result.Error = fmt.Errorf("content hash mismatch: expected %s, got %s", meta.ContentHash, contentHash)
703 return result, nil
704 }
705
706 // Verify sizes match
707 if compSize != meta.CompressedSize {
708 result.Valid = false
709 result.Error = fmt.Errorf("compressed size mismatch: expected %d, got %d", meta.CompressedSize, compSize)
710 return result, nil
711 }
712
713 if contentSize != meta.UncompressedSize {
714 result.Valid = false
715 result.Error = fmt.Errorf("uncompressed size mismatch: expected %d, got %d", meta.UncompressedSize, contentSize)
716 return result, nil
717 }
718
719 result.HashMatch = true
720 result.Valid = true
721
722 return result, nil
723}
724
725// GetInfo returns information about the bundle manager
726func (m *Manager) GetInfo() map[string]interface{} {
727 stats := m.index.GetStats()
728 stats["bundle_dir"] = m.config.BundleDir
729 stats["index_path"] = m.indexPath
730 stats["verify_on_load"] = m.config.VerifyOnLoad
731 return stats
732}
733
// GetRSManager returns the lazily-initialized Rust bundle manager
// (exported proxy for getRSManager).
func (m *Manager) GetRSManager() (*plcbundle.BundleManager, error) {
	return m.getRSManager()
}
738
739// IsBundleIndexed checks if a bundle is already in the index
740func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
741 _, err := m.index.GetBundle(bundleNumber)
742 return err == nil
743}
744
745// RefreshMempool reloads mempool from disk
746func (m *Manager) RefreshMempool() error {
747 if m.mempool == nil {
748 return fmt.Errorf("mempool not initialized")
749 }
750 return m.mempool.Load()
751}
752
753// ClearMempool clears all operations from the mempool and saves
754func (m *Manager) ClearMempool() error {
755 if m.mempool == nil {
756 return fmt.Errorf("mempool not initialized")
757 }
758
759 m.logger.Printf("Clearing mempool...")
760
761 count := m.mempool.Count()
762
763 m.mempool.Clear()
764
765 if err := m.mempool.Save(); err != nil {
766 return fmt.Errorf("failed to save mempool: %w", err)
767 }
768
769 m.logger.Printf("Cleared %d operations from mempool", count)
770
771 return nil
772}
773
774// ValidateMempool validates mempool
775func (m *Manager) ValidateMempool() error {
776 if m.mempool == nil {
777 return fmt.Errorf("mempool not initialized")
778 }
779 return m.mempool.Validate()
780}
781
782// StreamBundleRaw streams the raw compressed bundle file
783func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
784 // Get metadata from index
785 meta, err := m.index.GetBundle(bundleNumber)
786 if err != nil {
787 return nil, fmt.Errorf("bundle not in index: %w", err)
788 }
789
790 // Build file path
791 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
792 if !m.operations.FileExists(path) {
793 return nil, fmt.Errorf("bundle file not found: %s", path)
794 }
795
796 // Optionally verify hash before streaming
797 if m.config.VerifyOnLoad {
798 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
799 if err != nil {
800 return nil, fmt.Errorf("failed to verify hash: %w", err)
801 }
802 if !valid {
803 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
804 }
805 }
806
807 return m.operations.StreamRaw(path)
808}
809
810// StreamBundleDecompressed streams the decompressed bundle data as JSONL
811func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
812 // Get metadata from index
813 _, err := m.index.GetBundle(bundleNumber)
814 if err != nil {
815 return nil, fmt.Errorf("bundle not in index: %w", err)
816 }
817
818 // Build file path
819 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
820 if !m.operations.FileExists(path) {
821 return nil, fmt.Errorf("bundle file not found: %s", path)
822 }
823
824 return m.operations.StreamDecompressed(path)
825}
826
827// RefreshIndex reloads the index from disk if it has been modified
828func (m *Manager) RefreshIndex() error {
829 // Check if index file has been modified
830 info, err := os.Stat(m.indexPath)
831 if err != nil {
832 return err
833 }
834
835 // If index was modified after we loaded it, reload
836 if info.ModTime().After(m.index.UpdatedAt) {
837 m.logger.Printf("Index file modified, reloading...")
838
839 newIndex, err := bundleindex.LoadIndex(m.indexPath)
840 if err != nil {
841 return fmt.Errorf("failed to reload index: %w", err)
842 }
843
844 m.index = newIndex
845 m.logger.Printf("Index reloaded: %d bundles", m.index.Count())
846 }
847
848 return nil
849}
850
// GetMempool returns the current mempool (may be nil if not initialized).
func (m *Manager) GetMempool() *mempool.Mempool {
	return m.mempool
}
855
856// SaveMempool saves the current mempool state to disk
857func (m *Manager) SaveMempool() error {
858 if m.mempool == nil {
859 return fmt.Errorf("mempool not initialized")
860 }
861 return m.mempool.Save()
862}
863
864// GetPLCOrigin returns the PLC directory origin URL
865func (m *Manager) GetPLCOrigin() string {
866 if m.plcClient == nil {
867 return ""
868 }
869 return m.plcClient.GetBaseURL()
870}
871
872// GetCurrentCursor returns the current latest cursor position (including mempool)
873func (m *Manager) GetCurrentCursor() int {
874 index := m.GetIndex()
875 bundles := index.GetBundles()
876 cursor := len(bundles) * types.BUNDLE_SIZE
877
878 // Add mempool operations
879 mempoolStats := m.GetMempoolStats()
880 if count, ok := mempoolStats["count"].(int); ok {
881 cursor += count
882 }
883
884 return cursor
885}
886
887// LoadOperation loads a single operation from a bundle efficiently
888func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plcclient.PLCOperation, error) {
889 // Validate position
890 if position < 0 || position >= types.BUNDLE_SIZE {
891 return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, types.BUNDLE_SIZE-1)
892 }
893
894 // Validate bundle exists in index
895 _, err := m.index.GetBundle(bundleNumber)
896 if err != nil {
897 return nil, fmt.Errorf("bundle not in index: %w", err)
898 }
899
900 // Build file path
901 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
902 if !m.operations.FileExists(path) {
903 return nil, fmt.Errorf("bundle file not found: %s", path)
904 }
905
906 // Load just the one operation (optimized - decompresses only until position)
907 return m.operations.LoadOperationAtPosition(path, position)
908}
909
910// LoadOperations loads multiple operations from a bundle efficiently
911func (m *Manager) LoadOperations(ctx context.Context, bundleNumber int, positions []int) (map[int]*plcclient.PLCOperation, error) {
912 // Build file path
913 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
914 if !m.operations.FileExists(path) {
915 return nil, fmt.Errorf("bundle file not found: %s", path)
916 }
917
918 return m.operations.LoadOperationsAtPositions(path, positions)
919}
920
// filterBundleFiles drops paths whose base name starts with '.' or '_'
// (hidden/temporary files), preserving the order of the remaining entries.
func filterBundleFiles(files []string) []string {
	kept := make([]string, 0, len(files))
	for _, f := range files {
		name := filepath.Base(f)
		if strings.HasPrefix(name, ".") || strings.HasPrefix(name, "_") {
			continue
		}
		kept = append(kept, f)
	}
	return kept
}
933
934// ==========================================
935// DID INDEX INTEGRATION (adapter methods)
936// ==========================================
937
938// Implement BundleProvider interface for didindex
939func (m *Manager) LoadBundleForDIDIndex(ctx context.Context, bundleNumber int) (*didindex.BundleData, error) {
940 bundle, err := m.LoadBundle(ctx, bundleNumber)
941 if err != nil {
942 return nil, err
943 }
944
945 return &didindex.BundleData{
946 BundleNumber: bundle.BundleNumber,
947 Operations: bundle.Operations,
948 }, nil
949}
950
// GetBundleIndex returns the bundle index wrapped as a
// didindex.BundleIndexProvider.
func (m *Manager) GetBundleIndex() didindex.BundleIndexProvider {
	return &bundleIndexAdapter{index: m.index}
}
954
// bundleIndexAdapter adapts *bundleindex.Index to the
// didindex.BundleIndexProvider interface, converting metadata types.
type bundleIndexAdapter struct {
	index *bundleindex.Index
}
959
960func (a *bundleIndexAdapter) GetBundles() []*didindex.BundleMetadata {
961 bundles := a.index.GetBundles()
962 result := make([]*didindex.BundleMetadata, len(bundles))
963 for i, b := range bundles {
964 result[i] = &didindex.BundleMetadata{
965 BundleNumber: b.BundleNumber,
966 StartTime: b.StartTime,
967 EndTime: b.EndTime,
968 }
969 }
970 return result
971}
972
973func (a *bundleIndexAdapter) GetBundle(bundleNumber int) (*didindex.BundleMetadata, error) {
974 meta, err := a.index.GetBundle(bundleNumber)
975 if err != nil {
976 return nil, err
977 }
978 return &didindex.BundleMetadata{
979 BundleNumber: meta.BundleNumber,
980 StartTime: meta.StartTime,
981 EndTime: meta.EndTime,
982 }, nil
983}
984
985func (a *bundleIndexAdapter) GetLastBundle() *didindex.BundleMetadata {
986 meta := a.index.GetLastBundle()
987 if meta == nil {
988 return nil
989 }
990 return &didindex.BundleMetadata{
991 BundleNumber: meta.BundleNumber,
992 StartTime: meta.StartTime,
993 EndTime: meta.EndTime,
994 }
995}
996
// GetDIDIndex returns the DID index manager (may be nil).
func (m *Manager) GetDIDIndex() *didindex.Manager {
	return m.didIndex
}
1001
1002// BuildDIDIndex builds the complete DID index
1003func (m *Manager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error {
1004 if m.didIndex == nil {
1005 m.didIndex = didindex.NewManager(m.config.BundleDir, m.logger)
1006 }
1007
1008 return m.didIndex.BuildIndexFromScratch(ctx, m, progressCallback)
1009}
1010
1011// updateDIDIndexForBundle updates index when a new bundle is added
1012func (m *Manager) updateDIDIndexForBundle(ctx context.Context, bundle *Bundle) error {
1013 if m.didIndex == nil {
1014 return nil
1015 }
1016
1017 // Convert to didindex.BundleData
1018 bundleData := &didindex.BundleData{
1019 BundleNumber: bundle.BundleNumber,
1020 Operations: bundle.Operations,
1021 }
1022
1023 return m.didIndex.UpdateIndexForBundle(ctx, bundleData)
1024}
1025
1026// GetDIDIndexStats returns DID index statistics
1027func (m *Manager) GetDIDIndexStats() map[string]interface{} {
1028 if m.didIndex == nil {
1029 return map[string]interface{}{
1030 "enabled": false,
1031 }
1032 }
1033
1034 stats := m.didIndex.GetStats()
1035 stats["enabled"] = true
1036 stats["exists"] = m.didIndex.Exists()
1037
1038 indexedDIDs := stats["total_dids"].(int64)
1039
1040 // Get unique DIDs from mempool
1041 mempoolDIDCount := int64(0)
1042 if m.mempool != nil {
1043 mempoolStats := m.GetMempoolStats()
1044 if didCount, ok := mempoolStats["did_count"].(int); ok {
1045 mempoolDIDCount = int64(didCount)
1046 }
1047 }
1048
1049 stats["indexed_dids"] = indexedDIDs
1050 stats["mempool_dids"] = mempoolDIDCount
1051 stats["total_dids"] = indexedDIDs + mempoolDIDCount
1052
1053 return stats
1054}
1055
1056// GetDIDOperations retrieves all operations for a DID (bundles + mempool combined)
1057// Returns: operations only, operations with locations, error
1058func (m *Manager) GetDIDOperations(ctx context.Context, did string, verbose bool) ([]plcclient.PLCOperation, []PLCOperationWithLocation, error) {
1059 if err := plcclient.ValidateDIDFormat(did); err != nil {
1060 return nil, nil, err
1061 }
1062
1063 // Set verbose mode
1064 if m.didIndex != nil {
1065 m.didIndex.SetVerbose(verbose)
1066 }
1067
1068 // Get bundled operations from DID index (includes nullified)
1069 bundledOpsWithLoc, err := m.didIndex.GetDIDOperations(ctx, did, m)
1070 if err != nil {
1071 return nil, nil, err
1072 }
1073
1074 // Convert to bundle types
1075 opsWithLoc := make([]PLCOperationWithLocation, len(bundledOpsWithLoc))
1076 bundledOps := make([]plcclient.PLCOperation, len(bundledOpsWithLoc))
1077 for i, r := range bundledOpsWithLoc {
1078 opsWithLoc[i] = PLCOperationWithLocation{
1079 Operation: r.Operation,
1080 Bundle: r.Bundle,
1081 Position: r.Position,
1082 }
1083 bundledOps[i] = r.Operation
1084 }
1085
1086 // Get mempool operations
1087 mempoolOps, err := m.GetDIDOperationsFromMempool(did)
1088 if err != nil {
1089 return nil, nil, err
1090 }
1091
1092 if len(mempoolOps) > 0 && verbose {
1093 m.logger.Printf("DEBUG: Found %d operations in mempool", len(mempoolOps))
1094 }
1095
1096 // Combine operations (for the slice return)
1097 allOps := append(bundledOps, mempoolOps...)
1098
1099 sort.Slice(allOps, func(i, j int) bool {
1100 return allOps[i].CreatedAt.Before(allOps[j].CreatedAt)
1101 })
1102
1103 return allOps, opsWithLoc, nil
1104}
1105
1106// GetDIDOperationsFromMempool retrieves operations for a DID from mempool only
1107func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plcclient.PLCOperation, error) {
1108 if m.mempool == nil {
1109 return []plcclient.PLCOperation{}, nil
1110 }
1111
1112 // Use direct search - only copies matching operations
1113 return m.mempool.FindDIDOperations(did), nil
1114}
1115
1116// GetLatestDIDOperation returns only the most recent non-nullified operation
1117func (m *Manager) GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error) {
1118 if err := plcclient.ValidateDIDFormat(did); err != nil {
1119 return nil, err
1120 }
1121
1122 // Check mempool first (most recent data)
1123 mempoolOps, _ := m.GetDIDOperationsFromMempool(did)
1124 if len(mempoolOps) > 0 {
1125 for i := len(mempoolOps) - 1; i >= 0; i-- {
1126 if !mempoolOps[i].IsNullified() {
1127 return &mempoolOps[i], nil
1128 }
1129 }
1130 }
1131
1132 // Delegate to DID index for bundled operations
1133 return m.didIndex.GetLatestDIDOperation(ctx, did, m)
1134}
1135
// VerifyChain verifies the entire bundle chain.
//
// Each bundle's file hash is verified first; from the second bundle on,
// the chain links are checked too: the parent reference must equal the
// previous bundle's hash, and the recorded hash must match the
// recomputed chain hash. Verification stops at the first failure, which
// is reported via result.Error/result.BrokenAt with a nil returned
// error (the error return is reserved for unexpected failures).
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
	result := &ChainVerificationResult{
		VerifiedBundles: make([]int, 0),
	}

	bundles := m.index.GetBundles()
	if len(bundles) == 0 {
		// An empty chain is trivially valid.
		result.Valid = true
		return result, nil
	}

	result.ChainLength = len(bundles)

	for i, meta := range bundles {
		// Verify file hash
		vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
		if err != nil || !vr.Valid {
			result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
			result.BrokenAt = meta.BundleNumber
			return result, nil
		}

		// Verify chain link (the first bundle has no predecessor)
		if i > 0 {
			prevMeta := bundles[i-1]

			// Check parent reference
			if meta.Parent != prevMeta.Hash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}

			// Verify chain hash calculation
			expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
			if meta.Hash != expectedHash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}
		}

		result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
	}

	result.Valid = true
	return result, nil
}
1185
// FetchNextBundle fetches operations and creates a bundle, looping until caught up.
//
// The fetch cursor (afterTime) and the boundary-CID set used for
// deduplication are seeded from the last stored bundle, then advanced
// past any operations already sitting in the mempool. Fetching repeats
// (bounded by maxAttempts) until the mempool holds BUNDLE_SIZE
// operations; it stops early when the remote has no more data
// ("caught up"). Returns the assembled (not yet persisted) bundle plus
// production statistics.
func (m *Manager) FetchNextBundle(ctx context.Context, verbose bool, quiet bool) (*Bundle, types.BundleProductionStats, error) {
	if m.plcClient == nil {
		return nil, types.BundleProductionStats{}, fmt.Errorf("PLC client not configured")
	}

	lastBundle := m.index.GetLastBundle()
	nextBundleNum := 1
	var afterTime string
	var prevBoundaryCIDs map[string]bool
	var prevBundleHash string

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
		prevBundleHash = lastBundle.Hash

		// ALWAYS get boundaries from last bundle initially
		// (load errors are tolerated here: dedup then simply starts
		// without boundary CIDs).
		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
		if err == nil {
			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
			if verbose {
				m.logger.Printf("Loaded %d boundary CIDs from bundle %06d (at %s)",
					len(prevBoundaryCIDs), lastBundle.BundleNumber,
					lastBundle.EndTime.Format(time.RFC3339)[:19])
			}
		}
	}

	// If mempool has operations, update cursor but KEEP boundaries from bundle
	// (mempool operations already had boundary dedup applied when they were added)
	if m.mempool.Count() > 0 {
		mempoolLastTime := m.mempool.GetLastTime()
		if mempoolLastTime != "" {
			if verbose {
				// NOTE(review): mempoolLastTime[:19] panics if the
				// timestamp string is shorter than 19 bytes — confirm
				// GetLastTime always returns RFC3339-length strings.
				m.logger.Printf("[DEBUG] Mempool has %d ops, resuming from %s",
					m.mempool.Count(), mempoolLastTime[:19])
			}
			afterTime = mempoolLastTime

			// Calculate boundaries from MEMPOOL for next fetch
			mempoolOps, _ := m.GetMempoolOperations()
			if len(mempoolOps) > 0 {
				_, mempoolBoundaries := m.operations.GetBoundaryCIDs(mempoolOps)
				prevBoundaryCIDs = mempoolBoundaries
				if verbose {
					m.logger.Printf("Using %d boundary CIDs from mempool", len(prevBoundaryCIDs))
				}
			}
		}
	}

	if verbose {
		m.logger.Printf("[DEBUG] Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
		m.logger.Printf("[DEBUG] Starting cursor: %s", afterTime)
	}

	totalFetches := 0
	maxAttempts := 50
	attempt := 0
	caughtUp := false
	attemptStart := time.Now()

	// Keep fetching until the mempool can fill a bundle or we give up.
	for m.mempool.Count() < types.BUNDLE_SIZE && attempt < maxAttempts {
		attempt++
		needed := types.BUNDLE_SIZE - m.mempool.Count()

		if !quiet && attempt > 1 {
			m.logger.Printf("  Attempt %d: Need %d more ops, cursor: %s",
				attempt, needed, afterTime[:19])
		}

		newOps, fetchCount, err := m.syncer.FetchToMempool(
			ctx,
			afterTime,
			prevBoundaryCIDs,
			needed,
			!verbose,
			m.mempool,
			totalFetches,
		)

		totalFetches += fetchCount

		// Check if we got an incomplete batch
		gotIncompleteBatch := len(newOps) > 0 && len(newOps) < needed && err == nil

		// Update cursor from mempool if we got new ops
		if len(newOps) > 0 && m.mempool.Count() > 0 {
			afterTime = m.mempool.GetLastTime()
		}

		// Stop if caught up or error
		if err != nil || len(newOps) == 0 || gotIncompleteBatch {
			caughtUp = true
			if verbose && totalFetches > 0 {
				m.logger.Printf("DEBUG: Caught up to latest PLC data")
			}
			break
		}

		if m.mempool.Count() >= types.BUNDLE_SIZE {
			break
		}
	}

	totalDuration := time.Since(attemptStart)

	if m.mempool.Count() < types.BUNDLE_SIZE {
		if caughtUp {
			return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)",
				m.mempool.Count(), types.BUNDLE_SIZE)
		} else {
			return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (max attempts reached)",
				m.mempool.Count(), types.BUNDLE_SIZE)
		}
	}

	// Create bundle: drain exactly BUNDLE_SIZE operations from the mempool.
	operations, err := m.mempool.Take(types.BUNDLE_SIZE)
	if err != nil {
		return nil, types.BundleProductionStats{}, err
	}

	syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)

	// Copy the sync-layer bundle into this package's Bundle type.
	bundle := &Bundle{
		BundleNumber: syncBundle.BundleNumber,
		StartTime:    syncBundle.StartTime,
		EndTime:      syncBundle.EndTime,
		Operations:   syncBundle.Operations,
		DIDCount:     syncBundle.DIDCount,
		Cursor:       syncBundle.Cursor,
		Parent:       syncBundle.Parent,
		BoundaryCIDs: syncBundle.BoundaryCIDs,
		Compressed:   syncBundle.Compressed,
		CreatedAt:    syncBundle.CreatedAt,
	}

	// NOTE(review): AvgPerFetch divides by totalFetches, which is 0 when
	// the mempool already held a full bundle — that yields +Inf; confirm
	// downstream consumers tolerate it.
	stats := types.BundleProductionStats{
		TotalFetches:  totalFetches,
		TotalDuration: totalDuration,
		AvgPerFetch:   float64(types.BUNDLE_SIZE) / float64(totalFetches),
		Throughput:    float64(types.BUNDLE_SIZE) / totalDuration.Seconds(),
	}

	return bundle, stats, nil
}
1334
1335// CloneFromRemote clones bundles from a remote endpoint
1336func (m *Manager) CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error) {
1337 // Define index update callback inline
1338 updateIndexCallback := func(bundleNumbers []int, remoteMeta map[int]*bundleindex.BundleMetadata, verbose bool) error {
1339 if len(bundleNumbers) == 0 {
1340 return nil
1341 }
1342
1343 // Create file existence checker
1344 fileExists := func(bundleNum int) bool {
1345 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
1346 return m.operations.FileExists(path)
1347 }
1348
1349 // Update index with remote metadata
1350 if err := m.index.UpdateFromRemote(bundleNumbers, remoteMeta, fileExists, verbose, m.logger); err != nil {
1351 return err
1352 }
1353
1354 // Save index
1355 return m.SaveIndex()
1356 }
1357
1358 // Delegate to cloner with inline callback
1359 return m.cloner.Clone(ctx, opts, m.index, updateIndexCallback)
1360}
1361
// ResolveDID resolves a DID to its current document with detailed timing metrics.
//
// Resolution order: (1) mempool — freshest data, early return on a hit;
// (2) DID index lookup for the DID's operation locations; (3) load the
// latest non-nullified operation from its bundle; (4) build the DID
// document from that single operation. Each step's duration is stored
// on the result and folded into the manager's resolver statistics.
func (m *Manager) ResolveDID(ctx context.Context, did string) (*ResolveDIDResult, error) {
	if err := plcclient.ValidateDIDFormat(did); err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, err
	}

	result := &ResolveDIDResult{}
	totalStart := time.Now()

	// STEP 1: Check mempool first
	mempoolStart := time.Now()
	var latestMempoolOp *plcclient.PLCOperation
	if m.mempool != nil {
		latestMempoolOp = m.mempool.FindLatestDIDOperation(did)
	}
	result.MempoolTime = time.Since(mempoolStart)

	// Early return if found in mempool
	if latestMempoolOp != nil {
		doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*latestMempoolOp})
		if err != nil {
			atomic.AddInt64(&m.resolverStats.errors, 1)
			return nil, fmt.Errorf("resolution failed: %w", err)
		}

		result.Document = doc
		result.LatestOperation = latestMempoolOp
		result.Source = "mempool"
		result.TotalTime = time.Since(totalStart)

		m.recordResolverTiming(result, nil)
		return result, nil
	}

	// STEP 2: Index lookup (required for anything not in the mempool)
	if m.didIndex == nil || !m.didIndex.Exists() {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("DID index not available - run 'plcbundle index build' to enable DID resolution")
	}

	indexStart := time.Now()
	locations, err := m.didIndex.GetDIDLocations(did)
	result.IndexTime = time.Since(indexStart)

	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, err
	}

	if len(locations) == 0 {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("DID not found")
	}

	// Find latest non-nullified location
	var latestLoc *didindex.OpLocation
	for i := range locations {
		if locations[i].Nullified() {
			continue
		}
		if latestLoc == nil || locations[i].IsAfter(*latestLoc) {
			latestLoc = &locations[i]
		}
	}

	if latestLoc == nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("no valid operations (all nullified)")
	}

	// STEP 3: Load the operation from its bundle file
	opStart := time.Now()
	op, err := m.LoadOperation(ctx, latestLoc.BundleInt(), latestLoc.PositionInt())
	result.LoadOpTime = time.Since(opStart)

	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("failed to load operation: %w", err)
	}

	result.BundleNumber = latestLoc.BundleInt()
	result.Position = latestLoc.PositionInt()

	// STEP 4: Resolve the document from the single latest operation
	doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op})
	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("resolution failed: %w", err)
	}

	result.Document = doc
	result.LatestOperation = op
	result.Source = "bundle"
	result.TotalTime = time.Since(totalStart)

	m.recordResolverTiming(result, nil)
	return result, nil
}
1461
1462// GetLastBundleNumber returns the last bundle number (0 if no bundles)
1463func (m *Manager) GetLastBundleNumber() int {
1464 lastBundle := m.index.GetLastBundle()
1465 if lastBundle == nil {
1466 return 0
1467 }
1468 return lastBundle.BundleNumber
1469}
1470
// GetMempoolCount returns the number of operations in mempool.
// NOTE(review): assumes m.mempool is always initialized — other methods
// nil-check it first; confirm the constructor guarantees non-nil.
func (m *Manager) GetMempoolCount() int {
	return m.mempool.Count()
}
1475
1476func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) {
1477 bundle, stats, err := m.FetchNextBundle(ctx, verbose, quiet)
1478 if err != nil {
1479 return 0, nil, err
1480 }
1481
1482 indexTime, err := m.SaveBundle(ctx, bundle, verbose, quiet, stats, skipDIDIndex)
1483 if err != nil {
1484 return 0, nil, err
1485 }
1486 stats.IndexTime = indexTime
1487
1488 return bundle.BundleNumber, &types.BundleProductionStats{}, nil
1489}
1490
// RunSyncLoop runs continuous sync loop (delegates to internal/sync).
func (m *Manager) RunSyncLoop(ctx context.Context, config *internalsync.SyncLoopConfig) error {
	// Manager itself implements the SyncManager interface
	return internalsync.RunSyncLoop(ctx, m, config)
}
1496
// RunSyncOnce performs a single sync cycle, returning the count
// reported by internal/sync.
func (m *Manager) RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig) (int, error) {
	// Manager itself implements the SyncManager interface
	return internalsync.SyncOnce(ctx, m, config)
}
1502
1503// EnsureDIDIndex ensures DID index is built and up-to-date
1504// Returns true if index was built/rebuilt, false if already up-to-date
1505func (m *Manager) EnsureDIDIndex(ctx context.Context, progressCallback func(current, total int)) (bool, error) {
1506 // Build index
1507 m.UpdateDIDIndexSmart(ctx, progressCallback)
1508 return true, nil
1509}
1510
1511// Add this helper function at the top of manager.go
1512func repositoryExists(bundleDir string) bool {
1513 indexPath := filepath.Join(bundleDir, bundleindex.INDEX_FILE)
1514
1515 // Check for index file
1516 if _, err := os.Stat(indexPath); err == nil {
1517 return true
1518 }
1519
1520 // Check for bundle files
1521 bundleFiles, _ := filepath.Glob(filepath.Join(bundleDir, "*.jsonl.zst"))
1522 bundleFiles = filterBundleFiles(bundleFiles)
1523
1524 return len(bundleFiles) > 0
1525}
1526
1527// ResolveHandleOrDID resolves input that can be either a handle or DID
1528// Returns: (did, handleResolveTime, error)
1529func (m *Manager) ResolveHandleOrDID(ctx context.Context, input string) (string, time.Duration, error) {
1530 input = strings.TrimSpace(input)
1531
1532 // Normalize handle format (remove at://, @ prefixes)
1533 if !strings.HasPrefix(input, "did:") {
1534 input = strings.TrimPrefix(input, "at://")
1535 input = strings.TrimPrefix(input, "@")
1536 }
1537
1538 // If already a DID, validate and return
1539 if strings.HasPrefix(input, "did:plc:") {
1540 if err := plcclient.ValidateDIDFormat(input); err != nil {
1541 return "", 0, err
1542 }
1543 return input, 0, nil // No resolution needed
1544 }
1545
1546 // Support did:web too
1547 if strings.HasPrefix(input, "did:web:") {
1548 return input, 0, nil
1549 }
1550
1551 // It's a handle - need resolver
1552 if m.handleResolver == nil {
1553 return "", 0, fmt.Errorf(
1554 "input '%s' appears to be a handle, but handle resolver is not configured\n\n"+
1555 "Configure resolver with:\n"+
1556 " plcbundle --handle-resolver https://quickdid.smokesignal.tools did resolve %s\n\n"+
1557 "Or set default in config",
1558 input, input)
1559 }
1560
1561 resolveStart := time.Now()
1562 if !m.config.Quiet {
1563 m.logger.Printf("Resolving handle: %s", input)
1564 }
1565 did, err := m.handleResolver.ResolveHandle(ctx, input)
1566 resolveTime := time.Since(resolveStart)
1567
1568 if err != nil {
1569 return "", resolveTime, fmt.Errorf("failed to resolve handle '%s': %w", input, err)
1570 }
1571
1572 if !m.config.Quiet {
1573 m.logger.Printf("Resolved: %s → %s (in %s)", input, did, resolveTime)
1574 }
1575 return did, resolveTime, nil
1576}
1577
// GetHandleResolver returns the handle resolver (can be nil when no
// resolver endpoint was configured).
func (m *Manager) GetHandleResolver() *handleresolver.Client {
	return m.handleResolver
}
1582
1583// recordResolverTiming records resolver performance metrics
1584func (m *Manager) recordResolverTiming(result *ResolveDIDResult, _ error) {
1585 m.resolverStats.Lock()
1586 defer m.resolverStats.Unlock()
1587
1588 // Increment counters
1589 atomic.AddInt64(&m.resolverStats.totalResolutions, 1)
1590
1591 switch result.Source {
1592 case "mempool":
1593 atomic.AddInt64(&m.resolverStats.mempoolHits, 1)
1594 case "bundle":
1595 atomic.AddInt64(&m.resolverStats.bundleHits, 1)
1596 }
1597
1598 // Record timings
1599 timing := resolverTiming{
1600 totalTime: result.TotalTime.Microseconds(),
1601 mempoolTime: result.MempoolTime.Microseconds(),
1602 indexTime: result.IndexTime.Microseconds(),
1603 loadOpTime: result.LoadOpTime.Microseconds(),
1604 source: result.Source,
1605 }
1606
1607 atomic.AddInt64(&m.resolverStats.totalTime, timing.totalTime)
1608 atomic.AddInt64(&m.resolverStats.totalMempoolTime, timing.mempoolTime)
1609 atomic.AddInt64(&m.resolverStats.totalIndexTime, timing.indexTime)
1610 atomic.AddInt64(&m.resolverStats.totalLoadOpTime, timing.loadOpTime)
1611
1612 // Add to circular buffer
1613 m.resolverStats.recentTimes[m.resolverStats.recentIdx] = timing
1614 m.resolverStats.recentIdx = (m.resolverStats.recentIdx + 1) % m.resolverStats.recentSize
1615}
1616
// GetResolverStats returns resolver performance statistics: lifetime
// counters and hit rates, overall average timings, and — from the
// recent-samples circular buffer — recent averages, min/max, and
// p50/p95/p99 percentiles. Counters are read with atomic loads; the
// ring buffer is snapshotted under the stats mutex.
func (m *Manager) GetResolverStats() map[string]interface{} {
	totalResolutions := atomic.LoadInt64(&m.resolverStats.totalResolutions)

	if totalResolutions == 0 {
		// Nothing recorded yet: report only the zero count.
		return map[string]interface{}{
			"total_resolutions": 0,
		}
	}

	mempoolHits := atomic.LoadInt64(&m.resolverStats.mempoolHits)
	bundleHits := atomic.LoadInt64(&m.resolverStats.bundleHits)
	errors := atomic.LoadInt64(&m.resolverStats.errors)

	totalTime := atomic.LoadInt64(&m.resolverStats.totalTime)
	totalMempoolTime := atomic.LoadInt64(&m.resolverStats.totalMempoolTime)
	totalIndexTime := atomic.LoadInt64(&m.resolverStats.totalIndexTime)
	totalLoadOpTime := atomic.LoadInt64(&m.resolverStats.totalLoadOpTime)

	// Calculate overall averages (stored in microseconds, reported in ms)
	avgTotalMs := float64(totalTime) / float64(totalResolutions) / 1000.0
	avgMempoolMs := float64(totalMempoolTime) / float64(totalResolutions) / 1000.0

	stats := map[string]interface{}{
		"total_resolutions": totalResolutions,
		"mempool_hits":      mempoolHits,
		"bundle_hits":       bundleHits,
		"errors":            errors,
		"success_rate":      float64(totalResolutions-errors) / float64(totalResolutions),
		"mempool_hit_rate":  float64(mempoolHits) / float64(totalResolutions),

		// Overall averages
		"avg_total_time_ms":   avgTotalMs,
		"avg_mempool_time_ms": avgMempoolMs,
	}

	// Only include bundle-specific stats if we have bundle hits
	// (index/load phases only run on bundle-sourced resolutions).
	if bundleHits > 0 {
		avgIndexMs := float64(totalIndexTime) / float64(bundleHits) / 1000.0
		avgLoadMs := float64(totalLoadOpTime) / float64(bundleHits) / 1000.0

		stats["avg_index_time_ms"] = avgIndexMs
		stats["avg_load_op_time_ms"] = avgLoadMs
	}

	// Recent statistics: snapshot the ring buffer under the lock.
	m.resolverStats.Lock()
	recentCopy := make([]resolverTiming, m.resolverStats.recentSize)
	copy(recentCopy, m.resolverStats.recentTimes)
	m.resolverStats.Unlock()

	// Filter valid entries (a zero totalTime marks an unused slot)
	validRecent := make([]resolverTiming, 0)
	for _, t := range recentCopy {
		if t.totalTime > 0 {
			validRecent = append(validRecent, t)
		}
	}

	if len(validRecent) > 0 {
		// Extract total times for percentiles
		totalTimes := make([]int64, len(validRecent))
		for i, t := range validRecent {
			totalTimes[i] = t.totalTime
		}
		sort.Slice(totalTimes, func(i, j int) bool {
			return totalTimes[i] < totalTimes[j]
		})

		// Calculate recent average
		var recentSum int64
		var recentMempoolSum int64
		var recentIndexSum int64
		var recentLoadSum int64
		recentBundleCount := 0

		for _, t := range validRecent {
			recentSum += t.totalTime
			recentMempoolSum += t.mempoolTime
			if t.source == "bundle" {
				recentIndexSum += t.indexTime
				recentLoadSum += t.loadOpTime
				recentBundleCount++
			}
		}

		stats["recent_avg_total_time_ms"] = float64(recentSum) / float64(len(validRecent)) / 1000.0
		stats["recent_avg_mempool_time_ms"] = float64(recentMempoolSum) / float64(len(validRecent)) / 1000.0

		if recentBundleCount > 0 {
			stats["recent_avg_index_time_ms"] = float64(recentIndexSum) / float64(recentBundleCount) / 1000.0
			stats["recent_avg_load_time_ms"] = float64(recentLoadSum) / float64(recentBundleCount) / 1000.0
		}

		stats["recent_sample_size"] = len(validRecent)

		// Percentiles (nearest-rank indices into the sorted slice)
		p50idx := len(totalTimes) * 50 / 100
		p95idx := len(totalTimes) * 95 / 100
		p99idx := len(totalTimes) * 99 / 100

		stats["min_total_time_ms"] = float64(totalTimes[0]) / 1000.0
		stats["max_total_time_ms"] = float64(totalTimes[len(totalTimes)-1]) / 1000.0

		if p50idx < len(totalTimes) {
			stats["p50_total_time_ms"] = float64(totalTimes[p50idx]) / 1000.0
		}
		if p95idx < len(totalTimes) {
			stats["p95_total_time_ms"] = float64(totalTimes[p95idx]) / 1000.0
		}
		if p99idx < len(totalTimes) {
			stats["p99_total_time_ms"] = float64(totalTimes[p99idx]) / 1000.0
		}
	}

	return stats
}
1734
1735// ResetResolverStats resets resolver performance statistics
1736func (m *Manager) ResetResolverStats() {
1737 m.resolverStats.Lock()
1738 defer m.resolverStats.Unlock()
1739
1740 atomic.StoreInt64(&m.resolverStats.totalResolutions, 0)
1741 atomic.StoreInt64(&m.resolverStats.mempoolHits, 0)
1742 atomic.StoreInt64(&m.resolverStats.bundleHits, 0)
1743 atomic.StoreInt64(&m.resolverStats.errors, 0)
1744 atomic.StoreInt64(&m.resolverStats.totalTime, 0)
1745 atomic.StoreInt64(&m.resolverStats.totalMempoolTime, 0)
1746 atomic.StoreInt64(&m.resolverStats.totalIndexTime, 0)
1747 atomic.StoreInt64(&m.resolverStats.totalLoadOpTime, 0)
1748
1749 m.resolverStats.recentTimes = make([]resolverTiming, m.resolverStats.recentSize)
1750 m.resolverStats.recentIdx = 0
1751}
1752
// SetQuiet toggles quiet mode on the manager's configuration.
func (m *Manager) SetQuiet(quiet bool) {
	m.config.Quiet = quiet
}
1756
1757// ShouldRebuildDIDIndex checks if DID index needs rebuilding
1758// Returns: (needsRebuild bool, reason string, canUpdateIncrementally bool)
1759func (m *Manager) ShouldRebuildDIDIndex() (bool, string, bool) {
1760 if m.didIndex == nil {
1761 return false, "DID index disabled", false
1762 }
1763
1764 needsRebuild, reason := m.didIndex.NeedsRebuild(m.GetBundleIndex())
1765
1766 if needsRebuild {
1767 return true, reason, false
1768 }
1769
1770 // Check if incremental update is better
1771 canIncremental, behindBy := m.didIndex.ShouldUpdateIncrementally(m.GetBundleIndex())
1772 if canIncremental {
1773 return false, fmt.Sprintf("can update incrementally (%d bundles)", behindBy), true
1774 }
1775
1776 return false, "index is up to date", false
1777}
1778
1779// UpdateDIDIndexSmart updates DID index intelligently (rebuild vs incremental)
1780func (m *Manager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error {
1781 needsRebuild, reason, canIncremental := m.ShouldRebuildDIDIndex()
1782
1783 if !needsRebuild && !canIncremental {
1784 if m.config.Verbose {
1785 m.logger.Printf("DID index is up to date")
1786 }
1787 return nil
1788 }
1789
1790 if needsRebuild {
1791 m.logger.Printf("Rebuilding DID index: %s", reason)
1792 return m.BuildDIDIndex(ctx, progressCallback)
1793 }
1794
1795 if canIncremental {
1796 m.logger.Printf("Updating DID index incrementally: %s", reason)
1797 return m.updateDIDIndexIncremental(ctx, progressCallback)
1798 }
1799
1800 return nil
1801}
1802
1803// updateDIDIndexIncremental updates index for missing bundles only
1804func (m *Manager) updateDIDIndexIncremental(ctx context.Context, progressCallback func(current, total int)) error {
1805 config := m.didIndex.GetConfig()
1806 lastBundle := m.index.GetLastBundle()
1807
1808 if lastBundle == nil || config.LastBundle >= lastBundle.BundleNumber {
1809 return nil
1810 }
1811
1812 start := config.LastBundle + 1
1813 end := lastBundle.BundleNumber
1814 total := end - start + 1
1815
1816 m.logger.Printf("Updating DID index for bundles %d-%d (%d bundles)", start, end, total)
1817
1818 for bundleNum := start; bundleNum <= end; bundleNum++ {
1819 bundle, err := m.LoadBundle(ctx, bundleNum)
1820 if err != nil {
1821 return fmt.Errorf("failed to load bundle %d: %w", bundleNum, err)
1822 }
1823
1824 bundleData := &didindex.BundleData{
1825 BundleNumber: bundle.BundleNumber,
1826 Operations: bundle.Operations,
1827 }
1828
1829 if err := m.didIndex.UpdateIndexForBundle(ctx, bundleData); err != nil {
1830 return fmt.Errorf("failed to update bundle %d: %w", bundleNum, err)
1831 }
1832
1833 if progressCallback != nil {
1834 progressCallback(bundleNum-start+1, total)
1835 }
1836 }
1837
1838 return nil
1839}