[DEPRECATED] Go implementation of plcbundle

perf updates

+300 -106
benchmark_detector

This is a binary file and will not be displayed.

+2 -2
bundle/bundle_test.go
··· 415 415 DID: "did:plc:test" + string(rune(i)), 416 416 CID: "bafytest" + string(rune(i)), 417 417 CreatedAt: baseTime.Add(time.Duration(i) * time.Second), 418 - Operation: map[string]interface{}{ 418 + /*Operation: map[string]interface{}{ 419 419 "type": "create", 420 - }, 420 + },*/ 421 421 } 422 422 } 423 423
+2 -1
bundle/clone.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "fmt" 7 6 "io" 8 7 "net/http" ··· 11 10 "strings" 12 11 "sync" 13 12 "time" 13 + 14 + "github.com/goccy/go-json" 14 15 ) 15 16 16 17 // CloneFromRemote clones bundles from a remote HTTP endpoint
+2 -1
bundle/index.go
··· 1 1 package bundle 2 2 3 3 import ( 4 - "encoding/json" 5 4 "fmt" 6 5 "os" 7 6 "sort" 8 7 "sync" 9 8 "time" 9 + 10 + "github.com/goccy/go-json" 10 11 ) 11 12 12 13 const (
+2 -1
bundle/mempool.go
··· 3 3 import ( 4 4 "bufio" 5 5 "bytes" 6 - "encoding/json" 7 6 "fmt" 8 7 "os" 9 8 "path/filepath" 10 9 "sync" 11 10 "time" 11 + 12 + "github.com/goccy/go-json" 12 13 13 14 "tangled.org/atscan.net/plcbundle/plc" 14 15 )
+26 -65
bundle/operations.go
··· 5 5 "bytes" 6 6 "crypto/sha256" 7 7 "encoding/hex" 8 - "encoding/json" 9 8 "fmt" 10 9 "io" 11 10 "os" 12 11 "time" 13 12 14 - "github.com/klauspost/compress/zstd" 13 + gozstd "github.com/DataDog/zstd" 14 + "github.com/goccy/go-json" 15 15 "tangled.org/atscan.net/plcbundle/plc" 16 16 ) 17 17 18 18 // Operations handles low-level bundle file operations 19 19 type Operations struct { 20 - encoder *zstd.Encoder 21 - decoder *zstd.Decoder 22 - logger Logger 20 + logger Logger 23 21 } 24 22 25 - // NewOperations creates a new Operations handler with default compression 26 23 func NewOperations(logger Logger) (*Operations, error) { 27 - // Always use default compression (level 3 - good balance) 28 - encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) 29 - if err != nil { 30 - return nil, fmt.Errorf("failed to create zstd encoder: %w", err) 31 - } 32 - 33 - decoder, err := zstd.NewReader(nil) 34 - if err != nil { 35 - return nil, fmt.Errorf("failed to create zstd decoder: %w", err) 36 - } 37 - 38 - return &Operations{ 39 - encoder: encoder, 40 - decoder: decoder, 41 - logger: logger, 42 - }, nil 24 + return &Operations{logger: logger}, nil 43 25 } 44 26 45 - // Close cleans up resources 46 27 func (op *Operations) Close() { 47 - if op.encoder != nil { 48 - op.encoder.Close() 49 - } 50 - if op.decoder != nil { 51 - op.decoder.Close() 52 - } 28 + // Nothing to close 53 29 } 54 30 55 31 // ======================================== ··· 81 57 func (op *Operations) ParseJSONL(data []byte) ([]plc.PLCOperation, error) { 82 58 var operations []plc.PLCOperation 83 59 scanner := bufio.NewScanner(bytes.NewReader(data)) 84 - 85 - // Set a large buffer for long lines 86 60 buf := make([]byte, 0, 64*1024) 87 61 scanner.Buffer(buf, 1024*1024) 88 62 89 - lineNum := 0 90 63 for scanner.Scan() { 91 - lineNum++ 92 64 line := scanner.Bytes() 93 - 94 65 if len(line) == 0 { 95 66 continue 96 67 } 97 68 98 69 var operation plc.PLCOperation 70 + // Use sonic instead of json.Unmarshal 99 71 if err := json.Unmarshal(line, &operation); err != nil { 100 - return nil, fmt.Errorf("failed to parse line %d: %w", lineNum, err) 72 + return nil, fmt.Errorf("failed to parse line: %w", err) 101 73 } 102 74 103 - // Store raw JSON 104 75 operation.RawJSON = make([]byte, len(line)) 105 76 copy(operation.RawJSON, line) 106 - 107 77 operations = append(operations, operation) 108 78 } 109 79 110 - if err := scanner.Err(); err != nil { 111 - return nil, fmt.Errorf("scanner error: %w", err) 112 - } 113 - 114 80 return operations, nil 115 81 } 116 82 ··· 118 84 // FILE OPERATIONS (uses JSONL + compression) 119 85 // ======================================== 120 86 121 - // LoadBundle loads a compressed bundle from disk 87 + // LoadBundle loads a compressed bundle 122 88 func (op *Operations) LoadBundle(path string) ([]plc.PLCOperation, error) { 123 - // Read compressed file 124 89 compressed, err := os.ReadFile(path) 125 90 if err != nil { 126 91 return nil, fmt.Errorf("failed to read file: %w", err) 127 92 } 128 93 129 - // Decompress 130 - decompressed, err := op.decoder.DecodeAll(compressed, nil) 94 + decompressed, err := gozstd.Decompress(nil, compressed) 131 95 if err != nil { 132 96 return nil, fmt.Errorf("failed to decompress: %w", err) 133 97 } 134 98 135 - // Parse JSONL 136 99 return op.ParseJSONL(decompressed) 137 100 } 138 101 139 102 // SaveBundle saves operations to disk (compressed) 140 103 // Returns: contentHash, compressedHash, contentSize, compressedSize, error 141 104 func (op *Operations) SaveBundle(path string, operations []plc.PLCOperation) (string, string, int64, int64, error) { 142 - // Serialize to JSONL 143 105 jsonlData := op.SerializeJSONL(operations) 144 106 contentSize := int64(len(jsonlData)) 145 107 contentHash := op.Hash(jsonlData) 146 108 147 - // Compress 148 - compressed := op.encoder.EncodeAll(jsonlData, nil) 109 + // DataDog zstd.Compress returns ([]byte, error) 110 + compressed, err := gozstd.Compress(nil, jsonlData) 111 + if err != nil { 112 + return "", "", 0, 0, fmt.Errorf("failed to compress: %w", err) 113 + } 114 + 149 115 compressedSize := int64(len(compressed)) 150 116 compressedHash := op.Hash(compressed) 151 117 152 - // Write to file 153 118 if err := os.WriteFile(path, compressed, 0644); err != nil { 154 119 return "", "", 0, 0, fmt.Errorf("failed to write file: %w", err) 155 120 } ··· 177 142 return nil, fmt.Errorf("failed to open bundle: %w", err) 178 143 } 179 144 180 - // Create a new decoder for this stream 181 - decoder, err := zstd.NewReader(file) 182 - if err != nil { 183 - file.Close() 184 - return nil, fmt.Errorf("failed to create decompressor: %w", err) 185 - } 145 + // Create zstd reader using DataDog's package 146 + reader := gozstd.NewReader(file) 186 147 187 - // Return a wrapper that closes both the decoder and file 148 + // Return a wrapper that closes both the reader and file 188 149 return &decompressedReader{ 189 - decoder: decoder, 190 - file: file, 150 + reader: reader, 151 + file: file, 191 152 }, nil 192 153 } 193 154 194 155 // decompressedReader wraps a zstd decoder and underlying file 195 156 type decompressedReader struct { 196 - decoder *zstd.Decoder 197 - file *os.File 157 + reader io.ReadCloser 158 + file *os.File 198 159 } 199 160 200 161 func (dr *decompressedReader) Read(p []byte) (int, error) { 201 - return dr.decoder.Read(p) 162 + return dr.reader.Read(p) 202 163 } 203 164 204 165 func (dr *decompressedReader) Close() error { 205 - dr.decoder.Close() 166 + dr.reader.Close() 206 167 return dr.file.Close() 207 168 } 208 169 ··· 241 202 compressedHash = op.Hash(compressedData) 242 203 compressedSize = int64(len(compressedData)) 243 204 244 - // Decompress 245 - decompressed, err := op.decoder.DecodeAll(compressedData, nil) 205 + // Decompress with DataDog zstd 206 + decompressed, err := gozstd.Decompress(nil, compressedData) 246 207 if err != nil { 247 208 return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err) 248 209 }
+2 -1
cmd/plcbundle/compare.go
··· 1 1 package main 2 2 3 3 import ( 4 - "encoding/json" 5 4 "fmt" 6 5 "io" 7 6 "net/http" ··· 10 9 "sort" 11 10 "strings" 12 11 "time" 12 + 13 + "github.com/goccy/go-json" 13 14 14 15 "tangled.org/atscan.net/plcbundle/bundle" 15 16 )
+17 -1
cmd/plcbundle/detector.go
··· 4 4 import ( 5 5 "bufio" 6 6 "context" 7 - "encoding/json" 8 7 "flag" 9 8 "fmt" 9 + "net/http" 10 + _ "net/http/pprof" 10 11 "os" 11 12 "sort" 12 13 "strings" 14 + "time" 15 + 16 + "github.com/goccy/go-json" 13 17 14 18 "tangled.org/atscan.net/plcbundle/detector" 15 19 "tangled.org/atscan.net/plcbundle/plc" ··· 311 315 fs := flag.NewFlagSet("detector run", flag.ExitOnError) 312 316 bundleRange := fs.String("bundles", "", "bundle range, default: all bundles") 313 317 confidence := fs.Float64("confidence", 0.90, "minimum confidence") 318 + pprofPort := fs.String("pprof", "", "enable pprof on port (e.g., :6060)") 314 319 fs.Parse(flagArgs) 320 + 321 + // Start pprof server if requested 322 + if *pprofPort != "" { 323 + go func() { 324 + fmt.Fprintf(os.Stderr, "pprof server starting on http://localhost%s/debug/pprof/\n", *pprofPort) 325 + if err := http.ListenAndServe(*pprofPort, nil); err != nil { 326 + fmt.Fprintf(os.Stderr, "pprof server failed: %v\n", err) 327 + } 328 + }() 329 + time.Sleep(100 * time.Millisecond) // Let server start 330 + } 315 331 316 332 // Load manager 317 333 mgr, _, err := getManager("")
+5 -1
cmd/plcbundle/main.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "flag" 7 6 "fmt" 8 7 "net/http" ··· 16 15 "sync" 17 16 "syscall" 18 17 "time" 18 + 19 + "github.com/goccy/go-json" 19 20 20 21 "tangled.org/atscan.net/plcbundle/bundle" 21 22 "tangled.org/atscan.net/plcbundle/plc" ··· 55 56 } 56 57 57 58 func main() { 59 + 60 + debug.SetGCPercent(400) 61 + 58 62 if len(os.Args) < 2 { 59 63 printUsage() 60 64 os.Exit(1)
+2 -1
cmd/plcbundle/server.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "fmt" 7 6 "io" 8 7 "net/http" ··· 10 9 "strconv" 11 10 "strings" 12 11 "time" 12 + 13 + "github.com/goccy/go-json" 13 14 14 15 "github.com/gorilla/websocket" 15 16 "tangled.org/atscan.net/plcbundle/bundle"
+42 -8
detector/builtin.go
··· 49 49 func (d *InvalidHandleDetector) Version() string { return "1.0.0" } 50 50 51 51 func (d *InvalidHandleDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 52 - if alsoKnownAs, ok := op.Operation["alsoKnownAs"].([]interface{}); ok { 52 + // Parse Operation field on-demand 53 + operation, err := op.GetOperationMap() 54 + if err != nil { 55 + return nil, err 56 + } 57 + if operation == nil { 58 + return nil, nil 59 + } 60 + 61 + if alsoKnownAs, ok := operation["alsoKnownAs"].([]interface{}); ok { 62 + 53 63 for _, aka := range alsoKnownAs { 54 64 if str, ok := aka.(string); ok { 55 65 // Check if it's an at:// handle ··· 203 213 func (d *AlsoKnownAsSpamDetector) Version() string { return "1.0.0" } 204 214 205 215 func (d *AlsoKnownAsSpamDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 206 - if alsoKnownAs, ok := op.Operation["alsoKnownAs"].([]interface{}); ok { 216 + // Parse Operation field on-demand 217 + operation, err := op.GetOperationMap() 218 + if err != nil { 219 + return nil, err 220 + } 221 + if operation == nil { 222 + return nil, nil 223 + } 224 + if alsoKnownAs, ok := operation["alsoKnownAs"].([]interface{}); ok { 207 225 entryCount := len(alsoKnownAs) 208 226 209 227 // Count different types of entries ··· 296 314 func (d *SpamPDSDetector) Version() string { return "1.0.0" } 297 315 298 316 func (d *SpamPDSDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 317 + // Parse Operation field on-demand 318 + operation, err := op.GetOperationMap() 319 + if err != nil { 320 + return nil, err 321 + } 322 + if operation == nil { 323 + return nil, nil 324 + } 299 325 // Check PDS endpoint 300 - if services, ok := op.Operation["services"].(map[string]interface{}); ok { 326 + if services, ok := operation["services"].(map[string]interface{}); ok { 301 327 if pds, ok := services["atproto_pds"].(map[string]interface{}); ok { 302 328 if endpoint, ok := pds["endpoint"].(string); ok { 303 329 host := extractHost(endpoint) ··· 320 346 } 321 347 322 348 // Check for spam domain claims in alsoKnownAs 323 - if alsoKnownAs, ok := op.Operation["alsoKnownAs"].([]interface{}); ok { 349 + if alsoKnownAs, ok := operation["alsoKnownAs"].([]interface{}); ok { 324 350 for _, aka := range alsoKnownAs { 325 351 if str, ok := aka.(string); ok { 326 352 if !strings.HasPrefix(str, "at://") { ··· 391 417 func (d *ServiceAbuseDetector) Version() string { return "1.0.0" } 392 418 393 419 func (d *ServiceAbuseDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 394 - if services, ok := op.Operation["services"].(map[string]interface{}); ok { 420 + // Parse Operation field on-demand 421 + operation, err := op.GetOperationMap() 422 + if err != nil { 423 + return nil, err 424 + } 425 + if operation == nil { 426 + return nil, nil 427 + } 428 + if services, ok := operation["services"].(map[string]interface{}); ok { 395 429 // Check for numeric service keys (spam uses "0", "1", "2" instead of proper names) 396 430 hasNumericKeys := false 397 431 numericKeyCount := 0 ··· 457 491 } 458 492 459 493 // Check for excessively long handles in alsoKnownAs 460 - if alsoKnownAs, ok := op.Operation["alsoKnownAs"].([]interface{}); ok { 494 + if alsoKnownAs, ok := operation["alsoKnownAs"].([]interface{}); ok { 461 495 for _, aka := range alsoKnownAs { 462 496 if str, ok := aka.(string); ok { 463 497 if strings.HasPrefix(str, "at://") { ··· 480 514 } 481 515 482 516 // Check for empty verificationMethods (common in this spam) 483 - if vm, ok := op.Operation["verificationMethods"].(map[string]interface{}); ok { 517 + if vm, ok := operation["verificationMethods"].(map[string]interface{}); ok { 484 518 if len(vm) == 0 { 485 519 // Empty verificationMethods alone isn't enough, but combined with other signals... 486 520 // Check if there are other suspicious signals 487 - if services, ok := op.Operation["services"].(map[string]interface{}); ok { 521 + if services, ok := operation["services"].(map[string]interface{}); ok { 488 522 if len(services) > 2 { 489 523 // Multiple services + empty verificationMethods = suspicious 490 524 return &Match{
+2 -1
detector/script.go
··· 4 4 import ( 5 5 "bufio" 6 6 "context" 7 - "encoding/json" 8 7 "fmt" 9 8 "net" 10 9 "os" ··· 12 11 "path/filepath" 13 12 "strings" 14 13 "time" 14 + 15 + "github.com/goccy/go-json" 15 16 16 17 "tangled.org/atscan.net/plcbundle/plc" 17 18 )
+4 -7
go.mod
··· 1 1 module tangled.org/atscan.net/plcbundle 2 2 3 - go 1.23 4 - 5 - require github.com/klauspost/compress v1.18.1 6 - 7 - require github.com/gorilla/websocket v1.5.3 3 + go 1.25 8 4 9 5 require ( 10 - github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect 11 - github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect 6 + github.com/DataDog/zstd v1.5.7 7 + github.com/goccy/go-json v0.10.5 8 + github.com/gorilla/websocket v1.5.3 12 9 )
+4 -6
go.sum
··· 1 + github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= 2 + github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= 3 + github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= 4 + github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= 1 5 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= 2 6 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 3 - github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= 4 - github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= 5 - github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= 6 - github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= 7 - github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= 8 - github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
+2 -1
plc/client.go
··· 3 3 import ( 4 4 "bufio" 5 5 "context" 6 - "encoding/json" 7 6 "fmt" 8 7 "io" 9 8 "log" 10 9 "net/http" 11 10 "strconv" 12 11 "time" 12 + 13 + "github.com/goccy/go-json" 13 14 ) 14 15 15 16 // Client is a client for the PLC directory
+2 -2
plc/plc_test.go
··· 94 94 DID: "did:plc:test" + string(rune(i)), 95 95 CID: "bafytest" + string(rune(i)), 96 96 CreatedAt: time.Now(), 97 - Operation: map[string]interface{}{"type": "create"}, 97 + //Operation: map[string]interface{}{"type": "create"}, 98 98 } 99 99 json.NewEncoder(w).Encode(op) 100 100 } ··· 239 239 DID: "did:plc:test", 240 240 CID: "bafytest", 241 241 CreatedAt: time.Now(), 242 - Operation: map[string]interface{}{"type": "create"}, 242 + //Operation: map[string]interface{}{"type": "create"}, 243 243 } 244 244 } 245 245
+23 -6
plc/types.go
··· 1 1 package plc 2 2 3 - import "time" 3 + import ( 4 + "time" 5 + 6 + "github.com/goccy/go-json" 7 + ) 4 8 5 9 // PLCOperation represents a single operation from the PLC directory 6 10 type PLCOperation struct { 7 - DID string `json:"did"` 8 - Operation map[string]interface{} `json:"operation"` 9 - CID string `json:"cid"` 10 - Nullified interface{} `json:"nullified,omitempty"` 11 - CreatedAt time.Time `json:"createdAt"` 11 + DID string `json:"did"` 12 + //Operation map[string]interface{} `json:"operation"` 13 + Operation json.RawMessage `json:"operation"` 14 + CID string `json:"cid"` 15 + Nullified interface{} `json:"nullified,omitempty"` 16 + CreatedAt time.Time `json:"createdAt"` 12 17 13 18 // RawJSON stores the original JSON bytes for exact reproduction 14 19 RawJSON []byte `json:"-"` ··· 86 91 Type string // "pds", "labeler", etc. 87 92 Endpoint string 88 93 } 94 + 95 + // GetOperationMap parses Operation RawMessage into a map 96 + func (op *PLCOperation) GetOperationMap() (map[string]interface{}, error) { 97 + if len(op.Operation) == 0 { 98 + return nil, nil 99 + } 100 + var result map[string]interface{} 101 + if err := json.Unmarshal(op.Operation, &result); err != nil { 102 + return nil, err 103 + } 104 + return result, nil 105 + }
+160
scripts/benchmark-detector.go
··· 1 + package main 2 + 3 + import ( 4 + "bufio" 5 + "bytes" 6 + "fmt" 7 + "os" 8 + "path/filepath" 9 + "strconv" 10 + "strings" 11 + "time" 12 + 13 + gozstd "github.com/DataDog/zstd" 14 + "github.com/goccy/go-json" 15 + ) 16 + 17 + // Minimal operation struct 18 + type Operation struct { 19 + DID string `json:"did"` 20 + //Operation map[string]interface{} `json:"operation"` 21 + CID string `json:"cid"` 22 + Nullified interface{} `json:"nullified,omitempty"` 23 + CreatedAt time.Time `json:"createdAt"` 24 + 25 + // RawJSON stores the original JSON bytes for exact reproduction 26 + RawJSON []byte `json:"-"` 27 + } 28 + 29 + // User's detect function 30 + func detect(op *Operation) []string { 31 + labels := []string{} 32 + 33 + if strings.HasPrefix(op.DID, "did:plc:aa") { 34 + labels = append(labels, "test") 35 + } 36 + 37 + return labels 38 + } 39 + 40 + func main() { 41 + bundleDir := "./" 42 + if len(os.Args) > 1 { 43 + bundleDir = os.Args[1] 44 + } 45 + 46 + startBundle := 1 47 + if len(os.Args) > 2 { 48 + startBundle, _ = strconv.Atoi(os.Args[2]) 49 + } 50 + 51 + endBundle := 100 52 + if len(os.Args) > 3 { 53 + endBundle, _ = strconv.Atoi(os.Args[3]) 54 + } 55 + 56 + fmt.Fprintf(os.Stderr, "Processing bundles %d-%d from %s\n", startBundle, endBundle, bundleDir) 57 + fmt.Fprintf(os.Stderr, "\n") 58 + 59 + // Buffered stdout 60 + writer := bufio.NewWriterSize(os.Stdout, 512*1024) 61 + defer writer.Flush() 62 + 63 + // CSV header 64 + writer.WriteString("bundle,position,cid,size,confidence,labels\n") 65 + 66 + totalOps := 0 67 + matchCount := 0 68 + totalBytes := int64(0) 69 + matchedBytes := int64(0) 70 + 71 + startTime := time.Now() 72 + 73 + for bundleNum := startBundle; bundleNum <= endBundle; bundleNum++ { 74 + bundleFile := filepath.Join(bundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 75 + 76 + // Read compressed file 77 + compressed, err := os.ReadFile(bundleFile) 78 + if err != nil { 79 + fmt.Fprintf(os.Stderr, "\nError reading bundle %d: %v\n", bundleNum, err) 80 + continue 81 + } 82 + 83 + // Decompress with native gozstd (CGO, fast as native zstd) 84 + decompressed, err := gozstd.Decompress(nil, compressed) 85 + if err != nil { 86 + fmt.Fprintf(os.Stderr, "\nError decompressing bundle %d: %v\n", bundleNum, err) 87 + continue 88 + } 89 + 90 + // Split into lines 91 + lines := bytes.Split(decompressed, []byte("\n")) 92 + 93 + for position, line := range lines { 94 + if len(line) == 0 { 95 + continue 96 + } 97 + 98 + totalOps++ 99 + opSize := len(line) 100 + totalBytes += int64(opSize) 101 + 102 + // Parse JSON with go-json 103 + var op Operation 104 + if err := json.Unmarshal(line, &op); err != nil { 105 + fmt.Fprintf(os.Stderr, "Error parsing operation: %v\n", err) 106 + continue 107 + } 108 + 109 + // Run detect function 110 + labels := detect(&op) 111 + 112 + if len(labels) > 0 { 113 + matchCount++ 114 + matchedBytes += int64(opSize) 115 + 116 + // Extract last 4 chars of CID 117 + cidShort := op.CID 118 + if len(cidShort) > 4 { 119 + cidShort = cidShort[len(cidShort)-4:] 120 + } 121 + 122 + // Write CSV line 123 + writer.WriteString(strconv.Itoa(bundleNum)) 124 + writer.WriteByte(',') 125 + writer.WriteString(strconv.Itoa(position)) 126 + writer.WriteByte(',') 127 + writer.WriteString(cidShort) 128 + writer.WriteByte(',') 129 + writer.WriteString(strconv.Itoa(opSize)) 130 + writer.WriteByte(',') 131 + writer.WriteString("0.95") 132 + writer.WriteByte(',') 133 + writer.WriteString(strings.Join(labels, ";")) 134 + writer.WriteByte('\n') 135 + } 136 + } 137 + 138 + // Progress 139 + if bundleNum%10 == 0 { 140 + elapsed := time.Since(startTime).Seconds() 141 + opsPerSec := float64(totalOps) / elapsed 142 + fmt.Fprintf(os.Stderr, "Processed %d/%d bundles | %d ops | %.0f ops/sec\r", 143 + bundleNum, endBundle, totalOps, opsPerSec) 144 + } 145 + } 146 + 147 + elapsed := time.Since(startTime).Seconds() 148 + 149 + // Stats 150 + fmt.Fprintf(os.Stderr, "\n\n") 151 + fmt.Fprintf(os.Stderr, "✓ Detection complete\n") 152 + fmt.Fprintf(os.Stderr, " Total operations: %d\n", totalOps) 153 + fmt.Fprintf(os.Stderr, " Matches found: %d (%.2f%%)\n", matchCount, float64(matchCount)/float64(totalOps)*100) 154 + fmt.Fprintf(os.Stderr, " Total size: %.1f MB\n", float64(totalBytes)/1e6) 155 + fmt.Fprintf(os.Stderr, " Matched size: %.1f MB (%.2f%%)\n", float64(matchedBytes)/1e6, float64(matchedBytes)/float64(totalBytes)*100) 156 + fmt.Fprintf(os.Stderr, "\n") 157 + fmt.Fprintf(os.Stderr, " Time elapsed: %.2fs\n", elapsed) 158 + fmt.Fprintf(os.Stderr, " Throughput: %.0f ops/sec\n", float64(totalOps)/elapsed) 159 + fmt.Fprintf(os.Stderr, " Speed: %.1f MB/sec\n", float64(totalBytes)/elapsed/1e6) 160 + }
+1 -1
scripts/detector-template.js
··· 10 10 // Return array of label strings 11 11 // Return empty array [] for no match 12 12 13 - if (op.did.match(/^did:plc:aa/)) { 13 + if (op.did.match(/^did:plc:aaa/)) { 14 14 labels.push('test') 15 15 } 16 16