A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

more work on detectors

+288 -337
+138 -285
cmd/plcbundle/detector.go
··· 36 36 cmdDetectorTest() 37 37 case "run": 38 38 cmdDetectorRun() 39 - case "filter": // ← Add this 39 + case "filter": 40 40 cmdDetectorFilter() 41 41 case "info": 42 42 cmdDetectorInfo() ··· 82 82 func cmdDetectorFilter() { 83 83 if len(os.Args) < 4 { 84 84 fmt.Fprintf(os.Stderr, "Usage: plcbundle detector filter <detector1|script.js> [detector2...] [--confidence 0.9]\n") 85 - fmt.Fprintf(os.Stderr, "\nFilters OUT operations that match detectors (outputs clean data)\n\n") 86 - fmt.Fprintf(os.Stderr, "Examples:\n") 87 - fmt.Fprintf(os.Stderr, " plcbundle backfill | plcbundle detector filter all > clean.jsonl\n") 88 - fmt.Fprintf(os.Stderr, " plcbundle export --bundle 1 | plcbundle detector filter invalid_handle > clean.jsonl\n") 89 - fmt.Fprintf(os.Stderr, " plcbundle backfill | plcbundle detector filter ./my_detector.js > clean.jsonl\n") 90 85 os.Exit(1) 91 86 } 92 87 93 - // Manually separate detector names from flags 88 + // Parse detector names and flags 94 89 var detectorNames []string 95 90 var flagArgs []string 96 - 97 91 for i := 3; i < len(os.Args); i++ { 98 - arg := os.Args[i] 99 - if strings.HasPrefix(arg, "-") { 92 + if strings.HasPrefix(os.Args[i], "-") { 100 93 flagArgs = os.Args[i:] 101 94 break 102 95 } 103 - detectorNames = append(detectorNames, arg) 96 + detectorNames = append(detectorNames, os.Args[i]) 104 97 } 105 98 106 99 if len(detectorNames) == 0 { ··· 108 101 os.Exit(1) 109 102 } 110 103 111 - // Parse flags 112 104 fs := flag.NewFlagSet("detector filter", flag.ExitOnError) 113 105 confidence := fs.Float64("confidence", 0.90, "minimum confidence") 114 106 fs.Parse(flagArgs) 115 107 116 - // Setup registry 117 - registry := detector.DefaultRegistry() 118 - 119 - // Track script detectors for cleanup 120 - var scriptDetectors []*detector.ScriptDetector 121 - defer func() { 122 - for _, sd := range scriptDetectors { 123 - sd.Close() 124 - } 125 - }() 126 - 127 - // Handle "all" keyword 128 - if len(detectorNames) == 1 && detectorNames[0] == "all" { 129 - detectorNames = registry.Names() 130 - fmt.Fprintf(os.Stderr, "Using all detectors: %s\n", strings.Join(detectorNames, ", ")) 131 - } 132 - 133 - // Get all detectors 134 - detectors := make([]detector.Detector, 0, len(detectorNames)) 135 - for _, name := range detectorNames { 136 - // Check if it's a .js file (script detector) 137 - if strings.HasSuffix(name, ".js") { 138 - scriptDetector, err := detector.NewScriptDetector(name) 139 - if err != nil { 140 - fmt.Fprintf(os.Stderr, "Error loading script %s: %v\n", name, err) 141 - os.Exit(1) 142 - } 143 - scriptDetectors = append(scriptDetectors, scriptDetector) 144 - registry.Register(scriptDetector) 145 - detectors = append(detectors, scriptDetector) 146 - fmt.Fprintf(os.Stderr, "✓ Started detector server: %s\n", scriptDetector.Name()) 147 - } else { 148 - d, err := registry.Get(name) 149 - if err != nil { 150 - fmt.Fprintf(os.Stderr, "Error: %v\n", err) 151 - os.Exit(1) 152 - } 153 - detectors = append(detectors, d) 154 - } 108 + // Load detectors (common logic) 109 + setup, err := parseAndLoadDetectors(detectorNames, *confidence) 110 + if err != nil { 111 + fmt.Fprintf(os.Stderr, "Error: %v\n", err) 112 + os.Exit(1) 155 113 } 114 + defer setup.cleanup() 156 115 157 - // Log to stderr 158 - fmt.Fprintf(os.Stderr, "Filtering OUT spam with %d detector(s)\n", len(detectors)) 159 - if len(detectorNames) <= 5 { 160 - fmt.Fprintf(os.Stderr, "Detectors: %s\n", strings.Join(detectorNames, ", ")) 161 - } 162 - fmt.Fprintf(os.Stderr, "Min confidence: %.2f\n\n", *confidence) 116 + fmt.Fprintf(os.Stderr, "Filtering with %d detector(s), min confidence: %.2f\n\n", len(setup.detectors), *confidence) 163 117 164 118 ctx := context.Background() 165 119 scanner := bufio.NewScanner(os.Stdin) 166 - 167 - // Set large buffer for long lines 168 120 buf := make([]byte, 0, 64*1024) 169 121 scanner.Buffer(buf, 1024*1024) 170 122 171 - cleanCount := 0 172 - filteredCount := 0 173 - totalCount := 0 174 - totalBytes := int64(0) 175 - filteredBytes := int64(0) 123 + cleanCount, filteredCount, totalCount := 0, 0, 0 124 + totalBytes, filteredBytes := int64(0), int64(0) 176 125 177 - // Read JSONL from stdin 178 126 for scanner.Scan() { 179 127 line := scanner.Bytes() 180 128 if len(line) == 0 { ··· 182 130 } 183 131 184 132 totalCount++ 185 - opSize := int64(len(line)) 186 - totalBytes += opSize 133 + totalBytes += int64(len(line)) 187 134 188 - // Parse operation 189 135 var op plc.PLCOperation 190 136 if err := json.Unmarshal(line, &op); err != nil { 191 - fmt.Fprintf(os.Stderr, "Warning: failed to parse line %d: %v\n", totalCount, err) 192 137 continue 193 138 } 194 139 195 - // Run all detectors on this operation 196 - isSpam := false 197 - 198 - for _, det := range detectors { 199 - match, err := det.Detect(ctx, op) 200 - if err != nil { 201 - continue 202 - } 203 - 204 - if match != nil && match.Confidence >= *confidence { 205 - // Detected as spam - filter it out 206 - isSpam = true 207 - break 208 - } 209 - } 140 + // Run detection (common logic) 141 + labels, _ := detectOperation(ctx, setup.detectors, op, setup.confidence) 210 142 211 - // Output only if NOT spam (clean operation) 212 - if !isSpam { 143 + if len(labels) == 0 { 213 144 cleanCount++ 214 145 fmt.Println(string(line)) 215 146 } else { 216 147 filteredCount++ 217 - filteredBytes += opSize 148 + filteredBytes += int64(len(line)) 218 149 } 219 150 220 - // Progress to stderr 221 151 if totalCount%1000 == 0 { 222 - fmt.Fprintf(os.Stderr, "Processed: %d | Clean: %d | Filtered: %d | Saved: %s\r", 223 - totalCount, cleanCount, filteredCount, formatBytes(filteredBytes)) 152 + fmt.Fprintf(os.Stderr, "Processed: %d | Clean: %d | Filtered: %d\r", totalCount, cleanCount, filteredCount) 224 153 } 225 154 } 226 155 227 - if err := scanner.Err(); err != nil { 228 - fmt.Fprintf(os.Stderr, "\nError reading stdin: %v\n", err) 229 - os.Exit(1) 230 - } 231 - 232 - // Final stats to stderr 233 - fmt.Fprintf(os.Stderr, "\n\n") 234 - fmt.Fprintf(os.Stderr, "✓ Filter complete\n") 235 - fmt.Fprintf(os.Stderr, " Total operations: %d\n", totalCount) 236 - fmt.Fprintf(os.Stderr, " Clean: %d (%.2f%%)\n", cleanCount, float64(cleanCount)/float64(totalCount)*100) 237 - fmt.Fprintf(os.Stderr, " Filtered out: %d (%.2f%%)\n", filteredCount, float64(filteredCount)/float64(totalCount)*100) 238 - fmt.Fprintf(os.Stderr, "\n") 239 - fmt.Fprintf(os.Stderr, " Total size: %s\n", formatBytes(totalBytes)) 240 - fmt.Fprintf(os.Stderr, " Filtered size: %s (%.2f%%)\n", formatBytes(filteredBytes), float64(filteredBytes)/float64(totalBytes)*100) 241 - fmt.Fprintf(os.Stderr, " Clean size: %s (%.2f%%)\n", formatBytes(totalBytes-filteredBytes), float64(totalBytes-filteredBytes)/float64(totalBytes)*100) 242 - fmt.Fprintf(os.Stderr, "\n") 243 - fmt.Fprintf(os.Stderr, " Detectors used: %d\n", len(detectors)) 156 + // Stats 157 + fmt.Fprintf(os.Stderr, "\n\n✓ Filter complete\n") 158 + fmt.Fprintf(os.Stderr, " Total: %d | Clean: %d (%.2f%%) | Filtered: %d (%.2f%%)\n", 159 + totalCount, cleanCount, float64(cleanCount)/float64(totalCount)*100, 160 + filteredCount, float64(filteredCount)/float64(totalCount)*100) 161 + fmt.Fprintf(os.Stderr, " Size saved: %s (%.2f%%)\n", formatBytes(filteredBytes), float64(filteredBytes)/float64(totalBytes)*100) 244 162 } 245 163 246 164 func cmdDetectorList() { ··· 371 289 func cmdDetectorRun() { 372 290 if len(os.Args) < 4 { 373 291 fmt.Fprintf(os.Stderr, "Usage: plcbundle detector run <detector1|script.js> [detector2...] [--bundles 1-100]\n") 374 - fmt.Fprintf(os.Stderr, "\nExamples:\n") 375 - fmt.Fprintf(os.Stderr, " plcbundle detector run invalid_handle --bundles 1-100\n") 376 - fmt.Fprintf(os.Stderr, " plcbundle detector run invalid_handle aka_spam --bundles 1-100\n") 377 - fmt.Fprintf(os.Stderr, " plcbundle detector run ./my_detector.js --bundles 1-100\n") 378 - fmt.Fprintf(os.Stderr, " plcbundle detector run all # runs on all bundles\n") 379 292 os.Exit(1) 380 293 } 381 294 382 - // Manually separate detector names from flags 295 + // Parse detector names and flags 383 296 var detectorNames []string 384 297 var flagArgs []string 385 - 386 298 for i := 3; i < len(os.Args); i++ { 387 - arg := os.Args[i] 388 - if strings.HasPrefix(arg, "-") { 299 + if strings.HasPrefix(os.Args[i], "-") { 389 300 flagArgs = os.Args[i:] 390 301 break 391 302 } 392 - detectorNames = append(detectorNames, arg) 303 + detectorNames = append(detectorNames, os.Args[i]) 393 304 } 394 305 395 306 if len(detectorNames) == 0 { ··· 397 308 os.Exit(1) 398 309 } 399 310 400 - // Parse flags 401 311 fs := flag.NewFlagSet("detector run", flag.ExitOnError) 402 - bundleRange := fs.String("bundles", "", "bundle range (e.g., '1-100'), default: all bundles") 312 + bundleRange := fs.String("bundles", "", "bundle range, default: all bundles") 403 313 confidence := fs.Float64("confidence", 0.90, "minimum confidence") 404 314 fs.Parse(flagArgs) 405 315 406 - // Load manager (needed to determine bundle range) 316 + // Load manager 407 317 mgr, _, err := getManager("") 408 318 if err != nil { 409 319 fmt.Fprintf(os.Stderr, "Error: %v\n", err) ··· 414 324 // Determine bundle range 415 325 var start, end int 416 326 if *bundleRange == "" { 417 - // Default to all bundles 418 327 index := mgr.GetIndex() 419 328 bundles := index.GetBundles() 420 329 if len(bundles) == 0 { ··· 423 332 } 424 333 start = bundles[0].BundleNumber 425 334 end = bundles[len(bundles)-1].BundleNumber 426 - fmt.Fprintf(os.Stderr, "No --bundles specified, using all available bundles: %d-%d\n", start, end) 335 + fmt.Fprintf(os.Stderr, "Using all bundles: %d-%d\n", start, end) 427 336 } else { 428 - // Parse provided range 429 337 start, end, err = parseBundleRange(*bundleRange) 430 338 if err != nil { 431 339 fmt.Fprintf(os.Stderr, "Error: %v\n", err) ··· 433 341 } 434 342 } 435 343 436 - // Setup registry 437 - registry := detector.DefaultRegistry() 438 - config := detector.DefaultConfig() 439 - config.MinConfidence = *confidence 440 - 441 - // Track script detectors for cleanup 442 - var scriptDetectors []*detector.ScriptDetector 443 - defer func() { 444 - for _, sd := range scriptDetectors { 445 - sd.Close() 446 - } 447 - }() 448 - 449 - // Handle "all" keyword - expand to all available detectors 450 - if len(detectorNames) == 1 && detectorNames[0] == "all" { 451 - detectorNames = registry.Names() 452 - fmt.Fprintf(os.Stderr, "Using all available detectors: %s\n", strings.Join(detectorNames, ", ")) 344 + // Load detectors (common logic) 345 + setup, err := parseAndLoadDetectors(detectorNames, *confidence) 346 + if err != nil { 347 + fmt.Fprintf(os.Stderr, "Error: %v\n", err) 348 + os.Exit(1) 453 349 } 350 + defer setup.cleanup() 454 351 455 - // Load detectors (built-in or scripts) 456 - detectors := make([]detector.Detector, 0, len(detectorNames)) 457 - for _, name := range detectorNames { 458 - // Check if it's a .js file (script detector) 459 - if strings.HasSuffix(name, ".js") { 460 - scriptDetector, err := detector.NewScriptDetector(name) 461 - if err != nil { 462 - fmt.Fprintf(os.Stderr, "Error loading script %s: %v\n", name, err) 463 - os.Exit(1) 464 - } 465 - // Track for cleanup 466 - scriptDetectors = append(scriptDetectors, scriptDetector) 467 - // Register it so it can be used 468 - registry.Register(scriptDetector) 469 - detectors = append(detectors, scriptDetector) 470 - fmt.Fprintf(os.Stderr, "✓ Started detector server: %s\n", scriptDetector.Name()) 471 - } else { 472 - // Try to get built-in detector 473 - d, err := registry.Get(name) 474 - if err != nil { 475 - fmt.Fprintf(os.Stderr, "Error: %v\n", err) 476 - os.Exit(1) 477 - } 478 - detectors = append(detectors, d) 479 - } 480 - } 481 - 482 - // Log to stderr 483 - fmt.Fprintf(os.Stderr, "Running %d detector(s) on bundles %d-%d...\n", len(detectors), start, end) 484 - if len(detectorNames) <= 5 { 485 - fmt.Fprintf(os.Stderr, "Detectors: %s\n", strings.Join(detectorNames, ", ")) 486 - } 352 + fmt.Fprintf(os.Stderr, "Running %d detector(s) on bundles %d-%d\n", len(setup.detectors), start, end) 487 353 fmt.Fprintf(os.Stderr, "Min confidence: %.2f\n\n", *confidence) 488 354 489 355 ctx := context.Background() 490 - 491 - // Write CSV header to stdout 492 356 fmt.Println("bundle,position,cid,size,confidence,labels") 493 357 494 - // Track statistics 495 - totalOps := 0 496 - matchCount := 0 497 - totalBytes := int64(0) 498 - matchedBytes := int64(0) 499 - bundlesProcessed := 0 500 - detectorMatchCounts := make(map[string]int) 501 - 358 + // Stats 359 + totalOps, matchCount := 0, 0 360 + totalBytes, matchedBytes := int64(0), int64(0) 502 361 totalBundles := end - start + 1 503 - 504 - // Create progress bar with byte tracking enabled 505 - fmt.Fprintf(os.Stderr, "Processing bundles:\n") 506 362 progress := NewProgressBar(totalBundles) 507 363 progress.showBytes = true 508 364 509 - // Process bundles and stream results 365 + // Process bundles 510 366 for bundleNum := start; bundleNum <= end; bundleNum++ { 511 367 bundle, err := mgr.LoadBundle(ctx, bundleNum) 512 368 if err != nil { 513 - progress.Finish() 514 - fmt.Fprintf(os.Stderr, "\n⚠️ Warning: failed to load bundle %d: %v\n", bundleNum, err) 515 - progress = NewProgressBar(totalBundles) 516 - progress.showBytes = true 517 - progress.SetWithBytes(bundleNum-start, totalBytes) 518 369 continue 519 370 } 520 371 521 - bundlesProcessed++ 522 372 totalOps += len(bundle.Operations) 523 373 524 - // Process each operation with all detectors 525 374 for position, op := range bundle.Operations { 526 - // Calculate operation size 527 - var opSize int 528 - if len(op.RawJSON) > 0 { 529 - opSize = len(op.RawJSON) 530 - } else { 375 + opSize := len(op.RawJSON) 376 + if opSize == 0 { 531 377 data, _ := json.Marshal(op) 532 378 opSize = len(data) 533 379 } 534 380 totalBytes += int64(opSize) 535 381 536 - // Collect all matches for this operation 537 - var matchedLabels []string 538 - var maxConfidence float64 539 - 540 - // Run all detectors on this operation 541 - for _, det := range detectors { 542 - match, err := det.Detect(ctx, op) 543 - if err != nil { 544 - continue 545 - } 546 - 547 - // Skip if no match or confidence too low 548 - if match == nil || match.Confidence < *confidence { 549 - continue 550 - } 551 - 552 - // Extract labels from match metadata 553 - var labels []string 554 - if labelList, ok := match.Metadata["labels"].([]string); ok { 555 - labels = labelList 556 - } else if labelList, ok := match.Metadata["labels"].([]interface{}); ok { 557 - for _, l := range labelList { 558 - if str, ok := l.(string); ok { 559 - labels = append(labels, str) 560 - } 561 - } 562 - } 563 - 564 - // If no labels in metadata, use detector name 565 - if len(labels) == 0 { 566 - labels = []string{det.Name()} 567 - } 382 + // Run detection (common logic) 383 + labels, confidence := detectOperation(ctx, setup.detectors, op, setup.confidence) 568 384 569 - // Collect all labels 570 - matchedLabels = append(matchedLabels, labels...) 571 - detectorMatchCounts[det.Name()]++ 572 - 573 - // Track highest confidence 574 - if match.Confidence > maxConfidence { 575 - maxConfidence = match.Confidence 576 - } 577 - } 578 - 579 - // Output only if at least one detector matched 580 - if len(matchedLabels) > 0 { 385 + if len(labels) > 0 { 581 386 matchCount++ 582 387 matchedBytes += int64(opSize) 583 388 584 - // Extract last 4 chars of CID 585 389 cidShort := op.CID 586 390 if len(cidShort) > 4 { 587 391 cidShort = cidShort[len(cidShort)-4:] 588 392 } 589 393 590 394 fmt.Printf("%d,%d,%s,%d,%.2f,%s\n", 591 - bundleNum, 592 - position, 593 - cidShort, 594 - opSize, 595 - maxConfidence, 596 - strings.Join(matchedLabels, ";"), 597 - ) 395 + bundleNum, position, cidShort, opSize, confidence, strings.Join(labels, ";")) 598 396 } 599 397 } 600 398 601 - // Update progress with bytes 602 399 progress.SetWithBytes(bundleNum-start+1, totalBytes) 603 400 } 604 401 605 - // Finish progress bar 606 402 progress.Finish() 607 403 608 - // Final stats to stderr 609 - fmt.Fprintf(os.Stderr, "\n") 610 - fmt.Fprintf(os.Stderr, "✓ Detection complete\n") 611 - fmt.Fprintf(os.Stderr, " Bundles processed: %d\n", bundlesProcessed) 404 + // Stats 405 + fmt.Fprintf(os.Stderr, "\n✓ Detection complete\n") 612 406 fmt.Fprintf(os.Stderr, " Total operations: %d\n", totalOps) 613 407 fmt.Fprintf(os.Stderr, " Matches found: %d (%.2f%%)\n", matchCount, float64(matchCount)/float64(totalOps)*100) 614 - fmt.Fprintf(os.Stderr, " Clean operations: %d (%.2f%%)\n", totalOps-matchCount, float64(totalOps-matchCount)/float64(totalOps)*100) 615 - fmt.Fprintf(os.Stderr, "\n") 616 408 fmt.Fprintf(os.Stderr, " Total size: %s\n", formatBytes(totalBytes)) 617 409 fmt.Fprintf(os.Stderr, " Matched size: %s (%.2f%%)\n", formatBytes(matchedBytes), float64(matchedBytes)/float64(totalBytes)*100) 618 - fmt.Fprintf(os.Stderr, " Clean size: %s (%.2f%%)\n", formatBytes(totalBytes-matchedBytes), float64(totalBytes-matchedBytes)/float64(totalBytes)*100) 619 - 620 - if matchedBytes > 0 { 621 - fmt.Fprintf(os.Stderr, "\n") 622 - fmt.Fprintf(os.Stderr, " 💾 Potential savings if filtered: %s (%.2f%% reduction)\n", 623 - formatBytes(matchedBytes), 624 - float64(matchedBytes)/float64(totalBytes)*100) 625 - } 626 - 627 - fmt.Fprintf(os.Stderr, "\n") 628 - fmt.Fprintf(os.Stderr, " Detectors used: %d\n", len(detectors)) 629 - 630 - // Show breakdown by detector if multiple used 631 - if len(detectors) > 1 { 632 - fmt.Fprintf(os.Stderr, "\n") 633 - fmt.Fprintf(os.Stderr, " Matches by detector:\n") 634 - for _, det := range detectors { 635 - name := det.Name() 636 - count := detectorMatchCounts[name] 637 - if count > 0 { 638 - pct := float64(count) / float64(matchCount) * 100 639 - fmt.Fprintf(os.Stderr, " %-20s %d (%.1f%%)\n", name, count, pct) 640 - } else { 641 - fmt.Fprintf(os.Stderr, " %-20s 0\n", name) 642 - } 643 - } 644 - } 645 410 } 646 411 647 412 func cmdDetectorInfo() { ··· 709 474 710 475 return start, end, nil 711 476 } 477 + 478 + // Common detector setup 479 + type detectorSetup struct { 480 + detectors []detector.Detector 481 + scriptDetectors []interface{ Close() error } 482 + confidence float64 483 + } 484 + 485 + func (ds *detectorSetup) cleanup() { 486 + for _, sd := range ds.scriptDetectors { 487 + sd.Close() 488 + } 489 + } 490 + 491 + // parseAndLoadDetectors handles common detector loading logic 492 + func parseAndLoadDetectors(detectorNames []string, confidence float64) (*detectorSetup, error) { 493 + registry := detector.DefaultRegistry() 494 + 495 + if len(detectorNames) == 1 && detectorNames[0] == "all" { 496 + detectorNames = registry.Names() 497 + fmt.Fprintf(os.Stderr, "Using all detectors: %s\n", strings.Join(detectorNames, ", ")) 498 + } 499 + 500 + setup := &detectorSetup{ 501 + detectors: make([]detector.Detector, 0, len(detectorNames)), 502 + scriptDetectors: make([]interface{ Close() error }, 0), 503 + confidence: confidence, 504 + } 505 + 506 + for _, name := range detectorNames { 507 + if strings.HasSuffix(name, ".js") { 508 + sd, err := detector.NewScriptDetector(name) // Simple single process 509 + if err != nil { 510 + setup.cleanup() 511 + return nil, fmt.Errorf("error loading script %s: %w", name, err) 512 + } 513 + setup.scriptDetectors = append(setup.scriptDetectors, sd) 514 + registry.Register(sd) 515 + setup.detectors = append(setup.detectors, sd) 516 + fmt.Fprintf(os.Stderr, "✓ Started detector server: %s\n", sd.Name()) 517 + } else { 518 + d, err := registry.Get(name) 519 + if err != nil { 520 + setup.cleanup() 521 + return nil, err 522 + } 523 + setup.detectors = append(setup.detectors, d) 524 + } 525 + } 526 + 527 + return setup, nil 528 + } 529 + 530 + // detectOperation runs all detectors on an operation and returns labels + confidence 531 + func detectOperation(ctx context.Context, detectors []detector.Detector, op plc.PLCOperation, minConfidence float64) ([]string, float64) { 532 + var matchedLabels []string 533 + var maxConfidence float64 534 + 535 + for _, det := range detectors { 536 + match, err := det.Detect(ctx, op) 537 + if err != nil || match == nil || match.Confidence < minConfidence { 538 + continue 539 + } 540 + 541 + // Extract labels 542 + var labels []string 543 + if labelList, ok := match.Metadata["labels"].([]string); ok { 544 + labels = labelList 545 + } else if labelList, ok := match.Metadata["labels"].([]interface{}); ok { 546 + for _, l := range labelList { 547 + if str, ok := l.(string); ok { 548 + labels = append(labels, str) 549 + } 550 + } 551 + } 552 + 553 + if len(labels) == 0 { 554 + labels = []string{det.Name()} 555 + } 556 + 557 + matchedLabels = append(matchedLabels, labels...) 558 + if match.Confidence > maxConfidence { 559 + maxConfidence = match.Confidence 560 + } 561 + } 562 + 563 + return matchedLabels, maxConfidence 564 + }
+18
detector/builtin.go
··· 9 9 "tangled.org/atscan.net/plcbundle/plc" 10 10 ) 11 11 12 + // NoOpDetector is an empty detector for speed testing 13 + type NoOpDetector struct{} 14 + 15 + func NewNoOpDetector() *NoOpDetector { 16 + return &NoOpDetector{} 17 + } 18 + 19 + func (d *NoOpDetector) Name() string { return "noop" } 20 + func (d *NoOpDetector) Description() string { 21 + return "Empty detector for benchmarking (always returns no match)" 22 + } 23 + func (d *NoOpDetector) Version() string { return "1.0.0" } 24 + 25 + func (d *NoOpDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 26 + // Instant return - no work done 27 + return nil, nil 28 + } 29 + 12 30 // InvalidHandleDetector detects operations with invalid handle patterns 13 31 type InvalidHandleDetector struct { 14 32 // Valid handle regex based on AT Protocol handle specification
+2
detector/registry.go
··· 76 76 func DefaultRegistry() *Registry { 77 77 r := NewRegistry() 78 78 79 + r.Register(NewNoOpDetector()) 80 + 79 81 // Register real spam detectors 80 82 r.Register(NewInvalidHandleDetector()) 81 83 r.Register(NewAlsoKnownAsSpamDetector())
+17 -50
detector/script.go
··· 28 28 } 29 29 30 30 func NewScriptDetector(scriptPath string) (*ScriptDetector, error) { 31 - // Verify bun is available 32 31 if _, err := exec.LookPath("bun"); err != nil { 33 32 return nil, fmt.Errorf("bun runtime not found in PATH") 34 33 } ··· 38 37 return nil, fmt.Errorf("invalid script path: %w", err) 39 38 } 40 39 41 - // Use filename as detector name 42 40 name := strings.TrimSuffix(filepath.Base(scriptPath), filepath.Ext(scriptPath)) 43 - 44 - // Create unique socket path 45 41 socketPath := filepath.Join(os.TempDir(), fmt.Sprintf("detector-%d-%s.sock", os.Getpid(), name)) 46 42 47 43 sd := &ScriptDetector{ ··· 50 46 socketPath: socketPath, 51 47 } 52 48 53 - // Start the server 54 49 if err := sd.startServer(); err != nil { 55 50 return nil, err 56 51 } ··· 63 58 func (d *ScriptDetector) Version() string { return "1.0.0" } 64 59 65 60 func (d *ScriptDetector) startServer() error { 66 - // Read user's script 67 61 userCode, err := os.ReadFile(d.scriptPath) 68 62 if err != nil { 69 63 return fmt.Errorf("failed to read script: %w", err) 70 64 } 71 65 72 - // Create wrapper script with socket server 73 66 wrapperScript := d.createSocketWrapper(string(userCode)) 74 - 75 - // Remove old socket if exists 76 67 os.Remove(d.socketPath) 77 68 78 - // Start Bun server with script piped to stdin 79 69 d.serverCmd = exec.Command("bun", "run", "-", d.socketPath) 80 70 d.serverCmd.Stdout = os.Stderr 81 71 d.serverCmd.Stderr = os.Stderr 82 72 83 - // Get stdin pipe 84 73 stdin, err := d.serverCmd.StdinPipe() 85 74 if err != nil { 86 75 return fmt.Errorf("failed to create stdin pipe: %w", err) 87 76 } 88 77 89 - // Start the process 90 78 if err := d.serverCmd.Start(); err != nil { 91 79 return fmt.Errorf("failed to start server: %w", err) 92 80 } 93 81 94 - // Write wrapper script to stdin 95 - if _, err := stdin.Write([]byte(wrapperScript)); err != nil { 96 - d.serverCmd.Process.Kill() 97 - return fmt.Errorf("failed to write script: %w", err) 98 - } 82 + stdin.Write([]byte(wrapperScript)) 99 83 stdin.Close() 100 84 101 - // Wait for socket to be ready and connect 102 85 if err := d.connectToServer(); err != nil { 103 86 d.serverCmd.Process.Kill() 104 87 os.Remove(d.socketPath) ··· 116 99 // Unix socket server 117 100 const socketPath = process.argv[2]; 118 101 119 - // Clean up old socket 120 102 try { 121 103 await Bun.file(socketPath).unlink(); 122 104 } catch {} ··· 125 107 unix: socketPath, 126 108 socket: { 127 109 data(socket, data) { 128 - const lines = data.toString().split('\n').filter(line => line.trim()); 129 - 130 - for (const line of lines) { 131 - try { 132 - const operation = JSON.parse(line); 133 - const labels = detect({ op: operation }) || []; 134 - socket.write(JSON.stringify({ labels }) + '\n'); 135 - } catch (error) { 136 - socket.write(JSON.stringify({ labels: [], error: error.message }) + '\n'); 137 - } 110 + try { 111 + const operation = JSON.parse(data.toString()); 112 + const labels = detect({ op: operation }) || []; 113 + socket.write(JSON.stringify({ labels }) + '\n'); 114 + } catch (error) { 115 + socket.write(JSON.stringify({ labels: [], error: error.message }) + '\n'); 138 116 } 139 117 }, 140 - error(socket, error) { 141 - console.error('Socket error:', error); 142 - }, 143 - close(socket) { 144 - console.error('Socket closed'); 145 - } 118 + error(socket, error) {}, 119 + close(socket) {} 146 120 } 147 121 }); 148 122 ··· 151 125 } 152 126 153 127 func (d *ScriptDetector) connectToServer() error { 154 - // Try to connect with retries 155 128 maxRetries := 50 156 129 for i := 0; i < maxRetries; i++ { 157 130 conn, err := net.Dial("unix", d.socketPath) ··· 171 144 return nil, fmt.Errorf("not connected to server") 172 145 } 173 146 174 - // Serialize operation 175 - data, err := json.Marshal(op) 176 - if err != nil { 177 - return nil, fmt.Errorf("failed to serialize operation: %w", err) 147 + // Use RawJSON directly 148 + data := op.RawJSON 149 + if len(data) == 0 { 150 + var err error 151 + data, err = json.Marshal(op) 152 + if err != nil { 153 + return nil, fmt.Errorf("failed to serialize operation: %w", err) 154 + } 178 155 } 179 156 180 - // Write to socket with newline delimiter 181 157 if _, err := d.writer.Write(data); err != nil { 182 158 return nil, fmt.Errorf("failed to write to socket: %w", err) 183 159 } ··· 188 164 return nil, fmt.Errorf("failed to flush: %w", err) 189 165 } 190 166 191 - // Read response 192 167 line, err := d.reader.ReadString('\n') 193 168 if err != nil { 194 169 return nil, fmt.Errorf("failed to read response: %w", err) 195 170 } 196 171 197 - // Parse response 198 172 var result struct { 199 173 Labels []string `json:"labels"` 200 174 Error string `json:"error,omitempty"` ··· 208 182 return nil, fmt.Errorf("detector error: %s", result.Error) 209 183 } 210 184 211 - // No match if no labels 212 185 if len(result.Labels) == 0 { 213 186 return nil, nil 214 187 } 215 188 216 - // Convert labels to Match 217 189 return &Match{ 218 190 Reason: strings.Join(result.Labels, "_"), 219 191 Category: "custom", ··· 226 198 }, nil 227 199 } 228 200 229 - // Close shuts down the server 230 201 func (d *ScriptDetector) Close() error { 231 - // Close connection 232 202 if d.conn != nil { 233 203 d.conn.Close() 234 204 d.conn = nil 235 205 } 236 206 237 - // Kill server process 238 207 if d.serverCmd != nil && d.serverCmd.Process != nil { 239 208 d.serverCmd.Process.Kill() 240 209 d.serverCmd.Wait() 241 210 } 242 211 243 - // Remove socket file 244 212 os.Remove(d.socketPath) 245 - 246 213 return nil 247 214 }
+5
go.mod
··· 5 5 require github.com/klauspost/compress v1.18.1 6 6 7 7 require github.com/gorilla/websocket v1.5.3 8 + 9 + require ( 10 + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect 11 + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect 12 + )
+4
go.sum
··· 2 2 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 3 3 github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= 4 4 github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= 5 + github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= 6 + github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= 7 + github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= 8 + github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
+100
scripts/benchmark-detector.js
··· 1 + #!/usr/bin/env bun 2 + 3 + // User's detect function 4 + function detect({ op }) { 5 + const labels = []; 6 + 7 + if (op.did.match(/^did:plc:aa/)) { 8 + labels.push('test') 9 + } 10 + 11 + return labels; 12 + } 13 + 14 + // ========================================== 15 + // Pure Bun bundle processor 16 + // ========================================== 17 + 18 + import { spawn } from 'bun'; 19 + import { readdir } from 'fs/promises'; 20 + 21 + const BUNDLE_DIR = process.argv[2] || './'; 22 + const START_BUNDLE = parseInt(process.argv[3]) || 1; 23 + const END_BUNDLE = parseInt(process.argv[4]) || 100; 24 + 25 + console.error(`Processing bundles ${START_BUNDLE}-${END_BUNDLE} from ${BUNDLE_DIR}`); 26 + console.error(''); 27 + 28 + // CSV header 29 + console.log('bundle,position,cid,size,confidence,labels'); 30 + 31 + let totalOps = 0; 32 + let matchCount = 0; 33 + let totalBytes = 0; 34 + let matchedBytes = 0; 35 + 36 + const startTime = Date.now(); 37 + 38 + for (let bundleNum = START_BUNDLE; bundleNum <= END_BUNDLE; bundleNum++) { 39 + const bundleFile = `${BUNDLE_DIR}/${bundleNum.toString().padStart(6, '0')}.jsonl.zst`; 40 + 41 + try { 42 + // Decompress bundle using zstd command 43 + const proc = spawn(['zstd', '-d', '-c', bundleFile]); 44 + const text = await new Response(proc.stdout).text(); 45 + 46 + const lines = text.split('\n').filter(line => line.trim()); 47 + 48 + for (let position = 0; position < lines.length; position++) { 49 + const line = lines[position]; 50 + if (!line) continue; 51 + 52 + totalOps++; 53 + const opSize = line.length; 54 + totalBytes += opSize; 55 + 56 + try { 57 + const op = JSON.parse(line); 58 + const labels = detect({ op }); 59 + 60 + if (labels && labels.length > 0) { 61 + matchCount++; 62 + matchedBytes += opSize; 63 + 64 + // Extract last 4 chars of CID 65 + const cidShort = op.cid.slice(-4); 66 + 67 + console.log( 68 + `${bundleNum},${position},${cidShort},${opSize},0.95,${labels.join(';')}` 69 + ); 70 + } 71 + } catch (err) { 72 + console.error(`Error parsing operation: ${err.message}`); 73 + } 74 + } 75 + 76 + // Progress 77 + if (bundleNum % 10 === 0) { 78 + const elapsed = (Date.now() - startTime) / 1000; 79 + const opsPerSec = (totalOps / elapsed).toFixed(0); 80 + console.error(`Processed ${bundleNum}/${END_BUNDLE} bundles | ${totalOps} ops | ${opsPerSec} ops/sec\r`); 81 + } 82 + 83 + } catch (err) { 84 + console.error(`\nError loading bundle ${bundleNum}: ${err.message}`); 85 + } 86 + } 87 + 88 + const elapsed = (Date.now() - startTime) / 1000; 89 + 90 + // Stats 91 + console.error('\n'); 92 + console.error('✓ Detection complete'); 93 + console.error(` Total operations: ${totalOps}`); 94 + console.error(` Matches found: ${matchCount} (${(matchCount/totalOps*100).toFixed(2)}%)`); 95 + console.error(` Total size: ${(totalBytes / 1e6).toFixed(1)} MB`); 96 + console.error(` Matched size: ${(matchedBytes / 1e6).toFixed(1)} MB (${(matchedBytes/totalBytes*100).toFixed(2)}%)`); 97 + console.error(''); 98 + console.error(` Time elapsed: ${elapsed.toFixed(2)}s`); 99 + console.error(` Throughput: ${(totalOps / elapsed).toFixed(0)} ops/sec`); 100 + console.error(` Speed: ${(totalBytes / elapsed / 1e6).toFixed(1)} MB/sec`);
+4 -2
scripts/detector-template.js
··· 3 3 // op.did - DID identifier 4 4 // op.cid - Content ID 5 5 // op.createdAt - Timestamp 6 - 7 - console.log(op.did) 8 6 9 7 const labels = []; 10 8 11 9 // Add your detection logic here 12 10 // Return array of label strings 13 11 // Return empty array [] for no match 12 + 13 + if (op.did.match(/^did:plc:aa/)) { 14 + labels.push('test') 15 + } 14 16 15 17 return labels; 16 18 }