[DEPRECATED] Go implementation of plcbundle
at rust-test 1839 lines 54 kB view raw
package bundle

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	plcbundle "tangled.org/atscan.net/plcbundle-rs/bindings/go"
	"tangled.org/atscan.net/plcbundle/internal/bundleindex"
	"tangled.org/atscan.net/plcbundle/internal/didindex"
	"tangled.org/atscan.net/plcbundle/internal/handleresolver"
	"tangled.org/atscan.net/plcbundle/internal/mempool"
	"tangled.org/atscan.net/plcbundle/internal/plcclient"
	"tangled.org/atscan.net/plcbundle/internal/storage"
	internalsync "tangled.org/atscan.net/plcbundle/internal/sync"
	"tangled.org/atscan.net/plcbundle/internal/types"
)

// defaultLogger is a simple logger implementation that forwards to the
// standard library "log" package. It is used whenever Config.Logger is nil.
type defaultLogger struct{}

// Printf forwards to log.Printf.
func (d defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

// Println forwards to log.Println.
func (d defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}

// Manager handles bundle operations: loading/saving bundles, maintaining the
// bundle index, the mempool of not-yet-bundled operations, and the DID index.
type Manager struct {
	config     *Config
	operations *storage.Operations // file I/O, compression, hashing
	index      *bundleindex.Index  // chain metadata for all sealed bundles
	indexPath  string              // on-disk location of the index file
	logger     types.Logger
	mempool    *mempool.Mempool // operations waiting for the next bundle
	didIndex   *didindex.Manager

	plcClient      *plcclient.Client      // upstream PLC directory (may be nil)
	handleResolver *handleresolver.Client // optional, see Config.HandleResolverURL

	syncer *internalsync.Fetcher
	cloner *internalsync.Cloner

	// bundleCache is a small bounded cache of decompressed bundles,
	// guarded by cacheMu. maxCacheSize bounds its entry count.
	bundleCache  map[int]*Bundle
	cacheMu      sync.RWMutex
	maxCacheSize int

	// Resolver performance tracking
	resolverStats struct {
		sync.Mutex
		totalResolutions int64
		mempoolHits      int64
		bundleHits       int64
		errors           int64

		// Timing (in microseconds)
		totalTime        int64
		totalMempoolTime int64
		totalIndexTime   int64
		totalLoadOpTime  int64

		// Recent timings (circular buffer)
		recentTimes []resolverTiming
		recentIdx   int
		recentSize  int
	}

	// Rust-based bundle manager for high-performance operations,
	// created lazily exactly once via getRSManager.
	rsManager     *plcbundle.BundleManager
	rsManagerOnce sync.Once
	rsManagerErr
error 84} 85 86// NewManager creates a new bundle manager 87func NewManager(config *Config, plcClient *plcclient.Client) (*Manager, error) { 88 if config == nil { 89 config = DefaultConfig("./plc_bundles") 90 } 91 92 if config.Logger == nil { 93 config.Logger = defaultLogger{} 94 } 95 96 // CHECK: Don't auto-create if repository doesn't exist 97 repoExists := repositoryExists(config.BundleDir) 98 99 if !repoExists && !config.AutoInit { 100 return nil, fmt.Errorf( 101 "no plcbundle repository found in: %s\n\n"+ 102 "Initialize a new repository with:\n"+ 103 " plcbundle clone <url> # Clone from remote\n"+ 104 " plcbundle sync # Fetch from PLC directory", 105 config.BundleDir, 106 ) 107 } 108 109 // Ensure directory exists (only if repo exists OR AutoInit is enabled) 110 if err := os.MkdirAll(config.BundleDir, 0755); err != nil { 111 return nil, fmt.Errorf("failed to create bundle directory: %w", err) 112 } 113 114 // Initialize operations handler 115 ops, err := storage.NewOperations(config.Logger, config.Verbose) 116 if err != nil { 117 return nil, fmt.Errorf("failed to initialize operations: %w", err) 118 } 119 120 // Determine origin 121 var origin string 122 if plcClient != nil { 123 origin = plcClient.GetBaseURL() 124 } 125 126 // Load or create index 127 indexPath := filepath.Join(config.BundleDir, bundleindex.INDEX_FILE) 128 index, err := bundleindex.LoadIndex(indexPath) 129 130 // Check for bundle files in directory 131 bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst")) 132 bundleFiles = filterBundleFiles(bundleFiles) 133 hasBundleFiles := len(bundleFiles) > 0 134 135 // Check if clone/download is in progress (look for .tmp files) 136 tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp")) 137 cloneInProgress := len(tmpFiles) > 0 138 139 needsRebuild := false 140 141 if err != nil { 142 // Index doesn't exist or is invalid 143 if hasBundleFiles { 144 if cloneInProgress { 145 config.Logger.Printf("Clone/download in 
progress, skipping auto-rebuild") 146 } else { 147 config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles)) 148 needsRebuild = true 149 } 150 } else { 151 // No index and no bundles - create fresh index 152 config.Logger.Printf("Creating new index at %s", indexPath) 153 index = bundleindex.NewIndex(origin) 154 if err := index.Save(indexPath); err != nil { 155 return nil, fmt.Errorf("failed to save new index: %w", err) 156 } 157 } 158 } else { 159 // Index exists - auto-populate origin if missing 160 if index.Origin == "" { 161 if origin != "" { 162 config.Logger.Printf("⚠️ Upgrading old index: setting origin to %s", origin) 163 index.Origin = origin 164 if err := index.Save(indexPath); err != nil { 165 return nil, fmt.Errorf("failed to update index with origin: %w", err) 166 } 167 } else { 168 config.Logger.Printf("⚠️ Warning: index has no origin and no PLC client configured") 169 } 170 } 171 172 // Validate origin matches if both are set 173 if index.Origin != "" && origin != "" && index.Origin != origin { 174 return nil, fmt.Errorf( 175 "origin mismatch: index has origin %q but PLC client points to %q\n"+ 176 "Cannot mix bundles from different sources. 
Use a different directory or reconfigure PLC client", 177 index.Origin, origin, 178 ) 179 } 180 181 if config.Verbose { 182 config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin) 183 } 184 185 // Check if there are bundle files not in the index 186 if hasBundleFiles && len(bundleFiles) > index.Count() { 187 if cloneInProgress { 188 config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles)) 189 } else { 190 config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding", 191 len(bundleFiles), index.Count()) 192 needsRebuild = true 193 } 194 } 195 } 196 197 if index != nil && plcClient != nil { 198 currentOrigin := plcClient.GetBaseURL() 199 200 // Check if origins match 201 if index.Origin != "" && index.Origin != currentOrigin { 202 return nil, fmt.Errorf( 203 "origin mismatch: index has origin %q but PLC client points to %q. "+ 204 "Cannot mix bundles from different sources", 205 index.Origin, currentOrigin, 206 ) 207 } 208 209 // Set origin if not set (for backward compatibility with old indexes) 210 if index.Origin == "" && currentOrigin != "" { 211 index.Origin = currentOrigin 212 config.Logger.Printf("Setting origin for existing index: %s", currentOrigin) 213 if err := index.Save(indexPath); err != nil { 214 return nil, fmt.Errorf("failed to update index with origin: %w", err) 215 } 216 } 217 } 218 219 // Perform rebuild if needed (using parallel scan) 220 if needsRebuild && config.AutoRebuild { 221 config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles)) 222 223 // Create temporary manager for scanning 224 tempMgr := &Manager{ 225 config: config, 226 operations: ops, 227 index: bundleindex.NewIndex("test-origin"), 228 indexPath: indexPath, 229 logger: config.Logger, 230 } 231 232 // Use parallel scan with auto-detected CPU count 233 workers := config.RebuildWorkers 234 if workers <= 0 { 235 workers = runtime.NumCPU() 236 
if workers < 1 { 237 workers = 1 238 } 239 } 240 241 config.Logger.Printf("Using %d workers for parallel scan", workers) 242 243 // Create progress callback wrapper with new signature 244 var progressCallback func(current, total int, bytesProcessed int64) 245 if config.RebuildProgress != nil { 246 // Wrap the old-style callback to work with new signature 247 oldCallback := config.RebuildProgress 248 progressCallback = func(current, total int, bytesProcessed int64) { 249 oldCallback(current, total) 250 } 251 } else { 252 // Default: log every 100 bundles 253 progressCallback = func(current, total int, bytesProcessed int64) { 254 if current%100 == 0 || current == total { 255 mbProcessed := float64(bytesProcessed) / (1024 * 1024) 256 config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed", 257 current, total, float64(current)/float64(total)*100, mbProcessed) 258 } 259 } 260 } 261 262 start := time.Now() 263 264 // Scan directory to rebuild index (parallel) 265 result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback) 266 if err != nil { 267 return nil, fmt.Errorf("failed to rebuild index: %w", err) 268 } 269 270 elapsed := time.Since(start) 271 272 // Reload the rebuilt index 273 index, err = bundleindex.LoadIndex(indexPath) 274 if err != nil { 275 return nil, fmt.Errorf("failed to load rebuilt index: %w", err) 276 } 277 278 // Calculate throughput 279 mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024) 280 281 config.Logger.Printf("✓ Index rebuilt with %d bundles in %s", 282 index.Count(), elapsed.Round(time.Millisecond)) 283 config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)", 284 float64(result.BundleCount)/elapsed.Seconds(), mbPerSec) 285 286 // Verify all chain hashes are present 287 bundles := index.GetBundles() 288 missingHashes := 0 289 for i, meta := range bundles { 290 if meta.ContentHash == "" { 291 missingHashes++ 292 } 293 if i > 0 && meta.Hash == "" { 294 
missingHashes++ 295 } 296 } 297 if missingHashes > 0 { 298 config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes) 299 } 300 } 301 302 if index == nil { 303 index = bundleindex.NewIndex("test-origin") 304 } 305 306 // Initialize mempool for next bundle 307 lastBundle := index.GetLastBundle() 308 nextBundleNum := 1 309 var minTimestamp time.Time 310 311 if lastBundle != nil { 312 nextBundleNum = lastBundle.BundleNumber + 1 313 minTimestamp = lastBundle.EndTime 314 } 315 316 mempool, err := mempool.NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger, config.Verbose) 317 if err != nil { 318 return nil, fmt.Errorf("failed to initialize mempool: %w", err) 319 } 320 321 // Initialize DID index manager 322 didIndex := didindex.NewManager(config.BundleDir, config.Logger) 323 324 // Initialize sync components 325 fetcher := internalsync.NewFetcher(plcClient, ops, config.Logger) 326 cloner := internalsync.NewCloner(ops, config.BundleDir, config.Logger) 327 328 // Initialize handle resolver if configured 329 var handleResolver *handleresolver.Client 330 if config.HandleResolverURL != "" { 331 handleResolver = handleresolver.NewClient(config.HandleResolverURL) 332 } 333 334 m := &Manager{ 335 config: config, 336 operations: ops, 337 index: index, 338 indexPath: indexPath, 339 logger: config.Logger, 340 mempool: mempool, 341 didIndex: didIndex, // Updated type 342 bundleCache: make(map[int]*Bundle), 343 maxCacheSize: 10, 344 syncer: fetcher, 345 cloner: cloner, 346 plcClient: plcClient, 347 handleResolver: handleResolver, 348 } 349 // Initialize resolver stats 350 m.resolverStats.recentSize = 1000 351 m.resolverStats.recentTimes = make([]resolverTiming, 1000) 352 353 return m, nil 354} 355 356// getRSManager lazily initializes the Rust bundle manager 357func (m *Manager) getRSManager() (*plcbundle.BundleManager, error) { 358 m.rsManagerOnce.Do(func() { 359 rsMgr, err := plcbundle.NewBundleManager(m.config.BundleDir) 360 if err != 
nil { 361 m.rsManagerErr = fmt.Errorf("failed to create Rust bundle manager: %w", err) 362 return 363 } 364 m.rsManager = rsMgr 365 }) 366 return m.rsManager, m.rsManagerErr 367} 368 369// Close cleans up resources 370func (m *Manager) Close() { 371 if m.rsManager != nil { 372 m.rsManager.Close() 373 } 374 if m.operations != nil { 375 m.operations.Close() 376 } 377 if m.plcClient != nil { 378 m.plcClient.Close() 379 } 380 if m.mempool != nil { 381 if err := m.mempool.Save(); err != nil { 382 m.logger.Printf("Warning: failed to save mempool: %v", err) 383 } 384 } 385 if m.didIndex != nil { 386 m.didIndex.Close() 387 } 388} 389 390// GetIndex returns the current index 391func (m *Manager) GetIndex() *bundleindex.Index { 392 return m.index 393} 394 395// SaveIndex saves the index to disk 396func (m *Manager) SaveIndex() error { 397 return m.index.Save(m.indexPath) 398} 399 400// LoadBundle with caching 401func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) { 402 // Check cache first 403 m.cacheMu.RLock() 404 if cached, ok := m.bundleCache[bundleNumber]; ok { 405 m.cacheMu.RUnlock() 406 return cached, nil 407 } 408 m.cacheMu.RUnlock() 409 410 // Load from disk (existing code) 411 bundle, err := m.loadBundleFromDisk(ctx, bundleNumber) 412 if err != nil { 413 return nil, err 414 } 415 416 // Add to cache 417 m.cacheMu.Lock() 418 m.bundleCache[bundleNumber] = bundle 419 420 // Simple LRU: if cache too big, remove oldest 421 if len(m.bundleCache) > m.maxCacheSize { 422 // Remove a random one (or implement proper LRU) 423 for k := range m.bundleCache { 424 delete(m.bundleCache, k) 425 break 426 } 427 } 428 m.cacheMu.Unlock() 429 430 return bundle, nil 431} 432 433// loadBundleFromDisk loads a bundle from disk 434func (m *Manager) loadBundleFromDisk(_ context.Context, bundleNumber int) (*Bundle, error) { 435 // Get metadata from index 436 meta, err := m.index.GetBundle(bundleNumber) 437 if err != nil { 438 return nil, fmt.Errorf("bundle not 
in index: %w", err) 439 } 440 441 // Load file 442 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 443 if !m.operations.FileExists(path) { 444 return nil, fmt.Errorf("bundle file not found: %s", path) 445 } 446 447 // Verify hash if enabled 448 if m.config.VerifyOnLoad { 449 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash) 450 if err != nil { 451 return nil, fmt.Errorf("failed to verify hash: %w", err) 452 } 453 if !valid { 454 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash) 455 } 456 } 457 458 // Load operations 459 operations, err := m.operations.LoadBundle(path) 460 if err != nil { 461 return nil, fmt.Errorf("failed to load bundle: %w", err) 462 } 463 464 // Create bundle struct 465 bundle := &Bundle{ 466 BundleNumber: meta.BundleNumber, 467 StartTime: meta.StartTime, 468 EndTime: meta.EndTime, 469 Operations: operations, 470 DIDCount: meta.DIDCount, 471 Hash: meta.Hash, 472 ContentHash: meta.ContentHash, 473 Parent: meta.Parent, 474 CompressedHash: meta.CompressedHash, 475 CompressedSize: meta.CompressedSize, 476 UncompressedSize: meta.UncompressedSize, 477 Cursor: meta.Cursor, 478 Compressed: true, 479 CreatedAt: meta.CreatedAt, 480 } 481 482 return bundle, nil 483} 484 485// SaveBundle saves a bundle to disk and updates the index 486func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, verbose bool, quiet bool, stats types.BundleProductionStats, skipDIDIndex bool) (time.Duration, error) { 487 488 totalStart := time.Now() 489 if err := bundle.ValidateForSave(); err != nil { 490 return 0, fmt.Errorf("bundle validation failed: %w", err) 491 } 492 493 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber)) 494 495 // Get parent 496 var parent string 497 if bundle.BundleNumber > 1 { 498 prevBundle := m.index.GetLastBundle() 499 if prevBundle != nil { 500 parent = prevBundle.Hash 501 } else { 502 if 
prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil { 503 parent = prevMeta.Hash 504 } 505 } 506 } 507 bundle.Parent = parent 508 509 // Get origin 510 origin := m.index.Origin 511 if m.plcClient != nil { 512 origin = m.plcClient.GetBaseURL() 513 } 514 515 // Get version 516 version := "dev" 517 if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "" && info.Main.Version != "(devel)" { 518 version = info.Main.Version 519 } 520 521 // Get hostname 522 hostname, _ := os.Hostname() 523 524 // Create BundleInfo 525 bundleInfo := &storage.BundleInfo{ 526 BundleNumber: bundle.BundleNumber, 527 Origin: origin, 528 ParentHash: parent, 529 Cursor: bundle.Cursor, 530 CreatedBy: fmt.Sprintf("plcbundle/%s", version), 531 Hostname: hostname, 532 } 533 534 if m.config.Verbose { 535 m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber) 536 } 537 538 // Save to disk with 3 parameters 539 uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations, bundleInfo) 540 if err != nil { 541 m.logger.Printf("DEBUG: SaveBundle FAILED: %v", err) 542 return 0, fmt.Errorf("failed to save bundle: %w", err) 543 } 544 545 if m.config.Verbose { 546 m.logger.Printf("DEBUG: SaveBundle SUCCESS, setting bundle fields") 547 } 548 549 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 550 bundle.ContentHash = uncompressedHash 551 bundle.CompressedHash = compressedHash 552 bundle.UncompressedSize = uncompressedSize 553 bundle.CompressedSize = compressedSize 554 bundle.CreatedAt = time.Now().UTC() 555 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 556 557 if m.config.Verbose { 558 m.logger.Printf("DEBUG: Adding bundle %d to index", bundle.BundleNumber) 559 } 560 561 // Add to index 562 m.index.AddBundle(bundle.ToMetadata()) 563 564 if m.config.Verbose { 565 m.logger.Printf("DEBUG: Index now has %d bundles", 
			m.index.Count())
	}

	// Save index
	if err := m.SaveIndex(); err != nil {
		m.logger.Printf("DEBUG: SaveIndex FAILED: %v", err)
		return 0, fmt.Errorf("failed to save index: %w", err)
	}

	if m.config.Verbose {
		m.logger.Printf("DEBUG: Index saved, last bundle = %d", m.index.GetLastBundle().BundleNumber)
	}

	saveDuration := time.Since(totalStart)

	// Clean up old mempool — the sealed bundle's operations no longer
	// belong in it. Failure is logged (unless quiet), not fatal.
	oldMempoolFile := m.mempool.GetFilename()
	if err := m.mempool.Delete(); err != nil && !quiet {
		m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
	}

	// Create new mempool for the next bundle, floored at this bundle's
	// end time so older operations cannot re-enter.
	nextBundle := bundle.BundleNumber + 1
	minTimestamp := bundle.EndTime

	newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger, m.config.Verbose)
	if err != nil {
		return 0, fmt.Errorf("failed to create new mempool: %w", err)
	}

	m.mempool = newMempool

	// DID index update (if enabled)
	var indexUpdateDuration time.Duration
	if !skipDIDIndex && m.didIndex != nil && m.didIndex.Exists() {
		indexUpdateStart := time.Now()
		if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil {
			m.logger.Printf("Warning: failed to update DID index: %v", err)
		} else {
			indexUpdateDuration = time.Since(indexUpdateStart)
			if !quiet && m.config.Verbose {
				m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration)
			}
		}
	}

	if !quiet {
		// NOTE(review): bundle.Hash[0:7] assumes a non-empty hash; a hash
		// shorter than 7 chars would panic here — presumably guaranteed by
		// CalculateChainHash, confirm.
		msg := fmt.Sprintf("→ Bundle %06d | %s | fetch: %s (%d reqs)",
			bundle.BundleNumber,
			bundle.Hash[0:7],
			stats.TotalDuration.Round(time.Millisecond),
			stats.TotalFetches,
		)
		if indexUpdateDuration > 0 {
			msg += fmt.Sprintf(" | index: %s", indexUpdateDuration.Round(time.Millisecond))
		}
		msg += fmt.Sprintf(" | %s", formatTimeDistance(time.Since(bundle.EndTime)))
		m.logger.Println(msg)
	}

	if m.config.Verbose {
		m.logger.Printf("DEBUG: Bundle done = %d, finish duration = %s",
			m.index.GetLastBundle().BundleNumber,
			saveDuration.Round(time.Millisecond))
	}

	return indexUpdateDuration, nil
}

