// Deprecated: Go implementation of plcbundle.
1package bundle
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "log"
8 "os"
9 "path/filepath"
10 "runtime"
11 "runtime/debug"
12 "sort"
13 "strings"
14 "sync"
15 "sync/atomic"
16 "time"
17
18 "tangled.org/atscan.net/plcbundle-go/internal/bundleindex"
19 "tangled.org/atscan.net/plcbundle-go/internal/didindex"
20 "tangled.org/atscan.net/plcbundle-go/internal/handleresolver"
21 "tangled.org/atscan.net/plcbundle-go/internal/mempool"
22 "tangled.org/atscan.net/plcbundle-go/internal/plcclient"
23 "tangled.org/atscan.net/plcbundle-go/internal/storage"
24 internalsync "tangled.org/atscan.net/plcbundle-go/internal/sync"
25 "tangled.org/atscan.net/plcbundle-go/internal/types"
26)
27
// defaultLogger adapts the standard library's log package to the
// types.Logger interface used throughout this package.
type defaultLogger struct{}

// Printf forwards a formatted message to the standard logger.
func (defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

// Println forwards its arguments to the standard logger.
func (defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}
38
// Manager handles bundle operations: loading/saving bundles, index
// maintenance, mempool rotation, DID-index integration and sync helpers.
// It is the central coordination object of the package.
type Manager struct {
	config     *Config             // effective configuration (never nil after NewManager)
	operations *storage.Operations // low-level compressed bundle file I/O
	index      *bundleindex.Index  // bundle metadata index, persisted at indexPath
	indexPath  string              // path of the on-disk index file
	logger     types.Logger        // destination for diagnostics (never nil after NewManager)
	mempool    *mempool.Mempool    // operations queued for the next bundle
	didIndex   *didindex.Manager   // DID -> (bundle, position) lookup index

	plcClient      *plcclient.Client      // upstream PLC directory client (may be nil)
	handleResolver *handleresolver.Client // optional handle resolver (may be nil)

	syncer *internalsync.Fetcher // pulls new operations from the PLC directory
	cloner *internalsync.Cloner  // downloads bundles from a remote repository

	bundleCache  map[int]*Bundle // bundleNumber -> loaded bundle
	cacheMu      sync.RWMutex    // guards bundleCache
	maxCacheSize int             // soft cap on cached bundles (evict-on-insert)

	// Resolver performance tracking
	resolverStats struct {
		sync.Mutex
		totalResolutions int64
		mempoolHits      int64
		bundleHits       int64
		errors           int64

		// Timing (in microseconds)
		totalTime        int64
		totalMempoolTime int64
		totalIndexTime   int64
		totalLoadOpTime  int64

		// Recent timings (circular buffer)
		recentTimes []resolverTiming
		recentIdx   int
		recentSize  int
	}
}
79
80// NewManager creates a new bundle manager
81func NewManager(config *Config, plcClient *plcclient.Client) (*Manager, error) {
82 if config == nil {
83 config = DefaultConfig("./plc_bundles")
84 }
85
86 if config.Logger == nil {
87 config.Logger = defaultLogger{}
88 }
89
90 // CHECK: Don't auto-create if repository doesn't exist
91 repoExists := repositoryExists(config.BundleDir)
92
93 if !repoExists && !config.AutoInit {
94 return nil, fmt.Errorf(
95 "no plcbundle repository found in: %s\n\n"+
96 "Initialize a new repository with:\n"+
97 " plcbundle clone <url> # Clone from remote\n"+
98 " plcbundle sync # Fetch from PLC directory",
99 config.BundleDir,
100 )
101 }
102
103 // Ensure directory exists (only if repo exists OR AutoInit is enabled)
104 if err := os.MkdirAll(config.BundleDir, 0755); err != nil {
105 return nil, fmt.Errorf("failed to create bundle directory: %w", err)
106 }
107
108 // Initialize operations handler
109 ops, err := storage.NewOperations(config.Logger, config.Verbose)
110 if err != nil {
111 return nil, fmt.Errorf("failed to initialize operations: %w", err)
112 }
113
114 // Determine origin
115 var origin string
116 if plcClient != nil {
117 origin = plcClient.GetBaseURL()
118 }
119
120 // Load or create index
121 indexPath := filepath.Join(config.BundleDir, bundleindex.INDEX_FILE)
122 index, err := bundleindex.LoadIndex(indexPath)
123
124 // Check for bundle files in directory
125 bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst"))
126 bundleFiles = filterBundleFiles(bundleFiles)
127 hasBundleFiles := len(bundleFiles) > 0
128
129 // Check if clone/download is in progress (look for .tmp files)
130 tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp"))
131 cloneInProgress := len(tmpFiles) > 0
132
133 needsRebuild := false
134
135 if err != nil {
136 // Index doesn't exist or is invalid
137 if hasBundleFiles {
138 if cloneInProgress {
139 config.Logger.Printf("Clone/download in progress, skipping auto-rebuild")
140 } else {
141 config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles))
142 needsRebuild = true
143 }
144 } else {
145 // No index and no bundles - create fresh index
146 config.Logger.Printf("Creating new index at %s", indexPath)
147 index = bundleindex.NewIndex(origin)
148 if err := index.Save(indexPath); err != nil {
149 return nil, fmt.Errorf("failed to save new index: %w", err)
150 }
151 }
152 } else {
153 // Index exists - auto-populate origin if missing
154 if index.Origin == "" {
155 if origin != "" {
156 config.Logger.Printf("⚠️ Upgrading old index: setting origin to %s", origin)
157 index.Origin = origin
158 if err := index.Save(indexPath); err != nil {
159 return nil, fmt.Errorf("failed to update index with origin: %w", err)
160 }
161 } else {
162 config.Logger.Printf("⚠️ Warning: index has no origin and no PLC client configured")
163 }
164 }
165
166 // Validate origin matches if both are set
167 if index.Origin != "" && origin != "" && index.Origin != origin {
168 return nil, fmt.Errorf(
169 "origin mismatch: index has origin %q but PLC client points to %q\n"+
170 "Cannot mix bundles from different sources. Use a different directory or reconfigure PLC client",
171 index.Origin, origin,
172 )
173 }
174
175 if config.Verbose {
176 config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin)
177 }
178
179 // Check if there are bundle files not in the index
180 if hasBundleFiles && len(bundleFiles) > index.Count() {
181 if cloneInProgress {
182 config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles))
183 } else {
184 config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding",
185 len(bundleFiles), index.Count())
186 needsRebuild = true
187 }
188 }
189 }
190
191 if index != nil && plcClient != nil {
192 currentOrigin := plcClient.GetBaseURL()
193
194 // Check if origins match
195 if index.Origin != "" && index.Origin != currentOrigin {
196 return nil, fmt.Errorf(
197 "origin mismatch: index has origin %q but PLC client points to %q. "+
198 "Cannot mix bundles from different sources",
199 index.Origin, currentOrigin,
200 )
201 }
202
203 // Set origin if not set (for backward compatibility with old indexes)
204 if index.Origin == "" && currentOrigin != "" {
205 index.Origin = currentOrigin
206 config.Logger.Printf("Setting origin for existing index: %s", currentOrigin)
207 if err := index.Save(indexPath); err != nil {
208 return nil, fmt.Errorf("failed to update index with origin: %w", err)
209 }
210 }
211 }
212
213 // Perform rebuild if needed (using parallel scan)
214 if needsRebuild && config.AutoRebuild {
215 config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles))
216
217 // Create temporary manager for scanning
218 tempMgr := &Manager{
219 config: config,
220 operations: ops,
221 index: bundleindex.NewIndex("test-origin"),
222 indexPath: indexPath,
223 logger: config.Logger,
224 }
225
226 // Use parallel scan with auto-detected CPU count
227 workers := config.RebuildWorkers
228 if workers <= 0 {
229 workers = runtime.NumCPU()
230 if workers < 1 {
231 workers = 1
232 }
233 }
234
235 config.Logger.Printf("Using %d workers for parallel scan", workers)
236
237 // Create progress callback wrapper with new signature
238 var progressCallback func(current, total int, bytesProcessed int64)
239 if config.RebuildProgress != nil {
240 // Wrap the old-style callback to work with new signature
241 oldCallback := config.RebuildProgress
242 progressCallback = func(current, total int, bytesProcessed int64) {
243 oldCallback(current, total)
244 }
245 } else {
246 // Default: log every 100 bundles
247 progressCallback = func(current, total int, bytesProcessed int64) {
248 if current%100 == 0 || current == total {
249 mbProcessed := float64(bytesProcessed) / (1024 * 1024)
250 config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed",
251 current, total, float64(current)/float64(total)*100, mbProcessed)
252 }
253 }
254 }
255
256 start := time.Now()
257
258 // Scan directory to rebuild index (parallel)
259 result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback)
260 if err != nil {
261 return nil, fmt.Errorf("failed to rebuild index: %w", err)
262 }
263
264 elapsed := time.Since(start)
265
266 // Reload the rebuilt index
267 index, err = bundleindex.LoadIndex(indexPath)
268 if err != nil {
269 return nil, fmt.Errorf("failed to load rebuilt index: %w", err)
270 }
271
272 // Calculate throughput
273 mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024)
274
275 config.Logger.Printf("✓ Index rebuilt with %d bundles in %s",
276 index.Count(), elapsed.Round(time.Millisecond))
277 config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)",
278 float64(result.BundleCount)/elapsed.Seconds(), mbPerSec)
279
280 // Verify all chain hashes are present
281 bundles := index.GetBundles()
282 missingHashes := 0
283 for i, meta := range bundles {
284 if meta.ContentHash == "" {
285 missingHashes++
286 }
287 if i > 0 && meta.Hash == "" {
288 missingHashes++
289 }
290 }
291 if missingHashes > 0 {
292 config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes)
293 }
294 }
295
296 if index == nil {
297 index = bundleindex.NewIndex("test-origin")
298 }
299
300 // Initialize mempool for next bundle
301 lastBundle := index.GetLastBundle()
302 nextBundleNum := 1
303 var minTimestamp time.Time
304
305 if lastBundle != nil {
306 nextBundleNum = lastBundle.BundleNumber + 1
307 minTimestamp = lastBundle.EndTime
308 }
309
310 mempool, err := mempool.NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger, config.Verbose)
311 if err != nil {
312 return nil, fmt.Errorf("failed to initialize mempool: %w", err)
313 }
314
315 // Initialize DID index manager
316 didIndex := didindex.NewManager(config.BundleDir, config.Logger)
317
318 // Initialize sync components
319 fetcher := internalsync.NewFetcher(plcClient, ops, config.Logger)
320 cloner := internalsync.NewCloner(ops, config.BundleDir, config.Logger)
321
322 // Initialize handle resolver if configured
323 var handleResolver *handleresolver.Client
324 if config.HandleResolverURL != "" {
325 handleResolver = handleresolver.NewClient(config.HandleResolverURL)
326 }
327
328 m := &Manager{
329 config: config,
330 operations: ops,
331 index: index,
332 indexPath: indexPath,
333 logger: config.Logger,
334 mempool: mempool,
335 didIndex: didIndex, // Updated type
336 bundleCache: make(map[int]*Bundle),
337 maxCacheSize: 10,
338 syncer: fetcher,
339 cloner: cloner,
340 plcClient: plcClient,
341 handleResolver: handleResolver,
342 }
343 // Initialize resolver stats
344 m.resolverStats.recentSize = 1000
345 m.resolverStats.recentTimes = make([]resolverTiming, 1000)
346
347 return m, nil
348}
349
// Close cleans up resources: closes the storage operations handler and
// PLC client, flushes the mempool to disk (errors are logged, not
// returned), and closes the DID index. Safe to call with partially
// initialized fields (each is nil-checked).
//
// NOTE(review): operations is closed before the mempool is saved —
// confirm mempool.Save does not depend on operations.
func (m *Manager) Close() {
	if m.operations != nil {
		m.operations.Close()
	}
	if m.plcClient != nil {
		m.plcClient.Close()
	}
	if m.mempool != nil {
		if err := m.mempool.Save(); err != nil {
			m.logger.Printf("Warning: failed to save mempool: %v", err)
		}
	}
	if m.didIndex != nil {
		m.didIndex.Close()
	}
}
367
// GetIndex returns the live bundle index (not a copy); callers must not
// mutate it concurrently with SaveBundle or RefreshIndex.
func (m *Manager) GetIndex() *bundleindex.Index {
	return m.index
}
372
// SaveIndex persists the current index to its on-disk location.
func (m *Manager) SaveIndex() error {
	return m.index.Save(m.indexPath)
}
377
378// LoadBundle with caching
379func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) {
380 // Check cache first
381 m.cacheMu.RLock()
382 if cached, ok := m.bundleCache[bundleNumber]; ok {
383 m.cacheMu.RUnlock()
384 return cached, nil
385 }
386 m.cacheMu.RUnlock()
387
388 // Load from disk (existing code)
389 bundle, err := m.loadBundleFromDisk(ctx, bundleNumber)
390 if err != nil {
391 return nil, err
392 }
393
394 // Add to cache
395 m.cacheMu.Lock()
396 m.bundleCache[bundleNumber] = bundle
397
398 // Simple LRU: if cache too big, remove oldest
399 if len(m.bundleCache) > m.maxCacheSize {
400 // Remove a random one (or implement proper LRU)
401 for k := range m.bundleCache {
402 delete(m.bundleCache, k)
403 break
404 }
405 }
406 m.cacheMu.Unlock()
407
408 return bundle, nil
409}
410
411// loadBundleFromDisk loads a bundle from disk
412func (m *Manager) loadBundleFromDisk(_ context.Context, bundleNumber int) (*Bundle, error) {
413 // Get metadata from index
414 meta, err := m.index.GetBundle(bundleNumber)
415 if err != nil {
416 return nil, fmt.Errorf("bundle not in index: %w", err)
417 }
418
419 // Load file
420 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
421 if !m.operations.FileExists(path) {
422 return nil, fmt.Errorf("bundle file not found: %s", path)
423 }
424
425 // Verify hash if enabled
426 if m.config.VerifyOnLoad {
427 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
428 if err != nil {
429 return nil, fmt.Errorf("failed to verify hash: %w", err)
430 }
431 if !valid {
432 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
433 }
434 }
435
436 // Load operations
437 operations, err := m.operations.LoadBundle(path)
438 if err != nil {
439 return nil, fmt.Errorf("failed to load bundle: %w", err)
440 }
441
442 // Create bundle struct
443 bundle := &Bundle{
444 BundleNumber: meta.BundleNumber,
445 StartTime: meta.StartTime,
446 EndTime: meta.EndTime,
447 Operations: operations,
448 DIDCount: meta.DIDCount,
449 Hash: meta.Hash,
450 ContentHash: meta.ContentHash,
451 Parent: meta.Parent,
452 CompressedHash: meta.CompressedHash,
453 CompressedSize: meta.CompressedSize,
454 UncompressedSize: meta.UncompressedSize,
455 Cursor: meta.Cursor,
456 Compressed: true,
457 CreatedAt: meta.CreatedAt,
458 }
459
460 return bundle, nil
461}
462
463// SaveBundle saves a bundle to disk and updates the index
464func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, verbose bool, quiet bool, stats types.BundleProductionStats, skipDIDIndex bool) (time.Duration, error) {
465
466 totalStart := time.Now()
467 if err := bundle.ValidateForSave(); err != nil {
468 return 0, fmt.Errorf("bundle validation failed: %w", err)
469 }
470
471 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))
472
473 // Get parent
474 var parent string
475 if bundle.BundleNumber > 1 {
476 prevBundle := m.index.GetLastBundle()
477 if prevBundle != nil {
478 parent = prevBundle.Hash
479 } else {
480 if prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil {
481 parent = prevMeta.Hash
482 }
483 }
484 }
485 bundle.Parent = parent
486
487 // Get origin
488 origin := m.index.Origin
489 if m.plcClient != nil {
490 origin = m.plcClient.GetBaseURL()
491 }
492
493 // Get version
494 version := "dev"
495 if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "" && info.Main.Version != "(devel)" {
496 version = info.Main.Version
497 }
498
499 // Get hostname
500 hostname, _ := os.Hostname()
501
502 // Create BundleInfo
503 bundleInfo := &storage.BundleInfo{
504 BundleNumber: bundle.BundleNumber,
505 Origin: origin,
506 ParentHash: parent,
507 Cursor: bundle.Cursor,
508 CreatedBy: fmt.Sprintf("plcbundle/%s", version),
509 Hostname: hostname,
510 }
511
512 if m.config.Verbose {
513 m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber)
514 }
515
516 // Save to disk with 3 parameters
517 uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations, bundleInfo)
518 if err != nil {
519 m.logger.Printf("DEBUG: SaveBundle FAILED: %v", err)
520 return 0, fmt.Errorf("failed to save bundle: %w", err)
521 }
522
523 if m.config.Verbose {
524 m.logger.Printf("DEBUG: SaveBundle SUCCESS, setting bundle fields")
525 }
526
527 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)
528 bundle.ContentHash = uncompressedHash
529 bundle.CompressedHash = compressedHash
530 bundle.UncompressedSize = uncompressedSize
531 bundle.CompressedSize = compressedSize
532 bundle.CreatedAt = time.Now().UTC()
533 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)
534
535 if m.config.Verbose {
536 m.logger.Printf("DEBUG: Adding bundle %d to index", bundle.BundleNumber)
537 }
538
539 // Add to index
540 m.index.AddBundle(bundle.ToMetadata())
541
542 if m.config.Verbose {
543 m.logger.Printf("DEBUG: Index now has %d bundles", m.index.Count())
544 }
545
546 // Save index
547 if err := m.SaveIndex(); err != nil {
548 m.logger.Printf("DEBUG: SaveIndex FAILED: %v", err)
549 return 0, fmt.Errorf("failed to save index: %w", err)
550 }
551
552 if m.config.Verbose {
553 m.logger.Printf("DEBUG: Index saved, last bundle = %d", m.index.GetLastBundle().BundleNumber)
554 }
555
556 saveDuration := time.Since(totalStart)
557
558 // Clean up old mempool
559 oldMempoolFile := m.mempool.GetFilename()
560 if err := m.mempool.Delete(); err != nil && !quiet {
561 m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
562 }
563
564 // Create new mempool
565 nextBundle := bundle.BundleNumber + 1
566 minTimestamp := bundle.EndTime
567
568 newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger, m.config.Verbose)
569 if err != nil {
570 return 0, fmt.Errorf("failed to create new mempool: %w", err)
571 }
572
573 m.mempool = newMempool
574
575 // DID index update (if enabled)
576 var indexUpdateDuration time.Duration
577 if !skipDIDIndex && m.didIndex != nil && m.didIndex.Exists() {
578 indexUpdateStart := time.Now()
579 if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil {
580 m.logger.Printf("Warning: failed to update DID index: %v", err)
581 } else {
582 indexUpdateDuration = time.Since(indexUpdateStart)
583 if !quiet && m.config.Verbose {
584 m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration)
585 }
586 }
587 }
588
589 if !quiet {
590 msg := fmt.Sprintf("→ Bundle %06d | %s | fetch: %s (%d reqs)",
591 bundle.BundleNumber,
592 bundle.Hash[0:7],
593 stats.TotalDuration.Round(time.Millisecond),
594 stats.TotalFetches,
595 )
596 if indexUpdateDuration > 0 {
597 msg += fmt.Sprintf(" | index: %s", indexUpdateDuration.Round(time.Millisecond))
598 }
599 msg += fmt.Sprintf(" | %s", formatTimeDistance(time.Since(bundle.EndTime)))
600 m.logger.Println(msg)
601 }
602
603 if m.config.Verbose {
604 m.logger.Printf("DEBUG: Bundle done = %d, finish duration = %s",
605 m.index.GetLastBundle().BundleNumber,
606 saveDuration.Round(time.Millisecond))
607 }
608
609 return indexUpdateDuration, nil
610}
611
// GetMempoolStats returns mempool statistics as a generic map
// (delegates to mempool.Stats; keys are defined there).
func (m *Manager) GetMempoolStats() map[string]interface{} {
	return m.mempool.Stats()
}
616
617// GetMempoolOperations returns all operations currently in mempool
618func (m *Manager) GetMempoolOperations() ([]plcclient.PLCOperation, error) {
619 if m.mempool == nil {
620 return nil, fmt.Errorf("mempool not initialized")
621 }
622
623 count := m.mempool.Count()
624 if count == 0 {
625 return []plcclient.PLCOperation{}, nil
626 }
627
628 return m.mempool.Peek(count), nil
629}
630
// GetBundleNumber returns the bundle's sequence number; it makes Bundle
// satisfy the BundleData interface.
func (b *Bundle) GetBundleNumber() int {
	return b.BundleNumber
}
635
636// VerifyBundle verifies a bundle's integrity
637func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
638 result := &VerificationResult{
639 BundleNumber: bundleNumber,
640 }
641
642 // Get from index
643 meta, err := m.index.GetBundle(bundleNumber)
644 if err != nil {
645 result.Error = err
646 return result, nil
647 }
648
649 result.ExpectedHash = meta.CompressedHash
650
651 // Check file exists
652 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
653 result.FileExists = m.operations.FileExists(path)
654 if !result.FileExists {
655 result.Error = fmt.Errorf("file not found")
656 return result, nil
657 }
658
659 // Verify BOTH compressed and content hashes
660 compHash, compSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path)
661 if err != nil {
662 result.Error = err
663 return result, nil
664 }
665
666 result.LocalHash = compHash
667
668 // Verify compressed hash
669 if compHash != meta.CompressedHash {
670 result.HashMatch = false
671 result.Valid = false
672 result.Error = fmt.Errorf("compressed hash mismatch: expected %s, got %s", meta.CompressedHash, compHash)
673 return result, nil
674 }
675
676 // Verify content hash
677 if contentHash != meta.ContentHash {
678 result.HashMatch = false
679 result.Valid = false
680 result.Error = fmt.Errorf("content hash mismatch: expected %s, got %s", meta.ContentHash, contentHash)
681 return result, nil
682 }
683
684 // Verify sizes match
685 if compSize != meta.CompressedSize {
686 result.Valid = false
687 result.Error = fmt.Errorf("compressed size mismatch: expected %d, got %d", meta.CompressedSize, compSize)
688 return result, nil
689 }
690
691 if contentSize != meta.UncompressedSize {
692 result.Valid = false
693 result.Error = fmt.Errorf("uncompressed size mismatch: expected %d, got %d", meta.UncompressedSize, contentSize)
694 return result, nil
695 }
696
697 result.HashMatch = true
698 result.Valid = true
699
700 return result, nil
701}
702
703// GetInfo returns information about the bundle manager
704func (m *Manager) GetInfo() map[string]interface{} {
705 stats := m.index.GetStats()
706 stats["bundle_dir"] = m.config.BundleDir
707 stats["index_path"] = m.indexPath
708 stats["verify_on_load"] = m.config.VerifyOnLoad
709 return stats
710}
711
712// ExportOperations exports operations from bundles
713func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plcclient.PLCOperation, error) {
714 if count <= 0 {
715 count = 1000
716 }
717
718 var result []plcclient.PLCOperation
719 seenCIDs := make(map[string]bool)
720
721 bundles := m.index.GetBundles()
722
723 for _, meta := range bundles {
724 if result != nil && len(result) >= count {
725 break
726 }
727
728 // Skip bundles before afterTime
729 if !afterTime.IsZero() && meta.EndTime.Before(afterTime) {
730 continue
731 }
732
733 // Load bundle
734 bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
735 if err != nil {
736 m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
737 continue
738 }
739
740 // Add operations
741 for _, op := range bundle.Operations {
742 if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
743 continue
744 }
745
746 if seenCIDs[op.CID] {
747 continue
748 }
749
750 seenCIDs[op.CID] = true
751 result = append(result, op)
752
753 if len(result) >= count {
754 break
755 }
756 }
757 }
758
759 return result, nil
760}
761
// IsBundleIndexed reports whether the given bundle number has an entry
// in the index.
func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
	_, err := m.index.GetBundle(bundleNumber)
	return err == nil
}
767
// RefreshMempool reloads the mempool contents from disk, discarding any
// unsaved in-memory state.
func (m *Manager) RefreshMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Load()
}
775
776// ClearMempool clears all operations from the mempool and saves
777func (m *Manager) ClearMempool() error {
778 if m.mempool == nil {
779 return fmt.Errorf("mempool not initialized")
780 }
781
782 m.logger.Printf("Clearing mempool...")
783
784 count := m.mempool.Count()
785
786 m.mempool.Clear()
787
788 if err := m.mempool.Save(); err != nil {
789 return fmt.Errorf("failed to save mempool: %w", err)
790 }
791
792 m.logger.Printf("Cleared %d operations from mempool", count)
793
794 return nil
795}
796
// ValidateMempool runs the mempool's internal consistency checks.
func (m *Manager) ValidateMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Validate()
}
804
805// StreamBundleRaw streams the raw compressed bundle file
806func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
807 // Get metadata from index
808 meta, err := m.index.GetBundle(bundleNumber)
809 if err != nil {
810 return nil, fmt.Errorf("bundle not in index: %w", err)
811 }
812
813 // Build file path
814 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
815 if !m.operations.FileExists(path) {
816 return nil, fmt.Errorf("bundle file not found: %s", path)
817 }
818
819 // Optionally verify hash before streaming
820 if m.config.VerifyOnLoad {
821 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
822 if err != nil {
823 return nil, fmt.Errorf("failed to verify hash: %w", err)
824 }
825 if !valid {
826 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
827 }
828 }
829
830 return m.operations.StreamRaw(path)
831}
832
833// StreamBundleDecompressed streams the decompressed bundle data as JSONL
834func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
835 // Get metadata from index
836 _, err := m.index.GetBundle(bundleNumber)
837 if err != nil {
838 return nil, fmt.Errorf("bundle not in index: %w", err)
839 }
840
841 // Build file path
842 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
843 if !m.operations.FileExists(path) {
844 return nil, fmt.Errorf("bundle file not found: %s", path)
845 }
846
847 return m.operations.StreamDecompressed(path)
848}
849
850// RefreshIndex reloads the index from disk if it has been modified
851func (m *Manager) RefreshIndex() error {
852 // Check if index file has been modified
853 info, err := os.Stat(m.indexPath)
854 if err != nil {
855 return err
856 }
857
858 // If index was modified after we loaded it, reload
859 if info.ModTime().After(m.index.UpdatedAt) {
860 m.logger.Printf("Index file modified, reloading...")
861
862 newIndex, err := bundleindex.LoadIndex(m.indexPath)
863 if err != nil {
864 return fmt.Errorf("failed to reload index: %w", err)
865 }
866
867 m.index = newIndex
868 m.logger.Printf("Index reloaded: %d bundles", m.index.Count())
869 }
870
871 return nil
872}
873
// GetMempool returns the current mempool (may be nil before NewManager
// completes).
func (m *Manager) GetMempool() *mempool.Mempool {
	return m.mempool
}
878
// SaveMempool persists the current mempool state to disk.
func (m *Manager) SaveMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Save()
}
886
887// GetPLCOrigin returns the PLC directory origin URL
888func (m *Manager) GetPLCOrigin() string {
889 if m.plcClient == nil {
890 return ""
891 }
892 return m.plcClient.GetBaseURL()
893}
894
895// GetCurrentCursor returns the current latest cursor position (including mempool)
896func (m *Manager) GetCurrentCursor() int {
897 index := m.GetIndex()
898 bundles := index.GetBundles()
899 cursor := len(bundles) * types.BUNDLE_SIZE
900
901 // Add mempool operations
902 mempoolStats := m.GetMempoolStats()
903 if count, ok := mempoolStats["count"].(int); ok {
904 cursor += count
905 }
906
907 return cursor
908}
909
910// LoadOperation loads a single operation from a bundle efficiently
911func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plcclient.PLCOperation, error) {
912 // Validate position
913 if position < 0 || position >= types.BUNDLE_SIZE {
914 return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, types.BUNDLE_SIZE-1)
915 }
916
917 // Validate bundle exists in index
918 _, err := m.index.GetBundle(bundleNumber)
919 if err != nil {
920 return nil, fmt.Errorf("bundle not in index: %w", err)
921 }
922
923 // Build file path
924 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
925 if !m.operations.FileExists(path) {
926 return nil, fmt.Errorf("bundle file not found: %s", path)
927 }
928
929 // Load just the one operation (optimized - decompresses only until position)
930 return m.operations.LoadOperationAtPosition(path, position)
931}
932
933// LoadOperations loads multiple operations from a bundle efficiently
934func (m *Manager) LoadOperations(ctx context.Context, bundleNumber int, positions []int) (map[int]*plcclient.PLCOperation, error) {
935 // Build file path
936 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
937 if !m.operations.FileExists(path) {
938 return nil, fmt.Errorf("bundle file not found: %s", path)
939 }
940
941 return m.operations.LoadOperationsAtPositions(path, positions)
942}
943
// filterBundleFiles drops paths whose basename begins with '.' or '_'
// (hidden files and in-progress temporaries), preserving input order.
func filterBundleFiles(files []string) []string {
	kept := make([]string, 0, len(files))
	for _, f := range files {
		name := filepath.Base(f)
		if strings.HasPrefix(name, ".") || strings.HasPrefix(name, "_") {
			continue
		}
		kept = append(kept, f)
	}
	return kept
}
956
957// ==========================================
958// DID INDEX INTEGRATION (adapter methods)
959// ==========================================
960
961// Implement BundleProvider interface for didindex
962func (m *Manager) LoadBundleForDIDIndex(ctx context.Context, bundleNumber int) (*didindex.BundleData, error) {
963 bundle, err := m.LoadBundle(ctx, bundleNumber)
964 if err != nil {
965 return nil, err
966 }
967
968 return &didindex.BundleData{
969 BundleNumber: bundle.BundleNumber,
970 Operations: bundle.Operations,
971 }, nil
972}
973
// GetBundleIndex wraps the bundle index in an adapter satisfying
// didindex.BundleIndexProvider.
func (m *Manager) GetBundleIndex() didindex.BundleIndexProvider {
	return &bundleIndexAdapter{index: m.index}
}
977
// bundleIndexAdapter adapts a *bundleindex.Index to the
// didindex.BundleIndexProvider interface, converting metadata types.
type bundleIndexAdapter struct {
	index *bundleindex.Index
}
982
983func (a *bundleIndexAdapter) GetBundles() []*didindex.BundleMetadata {
984 bundles := a.index.GetBundles()
985 result := make([]*didindex.BundleMetadata, len(bundles))
986 for i, b := range bundles {
987 result[i] = &didindex.BundleMetadata{
988 BundleNumber: b.BundleNumber,
989 StartTime: b.StartTime,
990 EndTime: b.EndTime,
991 }
992 }
993 return result
994}
995
996func (a *bundleIndexAdapter) GetBundle(bundleNumber int) (*didindex.BundleMetadata, error) {
997 meta, err := a.index.GetBundle(bundleNumber)
998 if err != nil {
999 return nil, err
1000 }
1001 return &didindex.BundleMetadata{
1002 BundleNumber: meta.BundleNumber,
1003 StartTime: meta.StartTime,
1004 EndTime: meta.EndTime,
1005 }, nil
1006}
1007
1008func (a *bundleIndexAdapter) GetLastBundle() *didindex.BundleMetadata {
1009 meta := a.index.GetLastBundle()
1010 if meta == nil {
1011 return nil
1012 }
1013 return &didindex.BundleMetadata{
1014 BundleNumber: meta.BundleNumber,
1015 StartTime: meta.StartTime,
1016 EndTime: meta.EndTime,
1017 }
1018}
1019
// GetDIDIndex returns the DID index manager (may be nil).
func (m *Manager) GetDIDIndex() *didindex.Manager {
	return m.didIndex
}
1024
// BuildDIDIndex builds the complete DID index from all bundles, lazily
// creating the index manager if needed. progressCallback (may be nil —
// TODO confirm didindex handles nil) receives build progress.
func (m *Manager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error {
	if m.didIndex == nil {
		m.didIndex = didindex.NewManager(m.config.BundleDir, m.logger)
	}

	return m.didIndex.BuildIndexFromScratch(ctx, m, progressCallback)
}
1033
1034// updateDIDIndexForBundle updates index when a new bundle is added
1035func (m *Manager) updateDIDIndexForBundle(ctx context.Context, bundle *Bundle) error {
1036 if m.didIndex == nil {
1037 return nil
1038 }
1039
1040 // Convert to didindex.BundleData
1041 bundleData := &didindex.BundleData{
1042 BundleNumber: bundle.BundleNumber,
1043 Operations: bundle.Operations,
1044 }
1045
1046 return m.didIndex.UpdateIndexForBundle(ctx, bundleData)
1047}
1048
1049// GetDIDIndexStats returns DID index statistics
1050func (m *Manager) GetDIDIndexStats() map[string]interface{} {
1051 if m.didIndex == nil {
1052 return map[string]interface{}{
1053 "enabled": false,
1054 }
1055 }
1056
1057 stats := m.didIndex.GetStats()
1058 stats["enabled"] = true
1059 stats["exists"] = m.didIndex.Exists()
1060
1061 indexedDIDs := stats["total_dids"].(int64)
1062
1063 // Get unique DIDs from mempool
1064 mempoolDIDCount := int64(0)
1065 if m.mempool != nil {
1066 mempoolStats := m.GetMempoolStats()
1067 if didCount, ok := mempoolStats["did_count"].(int); ok {
1068 mempoolDIDCount = int64(didCount)
1069 }
1070 }
1071
1072 stats["indexed_dids"] = indexedDIDs
1073 stats["mempool_dids"] = mempoolDIDCount
1074 stats["total_dids"] = indexedDIDs + mempoolDIDCount
1075
1076 return stats
1077}
1078
1079// GetDIDOperations retrieves all operations for a DID (bundles + mempool combined)
1080// Returns: operations only, operations with locations, error
1081func (m *Manager) GetDIDOperations(ctx context.Context, did string, verbose bool) ([]plcclient.PLCOperation, []PLCOperationWithLocation, error) {
1082 if err := plcclient.ValidateDIDFormat(did); err != nil {
1083 return nil, nil, err
1084 }
1085
1086 // Set verbose mode
1087 if m.didIndex != nil {
1088 m.didIndex.SetVerbose(verbose)
1089 }
1090
1091 // Get bundled operations from DID index (includes nullified)
1092 bundledOpsWithLoc, err := m.didIndex.GetDIDOperations(ctx, did, m)
1093 if err != nil {
1094 return nil, nil, err
1095 }
1096
1097 // Convert to bundle types
1098 opsWithLoc := make([]PLCOperationWithLocation, len(bundledOpsWithLoc))
1099 bundledOps := make([]plcclient.PLCOperation, len(bundledOpsWithLoc))
1100 for i, r := range bundledOpsWithLoc {
1101 opsWithLoc[i] = PLCOperationWithLocation{
1102 Operation: r.Operation,
1103 Bundle: r.Bundle,
1104 Position: r.Position,
1105 }
1106 bundledOps[i] = r.Operation
1107 }
1108
1109 // Get mempool operations
1110 mempoolOps, err := m.GetDIDOperationsFromMempool(did)
1111 if err != nil {
1112 return nil, nil, err
1113 }
1114
1115 if len(mempoolOps) > 0 && verbose {
1116 m.logger.Printf("DEBUG: Found %d operations in mempool", len(mempoolOps))
1117 }
1118
1119 // Combine operations (for the slice return)
1120 allOps := append(bundledOps, mempoolOps...)
1121
1122 sort.Slice(allOps, func(i, j int) bool {
1123 return allOps[i].CreatedAt.Before(allOps[j].CreatedAt)
1124 })
1125
1126 return allOps, opsWithLoc, nil
1127}
1128
// GetDIDOperationsFromMempool retrieves operations for a DID from mempool only.
// Returns an empty (non-nil) slice when the mempool is disabled; the error
// is currently always nil but kept for interface symmetry with the other
// operation getters.
func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plcclient.PLCOperation, error) {
	if m.mempool == nil {
		return []plcclient.PLCOperation{}, nil
	}

	// Use direct search - only copies matching operations
	return m.mempool.FindDIDOperations(did), nil
}
1138
1139// GetLatestDIDOperation returns only the most recent non-nullified operation
1140func (m *Manager) GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error) {
1141 if err := plcclient.ValidateDIDFormat(did); err != nil {
1142 return nil, err
1143 }
1144
1145 // Check mempool first (most recent data)
1146 mempoolOps, _ := m.GetDIDOperationsFromMempool(did)
1147 if len(mempoolOps) > 0 {
1148 for i := len(mempoolOps) - 1; i >= 0; i-- {
1149 if !mempoolOps[i].IsNullified() {
1150 return &mempoolOps[i], nil
1151 }
1152 }
1153 }
1154
1155 // Delegate to DID index for bundled operations
1156 return m.didIndex.GetLatestDIDOperation(ctx, did, m)
1157}
1158
// VerifyChain verifies the entire bundle chain.
//
// For every bundle in order it re-verifies the file hash via VerifyBundle
// and, from the second bundle on, checks both the parent reference and the
// recomputed chain hash. On the first failure it stops and returns a result
// with Valid=false and BrokenAt set — verification failures are reported in
// the result, not as a non-nil error.
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
	result := &ChainVerificationResult{
		VerifiedBundles: make([]int, 0),
	}

	bundles := m.index.GetBundles()
	if len(bundles) == 0 {
		// An empty chain is trivially valid.
		result.Valid = true
		return result, nil
	}

	result.ChainLength = len(bundles)

	for i, meta := range bundles {
		// Verify file hash
		vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
		if err != nil || !vr.Valid {
			result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
			result.BrokenAt = meta.BundleNumber
			return result, nil
		}

		// Verify chain link (the first bundle has no predecessor to check)
		if i > 0 {
			prevMeta := bundles[i-1]

			// Check parent reference: each bundle must name its
			// predecessor's hash as Parent.
			if meta.Parent != prevMeta.Hash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}

			// Verify chain hash calculation: stored Hash must equal
			// CalculateChainHash(previous hash, this bundle's content hash).
			expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
			if meta.Hash != expectedHash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}
		}

		result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
	}

	result.Valid = true
	return result, nil
}
1208
// FetchNextBundle fetches operations and creates a bundle, looping until caught up
//
// It resumes from the last bundle's end-time cursor (or the mempool's newest
// operation when one is buffered), repeatedly fetches from the PLC directory
// into the mempool until BUNDLE_SIZE operations are available, then takes
// exactly BUNDLE_SIZE operations and assembles the next chained bundle.
// Returns an error when the PLC client is missing or not enough operations
// could be gathered (caught up / attempt cap reached).
func (m *Manager) FetchNextBundle(ctx context.Context, verbose bool, quiet bool) (*Bundle, types.BundleProductionStats, error) {
	if m.plcClient == nil {
		return nil, types.BundleProductionStats{}, fmt.Errorf("PLC client not configured")
	}

	lastBundle := m.index.GetLastBundle()
	nextBundleNum := 1
	var afterTime string
	var prevBoundaryCIDs map[string]bool
	var prevBundleHash string

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
		prevBundleHash = lastBundle.Hash

		// ALWAYS get boundaries from last bundle initially.
		// Boundary CIDs deduplicate operations that share the cut-off
		// timestamp; a load failure here is deliberately tolerated
		// (best effort, no boundary dedup in that case).
		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
		if err == nil {
			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
			if verbose {
				m.logger.Printf("Loaded %d boundary CIDs from bundle %06d (at %s)",
					len(prevBoundaryCIDs), lastBundle.BundleNumber,
					lastBundle.EndTime.Format(time.RFC3339)[:19])
			}
		}
	}

	// If mempool has operations, update cursor but KEEP boundaries from bundle
	// (mempool operations already had boundary dedup applied when they were added)
	if m.mempool.Count() > 0 {
		mempoolLastTime := m.mempool.GetLastTime()
		if mempoolLastTime != "" {
			if verbose {
				// NOTE(review): mempoolLastTime[:19] assumes an
				// RFC3339-style timestamp of at least 19 bytes — a
				// shorter cursor would panic; confirm format guarantee.
				m.logger.Printf("[DEBUG] Mempool has %d ops, resuming from %s",
					m.mempool.Count(), mempoolLastTime[:19])
			}
			afterTime = mempoolLastTime

			// Calculate boundaries from MEMPOOL for next fetch
			mempoolOps, _ := m.GetMempoolOperations()
			if len(mempoolOps) > 0 {
				_, mempoolBoundaries := m.operations.GetBoundaryCIDs(mempoolOps)
				prevBoundaryCIDs = mempoolBoundaries
				if verbose {
					m.logger.Printf("Using %d boundary CIDs from mempool", len(prevBoundaryCIDs))
				}
			}
		}
	}

	if verbose {
		m.logger.Printf("[DEBUG] Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
		m.logger.Printf("[DEBUG] Starting cursor: %s", afterTime)
	}

	totalFetches := 0
	maxAttempts := 50 // hard cap so a misbehaving upstream cannot loop forever
	attempt := 0
	caughtUp := false
	attemptStart := time.Now()

	// Keep fetching until the mempool holds a full bundle's worth of ops,
	// we catch up to live PLC data, or we exhaust maxAttempts.
	for m.mempool.Count() < types.BUNDLE_SIZE && attempt < maxAttempts {
		attempt++
		needed := types.BUNDLE_SIZE - m.mempool.Count()

		if !quiet && attempt > 1 {
			m.logger.Printf("  Attempt %d: Need %d more ops, cursor: %s",
				attempt, needed, afterTime[:19])
		}

		newOps, fetchCount, err := m.syncer.FetchToMempool(
			ctx,
			afterTime,
			prevBoundaryCIDs,
			needed,
			!verbose,
			m.mempool,
			totalFetches,
		)

		totalFetches += fetchCount

		// Check if we got an incomplete batch: non-empty but short of
		// `needed` means the upstream has no more data right now.
		gotIncompleteBatch := len(newOps) > 0 && len(newOps) < needed && err == nil

		// Update cursor from mempool if we got new ops
		if len(newOps) > 0 && m.mempool.Count() > 0 {
			afterTime = m.mempool.GetLastTime()
		}

		// Stop if caught up or error
		if err != nil || len(newOps) == 0 || gotIncompleteBatch {
			caughtUp = true
			if verbose && totalFetches > 0 {
				m.logger.Printf("DEBUG: Caught up to latest PLC data")
			}
			break
		}

		if m.mempool.Count() >= types.BUNDLE_SIZE {
			break
		}
	}

	totalDuration := time.Since(attemptStart)

	if m.mempool.Count() < types.BUNDLE_SIZE {
		if caughtUp {
			return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)",
				m.mempool.Count(), types.BUNDLE_SIZE)
		} else {
			return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (max attempts reached)",
				m.mempool.Count(), types.BUNDLE_SIZE)
		}
	}

	// Create bundle: remove exactly BUNDLE_SIZE operations from the mempool.
	operations, err := m.mempool.Take(types.BUNDLE_SIZE)
	if err != nil {
		return nil, types.BundleProductionStats{}, err
	}

	syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)

	// Copy the sync-layer bundle into this package's public Bundle type.
	bundle := &Bundle{
		BundleNumber: syncBundle.BundleNumber,
		StartTime:    syncBundle.StartTime,
		EndTime:      syncBundle.EndTime,
		Operations:   syncBundle.Operations,
		DIDCount:     syncBundle.DIDCount,
		Cursor:       syncBundle.Cursor,
		Parent:       syncBundle.Parent,
		BoundaryCIDs: syncBundle.BoundaryCIDs,
		Compressed:   syncBundle.Compressed,
		CreatedAt:    syncBundle.CreatedAt,
	}

	// NOTE(review): AvgPerFetch divides by totalFetches, which is zero when
	// the mempool was already full — that yields +Inf (no panic); confirm
	// downstream consumers tolerate it.
	stats := types.BundleProductionStats{
		TotalFetches:  totalFetches,
		TotalDuration: totalDuration,
		AvgPerFetch:   float64(types.BUNDLE_SIZE) / float64(totalFetches),
		Throughput:    float64(types.BUNDLE_SIZE) / totalDuration.Seconds(),
	}

	return bundle, stats, nil
}
1357
1358// CloneFromRemote clones bundles from a remote endpoint
1359func (m *Manager) CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error) {
1360 // Define index update callback inline
1361 updateIndexCallback := func(bundleNumbers []int, remoteMeta map[int]*bundleindex.BundleMetadata, verbose bool) error {
1362 if len(bundleNumbers) == 0 {
1363 return nil
1364 }
1365
1366 // Create file existence checker
1367 fileExists := func(bundleNum int) bool {
1368 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
1369 return m.operations.FileExists(path)
1370 }
1371
1372 // Update index with remote metadata
1373 if err := m.index.UpdateFromRemote(bundleNumbers, remoteMeta, fileExists, verbose, m.logger); err != nil {
1374 return err
1375 }
1376
1377 // Save index
1378 return m.SaveIndex()
1379 }
1380
1381 // Delegate to cloner with inline callback
1382 return m.cloner.Clone(ctx, opts, m.index, updateIndexCallback)
1383}
1384
// ResolveDID resolves a DID to its current document with detailed timing metrics
//
// Resolution order: (1) mempool — the freshest, not-yet-bundled data;
// (2) DID index lookup for the latest non-nullified operation location;
// (3) load that single operation from its bundle; (4) materialize the DID
// document. Per-step timings are stored in the result and folded into the
// manager's resolver statistics on success; every failure path bumps the
// atomic error counter.
func (m *Manager) ResolveDID(ctx context.Context, did string) (*ResolveDIDResult, error) {
	if err := plcclient.ValidateDIDFormat(did); err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, err
	}

	result := &ResolveDIDResult{}
	totalStart := time.Now()

	// STEP 1: Check mempool first
	mempoolStart := time.Now()
	var latestMempoolOp *plcclient.PLCOperation
	if m.mempool != nil {
		latestMempoolOp = m.mempool.FindLatestDIDOperation(did)
	}
	result.MempoolTime = time.Since(mempoolStart)

	// Early return if found in mempool — no index or bundle access needed.
	if latestMempoolOp != nil {
		doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*latestMempoolOp})
		if err != nil {
			atomic.AddInt64(&m.resolverStats.errors, 1)
			return nil, fmt.Errorf("resolution failed: %w", err)
		}

		result.Document = doc
		result.LatestOperation = latestMempoolOp
		result.Source = "mempool"
		result.TotalTime = time.Since(totalStart)

		m.recordResolverTiming(result, nil)
		return result, nil
	}

	// STEP 2: Index lookup — without a built DID index, bundled history
	// cannot be searched.
	if m.didIndex == nil || !m.didIndex.Exists() {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("DID index not available - run 'plcbundle index build' to enable DID resolution")
	}

	indexStart := time.Now()
	locations, err := m.didIndex.GetDIDLocations(did)
	result.IndexTime = time.Since(indexStart)

	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, err
	}

	if len(locations) == 0 {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("DID not found")
	}

	// Find latest non-nullified location among all recorded operations.
	var latestLoc *didindex.OpLocation
	for i := range locations {
		if locations[i].Nullified() {
			continue
		}
		if latestLoc == nil || locations[i].IsAfter(*latestLoc) {
			latestLoc = &locations[i]
		}
	}

	if latestLoc == nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("no valid operations (all nullified)")
	}

	// STEP 3: Load the single winning operation from its bundle.
	opStart := time.Now()
	op, err := m.LoadOperation(ctx, latestLoc.BundleInt(), latestLoc.PositionInt())
	result.LoadOpTime = time.Since(opStart)

	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("failed to load operation: %w", err)
	}

	result.BundleNumber = latestLoc.BundleInt()
	result.Position = latestLoc.PositionInt()

	// STEP 4: Resolve document from the latest operation.
	doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op})
	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("resolution failed: %w", err)
	}

	result.Document = doc
	result.LatestOperation = op
	result.Source = "bundle"
	result.TotalTime = time.Since(totalStart)

	m.recordResolverTiming(result, nil)
	return result, nil
}
1484
1485// GetLastBundleNumber returns the last bundle number (0 if no bundles)
1486func (m *Manager) GetLastBundleNumber() int {
1487 lastBundle := m.index.GetLastBundle()
1488 if lastBundle == nil {
1489 return 0
1490 }
1491 return lastBundle.BundleNumber
1492}
1493
1494// GetMempoolCount returns the number of operations in mempool
1495func (m *Manager) GetMempoolCount() int {
1496 return m.mempool.Count()
1497}
1498
1499func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) {
1500 bundle, stats, err := m.FetchNextBundle(ctx, verbose, quiet)
1501 if err != nil {
1502 return 0, nil, err
1503 }
1504
1505 indexTime, err := m.SaveBundle(ctx, bundle, verbose, quiet, stats, skipDIDIndex)
1506 if err != nil {
1507 return 0, nil, err
1508 }
1509 stats.IndexTime = indexTime
1510
1511 return bundle.BundleNumber, &types.BundleProductionStats{}, nil
1512}
1513
// RunSyncLoop runs continuous sync loop (delegates to internal/sync).
// Blocks until the loop exits; the Manager itself satisfies the
// internalsync SyncManager interface and is passed as the implementation.
func (m *Manager) RunSyncLoop(ctx context.Context, config *internalsync.SyncLoopConfig) error {
	// Manager itself implements the SyncManager interface
	return internalsync.RunSyncLoop(ctx, m, config)
}
1519
// RunSyncOnce performs a single sync cycle (delegates to internal/sync).
// The Manager itself satisfies the internalsync SyncManager interface and
// is passed as the implementation.
func (m *Manager) RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig) (int, error) {
	// Manager itself implements the SyncManager interface
	return internalsync.SyncOnce(ctx, m, config)
}
1525
1526// EnsureDIDIndex ensures DID index is built and up-to-date
1527// Returns true if index was built/rebuilt, false if already up-to-date
1528func (m *Manager) EnsureDIDIndex(ctx context.Context, progressCallback func(current, total int)) (bool, error) {
1529 // Build index
1530 m.UpdateDIDIndexSmart(ctx, progressCallback)
1531 return true, nil
1532}
1533
1534// Add this helper function at the top of manager.go
1535func repositoryExists(bundleDir string) bool {
1536 indexPath := filepath.Join(bundleDir, bundleindex.INDEX_FILE)
1537
1538 // Check for index file
1539 if _, err := os.Stat(indexPath); err == nil {
1540 return true
1541 }
1542
1543 // Check for bundle files
1544 bundleFiles, _ := filepath.Glob(filepath.Join(bundleDir, "*.jsonl.zst"))
1545 bundleFiles = filterBundleFiles(bundleFiles)
1546
1547 return len(bundleFiles) > 0
1548}
1549
1550// ResolveHandleOrDID resolves input that can be either a handle or DID
1551// Returns: (did, handleResolveTime, error)
1552func (m *Manager) ResolveHandleOrDID(ctx context.Context, input string) (string, time.Duration, error) {
1553 input = strings.TrimSpace(input)
1554
1555 // Normalize handle format (remove at://, @ prefixes)
1556 if !strings.HasPrefix(input, "did:") {
1557 input = strings.TrimPrefix(input, "at://")
1558 input = strings.TrimPrefix(input, "@")
1559 }
1560
1561 // If already a DID, validate and return
1562 if strings.HasPrefix(input, "did:plc:") {
1563 if err := plcclient.ValidateDIDFormat(input); err != nil {
1564 return "", 0, err
1565 }
1566 return input, 0, nil // No resolution needed
1567 }
1568
1569 // Support did:web too
1570 if strings.HasPrefix(input, "did:web:") {
1571 return input, 0, nil
1572 }
1573
1574 // It's a handle - need resolver
1575 if m.handleResolver == nil {
1576 return "", 0, fmt.Errorf(
1577 "input '%s' appears to be a handle, but handle resolver is not configured\n\n"+
1578 "Configure resolver with:\n"+
1579 " plcbundle --handle-resolver https://quickdid.smokesignal.tools did resolve %s\n\n"+
1580 "Or set default in config",
1581 input, input)
1582 }
1583
1584 resolveStart := time.Now()
1585 if !m.config.Quiet {
1586 m.logger.Printf("Resolving handle: %s", input)
1587 }
1588 did, err := m.handleResolver.ResolveHandle(ctx, input)
1589 resolveTime := time.Since(resolveStart)
1590
1591 if err != nil {
1592 return "", resolveTime, fmt.Errorf("failed to resolve handle '%s': %w", input, err)
1593 }
1594
1595 if !m.config.Quiet {
1596 m.logger.Printf("Resolved: %s → %s (in %s)", input, did, resolveTime)
1597 }
1598 return did, resolveTime, nil
1599}
1600
// GetHandleResolver returns the handle resolver (can be nil).
// Callers must nil-check before use; ResolveHandleOrDID does so and
// produces a helpful configuration error when it is missing.
func (m *Manager) GetHandleResolver() *handleresolver.Client {
	return m.handleResolver
}
1605
// recordResolverTiming records resolver performance metrics for a
// successful resolution (the error parameter is currently unused).
func (m *Manager) recordResolverTiming(result *ResolveDIDResult, _ error) {
	m.resolverStats.Lock()
	defer m.resolverStats.Unlock()

	// Increment counters. The counters are updated atomically even though
	// the stats mutex is held — GetResolverStats reads them with atomic
	// loads without taking the lock.
	atomic.AddInt64(&m.resolverStats.totalResolutions, 1)

	// Attribute the hit to its source ("mempool" or "bundle", set by ResolveDID).
	switch result.Source {
	case "mempool":
		atomic.AddInt64(&m.resolverStats.mempoolHits, 1)
	case "bundle":
		atomic.AddInt64(&m.resolverStats.bundleHits, 1)
	}

	// Record timings (stored as microseconds, per the struct's comment).
	timing := resolverTiming{
		totalTime:   result.TotalTime.Microseconds(),
		mempoolTime: result.MempoolTime.Microseconds(),
		indexTime:   result.IndexTime.Microseconds(),
		loadOpTime:  result.LoadOpTime.Microseconds(),
		source:      result.Source,
	}

	atomic.AddInt64(&m.resolverStats.totalTime, timing.totalTime)
	atomic.AddInt64(&m.resolverStats.totalMempoolTime, timing.mempoolTime)
	atomic.AddInt64(&m.resolverStats.totalIndexTime, timing.indexTime)
	atomic.AddInt64(&m.resolverStats.totalLoadOpTime, timing.loadOpTime)

	// Add to circular buffer of recent timings.
	// NOTE(review): assumes recentSize > 0 (presumably set in NewManager);
	// a zero value would panic with a division by zero here — confirm.
	m.resolverStats.recentTimes[m.resolverStats.recentIdx] = timing
	m.resolverStats.recentIdx = (m.resolverStats.recentIdx + 1) % m.resolverStats.recentSize
}
1639
// GetResolverStats returns resolver performance statistics
//
// Counter totals are read with atomic loads (they are written atomically in
// recordResolverTiming); the recent-timings circular buffer is copied under
// the stats mutex before percentile computation. All reported durations are
// in milliseconds (internal storage is microseconds, hence the /1000.0).
func (m *Manager) GetResolverStats() map[string]interface{} {
	totalResolutions := atomic.LoadInt64(&m.resolverStats.totalResolutions)

	// Nothing recorded yet — return a minimal map so callers need no guards.
	if totalResolutions == 0 {
		return map[string]interface{}{
			"total_resolutions": 0,
		}
	}

	mempoolHits := atomic.LoadInt64(&m.resolverStats.mempoolHits)
	bundleHits := atomic.LoadInt64(&m.resolverStats.bundleHits)
	errors := atomic.LoadInt64(&m.resolverStats.errors)

	totalTime := atomic.LoadInt64(&m.resolverStats.totalTime)
	totalMempoolTime := atomic.LoadInt64(&m.resolverStats.totalMempoolTime)
	totalIndexTime := atomic.LoadInt64(&m.resolverStats.totalIndexTime)
	totalLoadOpTime := atomic.LoadInt64(&m.resolverStats.totalLoadOpTime)

	// Calculate overall averages (µs → ms)
	avgTotalMs := float64(totalTime) / float64(totalResolutions) / 1000.0
	avgMempoolMs := float64(totalMempoolTime) / float64(totalResolutions) / 1000.0

	stats := map[string]interface{}{
		"total_resolutions": totalResolutions,
		"mempool_hits":      mempoolHits,
		"bundle_hits":       bundleHits,
		"errors":            errors,
		"success_rate":      float64(totalResolutions-errors) / float64(totalResolutions),
		"mempool_hit_rate":  float64(mempoolHits) / float64(totalResolutions),

		// Overall averages
		"avg_total_time_ms":   avgTotalMs,
		"avg_mempool_time_ms": avgMempoolMs,
	}

	// Only include bundle-specific stats if we have bundle hits
	// (index/load times are only accumulated on the bundle path).
	if bundleHits > 0 {
		avgIndexMs := float64(totalIndexTime) / float64(bundleHits) / 1000.0
		avgLoadMs := float64(totalLoadOpTime) / float64(bundleHits) / 1000.0

		stats["avg_index_time_ms"] = avgIndexMs
		stats["avg_load_op_time_ms"] = avgLoadMs
	}

	// Recent statistics: snapshot the circular buffer under the lock.
	m.resolverStats.Lock()
	recentCopy := make([]resolverTiming, m.resolverStats.recentSize)
	copy(recentCopy, m.resolverStats.recentTimes)
	m.resolverStats.Unlock()

	// Filter valid entries (zero totalTime marks an unused buffer slot).
	validRecent := make([]resolverTiming, 0)
	for _, t := range recentCopy {
		if t.totalTime > 0 {
			validRecent = append(validRecent, t)
		}
	}

	if len(validRecent) > 0 {
		// Extract total times for percentiles
		totalTimes := make([]int64, len(validRecent))
		for i, t := range validRecent {
			totalTimes[i] = t.totalTime
		}
		sort.Slice(totalTimes, func(i, j int) bool {
			return totalTimes[i] < totalTimes[j]
		})

		// Calculate recent averages; index/load sums only count bundle-path hits.
		var recentSum int64
		var recentMempoolSum int64
		var recentIndexSum int64
		var recentLoadSum int64
		recentBundleCount := 0

		for _, t := range validRecent {
			recentSum += t.totalTime
			recentMempoolSum += t.mempoolTime
			if t.source == "bundle" {
				recentIndexSum += t.indexTime
				recentLoadSum += t.loadOpTime
				recentBundleCount++
			}
		}

		stats["recent_avg_total_time_ms"] = float64(recentSum) / float64(len(validRecent)) / 1000.0
		stats["recent_avg_mempool_time_ms"] = float64(recentMempoolSum) / float64(len(validRecent)) / 1000.0

		if recentBundleCount > 0 {
			stats["recent_avg_index_time_ms"] = float64(recentIndexSum) / float64(recentBundleCount) / 1000.0
			stats["recent_avg_load_time_ms"] = float64(recentLoadSum) / float64(recentBundleCount) / 1000.0
		}

		stats["recent_sample_size"] = len(validRecent)

		// Percentiles over the sorted recent totals.
		p50idx := len(totalTimes) * 50 / 100
		p95idx := len(totalTimes) * 95 / 100
		p99idx := len(totalTimes) * 99 / 100

		stats["min_total_time_ms"] = float64(totalTimes[0]) / 1000.0
		stats["max_total_time_ms"] = float64(totalTimes[len(totalTimes)-1]) / 1000.0

		// Bounds checks matter for tiny samples where idx == len.
		if p50idx < len(totalTimes) {
			stats["p50_total_time_ms"] = float64(totalTimes[p50idx]) / 1000.0
		}
		if p95idx < len(totalTimes) {
			stats["p95_total_time_ms"] = float64(totalTimes[p95idx]) / 1000.0
		}
		if p99idx < len(totalTimes) {
			stats["p99_total_time_ms"] = float64(totalTimes[p99idx]) / 1000.0
		}
	}

	return stats
}
1757
1758// ResetResolverStats resets resolver performance statistics
1759func (m *Manager) ResetResolverStats() {
1760 m.resolverStats.Lock()
1761 defer m.resolverStats.Unlock()
1762
1763 atomic.StoreInt64(&m.resolverStats.totalResolutions, 0)
1764 atomic.StoreInt64(&m.resolverStats.mempoolHits, 0)
1765 atomic.StoreInt64(&m.resolverStats.bundleHits, 0)
1766 atomic.StoreInt64(&m.resolverStats.errors, 0)
1767 atomic.StoreInt64(&m.resolverStats.totalTime, 0)
1768 atomic.StoreInt64(&m.resolverStats.totalMempoolTime, 0)
1769 atomic.StoreInt64(&m.resolverStats.totalIndexTime, 0)
1770 atomic.StoreInt64(&m.resolverStats.totalLoadOpTime, 0)
1771
1772 m.resolverStats.recentTimes = make([]resolverTiming, m.resolverStats.recentSize)
1773 m.resolverStats.recentIdx = 0
1774}
1775
// SetQuiet toggles quiet mode on the manager's config, which suppresses
// informational log output (e.g. in ResolveHandleOrDID).
func (m *Manager) SetQuiet(quiet bool) {
	m.config.Quiet = quiet
}
1779
1780// ShouldRebuildDIDIndex checks if DID index needs rebuilding
1781// Returns: (needsRebuild bool, reason string, canUpdateIncrementally bool)
1782func (m *Manager) ShouldRebuildDIDIndex() (bool, string, bool) {
1783 if m.didIndex == nil {
1784 return false, "DID index disabled", false
1785 }
1786
1787 needsRebuild, reason := m.didIndex.NeedsRebuild(m.GetBundleIndex())
1788
1789 if needsRebuild {
1790 return true, reason, false
1791 }
1792
1793 // Check if incremental update is better
1794 canIncremental, behindBy := m.didIndex.ShouldUpdateIncrementally(m.GetBundleIndex())
1795 if canIncremental {
1796 return false, fmt.Sprintf("can update incrementally (%d bundles)", behindBy), true
1797 }
1798
1799 return false, "index is up to date", false
1800}
1801
1802// UpdateDIDIndexSmart updates DID index intelligently (rebuild vs incremental)
1803func (m *Manager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error {
1804 needsRebuild, reason, canIncremental := m.ShouldRebuildDIDIndex()
1805
1806 if !needsRebuild && !canIncremental {
1807 if m.config.Verbose {
1808 m.logger.Printf("DID index is up to date")
1809 }
1810 return nil
1811 }
1812
1813 if needsRebuild {
1814 m.logger.Printf("Rebuilding DID index: %s", reason)
1815 return m.BuildDIDIndex(ctx, progressCallback)
1816 }
1817
1818 if canIncremental {
1819 m.logger.Printf("Updating DID index incrementally: %s", reason)
1820 return m.updateDIDIndexIncremental(ctx, progressCallback)
1821 }
1822
1823 return nil
1824}
1825
1826// updateDIDIndexIncremental updates index for missing bundles only
1827func (m *Manager) updateDIDIndexIncremental(ctx context.Context, progressCallback func(current, total int)) error {
1828 config := m.didIndex.GetConfig()
1829 lastBundle := m.index.GetLastBundle()
1830
1831 if lastBundle == nil || config.LastBundle >= lastBundle.BundleNumber {
1832 return nil
1833 }
1834
1835 start := config.LastBundle + 1
1836 end := lastBundle.BundleNumber
1837 total := end - start + 1
1838
1839 m.logger.Printf("Updating DID index for bundles %d-%d (%d bundles)", start, end, total)
1840
1841 for bundleNum := start; bundleNum <= end; bundleNum++ {
1842 bundle, err := m.LoadBundle(ctx, bundleNum)
1843 if err != nil {
1844 return fmt.Errorf("failed to load bundle %d: %w", bundleNum, err)
1845 }
1846
1847 bundleData := &didindex.BundleData{
1848 BundleNumber: bundle.BundleNumber,
1849 Operations: bundle.Operations,
1850 }
1851
1852 if err := m.didIndex.UpdateIndexForBundle(ctx, bundleData); err != nil {
1853 return fmt.Errorf("failed to update bundle %d: %w", bundleNum, err)
1854 }
1855
1856 if progressCallback != nil {
1857 progressCallback(bundleNum-start+1, total)
1858 }
1859 }
1860
1861 return nil
1862}