[DEPRECATED] Go implementation of plcbundle
at b94343e542bc77c7115f660f068af5d2c6b67943 165 lines 4.4 kB view raw
1package main 2 3import ( 4 "bufio" 5 "bytes" 6 "fmt" 7 "os" 8 "path/filepath" 9 "strconv" 10 "strings" 11 "time" 12 13 gozstd "github.com/DataDog/zstd" 14 "github.com/goccy/go-json" 15) 16 17// Minimal operation struct 18type Operation struct { 19 DID string `json:"did"` 20 Operation map[string]interface{} `json:"operation"` 21 CID string `json:"cid"` 22 Nullified interface{} `json:"nullified,omitempty"` 23 CreatedAt time.Time `json:"createdAt"` 24 25 // RawJSON stores the original JSON bytes for exact reproduction 26 RawJSON []byte `json:"-"` 27} 28 29// User's detect function 30func detect(op *Operation) []string { 31 labels := []string{} 32 33 if strings.HasPrefix(op.DID, "did:plc:aa") { 34 labels = append(labels, "test") 35 } 36 37 // Log operation.sig (like console.log in JavaScript) 38 if sig, ok := op.Operation["sig"]; ok { 39 fmt.Fprintf(os.Stderr, "%v\n", sig) 40 } 41 42 return labels 43} 44 45func main() { 46 bundleDir := "./" 47 if len(os.Args) > 1 { 48 bundleDir = os.Args[1] 49 } 50 51 startBundle := 1 52 if len(os.Args) > 2 { 53 startBundle, _ = strconv.Atoi(os.Args[2]) 54 } 55 56 endBundle := 100 57 if len(os.Args) > 3 { 58 endBundle, _ = strconv.Atoi(os.Args[3]) 59 } 60 61 fmt.Fprintf(os.Stderr, "Processing bundles %d-%d from %s\n", startBundle, endBundle, bundleDir) 62 fmt.Fprintf(os.Stderr, "\n") 63 64 // Buffered stdout 65 writer := bufio.NewWriterSize(os.Stdout, 512*1024) 66 defer writer.Flush() 67 68 // CSV header 69 writer.WriteString("bundle,position,cid,size,confidence,labels\n") 70 71 totalOps := 0 72 matchCount := 0 73 totalBytes := int64(0) 74 matchedBytes := int64(0) 75 76 startTime := time.Now() 77 78 for bundleNum := startBundle; bundleNum <= endBundle; bundleNum++ { 79 bundleFile := filepath.Join(bundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 80 81 // Read compressed file 82 compressed, err := os.ReadFile(bundleFile) 83 if err != nil { 84 fmt.Fprintf(os.Stderr, "\nError reading bundle %d: %v\n", bundleNum, err) 85 continue 86 } 87 88 // Decompress with native gozstd (CGO, fast as native zstd) 89 decompressed, err := gozstd.Decompress(nil, compressed) 90 if err != nil { 91 fmt.Fprintf(os.Stderr, "\nError decompressing bundle %d: %v\n", bundleNum, err) 92 continue 93 } 94 95 // Split into lines 96 lines := bytes.Split(decompressed, []byte("\n")) 97 98 for position, line := range lines { 99 if len(line) == 0 { 100 continue 101 } 102 103 totalOps++ 104 opSize := len(line) 105 totalBytes += int64(opSize) 106 107 // Parse JSON with go-json 108 var op Operation 109 if err := json.Unmarshal(line, &op); err != nil { 110 fmt.Fprintf(os.Stderr, "Error parsing operation: %v\n", err) 111 continue 112 } 113 114 // Run detect function 115 labels := detect(&op) 116 117 if len(labels) > 0 { 118 matchCount++ 119 matchedBytes += int64(opSize) 120 121 // Extract last 4 chars of CID 122 cidShort := op.CID 123 if len(cidShort) > 4 { 124 cidShort = cidShort[len(cidShort)-4:] 125 } 126 127 // Write CSV line 128 writer.WriteString(strconv.Itoa(bundleNum)) 129 writer.WriteByte(',') 130 writer.WriteString(strconv.Itoa(position)) 131 writer.WriteByte(',') 132 writer.WriteString(cidShort) 133 writer.WriteByte(',') 134 writer.WriteString(strconv.Itoa(opSize)) 135 writer.WriteByte(',') 136 writer.WriteString("0.95") 137 writer.WriteByte(',') 138 writer.WriteString(strings.Join(labels, ";")) 139 writer.WriteByte('\n') 140 } 141 } 142 143 // Progress 144 if bundleNum%10 == 0 { 145 elapsed := time.Since(startTime).Seconds() 146 opsPerSec := float64(totalOps) / elapsed 147 fmt.Fprintf(os.Stderr, "Processed %d/%d bundles | %d ops | %.0f ops/sec\r", 148 bundleNum, endBundle, totalOps, opsPerSec) 149 } 150 } 151 152 elapsed := time.Since(startTime).Seconds() 153 154 // Stats 155 fmt.Fprintf(os.Stderr, "\n\n") 156 fmt.Fprintf(os.Stderr, "✓ Detection complete\n") 157 fmt.Fprintf(os.Stderr, " Total operations: %d\n", totalOps) 158 fmt.Fprintf(os.Stderr, " Matches found: %d (%.2f%%)\n", matchCount, float64(matchCount)/float64(totalOps)*100) 159 fmt.Fprintf(os.Stderr, " Total size: %.1f MB\n", float64(totalBytes)/1e6) 160 fmt.Fprintf(os.Stderr, " Matched size: %.1f MB (%.2f%%)\n", float64(matchedBytes)/1e6, float64(matchedBytes)/float64(totalBytes)*100) 161 fmt.Fprintf(os.Stderr, "\n") 162 fmt.Fprintf(os.Stderr, " Time elapsed: %.2fs\n", elapsed) 163 fmt.Fprintf(os.Stderr, " Throughput: %.0f ops/sec\n", float64(totalOps)/elapsed) 164 fmt.Fprintf(os.Stderr, " Speed: %.1f MB/sec\n", float64(totalBytes)/elapsed/1e6) 165}