// GetMempoolStats returns mempool statistics.
// NOTE(review): unlike the other mempool accessors this does not nil-check
// m.mempool — it would panic if called before initialization; confirm.
func (m *Manager) GetMempoolStats() map[string]interface{} {
	return m.mempool.Stats()
}

// GetMempoolOperations returns all operations currently in mempool
func (m *Manager) GetMempoolOperations() ([]plcclient.PLCOperation, error) {
	if m.mempool == nil {
		return nil, fmt.Errorf("mempool not initialized")
	}

	count := m.mempool.Count()
	if count == 0 {
		return []plcclient.PLCOperation{}, nil
	}

	// Peek returns without removing; the mempool keeps its contents.
	return m.mempool.Peek(count), nil
}

// GetBundleNumber is added to the Bundle type to implement the BundleData
// interface expected by the DID index.
func (b *Bundle) GetBundleNumber() int {
	return b.BundleNumber
}

// VerifyBundle verifies a bundle's integrity: file presence, compressed and
// content hashes, and both sizes, all against the index metadata. Metadata
// or I/O problems are reported via result.Error with a nil error return.
func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
	result := &VerificationResult{
		BundleNumber: bundleNumber,
	}

	// Get from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.ExpectedHash = meta.CompressedHash

	// Check file exists
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	result.FileExists = m.operations.FileExists(path)
	if !result.FileExists {
		result.Error = fmt.Errorf("file not found")
		return result, nil
	}

	// Verify BOTH compressed and content hashes (single pass over the file)
	compHash, compSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.LocalHash = compHash

	// Verify compressed hash
	if compHash != meta.CompressedHash {
		result.HashMatch = false
		result.Valid = false
		result.Error = fmt.Errorf("compressed hash mismatch: expected %s, got %s", meta.CompressedHash, compHash)
		return result, nil
	}

	// Verify content hash
	if contentHash != meta.ContentHash {
		result.HashMatch = false
		result.Valid = false
		result.Error = fmt.Errorf("content hash mismatch: expected %s, got %s", meta.ContentHash, contentHash)
		return result, nil
	}

	// Verify sizes match
	if compSize != meta.CompressedSize {
		result.Valid = false
		result.Error = fmt.Errorf("compressed size mismatch: expected %d, got %d", meta.CompressedSize, compSize)
		return result, nil
	}

	if contentSize != meta.UncompressedSize {
		result.Valid = false
		result.Error = fmt.Errorf("uncompressed size mismatch: expected %d, got %d", meta.UncompressedSize, contentSize)
		return result, nil
	}

	result.HashMatch = true
	result.Valid = true

	return result, nil
}

