A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

load rebuild progress

+96 -15
+42 -5
bundle/manager.go
··· 110 110 } 111 111 112 112 // Use parallel scan with auto-detected CPU count 113 - workers := runtime.NumCPU() 114 - if workers < 1 { 115 - workers = 1 113 + workers := config.RebuildWorkers 114 + if workers <= 0 { 115 + workers = runtime.NumCPU() 116 + if workers < 1 { 117 + workers = 1 118 + } 116 119 } 117 120 118 121 config.Logger.Printf("Using %d workers for parallel scan", workers) 119 122 123 + // Create progress callback wrapper 124 + progressCallback := config.RebuildProgress 125 + if progressCallback == nil { 126 + // Default: log every 100 bundles 127 + progressCallback = func(current, total int) { 128 + if current%100 == 0 || current == total { 129 + config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%)", 130 + current, total, float64(current)/float64(total)*100) 131 + } 132 + } 133 + } 134 + 135 + start := time.Now() 136 + 120 137 // Scan directory to rebuild index (parallel) 121 - _, err := tempMgr.ScanDirectoryParallel(workers, nil) 138 + result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback) 122 139 if err != nil { 123 140 return nil, fmt.Errorf("failed to rebuild index: %w", err) 124 141 } 142 + 143 + elapsed := time.Since(start) 125 144 126 145 // Reload the rebuilt index 127 146 index, err = LoadIndex(indexPath) ··· 129 148 return nil, fmt.Errorf("failed to load rebuilt index: %w", err) 130 149 } 131 150 132 - config.Logger.Printf("✓ Index rebuilt with %d bundles", index.Count()) 151 + config.Logger.Printf("✓ Index rebuilt with %d bundles in %s (%.1f bundles/sec)", 152 + index.Count(), elapsed.Round(time.Millisecond), float64(result.BundleCount)/elapsed.Seconds()) 153 + 154 + // Verify all chain hashes are present 155 + bundles := index.GetBundles() 156 + missingHashes := 0 157 + for i, meta := range bundles { 158 + if meta.Hash == "" { 159 + config.Logger.Printf("⚠️ Bundle %06d has empty Hash after rebuild!", meta.BundleNumber) 160 + missingHashes++ 161 + } 162 + if i > 0 && meta.ChainHash == "" { 163 + config.Logger.Printf("⚠️ Bundle %06d has empty ChainHash after rebuild!", meta.BundleNumber) 164 + missingHashes++ 165 + } 166 + } 167 + if missingHashes > 0 { 168 + config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes) 169 + } 133 170 } 134 171 135 172 // Initialize mempool for next bundle
+12 -6
bundle/types.go
··· 173 173 174 174 // Config holds configuration for bundle operations 175 175 type Config struct { 176 - BundleDir string 177 - VerifyOnLoad bool 178 - Logger Logger 176 + BundleDir string 177 + VerifyOnLoad bool 178 + AutoRebuild bool 179 + RebuildWorkers int // Number of workers for parallel rebuild (0 = auto-detect) 180 + RebuildProgress func(current, total int) // Progress callback for rebuild 181 + Logger Logger 179 182 } 180 183 181 184 // DefaultConfig returns default configuration 182 185 func DefaultConfig(bundleDir string) *Config { 183 186 return &Config{ 184 - BundleDir: bundleDir, 185 - VerifyOnLoad: true, 186 - Logger: nil, // Will use defaultLogger in manager 187 + BundleDir: bundleDir, 188 + VerifyOnLoad: true, 189 + AutoRebuild: true, 190 + RebuildWorkers: 0, // 0 means auto-detect CPU count 191 + RebuildProgress: nil, // No progress callback by default 192 + Logger: nil, 187 193 } 188 194 }
+42 -4
cmd/plcbundle/main.go
··· 1001 1001 plcURL := fs.String("plc", "https://plc.directory", "PLC directory URL (for sync mode)") 1002 1002 syncInterval := fs.Duration("sync-interval", 5*time.Minute, "sync interval for sync mode") 1003 1003 enableWebSocket := fs.Bool("websocket", false, "enable WebSocket endpoint for streaming records") 1004 + workers := fs.Int("workers", 4, "number of workers for auto-rebuild (0 = CPU count)") 1004 1005 fs.Parse(os.Args[2:]) 1006 + 1007 + // Auto-detect CPU count 1008 + if *workers == 0 { 1009 + *workers = runtime.NumCPU() 1010 + } 1005 1011 1006 1012 // Create manager with PLC client if sync mode is enabled 1007 1013 var plcURLForManager string ··· 1009 1015 plcURLForManager = *plcURL 1010 1016 } 1011 1017 1012 - // NewManager now handles auto-rebuild automatically 1013 - mgr, dir, err := getManager(plcURLForManager) // ← Capture dir here 1018 + dir, err := os.Getwd() 1019 + if err != nil { 1020 + fmt.Fprintf(os.Stderr, "Error: %v\n", err) 1021 + os.Exit(1) 1022 + } 1023 + 1024 + // Ensure directory exists 1025 + if err := os.MkdirAll(dir, 0755); err != nil { 1026 + fmt.Fprintf(os.Stderr, "Error: %v\n", err) 1027 + os.Exit(1) 1028 + } 1029 + 1030 + config := bundle.DefaultConfig(dir) 1031 + config.RebuildWorkers = *workers 1032 + 1033 + // Add progress callback for rebuild 1034 + config.RebuildProgress = func(current, total int) { 1035 + if current%100 == 0 || current == total { 1036 + fmt.Printf("[Rebuild] Progress: %d/%d bundles (%.1f%%) \r", 1037 + current, total, float64(current)/float64(total)*100) 1038 + if current == total { 1039 + fmt.Println() // New line when complete 1040 + } 1041 + } 1042 + } 1043 + 1044 + var client *plc.Client 1045 + if plcURLForManager != "" { 1046 + client = plc.NewClient(plcURLForManager) 1047 + } 1048 + 1049 + fmt.Printf("Starting plcbundle HTTP server...\n") 1050 + fmt.Printf(" Directory: %s\n", dir) 1051 + 1052 + // NewManager now handles auto-rebuild automatically with progress 1053 + mgr, err := bundle.NewManager(config, client) 1014 1054 if err != nil { 1015 1055 fmt.Fprintf(os.Stderr, "Error: %v\n", err) 1016 1056 os.Exit(1) ··· 1019 1059 1020 1060 addr := fmt.Sprintf("%s:%s", *host, *port) 1021 1061 1022 - fmt.Printf("Starting plcbundle HTTP server...\n") 1023 - fmt.Printf(" Directory: %s\n", dir) // ← Now dir is available 1024 1062 fmt.Printf(" Listening: http://%s\n", addr) 1025 1063 1026 1064 if *sync {