A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

mempool

+269 -7
+29 -2
cmd/plcbundle/main.go
··· 811 811 fs := flag.NewFlagSet("serve", flag.ExitOnError) 812 812 port := fs.String("port", "8080", "HTTP server port") 813 813 host := fs.String("host", "127.0.0.1", "HTTP server host") 814 + mirror := fs.Bool("mirror", false, "enable mirror mode (auto-sync from PLC)") 815 + plcURL := fs.String("plc", "https://plc.directory", "PLC directory URL (for mirror mode)") 816 + syncInterval := fs.Duration("sync-interval", 5*time.Minute, "sync interval for mirror mode") 814 817 fs.Parse(os.Args[2:]) 815 818 816 - mgr, dir, err := getManager("") 819 + // Create manager with PLC client if mirror mode is enabled 820 + var plcURLForManager string 821 + if *mirror { 822 + plcURLForManager = *plcURL 823 + } 824 + 825 + mgr, dir, err := getManager(plcURLForManager) 817 826 if err != nil { 818 827 fmt.Fprintf(os.Stderr, "Error: %v\n", err) 819 828 os.Exit(1) ··· 825 834 fmt.Printf("Starting plcbundle HTTP server...\n") 826 835 fmt.Printf(" Directory: %s\n", dir) 827 836 fmt.Printf(" Listening: http://%s\n", addr) 837 + 838 + if *mirror { 839 + fmt.Printf(" Mirror mode: ENABLED\n") 840 + fmt.Printf(" PLC URL: %s\n", *plcURL) 841 + fmt.Printf(" Sync interval: %s\n", *syncInterval) 842 + fmt.Printf(" Mempool API: ENABLED\n") // Added 843 + } else { 844 + fmt.Printf(" Mirror mode: disabled\n") 845 + } 846 + 828 847 fmt.Printf("\nPress Ctrl+C to stop\n\n") 829 848 849 + // Start mirror sync if enabled 850 + ctx, cancel := context.WithCancel(context.Background()) 851 + defer cancel() 852 + 853 + if *mirror { 854 + go runMirrorSync(ctx, mgr, *syncInterval) 855 + } 856 + 830 857 server := &http.Server{ 831 858 Addr: addr, 832 - Handler: newServerHandler(mgr), 859 + Handler: newServerHandler(mgr, *mirror), // Pass mirror flag 833 860 ReadTimeout: 30 * time.Second, 834 861 WriteTimeout: 30 * time.Second, 835 862 }
+240 -5
cmd/plcbundle/server.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "fmt" 6 7 "io" ··· 12 13 "github.com/atscan/plcbundle/bundle" 13 14 ) 14 15 15 - func newServerHandler(mgr *bundle.Manager) http.Handler { 16 + func newServerHandler(mgr *bundle.Manager, mirrorMode bool) http.Handler { 16 17 mux := http.NewServeMux() 17 18 18 19 // Root - ASCII art + info ··· 21 22 http.NotFound(w, r) 22 23 return 23 24 } 24 - handleRoot(w, r, mgr) 25 + handleRoot(w, r, mgr, mirrorMode) 25 26 }) 26 27 27 28 // Index JSON ··· 44 45 handleBundleJSONL(w, r, mgr) 45 46 }) 46 47 48 + // Mempool endpoints (only if mirror mode enabled) 49 + if mirrorMode { 50 + mux.HandleFunc("/mempool", func(w http.ResponseWriter, r *http.Request) { 51 + handleMempool(w, mgr) 52 + }) 53 + 54 + mux.HandleFunc("/mempool/stats", func(w http.ResponseWriter, r *http.Request) { 55 + handleMempoolStats(w, mgr) 56 + }) 57 + 58 + mux.HandleFunc("/mempool/operations", func(w http.ResponseWriter, r *http.Request) { 59 + handleMempoolOperations(w, mgr) 60 + }) 61 + } 62 + 47 63 return mux 48 64 } 49 65 50 - func handleRoot(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager) { 66 + func handleRoot(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, mirrorMode bool) { 51 67 w.Header().Set("Content-Type", "text/plain; charset=utf-8") 52 68 53 69 index := mgr.GetIndex() ··· 70 86 fmt.Fprintf(w, "Server Stats\n") 71 87 fmt.Fprintf(w, "━━━━━━━━━━━━\n") 72 88 fmt.Fprintf(w, " Bundle count: %d\n", bundleCount) 89 + fmt.Fprintf(w, " Mirror mode: %v\n", mirrorMode) 73 90 74 91 if bundleCount > 0 { 75 92 firstBundle := stats["first_bundle"].(int) ··· 96 113 } 97 114 } 98 115 116 + // Show mempool stats if mirror mode 117 + if mirrorMode { 118 + mempoolStats := mgr.GetMempoolStats() 119 + count := mempoolStats["count"].(int) 120 + targetBundle := mempoolStats["target_bundle"].(int) 121 + canCreate := mempoolStats["can_create_bundle"].(bool) 122 + 123 + fmt.Fprintf(w, "\nMempool Stats\n") 124 + fmt.Fprintf(w, "━━━━━━━━━━━━━\n") 125 + fmt.Fprintf(w, " Target bundle: %06d\n", targetBundle) 126 + fmt.Fprintf(w, " Operations: %d / %d\n", count, bundle.BUNDLE_SIZE) 127 + fmt.Fprintf(w, " Can create bundle: %v\n", canCreate) 128 + 129 + if count > 0 { 130 + progress := float64(count) / float64(bundle.BUNDLE_SIZE) * 100 131 + fmt.Fprintf(w, " Progress: %.1f%%\n", progress) 132 + 133 + if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 134 + fmt.Fprintf(w, " First op: %s\n", firstTime.Format("2006-01-02 15:04:05")) 135 + } 136 + if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 137 + fmt.Fprintf(w, " Last op: %s\n", lastTime.Format("2006-01-02 15:04:05")) 138 + } 139 + } 140 + } 141 + 99 142 fmt.Fprintf(w, "\nAPI Endpoints\n") 100 143 fmt.Fprintf(w, "━━━━━━━━━━━━━\n") 101 144 fmt.Fprintf(w, " GET / This info page\n") ··· 104 147 fmt.Fprintf(w, " GET /data/:number Raw bundle (zstd compressed)\n") 105 148 fmt.Fprintf(w, " GET /jsonl/:number Decompressed JSONL stream\n") 106 149 150 + if mirrorMode { 151 + fmt.Fprintf(w, "\nMempool Endpoints (Mirror Mode)\n") 152 + fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n") 153 + fmt.Fprintf(w, " GET /mempool Mempool info (HTML)\n") 154 + fmt.Fprintf(w, " GET /mempool/stats Mempool statistics (JSON)\n") 155 + fmt.Fprintf(w, " GET /mempool/operations Mempool operations (JSONL)\n") 156 + } 157 + 107 158 fmt.Fprintf(w, "\nExamples\n") 108 159 fmt.Fprintf(w, "━━━━━━━━\n") 109 160 fmt.Fprintf(w, " # Get bundle metadata\n") ··· 112 163 fmt.Fprintf(w, " curl http://%s/data/1 -o 000001.jsonl.zst\n\n", r.Host) 113 164 fmt.Fprintf(w, " # Stream decompressed operations\n") 114 165 fmt.Fprintf(w, " curl http://%s/jsonl/1\n\n", r.Host) 115 - fmt.Fprintf(w, " # Get full index\n") 116 - fmt.Fprintf(w, " curl http://%s/index.json\n\n", r.Host) 166 + 167 + if mirrorMode { 168 + fmt.Fprintf(w, " # Get mempool operations\n") 169 + fmt.Fprintf(w, " curl http://%s/mempool/operations\n\n", r.Host) 170 + fmt.Fprintf(w, " # Get mempool stats\n") 171 + fmt.Fprintf(w, " curl http://%s/mempool/stats\n\n", r.Host) 172 + } 117 173 118 174 fmt.Fprintf(w, "\n────────────────────────────────────────────────────────────────\n") 119 175 fmt.Fprintf(w, "plcbundle v%s | https://github.com/atscan/plcbundle\n", version) 120 176 } 121 177 178 + // handleMempool returns mempool info as HTML/text 179 + func handleMempool(w http.ResponseWriter, mgr *bundle.Manager) { 180 + w.Header().Set("Content-Type", "text/plain; charset=utf-8") 181 + w.Header().Set("Access-Control-Allow-Origin", "*") 182 + 183 + stats := mgr.GetMempoolStats() 184 + count := stats["count"].(int) 185 + targetBundle := stats["target_bundle"].(int) 186 + canCreate := stats["can_create_bundle"].(bool) 187 + minTimestamp := stats["min_timestamp"].(time.Time) 188 + validated := stats["validated"].(bool) 189 + 190 + fmt.Fprintf(w, "Mempool Status\n") 191 + fmt.Fprintf(w, "══════════════\n\n") 192 + fmt.Fprintf(w, "Target Bundle: %06d\n", targetBundle) 193 + fmt.Fprintf(w, "Operations: %d / %d\n", count, bundle.BUNDLE_SIZE) 194 + fmt.Fprintf(w, "Can Create Bundle: %v\n", canCreate) 195 + fmt.Fprintf(w, "Min Timestamp: %s\n", minTimestamp.Format(time.RFC3339)) 196 + fmt.Fprintf(w, "Validated: %v\n\n", validated) 197 + 198 + if count > 0 { 199 + progress := float64(count) / float64(bundle.BUNDLE_SIZE) * 100 200 + fmt.Fprintf(w, "Progress: %.1f%%\n", progress) 201 + 202 + // Progress bar 203 + barWidth := 50 204 + filled := int(float64(barWidth) * float64(count) / float64(bundle.BUNDLE_SIZE)) 205 + if filled > barWidth { 206 + filled = barWidth 207 + } 208 + bar := strings.Repeat("█", filled) + strings.Repeat("░", barWidth-filled) 209 + fmt.Fprintf(w, "[%s]\n\n", bar) 210 + 211 + if sizeBytes, ok := stats["size_bytes"].(int); ok { 212 + fmt.Fprintf(w, "Size: %.2f KB\n", float64(sizeBytes)/1024) 213 + } 214 + if firstTime, ok := stats["first_time"].(time.Time); ok { 215 + fmt.Fprintf(w, "First Operation: %s\n", firstTime.Format(time.RFC3339)) 216 + } 217 + if lastTime, ok := stats["last_time"].(time.Time); ok { 218 + fmt.Fprintf(w, "Last Operation: %s\n", lastTime.Format(time.RFC3339)) 219 + } 220 + } else { 221 + fmt.Fprintf(w, "(empty)\n") 222 + } 223 + 224 + fmt.Fprintf(w, "\nEndpoints:\n") 225 + fmt.Fprintf(w, " /mempool/stats - JSON statistics\n") 226 + fmt.Fprintf(w, " /mempool/operations - JSONL stream of operations\n") 227 + } 228 + 229 + // handleMempoolStats returns mempool statistics as JSON 230 + func handleMempoolStats(w http.ResponseWriter, mgr *bundle.Manager) { 231 + w.Header().Set("Content-Type", "application/json") 232 + w.Header().Set("Access-Control-Allow-Origin", "*") 233 + 234 + stats := mgr.GetMempoolStats() 235 + 236 + data, err := json.MarshalIndent(stats, "", " ") 237 + if err != nil { 238 + http.Error(w, "Failed to marshal stats", http.StatusInternalServerError) 239 + return 240 + } 241 + 242 + w.Write(data) 243 + } 244 + 245 + // handleMempoolOperations streams mempool operations as JSONL 246 + func handleMempoolOperations(w http.ResponseWriter, mgr *bundle.Manager) { 247 + ops, err := mgr.GetMempoolOperations() 248 + if err != nil { 249 + http.Error(w, fmt.Sprintf("Failed to get mempool operations: %v", err), http.StatusInternalServerError) 250 + return 251 + } 252 + 253 + w.Header().Set("Content-Type", "application/x-ndjson") 254 + w.Header().Set("Access-Control-Allow-Origin", "*") 255 + 256 + if len(ops) == 0 { 257 + // Return empty response 258 + return 259 + } 260 + 261 + // Stream operations as JSONL 262 + for _, op := range ops { 263 + if len(op.RawJSON) > 0 { 264 + w.Write(op.RawJSON) 265 + } else { 266 + // Fallback to marshaling if no raw JSON 267 + data, _ := json.Marshal(op) 268 + w.Write(data) 269 + } 270 + w.Write([]byte("\n")) 271 + } 272 + } 273 + 122 274 func handleIndexJSON(w http.ResponseWriter, mgr *bundle.Manager) { 123 275 index := mgr.GetIndex() 124 276 ··· 229 381 fmt.Fprintf(os.Stderr, "Error streaming bundle %d: %v\n", bundleNum, err) 230 382 } 231 383 } 384 + 385 + // runMirrorSync continuously fetches new bundles in the background 386 + func runMirrorSync(ctx context.Context, mgr *bundle.Manager, interval time.Duration) { 387 + fmt.Printf("[Mirror] Starting sync loop (interval: %s)\n", interval) 388 + 389 + // Do initial sync immediately 390 + syncBundles(ctx, mgr) 391 + 392 + ticker := time.NewTicker(interval) 393 + defer ticker.Stop() 394 + 395 + for { 396 + select { 397 + case <-ctx.Done(): 398 + fmt.Printf("[Mirror] Sync stopped\n") 399 + return 400 + case <-ticker.C: 401 + syncBundles(ctx, mgr) 402 + } 403 + } 404 + } 405 + 406 + // syncBundles fetches all available bundles 407 + func syncBundles(ctx context.Context, mgr *bundle.Manager) { 408 + index := mgr.GetIndex() 409 + lastBundle := index.GetLastBundle() 410 + startBundle := 1 411 + if lastBundle != nil { 412 + startBundle = lastBundle.BundleNumber + 1 413 + } 414 + 415 + fmt.Printf("[Mirror] Checking for new bundles (current: %06d)...\n", startBundle-1) 416 + 417 + fetchedCount := 0 418 + consecutiveErrors := 0 419 + maxConsecutiveErrors := 3 420 + 421 + for { 422 + currentBundle := startBundle + fetchedCount 423 + 424 + b, err := mgr.FetchNextBundle(ctx) 425 + if err != nil { 426 + // Check if we've reached the end 427 + if isEndOfDataError(err) { 428 + if fetchedCount > 0 { 429 + fmt.Printf("[Mirror] ✓ Synced %d new bundles (now at %06d)\n", 430 + fetchedCount, currentBundle-1) 431 + } else { 432 + fmt.Printf("[Mirror] ✓ Already up to date (bundle %06d)\n", startBundle-1) 433 + } 434 + break 435 + } 436 + 437 + // Handle other errors 438 + consecutiveErrors++ 439 + fmt.Fprintf(os.Stderr, "[Mirror] Error fetching bundle %06d: %v\n", currentBundle, err) 440 + 441 + if consecutiveErrors >= maxConsecutiveErrors { 442 + fmt.Fprintf(os.Stderr, "[Mirror] Too many consecutive errors, stopping sync\n") 443 + break 444 + } 445 + 446 + // Wait before retry 447 + time.Sleep(5 * time.Second) 448 + continue 449 + } 450 + 451 + // Reset error counter on success 452 + consecutiveErrors = 0 453 + 454 + if err := mgr.SaveBundle(ctx, b); err != nil { 455 + fmt.Fprintf(os.Stderr, "[Mirror] Error saving bundle %06d: %v\n", b.BundleNumber, err) 456 + break 457 + } 458 + 459 + fetchedCount++ 460 + fmt.Printf("[Mirror] ✓ Fetched bundle %06d (%d ops, %d DIDs)\n", 461 + b.BundleNumber, len(b.Operations), b.DIDCount) 462 + 463 + // Add a small delay between fetches to be nice to the PLC directory 464 + time.Sleep(500 * time.Millisecond) 465 + } 466 + }