// GetInfo returns information about the bundle manager
func (m *Manager) GetInfo() map[string]interface{} {
	stats := m.index.GetStats()
	stats["bundle_dir"] = m.config.BundleDir
	stats["index_path"] = m.indexPath
	stats["verify_on_load"] = m.config.VerifyOnLoad
	return stats
}

// GetRSManager returns the Rust bundle manager (proxy method)
func (m *Manager) GetRSManager() (*plcbundle.BundleManager, error) {
	return m.getRSManager()
}

// IsBundleIndexed checks if a bundle is already in the index
func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
	_, err := m.index.GetBundle(bundleNumber)
	return err == nil
}

// RefreshMempool reloads mempool from disk
func (m *Manager) RefreshMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Load()
}

// ClearMempool clears all operations from the mempool and saves
func (m *Manager) ClearMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}

m.logger.Printf("Clearing mempool...") 760 761 count := m.mempool.Count() 762 763 m.mempool.Clear() 764 765 if err := m.mempool.Save(); err != nil { 766 return fmt.Errorf("failed to save mempool: %w", err) 767 } 768 769 m.logger.Printf("Cleared %d operations from mempool", count) 770 771 return nil 772} 773 774// ValidateMempool validates mempool 775func (m *Manager) ValidateMempool() error { 776 if m.mempool == nil { 777 return fmt.Errorf("mempool not initialized") 778 } 779 return m.mempool.Validate() 780} 781 782// StreamBundleRaw streams the raw compressed bundle file 783func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) { 784 // Get metadata from index 785 meta, err := m.index.GetBundle(bundleNumber) 786 if err != nil { 787 return nil, fmt.Errorf("bundle not in index: %w", err) 788 } 789 790 // Build file path 791 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 792 if !m.operations.FileExists(path) { 793 return nil, fmt.Errorf("bundle file not found: %s", path) 794 } 795 796 // Optionally verify hash before streaming 797 if m.config.VerifyOnLoad { 798 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash) 799 if err != nil { 800 return nil, fmt.Errorf("failed to verify hash: %w", err) 801 } 802 if !valid { 803 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash) 804 } 805 } 806 807 return m.operations.StreamRaw(path) 808} 809 810// StreamBundleDecompressed streams the decompressed bundle data as JSONL 811func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) { 812 // Get metadata from index 813 _, err := m.index.GetBundle(bundleNumber) 814 if err != nil { 815 return nil, fmt.Errorf("bundle not in index: %w", err) 816 } 817 818 // Build file path 819 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 820 if 
!m.operations.FileExists(path) { 821 return nil, fmt.Errorf("bundle file not found: %s", path) 822 } 823 824 return m.operations.StreamDecompressed(path) 825} 826 827// RefreshIndex reloads the index from disk if it has been modified 828func (m *Manager) RefreshIndex() error { 829 // Check if index file has been modified 830 info, err := os.Stat(m.indexPath) 831 if err != nil { 832 return err 833 } 834 835 // If index was modified after we loaded it, reload 836 if info.ModTime().After(m.index.UpdatedAt) { 837 m.logger.Printf("Index file modified, reloading...") 838 839 newIndex, err := bundleindex.LoadIndex(m.indexPath) 840 if err != nil { 841 return fmt.Errorf("failed to reload index: %w", err) 842 } 843 844 m.index = newIndex 845 m.logger.Printf("Index reloaded: %d bundles", m.index.Count()) 846 } 847 848 return nil 849} 850 851// GetMempool returns the current mempool 852func (m *Manager) GetMempool() *mempool.Mempool { 853 return m.mempool 854} 855 856// SaveMempool saves the current mempool state to disk 857func (m *Manager) SaveMempool() error { 858 if m.mempool == nil { 859 return fmt.Errorf("mempool not initialized") 860 } 861 return m.mempool.Save() 862} 863 864// GetPLCOrigin returns the PLC directory origin URL 865func (m *Manager) GetPLCOrigin() string { 866 if m.plcClient == nil { 867 return "" 868 } 869 return m.plcClient.GetBaseURL() 870} 871 872// GetCurrentCursor returns the current latest cursor position (including mempool) 873func (m *Manager) GetCurrentCursor() int { 874 index := m.GetIndex() 875 bundles := index.GetBundles() 876 cursor := len(bundles) * types.BUNDLE_SIZE 877 878 // Add mempool operations 879 mempoolStats := m.GetMempoolStats() 880 if count, ok := mempoolStats["count"].(int); ok { 881 cursor += count 882 } 883 884 return cursor 885} 886 887// LoadOperation loads a single operation from a bundle efficiently 888func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plcclient.PLCOperation, error) { 
	// Validate position
	if position < 0 || position >= types.BUNDLE_SIZE {
		return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, types.BUNDLE_SIZE-1)
	}

	// Validate bundle exists in index
	_, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Load just the one operation (optimized - decompresses only until position)
	return m.operations.LoadOperationAtPosition(path, position)
}

