// [DEPRECATED] Go implementation of plcbundle — storage test suite
// (imported from rust-test mirror; originally 878 lines, 24 kB)
1package storage_test 2 3import ( 4 "bufio" 5 "bytes" 6 "fmt" 7 "os" 8 "path/filepath" 9 "sync" 10 "testing" 11 "time" 12 13 "tangled.org/atscan.net/plcbundle/internal/plcclient" 14 "tangled.org/atscan.net/plcbundle/internal/storage" 15) 16 17type testLogger struct { 18 t *testing.T 19} 20 21func (l *testLogger) Printf(format string, v ...interface{}) { 22 l.t.Logf(format, v...) 23} 24 25func (l *testLogger) Println(v ...interface{}) { 26 l.t.Log(v...) 27} 28 29var ( 30 bundleInfo = &storage.BundleInfo{ 31 BundleNumber: 1, 32 Origin: "test-origin", 33 ParentHash: "", 34 Cursor: "", 35 CreatedBy: "test", 36 Hostname: "test-host", 37 } 38) 39 40// ==================================================================================== 41// COMPRESSION TESTS 42// ==================================================================================== 43 44func TestStorageCompression(t *testing.T) { 45 tmpDir := t.TempDir() 46 logger := &testLogger{t: t} 47 ops, err := storage.NewOperations(logger, false) 48 if err != nil { 49 t.Fatalf("NewOperations failed: %v", err) 50 } 51 defer ops.Close() 52 53 t.Run("RoundTripCompression", func(t *testing.T) { 54 tests := []struct { 55 name string 56 count int 57 }{ 58 {"Empty", 0}, 59 {"Single", 1}, 60 {"Small", 10}, 61 {"Medium", 100}, 62 {"Large", 1000}, 63 {"FullBundle", 10000}, 64 } 65 66 for _, tt := range tests { 67 t.Run(tt.name, func(t *testing.T) { 68 if tt.count == 0 { 69 return // Skip empty for now 70 } 71 72 original := makeTestOperations(tt.count) 73 path := filepath.Join(tmpDir, tt.name+".jsonl.zst") 74 75 // Save 76 _, _, _, _, err := ops.SaveBundle(path, original, bundleInfo) 77 if err != nil { 78 t.Fatalf("SaveBundle failed: %v", err) 79 } 80 81 // Load 82 loaded, err := ops.LoadBundle(path) 83 if err != nil { 84 t.Fatalf("LoadBundle failed: %v", err) 85 } 86 87 // Verify count 88 if len(loaded) != len(original) { 89 t.Errorf("count mismatch: got %d, want %d", len(loaded), len(original)) 90 } 91 92 // Verify each 
operation 93 for i := range original { 94 if loaded[i].DID != original[i].DID { 95 t.Errorf("op %d DID mismatch: got %s, want %s", i, loaded[i].DID, original[i].DID) 96 } 97 if loaded[i].CID != original[i].CID { 98 t.Errorf("op %d CID mismatch: got %s, want %s", i, loaded[i].CID, original[i].CID) 99 } 100 if !loaded[i].CreatedAt.Equal(original[i].CreatedAt) { 101 t.Errorf("op %d timestamp mismatch", i) 102 } 103 } 104 }) 105 } 106 }) 107 108 t.Run("CompressionRatio", func(t *testing.T) { 109 operations := makeTestOperations(10000) 110 path := filepath.Join(tmpDir, "compression_test.jsonl.zst") 111 112 _, _, uncompSize, compSize, err := ops.SaveBundle(path, operations, bundleInfo) 113 if err != nil { 114 t.Fatalf("SaveBundle failed: %v", err) 115 } 116 117 if compSize >= uncompSize { 118 t.Errorf("compression failed: compressed=%d >= uncompressed=%d", compSize, uncompSize) 119 } 120 121 ratio := float64(uncompSize) / float64(compSize) 122 if ratio < 2.0 { 123 t.Errorf("poor compression ratio: %.2fx (expected > 2.0x)", ratio) 124 } 125 126 t.Logf("Compression ratio: %.2fx (%d → %d bytes)", ratio, uncompSize, compSize) 127 }) 128 129 t.Run("CompressedDataIntegrity", func(t *testing.T) { 130 operations := makeTestOperations(100) 131 path := filepath.Join(tmpDir, "integrity_test.jsonl.zst") 132 133 contentHash, compHash, _, _, err := ops.SaveBundle(path, operations, bundleInfo) 134 if err != nil { 135 t.Fatalf("SaveBundle failed: %v", err) 136 } 137 138 // Recalculate hashes 139 calcCompHash, _, calcContentHash, _, err := ops.CalculateFileHashes(path) 140 if err != nil { 141 t.Fatalf("CalculateFileHashes failed: %v", err) 142 } 143 144 if calcCompHash != compHash { 145 t.Errorf("compressed hash mismatch: got %s, want %s", calcCompHash, compHash) 146 } 147 148 if calcContentHash != contentHash { 149 t.Errorf("content hash mismatch: got %s, want %s", calcContentHash, contentHash) 150 } 151 }) 152} 153 154// 
==================================================================================== 155// HASHING TESTS - CRITICAL FOR CHAIN INTEGRITY 156// ==================================================================================== 157 158func TestStorageHashing(t *testing.T) { 159 logger := &testLogger{t: t} 160 ops, err := storage.NewOperations(logger, false) 161 if err != nil { 162 t.Fatalf("NewOperations failed: %v", err) 163 } 164 defer ops.Close() 165 166 t.Run("HashDeterminism", func(t *testing.T) { 167 data := []byte("test data for hashing") 168 169 // Calculate hash multiple times 170 hashes := make([]string, 100) 171 for i := 0; i < 100; i++ { 172 hashes[i] = ops.Hash(data) 173 } 174 175 // All should be identical 176 firstHash := hashes[0] 177 for i, h := range hashes { 178 if h != firstHash { 179 t.Errorf("hash %d differs: got %s, want %s", i, h, firstHash) 180 } 181 } 182 183 // Verify it's actually a valid SHA256 hex (64 chars) 184 if len(firstHash) != 64 { 185 t.Errorf("invalid hash length: got %d, want 64", len(firstHash)) 186 } 187 }) 188 189 t.Run("ChainHashCalculation", func(t *testing.T) { 190 contentHash := "abc123def456" 191 192 // Genesis bundle (no parent) 193 genesisHash := ops.CalculateChainHash("", contentHash) 194 expectedGenesis := ops.Hash([]byte("plcbundle:genesis:" + contentHash)) 195 if genesisHash != expectedGenesis { 196 t.Errorf("genesis hash mismatch: got %s, want %s", genesisHash, expectedGenesis) 197 } 198 199 // Second bundle (has parent) 200 parentHash := genesisHash 201 childHash := ops.CalculateChainHash(parentHash, contentHash) 202 expectedChild := ops.Hash([]byte(parentHash + ":" + contentHash)) 203 if childHash != expectedChild { 204 t.Errorf("child hash mismatch: got %s, want %s", childHash, expectedChild) 205 } 206 207 // Chain continues 208 grandchildHash := ops.CalculateChainHash(childHash, contentHash) 209 expectedGrandchild := ops.Hash([]byte(childHash + ":" + contentHash)) 210 if grandchildHash != expectedGrandchild { 
211 t.Errorf("grandchild hash mismatch") 212 } 213 }) 214 215 t.Run("HashSensitivity", func(t *testing.T) { 216 // Small changes should produce completely different hashes 217 data1 := []byte("test data") 218 data2 := []byte("test datb") // Changed one char 219 data3 := []byte("test data ") // Added space 220 221 hash1 := ops.Hash(data1) 222 hash2 := ops.Hash(data2) 223 hash3 := ops.Hash(data3) 224 225 if hash1 == hash2 { 226 t.Error("different data produced same hash (collision!)") 227 } 228 if hash1 == hash3 { 229 t.Error("different data produced same hash (collision!)") 230 } 231 }) 232 233 t.Run("EmptyDataHash", func(t *testing.T) { 234 hash := ops.Hash([]byte{}) 235 if len(hash) != 64 { 236 t.Errorf("empty data hash invalid length: %d", len(hash)) 237 } 238 // SHA256 of empty string is known constant 239 // e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 240 expected := "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" 241 if hash != expected { 242 t.Errorf("empty data hash mismatch: got %s, want %s", hash, expected) 243 } 244 }) 245} 246 247// ==================================================================================== 248// CONCURRENCY TESTS - CRITICAL FOR PRODUCTION 249// ==================================================================================== 250 251func TestStorageConcurrency(t *testing.T) { 252 tmpDir := t.TempDir() 253 logger := &testLogger{t: t} 254 ops, err := storage.NewOperations(logger, false) 255 if err != nil { 256 t.Fatalf("NewOperations failed: %v", err) 257 } 258 defer ops.Close() 259 260 t.Run("ParallelBundleReads", func(t *testing.T) { 261 // Create test bundle 262 operations := makeTestOperations(10000) 263 path := filepath.Join(tmpDir, "parallel_test.jsonl.zst") 264 _, _, _, _, err := ops.SaveBundle(path, operations, bundleInfo) 265 if err != nil { 266 t.Fatalf("SaveBundle failed: %v", err) 267 } 268 269 // Read from 100 goroutines simultaneously 270 var wg sync.WaitGroup 271 errors := 
make(chan error, 100) 272 273 for i := 0; i < 100; i++ { 274 wg.Add(1) 275 go func(id int) { 276 defer wg.Done() 277 loaded, err := ops.LoadBundle(path) 278 if err != nil { 279 errors <- err 280 return 281 } 282 if len(loaded) != 10000 { 283 errors <- err 284 } 285 }(i) 286 } 287 288 wg.Wait() 289 close(errors) 290 291 for err := range errors { 292 t.Errorf("concurrent read error: %v", err) 293 } 294 }) 295 296 t.Run("LoadOperationAtPositionConcurrency", func(t *testing.T) { 297 // Critical test - this is heavily used by DID lookups 298 operations := makeTestOperations(10000) 299 path := filepath.Join(tmpDir, "position_test.jsonl.zst") 300 _, _, _, _, err := ops.SaveBundle(path, operations, bundleInfo) 301 if err != nil { 302 t.Fatalf("SaveBundle failed: %v", err) 303 } 304 305 // 200 concurrent position reads 306 var wg sync.WaitGroup 307 errors := make(chan error, 200) 308 309 for i := 0; i < 200; i++ { 310 wg.Add(1) 311 go func(position int) { 312 defer wg.Done() 313 op, err := ops.LoadOperationAtPosition(path, position%10000) 314 if err != nil { 315 errors <- err 316 return 317 } 318 if op == nil { 319 errors <- err 320 } 321 }(i) 322 } 323 324 wg.Wait() 325 close(errors) 326 327 for err := range errors { 328 t.Errorf("concurrent position read error: %v", err) 329 } 330 }) 331 332 t.Run("ConcurrentHashVerification", func(t *testing.T) { 333 operations := makeTestOperations(1000) 334 path := filepath.Join(tmpDir, "verify_test.jsonl.zst") 335 _, compHash, _, _, err := ops.SaveBundle(path, operations, bundleInfo) 336 if err != nil { 337 t.Fatalf("SaveBundle failed: %v", err) 338 } 339 340 var wg sync.WaitGroup 341 for i := 0; i < 50; i++ { 342 wg.Add(1) 343 go func() { 344 defer wg.Done() 345 valid, _, err := ops.VerifyHash(path, compHash) 346 if err != nil { 347 t.Errorf("VerifyHash failed: %v", err) 348 } 349 if !valid { 350 t.Error("hash verification failed") 351 } 352 }() 353 } 354 wg.Wait() 355 }) 356} 357 358// 
==================================================================================== 359// EDGE CASES & ERROR HANDLING 360// ==================================================================================== 361 362func TestStorageEdgeCases(t *testing.T) { 363 tmpDir := t.TempDir() 364 logger := &testLogger{t: t} 365 ops, err := storage.NewOperations(logger, false) 366 if err != nil { 367 t.Fatalf("NewOperations failed: %v", err) 368 } 369 defer ops.Close() 370 371 t.Run("CorruptedZstdFile", func(t *testing.T) { 372 path := filepath.Join(tmpDir, "corrupted.jsonl.zst") 373 // Write invalid zstd data 374 os.WriteFile(path, []byte("this is not valid zstd data"), 0644) 375 376 _, err := ops.LoadBundle(path) 377 if err == nil { 378 t.Error("expected error loading corrupted file, got nil") 379 } 380 }) 381 382 t.Run("TruncatedFile", func(t *testing.T) { 383 operations := makeTestOperations(100) 384 path := filepath.Join(tmpDir, "truncated.jsonl.zst") 385 ops.SaveBundle(path, operations, bundleInfo) 386 387 // Read and truncate 388 data, _ := os.ReadFile(path) 389 os.WriteFile(path, data[:len(data)/2], 0644) 390 391 _, err := ops.LoadBundle(path) 392 if err == nil { 393 t.Error("expected error loading truncated file, got nil") 394 } 395 }) 396 397 t.Run("InvalidJSONL", func(t *testing.T) { 398 path := filepath.Join(tmpDir, "invalid.jsonl.zst") 399 invalidData := []byte("{invalid json}\n{also invalid}") 400 401 // Manually compress invalid data 402 operations := makeTestOperations(10) 403 ops.SaveBundle(path, operations, bundleInfo) // Create valid file first 404 405 // Now corrupt it with invalid JSON 406 // This is hard to test properly since SaveBundle enforces valid data 407 // Better to test ParseJSONL directly 408 _, err := ops.ParseJSONL(invalidData) 409 if err == nil { 410 t.Error("expected error parsing invalid JSONL, got nil") 411 } 412 }) 413 414 t.Run("NonExistentFile", func(t *testing.T) { 415 _, err := ops.LoadBundle("/nonexistent/path/file.jsonl.zst") 416 
if err == nil { 417 t.Error("expected error loading nonexistent file, got nil") 418 } 419 }) 420 421 t.Run("InvalidPosition", func(t *testing.T) { 422 operations := makeTestOperations(100) 423 path := filepath.Join(tmpDir, "position_test.jsonl.zst") 424 ops.SaveBundle(path, operations, bundleInfo) 425 426 // Negative position 427 _, err := ops.LoadOperationAtPosition(path, -1) 428 if err == nil { 429 t.Error("expected error for negative position") 430 } 431 432 // Position beyond file 433 _, err = ops.LoadOperationAtPosition(path, 10000) 434 if err == nil { 435 t.Error("expected error for position beyond file") 436 } 437 }) 438} 439 440// ==================================================================================== 441// BOUNDARY CONDITIONS - CRITICAL FOR BUNDLE CHAINING 442// ==================================================================================== 443 444func TestStorageBoundaryConditions(t *testing.T) { 445 logger := &testLogger{t: t} 446 ops, err := storage.NewOperations(logger, false) 447 if err != nil { 448 t.Fatalf("NewOperations failed: %v", err) 449 } 450 defer ops.Close() 451 452 t.Run("GetBoundaryCIDs_SingleOperation", func(t *testing.T) { 453 baseTime := time.Now() 454 operations := []plcclient.PLCOperation{ 455 {CID: "cid1", CreatedAt: baseTime}, 456 } 457 458 boundaryTime, cids := ops.GetBoundaryCIDs(operations) 459 460 if !boundaryTime.Equal(baseTime) { 461 t.Error("boundary time mismatch") 462 } 463 if len(cids) != 1 { 464 t.Errorf("expected 1 boundary CID, got %d", len(cids)) 465 } 466 if !cids["cid1"] { 467 t.Error("expected cid1 in boundary set") 468 } 469 }) 470 471 t.Run("GetBoundaryCIDs_MultipleSameTimestamp", func(t *testing.T) { 472 // CRITICAL: Operations with identical timestamps (happens in real data) 473 baseTime := time.Now() 474 operations := []plcclient.PLCOperation{ 475 {CID: "cid1", CreatedAt: baseTime.Add(-2 * time.Second)}, 476 {CID: "cid2", CreatedAt: baseTime.Add(-1 * time.Second)}, 477 {CID: "cid3", CreatedAt: 
baseTime}, // Last timestamp 478 {CID: "cid4", CreatedAt: baseTime}, // Same as cid3 479 {CID: "cid5", CreatedAt: baseTime}, // Same as cid3 480 } 481 482 boundaryTime, cids := ops.GetBoundaryCIDs(operations) 483 484 if !boundaryTime.Equal(baseTime) { 485 t.Error("boundary time should be last operation time") 486 } 487 488 // Should return ALL CIDs with the last timestamp 489 if len(cids) != 3 { 490 t.Errorf("expected 3 boundary CIDs, got %d", len(cids)) 491 } 492 493 for _, expectedCID := range []string{"cid3", "cid4", "cid5"} { 494 if !cids[expectedCID] { 495 t.Errorf("expected %s in boundary set", expectedCID) 496 } 497 } 498 499 // Earlier CIDs should NOT be in set 500 if cids["cid1"] || cids["cid2"] { 501 t.Error("earlier CIDs should not be in boundary set") 502 } 503 }) 504 505 t.Run("GetBoundaryCIDs_AllSameTimestamp", func(t *testing.T) { 506 baseTime := time.Now() 507 operations := []plcclient.PLCOperation{ 508 {CID: "cid1", CreatedAt: baseTime}, 509 {CID: "cid2", CreatedAt: baseTime}, 510 {CID: "cid3", CreatedAt: baseTime}, 511 } 512 513 _, cids := ops.GetBoundaryCIDs(operations) 514 515 if len(cids) != 3 { 516 t.Errorf("expected all 3 CIDs, got %d", len(cids)) 517 } 518 }) 519 520 t.Run("GetBoundaryCIDs_EmptyOperations", func(t *testing.T) { 521 operations := []plcclient.PLCOperation{} 522 boundaryTime, cids := ops.GetBoundaryCIDs(operations) 523 524 if !boundaryTime.IsZero() { 525 t.Error("expected zero time for empty operations") 526 } 527 if len(cids) > 0 { 528 t.Error("expected nil or empty CID set") 529 } 530 }) 531 532 t.Run("StripBoundaryDuplicates_ActualDuplication", func(t *testing.T) { 533 // CRITICAL: This prevents duplicate operations across bundle boundaries 534 baseTime := time.Now() 535 boundaryTimestamp := baseTime.Format(time.RFC3339Nano) 536 537 prevBoundaryCIDs := map[string]bool{ 538 "cid3": true, 539 "cid4": true, 540 } 541 542 operations := []plcclient.PLCOperation{ 543 {CID: "cid3", CreatedAt: baseTime}, // Duplicate - should be 
stripped 544 {CID: "cid4", CreatedAt: baseTime}, // Duplicate - should be stripped 545 {CID: "cid5", CreatedAt: baseTime}, // New - should be kept 546 {CID: "cid6", CreatedAt: baseTime.Add(1 * time.Second)}, // After boundary - kept 547 } 548 549 result := ops.StripBoundaryDuplicates(operations, boundaryTimestamp, prevBoundaryCIDs) 550 551 if len(result) != 2 { 552 t.Errorf("expected 2 operations after stripping, got %d", len(result)) 553 } 554 555 if result[0].CID != "cid5" { 556 t.Errorf("expected cid5 first, got %s", result[0].CID) 557 } 558 if result[1].CID != "cid6" { 559 t.Errorf("expected cid6 second, got %s", result[1].CID) 560 } 561 }) 562 563 t.Run("StripBoundaryDuplicates_NoDuplicates", func(t *testing.T) { 564 baseTime := time.Now() 565 boundaryTimestamp := baseTime.Format(time.RFC3339Nano) 566 567 prevBoundaryCIDs := map[string]bool{ 568 "old_cid": true, 569 } 570 571 operations := []plcclient.PLCOperation{ 572 {CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)}, 573 {CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)}, 574 } 575 576 result := ops.StripBoundaryDuplicates(operations, boundaryTimestamp, prevBoundaryCIDs) 577 578 if len(result) != 2 { 579 t.Errorf("expected 2 operations, got %d", len(result)) 580 } 581 }) 582 583 t.Run("StripBoundaryDuplicates_EmptyPrevious", func(t *testing.T) { 584 baseTime := time.Now() 585 operations := makeTestOperations(10) 586 587 result := ops.StripBoundaryDuplicates(operations, baseTime.Format(time.RFC3339Nano), nil) 588 589 if len(result) != len(operations) { 590 t.Error("should not strip anything with no previous boundary CIDs") 591 } 592 }) 593} 594 595// ==================================================================================== 596// SERIALIZATION TESTS 597// ==================================================================================== 598 599func TestStorageSerialization(t *testing.T) { 600 logger := &testLogger{t: t} 601 ops, err := storage.NewOperations(logger, false) 602 if err != 
nil { 603 t.Fatalf("NewOperations failed: %v", err) 604 } 605 defer ops.Close() 606 607 t.Run("SerializeJSONL_PreservesRawJSON", func(t *testing.T) { 608 rawJSON := []byte(`{"did":"did:plc:test","cid":"bafytest","createdAt":"2024-01-01T00:00:00.000Z"}`) 609 op := plcclient.PLCOperation{ 610 DID: "did:plc:test", 611 CID: "bafytest", 612 CreatedAt: time.Now(), 613 RawJSON: rawJSON, 614 } 615 616 result := ops.SerializeJSONL([]plcclient.PLCOperation{op}) 617 618 // Should use RawJSON directly 619 if !containsBytes(result, rawJSON) { 620 t.Error("SerializeJSONL did not preserve RawJSON") 621 } 622 }) 623 624 t.Run("SerializeJSONL_MarshalsFallback", func(t *testing.T) { 625 op := plcclient.PLCOperation{ 626 DID: "did:plc:test", 627 CID: "bafytest", 628 CreatedAt: time.Now(), 629 // No RawJSON - should marshal 630 } 631 632 result := ops.SerializeJSONL([]plcclient.PLCOperation{op}) 633 634 if len(result) == 0 { 635 t.Error("SerializeJSONL returned empty result") 636 } 637 638 // Should contain the DID 639 if !containsBytes(result, []byte("did:plc:test")) { 640 t.Error("serialized data missing DID") 641 } 642 }) 643 644 t.Run("ParseJSONL_RoundTrip", func(t *testing.T) { 645 original := makeTestOperations(100) 646 data := ops.SerializeJSONL(original) 647 648 parsed, err := ops.ParseJSONL(data) 649 if err != nil { 650 t.Fatalf("ParseJSONL failed: %v", err) 651 } 652 653 if len(parsed) != len(original) { 654 t.Errorf("count mismatch: got %d, want %d", len(parsed), len(original)) 655 } 656 657 // Verify RawJSON is populated 658 for i, op := range parsed { 659 if len(op.RawJSON) == 0 { 660 t.Errorf("operation %d missing RawJSON", i) 661 } 662 } 663 }) 664} 665 666// ==================================================================================== 667// UTILITY FUNCTION TESTS 668// ==================================================================================== 669 670func TestStorageUtilities(t *testing.T) { 671 tmpDir := t.TempDir() 672 logger := &testLogger{t: t} 673 
ops, err := storage.NewOperations(logger, false) 674 if err != nil { 675 t.Fatalf("NewOperations failed: %v", err) 676 } 677 defer ops.Close() 678 679 t.Run("ExtractUniqueDIDs", func(t *testing.T) { 680 operations := []plcclient.PLCOperation{ 681 {DID: "did:plc:aaa"}, 682 {DID: "did:plc:bbb"}, 683 {DID: "did:plc:aaa"}, // Duplicate 684 {DID: "did:plc:ccc"}, 685 {DID: "did:plc:bbb"}, // Duplicate 686 {DID: "did:plc:aaa"}, // Duplicate 687 } 688 689 dids := ops.ExtractUniqueDIDs(operations) 690 691 if len(dids) != 3 { 692 t.Errorf("expected 3 unique DIDs, got %d", len(dids)) 693 } 694 695 // Verify all expected DIDs present 696 didSet := make(map[string]bool) 697 for _, did := range dids { 698 didSet[did] = true 699 } 700 701 for _, expectedDID := range []string{"did:plc:aaa", "did:plc:bbb", "did:plc:ccc"} { 702 if !didSet[expectedDID] { 703 t.Errorf("missing expected DID: %s", expectedDID) 704 } 705 } 706 }) 707 708 t.Run("ExtractUniqueDIDs_Empty", func(t *testing.T) { 709 dids := ops.ExtractUniqueDIDs([]plcclient.PLCOperation{}) 710 if len(dids) != 0 { 711 t.Error("expected empty result for empty input") 712 } 713 }) 714 715 t.Run("FileExists", func(t *testing.T) { 716 existingFile := filepath.Join(tmpDir, "exists.txt") 717 os.WriteFile(existingFile, []byte("test"), 0644) 718 719 if !ops.FileExists(existingFile) { 720 t.Error("FileExists returned false for existing file") 721 } 722 723 if ops.FileExists(filepath.Join(tmpDir, "nonexistent.txt")) { 724 t.Error("FileExists returned true for nonexistent file") 725 } 726 }) 727 728 t.Run("GetFileSize", func(t *testing.T) { 729 testFile := filepath.Join(tmpDir, "size_test.txt") 730 testData := []byte("exactly 12 b") 731 os.WriteFile(testFile, testData, 0644) 732 733 size, err := ops.GetFileSize(testFile) 734 if err != nil { 735 t.Fatalf("GetFileSize failed: %v", err) 736 } 737 738 if size != int64(len(testData)) { 739 t.Errorf("size mismatch: got %d, want %d", size, len(testData)) 740 } 741 }) 742} 743 744// 
==================================================================================== 745// STREAMING TESTS 746// ==================================================================================== 747 748func TestStorageStreaming(t *testing.T) { 749 tmpDir := t.TempDir() 750 logger := &testLogger{t: t} 751 ops, err := storage.NewOperations(logger, false) 752 if err != nil { 753 t.Fatalf("NewOperations failed: %v", err) 754 } 755 defer ops.Close() 756 757 t.Run("StreamRaw", func(t *testing.T) { 758 operations := makeTestOperations(100) 759 path := filepath.Join(tmpDir, "stream_raw.jsonl.zst") 760 _, _, _, _, err := ops.SaveBundle(path, operations, bundleInfo) 761 if err != nil { 762 t.Fatalf("SaveBundle failed: %v", err) 763 } 764 765 reader, err := ops.StreamRaw(path) 766 if err != nil { 767 t.Fatalf("StreamRaw failed: %v", err) 768 } 769 defer reader.Close() 770 771 // Read all data 772 data := make([]byte, 1024*1024) 773 n, err := reader.Read(data) 774 if err != nil && err.Error() != "EOF" { 775 t.Fatalf("Read failed: %v", err) 776 } 777 778 if n == 0 { 779 t.Error("StreamRaw returned no data") 780 } 781 }) 782 783 t.Run("StreamDecompressed", func(t *testing.T) { 784 operations := makeTestOperations(100) 785 path := filepath.Join(tmpDir, "stream_decomp.jsonl.zst") 786 ops.SaveBundle(path, operations, bundleInfo) 787 788 reader, err := ops.StreamDecompressed(path) 789 if err != nil { 790 t.Fatalf("StreamDecompressed failed: %v", err) 791 } 792 defer reader.Close() 793 794 // Count JSONL lines 795 scanner := bufio.NewScanner(reader) 796 lineCount := 0 797 for scanner.Scan() { 798 lineCount++ 799 } 800 801 if lineCount != 100 { 802 t.Errorf("expected 100 lines, got %d", lineCount) 803 } 804 }) 805} 806 807// ==================================================================================== 808// PERFORMANCE / BENCHMARK TESTS 809// ==================================================================================== 810 811func BenchmarkStorageOperations(b 
*testing.B) { 812 tmpDir := b.TempDir() 813 logger := &testLogger{t: &testing.T{}} 814 ops, _ := storage.NewOperations(logger, false) 815 defer ops.Close() 816 817 operations := makeTestOperations(10000) 818 819 b.Run("SaveBundle", func(b *testing.B) { 820 for i := 0; i < b.N; i++ { 821 path := filepath.Join(tmpDir, fmt.Sprintf("bench_%d.jsonl.zst", i)) 822 ops.SaveBundle(path, operations, bundleInfo) 823 } 824 }) 825 826 // Create bundle for read benchmarks 827 testPath := filepath.Join(tmpDir, "bench_read.jsonl.zst") 828 ops.SaveBundle(testPath, operations, nil) 829 830 b.Run("LoadBundle", func(b *testing.B) { 831 for i := 0; i < b.N; i++ { 832 ops.LoadBundle(testPath) 833 } 834 }) 835 836 b.Run("LoadOperationAtPosition", func(b *testing.B) { 837 for i := 0; i < b.N; i++ { 838 ops.LoadOperationAtPosition(testPath, i%10000) 839 } 840 }) 841 842 b.Run("Hash", func(b *testing.B) { 843 data := ops.SerializeJSONL(operations) 844 b.ResetTimer() 845 for i := 0; i < b.N; i++ { 846 ops.Hash(data) 847 } 848 }) 849 850 b.Run("SerializeJSONL", func(b *testing.B) { 851 for i := 0; i < b.N; i++ { 852 ops.SerializeJSONL(operations) 853 } 854 }) 855} 856 857// ==================================================================================== 858// HELPER FUNCTIONS 859// ==================================================================================== 860 861func makeTestOperations(count int) []plcclient.PLCOperation { 862 ops := make([]plcclient.PLCOperation, count) 863 baseTime := time.Now().Add(-time.Hour) 864 865 for i := 0; i < count; i++ { 866 ops[i] = plcclient.PLCOperation{ 867 DID: fmt.Sprintf("did:plc:test%06d", i), 868 CID: fmt.Sprintf("bafy%06d", i), 869 CreatedAt: baseTime.Add(time.Duration(i) * time.Second), 870 } 871 } 872 873 return ops 874} 875 876func containsBytes(haystack, needle []byte) bool { 877 return bytes.Contains(haystack, needle) 878}