A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

index inside of bundle

+1291 -117
+896
cmd/plcbundle/commands/inspect.go
··· 1 + package commands 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "os" 7 + "path/filepath" 8 + "sort" 9 + "strings" 10 + "time" 11 + 12 + "github.com/goccy/go-json" 13 + "github.com/spf13/cobra" 14 + "tangled.org/atscan.net/plcbundle/internal/storage" 15 + ) 16 + 17 + // ============================================================================ 18 + // TYPES (defined at package level to avoid conflicts) 19 + // ============================================================================ 20 + 21 + type DIDActivity struct { 22 + DID string 23 + Count int 24 + } 25 + 26 + type DomainCount struct { 27 + Domain string 28 + Count int 29 + } 30 + 31 + type EndpointCount struct { 32 + Endpoint string 33 + Count int 34 + } 35 + 36 + type TimeSlot struct { 37 + Time time.Time 38 + Count int 39 + } 40 + 41 + type inspectOptions struct { 42 + showJSON bool 43 + verify bool 44 + showSamples bool 45 + sampleCount int 46 + skipMetadata bool 47 + skipPatterns bool 48 + skipCrypto bool 49 + verbose bool 50 + } 51 + 52 + type inspectResult struct { 53 + // Metadata 54 + Metadata *storage.BundleMetadata 55 + 56 + // Basic stats 57 + FilePath string 58 + FileSize int64 59 + HasMetadataFrame bool 60 + HasFrameIndex bool 61 + 62 + // Operation analysis 63 + TotalOps int 64 + NullifiedOps int 65 + ActiveOps int 66 + UniqueDIDs int 67 + OperationTypes map[string]int 68 + 69 + // DID patterns 70 + TopDIDs []DIDActivity 71 + SingleOpDIDs int 72 + MultiOpDIDs int 73 + 74 + // Handle patterns 75 + TotalHandles int 76 + TopDomains []DomainCount 77 + InvalidHandles int 78 + 79 + // Service patterns 80 + TotalServices int 81 + UniqueEndpoints int 82 + TopPDSEndpoints []EndpointCount 83 + 84 + // Temporal 85 + TimeDistribution []TimeSlot 86 + AvgOpsPerMinute float64 87 + 88 + // Size analysis 89 + AvgOpSize int 90 + MinOpSize int 91 + MaxOpSize int 92 + TotalOpSize int64 93 + 94 + // Crypto verification 95 + ContentHashValid bool 96 + CompressedHashValid bool 97 + MetadataValid bool 98 + 99 
+ // Timing 100 + LoadTime time.Duration 101 + AnalyzeTime time.Duration 102 + VerifyTime time.Duration 103 + TotalTime time.Duration 104 + } 105 + 106 + type bundleAnalysis struct { 107 + TotalOps int 108 + NullifiedOps int 109 + ActiveOps int 110 + UniqueDIDs int 111 + OperationTypes map[string]int 112 + SingleOpDIDs int 113 + MultiOpDIDs int 114 + TotalHandles int 115 + InvalidHandles int 116 + TotalServices int 117 + UniqueEndpoints int 118 + AvgOpsPerMinute float64 119 + AvgOpSize int 120 + MinOpSize int 121 + MaxOpSize int 122 + TotalOpSize int64 123 + 124 + // For top-N calculations 125 + didActivity map[string]int 126 + domainCounts map[string]int 127 + endpointCounts map[string]int 128 + timeSlots map[int64]int 129 + 130 + // Results 131 + TopDIDs []DIDActivity 132 + TopDomains []DomainCount 133 + TopPDSEndpoints []EndpointCount 134 + TimeDistribution []TimeSlot 135 + } 136 + 137 + // ============================================================================ 138 + // COMMAND DEFINITION 139 + // ============================================================================ 140 + 141 + func NewInspectCommand() *cobra.Command { 142 + var ( 143 + showJSON bool 144 + verify bool 145 + showSamples bool 146 + sampleCount int 147 + skipMetadata bool 148 + skipPatterns bool 149 + skipCrypto bool 150 + ) 151 + 152 + cmd := &cobra.Command{ 153 + Use: "inspect <bundle-number|bundle-file>", 154 + Short: "Deep analysis of bundle contents", 155 + Long: `Deep analysis of bundle contents 156 + 157 + Performs comprehensive analysis of a bundle including: 158 + • Embedded metadata (from skippable frame) 159 + • Operation type breakdown 160 + • DID activity patterns 161 + • Handle and domain statistics 162 + • Service endpoint analysis 163 + • Temporal distribution 164 + • Cryptographic verification 165 + • Size analysis 166 + 167 + Can inspect either by bundle number (from repository) or direct file path.`, 168 + 169 + Example: ` # Inspect from repository 170 + plcbundle 
inspect 42 171 + 172 + # Inspect specific file 173 + plcbundle inspect /path/to/000042.jsonl.zst 174 + plcbundle inspect 000042.jsonl.zst 175 + 176 + # Skip certain analysis sections 177 + plcbundle inspect 42 --skip-patterns --skip-crypto 178 + 179 + # Show sample operations 180 + plcbundle inspect 42 --samples --sample-count 20 181 + 182 + # Verify all hashes 183 + plcbundle inspect 42 --verify 184 + 185 + # JSON output (for scripting) 186 + plcbundle inspect 42 --json`, 187 + 188 + Args: cobra.ExactArgs(1), 189 + 190 + RunE: func(cmd *cobra.Command, args []string) error { 191 + input := args[0] 192 + verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose") 193 + 194 + return runInspect(cmd, input, inspectOptions{ 195 + showJSON: showJSON, 196 + verify: verify, 197 + showSamples: showSamples, 198 + sampleCount: sampleCount, 199 + skipMetadata: skipMetadata, 200 + skipPatterns: skipPatterns, 201 + skipCrypto: skipCrypto, 202 + verbose: verbose, 203 + }) 204 + }, 205 + } 206 + 207 + cmd.Flags().BoolVar(&showJSON, "json", false, "Output as JSON") 208 + cmd.Flags().BoolVar(&verify, "verify", false, "Verify cryptographic hashes") 209 + cmd.Flags().BoolVar(&showSamples, "samples", false, "Show sample operations") 210 + cmd.Flags().IntVar(&sampleCount, "sample-count", 10, "Number of samples to show") 211 + cmd.Flags().BoolVar(&skipMetadata, "skip-metadata", false, "Skip embedded metadata section") 212 + cmd.Flags().BoolVar(&skipPatterns, "skip-patterns", false, "Skip pattern analysis") 213 + cmd.Flags().BoolVar(&skipCrypto, "skip-crypto", false, "Skip cryptographic verification") 214 + 215 + return cmd 216 + } 217 + 218 + // ============================================================================ 219 + // MAIN LOGIC 220 + // ============================================================================ 221 + 222 + func runInspect(cmd *cobra.Command, input string, opts inspectOptions) error { 223 + totalStart := time.Now() 224 + 225 + // Determine if input is 
bundle number or file path 226 + bundlePath, bundleNum, err := resolveBundlePath(cmd, input) 227 + if err != nil { 228 + return err 229 + } 230 + 231 + result := &inspectResult{ 232 + FilePath: bundlePath, 233 + OperationTypes: make(map[string]int), 234 + TopDIDs: make([]DIDActivity, 0), 235 + TopDomains: make([]DomainCount, 0), 236 + TopPDSEndpoints: make([]EndpointCount, 0), 237 + } 238 + 239 + // Check file exists 240 + info, err := os.Stat(bundlePath) 241 + if err != nil { 242 + return fmt.Errorf("bundle file not found: %w", err) 243 + } 244 + result.FileSize = info.Size() 245 + 246 + // Check for frame index 247 + indexPath := bundlePath + ".idx" 248 + if _, err := os.Stat(indexPath); err == nil { 249 + result.HasFrameIndex = true 250 + } 251 + 252 + fmt.Fprintf(os.Stderr, "Inspecting: %s\n", filepath.Base(bundlePath)) 253 + fmt.Fprintf(os.Stderr, "File size: %s\n\n", formatBytes(result.FileSize)) 254 + 255 + // SECTION 1: Extract embedded metadata (fast!) 256 + if !opts.skipMetadata { 257 + fmt.Fprintf(os.Stderr, "Reading embedded metadata...\n") 258 + metaStart := time.Now() 259 + 260 + ops := &storage.Operations{} 261 + meta, err := ops.ExtractBundleMetadata(bundlePath) 262 + if err != nil { 263 + if opts.verbose { 264 + fmt.Fprintf(os.Stderr, " No embedded metadata: %v\n", err) 265 + } 266 + result.HasMetadataFrame = false 267 + } else { 268 + result.HasMetadataFrame = true 269 + result.Metadata = meta 270 + if opts.verbose { 271 + fmt.Fprintf(os.Stderr, " ✓ Extracted in %s\n", time.Since(metaStart)) 272 + } 273 + } 274 + fmt.Fprintf(os.Stderr, "\n") 275 + } 276 + 277 + // SECTION 2: Load and analyze operations 278 + fmt.Fprintf(os.Stderr, "Loading and analyzing operations...\n") 279 + loadStart := time.Now() 280 + 281 + analysis, err := analyzeBundle(bundlePath, opts) 282 + if err != nil { 283 + return fmt.Errorf("analysis failed: %w", err) 284 + } 285 + 286 + result.LoadTime = time.Since(loadStart) 287 + result.TotalOps = analysis.TotalOps 288 + 
result.NullifiedOps = analysis.NullifiedOps 289 + result.ActiveOps = analysis.ActiveOps 290 + result.UniqueDIDs = analysis.UniqueDIDs 291 + result.OperationTypes = analysis.OperationTypes 292 + result.TopDIDs = analysis.TopDIDs 293 + result.SingleOpDIDs = analysis.SingleOpDIDs 294 + result.MultiOpDIDs = analysis.MultiOpDIDs 295 + result.TotalHandles = analysis.TotalHandles 296 + result.TopDomains = analysis.TopDomains 297 + result.InvalidHandles = analysis.InvalidHandles 298 + result.TotalServices = analysis.TotalServices 299 + result.UniqueEndpoints = analysis.UniqueEndpoints 300 + result.TopPDSEndpoints = analysis.TopPDSEndpoints 301 + result.TimeDistribution = analysis.TimeDistribution 302 + result.AvgOpsPerMinute = analysis.AvgOpsPerMinute 303 + result.AvgOpSize = analysis.AvgOpSize 304 + result.MinOpSize = analysis.MinOpSize 305 + result.MaxOpSize = analysis.MaxOpSize 306 + result.TotalOpSize = analysis.TotalOpSize 307 + 308 + fmt.Fprintf(os.Stderr, " ✓ Analyzed in %s\n\n", result.LoadTime) 309 + 310 + // SECTION 3: Cryptographic verification 311 + if opts.verify && !opts.skipCrypto { 312 + fmt.Fprintf(os.Stderr, "Verifying cryptographic hashes...\n") 313 + verifyStart := time.Now() 314 + 315 + // ✅ Pass cmd parameter 316 + result.ContentHashValid, result.CompressedHashValid, result.MetadataValid = 317 + verifyCrypto(cmd, bundlePath, result.Metadata, bundleNum, opts.verbose) 318 + 319 + result.VerifyTime = time.Since(verifyStart) 320 + fmt.Fprintf(os.Stderr, " ✓ Verified in %s\n\n", result.VerifyTime) 321 + } 322 + 323 + result.TotalTime = time.Since(totalStart) 324 + 325 + // Display results 326 + if opts.showJSON { 327 + return displayInspectJSON(result) 328 + } 329 + 330 + return displayInspectHuman(result, analysis, opts) 331 + } 332 + 333 + // ============================================================================ 334 + // ANALYSIS FUNCTIONS 335 + // ============================================================================ 336 + 337 + func 
analyzeBundle(path string, opts inspectOptions) (*bundleAnalysis, error) { 338 + ops := &storage.Operations{} 339 + operations, err := ops.LoadBundle(path) 340 + if err != nil { 341 + return nil, err 342 + } 343 + 344 + analysis := &bundleAnalysis{ 345 + TotalOps: len(operations), 346 + OperationTypes: make(map[string]int), 347 + didActivity: make(map[string]int), 348 + domainCounts: make(map[string]int), 349 + endpointCounts: make(map[string]int), 350 + timeSlots: make(map[int64]int), 351 + } 352 + 353 + // Analyze each operation 354 + for _, op := range operations { 355 + // Nullification 356 + if op.IsNullified() { 357 + analysis.NullifiedOps++ 358 + } else { 359 + analysis.ActiveOps++ 360 + } 361 + 362 + // DID activity 363 + analysis.didActivity[op.DID]++ 364 + 365 + // Size stats 366 + opSize := len(op.RawJSON) 367 + if opSize == 0 { 368 + data, _ := json.Marshal(op) 369 + opSize = len(data) 370 + } 371 + 372 + analysis.TotalOpSize += int64(opSize) 373 + if analysis.MinOpSize == 0 || opSize < analysis.MinOpSize { 374 + analysis.MinOpSize = opSize 375 + } 376 + if opSize > analysis.MaxOpSize { 377 + analysis.MaxOpSize = opSize 378 + } 379 + 380 + // Parse operation for detailed analysis 381 + opData, err := op.GetOperationData() 382 + if err != nil || opData == nil { 383 + continue 384 + } 385 + 386 + // Operation type 387 + if opType, ok := opData["type"].(string); ok { 388 + analysis.OperationTypes[opType]++ 389 + } 390 + 391 + // Handle analysis 392 + if !opts.skipPatterns { 393 + analyzeHandles(opData, analysis) 394 + analyzeServices(opData, analysis) 395 + } 396 + 397 + // Time distribution (group by minute) 398 + timeSlot := op.CreatedAt.Unix() / 60 399 + analysis.timeSlots[timeSlot]++ 400 + } 401 + 402 + // Calculate derived stats 403 + analysis.UniqueDIDs = len(analysis.didActivity) 404 + if analysis.TotalOps > 0 { 405 + analysis.AvgOpSize = int(analysis.TotalOpSize / int64(analysis.TotalOps)) 406 + } 407 + 408 + // Count single vs multi-op DIDs 409 + 
for _, count := range analysis.didActivity { 410 + if count == 1 { 411 + analysis.SingleOpDIDs++ 412 + } else { 413 + analysis.MultiOpDIDs++ 414 + } 415 + } 416 + 417 + // Top DIDs 418 + analysis.TopDIDs = getTopDIDs(analysis.didActivity, 10) 419 + 420 + // Top domains 421 + analysis.TopDomains = getTopDomains(analysis.domainCounts, 10) 422 + 423 + // Top endpoints 424 + analysis.TopPDSEndpoints = getTopEndpoints(analysis.endpointCounts, 10) 425 + 426 + // Unique endpoints 427 + analysis.UniqueEndpoints = len(analysis.endpointCounts) 428 + 429 + // Time distribution 430 + analysis.TimeDistribution = getTimeDistribution(analysis.timeSlots) 431 + 432 + // Calculate ops per minute 433 + if len(operations) > 1 { 434 + duration := operations[len(operations)-1].CreatedAt.Sub(operations[0].CreatedAt) 435 + if duration.Minutes() > 0 { 436 + analysis.AvgOpsPerMinute = float64(len(operations)) / duration.Minutes() 437 + } 438 + } 439 + 440 + return analysis, nil 441 + } 442 + 443 + func analyzeHandles(opData map[string]interface{}, analysis *bundleAnalysis) { 444 + if aka, ok := opData["alsoKnownAs"].([]interface{}); ok { 445 + for _, a := range aka { 446 + if akaStr, ok := a.(string); ok { 447 + if strings.HasPrefix(akaStr, "at://") { 448 + analysis.TotalHandles++ 449 + 450 + // Extract domain 451 + handle := strings.TrimPrefix(akaStr, "at://") 452 + if idx := strings.Index(handle, "/"); idx > 0 { 453 + handle = handle[:idx] 454 + } 455 + 456 + // Count domain (TLD) 457 + parts := strings.Split(handle, ".") 458 + if len(parts) >= 2 { 459 + domain := parts[len(parts)-1] 460 + if len(parts) >= 2 { 461 + domain = parts[len(parts)-2] + "." 
+ domain 462 + } 463 + analysis.domainCounts[domain]++ 464 + } 465 + 466 + // Check for invalid patterns 467 + if strings.Contains(handle, "_") { 468 + analysis.InvalidHandles++ 469 + } 470 + } 471 + } 472 + } 473 + } 474 + } 475 + 476 + func analyzeServices(opData map[string]interface{}, analysis *bundleAnalysis) { 477 + if services, ok := opData["services"].(map[string]interface{}); ok { 478 + analysis.TotalServices += len(services) 479 + 480 + // Extract PDS endpoints 481 + if pds, ok := services["atproto_pds"].(map[string]interface{}); ok { 482 + if endpoint, ok := pds["endpoint"].(string); ok { 483 + // Normalize endpoint 484 + endpoint = strings.TrimPrefix(endpoint, "https://") 485 + endpoint = strings.TrimPrefix(endpoint, "http://") 486 + if idx := strings.Index(endpoint, "/"); idx > 0 { 487 + endpoint = endpoint[:idx] 488 + } 489 + analysis.endpointCounts[endpoint]++ 490 + } 491 + } 492 + } 493 + } 494 + 495 + func getTopDIDs(didActivity map[string]int, limit int) []DIDActivity { 496 + var results []DIDActivity 497 + for did, count := range didActivity { 498 + results = append(results, DIDActivity{DID: did, Count: count}) 499 + } 500 + 501 + sort.Slice(results, func(i, j int) bool { 502 + return results[i].Count > results[j].Count 503 + }) 504 + 505 + if len(results) > limit { 506 + results = results[:limit] 507 + } 508 + 509 + return results 510 + } 511 + 512 + func getTopDomains(domainCounts map[string]int, limit int) []DomainCount { 513 + var results []DomainCount 514 + for domain, count := range domainCounts { 515 + results = append(results, DomainCount{Domain: domain, Count: count}) 516 + } 517 + 518 + sort.Slice(results, func(i, j int) bool { 519 + return results[i].Count > results[j].Count 520 + }) 521 + 522 + if len(results) > limit { 523 + results = results[:limit] 524 + } 525 + 526 + return results 527 + } 528 + 529 + func getTopEndpoints(endpointCounts map[string]int, limit int) []EndpointCount { 530 + var results []EndpointCount 531 + for 
endpoint, count := range endpointCounts { 532 + results = append(results, EndpointCount{Endpoint: endpoint, Count: count}) 533 + } 534 + 535 + sort.Slice(results, func(i, j int) bool { 536 + return results[i].Count > results[j].Count 537 + }) 538 + 539 + if len(results) > limit { 540 + results = results[:limit] 541 + } 542 + 543 + return results 544 + } 545 + 546 + func getTimeDistribution(timeSlots map[int64]int) []TimeSlot { 547 + var results []TimeSlot 548 + for slot, count := range timeSlots { 549 + results = append(results, TimeSlot{ 550 + Time: time.Unix(slot*60, 0), 551 + Count: count, 552 + }) 553 + } 554 + 555 + sort.Slice(results, func(i, j int) bool { 556 + return results[i].Time.Before(results[j].Time) 557 + }) 558 + 559 + return results 560 + } 561 + 562 + // ============================================================================ 563 + // DISPLAY FUNCTIONS 564 + // ============================================================================ 565 + 566 + func displayInspectHuman(result *inspectResult, analysis *bundleAnalysis, opts inspectOptions) error { 567 + fmt.Printf("\n") 568 + fmt.Printf("═══════════════════════════════════════════════════════════════\n") 569 + fmt.Printf(" Bundle Deep Inspection\n") 570 + fmt.Printf("═══════════════════════════════════════════════════════════════\n\n") 571 + 572 + // File info 573 + fmt.Printf("📁 File Information\n") 574 + fmt.Printf("───────────────────\n") 575 + fmt.Printf(" Path: %s\n", filepath.Base(result.FilePath)) 576 + fmt.Printf(" Size: %s\n", formatBytes(result.FileSize)) 577 + fmt.Printf(" Has metadata frame: %v\n", result.HasMetadataFrame) 578 + fmt.Printf(" Has frame index: %v\n\n", result.HasFrameIndex) 579 + 580 + // Embedded metadata 581 + if result.HasMetadataFrame && result.Metadata != nil && !opts.skipMetadata { 582 + meta := result.Metadata 583 + fmt.Printf("📋 Embedded Metadata (Skippable Frame)\n") 584 + fmt.Printf("──────────────────────────────────────\n") 585 + fmt.Printf(" Bundle 
Number: %06d\n", meta.BundleNumber) 586 + if meta.Origin != "" { 587 + fmt.Printf(" Origin: %s\n", meta.Origin) 588 + } 589 + fmt.Printf(" Operations: %s\n", formatNumber(meta.OperationCount)) 590 + fmt.Printf(" DIDs: %s unique\n", formatNumber(meta.DIDCount)) 591 + fmt.Printf(" Frames: %d\n", meta.FrameCount) 592 + fmt.Printf(" Uncompressed: %s\n", formatBytes(meta.UncompressedSize)) 593 + fmt.Printf(" Compressed: %s (%.2fx)\n", 594 + formatBytes(meta.CompressedSize), 595 + float64(meta.UncompressedSize)/float64(meta.CompressedSize)) 596 + fmt.Printf(" Timespan: %s → %s\n", 597 + meta.StartTime.Format("2006-01-02 15:04:05"), 598 + meta.EndTime.Format("2006-01-02 15:04:05")) 599 + fmt.Printf(" Duration: %s\n", 600 + formatDuration(meta.EndTime.Sub(meta.StartTime))) 601 + 602 + if meta.ContentHash != "" { 603 + fmt.Printf("\n Hashes:\n") 604 + fmt.Printf(" Content: %s\n", meta.ContentHash[:16]+"...") 605 + fmt.Printf(" Compressed: %s\n", meta.CompressedHash[:16]+"...") 606 + if meta.ParentHash != "" { 607 + fmt.Printf(" Parent: %s\n", meta.ParentHash[:16]+"...") 608 + } 609 + } 610 + fmt.Printf("\n") 611 + } 612 + 613 + // Operations breakdown 614 + fmt.Printf("📊 Operations Analysis\n") 615 + fmt.Printf("──────────────────────\n") 616 + fmt.Printf(" Total operations: %s\n", formatNumber(result.TotalOps)) 617 + fmt.Printf(" Active: %s (%.1f%%)\n", 618 + formatNumber(result.ActiveOps), 619 + float64(result.ActiveOps)/float64(result.TotalOps)*100) 620 + if result.NullifiedOps > 0 { 621 + fmt.Printf(" Nullified: %s (%.1f%%)\n", 622 + formatNumber(result.NullifiedOps), 623 + float64(result.NullifiedOps)/float64(result.TotalOps)*100) 624 + } 625 + 626 + if len(result.OperationTypes) > 0 { 627 + fmt.Printf("\n Operation Types:\n") 628 + 629 + // Sort by count 630 + var types []struct { 631 + name string 632 + count int 633 + } 634 + for name, count := range result.OperationTypes { 635 + types = append(types, struct { 636 + name string 637 + count int 638 + }{name, count}) 
639 + } 640 + sort.Slice(types, func(i, j int) bool { 641 + return types[i].count > types[j].count 642 + }) 643 + 644 + for _, t := range types { 645 + pct := float64(t.count) / float64(result.TotalOps) * 100 646 + fmt.Printf(" %-25s %s (%.1f%%)\n", t.name, formatNumber(t.count), pct) 647 + } 648 + } 649 + fmt.Printf("\n") 650 + 651 + // DID patterns 652 + fmt.Printf("👤 DID Activity Patterns\n") 653 + fmt.Printf("────────────────────────\n") 654 + fmt.Printf(" Unique DIDs: %s\n", formatNumber(result.UniqueDIDs)) 655 + fmt.Printf(" Single-op DIDs: %s (%.1f%%)\n", 656 + formatNumber(result.SingleOpDIDs), 657 + float64(result.SingleOpDIDs)/float64(result.UniqueDIDs)*100) 658 + fmt.Printf(" Multi-op DIDs: %s (%.1f%%)\n", 659 + formatNumber(result.MultiOpDIDs), 660 + float64(result.MultiOpDIDs)/float64(result.UniqueDIDs)*100) 661 + 662 + if len(result.TopDIDs) > 0 { 663 + fmt.Printf("\n Most Active DIDs:\n") 664 + for i, da := range result.TopDIDs { 665 + if i >= 5 { 666 + break 667 + } 668 + fmt.Printf(" %d. 
%s (%d ops)\n", i+1, da.DID, da.Count) 669 + } 670 + } 671 + fmt.Printf("\n") 672 + 673 + // Handle patterns 674 + if !opts.skipPatterns && result.TotalHandles > 0 { 675 + fmt.Printf("🏷️ Handle Statistics\n") 676 + fmt.Printf("────────────────────\n") 677 + fmt.Printf(" Total handles: %s\n", formatNumber(result.TotalHandles)) 678 + if result.InvalidHandles > 0 { 679 + fmt.Printf(" Invalid patterns: %s (%.1f%%)\n", 680 + formatNumber(result.InvalidHandles), 681 + float64(result.InvalidHandles)/float64(result.TotalHandles)*100) 682 + } 683 + 684 + if len(result.TopDomains) > 0 { 685 + fmt.Printf("\n Top Domains:\n") 686 + for i, dc := range result.TopDomains { 687 + if i >= 10 { 688 + break 689 + } 690 + pct := float64(dc.Count) / float64(result.TotalHandles) * 100 691 + fmt.Printf(" %-25s %s (%.1f%%)\n", dc.Domain, formatNumber(dc.Count), pct) 692 + } 693 + } 694 + fmt.Printf("\n") 695 + } 696 + 697 + // Service patterns 698 + if !opts.skipPatterns && result.TotalServices > 0 { 699 + fmt.Printf("🌐 Service Endpoints\n") 700 + fmt.Printf("────────────────────\n") 701 + fmt.Printf(" Total services: %s\n", formatNumber(result.TotalServices)) 702 + fmt.Printf(" Unique endpoints: %s\n", formatNumber(result.UniqueEndpoints)) 703 + 704 + if len(result.TopPDSEndpoints) > 0 { 705 + fmt.Printf("\n Top PDS Endpoints:\n") 706 + for i, ec := range result.TopPDSEndpoints { 707 + if i >= 10 { 708 + break 709 + } 710 + fmt.Printf(" %-40s %s ops\n", ec.Endpoint, formatNumber(ec.Count)) 711 + } 712 + } 713 + fmt.Printf("\n") 714 + } 715 + 716 + // Temporal analysis 717 + fmt.Printf("⏱️ Temporal Distribution\n") 718 + fmt.Printf("───────────────────────\n") 719 + if len(result.TimeDistribution) > 0 { 720 + first := result.TimeDistribution[0] 721 + last := result.TimeDistribution[len(result.TimeDistribution)-1] 722 + duration := last.Time.Sub(first.Time) 723 + 724 + fmt.Printf(" Start: %s\n", first.Time.Format("2006-01-02 15:04:05")) 725 + fmt.Printf(" End: %s\n", 
last.Time.Format("2006-01-02 15:04:05")) 726 + fmt.Printf(" Duration: %s\n", formatDuration(duration)) 727 + fmt.Printf(" Avg ops/minute: %.1f\n", result.AvgOpsPerMinute) 728 + fmt.Printf(" Time slots: %d minutes\n", len(result.TimeDistribution)) 729 + 730 + // Find peak activity 731 + maxSlot := result.TimeDistribution[0] 732 + for _, slot := range result.TimeDistribution { 733 + if slot.Count > maxSlot.Count { 734 + maxSlot = slot 735 + } 736 + } 737 + fmt.Printf(" Peak activity: %d ops at %s\n", 738 + maxSlot.Count, maxSlot.Time.Format("15:04")) 739 + } 740 + fmt.Printf("\n") 741 + 742 + // Size analysis 743 + fmt.Printf("📏 Size Analysis\n") 744 + fmt.Printf("────────────────\n") 745 + fmt.Printf(" Total data: %s\n", formatBytes(result.TotalOpSize)) 746 + fmt.Printf(" Average per op: %s\n", formatBytes(int64(result.AvgOpSize))) 747 + fmt.Printf(" Min operation: %s\n", formatBytes(int64(result.MinOpSize))) 748 + fmt.Printf(" Max operation: %s\n\n", formatBytes(int64(result.MaxOpSize))) 749 + 750 + // Cryptographic verification 751 + if opts.verify && !opts.skipCrypto { 752 + fmt.Printf("🔐 Cryptographic Verification\n") 753 + fmt.Printf("─────────────────────────────\n") 754 + 755 + status := func(valid bool) string { 756 + if valid { 757 + return "✓ Valid" 758 + } 759 + return "✗ Invalid" 760 + } 761 + 762 + fmt.Printf(" Content hash: %s\n", status(result.ContentHashValid)) 763 + fmt.Printf(" Compressed hash: %s\n", status(result.CompressedHashValid)) 764 + if result.HasMetadataFrame { 765 + fmt.Printf(" Metadata integrity: %s\n", status(result.MetadataValid)) 766 + } 767 + fmt.Printf("\n") 768 + } 769 + 770 + // Performance summary 771 + fmt.Printf("⚡ Performance\n") 772 + fmt.Printf("──────────────\n") 773 + fmt.Printf(" Load time: %s\n", result.LoadTime) 774 + if opts.verify { 775 + fmt.Printf(" Verify time: %s\n", result.VerifyTime) 776 + } 777 + fmt.Printf(" Total time: %s\n", result.TotalTime) 778 + if result.LoadTime.Seconds() > 0 { 779 + opsPerSec := 
float64(result.TotalOps) / result.LoadTime.Seconds() 780 + mbPerSec := float64(result.TotalOpSize) / result.LoadTime.Seconds() / (1024 * 1024) 781 + fmt.Printf(" Throughput: %.0f ops/sec, %.2f MB/s\n", opsPerSec, mbPerSec) 782 + } 783 + fmt.Printf("\n") 784 + 785 + return nil 786 + } 787 + 788 + func displayInspectJSON(result *inspectResult) error { 789 + data, _ := json.MarshalIndent(result, "", " ") 790 + fmt.Println(string(data)) 791 + return nil 792 + } 793 + 794 + func verifyCrypto(cmd *cobra.Command, path string, meta *storage.BundleMetadata, bundleNum int, verbose bool) (contentValid, compressedValid, metadataValid bool) { 795 + ops := &storage.Operations{} 796 + 797 + // Calculate actual hashes 798 + compHash, compSize, contentHash, contentSize, err := ops.CalculateFileHashes(path) 799 + if err != nil { 800 + if verbose { 801 + fmt.Fprintf(os.Stderr, " Hash calculation failed: %v\n", err) 802 + } 803 + return false, false, false 804 + } 805 + 806 + contentValid = true 807 + compressedValid = true 808 + metadataValid = true 809 + 810 + // Verify against embedded metadata if available 811 + if meta != nil { 812 + if meta.ContentHash != "" && meta.ContentHash != contentHash { 813 + contentValid = false 814 + if verbose { 815 + fmt.Fprintf(os.Stderr, " ✗ Content hash mismatch!\n") 816 + fmt.Fprintf(os.Stderr, " Expected: %s\n", meta.ContentHash) 817 + fmt.Fprintf(os.Stderr, " Actual: %s\n", contentHash) 818 + } 819 + } 820 + 821 + if meta.CompressedHash != "" && meta.CompressedHash != compHash { 822 + compressedValid = false 823 + if verbose { 824 + fmt.Fprintf(os.Stderr, " ✗ Compressed hash mismatch!\n") 825 + } 826 + } 827 + 828 + if meta.UncompressedSize != contentSize { 829 + metadataValid = false 830 + if verbose { 831 + fmt.Fprintf(os.Stderr, " ✗ Uncompressed size mismatch: meta=%d, actual=%d\n", 832 + meta.UncompressedSize, contentSize) 833 + } 834 + } 835 + 836 + if meta.CompressedSize != compSize { 837 + metadataValid = false 838 + if verbose { 839 + 
fmt.Fprintf(os.Stderr, " ✗ Compressed size mismatch: meta=%d, actual=%d\n", 840 + meta.CompressedSize, compSize) 841 + } 842 + } 843 + } 844 + 845 + // Also verify against repository index if bundle number is known 846 + if bundleNum > 0 { 847 + mgr, _, err := getManager(nil) 848 + if err == nil { 849 + defer mgr.Close() 850 + 851 + ctx := context.Background() 852 + vr, err := mgr.VerifyBundle(ctx, bundleNum) 853 + if err == nil { 854 + contentValid = contentValid && vr.Valid 855 + compressedValid = compressedValid && vr.HashMatch 856 + } 857 + } 858 + } 859 + 860 + return contentValid, compressedValid, metadataValid 861 + } 862 + 863 + func resolveBundlePath(cmd *cobra.Command, input string) (path string, bundleNum int, err error) { 864 + // Check if it's a file path 865 + if strings.HasSuffix(input, ".zst") || strings.Contains(input, "/") || strings.Contains(input, "\\") { 866 + absPath, err := filepath.Abs(input) 867 + if err != nil { 868 + return "", 0, err 869 + } 870 + 871 + // Try to extract bundle number from filename 872 + base := filepath.Base(absPath) 873 + fmt.Sscanf(base, "%d", &bundleNum) 874 + 875 + return absPath, bundleNum, nil 876 + } 877 + 878 + // Try to parse as bundle number 879 + if _, err := fmt.Sscanf(input, "%d", &bundleNum); err == nil { 880 + // Load from repository 881 + mgr, dir, err := getManager(&ManagerOptions{Cmd: cmd}) 882 + if err != nil { 883 + return "", 0, err 884 + } 885 + defer mgr.Close() 886 + 887 + path := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 888 + if _, err := os.Stat(path); err != nil { 889 + return "", 0, fmt.Errorf("bundle %d not found in repository", bundleNum) 890 + } 891 + 892 + return path, bundleNum, nil 893 + } 894 + 895 + return "", 0, fmt.Errorf("invalid input: must be bundle number or file path") 896 + }
+162 -92
cmd/plcbundle/commands/op.go
··· 53 53 // ============================================================================ 54 54 55 55 func newOpGetCommand() *cobra.Command { 56 + var verbose bool 57 + 56 58 cmd := &cobra.Command{ 57 59 Use: "get <bundle> <position> | <globalPosition>", 58 60 Short: "Get operation as JSON", ··· 62 64 1. Bundle number + position: get 42 1337 63 65 2. Global position: get 420000 64 66 65 - Global position = (bundleNumber × 10,000) + position`, 67 + Global position = (bundleNumber × 10,000) + position 68 + 69 + Use -v/--verbose to see detailed timing breakdown.`, 66 70 67 71 Example: ` # By bundle + position 68 72 plcbundle op get 42 1337 ··· 70 74 # By global position 71 75 plcbundle op get 88410345 72 76 77 + # With timing metrics 78 + plcbundle op get 42 1337 -v 79 + plcbundle op get 88410345 --verbose 80 + 73 81 # Pipe to jq 74 82 plcbundle op get 42 1337 | jq .did`, 75 83 ··· 88 96 defer mgr.Close() 89 97 90 98 ctx := context.Background() 99 + 100 + // ✅ Time the operation load 101 + totalStart := time.Now() 91 102 op, err := mgr.LoadOperation(ctx, bundleNum, position) 103 + totalDuration := time.Since(totalStart) 104 + 92 105 if err != nil { 93 106 return err 94 107 } 95 108 96 - // Output raw JSON 109 + // Output timing to stderr if verbose 110 + if verbose { 111 + globalPos := (bundleNum * 10000) + position 112 + 113 + fmt.Fprintf(os.Stderr, "Operation Load Metrics\n") 114 + fmt.Fprintf(os.Stderr, "══════════════════════\n\n") 115 + fmt.Fprintf(os.Stderr, " Location: Bundle %06d, Position %04d\n", bundleNum, position) 116 + fmt.Fprintf(os.Stderr, " Global Position: %d\n", globalPos) 117 + fmt.Fprintf(os.Stderr, " Total Time: %s\n", totalDuration) 118 + 119 + // Calculate throughput 120 + if len(op.RawJSON) > 0 { 121 + mbPerSec := float64(len(op.RawJSON)) / totalDuration.Seconds() / (1024 * 1024) 122 + fmt.Fprintf(os.Stderr, " Data Size: %d bytes\n", len(op.RawJSON)) 123 + fmt.Fprintf(os.Stderr, " Throughput: %.2f MB/s\n", mbPerSec) 124 + } 125 + 126 + 
fmt.Fprintf(os.Stderr, "\n") 127 + } 128 + 129 + // Output raw JSON to stdout 97 130 if len(op.RawJSON) > 0 { 98 131 fmt.Println(string(op.RawJSON)) 99 132 } else { ··· 105 138 }, 106 139 } 107 140 141 + cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Show timing metrics") 142 + 108 143 return cmd 109 144 } 110 145 111 - // ============================================================================ 146 + // // ============================================================================ 112 147 // OP SHOW - Show operation (formatted) 113 148 // ============================================================================ 114 149 ··· 125 160 • DID and CID 126 161 • Timestamp and age 127 162 • Nullification status 128 - • Parsed operation details`, 163 + • Parsed operation details 164 + • Performance metrics (with -v)`, 129 165 130 166 Example: ` # By bundle + position 131 167 plcbundle op show 42 1337 ··· 133 169 # By global position 134 170 plcbundle op show 88410345 135 171 136 - # Verbose (show full operation JSON) 172 + # Verbose with timing and full JSON 137 173 plcbundle op show 42 1337 -v`, 138 174 139 175 Args: cobra.RangeArgs(1, 2), ··· 151 187 defer mgr.Close() 152 188 153 189 ctx := context.Background() 190 + 191 + // ✅ Time the operation 192 + loadStart := time.Now() 154 193 op, err := mgr.LoadOperation(ctx, bundleNum, position) 194 + loadDuration := time.Since(loadStart) 195 + 155 196 if err != nil { 156 197 return err 157 198 } 158 199 159 - return displayOperation(bundleNum, position, op, verbose) 200 + // ✅ Time the parsing 201 + parseStart := time.Now() 202 + opData, parseErr := op.GetOperationData() 203 + parseDuration := time.Since(parseStart) 204 + 205 + return displayOperationWithTiming(bundleNum, position, op, opData, parseErr, 206 + loadDuration, parseDuration, verbose) 160 207 }, 161 208 } 162 209 163 - cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Show full operation JSON") 210 + cmd.Flags().BoolVarP(&verbose, "verbose", "v", 
false, "Show timing metrics and full JSON") 164 211 165 212 return cmd 166 213 } ··· 250 297 return 0, 0, fmt.Errorf("usage: op <command> <bundle> <position> OR op <command> <globalPosition>") 251 298 } 252 299 253 - // displayOperation shows formatted operation details 254 - func displayOperation(bundleNum, position int, op *plcclient.PLCOperation, verbose bool) error { 300 + // findOperationByCID searches for an operation by CID 301 + func findOperationByCID(mgr BundleManager, cid string) error { 302 + ctx := context.Background() 303 + 304 + // ✨ CHECK MEMPOOL FIRST (most recent data) 305 + fmt.Fprintf(os.Stderr, "Checking mempool...\n") 306 + mempoolOps, err := mgr.GetMempoolOperations() 307 + if err == nil && len(mempoolOps) > 0 { 308 + for pos, op := range mempoolOps { 309 + if op.CID == cid { 310 + fmt.Printf("Found in mempool: position %d\n\n", pos) 311 + fmt.Printf(" DID: %s\n", op.DID) 312 + fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05")) 313 + 314 + if op.IsNullified() { 315 + fmt.Printf(" Status: ✗ Nullified") 316 + if nullCID := op.GetNullifyingCID(); nullCID != "" { 317 + fmt.Printf(" by %s", nullCID) 318 + } 319 + fmt.Printf("\n") 320 + } else { 321 + fmt.Printf(" Status: ✓ Active\n") 322 + } 323 + 324 + return nil 325 + } 326 + } 327 + } 328 + 329 + // Search bundles 330 + index := mgr.GetIndex() 331 + bundles := index.GetBundles() 332 + 333 + if len(bundles) == 0 { 334 + fmt.Fprintf(os.Stderr, "No bundles to search\n") 335 + return nil 336 + } 337 + 338 + fmt.Fprintf(os.Stderr, "Searching %d bundles for CID: %s\n\n", len(bundles), cid) 339 + 340 + for _, meta := range bundles { 341 + bundle, err := mgr.LoadBundle(ctx, meta.BundleNumber) 342 + if err != nil { 343 + continue 344 + } 345 + 346 + for pos, op := range bundle.Operations { 347 + if op.CID == cid { 348 + globalPos := (meta.BundleNumber * types.BUNDLE_SIZE) + pos 349 + 350 + fmt.Printf("Found: bundle %06d, position %d\n", meta.BundleNumber, pos) 351 + 
fmt.Printf("Global position: %d\n\n", globalPos) 352 + 353 + fmt.Printf(" DID: %s\n", op.DID) 354 + fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05")) 355 + 356 + if op.IsNullified() { 357 + fmt.Printf(" Status: ✗ Nullified") 358 + if nullCID := op.GetNullifyingCID(); nullCID != "" { 359 + fmt.Printf(" by %s", nullCID) 360 + } 361 + fmt.Printf("\n") 362 + } else { 363 + fmt.Printf(" Status: ✓ Active\n") 364 + } 365 + 366 + return nil 367 + } 368 + } 369 + 370 + // Progress indicator 371 + if meta.BundleNumber%100 == 0 { 372 + fmt.Fprintf(os.Stderr, "Searched through bundle %06d...\r", meta.BundleNumber) 373 + } 374 + } 375 + 376 + fmt.Fprintf(os.Stderr, "\nCID not found: %s\n", cid) 377 + fmt.Fprintf(os.Stderr, "(Searched %d bundles + mempool)\n", len(bundles)) 378 + return fmt.Errorf("CID not found") 379 + } 380 + 381 + // displayOperationWithTiming shows formatted operation details with timing 382 + func displayOperationWithTiming(bundleNum, position int, op *plcclient.PLCOperation, 383 + opData map[string]interface{}, _ error, 384 + loadDuration, parseDuration time.Duration, verbose bool) error { 385 + 255 386 globalPos := (bundleNum * types.BUNDLE_SIZE) + position 256 387 257 - fmt.Printf("Operation %d\n", globalPos) 388 + fmt.Printf("═══════════════════════════════════════════════════════════════\n") 389 + fmt.Printf(" Operation %d\n", globalPos) 258 390 fmt.Printf("═══════════════════════════════════════════════════════════════\n\n") 259 391 260 392 fmt.Printf("Location\n") ··· 285 417 fmt.Printf("──────\n") 286 418 fmt.Printf(" %s\n\n", status) 287 419 420 + // ✅ Performance metrics (always shown if verbose) 421 + if verbose { 422 + totalTime := loadDuration + parseDuration 423 + 424 + fmt.Printf("Performance\n") 425 + fmt.Printf("───────────\n") 426 + fmt.Printf(" Load time: %s\n", loadDuration) 427 + fmt.Printf(" Parse time: %s\n", parseDuration) 428 + fmt.Printf(" Total time: %s\n", totalTime) 429 + 430 + if len(op.RawJSON) > 0 { 431 
+ fmt.Printf(" Data size: %d bytes\n", len(op.RawJSON)) 432 + mbPerSec := float64(len(op.RawJSON)) / loadDuration.Seconds() / (1024 * 1024) 433 + fmt.Printf(" Load speed: %.2f MB/s\n", mbPerSec) 434 + } 435 + 436 + fmt.Printf("\n") 437 + } 438 + 288 439 // Parse operation details 289 - if opData, err := op.GetOperationData(); err == nil && opData != nil && !op.IsNullified() { 440 + if opData != nil && !op.IsNullified() { 290 441 fmt.Printf("Details\n") 291 442 fmt.Printf("───────\n") 292 443 ··· 339 490 340 491 return nil 341 492 } 342 - 343 - // findOperationByCID searches for an operation by CID 344 - func findOperationByCID(mgr BundleManager, cid string) error { 345 - ctx := context.Background() 346 - 347 - // ✨ CHECK MEMPOOL FIRST (most recent data) 348 - fmt.Fprintf(os.Stderr, "Checking mempool...\n") 349 - mempoolOps, err := mgr.GetMempoolOperations() 350 - if err == nil && len(mempoolOps) > 0 { 351 - for pos, op := range mempoolOps { 352 - if op.CID == cid { 353 - fmt.Printf("Found in mempool: position %d\n\n", pos) 354 - fmt.Printf(" DID: %s\n", op.DID) 355 - fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05")) 356 - 357 - if op.IsNullified() { 358 - fmt.Printf(" Status: ✗ Nullified") 359 - if nullCID := op.GetNullifyingCID(); nullCID != "" { 360 - fmt.Printf(" by %s", nullCID) 361 - } 362 - fmt.Printf("\n") 363 - } else { 364 - fmt.Printf(" Status: ✓ Active\n") 365 - } 366 - 367 - return nil 368 - } 369 - } 370 - } 371 - 372 - // Search bundles 373 - index := mgr.GetIndex() 374 - bundles := index.GetBundles() 375 - 376 - if len(bundles) == 0 { 377 - fmt.Fprintf(os.Stderr, "No bundles to search\n") 378 - return nil 379 - } 380 - 381 - fmt.Fprintf(os.Stderr, "Searching %d bundles for CID: %s\n\n", len(bundles), cid) 382 - 383 - for _, meta := range bundles { 384 - bundle, err := mgr.LoadBundle(ctx, meta.BundleNumber) 385 - if err != nil { 386 - continue 387 - } 388 - 389 - for pos, op := range bundle.Operations { 390 - if op.CID == cid { 
391 - globalPos := (meta.BundleNumber * types.BUNDLE_SIZE) + pos 392 - 393 - fmt.Printf("Found: bundle %06d, position %d\n", meta.BundleNumber, pos) 394 - fmt.Printf("Global position: %d\n\n", globalPos) 395 - 396 - fmt.Printf(" DID: %s\n", op.DID) 397 - fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05")) 398 - 399 - if op.IsNullified() { 400 - fmt.Printf(" Status: ✗ Nullified") 401 - if nullCID := op.GetNullifyingCID(); nullCID != "" { 402 - fmt.Printf(" by %s", nullCID) 403 - } 404 - fmt.Printf("\n") 405 - } else { 406 - fmt.Printf(" Status: ✓ Active\n") 407 - } 408 - 409 - return nil 410 - } 411 - } 412 - 413 - // Progress indicator 414 - if meta.BundleNumber%100 == 0 { 415 - fmt.Fprintf(os.Stderr, "Searched through bundle %06d...\r", meta.BundleNumber) 416 - } 417 - } 418 - 419 - fmt.Fprintf(os.Stderr, "\nCID not found: %s\n", cid) 420 - fmt.Fprintf(os.Stderr, "(Searched %d bundles + mempool)\n", len(bundles)) 421 - return fmt.Errorf("CID not found") 422 - }
+2 -2
cmd/plcbundle/main.go
··· 58 58 //cmd.AddCommand(commands.NewGapsCommand()) 59 59 cmd.AddCommand(commands.NewVerifyCommand()) 60 60 cmd.AddCommand(commands.NewDiffCommand()) 61 - /*cmd.AddCommand(commands.NewStatsCommand()) 62 - cmd.AddCommand(commands.NewInspectCommand())*/ 61 + //cmd.AddCommand(commands.NewStatsCommand()) 62 + cmd.AddCommand(commands.NewInspectCommand()) 63 63 64 64 // Namespaced commands 65 65 cmd.AddCommand(commands.NewDIDCommand())
+75 -8
internal/storage/storage.go
··· 86 86 // FILE OPERATIONS (using zstd abstraction) 87 87 // ======================================== 88 88 89 - // SaveBundle saves operations to disk (compressed with multi-frame support) 89 + // SaveBundle saves operations to disk with embedded metadata 90 90 func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation) (string, string, int64, int64, error) { 91 91 // 1. Serialize all operations once 92 92 jsonlData := op.SerializeJSONL(operations) ··· 100 100 } 101 101 defer bundleFile.Close() 102 102 103 - frameOffsets := []int64{0} 103 + // ✅ 3. Write metadata as skippable frame FIRST (placeholder - will update later) 104 + // We write a placeholder now and update after we know the compressed size 105 + placeholderMeta := &BundleMetadata{ 106 + Version: 1, 107 + BundleNumber: 0, // Will be set by caller 108 + UncompressedSize: contentSize, 109 + OperationCount: len(operations), 110 + FrameCount: (len(operations) + FrameSize - 1) / FrameSize, 111 + CreatedAt: time.Now().UTC(), 112 + } 104 113 105 - // 3. Loop through operations in chunks 114 + metadataStart, err := WriteMetadataFrame(bundleFile, placeholderMeta) 115 + if err != nil { 116 + return "", "", 0, 0, fmt.Errorf("failed to write metadata frame: %w", err) 117 + } 118 + 119 + frameOffsets := []int64{metadataStart} // First data frame starts after metadata 120 + 121 + // 4. Write data frames 106 122 for i := 0; i < len(operations); i += FrameSize { 107 123 end := i + FrameSize 108 124 if end > len(operations) { ··· 111 127 opChunk := operations[i:end] 112 128 chunkJsonlData := op.SerializeJSONL(opChunk) 113 129 114 - // ✅ Use abstracted compression 130 + // Compress this chunk 115 131 compressedChunk, err := CompressFrame(chunkJsonlData) 116 132 if err != nil { 117 133 return "", "", 0, 0, fmt.Errorf("failed to compress frame: %w", err) ··· 134 150 } 135 151 } 136 152 137 - // 4. Get final file size 153 + // 5. 
Get final file size 138 154 finalSize, _ := bundleFile.Seek(0, io.SeekCurrent) 139 155 frameOffsets = append(frameOffsets, finalSize) 140 156 141 - // 5. Sync to disk 157 + // 6. Sync to disk 142 158 if err := bundleFile.Sync(); err != nil { 143 159 return "", "", 0, 0, fmt.Errorf("failed to sync file: %w", err) 144 160 } 145 161 146 - // 6. Save frame index 162 + // 7. Save frame index (still useful for random access) 147 163 indexPath := path + ".idx" 148 164 indexData, _ := json.Marshal(frameOffsets) 149 165 if err := os.WriteFile(indexPath, indexData, 0644); err != nil { ··· 151 167 return "", "", 0, 0, fmt.Errorf("failed to write frame index: %w", err) 152 168 } 153 169 154 - // 7. Calculate compressed hash 170 + // 8. Calculate compressed hash 155 171 compressedData, err := os.ReadFile(path) 156 172 if err != nil { 157 173 return "", "", 0, 0, fmt.Errorf("failed to re-read bundle for hashing: %w", err) ··· 640 656 641 657 return lineNum, len(didSet), startTime, endTime, scanner.Err() 642 658 } 659 + 660 + // ExtractBundleMetadata extracts metadata from bundle file without decompressing 661 + func (op *Operations) ExtractBundleMetadata(path string) (*BundleMetadata, error) { 662 + meta, err := ExtractMetadataFromFile(path) 663 + if err != nil { 664 + return nil, fmt.Errorf("failed to extract metadata: %w", err) 665 + } 666 + return meta, nil 667 + } 668 + 669 + // LoadBundleWithMetadata loads bundle and returns both data and embedded metadata 670 + func (op *Operations) LoadBundleWithMetadata(path string) ([]plcclient.PLCOperation, *BundleMetadata, error) { 671 + file, err := os.Open(path) 672 + if err != nil { 673 + return nil, nil, fmt.Errorf("failed to open file: %w", err) 674 + } 675 + defer file.Close() 676 + 677 + // 1. 
Try to read metadata frame first 678 + meta, err := ReadMetadataFrame(file) 679 + if err != nil { 680 + // No metadata frame - fall back to regular load 681 + file.Seek(0, io.SeekStart) // Reset to beginning 682 + ops, err := op.loadFromReader(file) 683 + return ops, nil, err 684 + } 685 + 686 + // 2. Read compressed data (file position is now after metadata frame) 687 + ops, err := op.loadFromReader(file) 688 + if err != nil { 689 + return nil, nil, err 690 + } 691 + 692 + return ops, meta, nil 693 + } 694 + 695 + // loadFromReader loads operations from a reader (internal helper) 696 + func (op *Operations) loadFromReader(r io.Reader) ([]plcclient.PLCOperation, error) { 697 + reader, err := NewStreamingReader(r) 698 + if err != nil { 699 + return nil, fmt.Errorf("failed to create reader: %w", err) 700 + } 701 + defer reader.Release() 702 + 703 + decompressed, err := io.ReadAll(reader) 704 + if err != nil { 705 + return nil, fmt.Errorf("failed to decompress: %w", err) 706 + } 707 + 708 + return op.ParseJSONL(decompressed) 709 + }
+156 -15
internal/storage/zstd.go
··· 1 1 package storage 2 2 3 3 import ( 4 + "encoding/binary" 5 + "encoding/json" 4 6 "fmt" 5 7 "io" 8 + "os" 9 + "time" 6 10 7 11 "github.com/valyala/gozstd" 8 12 ) ··· 10 14 // ============================================================================ 11 15 // ZSTD COMPRESSION ABSTRACTION LAYER 12 16 // ============================================================================ 13 - // This file provides a clean interface for zstd operations. 14 - // Swap implementations by changing the functions in this file. 15 17 16 18 const ( 17 - // CompressionLevel is the default compression level 18 - CompressionLevel = 2 // Default from zstd 19 + CompressionLevel = 3 20 + FrameSize = 100 19 21 20 - // FrameSize is the number of operations per frame 21 - FrameSize = 100 22 + // Skippable frame magic numbers (0x184D2A50 to 0x184D2A5F) 23 + // We use 0x184D2A50 for bundle metadata 24 + SkippableMagicMetadata = 0x184D2A50 22 25 ) 23 26 27 + // BundleMetadata is stored in skippable frame at start of bundle file 28 + type BundleMetadata struct { 29 + Version int `json:"version"` // Metadata format version 30 + BundleNumber int `json:"bundle_number"` 31 + Origin string `json:"origin,omitempty"` 32 + 33 + // Hashes 34 + ContentHash string `json:"content_hash"` 35 + CompressedHash string `json:"compressed_hash"` 36 + ParentHash string `json:"parent_hash,omitempty"` 37 + 38 + // Sizes 39 + UncompressedSize int64 `json:"uncompressed_size"` 40 + CompressedSize int64 `json:"compressed_size"` 41 + 42 + // Timestamps 43 + StartTime time.Time `json:"start_time"` 44 + EndTime time.Time `json:"end_time"` 45 + CreatedAt time.Time `json:"created_at"` 46 + 47 + // Counts 48 + OperationCount int `json:"operation_count"` 49 + DIDCount int `json:"did_count"` 50 + FrameCount int `json:"frame_count"` 51 + 52 + // Additional info 53 + Cursor string `json:"cursor,omitempty"` 54 + } 55 + 56 + // ============================================================================ 57 + // SKIPPABLE FRAME 
// FUNCTIONS
// ============================================================================

// isSkippableMagic reports whether magic lies in the zstd skippable-frame
// range 0x184D2A50..0x184D2A5F.
func isSkippableMagic(magic uint32) bool {
	return magic >= 0x184D2A50 && magic <= 0x184D2A5F
}

// WriteSkippableFrame writes data to w wrapped in a zstd skippable frame.
//
// Frame layout:
//
//	[4 bytes] magic number (0x184D2A5X), little-endian
//	[4 bytes] payload size, little-endian uint32
//	[N bytes] payload
//
// It returns the total number of bytes written (header + payload). An error
// is returned, and nothing is written, when magicNumber is outside the
// skippable range or when data is too large for the 32-bit size field.
// On any error the returned byte count is 0.
func WriteSkippableFrame(w io.Writer, magicNumber uint32, data []byte) (int64, error) {
	// Largest payload the 4-byte size field can describe.
	const maxPayload = 1<<32 - 1

	// ReadSkippableFrame refuses anything outside this range, so refuse to
	// produce a frame that could never be read back.
	if !isSkippableMagic(magicNumber) {
		return 0, fmt.Errorf("not a skippable frame magic: 0x%08X", magicNumber)
	}

	// A plain uint32(len(data)) conversion would silently wrap and emit a
	// header that disagrees with the payload that follows it.
	if uint64(len(data)) > maxPayload {
		return 0, fmt.Errorf("skippable frame payload too large: %d bytes", len(data))
	}

	// Encode the fixed 8-byte header by hand: one write, no reflection.
	var header [8]byte
	binary.LittleEndian.PutUint32(header[0:4], magicNumber)
	binary.LittleEndian.PutUint32(header[4:8], uint32(len(data)))

	if _, err := w.Write(header[:]); err != nil {
		return 0, err
	}

	n, err := w.Write(data)
	if err != nil {
		return 0, err
	}

	return int64(len(header) + n), nil // magic + size + data
}

// ReadSkippableFrame reads one skippable frame from r.
//
// It returns the frame's magic number and payload. An error is returned when
// the next four bytes are not a skippable-frame magic, or when r ends before
// the header or the declared payload has been fully read; a short payload is
// reported as an error wrapping io.ErrUnexpectedEOF.
//
// Only the bytes belonging to the frame are consumed, so r is positioned at
// the first byte after the frame on success.
func ReadSkippableFrame(r io.Reader) (uint32, []byte, error) {
	var word [4]byte

	// Magic number. It is checked before the size is read so that a
	// non-skippable stream costs the caller only four bytes.
	if _, err := io.ReadFull(r, word[:]); err != nil {
		return 0, nil, err
	}
	magic := binary.LittleEndian.Uint32(word[:])
	if !isSkippableMagic(magic) {
		return 0, nil, fmt.Errorf("not a skippable frame: magic=0x%08X", magic)
	}

	// Declared payload size.
	if _, err := io.ReadFull(r, word[:]); err != nil {
		return 0, nil, err
	}
	frameSize := binary.LittleEndian.Uint32(word[:])

	// The size field comes from the file and may be corrupt. Grow the buffer
	// as bytes actually arrive instead of allocating frameSize (up to 4 GiB)
	// up front; the LimitReader keeps us from reading past the frame.
	data, err := io.ReadAll(io.LimitReader(r, int64(frameSize)))
	if err != nil {
		return 0, nil, err
	}
	if uint64(len(data)) < uint64(frameSize) {
		return 0, nil, fmt.Errorf("skippable frame truncated (%d of %d bytes): %w",
			len(data), frameSize, io.ErrUnexpectedEOF)
	}

	return magic, data, nil
}

// WriteMetadataFrame writes bundle metadata as a skippable
frame 120 + func WriteMetadataFrame(w io.Writer, meta *BundleMetadata) (int64, error) { 121 + // Serialize metadata to JSON 122 + jsonData, err := json.Marshal(meta) 123 + if err != nil { 124 + return 0, fmt.Errorf("failed to marshal metadata: %w", err) 125 + } 126 + 127 + // Write as skippable frame 128 + return WriteSkippableFrame(w, SkippableMagicMetadata, jsonData) 129 + } 130 + 131 + // ReadMetadataFrame reads bundle metadata from skippable frame 132 + func ReadMetadataFrame(r io.Reader) (*BundleMetadata, error) { 133 + magic, data, err := ReadSkippableFrame(r) 134 + if err != nil { 135 + return nil, err 136 + } 137 + 138 + if magic != SkippableMagicMetadata { 139 + return nil, fmt.Errorf("unexpected skippable frame magic: 0x%08X (expected 0x%08X)", 140 + magic, SkippableMagicMetadata) 141 + } 142 + 143 + var meta BundleMetadata 144 + if err := json.Unmarshal(data, &meta); err != nil { 145 + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) 146 + } 147 + 148 + return &meta, nil 149 + } 150 + 151 + // ExtractMetadataFromFile reads just the metadata without decompressing the bundle 152 + func ExtractMetadataFromFile(path string) (*BundleMetadata, error) { 153 + file, err := os.Open(path) 154 + if err != nil { 155 + return nil, err 156 + } 157 + defer file.Close() 158 + 159 + // Try to read skippable frame at start 160 + meta, err := ReadMetadataFrame(file) 161 + if err != nil { 162 + return nil, fmt.Errorf("no metadata frame found: %w", err) 163 + } 164 + 165 + return meta, nil 166 + } 167 + 168 + // ============================================================================ 169 + // COMPRESSION/DECOMPRESSION 170 + // ============================================================================ 171 + 24 172 // CompressFrame compresses a single chunk of data into a zstd frame 25 - // with proper content size headers for multi-frame concatenation 26 173 func CompressFrame(data []byte) ([]byte, error) { 27 - // ✅ valyala/gozstd.Compress creates proper 
frames with content size 28 174 compressed := gozstd.Compress(nil, data) 29 175 return compressed, nil 30 176 } 31 177 32 178 // DecompressAll decompresses all frames in the compressed data 33 179 func DecompressAll(compressed []byte) ([]byte, error) { 34 - // ✅ valyala/gozstd.Decompress handles multi-frame 35 180 decompressed, err := gozstd.Decompress(nil, compressed) 36 181 if err != nil { 37 182 return nil, fmt.Errorf("decompression failed: %w", err) ··· 45 190 } 46 191 47 192 // NewStreamingReader creates a streaming decompressor 48 - // Returns a reader that must be released with Release() 49 193 func NewStreamingReader(r io.Reader) (StreamReader, error) { 50 194 reader := gozstd.NewReader(r) 51 195 return &gozstdReader{reader: reader}, nil 52 196 } 53 197 54 198 // NewStreamingWriter creates a streaming compressor at default level 55 - // Returns a writer that must be closed with Close() then released with Release() 56 199 func NewStreamingWriter(w io.Writer) (StreamWriter, error) { 57 200 writer := gozstd.NewWriterLevel(w, CompressionLevel) 58 201 return &gozstdWriter{writer: writer}, nil 59 202 } 60 203 61 204 // ============================================================================ 62 - // INTERFACES (for abstraction) 205 + // INTERFACES 63 206 // ============================================================================ 64 207 65 - // StreamReader is a streaming decompression reader 66 208 type StreamReader interface { 67 209 io.Reader 68 210 io.WriterTo 69 211 Release() 70 212 } 71 213 72 - // StreamWriter is a streaming compression writer 73 214 type StreamWriter interface { 74 215 io.Writer 75 216 io.Closer ··· 78 219 } 79 220 80 221 // ============================================================================ 81 - // WRAPPER TYPES (valyala/gozstd specific) 222 + // WRAPPER TYPES 82 223 // ============================================================================ 83 224 84 225 type gozstdReader struct {