// LoadOperations loads multiple operations from a bundle efficiently.
// Returns a map keyed by the requested positions.
// NOTE(review): unlike LoadOperation this neither validates the positions
// nor checks the index before touching the file — confirm whether that
// asymmetry is intentional.
func (m *Manager) LoadOperations(ctx context.Context, bundleNumber int, positions []int) (map[int]*plcclient.PLCOperation, error) {
	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	return m.operations.LoadOperationsAtPositions(path, positions)
}

// filterBundleFiles filters out files whose basename starts with . or _
// (hidden/temporary files produced by editors or in-progress downloads).
func filterBundleFiles(files []string) []string {
	filtered := make([]string, 0, len(files))
	for _, file := range files {
		basename := filepath.Base(file)
		if len(basename) > 0 && (basename[0] == '.'
|| basename[0] == '_') { 927 continue 928 } 929 filtered = append(filtered, file) 930 } 931 return filtered 932} 933 934// ========================================== 935// DID INDEX INTEGRATION (adapter methods) 936// ========================================== 937 938// Implement BundleProvider interface for didindex 939func (m *Manager) LoadBundleForDIDIndex(ctx context.Context, bundleNumber int) (*didindex.BundleData, error) { 940 bundle, err := m.LoadBundle(ctx, bundleNumber) 941 if err != nil { 942 return nil, err 943 } 944 945 return &didindex.BundleData{ 946 BundleNumber: bundle.BundleNumber, 947 Operations: bundle.Operations, 948 }, nil 949} 950 951func (m *Manager) GetBundleIndex() didindex.BundleIndexProvider { 952 return &bundleIndexAdapter{index: m.index} 953} 954 955// bundleIndexAdapter adapts Index to BundleIndexProvider interface 956type bundleIndexAdapter struct { 957 index *bundleindex.Index 958} 959 960func (a *bundleIndexAdapter) GetBundles() []*didindex.BundleMetadata { 961 bundles := a.index.GetBundles() 962 result := make([]*didindex.BundleMetadata, len(bundles)) 963 for i, b := range bundles { 964 result[i] = &didindex.BundleMetadata{ 965 BundleNumber: b.BundleNumber, 966 StartTime: b.StartTime, 967 EndTime: b.EndTime, 968 } 969 } 970 return result 971} 972 973func (a *bundleIndexAdapter) GetBundle(bundleNumber int) (*didindex.BundleMetadata, error) { 974 meta, err := a.index.GetBundle(bundleNumber) 975 if err != nil { 976 return nil, err 977 } 978 return &didindex.BundleMetadata{ 979 BundleNumber: meta.BundleNumber, 980 StartTime: meta.StartTime, 981 EndTime: meta.EndTime, 982 }, nil 983} 984 985func (a *bundleIndexAdapter) GetLastBundle() *didindex.BundleMetadata { 986 meta := a.index.GetLastBundle() 987 if meta == nil { 988 return nil 989 } 990 return &didindex.BundleMetadata{ 991 BundleNumber: meta.BundleNumber, 992 StartTime: meta.StartTime, 993 EndTime: meta.EndTime, 994 } 995} 996 997// GetDIDIndex returns the DID index manager 998func 
(m *Manager) GetDIDIndex() *didindex.Manager { 999 return m.didIndex 1000} 1001 1002// BuildDIDIndex builds the complete DID index 1003func (m *Manager) BuildDIDIndex(ctx context.Context, progressCallback func(current, total int)) error { 1004 if m.didIndex == nil { 1005 m.didIndex = didindex.NewManager(m.config.BundleDir, m.logger) 1006 } 1007 1008 return m.didIndex.BuildIndexFromScratch(ctx, m, progressCallback) 1009} 1010 1011// updateDIDIndexForBundle updates index when a new bundle is added 1012func (m *Manager) updateDIDIndexForBundle(ctx context.Context, bundle *Bundle) error { 1013 if m.didIndex == nil { 1014 return nil 1015 } 1016 1017 // Convert to didindex.BundleData 1018 bundleData := &didindex.BundleData{ 1019 BundleNumber: bundle.BundleNumber, 1020 Operations: bundle.Operations, 1021 } 1022 1023 return m.didIndex.UpdateIndexForBundle(ctx, bundleData) 1024} 1025 1026// GetDIDIndexStats returns DID index statistics 1027func (m *Manager) GetDIDIndexStats() map[string]interface{} { 1028 if m.didIndex == nil { 1029 return map[string]interface{}{ 1030 "enabled": false, 1031 } 1032 } 1033 1034 stats := m.didIndex.GetStats() 1035 stats["enabled"] = true 1036 stats["exists"] = m.didIndex.Exists() 1037 1038 indexedDIDs := stats["total_dids"].(int64) 1039 1040 // Get unique DIDs from mempool 1041 mempoolDIDCount := int64(0) 1042 if m.mempool != nil { 1043 mempoolStats := m.GetMempoolStats() 1044 if didCount, ok := mempoolStats["did_count"].(int); ok { 1045 mempoolDIDCount = int64(didCount) 1046 } 1047 } 1048 1049 stats["indexed_dids"] = indexedDIDs 1050 stats["mempool_dids"] = mempoolDIDCount 1051 stats["total_dids"] = indexedDIDs + mempoolDIDCount 1052 1053 return stats 1054} 1055 1056// GetDIDOperations retrieves all operations for a DID (bundles + mempool combined) 1057// Returns: operations only, operations with locations, error 1058func (m *Manager) GetDIDOperations(ctx context.Context, did string, verbose bool) ([]plcclient.PLCOperation, 
[]PLCOperationWithLocation, error) { 1059 if err := plcclient.ValidateDIDFormat(did); err != nil { 1060 return nil, nil, err 1061 } 1062 1063 // Set verbose mode 1064 if m.didIndex != nil { 1065 m.didIndex.SetVerbose(verbose) 1066 } 1067 1068 // Get bundled operations from DID index (includes nullified) 1069 bundledOpsWithLoc, err := m.didIndex.GetDIDOperations(ctx, did, m) 1070 if err != nil { 1071 return nil, nil, err 1072 } 1073 1074 // Convert to bundle types 1075 opsWithLoc := make([]PLCOperationWithLocation, len(bundledOpsWithLoc)) 1076 bundledOps := make([]plcclient.PLCOperation, len(bundledOpsWithLoc)) 1077 for i, r := range bundledOpsWithLoc { 1078 opsWithLoc[i] = PLCOperationWithLocation{ 1079 Operation: r.Operation, 1080 Bundle: r.Bundle, 1081 Position: r.Position, 1082 } 1083 bundledOps[i] = r.Operation 1084 } 1085 1086 // Get mempool operations 1087 mempoolOps, err := m.GetDIDOperationsFromMempool(did) 1088 if err != nil { 1089 return nil, nil, err 1090 } 1091 1092 if len(mempoolOps) > 0 && verbose { 1093 m.logger.Printf("DEBUG: Found %d operations in mempool", len(mempoolOps)) 1094 } 1095 1096 // Combine operations (for the slice return) 1097 allOps := append(bundledOps, mempoolOps...) 
1098 1099 sort.Slice(allOps, func(i, j int) bool { 1100 return allOps[i].CreatedAt.Before(allOps[j].CreatedAt) 1101 }) 1102 1103 return allOps, opsWithLoc, nil 1104} 1105 1106// GetDIDOperationsFromMempool retrieves operations for a DID from mempool only 1107func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plcclient.PLCOperation, error) { 1108 if m.mempool == nil { 1109 return []plcclient.PLCOperation{}, nil 1110 } 1111 1112 // Use direct search - only copies matching operations 1113 return m.mempool.FindDIDOperations(did), nil 1114} 1115 1116// GetLatestDIDOperation returns only the most recent non-nullified operation 1117func (m *Manager) GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error) { 1118 if err := plcclient.ValidateDIDFormat(did); err != nil { 1119 return nil, err 1120 } 1121 1122 // Check mempool first (most recent data) 1123 mempoolOps, _ := m.GetDIDOperationsFromMempool(did) 1124 if len(mempoolOps) > 0 { 1125 for i := len(mempoolOps) - 1; i >= 0; i-- { 1126 if !mempoolOps[i].IsNullified() { 1127 return &mempoolOps[i], nil 1128 } 1129 } 1130 } 1131 1132 // Delegate to DID index for bundled operations 1133 return m.didIndex.GetLatestDIDOperation(ctx, did, m) 1134} 1135 1136// VerifyChain verifies the entire bundle chain 1137func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) { 1138 result := &ChainVerificationResult{ 1139 VerifiedBundles: make([]int, 0), 1140 } 1141 1142 bundles := m.index.GetBundles() 1143 if len(bundles) == 0 { 1144 result.Valid = true 1145 return result, nil 1146 } 1147 1148 result.ChainLength = len(bundles) 1149 1150 for i, meta := range bundles { 1151 // Verify file hash 1152 vr, err := m.VerifyBundle(ctx, meta.BundleNumber) 1153 if err != nil || !vr.Valid { 1154 result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber) 1155 result.BrokenAt = meta.BundleNumber 1156 return result, nil 1157 } 1158 1159 // Verify chain link 1160 
		if i > 0 {
			prevMeta := bundles[i-1]

			// Check parent reference: each bundle must point at the
			// previous bundle's hash.
			if meta.Parent != prevMeta.Hash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}

			// Verify chain hash calculation: the stored hash must equal
			// hash(prev.Hash, this.ContentHash).
			expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
			if meta.Hash != expectedHash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}
		}

		result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
	}

	result.Valid = true
	return result, nil
}

