[DEPRECATED] Go implementation of plcbundle
at main 1862 lines 54 kB view raw
1package bundle 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "log" 8 "os" 9 "path/filepath" 10 "runtime" 11 "runtime/debug" 12 "sort" 13 "strings" 14 "sync" 15 "sync/atomic" 16 "time" 17 18 "tangled.org/atscan.net/plcbundle-go/internal/bundleindex" 19 "tangled.org/atscan.net/plcbundle-go/internal/didindex" 20 "tangled.org/atscan.net/plcbundle-go/internal/handleresolver" 21 "tangled.org/atscan.net/plcbundle-go/internal/mempool" 22 "tangled.org/atscan.net/plcbundle-go/internal/plcclient" 23 "tangled.org/atscan.net/plcbundle-go/internal/storage" 24 internalsync "tangled.org/atscan.net/plcbundle-go/internal/sync" 25 "tangled.org/atscan.net/plcbundle-go/internal/types" 26) 27 28// defaultLogger is a simple logger implementation 29type defaultLogger struct{} 30 31func (d defaultLogger) Printf(format string, v ...interface{}) { 32 log.Printf(format, v...) 33} 34 35func (d defaultLogger) Println(v ...interface{}) { 36 log.Println(v...) 37} 38 39// Manager handles bundle operations 40type Manager struct { 41 config *Config 42 operations *storage.Operations 43 index *bundleindex.Index 44 indexPath string 45 logger types.Logger 46 mempool *mempool.Mempool 47 didIndex *didindex.Manager 48 49 plcClient *plcclient.Client 50 handleResolver *handleresolver.Client 51 52 syncer *internalsync.Fetcher 53 cloner *internalsync.Cloner 54 55 bundleCache map[int]*Bundle 56 cacheMu sync.RWMutex 57 maxCacheSize int 58 59 // Resolver performance tracking 60 resolverStats struct { 61 sync.Mutex 62 totalResolutions int64 63 mempoolHits int64 64 bundleHits int64 65 errors int64 66 67 // Timing (in microseconds) 68 totalTime int64 69 totalMempoolTime int64 70 totalIndexTime int64 71 totalLoadOpTime int64 72 73 // Recent timings (circular buffer) 74 recentTimes []resolverTiming 75 recentIdx int 76 recentSize int 77 } 78} 79 80// NewManager creates a new bundle manager 81func NewManager(config *Config, plcClient *plcclient.Client) (*Manager, error) { 82 if config == nil { 83 config = DefaultConfig("./plc_bundles") 84 } 85 86 if config.Logger == nil { 87 config.Logger = defaultLogger{} 88 } 89 90 // CHECK: Don't auto-create if repository doesn't exist 91 repoExists := repositoryExists(config.BundleDir) 92 93 if !repoExists && !config.AutoInit { 94 return nil, fmt.Errorf( 95 "no plcbundle repository found in: %s\n\n"+ 96 "Initialize a new repository with:\n"+ 97 " plcbundle clone <url> # Clone from remote\n"+ 98 " plcbundle sync # Fetch from PLC directory", 99 config.BundleDir, 100 ) 101 } 102 103 // Ensure directory exists (only if repo exists OR AutoInit is enabled) 104 if err := os.MkdirAll(config.BundleDir, 0755); err != nil { 105 return nil, fmt.Errorf("failed to create bundle directory: %w", err) 106 } 107 108 // Initialize operations handler 109 ops, err := storage.NewOperations(config.Logger, config.Verbose) 110 if err != nil { 111 return nil, fmt.Errorf("failed to initialize operations: %w", err) 112 } 113 114 // Determine origin 115 var origin string 116 if plcClient != nil { 117 origin = plcClient.GetBaseURL() 118 } 119 120 // Load or create index 121 indexPath := filepath.Join(config.BundleDir, bundleindex.INDEX_FILE) 122 index, err := bundleindex.LoadIndex(indexPath) 123 124 // Check for bundle files in directory 125 bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst")) 126 bundleFiles = filterBundleFiles(bundleFiles) 127 hasBundleFiles := len(bundleFiles) > 0 128 129 // Check if clone/download is in progress (look for .tmp files) 130 tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp")) 131 cloneInProgress := len(tmpFiles) > 0 132 133 needsRebuild := false 134 135 if err != nil { 136 // Index doesn't exist or is invalid 137 if hasBundleFiles { 138 if cloneInProgress { 139 config.Logger.Printf("Clone/download in progress, skipping auto-rebuild") 140 } else { 141 config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles)) 142 needsRebuild = true 143 } 144 } else { 145 // No index and no bundles - create fresh index 146 config.Logger.Printf("Creating new index at %s", indexPath) 147 index = bundleindex.NewIndex(origin) 148 if err := index.Save(indexPath); err != nil { 149 return nil, fmt.Errorf("failed to save new index: %w", err) 150 } 151 } 152 } else { 153 // Index exists - auto-populate origin if missing 154 if index.Origin == "" { 155 if origin != "" { 156 config.Logger.Printf("⚠️ Upgrading old index: setting origin to %s", origin) 157 index.Origin = origin 158 if err := index.Save(indexPath); err != nil { 159 return nil, fmt.Errorf("failed to update index with origin: %w", err) 160 } 161 } else { 162 config.Logger.Printf("⚠️ Warning: index has no origin and no PLC client configured") 163 } 164 } 165 166 // Validate origin matches if both are set 167 if index.Origin != "" && origin != "" && index.Origin != origin { 168 return nil, fmt.Errorf( 169 "origin mismatch: index has origin %q but PLC client points to %q\n"+ 170 "Cannot mix bundles from different sources. Use a different directory or reconfigure PLC client", 171 index.Origin, origin, 172 ) 173 } 174 175 if config.Verbose { 176 config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin) 177 } 178 179 // Check if there are bundle files not in the index 180 if hasBundleFiles && len(bundleFiles) > index.Count() { 181 if cloneInProgress { 182 config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles)) 183 } else { 184 config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding", 185 len(bundleFiles), index.Count()) 186 needsRebuild = true 187 } 188 } 189 } 190 191 if index != nil && plcClient != nil { 192 currentOrigin := plcClient.GetBaseURL() 193 194 // Check if origins match 195 if index.Origin != "" && index.Origin != currentOrigin { 196 return nil, fmt.Errorf( 197 "origin mismatch: index has origin %q but PLC client points to %q. "+ 198 "Cannot mix bundles from different sources", 199 index.Origin, currentOrigin, 200 ) 201 } 202 203 // Set origin if not set (for backward compatibility with old indexes) 204 if index.Origin == "" && currentOrigin != "" { 205 index.Origin = currentOrigin 206 config.Logger.Printf("Setting origin for existing index: %s", currentOrigin) 207 if err := index.Save(indexPath); err != nil { 208 return nil, fmt.Errorf("failed to update index with origin: %w", err) 209 } 210 } 211 } 212 213 // Perform rebuild if needed (using parallel scan) 214 if needsRebuild && config.AutoRebuild { 215 config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles)) 216 217 // Create temporary manager for scanning 218 tempMgr := &Manager{ 219 config: config, 220 operations: ops, 221 index: bundleindex.NewIndex("test-origin"), 222 indexPath: indexPath, 223 logger: config.Logger, 224 } 225 226 // Use parallel scan with auto-detected CPU count 227 workers := config.RebuildWorkers 228 if workers <= 0 { 229 workers = runtime.NumCPU() 230 if workers < 1 { 231 workers = 1 232 } 233 } 234 235 config.Logger.Printf("Using %d workers for parallel scan", workers) 236 237 // Create progress callback wrapper with new signature 238 var progressCallback func(current, total int, bytesProcessed int64) 239 if config.RebuildProgress != nil { 240 // Wrap the old-style callback to work with new signature 241 oldCallback := config.RebuildProgress 242 progressCallback = func(current, total int, bytesProcessed int64) { 243 oldCallback(current, total) 244 } 245 } else { 246 // Default: log every 100 bundles 247 progressCallback = func(current, total int, bytesProcessed int64) { 248 if current%100 == 0 || current == total { 249 mbProcessed := float64(bytesProcessed) / (1024 * 1024) 250 config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed", 251 current, total, float64(current)/float64(total)*100, mbProcessed) 252 } 253 } 254 } 255 256 start := time.Now() 257 258 // Scan directory to rebuild index (parallel) 259 result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback) 260 if err != nil { 261 return nil, fmt.Errorf("failed to rebuild index: %w", err) 262 } 263 264 elapsed := time.Since(start) 265 266 // Reload the rebuilt index 267 index, err = bundleindex.LoadIndex(indexPath) 268 if err != nil { 269 return nil, fmt.Errorf("failed to load rebuilt index: %w", err) 270 } 271 272 // Calculate throughput 273 mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024) 274 275 config.Logger.Printf("✓ Index rebuilt with %d bundles in %s", 276 index.Count(), elapsed.Round(time.Millisecond)) 277 config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)", 278 float64(result.BundleCount)/elapsed.Seconds(), mbPerSec) 279 280 // Verify all chain hashes are present 281 bundles := index.GetBundles() 282 missingHashes := 0 283 for i, meta := range bundles { 284 if meta.ContentHash == "" { 285 missingHashes++ 286 } 287 if i > 0 && meta.Hash == "" { 288 missingHashes++ 289 } 290 } 291 if missingHashes > 0 { 292 config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes) 293 } 294 } 295 296 if index == nil { 297 index = bundleindex.NewIndex("test-origin") 298 } 299 300 // Initialize mempool for next bundle 301 lastBundle := index.GetLastBundle() 302 nextBundleNum := 1 303 var minTimestamp time.Time 304 305 if lastBundle != nil { 306 nextBundleNum = lastBundle.BundleNumber + 1 307 minTimestamp = lastBundle.EndTime 308 } 309 310 mempool, err := mempool.NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger, config.Verbose) 311 if err != nil { 312 return nil, fmt.Errorf("failed to initialize mempool: %w", err) 313 } 314 315 // Initialize DID index manager 316 didIndex := didindex.NewManager(config.BundleDir, config.Logger) 317 318 // Initialize sync components 319 fetcher := internalsync.NewFetcher(plcClient, ops, config.Logger) 320 cloner := internalsync.NewCloner(ops, config.BundleDir, config.Logger) 321 322 // Initialize handle resolver if configured 323 var handleResolver *handleresolver.Client 324 if config.HandleResolverURL != "" { 325 handleResolver = handleresolver.NewClient(config.HandleResolverURL) 326 } 327 328 m := &Manager{ 329 config: config, 330 operations: ops, 331 index: index, 332 indexPath: indexPath, 333 logger: config.Logger, 334 mempool: mempool, 335 didIndex: didIndex, // Updated type 336 bundleCache: make(map[int]*Bundle), 337 maxCacheSize: 10, 338 syncer: fetcher, 339 cloner: cloner, 340 plcClient: plcClient, 341 handleResolver: handleResolver, 342 } 343 // Initialize resolver stats 344 m.resolverStats.recentSize = 1000 345 m.resolverStats.recentTimes = make([]resolverTiming, 1000) 346 347 return m, nil 348} 349 350// Close cleans up resources 351func (m *Manager) Close() { 352 if m.operations != nil { 353 m.operations.Close() 354 } 355 if m.plcClient != nil { 356 m.plcClient.Close() 357 } 358 if m.mempool != nil { 359 if err := m.mempool.Save(); err != nil { 360 m.logger.Printf("Warning: failed to save mempool: %v", err) 361 } 362 } 363 if m.didIndex != nil { 364 m.didIndex.Close() 365 } 366} 367 368// GetIndex returns the current index 369func (m *Manager) GetIndex() *bundleindex.Index { 370 return m.index 371} 372 373// SaveIndex saves the index to disk 374func (m *Manager) SaveIndex() error { 375 return m.index.Save(m.indexPath) 376} 377 378// LoadBundle with caching 379func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) { 380 // Check cache first 381 m.cacheMu.RLock() 382 if cached, ok := m.bundleCache[bundleNumber]; ok { 383 m.cacheMu.RUnlock() 384 return cached, nil 385 } 386 m.cacheMu.RUnlock() 387 388 // Load from disk (existing code) 389 bundle, err := m.loadBundleFromDisk(ctx, bundleNumber) 390 if err != nil { 391 return nil, err 392 } 393 394 // Add to cache 395 m.cacheMu.Lock() 396 m.bundleCache[bundleNumber] = bundle 397 398 // Simple LRU: if cache too big, remove oldest 399 if len(m.bundleCache) > m.maxCacheSize { 400 // Remove a random one (or implement proper LRU) 401 for k := range m.bundleCache { 402 delete(m.bundleCache, k) 403 break 404 } 405 } 406 m.cacheMu.Unlock() 407 408 return bundle, nil 409} 410 411// loadBundleFromDisk loads a bundle from disk 412func (m *Manager) loadBundleFromDisk(_ context.Context, bundleNumber int) (*Bundle, error) { 413 // Get metadata from index 414 meta, err := m.index.GetBundle(bundleNumber) 415 if err != nil { 416 return nil, fmt.Errorf("bundle not in index: %w", err) 417 } 418 419 // Load file 420 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 421 if !m.operations.FileExists(path) { 422 return nil, fmt.Errorf("bundle file not found: %s", path) 423 } 424 425 // Verify hash if enabled 426 if m.config.VerifyOnLoad { 427 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash) 428 if err != nil { 429 return nil, fmt.Errorf("failed to verify hash: %w", err) 430 } 431 if !valid { 432 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash) 433 } 434 } 435 436 // Load operations 437 operations, err := m.operations.LoadBundle(path) 438 if err != nil { 439 return nil, fmt.Errorf("failed to load bundle: %w", err) 440 } 441 442 // Create bundle struct 443 bundle := &Bundle{ 444 BundleNumber: meta.BundleNumber, 445 StartTime: meta.StartTime, 446 EndTime: meta.EndTime, 447 Operations: operations, 448 DIDCount: meta.DIDCount, 449 Hash: meta.Hash, 450 ContentHash: meta.ContentHash, 451 Parent: meta.Parent, 452 CompressedHash: meta.CompressedHash, 453 CompressedSize: meta.CompressedSize, 454 UncompressedSize: meta.UncompressedSize, 455 Cursor: meta.Cursor, 456 Compressed: true, 457 CreatedAt: meta.CreatedAt, 458 } 459 460 return bundle, nil 461} 462 463// SaveBundle saves a bundle to disk and updates the index 464func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, verbose bool, quiet bool, stats types.BundleProductionStats, skipDIDIndex bool) (time.Duration, error) { 465 466 totalStart := time.Now() 467 if err := bundle.ValidateForSave(); err != nil { 468 return 0, fmt.Errorf("bundle validation failed: %w", err) 469 } 470 471 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber)) 472 473 // Get parent 474 var parent string 475 if bundle.BundleNumber > 1 { 476 prevBundle := m.index.GetLastBundle() 477 if prevBundle != nil { 478 parent = prevBundle.Hash 479 } else { 480 if prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil { 481 parent = prevMeta.Hash 482 } 483 } 484 } 485 bundle.Parent = parent 486 487 // Get origin 488 origin := m.index.Origin 489 if m.plcClient != nil { 490 origin = m.plcClient.GetBaseURL() 491 } 492 493 // Get version 494 version := "dev" 495 if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "" && info.Main.Version != "(devel)" { 496 version = info.Main.Version 497 } 498 499 // Get hostname 500 hostname, _ := os.Hostname() 501 502 // Create BundleInfo 503 bundleInfo := &storage.BundleInfo{ 504 BundleNumber: bundle.BundleNumber, 505 Origin: origin, 506 ParentHash: parent, 507 Cursor: bundle.Cursor, 508 CreatedBy: fmt.Sprintf("plcbundle/%s", version), 509 Hostname: hostname, 510 } 511 512 if m.config.Verbose { 513 m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber) 514 } 515 516 // Save to disk with 3 parameters 517 uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations, bundleInfo) 518 if err != nil { 519 m.logger.Printf("DEBUG: SaveBundle FAILED: %v", err) 520 return 0, fmt.Errorf("failed to save bundle: %w", err) 521 } 522 523 if m.config.Verbose { 524 m.logger.Printf("DEBUG: SaveBundle SUCCESS, setting bundle fields") 525 } 526 527 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 528 bundle.ContentHash = uncompressedHash 529 bundle.CompressedHash = compressedHash 530 bundle.UncompressedSize = uncompressedSize 531 bundle.CompressedSize = compressedSize 532 bundle.CreatedAt = time.Now().UTC() 533 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 534 535 if m.config.Verbose { 536 m.logger.Printf("DEBUG: Adding bundle %d to index", bundle.BundleNumber) 537 } 538 539 // Add to index 540 m.index.AddBundle(bundle.ToMetadata()) 541 542 if m.config.Verbose { 543 m.logger.Printf("DEBUG: Index now has %d bundles", m.index.Count()) 544 } 545 546 // Save index 547 if err := m.SaveIndex(); err != nil { 548 m.logger.Printf("DEBUG: SaveIndex FAILED: %v", err) 549 return 0, fmt.Errorf("failed to save index: %w", err) 550 } 551 552 if m.config.Verbose { 553 m.logger.Printf("DEBUG: Index saved, last bundle = %d", m.index.GetLastBundle().BundleNumber) 554 } 555 556 saveDuration := time.Since(totalStart) 557 558 // Clean up old mempool 559 oldMempoolFile := m.mempool.GetFilename() 560 if err := m.mempool.Delete(); err != nil && !quiet { 561 m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err) 562 } 563 564 // Create new mempool 565 nextBundle := bundle.BundleNumber + 1 566 minTimestamp := bundle.EndTime 567 568 newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger, m.config.Verbose) 569 if err != nil { 570 return 0, fmt.Errorf("failed to create new mempool: %w", err) 571 } 572 573 m.mempool = newMempool 574 575 // DID index update (if enabled) 576 var indexUpdateDuration time.Duration 577 if !skipDIDIndex && m.didIndex != nil && m.didIndex.Exists() { 578 indexUpdateStart := time.Now() 579 if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil { 580 m.logger.Printf("Warning: failed to update DID index: %v", err) 581 } else { 582 indexUpdateDuration = time.Since(indexUpdateStart) 583 if !quiet && m.config.Verbose { 584 m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration) 585 } 586 } 587 } 588 589 if !quiet { 590 msg := fmt.Sprintf("→ Bundle %06d | %s | fetch: %s (%d reqs)", 591 bundle.BundleNumber, 592 bundle.Hash[0:7], 593 stats.TotalDuration.Round(time.Millisecond), 594 stats.TotalFetches, 595 ) 596 if indexUpdateDuration > 0 { 597 msg += fmt.Sprintf(" | index: %s", indexUpdateDuration.Round(time.Millisecond)) 598 } 599 msg += fmt.Sprintf(" | %s", formatTimeDistance(time.Since(bundle.EndTime))) 600 m.logger.Println(msg) 601 } 602 603 if m.config.Verbose { 604 m.logger.Printf("DEBUG: Bundle done = %d, finish duration = %s", 605 m.index.GetLastBundle().BundleNumber, 606 saveDuration.Round(time.Millisecond)) 607 } 608 609 return indexUpdateDuration, nil 610} 611 612// GetMempoolStats returns mempool statistics 613func (m *Manager) GetMempoolStats() map[string]interface{} { 614 return m.mempool.Stats() 615} 616 617// GetMempoolOperations returns all operations currently in mempool 618func (m *Manager) GetMempoolOperations() ([]plcclient.PLCOperation, error) { 619 if m.mempool == nil { 620 return nil, fmt.Errorf("mempool not initialized") 621 } 622 623 count := m.mempool.Count() 624 if count == 0 { 625 return []plcclient.PLCOperation{}, nil 626 } 627 628 return m.mempool.Peek(count), nil 629} 630 631// Add to Bundle type to implement BundleData interface 632func (b *Bundle) GetBundleNumber() int { 633 return b.BundleNumber 634} 635 636// VerifyBundle verifies a bundle's integrity 637func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) { 638 result := &VerificationResult{ 639 BundleNumber: bundleNumber, 640 } 641 642 // Get from index 643 meta, err := m.index.GetBundle(bundleNumber) 644 if err != nil { 645 result.Error = err 646 return result, nil 647 } 648 649 result.ExpectedHash = meta.CompressedHash 650 651 // Check file exists 652 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 653 result.FileExists = m.operations.FileExists(path) 654 if !result.FileExists { 655 result.Error = fmt.Errorf("file not found") 656 return result, nil 657 } 658 659 // Verify BOTH compressed and content hashes 660 compHash, compSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path) 661 if err != nil { 662 result.Error = err 663 return result, nil 664 } 665 666 result.LocalHash = compHash 667 668 // Verify compressed hash 669 if compHash != meta.CompressedHash { 670 result.HashMatch = false 671 result.Valid = false 672 result.Error = fmt.Errorf("compressed hash mismatch: expected %s, got %s", meta.CompressedHash, compHash) 673 return result, nil 674 } 675 676 // Verify content hash 677 if contentHash != meta.ContentHash { 678 result.HashMatch = false 679 result.Valid = false 680 result.Error = fmt.Errorf("content hash mismatch: expected %s, got %s", meta.ContentHash, contentHash) 681 return result, nil 682 } 683 684 // Verify sizes match 685 if compSize != meta.CompressedSize { 686 result.Valid = false 687 result.Error = fmt.Errorf("compressed size mismatch: expected %d, got %d", meta.CompressedSize, compSize) 688 return result, nil 689 } 690 691 if contentSize != meta.UncompressedSize { 692 result.Valid = false 693 result.Error = fmt.Errorf("uncompressed size mismatch: expected %d, got %d", meta.UncompressedSize, contentSize) 694 return result, nil 695 } 696 697 result.HashMatch = true 698 result.Valid = true 699 700 return result, nil 701} 702 703// GetInfo returns information about the bundle manager 704func (m *Manager) GetInfo() map[string]interface{} { 705 stats := m.index.GetStats() 706 stats["bundle_dir"] = m.config.BundleDir 707 stats["index_path"] = m.indexPath 708 stats["verify_on_load"] = m.config.VerifyOnLoad 709 return stats 710} 711 712// ExportOperations exports operations from bundles 713func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plcclient.PLCOperation, error) { 714 if count <= 0 { 715 count = 1000 716 } 717 718 var result []plcclient.PLCOperation 719 seenCIDs := make(map[string]bool) 720 721 bundles := m.index.GetBundles() 722 723 for _, meta := range bundles { 724 if result != nil && len(result) >= count { 725 break 726 } 727 728 // Skip bundles before afterTime 729 if !afterTime.IsZero() && meta.EndTime.Before(afterTime) { 730 continue 731 } 732 733 // Load bundle 734 bundle, err := m.LoadBundle(ctx, meta.BundleNumber) 735 if err != nil { 736 m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err) 737 continue 738 } 739 740 // Add operations 741 for _, op := range bundle.Operations { 742 if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) { 743 continue 744 } 745 746 if seenCIDs[op.CID] { 747 continue 748 } 749 750 seenCIDs[op.CID] = true 751 result = append(result, op) 752 753 if len(result) >= count { 754 break 755 } 756 } 757 } 758 759 return result, nil 760} 761 762// IsBundleIndexed checks if a bundle is already in the index 763func (m *Manager) IsBundleIndexed(bundleNumber int) bool { 764 _, err := m.index.GetBundle(bundleNumber) 765 return err == nil 766} 767 768// RefreshMempool reloads mempool from disk 769func (m *Manager) RefreshMempool() error { 770 if m.mempool == nil { 771 return fmt.Errorf("mempool not initialized") 772 } 773 return m.mempool.Load() 774} 775 776// ClearMempool clears all operations from the mempool and saves 777func (m *Manager) ClearMempool() error { 778 if m.mempool == nil { 779 return fmt.Errorf("mempool not initialized") 780 } 781 782 m.logger.Printf("Clearing mempool...") 783 784 count := m.mempool.Count() 785 786 m.mempool.Clear() 787 788 if err := m.mempool.Save(); err != nil { 789 return fmt.Errorf("failed to save mempool: %w", err) 790 } 791 792 m.logger.Printf("Cleared %d operations from mempool", count) 793 794 return nil 795} 796 797// ValidateMempool validates mempool 798func (m *Manager) ValidateMempool() error { 799 if m.mempool == nil { 800 return fmt.Errorf("mempool not initialized") 801 } 802 return m.mempool.Validate() 803} 804 805// StreamBundleRaw streams the raw compressed bundle file 806func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) { 807 // Get metadata from index 808 meta, err := m.index.GetBundle(bundleNumber) 809 if err != nil { 810 return nil, fmt.Errorf("bundle not in index: %w", err) 811 } 812 813 // Build file path 814 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 815 if !m.operations.FileExists(path) { 816 return nil, fmt.Errorf("bundle file not found: %s", path) 817 } 818 819 // Optionally verify hash before streaming 820 if m.config.VerifyOnLoad { 821 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash) 822 if err != nil { 823 return nil, fmt.Errorf("failed to verify hash: %w", err) 824 } 825 if !valid { 826 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash) 827 } 828 } 829 830 return m.operations.StreamRaw(path) 831} 832 833// StreamBundleDecompressed streams the decompressed bundle data as JSONL 834func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) { 835 // Get metadata from index 836 _, err := m.index.GetBundle(bundleNumber) 837 if err != nil { 838 return nil, fmt.Errorf("bundle not in index: %w", err) 839 } 840 841 // Build file path 842 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 843 if !m.operations.FileExists(path) { 844 return nil, fmt.Errorf("bundle file not found: %s", path) 845 } 846 847 return m.operations.StreamDecompressed(path) 848} 849 850// RefreshIndex reloads the index from disk if it has been modified 851func (m *Manager) RefreshIndex() error { 852 // Check if index file has been modified 853 info, err := os.Stat(m.indexPath) 854 if err != nil { 855 return err 856 } 857 858 // If index was modified after we loaded it, reload 859 if info.ModTime().After(m.index.UpdatedAt) { 860 m.logger.Printf("Index file modified, reloading...") 861 862 newIndex, err := bundleindex.LoadIndex(m.indexPath) 863 if err != nil { 864 return fmt.Errorf("failed to reload index: %w", err) 865 } 866 867 m.index = newIndex 868 m.logger.Printf("Index reloaded: %d bundles", m.index.Count()) 869 } 870 871 return nil 872} 873 874// GetMempool returns the current mempool 875func (m *Manager) GetMempool() *mempool.Mempool { 876 return m.mempool 877} 878 879// SaveMempool saves the current mempool state to disk 880func (m *Manager) SaveMempool() error { 881 if m.mempool == nil { 882 return fmt.Errorf("mempool not initialized") 883 } 884 return m.mempool.Save() 885} 886 887// GetPLCOrigin returns the PLC directory origin URL 888func (m *Manager) GetPLCOrigin() string { 889 if m.plcClient == nil { 890 return "" 891 } 892 return m.plcClient.GetBaseURL() 893} 894 895// GetCurrentCursor returns the current latest cursor position (including mempool) 896func (m *Manager) GetCurrentCursor() int { 897 index := m.GetIndex() 898 bundles := index.GetBundles() 899 cursor := len(bundles) * types.BUNDLE_SIZE 900 901 // Add mempool operations 902 mempoolStats := m.GetMempoolStats() 903 if count, ok := mempoolStats["count"].(int); ok { 904 cursor += count 905 } 906 907 return cursor 908} 909 910// LoadOperation loads a single operation from a bundle efficiently 911func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plcclient.PLCOperation, error) { 912 // Validate position 913 if position < 0 || position >= types.BUNDLE_SIZE { 914 return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, types.BUNDLE_SIZE-1) 915 } 916 917 // Validate bundle exists in index 918 _, err := m.index.GetBundle(bundleNumber) 919 if err != nil { 920 return nil, fmt.Errorf("bundle not in index: %w", err) 921 } 922 923 // Build file path 924 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 925 if !m.operations.FileExists(path) { 926 return nil, fmt.Errorf("bundle file not found: %s", path) 927 } 928 929 // Load just the one operation (optimized - decompresses only until position) 930 return m.operations.LoadOperationAtPosition(path, position) 931} 932 933// LoadOperations loads multiple operations from a bundle efficiently 934func (m *Manager) LoadOperations(ctx context.Context, bundleNumber int, positions []int) (map[int]*plcclient.PLCOperation, error) { 935 // Build file path 936 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 937 if !m.operations.FileExists(path) { 938 return nil, fmt.Errorf("bundle file not found: %s", path) 939 } 940 941 return m.operations.LoadOperationsAtPositions(path, positions) 942} 943 944// filterBundleFiles filters out files starting with . or _ 945func filterBundleFiles(files []string) []string { 946 filtered := make([]string, 0, len(files)) 947 for _, file := range files { 948 basename := filepath.Base(file) 949 if len(basename) > 0 && (basename[0] == '.' || basename[0] == '_') { 950 continue 951 } 952 filtered = append(filtered, file) 953 } 954 return filtered 955} 956 957// ========================================== 958// DID INDEX INTEGRATION (adapter methods) 959// ========================================== 960 961// Implement BundleProvider interface for didindex 962func (m *Manager) LoadBundleForDIDIndex(ctx context.Context, bundleNumber int) (*didindex.BundleData, error) { 963 bundle, err := m.LoadBundle(ctx, bundleNumber) 964 if err != nil { 965 return nil, err 966 } 967 968 return &didindex.BundleData{ 969 BundleNumber: bundle.BundleNumber, 970 Operations: bundle.Operations, 971 }, nil 972} 973 974func (m *Manager) GetBundleIndex() didindex.BundleIndexProvider { 975 return &bundleIndexAdapter{index: m.index} 976} 977 978// bundleIndexAdapter adapts Index to BundleIndexProvider interface 979type bundleIndexAdapter struct { 980 index *bundleindex.Index 981} 982 983func (a *bundleIndexAdapter) GetBundles() []*didindex.BundleMetadata { 984 bundles := a.index.GetBundles() 985 result := make([]*didindex.BundleMetadata, len(bundles)) 986 for i, b := range bundles { 987 result[i] = &didindex.BundleMetadata{ 988 BundleNumber: b.BundleNumber, 989 StartTime: b.StartTime, 990 EndTime: b.EndTime, 991 } 992 } 993 return result 994} 995 996func (a *bundleIndexAdapter) GetBundle(bundleNumber int) (*didindex.BundleMetadata, error) { 997 meta, err := a.index.GetBundle(bundleNumber) 998 if err != nil { 999 return nil, err 1000 } 1001 return &didindex.BundleMetadata{ 1002 BundleNumber: meta.BundleNumber, 1003 StartTime: meta.StartTime, 1004 EndTime: meta.EndTime, 1005 }, nil 1006} 1007 1008func (a *bundleIndexAdapter) GetLastBundle() *didindex.BundleMetadata { 1009 meta := a.index.GetLastBundle() 1010 if meta == nil { 1011 return nil 1012 } 1013 return &didindex.BundleMetadata{ 1014 BundleNumber: meta.BundleNumber, 1015 StartTime: meta.StartTime, 1016 EndTime: meta.EndTime, 1017 } 1018} 1019 1020// GetDIDIndex returns the DID index manager 1021func (m *Manager) GetDIDIndex() *didindex.Manager { 1022 return m.didIndex 1023} 1024 1025// BuildDIDIndex builds the complete DID index 1026func (m *Manager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error { 1027 if m.didIndex == nil { 1028 m.didIndex = didindex.NewManager(m.config.BundleDir, m.logger) 1029 } 1030 1031 return m.didIndex.BuildIndexFromScratch(ctx, m, progressCallback) 1032} 1033 1034// updateDIDIndexForBundle updates index when a new bundle is added 1035func (m *Manager) updateDIDIndexForBundle(ctx context.Context, bundle *Bundle) error { 1036 if m.didIndex == nil { 1037 return nil 1038 } 1039 1040 // Convert to didindex.BundleData 1041 bundleData := &didindex.BundleData{ 1042 BundleNumber: bundle.BundleNumber, 1043 Operations: bundle.Operations, 1044 } 1045 1046 return m.didIndex.UpdateIndexForBundle(ctx, bundleData) 1047} 1048 1049// GetDIDIndexStats returns DID index statistics 1050func (m *Manager) GetDIDIndexStats() map[string]interface{} { 1051 if m.didIndex == nil { 1052 return map[string]interface{}{ 1053 "enabled": false, 1054 } 1055 } 1056 1057 stats := m.didIndex.GetStats() 1058 stats["enabled"] = true 1059 stats["exists"] = m.didIndex.Exists() 1060 1061 indexedDIDs := stats["total_dids"].(int64) 1062 1063 // Get unique DIDs from mempool 1064 mempoolDIDCount := int64(0) 1065 if m.mempool != nil { 1066 mempoolStats := m.GetMempoolStats() 1067 if didCount, ok := mempoolStats["did_count"].(int); ok { 1068 mempoolDIDCount = int64(didCount) 1069 } 1070 } 1071 1072 stats["indexed_dids"] = indexedDIDs 1073 stats["mempool_dids"] = mempoolDIDCount 1074 stats["total_dids"] = indexedDIDs + mempoolDIDCount 1075 1076 return stats 1077} 1078 1079// GetDIDOperations retrieves all operations for a DID (bundles + mempool combined) 1080// Returns: operations only, operations with locations, error 1081func (m *Manager) GetDIDOperations(ctx context.Context, did string, verbose bool) ([]plcclient.PLCOperation, []PLCOperationWithLocation, error) { 1082 if err := plcclient.ValidateDIDFormat(did); err != nil { 1083 return nil, nil, err 1084 } 1085 1086 // Set verbose mode 1087 if m.didIndex != nil { 1088 m.didIndex.SetVerbose(verbose) 1089 } 1090 1091 // Get bundled operations from DID index (includes nullified) 1092 bundledOpsWithLoc, err := m.didIndex.GetDIDOperations(ctx, did, m) 1093 if err != nil { 1094 return nil, nil, err 1095 } 1096 1097 // Convert to bundle types 1098 opsWithLoc := make([]PLCOperationWithLocation, len(bundledOpsWithLoc)) 1099 bundledOps := make([]plcclient.PLCOperation, len(bundledOpsWithLoc)) 1100 for i, r := range bundledOpsWithLoc { 1101 opsWithLoc[i] = PLCOperationWithLocation{ 1102 Operation: r.Operation, 1103 Bundle: r.Bundle, 1104 Position: r.Position, 1105 } 1106 bundledOps[i] = r.Operation 1107 } 1108 1109 // Get mempool operations 1110 mempoolOps, err := m.GetDIDOperationsFromMempool(did) 1111 if err != nil { 1112 return nil, nil, err 1113 } 1114 1115 if len(mempoolOps) > 0 && verbose { 1116 m.logger.Printf("DEBUG: Found %d operations in mempool", len(mempoolOps)) 1117 } 1118 1119 // Combine operations (for the slice return) 1120 allOps := append(bundledOps, mempoolOps...) 1121 1122 sort.Slice(allOps, func(i, j int) bool { 1123 return allOps[i].CreatedAt.Before(allOps[j].CreatedAt) 1124 }) 1125 1126 return allOps, opsWithLoc, nil 1127} 1128 1129// GetDIDOperationsFromMempool retrieves operations for a DID from mempool only 1130func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plcclient.PLCOperation, error) { 1131 if m.mempool == nil { 1132 return []plcclient.PLCOperation{}, nil 1133 } 1134 1135 // Use direct search - only copies matching operations 1136 return m.mempool.FindDIDOperations(did), nil 1137} 1138 1139// GetLatestDIDOperation returns only the most recent non-nullified operation 1140func (m *Manager) GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error) { 1141 if err := plcclient.ValidateDIDFormat(did); err != nil { 1142 return nil, err 1143 } 1144 1145 // Check mempool first (most recent data) 1146 mempoolOps, _ := m.GetDIDOperationsFromMempool(did) 1147 if len(mempoolOps) > 0 { 1148 for i := len(mempoolOps) - 1; i >= 0; i-- { 1149 if !mempoolOps[i].IsNullified() { 1150 return &mempoolOps[i], nil 1151 } 1152 } 1153 } 1154 1155 // Delegate to DID index for bundled operations 1156 return m.didIndex.GetLatestDIDOperation(ctx, did, m) 1157} 1158 1159// VerifyChain verifies the entire bundle chain 1160func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) { 1161 result := &ChainVerificationResult{ 1162 VerifiedBundles: make([]int, 0), 1163 } 1164 1165 bundles := m.index.GetBundles() 1166 if len(bundles) == 0 { 1167 result.Valid = true 1168 return result, nil 1169 } 1170 1171 result.ChainLength = len(bundles) 1172 1173 for i, meta := range bundles { 1174 // Verify file hash 1175 vr, err := m.VerifyBundle(ctx, meta.BundleNumber) 1176 if err != nil || !vr.Valid { 1177 result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber) 1178 result.BrokenAt = meta.BundleNumber 1179 return result, nil 1180 } 1181 1182 // Verify chain link 1183 if i > 0 { 1184 prevMeta := bundles[i-1] 1185 1186 // Check parent reference 1187 if meta.Parent != prevMeta.Hash { 1188 result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber) 1189 result.BrokenAt = meta.BundleNumber 1190 return result, nil 1191 } 1192 1193 // Verify chain hash calculation 1194 expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash) 1195 if meta.Hash != expectedHash { 1196 result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber) 1197 result.BrokenAt = meta.BundleNumber 1198 return result, nil 1199 } 1200 } 1201 1202 result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber) 1203 } 1204 1205 result.Valid = true 1206 return result, nil 1207} 1208 1209// FetchNextBundle fetches operations and creates a bundle, looping until caught up 1210func (m *Manager) FetchNextBundle(ctx context.Context, verbose bool, quiet bool) (*Bundle, types.BundleProductionStats, error) { 1211 if m.plcClient == nil { 1212 return nil, types.BundleProductionStats{}, fmt.Errorf("PLC client not configured") 1213 } 1214 1215 lastBundle := m.index.GetLastBundle() 1216 nextBundleNum := 1 1217 var afterTime string 1218 var prevBoundaryCIDs map[string]bool 1219 var prevBundleHash string 1220 1221 if lastBundle != nil { 1222 nextBundleNum = lastBundle.BundleNumber + 1 1223 afterTime = lastBundle.EndTime.Format(time.RFC3339Nano) 1224 prevBundleHash = lastBundle.Hash 1225 1226 // ALWAYS get boundaries from last bundle initially 1227 prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber) 1228 if err == nil { 1229 _, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations) 1230 if verbose { 1231 m.logger.Printf("Loaded %d boundary CIDs from bundle %06d (at %s)", 1232 len(prevBoundaryCIDs), lastBundle.BundleNumber, 1233 lastBundle.EndTime.Format(time.RFC3339)[:19]) 1234 } 1235 } 1236 } 1237 1238 // If mempool has operations, update cursor but KEEP boundaries from bundle 1239 // (mempool operations already had boundary dedup applied when they were added) 1240 if m.mempool.Count() > 0 { 1241 mempoolLastTime := m.mempool.GetLastTime() 1242 if mempoolLastTime != "" { 1243 if verbose { 1244 m.logger.Printf("[DEBUG] Mempool has %d ops, resuming from %s", 1245 m.mempool.Count(), mempoolLastTime[:19]) 1246 } 1247 afterTime = mempoolLastTime 1248 1249 // Calculate boundaries from MEMPOOL for next fetch 1250 mempoolOps, _ := m.GetMempoolOperations() 1251 if len(mempoolOps) > 0 { 1252 _, mempoolBoundaries := m.operations.GetBoundaryCIDs(mempoolOps) 1253 prevBoundaryCIDs = mempoolBoundaries 1254 if verbose { 1255 m.logger.Printf("Using %d boundary CIDs from mempool", len(prevBoundaryCIDs)) 1256 } 1257 } 1258 } 1259 } 1260 1261 if verbose { 1262 m.logger.Printf("[DEBUG] Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count()) 1263 m.logger.Printf("[DEBUG] Starting cursor: %s", afterTime) 1264 } 1265 1266 totalFetches := 0 1267 maxAttempts := 50 1268 attempt := 0 1269 caughtUp := false 1270 attemptStart := time.Now() 1271 1272 for m.mempool.Count() < types.BUNDLE_SIZE && attempt < maxAttempts { 1273 attempt++ 1274 needed := types.BUNDLE_SIZE - m.mempool.Count() 1275 1276 if !quiet && attempt > 1 { 1277 m.logger.Printf(" Attempt %d: Need %d more ops, cursor: %s", 1278 attempt, needed, afterTime[:19]) 1279 } 1280 1281 newOps, fetchCount, err := m.syncer.FetchToMempool( 1282 ctx, 1283 afterTime, 1284 prevBoundaryCIDs, 1285 needed, 1286 !verbose, 1287 m.mempool, 1288 totalFetches, 1289 ) 1290 1291 totalFetches += fetchCount 1292 1293 // Check if we got an incomplete batch 1294 gotIncompleteBatch := len(newOps) > 0 && len(newOps) < needed && err == nil 1295 1296 // Update cursor from mempool if we got new ops 1297 if len(newOps) > 0 && m.mempool.Count() > 0 { 1298 afterTime = m.mempool.GetLastTime() 1299 } 1300 1301 // Stop if caught up or error 1302 if err != nil || len(newOps) == 0 || gotIncompleteBatch { 1303 caughtUp = true 1304 if verbose && totalFetches > 0 { 1305 m.logger.Printf("DEBUG: Caught up to latest PLC data") 1306 } 1307 break 1308 } 1309 1310 if m.mempool.Count() >= types.BUNDLE_SIZE { 1311 break 1312 } 1313 } 1314 1315 totalDuration := time.Since(attemptStart) 1316 1317 if m.mempool.Count() < types.BUNDLE_SIZE { 1318 if caughtUp { 1319 return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)", 1320 m.mempool.Count(), types.BUNDLE_SIZE) 1321 } else { 1322 return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (max attempts reached)", 1323 m.mempool.Count(), types.BUNDLE_SIZE) 1324 } 1325 } 1326 1327 // Create bundle 1328 operations, err := m.mempool.Take(types.BUNDLE_SIZE) 1329 if err != nil { 1330 return nil, types.BundleProductionStats{}, err 1331 } 1332 1333 syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations) 1334 1335 bundle := &Bundle{ 1336 BundleNumber: syncBundle.BundleNumber, 1337 StartTime: syncBundle.StartTime, 1338 EndTime: syncBundle.EndTime, 1339 Operations: syncBundle.Operations, 1340 DIDCount: syncBundle.DIDCount, 1341 Cursor: syncBundle.Cursor, 1342 Parent: syncBundle.Parent, 1343 BoundaryCIDs: syncBundle.BoundaryCIDs, 1344 Compressed: syncBundle.Compressed, 1345 CreatedAt: syncBundle.CreatedAt, 1346 } 1347 1348 stats := types.BundleProductionStats{ 1349 TotalFetches: totalFetches, 1350 TotalDuration: totalDuration, 1351 AvgPerFetch: float64(types.BUNDLE_SIZE) / float64(totalFetches), 1352 Throughput: float64(types.BUNDLE_SIZE) / totalDuration.Seconds(), 1353 } 1354 1355 return bundle, stats, nil 1356} 1357 1358// CloneFromRemote clones bundles from a remote endpoint 1359func (m *Manager) CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error) { 1360 // Define index update callback inline 1361 updateIndexCallback := func(bundleNumbers []int, remoteMeta map[int]*bundleindex.BundleMetadata, verbose bool) error { 1362 if len(bundleNumbers) == 0 { 1363 return nil 1364 } 1365 1366 // Create file existence checker 1367 fileExists := func(bundleNum int) bool { 1368 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 1369 return m.operations.FileExists(path) 1370 } 1371 1372 // Update index with remote metadata 1373 if err := m.index.UpdateFromRemote(bundleNumbers, remoteMeta, fileExists, verbose, m.logger); err != nil { 1374 return err 1375 } 1376 1377 // Save index 1378 return m.SaveIndex() 1379 } 1380 1381 // Delegate to cloner with inline callback 1382 return m.cloner.Clone(ctx, opts, m.index, updateIndexCallback) 1383} 1384 1385// ResolveDID resolves a DID to its current document with detailed timing metrics 1386func (m *Manager) ResolveDID(ctx context.Context, did string) (*ResolveDIDResult, error) { 1387 if err := plcclient.ValidateDIDFormat(did); err != nil { 1388 atomic.AddInt64(&m.resolverStats.errors, 1) 1389 return nil, err 1390 } 1391 1392 result := &ResolveDIDResult{} 1393 totalStart := time.Now() 1394 1395 // STEP 1: Check mempool first 1396 mempoolStart := time.Now() 1397 var latestMempoolOp *plcclient.PLCOperation 1398 if m.mempool != nil { 1399 latestMempoolOp = m.mempool.FindLatestDIDOperation(did) 1400 } 1401 result.MempoolTime = time.Since(mempoolStart) 1402 1403 // Early return if found in mempool 1404 if latestMempoolOp != nil { 1405 doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*latestMempoolOp}) 1406 if err != nil { 1407 atomic.AddInt64(&m.resolverStats.errors, 1) 1408 return nil, fmt.Errorf("resolution failed: %w", err) 1409 } 1410 1411 result.Document = doc 1412 result.LatestOperation = latestMempoolOp 1413 result.Source = "mempool" 1414 result.TotalTime = time.Since(totalStart) 1415 1416 m.recordResolverTiming(result, nil) 1417 return result, nil 1418 } 1419 1420 // STEP 2: Index lookup 1421 if m.didIndex == nil || !m.didIndex.Exists() { 1422 atomic.AddInt64(&m.resolverStats.errors, 1) 1423 return nil, fmt.Errorf("DID index not available - run 'plcbundle index build' to enable DID resolution") 1424 } 1425 1426 indexStart := time.Now() 1427 locations, err := m.didIndex.GetDIDLocations(did) 1428 result.IndexTime = time.Since(indexStart) 1429 1430 if err != nil { 1431 atomic.AddInt64(&m.resolverStats.errors, 1) 1432 return nil, err 1433 } 1434 1435 if len(locations) == 0 { 1436 atomic.AddInt64(&m.resolverStats.errors, 1) 1437 return nil, fmt.Errorf("DID not found") 1438 } 1439 1440 // Find latest non-nullified location 1441 var latestLoc *didindex.OpLocation 1442 for i := range locations { 1443 if locations[i].Nullified() { 1444 continue 1445 } 1446 if latestLoc == nil || locations[i].IsAfter(*latestLoc) { 1447 latestLoc = &locations[i] 1448 } 1449 } 1450 1451 if latestLoc == nil { 1452 atomic.AddInt64(&m.resolverStats.errors, 1) 1453 return nil, fmt.Errorf("no valid operations (all nullified)") 1454 } 1455 1456 // STEP 3: Load operation 1457 opStart := time.Now() 1458 op, err := m.LoadOperation(ctx, latestLoc.BundleInt(), latestLoc.PositionInt()) 1459 result.LoadOpTime = time.Since(opStart) 1460 1461 if err != nil { 1462 atomic.AddInt64(&m.resolverStats.errors, 1) 1463 return nil, fmt.Errorf("failed to load operation: %w", err) 1464 } 1465 1466 result.BundleNumber = latestLoc.BundleInt() 1467 result.Position = latestLoc.PositionInt() 1468 1469 // STEP 4: Resolve document 1470 doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op}) 1471 if err != nil { 1472 atomic.AddInt64(&m.resolverStats.errors, 1) 1473 return nil, fmt.Errorf("resolution failed: %w", err) 1474 } 1475 1476 result.Document = doc 1477 result.LatestOperation = op 1478 result.Source = "bundle" 1479 result.TotalTime = time.Since(totalStart) 1480 1481 m.recordResolverTiming(result, nil) 1482 return result, nil 1483} 1484 1485// GetLastBundleNumber returns the last bundle number (0 if no bundles) 1486func (m *Manager) GetLastBundleNumber() int { 1487 lastBundle := m.index.GetLastBundle() 1488 if lastBundle == nil { 1489 return 0 1490 } 1491 return lastBundle.BundleNumber 1492} 1493 1494// GetMempoolCount returns the number of operations in mempool 1495func (m *Manager) GetMempoolCount() int { 1496 return m.mempool.Count() 1497} 1498 1499func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) { 1500 bundle, stats, err := m.FetchNextBundle(ctx, verbose, quiet) 1501 if err != nil { 1502 return 0, nil, err 1503 } 1504 1505 indexTime, err := m.SaveBundle(ctx, bundle, verbose, quiet, stats, skipDIDIndex) 1506 if err != nil { 1507 return 0, nil, err 1508 } 1509 stats.IndexTime = indexTime 1510 1511 return bundle.BundleNumber, &types.BundleProductionStats{}, nil 1512} 1513 1514// RunSyncLoop runs continuous sync loop (delegates to internal/sync) 1515func (m *Manager) RunSyncLoop(ctx context.Context, config *internalsync.SyncLoopConfig) error { 1516 // Manager itself implements the SyncManager interface 1517 return internalsync.RunSyncLoop(ctx, m, config) 1518} 1519 1520// RunSyncOnce performs a single sync cycle 1521func (m *Manager) RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig) (int, error) { 1522 // Manager itself implements the SyncManager interface 1523 return internalsync.SyncOnce(ctx, m, config) 1524} 1525 1526// EnsureDIDIndex ensures DID index is built and up-to-date 1527// Returns true if index was built/rebuilt, false if already up-to-date 1528func (m *Manager) EnsureDIDIndex(ctx context.Context, progressCallback func(current, total int)) (bool, error) { 1529 // Build index 1530 m.UpdateDIDIndexSmart(ctx, progressCallback) 1531 return true, nil 1532} 1533 1534// Add this helper function at the top of manager.go 1535func repositoryExists(bundleDir string) bool { 1536 indexPath := filepath.Join(bundleDir, bundleindex.INDEX_FILE) 1537 1538 // Check for index file 1539 if _, err := os.Stat(indexPath); err == nil { 1540 return true 1541 } 1542 1543 // Check for bundle files 1544 bundleFiles, _ := filepath.Glob(filepath.Join(bundleDir, "*.jsonl.zst")) 1545 bundleFiles = filterBundleFiles(bundleFiles) 1546 1547 return len(bundleFiles) > 0 1548} 1549 1550// ResolveHandleOrDID resolves input that can be either a handle or DID 1551// Returns: (did, handleResolveTime, error) 1552func (m *Manager) ResolveHandleOrDID(ctx context.Context, input string) (string, time.Duration, error) { 1553 input = strings.TrimSpace(input) 1554 1555 // Normalize handle format (remove at://, @ prefixes) 1556 if !strings.HasPrefix(input, "did:") { 1557 input = strings.TrimPrefix(input, "at://") 1558 input = strings.TrimPrefix(input, "@") 1559 } 1560 1561 // If already a DID, validate and return 1562 if strings.HasPrefix(input, "did:plc:") { 1563 if err := plcclient.ValidateDIDFormat(input); err != nil { 1564 return "", 0, err 1565 } 1566 return input, 0, nil // No resolution needed 1567 } 1568 1569 // Support did:web too 1570 if strings.HasPrefix(input, "did:web:") { 1571 return input, 0, nil 1572 } 1573 1574 // It's a handle - need resolver 1575 if m.handleResolver == nil { 1576 return "", 0, fmt.Errorf( 1577 "input '%s' appears to be a handle, but handle resolver is not configured\n\n"+ 1578 "Configure resolver with:\n"+ 1579 " plcbundle --handle-resolver https://quickdid.smokesignal.tools did resolve %s\n\n"+ 1580 "Or set default in config", 1581 input, input) 1582 } 1583 1584 resolveStart := time.Now() 1585 if !m.config.Quiet { 1586 m.logger.Printf("Resolving handle: %s", input) 1587 } 1588 did, err := m.handleResolver.ResolveHandle(ctx, input) 1589 resolveTime := time.Since(resolveStart) 1590 1591 if err != nil { 1592 return "", resolveTime, fmt.Errorf("failed to resolve handle '%s': %w", input, err) 1593 } 1594 1595 if !m.config.Quiet { 1596 m.logger.Printf("Resolved: %s → %s (in %s)", input, did, resolveTime) 1597 } 1598 return did, resolveTime, nil 1599} 1600 1601// GetHandleResolver returns the handle resolver (can be nil) 1602func (m *Manager) GetHandleResolver() *handleresolver.Client { 1603 return m.handleResolver 1604} 1605 1606// recordResolverTiming records resolver performance metrics 1607func (m *Manager) recordResolverTiming(result *ResolveDIDResult, _ error) { 1608 m.resolverStats.Lock() 1609 defer m.resolverStats.Unlock() 1610 1611 // Increment counters 1612 atomic.AddInt64(&m.resolverStats.totalResolutions, 1) 1613 1614 switch result.Source { 1615 case "mempool": 1616 atomic.AddInt64(&m.resolverStats.mempoolHits, 1) 1617 case "bundle": 1618 atomic.AddInt64(&m.resolverStats.bundleHits, 1) 1619 } 1620 1621 // Record timings 1622 timing := resolverTiming{ 1623 totalTime: result.TotalTime.Microseconds(), 1624 mempoolTime: result.MempoolTime.Microseconds(), 1625 indexTime: result.IndexTime.Microseconds(), 1626 loadOpTime: result.LoadOpTime.Microseconds(), 1627 source: result.Source, 1628 } 1629 1630 atomic.AddInt64(&m.resolverStats.totalTime, timing.totalTime) 1631 atomic.AddInt64(&m.resolverStats.totalMempoolTime, timing.mempoolTime) 1632 atomic.AddInt64(&m.resolverStats.totalIndexTime, timing.indexTime) 1633 atomic.AddInt64(&m.resolverStats.totalLoadOpTime, timing.loadOpTime) 1634 1635 // Add to circular buffer 1636 m.resolverStats.recentTimes[m.resolverStats.recentIdx] = timing 1637 m.resolverStats.recentIdx = (m.resolverStats.recentIdx + 1) % m.resolverStats.recentSize 1638} 1639 1640// GetResolverStats returns resolver performance statistics 1641func (m *Manager) GetResolverStats() map[string]interface{} { 1642 totalResolutions := atomic.LoadInt64(&m.resolverStats.totalResolutions) 1643 1644 if totalResolutions == 0 { 1645 return map[string]interface{}{ 1646 "total_resolutions": 0, 1647 } 1648 } 1649 1650 mempoolHits := atomic.LoadInt64(&m.resolverStats.mempoolHits) 1651 bundleHits := atomic.LoadInt64(&m.resolverStats.bundleHits) 1652 errors := atomic.LoadInt64(&m.resolverStats.errors) 1653 1654 totalTime := atomic.LoadInt64(&m.resolverStats.totalTime) 1655 totalMempoolTime := atomic.LoadInt64(&m.resolverStats.totalMempoolTime) 1656 totalIndexTime := atomic.LoadInt64(&m.resolverStats.totalIndexTime) 1657 totalLoadOpTime := atomic.LoadInt64(&m.resolverStats.totalLoadOpTime) 1658 1659 // Calculate overall averages 1660 avgTotalMs := float64(totalTime) / float64(totalResolutions) / 1000.0 1661 avgMempoolMs := float64(totalMempoolTime) / float64(totalResolutions) / 1000.0 1662 1663 stats := map[string]interface{}{ 1664 "total_resolutions": totalResolutions, 1665 "mempool_hits": mempoolHits, 1666 "bundle_hits": bundleHits, 1667 "errors": errors, 1668 "success_rate": float64(totalResolutions-errors) / float64(totalResolutions), 1669 "mempool_hit_rate": float64(mempoolHits) / float64(totalResolutions), 1670 1671 // Overall averages 1672 "avg_total_time_ms": avgTotalMs, 1673 "avg_mempool_time_ms": avgMempoolMs, 1674 } 1675 1676 // Only include bundle-specific stats if we have bundle hits 1677 if bundleHits > 0 { 1678 avgIndexMs := float64(totalIndexTime) / float64(bundleHits) / 1000.0 1679 avgLoadMs := float64(totalLoadOpTime) / float64(bundleHits) / 1000.0 1680 1681 stats["avg_index_time_ms"] = avgIndexMs 1682 stats["avg_load_op_time_ms"] = avgLoadMs 1683 } 1684 1685 // Recent statistics 1686 m.resolverStats.Lock() 1687 recentCopy := make([]resolverTiming, m.resolverStats.recentSize) 1688 copy(recentCopy, m.resolverStats.recentTimes) 1689 m.resolverStats.Unlock() 1690 1691 // Filter valid entries 1692 validRecent := make([]resolverTiming, 0) 1693 for _, t := range recentCopy { 1694 if t.totalTime > 0 { 1695 validRecent = append(validRecent, t) 1696 } 1697 } 1698 1699 if len(validRecent) > 0 { 1700 // Extract total times for percentiles 1701 totalTimes := make([]int64, len(validRecent)) 1702 for i, t := range validRecent { 1703 totalTimes[i] = t.totalTime 1704 } 1705 sort.Slice(totalTimes, func(i, j int) bool { 1706 return totalTimes[i] < totalTimes[j] 1707 }) 1708 1709 // Calculate recent average 1710 var recentSum int64 1711 var recentMempoolSum int64 1712 var recentIndexSum int64 1713 var recentLoadSum int64 1714 recentBundleCount := 0 1715 1716 for _, t := range validRecent { 1717 recentSum += t.totalTime 1718 recentMempoolSum += t.mempoolTime 1719 if t.source == "bundle" { 1720 recentIndexSum += t.indexTime 1721 recentLoadSum += t.loadOpTime 1722 recentBundleCount++ 1723 } 1724 } 1725 1726 stats["recent_avg_total_time_ms"] = float64(recentSum) / float64(len(validRecent)) / 1000.0 1727 stats["recent_avg_mempool_time_ms"] = float64(recentMempoolSum) / float64(len(validRecent)) / 1000.0 1728 1729 if recentBundleCount > 0 { 1730 stats["recent_avg_index_time_ms"] = float64(recentIndexSum) / float64(recentBundleCount) / 1000.0 1731 stats["recent_avg_load_time_ms"] = float64(recentLoadSum) / float64(recentBundleCount) / 1000.0 1732 } 1733 1734 stats["recent_sample_size"] = len(validRecent) 1735 1736 // Percentiles 1737 p50idx := len(totalTimes) * 50 / 100 1738 p95idx := len(totalTimes) * 95 / 100 1739 p99idx := len(totalTimes) * 99 / 100 1740 1741 stats["min_total_time_ms"] = float64(totalTimes[0]) / 1000.0 1742 stats["max_total_time_ms"] = float64(totalTimes[len(totalTimes)-1]) / 1000.0 1743 1744 if p50idx < len(totalTimes) { 1745 stats["p50_total_time_ms"] = float64(totalTimes[p50idx]) / 1000.0 1746 } 1747 if p95idx < len(totalTimes) { 1748 stats["p95_total_time_ms"] = float64(totalTimes[p95idx]) / 1000.0 1749 } 1750 if p99idx < len(totalTimes) { 1751 stats["p99_total_time_ms"] = float64(totalTimes[p99idx]) / 1000.0 1752 } 1753 } 1754 1755 return stats 1756} 1757 1758// ResetResolverStats resets resolver performance statistics 1759func (m *Manager) ResetResolverStats() { 1760 m.resolverStats.Lock() 1761 defer m.resolverStats.Unlock() 1762 1763 atomic.StoreInt64(&m.resolverStats.totalResolutions, 0) 1764 atomic.StoreInt64(&m.resolverStats.mempoolHits, 0) 1765 atomic.StoreInt64(&m.resolverStats.bundleHits, 0) 1766 atomic.StoreInt64(&m.resolverStats.errors, 0) 1767 atomic.StoreInt64(&m.resolverStats.totalTime, 0) 1768 atomic.StoreInt64(&m.resolverStats.totalMempoolTime, 0) 1769 atomic.StoreInt64(&m.resolverStats.totalIndexTime, 0) 1770 atomic.StoreInt64(&m.resolverStats.totalLoadOpTime, 0) 1771 1772 m.resolverStats.recentTimes = make([]resolverTiming, m.resolverStats.recentSize) 1773 m.resolverStats.recentIdx = 0 1774} 1775 1776func (m *Manager) SetQuiet(quiet bool) { 1777 m.config.Quiet = quiet 1778} 1779 1780// ShouldRebuildDIDIndex checks if DID index needs rebuilding 1781// Returns: (needsRebuild bool, reason string, canUpdateIncrementally bool) 1782func (m *Manager) ShouldRebuildDIDIndex() (bool, string, bool) { 1783 if m.didIndex == nil { 1784 return false, "DID index disabled", false 1785 } 1786 1787 needsRebuild, reason := m.didIndex.NeedsRebuild(m.GetBundleIndex()) 1788 1789 if needsRebuild { 1790 return true, reason, false 1791 } 1792 1793 // Check if incremental update is better 1794 canIncremental, behindBy := m.didIndex.ShouldUpdateIncrementally(m.GetBundleIndex()) 1795 if canIncremental { 1796 return false, fmt.Sprintf("can update incrementally (%d bundles)", behindBy), true 1797 } 1798 1799 return false, "index is up to date", false 1800} 1801 1802// UpdateDIDIndexSmart updates DID index intelligently (rebuild vs incremental) 1803func (m *Manager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error { 1804 needsRebuild, reason, canIncremental := m.ShouldRebuildDIDIndex() 1805 1806 if !needsRebuild && !canIncremental { 1807 if m.config.Verbose { 1808 m.logger.Printf("DID index is up to date") 1809 } 1810 return nil 1811 } 1812 1813 if needsRebuild { 1814 m.logger.Printf("Rebuilding DID index: %s", reason) 1815 return m.BuildDIDIndex(ctx, progressCallback) 1816 } 1817 1818 if canIncremental { 1819 m.logger.Printf("Updating DID index incrementally: %s", reason) 1820 return m.updateDIDIndexIncremental(ctx, progressCallback) 1821 } 1822 1823 return nil 1824} 1825 1826// updateDIDIndexIncremental updates index for missing bundles only 1827func (m *Manager) updateDIDIndexIncremental(ctx context.Context, progressCallback func(current, total int)) error { 1828 config := m.didIndex.GetConfig() 1829 lastBundle := m.index.GetLastBundle() 1830 1831 if lastBundle == nil || config.LastBundle >= lastBundle.BundleNumber { 1832 return nil 1833 } 1834 1835 start := config.LastBundle + 1 1836 end := lastBundle.BundleNumber 1837 total := end - start + 1 1838 1839 m.logger.Printf("Updating DID index for bundles %d-%d (%d bundles)", start, end, total) 1840 1841 for bundleNum := start; bundleNum <= end; bundleNum++ { 1842 bundle, err := m.LoadBundle(ctx, bundleNum) 1843 if err != nil { 1844 return fmt.Errorf("failed to load bundle %d: %w", bundleNum, err) 1845 } 1846 1847 bundleData := &didindex.BundleData{ 1848 BundleNumber: bundle.BundleNumber, 1849 Operations: bundle.Operations, 1850 } 1851 1852 if err := m.didIndex.UpdateIndexForBundle(ctx, bundleData); err != nil { 1853 return fmt.Errorf("failed to update bundle %d: %w", bundleNum, err) 1854 } 1855 1856 if progressCallback != nil { 1857 progressCallback(bundleNum-start+1, total) 1858 } 1859 } 1860 1861 return nil 1862}