[DEPRECATED] Go implementation of plcbundle
at rust-test 1298 lines 36 kB view raw
1package commands 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "os" 8 "strings" 9 "sync" 10 "time" 11 12 "github.com/goccy/go-json" 13 "github.com/spf13/cobra" 14 "tangled.org/atscan.net/plcbundle/cmd/plcbundle/ui" 15 "tangled.org/atscan.net/plcbundle/internal/didindex" 16 "tangled.org/atscan.net/plcbundle/internal/plcclient" 17) 18 19func NewDIDCommand() *cobra.Command { 20 cmd := &cobra.Command{ 21 Use: "did", 22 Aliases: []string{"d"}, 23 Short: "DID operations and queries", 24 Long: `DID operations and queries 25 26Query and analyze DIDs in the bundle repository. All commands 27require a DID index to be built for optimal performance.`, 28 29 Example: ` # Lookup all operations for a DID 30 plcbundle did lookup did:plc:524tuhdhh3m7li5gycdn6boe 31 32 # Resolve to current DID document 33 plcbundle did resolve did:plc:524tuhdhh3m7li5gycdn6boe 34 35 # Show complete audit log 36 plcbundle did history did:plc:524tuhdhh3m7li5gycdn6boe 37 38 # Show DID statistics 39 plcbundle did stats did:plc:524tuhdhh3m7li5gycdn6boe 40 41 # Batch process from file 42 plcbundle did batch dids.txt`, 43 } 44 45 // Add subcommands 46 cmd.AddCommand(newDIDLookupCommand()) 47 cmd.AddCommand(newDIDResolveCommand()) 48 cmd.AddCommand(newDIDHistoryCommand()) 49 cmd.AddCommand(newDIDBatchCommand()) 50 cmd.AddCommand(newDIDStatsCommand()) 51 52 return cmd 53} 54 55// ============================================================================ 56// DID LOOKUP - Find all operations for a DID 57// ============================================================================ 58 59func newDIDLookupCommand() *cobra.Command { 60 var ( 61 verbose bool 62 showJSON bool 63 ) 64 65 cmd := &cobra.Command{ 66 Use: "lookup <did|handle>", 67 Aliases: []string{"find", "get"}, 68 Short: "Find all operations for a DID or handle", 69 Long: `Find all operations for a DID or handle 70 71Retrieves all operations (both bundled and mempool) for a specific DID. 72Accepts either: 73 • DID: did:plc:524tuhdhh3m7li5gycdn6boe 74 • Handle: tree.fail (resolves via configured resolver) 75 76Requires DID index to be built.`, 77 78 Example: ` # Lookup by DID 79 plcbundle did lookup did:plc:524tuhdhh3m7li5gycdn6boe 80 81 # Lookup by handle 82 plcbundle did lookup tree.fail 83 plcbundle did lookup ngerakines.me 84 85 # With non-default handle resolver configured 86 plcbundle --handle-resolver https://quickdid.smokesignal.tools did lookup tree.fail`, 87 88 Args: cobra.ExactArgs(1), 89 90 RunE: func(cmd *cobra.Command, args []string) error { 91 input := args[0] 92 93 mgr, _, err := getManager(&ManagerOptions{Cmd: cmd}) 94 if err != nil { 95 return err 96 } 97 defer mgr.Close() 98 99 // Resolve handle to DID with timing 100 ctx := context.Background() 101 did, _, err := mgr.ResolveHandleOrDID(ctx, input) 102 if err != nil { 103 return err 104 } 105 106 stats := mgr.GetDIDIndexStats() 107 if !stats["exists"].(bool) { 108 fmt.Fprintf(os.Stderr, "⚠️ DID index not found. Run: plcbundle index build\n") 109 fmt.Fprintf(os.Stderr, " Falling back to full scan (slow)...\n\n") 110 } 111 112 totalStart := time.Now() 113 114 // Lookup operations 115 lookupStart := time.Now() 116 _, opsWithLoc, err := mgr.GetDIDOperations(ctx, did, verbose) 117 if err != nil { 118 return err 119 } 120 lookupElapsed := time.Since(lookupStart) 121 122 // Check mempool 123 mempoolStart := time.Now() 124 mempoolOps, err := mgr.GetDIDOperationsFromMempool(did) 125 if err != nil { 126 return fmt.Errorf("error checking mempool: %w", err) 127 } 128 mempoolElapsed := time.Since(mempoolStart) 129 130 totalElapsed := time.Since(totalStart) 131 132 if len(opsWithLoc) == 0 && len(mempoolOps) == 0 { 133 if showJSON { 134 fmt.Println("{\"found\": false, \"operations\": []}") 135 } else { 136 fmt.Printf("DID not found (searched in %s)\n", totalElapsed) 137 } 138 return nil 139 } 140 141 if showJSON { 142 return outputLookupJSON(did, opsWithLoc, mempoolOps, totalElapsed, lookupElapsed, mempoolElapsed) 143 } 144 145 return displayLookupResults(did, opsWithLoc, mempoolOps, totalElapsed, lookupElapsed, mempoolElapsed, verbose, stats) 146 }, 147 } 148 149 cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose debug output") 150 cmd.Flags().BoolVar(&showJSON, "json", false, "Output as JSON") 151 152 return cmd 153} 154 155// ============================================================================ 156// DID RESOLVE - Resolve to current document 157// ============================================================================ 158 159func newDIDResolveCommand() *cobra.Command { 160 var ( 161 verbose bool 162 raw bool 163 ) 164 165 cmd := &cobra.Command{ 166 Use: "resolve <did>", 167 Aliases: []string{"doc", "document"}, 168 Short: "Resolve DID to current document", 169 Long: `Resolve DID to current W3C DID document 170 171Resolves a DID to its current state by applying all non-nullified 172operations in chronological order. Returns standard W3C DID document. 173 174Optimized for speed: checks mempool first, then uses DID index for 175O(1) lookup of latest operation.`, 176 177 Example: ` # Resolve DID 178 plcbundle did resolve did:plc:524tuhdhh3m7li5gycdn6boe 179 180 # Show timings and other details 181 plcbundle did resolve did:plc:524tuhdhh3m7li5gycdn6boe --verbose 182 183 # Get raw PLC state (not W3C format) 184 plcbundle did resolve did:plc:524tuhdhh3m7li5gycdn6boe --raw 185 186 # Pipe to jq 187 plcbundle did resolve did:plc:524tuhdhh3m7li5gycdn6boe | jq .service 188 189 # Resolve by handle 190 plcbundle did resolve tree.fail`, 191 192 Args: cobra.ExactArgs(1), 193 194 RunE: func(cmd *cobra.Command, args []string) error { 195 input := args[0] 196 197 mgr, _, err := getManager(&ManagerOptions{Cmd: cmd}) 198 if err != nil { 199 return err 200 } 201 defer mgr.Close() 202 203 ctx := context.Background() 204 205 // Resolve handle to DID with timing 206 did, handleResolveTime, err := mgr.ResolveHandleOrDID(ctx, input) 207 if err != nil { 208 return err 209 } 210 211 if verbose { 212 fmt.Fprintf(os.Stderr, "Resolving DID: %s\n", did) 213 mgr.GetDIDIndex().SetVerbose(true) 214 } 215 216 result, err := mgr.ResolveDID(ctx, did) 217 if err != nil { 218 return err 219 } 220 221 // Display timing if requested 222 if verbose { 223 if handleResolveTime > 0 { 224 fmt.Fprintf(os.Stderr, "Handle: %s | ", handleResolveTime) 225 } 226 if result.Source == "mempool" { 227 fmt.Fprintf(os.Stderr, "Mempool check: %s (✓ found)\n", result.MempoolTime) 228 fmt.Fprintf(os.Stderr, "Total: %s\n\n", result.TotalTime) 229 } else { 230 fmt.Fprintf(os.Stderr, "Mempool: %s | Index: %s | Load: %s | Total: %s\n", 231 result.MempoolTime, result.IndexTime, result.LoadOpTime, result.TotalTime) 232 fmt.Fprintf(os.Stderr, "Source: bundle %06d, position %d\n\n", 233 result.BundleNumber, result.Position) 234 } 235 } 236 237 // Output document 238 data, _ := json.MarshalIndent(result.Document, "", " ") 239 fmt.Println(string(data)) 240 241 return nil 242 }, 243 } 244 245 cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose debug output") 246 cmd.Flags().BoolVar(&raw, "raw", false, "Output raw PLC state (not W3C document)") 247 248 return cmd 249} 250 251// ============================================================================ 252// DID HISTORY - Show complete audit log 253// ============================================================================ 254 255func newDIDHistoryCommand() *cobra.Command { 256 var ( 257 verbose bool 258 showJSON bool 259 compact bool 260 includeNullified bool 261 ) 262 263 cmd := &cobra.Command{ 264 Use: "history <did>", 265 Aliases: []string{"log", "audit"}, 266 Short: "Show complete DID audit log", 267 Long: `Show complete DID audit log 268 269Displays all operations for a DID in chronological order, showing 270the complete history including nullified operations. 271 272This provides a full audit trail of all changes to the DID.`, 273 274 Example: ` # Show full history 275 plcbundle did history did:plc:524tuhdhh3m7li5gycdn6boe 276 277 # Include nullified operations 278 plcbundle did history did:plc:524tuhdhh3m7li5gycdn6boe --include-nullified 279 280 # Compact one-line format 281 plcbundle did history did:plc:524tuhdhh3m7li5gycdn6boe --compact 282 283 # JSON output 284 plcbundle did history did:plc:524tuhdhh3m7li5gycdn6boe --json`, 285 286 Args: cobra.ExactArgs(1), 287 288 RunE: func(cmd *cobra.Command, args []string) error { 289 did := args[0] 290 291 mgr, _, err := getManager(&ManagerOptions{Cmd: cmd}) 292 if err != nil { 293 return err 294 } 295 defer mgr.Close() 296 297 ctx := context.Background() 298 299 // Get all operations with locations 300 _, opsWithLoc, err := mgr.GetDIDOperations(ctx, did, verbose) 301 if err != nil { 302 return err 303 } 304 305 // Get mempool operations 306 mempoolOps, err := mgr.GetDIDOperationsFromMempool(did) 307 if err != nil { 308 return err 309 } 310 311 if len(opsWithLoc) == 0 && len(mempoolOps) == 0 { 312 fmt.Fprintf(os.Stderr, "DID not found: %s\n", did) 313 return nil 314 } 315 316 if showJSON { 317 return outputHistoryJSON(did, opsWithLoc, mempoolOps) 318 } 319 320 return displayHistory(did, opsWithLoc, mempoolOps, compact, includeNullified) 321 }, 322 } 323 324 cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output") 325 cmd.Flags().BoolVar(&showJSON, "json", false, "Output as JSON") 326 cmd.Flags().BoolVar(&compact, "compact", false, "Compact one-line format") 327 cmd.Flags().BoolVar(&includeNullified, "include-nullified", false, "Show nullified operations") 328 329 return cmd 330} 331 332// ============================================================================ 333// DID BATCH - Process multiple DIDs from file or stdin 334// ============================================================================ 335 336func newDIDBatchCommand() *cobra.Command { 337 var ( 338 action string 339 workers int 340 outputFile string 341 fromStdin bool 342 ) 343 344 cmd := &cobra.Command{ 345 Use: "batch [file]", 346 Short: "Process multiple DIDs from file or stdin", 347 Long: `Process multiple DIDs from file or stdin 348 349Read DIDs from a file (one per line) or stdin and perform batch operations. 350Supports parallel processing for better performance. 351 352Actions: 353 lookup - Lookup all DIDs and show summary 354 resolve - Resolve all DIDs to documents 355 export - Export all operations to JSONL 356 357Input formats: 358 - File path: reads DIDs from file 359 - "-" or --stdin: reads DIDs from stdin 360 - Omit file + use --stdin: reads from stdin`, 361 362 Example: ` # Batch lookup from file 363 plcbundle did batch dids.txt --action lookup 364 365 # Read from stdin 366 cat dids.txt | plcbundle did batch --stdin --action lookup 367 cat dids.txt | plcbundle did batch - --action resolve 368 369 # Export operations for DIDs from stdin 370 echo "did:plc:524tuhdhh3m7li5gycdn6boe" | plcbundle did batch - --action export 371 372 # Pipe results 373 plcbundle did batch dids.txt --action resolve -o resolved.jsonl 374 375 # Parallel processing 376 cat dids.txt | plcbundle did batch --stdin --action lookup --workers 8 377 378 # Chain commands 379 grep "did:plc:" some_file.txt | plcbundle did batch - --action export > ops.jsonl`, 380 381 Args: cobra.MaximumNArgs(1), 382 383 RunE: func(cmd *cobra.Command, args []string) error { 384 var filename string 385 386 // Determine input source 387 if len(args) > 0 { 388 filename = args[0] 389 if filename == "-" { 390 fromStdin = true 391 } 392 } else if !fromStdin { 393 return fmt.Errorf("either provide filename or use --stdin flag\n" + 394 "Examples:\n" + 395 " plcbundle did batch dids.txt\n" + 396 " plcbundle did batch --stdin\n" + 397 " cat dids.txt | plcbundle did batch -") 398 } 399 400 mgr, _, err := getManager(&ManagerOptions{Cmd: cmd}) 401 if err != nil { 402 return err 403 } 404 defer mgr.Close() 405 406 return processBatchDIDs(mgr, filename, batchOptions{ 407 action: action, 408 workers: workers, 409 outputFile: outputFile, 410 fromStdin: fromStdin, 411 }) 412 }, 413 } 414 415 cmd.Flags().StringVar(&action, "action", "lookup", "Action: lookup, resolve, export") 416 cmd.Flags().IntVar(&workers, "workers", 4, "Number of parallel workers") 417 cmd.Flags().StringVarP(&outputFile, "output", "o", "", "Output file (default: stdout)") 418 cmd.Flags().BoolVar(&fromStdin, "stdin", false, "Read DIDs from stdin") 419 420 return cmd 421} 422 423// ============================================================================ 424// DID STATS - Show DID activity statistics 425// ============================================================================ 426 427func newDIDStatsCommand() *cobra.Command { 428 var ( 429 showGlobal bool 430 showJSON bool 431 ) 432 433 cmd := &cobra.Command{ 434 Use: "stats [did]", 435 Short: "Show DID activity statistics", 436 Long: `Show DID activity statistics 437 438Display statistics for a specific DID or global DID index stats. 439 440With DID: shows operation count, first/last activity, bundle distribution 441Without DID: shows global index statistics`, 442 443 Example: ` # Stats for specific DID 444 plcbundle did stats did:plc:524tuhdhh3m7li5gycdn6boe 445 446 # Global index stats 447 plcbundle did stats --global 448 plcbundle did stats 449 450 # JSON output 451 plcbundle did stats did:plc:524tuhdhh3m7li5gycdn6boe --json`, 452 453 Args: cobra.MaximumNArgs(1), 454 455 RunE: func(cmd *cobra.Command, args []string) error { 456 mgr, dir, err := getManager(&ManagerOptions{Cmd: cmd}) 457 if err != nil { 458 return err 459 } 460 defer mgr.Close() 461 462 // Global stats 463 if len(args) == 0 || showGlobal { 464 return showGlobalDIDStats(mgr, dir, showJSON) 465 } 466 467 // Specific DID stats 468 did := args[0] 469 return showDIDStats(mgr, did, showJSON) 470 }, 471 } 472 473 cmd.Flags().BoolVar(&showGlobal, "global", false, "Show global index stats") 474 cmd.Flags().BoolVar(&showJSON, "json", false, "Output as JSON") 475 476 return cmd 477} 478 479// ============================================================================ 480// Helper Functions 481// ============================================================================ 482 483type batchOptions struct { 484 action string 485 workers int 486 outputFile string 487 fromStdin bool 488} 489 490func processBatchDIDs(mgr BundleManager, filename string, opts batchOptions) error { 491 // Determine input source 492 var input *os.File 493 var err error 494 495 if opts.fromStdin { 496 input = os.Stdin 497 fmt.Fprintf(os.Stderr, "Reading DIDs from stdin...\n") 498 } else { 499 input, err = os.Open(filename) 500 if err != nil { 501 return fmt.Errorf("failed to open file: %w", err) 502 } 503 defer input.Close() 504 fmt.Fprintf(os.Stderr, "Reading DIDs from: %s\n", filename) 505 } 506 507 // Read DIDs 508 var dids []string 509 scanner := bufio.NewScanner(input) 510 511 // Increase buffer size for large input 512 buf := make([]byte, 64*1024) 513 scanner.Buffer(buf, 1024*1024) 514 515 lineNum := 0 516 for scanner.Scan() { 517 lineNum++ 518 line := strings.TrimSpace(scanner.Text()) 519 520 // Skip empty lines and comments 521 if line == "" || strings.HasPrefix(line, "#") { 522 continue 523 } 524 525 // Basic validation 526 if !strings.HasPrefix(line, "did:plc:") { 527 fmt.Fprintf(os.Stderr, "⚠️ Line %d: skipping invalid DID: %s\n", lineNum, line) 528 continue 529 } 530 531 dids = append(dids, line) 532 } 533 534 if err := scanner.Err(); err != nil { 535 return fmt.Errorf("error reading input: %w", err) 536 } 537 538 if len(dids) == 0 { 539 return fmt.Errorf("no valid DIDs found in input") 540 } 541 542 fmt.Fprintf(os.Stderr, "Processing %d DIDs with action '%s' (%d workers)\n\n", 543 len(dids), opts.action, opts.workers) 544 545 // Setup output 546 var output *os.File 547 if opts.outputFile != "" { 548 output, err = os.Create(opts.outputFile) 549 if err != nil { 550 return fmt.Errorf("failed to create output file: %w", err) 551 } 552 defer output.Close() 553 fmt.Fprintf(os.Stderr, "Output: %s\n\n", opts.outputFile) 554 } else { 555 output = os.Stdout 556 } 557 558 // Process based on action 559 switch opts.action { 560 case "lookup": 561 return batchLookup(mgr, dids, output, opts.workers) 562 case "resolve": 563 return batchResolve(mgr, dids, output, opts.workers) 564 case "export": 565 return batchExport(mgr, dids, output, opts.workers) 566 default: 567 return fmt.Errorf("unknown action: %s (valid: lookup, resolve, export)", opts.action) 568 } 569} 570 571func showGlobalDIDStats(mgr BundleManager, dir string, showJSON bool) error { 572 stats := mgr.GetDIDIndexStats() 573 574 if !stats["exists"].(bool) { 575 fmt.Printf("DID index does not exist\n") 576 fmt.Printf("Run: plcbundle index build\n") 577 return nil 578 } 579 580 if showJSON { 581 data, _ := json.MarshalIndent(stats, "", " ") 582 fmt.Println(string(data)) 583 return nil 584 } 585 586 indexedDIDs := stats["indexed_dids"].(int64) 587 mempoolDIDs := stats["mempool_dids"].(int64) 588 totalDIDs := stats["total_dids"].(int64) 589 590 fmt.Printf("\nDID Index Statistics\n") 591 fmt.Printf("════════════════════\n\n") 592 fmt.Printf(" Location: %s/.plcbundle/\n", dir) 593 594 if mempoolDIDs > 0 { 595 fmt.Printf(" Indexed DIDs: %s (in bundles)\n", formatNumber(int(indexedDIDs))) 596 fmt.Printf(" Mempool DIDs: %s (not yet bundled)\n", formatNumber(int(mempoolDIDs))) 597 fmt.Printf(" Total DIDs: %s\n", formatNumber(int(totalDIDs))) 598 } else { 599 fmt.Printf(" Total DIDs: %s\n", formatNumber(int(totalDIDs))) 600 } 601 602 fmt.Printf(" Shard count: %d\n", stats["shard_count"]) 603 fmt.Printf(" Last bundle: %06d\n", stats["last_bundle"]) 604 fmt.Printf(" Updated: %s\n\n", stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05")) 605 606 fmt.Printf(" Cached shards: %d / %d\n", stats["cached_shards"], stats["cache_limit"]) 607 608 if cachedList, ok := stats["cache_order"].([]int); ok && len(cachedList) > 0 { 609 fmt.Printf(" Hot shards: ") 610 for i, shard := range cachedList { 611 if i > 0 { 612 fmt.Printf(", ") 613 } 614 if i >= 10 { 615 fmt.Printf("... (+%d more)", len(cachedList)-10) 616 break 617 } 618 fmt.Printf("%02x", shard) 619 } 620 fmt.Printf("\n") 621 } 622 623 fmt.Printf("\n") 624 return nil 625} 626 627func showDIDStats(mgr BundleManager, did string, showJSON bool) error { 628 ctx := context.Background() 629 630 // Get operations 631 _, opsWithLoc, err := mgr.GetDIDOperations(ctx, did, false) 632 if err != nil { 633 return err 634 } 635 636 mempoolOps, err := mgr.GetDIDOperationsFromMempool(did) 637 if err != nil { 638 return err 639 } 640 641 if len(opsWithLoc) == 0 && len(mempoolOps) == 0 { 642 fmt.Fprintf(os.Stderr, "DID not found: %s\n", did) 643 return nil 644 } 645 646 // Calculate stats 647 totalOps := len(opsWithLoc) + len(mempoolOps) 648 nullifiedCount := 0 649 for _, owl := range opsWithLoc { 650 if owl.Operation.IsNullified() { 651 nullifiedCount++ 652 } 653 } 654 655 bundleSpan := 0 656 if len(opsWithLoc) > 0 { 657 bundles := make(map[int]bool) 658 for _, owl := range opsWithLoc { 659 bundles[owl.Bundle] = true 660 } 661 bundleSpan = len(bundles) 662 } 663 664 if showJSON { 665 output := map[string]interface{}{ 666 "did": did, 667 "total_operations": totalOps, 668 "bundled": len(opsWithLoc), 669 "mempool": len(mempoolOps), 670 "nullified": nullifiedCount, 671 "active": totalOps - nullifiedCount, 672 "bundle_span": bundleSpan, 673 } 674 data, _ := json.MarshalIndent(output, "", " ") 675 fmt.Println(string(data)) 676 return nil 677 } 678 679 fmt.Printf("\nDID Statistics\n") 680 fmt.Printf("══════════════\n\n") 681 fmt.Printf(" DID: %s\n\n", did) 682 fmt.Printf(" Total operations: %d\n", totalOps) 683 fmt.Printf(" Active: %d\n", totalOps-nullifiedCount) 684 if nullifiedCount > 0 { 685 fmt.Printf(" Nullified: %d\n", nullifiedCount) 686 } 687 if len(opsWithLoc) > 0 { 688 fmt.Printf(" Bundled: %d\n", len(opsWithLoc)) 689 fmt.Printf(" Bundle span: %d bundles\n", bundleSpan) 690 } 691 if len(mempoolOps) > 0 { 692 fmt.Printf(" Mempool: %d\n", len(mempoolOps)) 693 } 694 fmt.Printf("\n") 695 696 return nil 697} 698 699func displayHistory(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation, compact bool, includeNullified bool) error { 700 if compact { 701 return displayHistoryCompact(did, opsWithLoc, mempoolOps, includeNullified) 702 } 703 return displayHistoryDetailed(did, opsWithLoc, mempoolOps, includeNullified) 704} 705 706func displayHistoryCompact(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation, includeNullified bool) error { 707 fmt.Printf("DID History: %s\n\n", did) 708 709 for _, owl := range opsWithLoc { 710 if !includeNullified && owl.Operation.IsNullified() { 711 continue 712 } 713 714 status := "✓" 715 if owl.Operation.IsNullified() { 716 status = "✗" 717 } 718 719 fmt.Printf("%s [%06d:%04d] %s %s\n", 720 status, 721 owl.Bundle, 722 owl.Position, 723 owl.Operation.CreatedAt.Format("2006-01-02 15:04:05"), 724 owl.Operation.CID) 725 } 726 727 for _, op := range mempoolOps { 728 fmt.Printf("✓ [mempool ] %s %s\n", 729 op.CreatedAt.Format("2006-01-02 15:04:05"), 730 op.CID) 731 } 732 733 return nil 734} 735 736func displayHistoryDetailed(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation, includeNullified bool) error { 737 fmt.Printf("═══════════════════════════════════════════════════════════════\n") 738 fmt.Printf(" DID Audit Log\n") 739 fmt.Printf("═══════════════════════════════════════════════════════════════\n\n") 740 fmt.Printf("DID: %s\n\n", did) 741 742 for i, owl := range opsWithLoc { 743 if !includeNullified && owl.Operation.IsNullified() { 744 continue 745 } 746 747 op := owl.Operation 748 status := "✓ Active" 749 if op.IsNullified() { 750 status = "✗ Nullified" 751 } 752 753 fmt.Printf("Operation %d [Bundle %06d, Position %04d]\n", i+1, owl.Bundle, owl.Position) 754 fmt.Printf(" CID: %s\n", op.CID) 755 fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05.000 MST")) 756 fmt.Printf(" Status: %s\n", status) 757 758 if opData, err := op.GetOperationData(); err == nil && opData != nil { 759 showOperationDetails(&op) 760 } 761 762 fmt.Printf("\n") 763 } 764 765 if len(mempoolOps) > 0 { 766 fmt.Printf("Mempool Operations (%d)\n", len(mempoolOps)) 767 fmt.Printf("══════════════════════════════════════════════════════════════\n\n") 768 769 for i, op := range mempoolOps { 770 fmt.Printf("Operation %d [Mempool]\n", i+1) 771 fmt.Printf(" CID: %s\n", op.CID) 772 fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05.000 MST")) 773 fmt.Printf(" Status: ✓ Active\n") 774 fmt.Printf("\n") 775 } 776 } 777 778 return nil 779} 780 781func outputHistoryJSON(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation) error { 782 output := map[string]interface{}{ 783 "did": did, 784 "bundled": make([]map[string]interface{}, 0), 785 "mempool": make([]map[string]interface{}, 0), 786 } 787 788 for _, owl := range opsWithLoc { 789 output["bundled"] = append(output["bundled"].([]map[string]interface{}), map[string]interface{}{ 790 "bundle": owl.Bundle, 791 "position": owl.Position, 792 "cid": owl.Operation.CID, 793 "nullified": owl.Operation.IsNullified(), 794 "created_at": owl.Operation.CreatedAt.Format(time.RFC3339Nano), 795 }) 796 } 797 798 for _, op := range mempoolOps { 799 output["mempool"] = append(output["mempool"].([]map[string]interface{}), map[string]interface{}{ 800 "cid": op.CID, 801 "nullified": op.IsNullified(), 802 "created_at": op.CreatedAt.Format(time.RFC3339Nano), 803 }) 804 } 805 806 data, _ := json.MarshalIndent(output, "", " ") 807 fmt.Println(string(data)) 808 809 return nil 810} 811 812func batchLookup(mgr BundleManager, dids []string, output *os.File, _ int) error { 813 progress := ui.NewProgressBar(len(dids)) 814 ctx := context.Background() 815 816 // CSV header 817 fmt.Fprintf(output, "did,status,operation_count,bundled,mempool,nullified\n") 818 819 found := 0 820 notFound := 0 821 errorCount := 0 822 823 for i, did := range dids { 824 _, opsWithLoc, err := mgr.GetDIDOperations(ctx, did, false) 825 if err != nil { 826 errorCount++ 827 fmt.Fprintf(output, "%s,error,0,0,0,0\n", did) 828 progress.Set(i + 1) 829 continue 830 } 831 832 mempoolOps, _ := mgr.GetDIDOperationsFromMempool(did) 833 834 if len(opsWithLoc) == 0 && len(mempoolOps) == 0 { 835 notFound++ 836 fmt.Fprintf(output, "%s,not_found,0,0,0,0\n", did) 837 } else { 838 found++ 839 840 // Count nullified 841 nullified := 0 842 for _, owl := range opsWithLoc { 843 if owl.Operation.IsNullified() { 844 nullified++ 845 } 846 } 847 848 fmt.Fprintf(output, "%s,found,%d,%d,%d,%d\n", 849 did, 850 len(opsWithLoc)+len(mempoolOps), 851 len(opsWithLoc), 852 len(mempoolOps), 853 nullified) 854 } 855 856 progress.Set(i + 1) 857 } 858 859 progress.Finish() 860 861 fmt.Fprintf(os.Stderr, "\n✓ Batch lookup complete\n") 862 fmt.Fprintf(os.Stderr, " DIDs input: %d\n", len(dids)) 863 fmt.Fprintf(os.Stderr, " Found: %d\n", found) 864 fmt.Fprintf(os.Stderr, " Not found: %d\n", notFound) 865 if errorCount > 0 { 866 fmt.Fprintf(os.Stderr, " Errors: %d\n", errorCount) 867 } 868 869 return nil 870} 871 872func batchResolve(mgr BundleManager, dids []string, output *os.File, workers int) error { 873 ctx := context.Background() 874 overallStart := time.Now() 875 876 // ================================================================= 877 // PHASE 1: Batch lookup locations 878 // ================================================================= 879 fmt.Fprintf(os.Stderr, "Phase 1/2: Looking up %d DID locations...\n", len(dids)) 880 phase1Start := time.Now() 881 882 type didLocation struct { 883 did string 884 bundle int 885 position int 886 fromMempool bool 887 mempoolOp *plcclient.PLCOperation 888 err error 889 } 890 891 locations := make([]didLocation, len(dids)) 892 893 // Mempool check 894 for i, did := range dids { 895 if mempoolOp := findLatestInMempool(mgr, did); mempoolOp != nil { 896 locations[i] = didLocation{did: did, fromMempool: true, mempoolOp: mempoolOp} 897 } 898 } 899 900 // Batch index lookup 901 needsLookup := make([]string, 0) 902 lookupMap := make(map[string]int) 903 for i, did := range dids { 904 if !locations[i].fromMempool { 905 needsLookup = append(needsLookup, did) 906 lookupMap[did] = i 907 } 908 } 909 910 if len(needsLookup) > 0 { 911 batchResults, _ := mgr.GetDIDIndex().BatchGetDIDLocations(needsLookup) 912 for did, locs := range batchResults { 913 var latest *didindex.OpLocation 914 for i := range locs { 915 if !locs[i].Nullified() && (latest == nil || locs[i].IsAfter(*latest)) { 916 latest = &locs[i] 917 } 918 } 919 920 idx := lookupMap[did] 921 if latest != nil { 922 locations[idx] = didLocation{did: did, bundle: latest.BundleInt(), position: latest.PositionInt()} 923 } else { 924 locations[idx] = didLocation{did: did, err: fmt.Errorf("not found")} 925 } 926 } 927 928 for _, did := range needsLookup { 929 if idx := lookupMap[did]; locations[idx].bundle == 0 && locations[idx].err == nil { 930 locations[idx] = didLocation{did: did, err: fmt.Errorf("not found")} 931 } 932 } 933 } 934 935 phase1Duration := time.Since(phase1Start) 936 fmt.Fprintf(os.Stderr, " ✓ %s\n\n", phase1Duration.Round(time.Millisecond)) 937 938 // ================================================================= 939 // PHASE 2: Group by bundle, load ops, resolve (MERGED, parallel) 940 // ================================================================= 941 fmt.Fprintf(os.Stderr, "Phase 2/2: Loading and resolving (%d workers)...\n", workers) 942 943 // Group DIDs by bundle 944 type bundleGroup struct { 945 bundleNum int 946 dids []int // indices into locations array 947 } 948 949 bundleMap := make(map[int][]int) 950 mempoolDIDs := make([]int, 0) 951 errorDIDs := make([]int, 0) 952 953 for i, loc := range locations { 954 if loc.err != nil { 955 errorDIDs = append(errorDIDs, i) 956 } else if loc.fromMempool { 957 mempoolDIDs = append(mempoolDIDs, i) 958 } else { 959 bundleMap[loc.bundle] = append(bundleMap[loc.bundle], i) 960 } 961 } 962 963 bundles := make([]bundleGroup, 0, len(bundleMap)) 964 for bn, didIndices := range bundleMap { 965 bundles = append(bundles, bundleGroup{bundleNum: bn, dids: didIndices}) 966 } 967 968 fmt.Fprintf(os.Stderr, " %d bundles, %d mempool, %d errors\n", 969 len(bundles), len(mempoolDIDs), len(errorDIDs)) 970 971 // Setup output 972 writer := bufio.NewWriterSize(output, 512*1024) 973 defer writer.Flush() 974 975 var ( 976 resolved int 977 failed int 978 processed int 979 mu sync.Mutex // Single lock for all counters 980 ) 981 982 progress := ui.NewProgressBar(len(dids)) 983 984 writeResult := func(doc *plcclient.DIDDocument, err error) { 985 mu.Lock() 986 defer mu.Unlock() 987 988 processed++ 989 progress.Set(processed) 990 991 if err != nil { 992 failed++ 993 } else { 994 resolved++ 995 data, _ := json.Marshal(doc) 996 writer.Write(data) 997 writer.WriteByte('\n') 998 if resolved%100 == 0 { 999 writer.Flush() 1000 } 1001 } 1002 } 1003 1004 // Process mempool DIDs (already have ops) 1005 for _, idx := range mempoolDIDs { 1006 loc := locations[idx] 1007 doc, err := plcclient.ResolveDIDDocument(loc.did, []plcclient.PLCOperation{*loc.mempoolOp}) 1008 writeResult(doc, err) 1009 } 1010 1011 // Process errors 1012 for range errorDIDs { 1013 writeResult(nil, fmt.Errorf("not found")) 1014 } 1015 1016 // Process bundles in parallel - LoadOperations once per bundle 1017 bundleJobs := make(chan bundleGroup, len(bundles)) 1018 var wg sync.WaitGroup 1019 1020 for w := 0; w < workers; w++ { 1021 wg.Add(1) 1022 go func() { 1023 defer wg.Done() 1024 1025 for job := range bundleJobs { 1026 // Collect all positions needed from this bundle 1027 positions := make([]int, len(job.dids)) 1028 for i, didIdx := range job.dids { 1029 positions[i] = locations[didIdx].position 1030 } 1031 1032 // Load operations once for this bundle 1033 ops, err := mgr.LoadOperations(ctx, job.bundleNum, positions) 1034 1035 if err != nil { 1036 // All DIDs from this bundle fail 1037 for range job.dids { 1038 writeResult(nil, err) 1039 } 1040 continue 1041 } 1042 1043 // Resolve each DID using loaded operations 1044 for i, didIdx := range job.dids { 1045 loc := locations[didIdx] 1046 1047 if op, ok := ops[positions[i]]; ok { 1048 doc, resolveErr := plcclient.ResolveDIDDocument(loc.did, []plcclient.PLCOperation{*op}) 1049 writeResult(doc, resolveErr) 1050 } else { 1051 writeResult(nil, fmt.Errorf("operation not loaded")) 1052 } 1053 } 1054 } 1055 }() 1056 } 1057 1058 // Send bundle jobs 1059 for _, bg := range bundles { 1060 bundleJobs <- bg 1061 } 1062 close(bundleJobs) 1063 1064 wg.Wait() 1065 writer.Flush() 1066 progress.Finish() 1067 1068 totalDuration := time.Since(overallStart) 1069 1070 fmt.Fprintf(os.Stderr, "\n✓ Batch resolve complete\n") 1071 fmt.Fprintf(os.Stderr, " Resolved: %d/%d\n", resolved, len(dids)) 1072 if failed > 0 { 1073 fmt.Fprintf(os.Stderr, " Failed: %d\n", failed) 1074 } 1075 fmt.Fprintf(os.Stderr, " Total: %s (%.1f DIDs/sec)\n", 1076 totalDuration.Round(time.Millisecond), 1077 float64(resolved)/totalDuration.Seconds()) 1078 1079 return nil 1080} 1081 1082// Helper function to find latest non-nullified op in mempool 1083func findLatestInMempool(mgr BundleManager, did string) *plcclient.PLCOperation { 1084 ops, err := mgr.GetDIDOperationsFromMempool(did) 1085 if err != nil || len(ops) == 0 { 1086 return nil 1087 } 1088 1089 // Search backwards from most recent 1090 for i := len(ops) - 1; i >= 0; i-- { 1091 if !ops[i].IsNullified() { 1092 return &ops[i] 1093 } 1094 } 1095 1096 return nil 1097} 1098 1099func batchExport(mgr BundleManager, dids []string, output *os.File, _ int) error { 1100 progress := ui.NewProgressBar(len(dids)) 1101 ctx := context.Background() 1102 1103 totalOps := 0 1104 processedDIDs := 0 1105 errorCount := 0 1106 1107 // Use buffered writer for better performance 1108 writer := bufio.NewWriterSize(output, 512*1024) 1109 defer writer.Flush() 1110 1111 for i, did := range dids { 1112 _, opsWithLoc, err := mgr.GetDIDOperations(ctx, did, false) 1113 if err != nil { 1114 errorCount++ 1115 if i < 10 { // Only log first few errors 1116 fmt.Fprintf(os.Stderr, "Error processing %s: %v\n", did, err) 1117 } 1118 progress.Set(i + 1) 1119 continue 1120 } 1121 1122 // Get mempool operations too 1123 mempoolOps, _ := mgr.GetDIDOperationsFromMempool(did) 1124 1125 if len(opsWithLoc) == 0 && len(mempoolOps) == 0 { 1126 progress.Set(i + 1) 1127 continue 1128 } 1129 1130 processedDIDs++ 1131 1132 // Export bundled operations 1133 for _, owl := range opsWithLoc { 1134 if len(owl.Operation.RawJSON) > 0 { 1135 writer.Write(owl.Operation.RawJSON) 1136 } else { 1137 data, _ := json.Marshal(owl.Operation) 1138 writer.Write(data) 1139 } 1140 writer.WriteByte('\n') 1141 totalOps++ 1142 } 1143 1144 // Export mempool operations 1145 for _, op := range mempoolOps { 1146 if len(op.RawJSON) > 0 { 1147 writer.Write(op.RawJSON) 1148 } else { 1149 data, _ := json.Marshal(op) 1150 writer.Write(data) 1151 } 1152 writer.WriteByte('\n') 1153 totalOps++ 1154 } 1155 1156 // Flush periodically 1157 if i%100 == 0 { 1158 writer.Flush() 1159 } 1160 1161 progress.Set(i + 1) 1162 } 1163 1164 writer.Flush() 1165 progress.Finish() 1166 1167 fmt.Fprintf(os.Stderr, "\n✓ Batch export complete\n") 1168 fmt.Fprintf(os.Stderr, " DIDs input: %d\n", len(dids)) 1169 fmt.Fprintf(os.Stderr, " DIDs processed: %d\n", processedDIDs) 1170 fmt.Fprintf(os.Stderr, " Operations: %s\n", formatNumber(totalOps)) 1171 if errorCount > 0 { 1172 fmt.Fprintf(os.Stderr, " Errors: %d\n", errorCount) 1173 } 1174 1175 return nil 1176} 1177 1178// ============================================================================ 1179// Shared Helper Functions (used by both DID and legacy index commands) 1180// ============================================================================ 1181 1182func outputLookupJSON(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation, totalElapsed, lookupElapsed, mempoolElapsed time.Duration) error { 1183 output := map[string]interface{}{ 1184 "found": true, 1185 "did": did, 1186 "timing": map[string]interface{}{ 1187 "total_ms": totalElapsed.Milliseconds(), 1188 "lookup_ms": lookupElapsed.Milliseconds(), 1189 "mempool_ms": mempoolElapsed.Milliseconds(), 1190 }, 1191 "bundled": make([]map[string]interface{}, 0), 1192 "mempool": make([]map[string]interface{}, 0), 1193 } 1194 1195 for _, owl := range opsWithLoc { 1196 output["bundled"] = append(output["bundled"].([]map[string]interface{}), map[string]interface{}{ 1197 "bundle": owl.Bundle, 1198 "position": owl.Position, 1199 "cid": owl.Operation.CID, 1200 "nullified": owl.Operation.IsNullified(), 1201 "created_at": owl.Operation.CreatedAt.Format(time.RFC3339Nano), 1202 }) 1203 } 1204 1205 for _, op := range mempoolOps { 1206 output["mempool"] = append(output["mempool"].([]map[string]interface{}), map[string]interface{}{ 1207 "cid": op.CID, 1208 "nullified": op.IsNullified(), 1209 "created_at": op.CreatedAt.Format(time.RFC3339Nano), 1210 }) 1211 } 1212 1213 data, _ := json.MarshalIndent(output, "", " ") 1214 fmt.Println(string(data)) 1215 1216 return nil 1217} 1218 1219func displayLookupResults(did string, opsWithLoc []PLCOperationWithLocation, mempoolOps []plcclient.PLCOperation, totalElapsed, lookupElapsed, mempoolElapsed time.Duration, verbose bool, _ map[string]interface{}) error { 1220 nullifiedCount := 0 1221 for _, owl := range opsWithLoc { 1222 if owl.Operation.IsNullified() { 1223 nullifiedCount++ 1224 } 1225 } 1226 1227 totalOps := len(opsWithLoc) + len(mempoolOps) 1228 activeOps := len(opsWithLoc) - nullifiedCount + len(mempoolOps) 1229 1230 fmt.Printf("═══════════════════════════════════════════════════════════════\n") 1231 fmt.Printf(" DID Lookup Results\n") 1232 fmt.Printf("═══════════════════════════════════════════════════════════════\n\n") 1233 fmt.Printf("DID: %s\n\n", did) 1234 1235 fmt.Printf("Summary\n───────\n") 1236 fmt.Printf(" Total operations: %d\n", totalOps) 1237 fmt.Printf(" Active operations: %d\n", activeOps) 1238 if nullifiedCount > 0 { 1239 fmt.Printf(" Nullified: %d\n", nullifiedCount) 1240 } 1241 if len(opsWithLoc) > 0 { 1242 fmt.Printf(" Bundled: %d\n", len(opsWithLoc)) 1243 } 1244 if len(mempoolOps) > 0 { 1245 fmt.Printf(" Mempool: %d\n", len(mempoolOps)) 1246 } 1247 fmt.Printf("\n") 1248 1249 fmt.Printf("Performance\n───────────\n") 1250 fmt.Printf(" Index lookup: %s\n", lookupElapsed) 1251 fmt.Printf(" Mempool check: %s\n", mempoolElapsed) 1252 fmt.Printf(" Total time: %s\n\n", totalElapsed) 1253 1254 // Show operations 1255 if len(opsWithLoc) > 0 { 1256 fmt.Printf("Bundled Operations (%d total)\n", len(opsWithLoc)) 1257 fmt.Printf("══════════════════════════════════════════════════════════════\n\n") 1258 1259 for i, owl := range opsWithLoc { 1260 op := owl.Operation 1261 status := "✓ Active" 1262 if op.IsNullified() { 1263 status = "✗ Nullified" 1264 } 1265 1266 fmt.Printf("Operation %d [Bundle %06d, Position %04d]\n", i+1, owl.Bundle, owl.Position) 1267 fmt.Printf(" CID: %s\n", op.CID) 1268 fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05.000 MST")) 1269 fmt.Printf(" Status: %s\n", status) 1270 1271 if verbose && !op.IsNullified() { 1272 showOperationDetails(&op) 1273 } 1274 1275 fmt.Printf("\n") 1276 } 1277 } 1278 1279 fmt.Printf("✓ Lookup complete in %s\n", totalElapsed) 1280 return nil 1281} 1282 1283func showOperationDetails(op *plcclient.PLCOperation) { 1284 if opData, err := op.GetOperationData(); err == nil && opData != nil { 1285 if opType, ok := opData["type"].(string); ok { 1286 fmt.Printf(" Type: %s\n", opType) 1287 } 1288 1289 if handle, ok := opData["handle"].(string); ok { 1290 fmt.Printf(" Handle: %s\n", handle) 1291 } else if aka, ok := opData["alsoKnownAs"].([]interface{}); ok && len(aka) > 0 { 1292 if akaStr, ok := aka[0].(string); ok { 1293 handle := strings.TrimPrefix(akaStr, "at://") 1294 fmt.Printf(" Handle: %s\n", handle) 1295 } 1296 } 1297 } 1298}