// FetchNextBundle fetches operations and creates a bundle, looping until caught up.
// It resumes the PLC cursor from the last bundle (or from the mempool when it
// already holds operations), fetches until BUNDLE_SIZE operations are buffered,
// then takes exactly BUNDLE_SIZE operations out of the mempool to form the bundle.
// Returns the unsaved bundle plus production statistics.
func (m *Manager) FetchNextBundle(ctx context.Context, verbose bool, quiet bool) (*Bundle, types.BundleProductionStats, error) {
	if m.plcClient == nil {
		return nil, types.BundleProductionStats{}, fmt.Errorf("PLC client not configured")
	}

	lastBundle := m.index.GetLastBundle()
	nextBundleNum := 1
	var afterTime string
	var prevBoundaryCIDs map[string]bool
	var prevBundleHash string

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
		prevBundleHash = lastBundle.Hash

		// ALWAYS get boundaries from last bundle initially.
		// A load failure here is tolerated: dedup simply starts without
		// boundary CIDs.
		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
		if err == nil {
			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
			if verbose {
				m.logger.Printf("Loaded %d boundary CIDs from bundle %06d (at %s)",
					len(prevBoundaryCIDs), lastBundle.BundleNumber,
					lastBundle.EndTime.Format(time.RFC3339)[:19])
			}
		}
	}

	// If mempool has operations, update cursor but KEEP boundaries from bundle
	// (mempool operations already had boundary dedup applied when they were added)
	if m.mempool.Count() > 0 {
		mempoolLastTime := m.mempool.GetLastTime()
		if mempoolLastTime != "" {
			if verbose {
				m.logger.Printf("[DEBUG] Mempool has %d ops, resuming from %s",
					m.mempool.Count(), mempoolLastTime[:19])
			}
			afterTime = mempoolLastTime

			// Calculate boundaries from MEMPOOL for next fetch
			mempoolOps, _ := m.GetMempoolOperations()
			if len(mempoolOps) > 0 {
				_, mempoolBoundaries := m.operations.GetBoundaryCIDs(mempoolOps)
				prevBoundaryCIDs = mempoolBoundaries
				if verbose {
					m.logger.Printf("Using %d boundary CIDs from mempool", len(prevBoundaryCIDs))
				}
			}
		}
	}

	if verbose {
		m.logger.Printf("[DEBUG] Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
		m.logger.Printf("[DEBUG] Starting cursor: %s", afterTime)
	}

	totalFetches := 0
	maxAttempts := 50
	attempt := 0
	caughtUp := false
	attemptStart := time.Now()

	// Keep fetching until the mempool holds a full bundle's worth of
	// operations, we run out of attempts, or we catch up to the PLC head.
	for m.mempool.Count() < types.BUNDLE_SIZE && attempt < maxAttempts {
		attempt++
		needed := types.BUNDLE_SIZE - m.mempool.Count()

		if !quiet && attempt > 1 {
			// NOTE(review): afterTime[:19] panics if the cursor string is
			// shorter than 19 bytes (e.g. empty on a fresh repo) — confirm
			// this path is only reached with a populated cursor.
			m.logger.Printf(" Attempt %d: Need %d more ops, cursor: %s",
				attempt, needed, afterTime[:19])
		}

		newOps, fetchCount, err := m.syncer.FetchToMempool(
			ctx,
			afterTime,
			prevBoundaryCIDs,
			needed,
			!verbose,
			m.mempool,
			totalFetches,
		)

		totalFetches += fetchCount

		// Check if we got an incomplete batch (fewer ops than requested
		// without an error) — that means we reached the PLC head.
		gotIncompleteBatch := len(newOps) > 0 && len(newOps) < needed && err == nil

		// Update cursor from mempool if we got new ops
		if len(newOps) > 0 && m.mempool.Count() > 0 {
			afterTime = m.mempool.GetLastTime()
		}

		// Stop if caught up or error
		if err != nil || len(newOps) == 0 || gotIncompleteBatch {
			caughtUp = true
			if verbose && totalFetches > 0 {
				m.logger.Printf("DEBUG: Caught up to latest PLC data")
			}
			break
		}

		if m.mempool.Count() >= types.BUNDLE_SIZE {
			break
		}
	}

	totalDuration := time.Since(attemptStart)

	// Not enough operations for a full bundle: report why.
	if m.mempool.Count() < types.BUNDLE_SIZE {
		if caughtUp {
			return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)",
				m.mempool.Count(), types.BUNDLE_SIZE)
		} else {
			return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (max attempts reached)",
				m.mempool.Count(), types.BUNDLE_SIZE)
		}
	}

	// Create bundle: remove exactly BUNDLE_SIZE operations from the mempool.
	operations, err := m.mempool.Take(types.BUNDLE_SIZE)
	if err != nil {
		return nil, types.BundleProductionStats{}, err
	}

	syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations)

	// Copy the sync-layer bundle into the package-level Bundle type.
	bundle := &Bundle{
		BundleNumber: syncBundle.BundleNumber,
		StartTime:    syncBundle.StartTime,
		EndTime:      syncBundle.EndTime,
		Operations:   syncBundle.Operations,
		DIDCount:     syncBundle.DIDCount,
		Cursor:       syncBundle.Cursor,
		Parent:       syncBundle.Parent,
		BoundaryCIDs: syncBundle.BoundaryCIDs,
		Compressed:   syncBundle.Compressed,
		CreatedAt:    syncBundle.CreatedAt,
	}

	// NOTE(review): when the mempool already held a full bundle,
	// totalFetches is 0 and AvgPerFetch becomes +Inf — consider guarding.
	stats := types.BundleProductionStats{
		TotalFetches:  totalFetches,
		TotalDuration: totalDuration,
		AvgPerFetch:   float64(types.BUNDLE_SIZE) / float64(totalFetches),
		Throughput:    float64(types.BUNDLE_SIZE) / totalDuration.Seconds(),
	}

	return bundle, stats, nil
}

