[DEPRECATED] Go implementation of plcbundle

tuning did resolver

+380 -45
+30
bundle/did_index.go
··· 166 166 } 167 167 } 168 168 169 + dim.cacheMu.RLock() 170 + cacheSize := len(dim.shardCache) 171 + dim.cacheMu.RUnlock() 172 + 173 + if dim.verbose || cacheSize > dim.maxCache { 174 + dim.logger.Printf("DEBUG: Shard cache size: %d/%d shards (after lookup)", cacheSize, dim.maxCache) 175 + } 176 + 169 177 return locations, nil 170 178 } 171 179 ··· 547 555 548 556 return nil 549 557 } 558 + 559 + // TrimCache evicts all but the most recently used shard from the cache. 560 + func (dim *DIDIndexManager) TrimCache() { 561 + dim.cacheMu.Lock() 562 + defer dim.cacheMu.Unlock() 563 + 564 + // Keep only most recent shard 565 + if len(dim.shardCache) > 1 { 566 + toEvict := len(dim.shardCache) - 1 567 + for i := 0; i < toEvict; i++ { 568 + if len(dim.cacheOrder) > 0 { 569 + victimNum := dim.cacheOrder[0] 570 + dim.cacheOrder = dim.cacheOrder[1:] 571 + 572 + if victim, exists := dim.shardCache[victimNum]; exists { 573 + dim.unmapShard(victim) 574 + delete(dim.shardCache, victimNum) 575 + } 576 + } 577 + } 578 + } 579 + }
+3 -14
bundle/manager.go
··· 277 277 mempool: mempool, 278 278 didIndex: didIndex, 279 279 bundleCache: make(map[int]*Bundle), 280 - maxCacheSize: 10, // Keep 10 recent bundles in memory (~50-100 MB) 280 + maxCacheSize: 2, // Keep 2 recent bundles in memory 281 281 }, nil 282 282 } 283 283 ··· 1345 1345 // LoadOperation loads a single operation from a bundle efficiently 1346 1346 // This is much faster than LoadBundle() when you only need one operation 1347 1347 func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plc.PLCOperation, error) { 1348 - m.logger.Printf("DEBUG: Using LoadOperation (position=%d) - should be faster!", position) 1348 + m.logger.Printf("🔍 DEBUG: LoadOperation called (bundle=%d, position=%d)", bundleNumber, position) 1349 1349 1350 1350 // Validate bundle exists in index 1351 - meta, err := m.index.GetBundle(bundleNumber) 1351 + _, err := m.index.GetBundle(bundleNumber) 1352 1352 if err != nil { 1353 1353 return nil, fmt.Errorf("bundle not in index: %w", err) 1354 1354 } ··· 1362 1362 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber)) 1363 1363 if !m.operations.FileExists(path) { 1364 1364 return nil, fmt.Errorf("bundle file not found: %s", path) 1365 - } 1366 - 1367 - // Verify hash if enabled (same as LoadBundle) 1368 - if m.config.VerifyOnLoad { 1369 - valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash) 1370 - if err != nil { 1371 - return nil, fmt.Errorf("failed to verify hash: %w", err) 1372 - } 1373 - if !valid { 1374 - return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash) 1375 - } 1376 1365 } 1377 1366 1378 1367 // Load just the one operation (efficient!)
+16 -16
bundle/operations.go
··· 8 8 "fmt" 9 9 "io" 10 10 "os" 11 + "sync" 11 12 "time" 12 13 13 14 gozstd "github.com/DataDog/zstd" ··· 121 122 return contentHash, compressedHash, contentSize, compressedSize, nil 122 123 } 123 124 125 + // Pool for scanner buffers (reuse across requests) 126 + var scannerBufPool = sync.Pool{ 127 + New: func() interface{} { 128 + buf := make([]byte, 64*1024) 129 + return &buf 130 + }, 131 + } 132 + 124 133 // LoadOperationAtPosition loads a single operation from a bundle without loading the entire bundle 125 134 // This is much more efficient for single-operation lookups 126 135 func (op *Operations) LoadOperationAtPosition(path string, position int) (*plc.PLCOperation, error) { ··· 128 137 return nil, fmt.Errorf("invalid position: %d", position) 129 138 } 130 139 131 - // ✨ Add this debug log 132 - startTime := time.Now() 133 - defer func() { 134 - elapsed := time.Since(startTime) 135 - op.logger.Printf("DEBUG: LoadOperationAtPosition(pos=%d) took %v (scanned %d lines)", 136 - position, elapsed, position+1) 137 - }() 138 - 139 - // Open compressed file 140 140 file, err := os.Open(path) 141 141 if err != nil { 142 142 return nil, fmt.Errorf("failed to open file: %w", err) 143 143 } 144 144 defer file.Close() 145 145 146 - // Create zstd decompression reader (streaming) 147 146 reader := gozstd.NewReader(file) 148 147 defer reader.Close() 149 148 150 - // Use scanner to skip to target line 149 + // ✨ Get buffer from pool 150 + bufPtr := scannerBufPool.Get().(*[]byte) 151 + defer scannerBufPool.Put(bufPtr) // Return to pool when done 152 + 151 153 scanner := bufio.NewScanner(reader) 152 - buf := make([]byte, 0, 64*1024) 153 - scanner.Buffer(buf, 1024*1024) 154 + scanner.Buffer(*bufPtr, 512*1024) 154 155 155 156 lineNum := 0 156 157 for scanner.Scan() { 157 158 if lineNum == position { 158 - // Found target line! 
159 159 line := scanner.Bytes() 160 160 161 161 var operation plc.PLCOperation ··· 163 163 return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err) 164 164 } 165 165 166 - // Store raw JSON 166 + // Copy raw JSON 167 167 operation.RawJSON = make([]byte, len(line)) 168 168 copy(operation.RawJSON, line) 169 169 ··· 176 176 return nil, fmt.Errorf("scanner error: %w", err) 177 177 } 178 178 179 - return nil, fmt.Errorf("position %d not found (bundle has %d operations)", position, lineNum) 179 + return nil, fmt.Errorf("position %d not found", position) 180 180 } 181 181 182 182 // ========================================
+9 -15
cmd/plcbundle/did_index.go
··· 385 385 386 386 fmt.Fprintf(os.Stderr, "Index lookup: %s (shard access)\n", indexTime) 387 387 388 - // STEP 2: Bundle loading timing 389 - bundleStart := time.Now() 390 - bndl, err := mgr.LoadBundle(ctx, int(latestLoc.Bundle)) 388 + // STEP 2: Operation loading timing (single op, not full bundle!) 389 + opStart := time.Now() 390 + op, err := mgr.LoadOperation(ctx, int(latestLoc.Bundle), int(latestLoc.Position)) 391 391 if err != nil { 392 - fmt.Fprintf(os.Stderr, "Error loading bundle: %v\n", err) 393 - os.Exit(1) 394 - } 395 - bundleTime := time.Since(bundleStart) 396 - 397 - if int(latestLoc.Position) >= len(bndl.Operations) { 398 - fmt.Fprintf(os.Stderr, "Invalid position\n") 392 + fmt.Fprintf(os.Stderr, "Error loading operation: %v\n", err) 399 393 os.Exit(1) 400 394 } 395 + opTime := time.Since(opStart) 401 396 402 - op := bndl.Operations[latestLoc.Position] 403 - 404 - fmt.Fprintf(os.Stderr, "Bundle load: %s (bundle %d, pos %d)\n", 405 - bundleTime, latestLoc.Bundle, latestLoc.Position) 397 + fmt.Fprintf(os.Stderr, "Operation load: %s (bundle %d, pos %d)\n", 398 + opTime, latestLoc.Bundle, latestLoc.Position) 406 399 407 400 // STEP 3: Build DID document 408 - ops := []plc.PLCOperation{op} 401 + ops := []plc.PLCOperation{*op} 409 402 doc, err := plc.ResolveDIDDocument(did, ops) 410 403 if err != nil { 411 404 fmt.Fprintf(os.Stderr, "Build document failed: %v\n", err) ··· 418 411 // Output to stdout 419 412 data, _ := json.MarshalIndent(doc, "", " ") 420 413 fmt.Println(string(data)) 414 + 421 415 }
+30
cmd/plcbundle/server.go
··· 8 8 "log" 9 9 "net/http" 10 10 "os" 11 + "runtime" 11 12 "strconv" 12 13 "strings" 13 14 "time" ··· 342 343 // Status endpoint 343 344 mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { 344 345 handleStatus(w, mgr, syncMode, wsEnabled) 346 + }) 347 + 348 + mux.HandleFunc("/debug/memory", func(w http.ResponseWriter, r *http.Request) { 349 + var m runtime.MemStats 350 + runtime.ReadMemStats(&m) 351 + 352 + didStats := mgr.GetDIDIndexStats() 353 + 354 + fmt.Fprintf(w, "Memory Stats:\n") 355 + fmt.Fprintf(w, " Alloc: %d MB\n", m.Alloc/1024/1024) 356 + fmt.Fprintf(w, " TotalAlloc: %d MB\n", m.TotalAlloc/1024/1024) 357 + fmt.Fprintf(w, " Sys: %d MB\n", m.Sys/1024/1024) 358 + fmt.Fprintf(w, " NumGC: %d\n", m.NumGC) 359 + fmt.Fprintf(w, "\nDID Index:\n") 360 + fmt.Fprintf(w, " Cached shards: %d/%d\n", didStats["cached_shards"], didStats["cache_limit"]) 361 + 362 + // Force GC and show again 363 + runtime.GC() 364 + runtime.ReadMemStats(&m) 365 + fmt.Fprintf(w, "\nAfter GC:\n") 366 + fmt.Fprintf(w, " Alloc: %d MB\n", m.Alloc/1024/1024) 345 367 }) 346 368 347 369 // WebSocket endpoint (if enabled) ··· 1088 1110 func handleDIDDocument(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, did string) { 1089 1111 ctx := r.Context() 1090 1112 1113 + // ✨ Trim shard cache BEFORE resolution 1114 + if didIdx := mgr.GetDIDIndex(); didIdx != nil { 1115 + didIdx.TrimCache() 1116 + } 1117 + 1091 1118 // Bundle package: just get the operations 1092 1119 operations, err := mgr.GetDIDOperations(ctx, did, false) 1093 1120 if err != nil { ··· 1110 1137 } 1111 1138 return 1112 1139 } 1140 + 1141 + // ✨ Clear operation data to help GC 1142 + operations = nil 1113 1143 1114 1144 w.Header().Set("Content-Type", "application/did+ld+json") 1115 1145 w.Header().Set("Access-Control-Allow-Origin", "*")
+169
scripts/did-resolve-benchmark-http.sh
··· 1 + #!/bin/bash 2 + # did-resolve-benchmark-http.sh - Benchmark DID resolution over HTTP 3 + 4 + set -e 5 + 6 + SERVER_URL=${1:-"http://localhost:8080"} 7 + BUNDLE=${2:-1} 8 + SAMPLES=${3:-20} 9 + 10 + echo "═══════════════════════════════════════════════════════════" 11 + echo " DID Resolution HTTP Performance Benchmark" 12 + echo "═══════════════════════════════════════════════════════════" 13 + echo "" 14 + echo "Server: $SERVER_URL" 15 + echo "Bundle: $BUNDLE" 16 + echo "Samples per position range: $SAMPLES" 17 + echo "" 18 + 19 + # Check if server is responding 20 + if ! curl -s -f "$SERVER_URL/status" > /dev/null 2>&1; then 21 + echo "❌ Error: Server not responding at $SERVER_URL" 22 + echo "" 23 + echo "Start server first:" 24 + echo " plcbundle serve --port 8080" 25 + exit 1 26 + fi 27 + 28 + echo "✓ Server is online" 29 + echo "" 30 + 31 + # Extract DIDs at different positions from local bundle 32 + echo "Extracting test DIDs from bundle $BUNDLE..." 33 + 34 + # Early positions (0-100) 35 + EARLY_DIDS=$(plcbundle export --bundles $BUNDLE | head -100 | jq -r '.did' | head -$SAMPLES) 36 + 37 + # Middle positions (~5000) 38 + MIDDLE_DIDS=$(plcbundle export --bundles $BUNDLE | head -5100 | tail -100 | jq -r '.did' | head -$SAMPLES) 39 + 40 + # Late positions (~9900) 41 + LATE_DIDS=$(plcbundle export --bundles $BUNDLE | tail -100 | jq -r '.did' | head -$SAMPLES) 42 + 43 + echo "✓ Extracted test DIDs" 44 + echo "" 45 + 46 + # Function to benchmark HTTP resolution 47 + benchmark_http() { 48 + local label="$1" 49 + local dids="$2" 50 + 51 + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" 52 + echo "Testing: $label" 53 + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" 54 + 55 + local total_time=0 56 + local total_dns=0 57 + local total_connect=0 58 + local total_starttransfer=0 59 + local count=0 60 + local min_time=999999 61 + local max_time=0 62 + local errors=0 63 + 64 + # Arrays to store individual timings 65 + local -a 
total_timings 66 + local -a transfer_timings 67 + 68 + for did in $dids; do 69 + # Use curl with timing 70 + timing=$(curl -s -w "\n%{time_namelookup},%{time_connect},%{time_starttransfer},%{time_total},%{http_code}" \ 71 + -o /dev/null \ 72 + "$SERVER_URL/$did" 2>&1) 73 + 74 + # Parse timing (last line) 75 + IFS=',' read -r dns connect starttransfer total http_code <<< "$(echo "$timing" | tail -1)" 76 + 77 + if [ "$http_code" == "200" ]; then 78 + # Convert to milliseconds 79 + total_ms=$(echo "scale=2; $total * 1000" | bc) 80 + transfer_ms=$(echo "scale=2; $starttransfer * 1000" | bc) 81 + dns_ms=$(echo "scale=2; $dns * 1000" | bc) 82 + connect_ms=$(echo "scale=2; $connect * 1000" | bc) 83 + 84 + total_timings+=($total_ms) 85 + transfer_timings+=($transfer_ms) 86 + 87 + total_time=$(echo "$total_time + $total_ms" | bc) 88 + total_dns=$(echo "$total_dns + $dns_ms" | bc) 89 + total_connect=$(echo "$total_connect + $connect_ms" | bc) 90 + total_starttransfer=$(echo "$total_starttransfer + $transfer_ms" | bc) 91 + count=$((count + 1)) 92 + 93 + # Update min/max (total time) 94 + if (( $(echo "$total_ms < $min_time" | bc -l) )); then 95 + min_time=$total_ms 96 + fi 97 + if (( $(echo "$total_ms > $max_time" | bc -l) )); then 98 + max_time=$total_ms 99 + fi 100 + 101 + printf "." 
102 + else 103 + printf "E" 104 + errors=$((errors + 1)) 105 + fi 106 + done 107 + 108 + echo "" 109 + 110 + if [ $count -gt 0 ]; then 111 + avg_total=$(echo "scale=2; $total_time / $count" | bc) 112 + avg_dns=$(echo "scale=2; $total_dns / $count" | bc) 113 + avg_connect=$(echo "scale=2; $total_connect / $count" | bc) 114 + avg_transfer=$(echo "scale=2; $total_starttransfer / $count" | bc) 115 + 116 + # Calculate median total time 117 + IFS=$'\n' sorted=($(sort -n <<<"${total_timings[*]}")) 118 + median_idx=$((count / 2)) 119 + median_total=${sorted[$median_idx]} 120 + 121 + # Calculate median transfer time 122 + IFS=$'\n' sorted_transfer=($(sort -n <<<"${transfer_timings[*]}")) 123 + median_transfer=${sorted_transfer[$median_idx]} 124 + 125 + echo "" 126 + echo "Results ($count successful, $errors errors):" 127 + echo "" 128 + echo " Total Response Time:" 129 + echo " Average: ${avg_total}ms" 130 + echo " Median: ${median_total}ms" 131 + echo " Min: ${min_time}ms" 132 + echo " Max: ${max_time}ms" 133 + echo "" 134 + echo " Breakdown (average):" 135 + echo " DNS lookup: ${avg_dns}ms" 136 + echo " TCP connect: ${avg_connect}ms" 137 + echo " Time to first byte: ${avg_transfer}ms" 138 + echo " Transfer: $(echo "scale=2; $avg_total - $avg_transfer" | bc)ms" 139 + else 140 + echo " ❌ All requests failed" 141 + fi 142 + 143 + echo "" 144 + } 145 + 146 + # Run HTTP benchmarks 147 + benchmark_http "Early Positions (0-100)" "$EARLY_DIDS" 148 + benchmark_http "Middle Positions (~5000)" "$MIDDLE_DIDS" 149 + benchmark_http "Late Positions (~9900)" "$LATE_DIDS" 150 + 151 + echo "═══════════════════════════════════════════════════════════" 152 + echo " HTTP Benchmark Complete" 153 + echo "═══════════════════════════════════════════════════════════" 154 + echo "" 155 + echo "Note: HTTP adds overhead (~2-5ms) compared to local resolution" 156 + echo "The 'Time to first byte' shows server processing time" 157 + echo "" 158 + 159 + # Bonus: Test server status endpoint 160 + echo 
"Server Status:" 161 + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" 162 + curl -s "$SERVER_URL/status" | jq -r ' 163 + " Bundles: \(.bundles.count)", 164 + " Operations: \(.bundles.total_operations // 0)", 165 + " Uptime: \(.server.uptime_seconds)s", 166 + " WebSocket: \(.server.websocket_enabled)", 167 + if .mempool then " Mempool: \(.mempool.count) ops" else " Mempool: disabled" end 168 + ' 169 + echo ""
+123
scripts/did-resolve-benchmark.sh
··· 1 + #!/bin/bash 2 + # did-resolve-benchmark.sh - Benchmark DID resolution performance 3 + 4 + set -e 5 + 6 + BUNDLE=${1:-1} 7 + SAMPLES=${2:-20} 8 + 9 + echo "═══════════════════════════════════════════════════════════" 10 + echo " DID Resolution Performance Benchmark" 11 + echo "═══════════════════════════════════════════════════════════" 12 + echo "" 13 + echo "Bundle: $BUNDLE" 14 + echo "Samples per position range: $SAMPLES" 15 + echo "" 16 + 17 + # Extract DIDs at different positions from a bundle 18 + echo "Extracting test DIDs from bundle $BUNDLE..." 19 + 20 + # Early positions (0-100) 21 + EARLY_DIDS=$(plcbundle export --bundles $BUNDLE | head -100 | jq -r '.did' | head -$SAMPLES) 22 + 23 + # Middle positions (~5000) 24 + MIDDLE_DIDS=$(plcbundle export --bundles $BUNDLE | head -5100 | tail -100 | jq -r '.did' | head -$SAMPLES) 25 + 26 + # Late positions (~9900) 27 + LATE_DIDS=$(plcbundle export --bundles $BUNDLE | tail -100 | jq -r '.did' | head -$SAMPLES) 28 + 29 + echo "✓ Extracted test DIDs" 30 + echo "" 31 + 32 + # Function to benchmark a set of DIDs 33 + benchmark_dids() { 34 + local label="$1" 35 + local dids="$2" 36 + 37 + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" 38 + echo "Testing: $label" 39 + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" 40 + 41 + local total_time=0 42 + local count=0 43 + local min_time=999999 44 + local max_time=0 45 + 46 + # Arrays to store individual timings 47 + local -a timings 48 + 49 + for did in $dids; do 50 + # Run resolve and extract timing 51 + output=$(plcbundle index resolve "$did" 2>&1) 52 + 53 + # Extract "Total: XXms" from stderr 54 + if [[ $output =~ Total:\ ([0-9.]+)([µm]?s) ]]; then 55 + time_value="${BASH_REMATCH[1]}" 56 + time_unit="${BASH_REMATCH[2]}" 57 + 58 + # Convert to milliseconds 59 + if [[ $time_unit == "µs" ]]; then 60 + time_ms=$(echo "scale=3; $time_value / 1000" | bc) 61 + elif [[ $time_unit == "ms" ]]; then 62 + time_ms=$time_value 63 + else 64 + # 
Assume seconds 65 + time_ms=$(echo "scale=3; $time_value * 1000" | bc) 66 + fi 67 + 68 + timings+=($time_ms) 69 + total_time=$(echo "$total_time + $time_ms" | bc) 70 + count=$((count + 1)) 71 + 72 + # Update min/max 73 + if (( $(echo "$time_ms < $min_time" | bc -l) )); then 74 + min_time=$time_ms 75 + fi 76 + if (( $(echo "$time_ms > $max_time" | bc -l) )); then 77 + max_time=$time_ms 78 + fi 79 + 80 + printf "." 81 + fi 82 + done 83 + 84 + echo "" 85 + 86 + if [ $count -gt 0 ]; then 87 + avg_time=$(echo "scale=2; $total_time / $count" | bc) 88 + 89 + # Calculate median (sort and take middle) 90 + IFS=$'\n' sorted=($(sort -n <<<"${timings[*]}")) 91 + median_idx=$((count / 2)) 92 + median_time=${sorted[$median_idx]} 93 + 94 + echo "" 95 + echo "Results ($count samples):" 96 + echo " Average: ${avg_time}ms" 97 + echo " Median: ${median_time}ms" 98 + echo " Min: ${min_time}ms" 99 + echo " Max: ${max_time}ms" 100 + else 101 + echo " No successful timings" 102 + fi 103 + 104 + echo "" 105 + } 106 + 107 + # Run benchmarks 108 + benchmark_dids "Early Positions (0-100)" "$EARLY_DIDS" 109 + benchmark_dids "Middle Positions (~5000)" "$MIDDLE_DIDS" 110 + benchmark_dids "Late Positions (~9900)" "$LATE_DIDS" 111 + 112 + echo "═══════════════════════════════════════════════════════════" 113 + echo " Benchmark Complete" 114 + echo "═══════════════════════════════════════════════════════════" 115 + echo "" 116 + echo "Expected results with LoadOperation optimization:" 117 + echo " Early: ~2-5ms (only decompress first 1%)" 118 + echo " Middle: ~10-15ms (decompress ~50%)" 119 + echo " Late: ~20-30ms (decompress ~99%)" 120 + echo "" 121 + echo "If all positions show similar timing (~18ms), the optimization" 122 + echo "isn't working and LoadBundle is still being called." 123 + echo ""