// CloneFromRemote clones bundles from a remote endpoint
func (m *Manager) CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error) {
	// Define index update callback inline
	updateIndexCallback :=
		func(bundleNumbers []int, remoteMeta map[int]*bundleindex.BundleMetadata, verbose bool) error {
			if len(bundleNumbers) == 0 {
				return nil
			}

			// Create file existence checker used to decide which remote
			// entries correspond to already-downloaded files.
			fileExists := func(bundleNum int) bool {
				path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
				return m.operations.FileExists(path)
			}

			// Update index with remote metadata
			if err := m.index.UpdateFromRemote(bundleNumbers, remoteMeta, fileExists, verbose, m.logger); err != nil {
				return err
			}

			// Save index
			return m.SaveIndex()
		}

	// Delegate to cloner with inline callback
	return m.cloner.Clone(ctx, opts, m.index, updateIndexCallback)
}

// ResolveDID resolves a DID to its current document with detailed timing metrics.
// Resolution order: mempool (freshest) first, then the on-disk DID index.
// Per-step timings are recorded on the result and fed into resolverStats.
func (m *Manager) ResolveDID(ctx context.Context, did string) (*ResolveDIDResult, error) {
	if err := plcclient.ValidateDIDFormat(did); err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, err
	}

	result := &ResolveDIDResult{}
	totalStart := time.Now()

	// STEP 1: Check mempool first
	mempoolStart := time.Now()
	var latestMempoolOp *plcclient.PLCOperation
	if m.mempool != nil {
		latestMempoolOp = m.mempool.FindLatestDIDOperation(did)
	}
	result.MempoolTime = time.Since(mempoolStart)

	// Early return if found in mempool
	if latestMempoolOp != nil {
		doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*latestMempoolOp})
		if err != nil {
			atomic.AddInt64(&m.resolverStats.errors, 1)
			return nil, fmt.Errorf("resolution failed: %w", err)
		}

		result.Document = doc
		result.LatestOperation = latestMempoolOp
		result.Source = "mempool"
		result.TotalTime = time.Since(totalStart)

		m.recordResolverTiming(result, nil)
		return result, nil
	}

	// STEP 2: Index lookup (requires a built DID index)
	if m.didIndex == nil || !m.didIndex.Exists() {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("DID index not available - run 'plcbundle index build' to enable DID resolution")
	}

	indexStart := time.Now()
	locations, err := m.didIndex.GetDIDLocations(did)
	result.IndexTime = time.Since(indexStart)

	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, err
	}

	if len(locations) == 0 {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("DID not found")
	}

	// Find latest non-nullified location
	var latestLoc *didindex.OpLocation
	for i := range locations {
		if locations[i].Nullified() {
			continue
		}
		if latestLoc == nil || locations[i].IsAfter(*latestLoc) {
			latestLoc = &locations[i]
		}
	}

	if latestLoc == nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("no valid operations (all nullified)")
	}

	// STEP 3: Load the single operation at (bundle, position)
	opStart := time.Now()
	op, err := m.LoadOperation(ctx, latestLoc.BundleInt(), latestLoc.PositionInt())
	result.LoadOpTime = time.Since(opStart)

	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("failed to load operation: %w", err)
	}

	result.BundleNumber = latestLoc.BundleInt()
	result.Position = latestLoc.PositionInt()

	// STEP 4: Resolve document from the latest operation
	doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op})
	if err != nil {
		atomic.AddInt64(&m.resolverStats.errors, 1)
		return nil, fmt.Errorf("resolution failed: %w", err)
	}

	result.Document = doc
	result.LatestOperation = op
	result.Source = "bundle"
	result.TotalTime = time.Since(totalStart)

	m.recordResolverTiming(result, nil)
	return result, nil
}

// GetLastBundleNumber returns the last bundle
number (0 if no bundles) 1463func (m *Manager) GetLastBundleNumber() int { 1464 lastBundle := m.index.GetLastBundle() 1465 if lastBundle == nil { 1466 return 0 1467 } 1468 return lastBundle.BundleNumber 1469} 1470 1471// GetMempoolCount returns the number of operations in mempool 1472func (m *Manager) GetMempoolCount() int { 1473 return m.mempool.Count() 1474} 1475 1476func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool, skipDIDIndex bool) (int, *types.BundleProductionStats, error) { 1477 bundle, stats, err := m.FetchNextBundle(ctx, verbose, quiet) 1478 if err != nil { 1479 return 0, nil, err 1480 } 1481 1482 indexTime, err := m.SaveBundle(ctx, bundle, verbose, quiet, stats, skipDIDIndex) 1483 if err != nil { 1484 return 0, nil, err 1485 } 1486 stats.IndexTime = indexTime 1487 1488 return bundle.BundleNumber, &types.BundleProductionStats{}, nil 1489} 1490 1491// RunSyncLoop runs continuous sync loop (delegates to internal/sync) 1492func (m *Manager) RunSyncLoop(ctx context.Context, config *internalsync.SyncLoopConfig) error { 1493 // Manager itself implements the SyncManager interface 1494 return internalsync.RunSyncLoop(ctx, m, config) 1495} 1496 1497// RunSyncOnce performs a single sync cycle 1498func (m *Manager) RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig) (int, error) { 1499 // Manager itself implements the SyncManager interface 1500 return internalsync.SyncOnce(ctx, m, config) 1501} 1502 1503// EnsureDIDIndex ensures DID index is built and up-to-date 1504// Returns true if index was built/rebuilt, false if already up-to-date 1505func (m *Manager) EnsureDIDIndex(ctx context.Context, progressCallback func(current, total int)) (bool, error) { 1506 // Build index 1507 m.UpdateDIDIndexSmart(ctx, progressCallback) 1508 return true, nil 1509} 1510 1511// Add this helper function at the top of manager.go 1512func repositoryExists(bundleDir string) bool { 1513 indexPath := filepath.Join(bundleDir, 
		bundleindex.INDEX_FILE)

	// Check for index file
	if _, err := os.Stat(indexPath); err == nil {
		return true
	}

	// Check for bundle files; underscore/temp files are filtered out.
	bundleFiles, _ := filepath.Glob(filepath.Join(bundleDir, "*.jsonl.zst"))
	bundleFiles = filterBundleFiles(bundleFiles)

	return len(bundleFiles) > 0
}

// ResolveHandleOrDID resolves input that can be either a handle or DID
// Returns: (did, handleResolveTime, error). handleResolveTime is zero when
// no network resolution was needed (input was already a DID).
func (m *Manager) ResolveHandleOrDID(ctx context.Context, input string) (string, time.Duration, error) {
	input = strings.TrimSpace(input)

	// Normalize handle format (remove at://, @ prefixes)
	if !strings.HasPrefix(input, "did:") {
		input = strings.TrimPrefix(input, "at://")
		input = strings.TrimPrefix(input, "@")
	}

	// If already a DID, validate and return
	if strings.HasPrefix(input, "did:plc:") {
		if err := plcclient.ValidateDIDFormat(input); err != nil {
			return "", 0, err
		}
		return input, 0, nil // No resolution needed
	}

	// Support did:web too (passed through unvalidated)
	if strings.HasPrefix(input, "did:web:") {
		return input, 0, nil
	}

	// It's a handle - need resolver
	if m.handleResolver == nil {
		return "", 0, fmt.Errorf(
			"input '%s' appears to be a handle, but handle resolver is not configured\n\n"+
				"Configure resolver with:\n"+
				" plcbundle --handle-resolver https://quickdid.smokesignal.tools did resolve %s\n\n"+
				"Or set default in config",
			input, input)
	}

	resolveStart := time.Now()
	if !m.config.Quiet {
		m.logger.Printf("Resolving handle: %s", input)
	}
	did, err := m.handleResolver.ResolveHandle(ctx, input)
	resolveTime := time.Since(resolveStart)

	if err != nil {
		return "", resolveTime, fmt.Errorf("failed to resolve handle '%s': %w", input, err)
	}

	if !m.config.Quiet {
		m.logger.Printf("Resolved: %s → %s (in %s)",
			input, did, resolveTime)
	}
	return did, resolveTime, nil
}

// GetHandleResolver returns the handle resolver (can be nil)
func (m *Manager) GetHandleResolver() *handleresolver.Client {
	return m.handleResolver
}

// recordResolverTiming records resolver performance metrics.
// The error parameter is currently unused; errors are counted by callers.
// NOTE(review): counters are updated with atomics while the mutex is held,
// so GetResolverStats can read them lock-free; the mutex itself protects
// the recentTimes circular buffer.
func (m *Manager) recordResolverTiming(result *ResolveDIDResult, _ error) {
	m.resolverStats.Lock()
	defer m.resolverStats.Unlock()

	// Increment counters
	atomic.AddInt64(&m.resolverStats.totalResolutions, 1)

	switch result.Source {
	case "mempool":
		atomic.AddInt64(&m.resolverStats.mempoolHits, 1)
	case "bundle":
		atomic.AddInt64(&m.resolverStats.bundleHits, 1)
	}

	// Record timings (stored in microseconds)
	timing := resolverTiming{
		totalTime:   result.TotalTime.Microseconds(),
		mempoolTime: result.MempoolTime.Microseconds(),
		indexTime:   result.IndexTime.Microseconds(),
		loadOpTime:  result.LoadOpTime.Microseconds(),
		source:      result.Source,
	}

	atomic.AddInt64(&m.resolverStats.totalTime, timing.totalTime)
	atomic.AddInt64(&m.resolverStats.totalMempoolTime, timing.mempoolTime)
	atomic.AddInt64(&m.resolverStats.totalIndexTime, timing.indexTime)
	atomic.AddInt64(&m.resolverStats.totalLoadOpTime, timing.loadOpTime)

	// Add to circular buffer.
	// NOTE(review): assumes recentSize > 0 (set at construction) — the
	// modulo below would panic otherwise; confirm against the constructor.
	m.resolverStats.recentTimes[m.resolverStats.recentIdx] = timing
	m.resolverStats.recentIdx = (m.resolverStats.recentIdx + 1) % m.resolverStats.recentSize
}

// GetResolverStats returns resolver performance statistics
func (m *Manager) GetResolverStats() map[string]interface{} {
	totalResolutions := atomic.LoadInt64(&m.resolverStats.totalResolutions)

	// No resolutions yet: return a minimal map to avoid divisions by zero.
	if totalResolutions == 0 {
		return map[string]interface{}{
			"total_resolutions": 0,
		}
	}

	mempoolHits := atomic.LoadInt64(&m.resolverStats.mempoolHits)
	bundleHits := atomic.LoadInt64(&m.resolverStats.bundleHits)
	errors := atomic.LoadInt64(&m.resolverStats.errors)

	totalTime := atomic.LoadInt64(&m.resolverStats.totalTime)
	totalMempoolTime := atomic.LoadInt64(&m.resolverStats.totalMempoolTime)
	totalIndexTime := atomic.LoadInt64(&m.resolverStats.totalIndexTime)
	totalLoadOpTime := atomic.LoadInt64(&m.resolverStats.totalLoadOpTime)

	// Calculate overall averages (stored micros -> milliseconds)
	avgTotalMs := float64(totalTime) / float64(totalResolutions) / 1000.0
	avgMempoolMs := float64(totalMempoolTime) / float64(totalResolutions) / 1000.0

	stats := map[string]interface{}{
		"total_resolutions": totalResolutions,
		"mempool_hits":      mempoolHits,
		"bundle_hits":       bundleHits,
		"errors":            errors,
		"success_rate":      float64(totalResolutions-errors) / float64(totalResolutions),
		"mempool_hit_rate":  float64(mempoolHits) / float64(totalResolutions),

		// Overall averages
		"avg_total_time_ms":   avgTotalMs,
		"avg_mempool_time_ms": avgMempoolMs,
	}

	// Only include bundle-specific stats if we have bundle hits
	if bundleHits > 0 {
		avgIndexMs := float64(totalIndexTime) / float64(bundleHits) / 1000.0
		avgLoadMs := float64(totalLoadOpTime) / float64(bundleHits) / 1000.0

		stats["avg_index_time_ms"] = avgIndexMs
		stats["avg_load_op_time_ms"] = avgLoadMs
	}

	// Recent statistics: copy the circular buffer under the lock, then
	// analyze the copy without holding it.
	m.resolverStats.Lock()
	recentCopy := make([]resolverTiming, m.resolverStats.recentSize)
	copy(recentCopy, m.resolverStats.recentTimes)
	m.resolverStats.Unlock()

	// Filter valid entries (unused buffer slots have zero totalTime)
	validRecent := make([]resolverTiming, 0)
	for _, t := range recentCopy {
		if t.totalTime > 0 {
			validRecent = append(validRecent, t)
		}
	}

	if len(validRecent) > 0 {
		// Extract total times for percentiles
		totalTimes := make([]int64, len(validRecent))
		for i, t := range validRecent {
			totalTimes[i] = t.totalTime
		}
		sort.Slice(totalTimes, func(i, j int) bool {
			return totalTimes[i] < totalTimes[j]
		})

		// Calculate recent average
		var recentSum int64
		var recentMempoolSum int64
		var recentIndexSum int64
		var recentLoadSum int64
		recentBundleCount := 0

		for _, t := range validRecent {
			recentSum += t.totalTime
			recentMempoolSum += t.mempoolTime
			// Index/load timings only apply to bundle-sourced resolutions
			if t.source == "bundle" {
				recentIndexSum += t.indexTime
				recentLoadSum += t.loadOpTime
				recentBundleCount++
			}
		}

		stats["recent_avg_total_time_ms"] = float64(recentSum) / float64(len(validRecent)) / 1000.0
		stats["recent_avg_mempool_time_ms"] = float64(recentMempoolSum) / float64(len(validRecent)) / 1000.0

		if recentBundleCount > 0 {
			stats["recent_avg_index_time_ms"] = float64(recentIndexSum) / float64(recentBundleCount) / 1000.0
			stats["recent_avg_load_time_ms"] = float64(recentLoadSum) / float64(recentBundleCount) / 1000.0
		}

		stats["recent_sample_size"] = len(validRecent)

		// Percentiles over the sorted recent sample
		p50idx := len(totalTimes) * 50 / 100
		p95idx := len(totalTimes) * 95 / 100
		p99idx := len(totalTimes) * 99 / 100

		stats["min_total_time_ms"] = float64(totalTimes[0]) / 1000.0
		stats["max_total_time_ms"] = float64(totalTimes[len(totalTimes)-1]) / 1000.0

		if p50idx < len(totalTimes) {
			stats["p50_total_time_ms"] = float64(totalTimes[p50idx]) / 1000.0
		}
		if p95idx < len(totalTimes) {
			stats["p95_total_time_ms"] = float64(totalTimes[p95idx]) / 1000.0
		}
		if p99idx < len(totalTimes) {
			stats["p99_total_time_ms"] = float64(totalTimes[p99idx]) / 1000.0
		}
	}

	return stats
}

// ResetResolverStats resets resolver performance statistics
func (m *Manager) ResetResolverStats() {
	m.resolverStats.Lock()
	defer m.resolverStats.Unlock()

	atomic.StoreInt64(&m.resolverStats.totalResolutions, 0)
	atomic.StoreInt64(&m.resolverStats.mempoolHits, 0)
	atomic.StoreInt64(&m.resolverStats.bundleHits, 0)
	atomic.StoreInt64(&m.resolverStats.errors, 0)
	atomic.StoreInt64(&m.resolverStats.totalTime, 0)
	atomic.StoreInt64(&m.resolverStats.totalMempoolTime, 0)
	atomic.StoreInt64(&m.resolverStats.totalIndexTime, 0)
	atomic.StoreInt64(&m.resolverStats.totalLoadOpTime, 0)

	// Replace (not clear) the circular buffer so stale timings vanish.
	m.resolverStats.recentTimes = make([]resolverTiming, m.resolverStats.recentSize)
	m.resolverStats.recentIdx = 0
}

// SetQuiet toggles quiet mode on the manager's configuration.
func (m *Manager) SetQuiet(quiet bool) {
	m.config.Quiet = quiet
}

// ShouldRebuildDIDIndex checks if DID index needs rebuilding
// Returns: (needsRebuild bool, reason string, canUpdateIncrementally bool)
func (m *Manager) ShouldRebuildDIDIndex() (bool, string, bool) {
	if m.didIndex == nil {
		return false, "DID index disabled", false
	}

	needsRebuild, reason := m.didIndex.NeedsRebuild(m.GetBundleIndex())

	if needsRebuild {
		return true, reason, false
	}

	// Check if incremental update is better
	canIncremental, behindBy := m.didIndex.ShouldUpdateIncrementally(m.GetBundleIndex())
	if canIncremental {
		return false, fmt.Sprintf("can update incrementally (%d bundles)", behindBy), true
	}

	return false, "index is up to date", false
}

// UpdateDIDIndexSmart updates DID index intelligently (rebuild vs incremental):
// a no-op when up to date, a full rebuild when required, otherwise an
// incremental catch-up of missing bundles only.
func (m *Manager) UpdateDIDIndexSmart(ctx context.Context, progressCallback func(current, total int)) error {
	needsRebuild, reason, canIncremental := m.ShouldRebuildDIDIndex()

	if !needsRebuild && !canIncremental {
		if m.config.Verbose {
			m.logger.Printf("DID index is up to date")
		}
		return nil
	}

	if needsRebuild {
		m.logger.Printf("Rebuilding DID index: %s", reason)
		return m.BuildDIDIndex(ctx, progressCallback)
	}

	if canIncremental {
		m.logger.Printf("Updating DID index incrementally: %s", reason)
		return m.updateDIDIndexIncremental(ctx, progressCallback)
	}

	return nil
}

// updateDIDIndexIncremental updates index for missing bundles only:
// it walks from the bundle after the index's recorded LastBundle up to the
// newest bundle, feeding each one to the DID index.
func (m *Manager) updateDIDIndexIncremental(ctx context.Context, progressCallback func(current, total int)) error {
	config := m.didIndex.GetConfig()
	lastBundle := m.index.GetLastBundle()

	// Nothing to do when there are no bundles or the index is current.
	if lastBundle == nil || config.LastBundle >= lastBundle.BundleNumber {
		return nil
	}

	start := config.LastBundle + 1
	end := lastBundle.BundleNumber
	total := end - start + 1

	m.logger.Printf("Updating DID index for bundles %d-%d (%d bundles)", start, end, total)

	for bundleNum := start; bundleNum <= end; bundleNum++ {
		bundle, err := m.LoadBundle(ctx, bundleNum)
		if err != nil {
			return fmt.Errorf("failed to load bundle %d: %w", bundleNum, err)
		}

		bundleData := &didindex.BundleData{
			BundleNumber: bundle.BundleNumber,
			Operations:   bundle.Operations,
		}

		if err := m.didIndex.UpdateIndexForBundle(ctx, bundleData); err != nil {
			return fmt.Errorf("failed to update bundle %d: %w", bundleNum, err)
		}

		// Report 1-based progress within this catch-up window.
		if progressCallback != nil {
			progressCallback(bundleNum-start+1, total)
		}
	}

	return nil
}