// [DEPRECATED] Go implementation of plcbundle
1package storage_test
2
import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"tangled.org/atscan.net/plcbundle/internal/plcclient"
	"tangled.org/atscan.net/plcbundle/internal/storage"
)
16
// testLogger adapts *testing.T to the Printf/Println logger interface
// expected by storage.NewOperations, so storage-layer output lands in
// the test log.
type testLogger struct {
	t *testing.T
}

// Printf forwards formatted output to the test log.
func (tl *testLogger) Printf(format string, v ...interface{}) {
	tl.t.Logf(format, v...)
}

// Println forwards plain output to the test log.
func (tl *testLogger) Println(v ...interface{}) {
	tl.t.Log(v...)
}
28
var (
	// bundleInfo is the shared metadata passed to every SaveBundle call in
	// these tests. BundleNumber 1 with an empty ParentHash marks it as a
	// genesis bundle; the remaining fields are placeholder provenance data.
	bundleInfo = &storage.BundleInfo{
		BundleNumber: 1,
		Origin: "test-origin",
		ParentHash: "",
		Cursor: "",
		CreatedBy: "test",
		Hostname: "test-host",
	}
)
39
40// ====================================================================================
41// COMPRESSION TESTS
42// ====================================================================================
43
44func TestStorageCompression(t *testing.T) {
45 tmpDir := t.TempDir()
46 logger := &testLogger{t: t}
47 ops, err := storage.NewOperations(logger, false)
48 if err != nil {
49 t.Fatalf("NewOperations failed: %v", err)
50 }
51 defer ops.Close()
52
53 t.Run("RoundTripCompression", func(t *testing.T) {
54 tests := []struct {
55 name string
56 count int
57 }{
58 {"Empty", 0},
59 {"Single", 1},
60 {"Small", 10},
61 {"Medium", 100},
62 {"Large", 1000},
63 {"FullBundle", 10000},
64 }
65
66 for _, tt := range tests {
67 t.Run(tt.name, func(t *testing.T) {
68 if tt.count == 0 {
69 return // Skip empty for now
70 }
71
72 original := makeTestOperations(tt.count)
73 path := filepath.Join(tmpDir, tt.name+".jsonl.zst")
74
75 // Save
76 _, _, _, _, err := ops.SaveBundle(path, original, bundleInfo)
77 if err != nil {
78 t.Fatalf("SaveBundle failed: %v", err)
79 }
80
81 // Load
82 loaded, err := ops.LoadBundle(path)
83 if err != nil {
84 t.Fatalf("LoadBundle failed: %v", err)
85 }
86
87 // Verify count
88 if len(loaded) != len(original) {
89 t.Errorf("count mismatch: got %d, want %d", len(loaded), len(original))
90 }
91
92 // Verify each operation
93 for i := range original {
94 if loaded[i].DID != original[i].DID {
95 t.Errorf("op %d DID mismatch: got %s, want %s", i, loaded[i].DID, original[i].DID)
96 }
97 if loaded[i].CID != original[i].CID {
98 t.Errorf("op %d CID mismatch: got %s, want %s", i, loaded[i].CID, original[i].CID)
99 }
100 if !loaded[i].CreatedAt.Equal(original[i].CreatedAt) {
101 t.Errorf("op %d timestamp mismatch", i)
102 }
103 }
104 })
105 }
106 })
107
108 t.Run("CompressionRatio", func(t *testing.T) {
109 operations := makeTestOperations(10000)
110 path := filepath.Join(tmpDir, "compression_test.jsonl.zst")
111
112 _, _, uncompSize, compSize, err := ops.SaveBundle(path, operations, bundleInfo)
113 if err != nil {
114 t.Fatalf("SaveBundle failed: %v", err)
115 }
116
117 if compSize >= uncompSize {
118 t.Errorf("compression failed: compressed=%d >= uncompressed=%d", compSize, uncompSize)
119 }
120
121 ratio := float64(uncompSize) / float64(compSize)
122 if ratio < 2.0 {
123 t.Errorf("poor compression ratio: %.2fx (expected > 2.0x)", ratio)
124 }
125
126 t.Logf("Compression ratio: %.2fx (%d → %d bytes)", ratio, uncompSize, compSize)
127 })
128
129 t.Run("CompressedDataIntegrity", func(t *testing.T) {
130 operations := makeTestOperations(100)
131 path := filepath.Join(tmpDir, "integrity_test.jsonl.zst")
132
133 contentHash, compHash, _, _, err := ops.SaveBundle(path, operations, bundleInfo)
134 if err != nil {
135 t.Fatalf("SaveBundle failed: %v", err)
136 }
137
138 // Recalculate hashes
139 calcCompHash, _, calcContentHash, _, err := ops.CalculateFileHashes(path)
140 if err != nil {
141 t.Fatalf("CalculateFileHashes failed: %v", err)
142 }
143
144 if calcCompHash != compHash {
145 t.Errorf("compressed hash mismatch: got %s, want %s", calcCompHash, compHash)
146 }
147
148 if calcContentHash != contentHash {
149 t.Errorf("content hash mismatch: got %s, want %s", calcContentHash, contentHash)
150 }
151 })
152}
153
154// ====================================================================================
155// HASHING TESTS - CRITICAL FOR CHAIN INTEGRITY
156// ====================================================================================
157
158func TestStorageHashing(t *testing.T) {
159 logger := &testLogger{t: t}
160 ops, err := storage.NewOperations(logger, false)
161 if err != nil {
162 t.Fatalf("NewOperations failed: %v", err)
163 }
164 defer ops.Close()
165
166 t.Run("HashDeterminism", func(t *testing.T) {
167 data := []byte("test data for hashing")
168
169 // Calculate hash multiple times
170 hashes := make([]string, 100)
171 for i := 0; i < 100; i++ {
172 hashes[i] = ops.Hash(data)
173 }
174
175 // All should be identical
176 firstHash := hashes[0]
177 for i, h := range hashes {
178 if h != firstHash {
179 t.Errorf("hash %d differs: got %s, want %s", i, h, firstHash)
180 }
181 }
182
183 // Verify it's actually a valid SHA256 hex (64 chars)
184 if len(firstHash) != 64 {
185 t.Errorf("invalid hash length: got %d, want 64", len(firstHash))
186 }
187 })
188
189 t.Run("ChainHashCalculation", func(t *testing.T) {
190 contentHash := "abc123def456"
191
192 // Genesis bundle (no parent)
193 genesisHash := ops.CalculateChainHash("", contentHash)
194 expectedGenesis := ops.Hash([]byte("plcbundle:genesis:" + contentHash))
195 if genesisHash != expectedGenesis {
196 t.Errorf("genesis hash mismatch: got %s, want %s", genesisHash, expectedGenesis)
197 }
198
199 // Second bundle (has parent)
200 parentHash := genesisHash
201 childHash := ops.CalculateChainHash(parentHash, contentHash)
202 expectedChild := ops.Hash([]byte(parentHash + ":" + contentHash))
203 if childHash != expectedChild {
204 t.Errorf("child hash mismatch: got %s, want %s", childHash, expectedChild)
205 }
206
207 // Chain continues
208 grandchildHash := ops.CalculateChainHash(childHash, contentHash)
209 expectedGrandchild := ops.Hash([]byte(childHash + ":" + contentHash))
210 if grandchildHash != expectedGrandchild {
211 t.Errorf("grandchild hash mismatch")
212 }
213 })
214
215 t.Run("HashSensitivity", func(t *testing.T) {
216 // Small changes should produce completely different hashes
217 data1 := []byte("test data")
218 data2 := []byte("test datb") // Changed one char
219 data3 := []byte("test data ") // Added space
220
221 hash1 := ops.Hash(data1)
222 hash2 := ops.Hash(data2)
223 hash3 := ops.Hash(data3)
224
225 if hash1 == hash2 {
226 t.Error("different data produced same hash (collision!)")
227 }
228 if hash1 == hash3 {
229 t.Error("different data produced same hash (collision!)")
230 }
231 })
232
233 t.Run("EmptyDataHash", func(t *testing.T) {
234 hash := ops.Hash([]byte{})
235 if len(hash) != 64 {
236 t.Errorf("empty data hash invalid length: %d", len(hash))
237 }
238 // SHA256 of empty string is known constant
239 // e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
240 expected := "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
241 if hash != expected {
242 t.Errorf("empty data hash mismatch: got %s, want %s", hash, expected)
243 }
244 })
245}
246
247// ====================================================================================
248// CONCURRENCY TESTS - CRITICAL FOR PRODUCTION
249// ====================================================================================
250
251func TestStorageConcurrency(t *testing.T) {
252 tmpDir := t.TempDir()
253 logger := &testLogger{t: t}
254 ops, err := storage.NewOperations(logger, false)
255 if err != nil {
256 t.Fatalf("NewOperations failed: %v", err)
257 }
258 defer ops.Close()
259
260 t.Run("ParallelBundleReads", func(t *testing.T) {
261 // Create test bundle
262 operations := makeTestOperations(10000)
263 path := filepath.Join(tmpDir, "parallel_test.jsonl.zst")
264 _, _, _, _, err := ops.SaveBundle(path, operations, bundleInfo)
265 if err != nil {
266 t.Fatalf("SaveBundle failed: %v", err)
267 }
268
269 // Read from 100 goroutines simultaneously
270 var wg sync.WaitGroup
271 errors := make(chan error, 100)
272
273 for i := 0; i < 100; i++ {
274 wg.Add(1)
275 go func(id int) {
276 defer wg.Done()
277 loaded, err := ops.LoadBundle(path)
278 if err != nil {
279 errors <- err
280 return
281 }
282 if len(loaded) != 10000 {
283 errors <- err
284 }
285 }(i)
286 }
287
288 wg.Wait()
289 close(errors)
290
291 for err := range errors {
292 t.Errorf("concurrent read error: %v", err)
293 }
294 })
295
296 t.Run("LoadOperationAtPositionConcurrency", func(t *testing.T) {
297 // Critical test - this is heavily used by DID lookups
298 operations := makeTestOperations(10000)
299 path := filepath.Join(tmpDir, "position_test.jsonl.zst")
300 _, _, _, _, err := ops.SaveBundle(path, operations, bundleInfo)
301 if err != nil {
302 t.Fatalf("SaveBundle failed: %v", err)
303 }
304
305 // 200 concurrent position reads
306 var wg sync.WaitGroup
307 errors := make(chan error, 200)
308
309 for i := 0; i < 200; i++ {
310 wg.Add(1)
311 go func(position int) {
312 defer wg.Done()
313 op, err := ops.LoadOperationAtPosition(path, position%10000)
314 if err != nil {
315 errors <- err
316 return
317 }
318 if op == nil {
319 errors <- err
320 }
321 }(i)
322 }
323
324 wg.Wait()
325 close(errors)
326
327 for err := range errors {
328 t.Errorf("concurrent position read error: %v", err)
329 }
330 })
331
332 t.Run("ConcurrentHashVerification", func(t *testing.T) {
333 operations := makeTestOperations(1000)
334 path := filepath.Join(tmpDir, "verify_test.jsonl.zst")
335 _, compHash, _, _, err := ops.SaveBundle(path, operations, bundleInfo)
336 if err != nil {
337 t.Fatalf("SaveBundle failed: %v", err)
338 }
339
340 var wg sync.WaitGroup
341 for i := 0; i < 50; i++ {
342 wg.Add(1)
343 go func() {
344 defer wg.Done()
345 valid, _, err := ops.VerifyHash(path, compHash)
346 if err != nil {
347 t.Errorf("VerifyHash failed: %v", err)
348 }
349 if !valid {
350 t.Error("hash verification failed")
351 }
352 }()
353 }
354 wg.Wait()
355 })
356}
357
358// ====================================================================================
359// EDGE CASES & ERROR HANDLING
360// ====================================================================================
361
362func TestStorageEdgeCases(t *testing.T) {
363 tmpDir := t.TempDir()
364 logger := &testLogger{t: t}
365 ops, err := storage.NewOperations(logger, false)
366 if err != nil {
367 t.Fatalf("NewOperations failed: %v", err)
368 }
369 defer ops.Close()
370
371 t.Run("CorruptedZstdFile", func(t *testing.T) {
372 path := filepath.Join(tmpDir, "corrupted.jsonl.zst")
373 // Write invalid zstd data
374 os.WriteFile(path, []byte("this is not valid zstd data"), 0644)
375
376 _, err := ops.LoadBundle(path)
377 if err == nil {
378 t.Error("expected error loading corrupted file, got nil")
379 }
380 })
381
382 t.Run("TruncatedFile", func(t *testing.T) {
383 operations := makeTestOperations(100)
384 path := filepath.Join(tmpDir, "truncated.jsonl.zst")
385 ops.SaveBundle(path, operations, bundleInfo)
386
387 // Read and truncate
388 data, _ := os.ReadFile(path)
389 os.WriteFile(path, data[:len(data)/2], 0644)
390
391 _, err := ops.LoadBundle(path)
392 if err == nil {
393 t.Error("expected error loading truncated file, got nil")
394 }
395 })
396
397 t.Run("InvalidJSONL", func(t *testing.T) {
398 path := filepath.Join(tmpDir, "invalid.jsonl.zst")
399 invalidData := []byte("{invalid json}\n{also invalid}")
400
401 // Manually compress invalid data
402 operations := makeTestOperations(10)
403 ops.SaveBundle(path, operations, bundleInfo) // Create valid file first
404
405 // Now corrupt it with invalid JSON
406 // This is hard to test properly since SaveBundle enforces valid data
407 // Better to test ParseJSONL directly
408 _, err := ops.ParseJSONL(invalidData)
409 if err == nil {
410 t.Error("expected error parsing invalid JSONL, got nil")
411 }
412 })
413
414 t.Run("NonExistentFile", func(t *testing.T) {
415 _, err := ops.LoadBundle("/nonexistent/path/file.jsonl.zst")
416 if err == nil {
417 t.Error("expected error loading nonexistent file, got nil")
418 }
419 })
420
421 t.Run("InvalidPosition", func(t *testing.T) {
422 operations := makeTestOperations(100)
423 path := filepath.Join(tmpDir, "position_test.jsonl.zst")
424 ops.SaveBundle(path, operations, bundleInfo)
425
426 // Negative position
427 _, err := ops.LoadOperationAtPosition(path, -1)
428 if err == nil {
429 t.Error("expected error for negative position")
430 }
431
432 // Position beyond file
433 _, err = ops.LoadOperationAtPosition(path, 10000)
434 if err == nil {
435 t.Error("expected error for position beyond file")
436 }
437 })
438}
439
440// ====================================================================================
441// BOUNDARY CONDITIONS - CRITICAL FOR BUNDLE CHAINING
442// ====================================================================================
443
// TestStorageBoundaryConditions exercises the boundary-CID machinery used
// when chaining bundles. GetBoundaryCIDs is expected to return the last
// timestamp in the batch plus the set of ALL CIDs sharing that timestamp;
// StripBoundaryDuplicates is expected to drop leading operations whose
// CIDs were already captured in the previous bundle's boundary set.
// NOTE(review): expectations here are asserted against observed project
// behavior; the exact contract lives in the storage package.
func TestStorageBoundaryConditions(t *testing.T) {
	logger := &testLogger{t: t}
	ops, err := storage.NewOperations(logger, false)
	if err != nil {
		t.Fatalf("NewOperations failed: %v", err)
	}
	defer ops.Close()

	t.Run("GetBoundaryCIDs_SingleOperation", func(t *testing.T) {
		baseTime := time.Now()
		operations := []plcclient.PLCOperation{
			{CID: "cid1", CreatedAt: baseTime},
		}

		boundaryTime, cids := ops.GetBoundaryCIDs(operations)

		if !boundaryTime.Equal(baseTime) {
			t.Error("boundary time mismatch")
		}
		if len(cids) != 1 {
			t.Errorf("expected 1 boundary CID, got %d", len(cids))
		}
		if !cids["cid1"] {
			t.Error("expected cid1 in boundary set")
		}
	})

	t.Run("GetBoundaryCIDs_MultipleSameTimestamp", func(t *testing.T) {
		// CRITICAL: Operations with identical timestamps (happens in real data)
		baseTime := time.Now()
		operations := []plcclient.PLCOperation{
			{CID: "cid1", CreatedAt: baseTime.Add(-2 * time.Second)},
			{CID: "cid2", CreatedAt: baseTime.Add(-1 * time.Second)},
			{CID: "cid3", CreatedAt: baseTime}, // Last timestamp
			{CID: "cid4", CreatedAt: baseTime}, // Same as cid3
			{CID: "cid5", CreatedAt: baseTime}, // Same as cid3
		}

		boundaryTime, cids := ops.GetBoundaryCIDs(operations)

		if !boundaryTime.Equal(baseTime) {
			t.Error("boundary time should be last operation time")
		}

		// Should return ALL CIDs with the last timestamp
		if len(cids) != 3 {
			t.Errorf("expected 3 boundary CIDs, got %d", len(cids))
		}

		for _, expectedCID := range []string{"cid3", "cid4", "cid5"} {
			if !cids[expectedCID] {
				t.Errorf("expected %s in boundary set", expectedCID)
			}
		}

		// Earlier CIDs should NOT be in set
		if cids["cid1"] || cids["cid2"] {
			t.Error("earlier CIDs should not be in boundary set")
		}
	})

	t.Run("GetBoundaryCIDs_AllSameTimestamp", func(t *testing.T) {
		// Degenerate case: every operation shares one timestamp, so the
		// entire batch forms the boundary set.
		baseTime := time.Now()
		operations := []plcclient.PLCOperation{
			{CID: "cid1", CreatedAt: baseTime},
			{CID: "cid2", CreatedAt: baseTime},
			{CID: "cid3", CreatedAt: baseTime},
		}

		_, cids := ops.GetBoundaryCIDs(operations)

		if len(cids) != 3 {
			t.Errorf("expected all 3 CIDs, got %d", len(cids))
		}
	})

	t.Run("GetBoundaryCIDs_EmptyOperations", func(t *testing.T) {
		operations := []plcclient.PLCOperation{}
		boundaryTime, cids := ops.GetBoundaryCIDs(operations)

		if !boundaryTime.IsZero() {
			t.Error("expected zero time for empty operations")
		}
		if len(cids) > 0 {
			t.Error("expected nil or empty CID set")
		}
	})

	t.Run("StripBoundaryDuplicates_ActualDuplication", func(t *testing.T) {
		// CRITICAL: This prevents duplicate operations across bundle boundaries
		baseTime := time.Now()
		boundaryTimestamp := baseTime.Format(time.RFC3339Nano)

		prevBoundaryCIDs := map[string]bool{
			"cid3": true,
			"cid4": true,
		}

		operations := []plcclient.PLCOperation{
			{CID: "cid3", CreatedAt: baseTime}, // Duplicate - should be stripped
			{CID: "cid4", CreatedAt: baseTime}, // Duplicate - should be stripped
			{CID: "cid5", CreatedAt: baseTime}, // New - should be kept
			{CID: "cid6", CreatedAt: baseTime.Add(1 * time.Second)}, // After boundary - kept
		}

		result := ops.StripBoundaryDuplicates(operations, boundaryTimestamp, prevBoundaryCIDs)

		if len(result) != 2 {
			t.Errorf("expected 2 operations after stripping, got %d", len(result))
		}

		// Relative order of the survivors must be preserved.
		if result[0].CID != "cid5" {
			t.Errorf("expected cid5 first, got %s", result[0].CID)
		}
		if result[1].CID != "cid6" {
			t.Errorf("expected cid6 second, got %s", result[1].CID)
		}
	})

	t.Run("StripBoundaryDuplicates_NoDuplicates", func(t *testing.T) {
		// Operations strictly after the boundary timestamp are untouched
		// even when a previous boundary set exists.
		baseTime := time.Now()
		boundaryTimestamp := baseTime.Format(time.RFC3339Nano)

		prevBoundaryCIDs := map[string]bool{
			"old_cid": true,
		}

		operations := []plcclient.PLCOperation{
			{CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)},
			{CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)},
		}

		result := ops.StripBoundaryDuplicates(operations, boundaryTimestamp, prevBoundaryCIDs)

		if len(result) != 2 {
			t.Errorf("expected 2 operations, got %d", len(result))
		}
	})

	t.Run("StripBoundaryDuplicates_EmptyPrevious", func(t *testing.T) {
		// A nil previous-boundary set means nothing can be a duplicate.
		baseTime := time.Now()
		operations := makeTestOperations(10)

		result := ops.StripBoundaryDuplicates(operations, baseTime.Format(time.RFC3339Nano), nil)

		if len(result) != len(operations) {
			t.Error("should not strip anything with no previous boundary CIDs")
		}
	})
}
594
595// ====================================================================================
596// SERIALIZATION TESTS
597// ====================================================================================
598
599func TestStorageSerialization(t *testing.T) {
600 logger := &testLogger{t: t}
601 ops, err := storage.NewOperations(logger, false)
602 if err != nil {
603 t.Fatalf("NewOperations failed: %v", err)
604 }
605 defer ops.Close()
606
607 t.Run("SerializeJSONL_PreservesRawJSON", func(t *testing.T) {
608 rawJSON := []byte(`{"did":"did:plc:test","cid":"bafytest","createdAt":"2024-01-01T00:00:00.000Z"}`)
609 op := plcclient.PLCOperation{
610 DID: "did:plc:test",
611 CID: "bafytest",
612 CreatedAt: time.Now(),
613 RawJSON: rawJSON,
614 }
615
616 result := ops.SerializeJSONL([]plcclient.PLCOperation{op})
617
618 // Should use RawJSON directly
619 if !containsBytes(result, rawJSON) {
620 t.Error("SerializeJSONL did not preserve RawJSON")
621 }
622 })
623
624 t.Run("SerializeJSONL_MarshalsFallback", func(t *testing.T) {
625 op := plcclient.PLCOperation{
626 DID: "did:plc:test",
627 CID: "bafytest",
628 CreatedAt: time.Now(),
629 // No RawJSON - should marshal
630 }
631
632 result := ops.SerializeJSONL([]plcclient.PLCOperation{op})
633
634 if len(result) == 0 {
635 t.Error("SerializeJSONL returned empty result")
636 }
637
638 // Should contain the DID
639 if !containsBytes(result, []byte("did:plc:test")) {
640 t.Error("serialized data missing DID")
641 }
642 })
643
644 t.Run("ParseJSONL_RoundTrip", func(t *testing.T) {
645 original := makeTestOperations(100)
646 data := ops.SerializeJSONL(original)
647
648 parsed, err := ops.ParseJSONL(data)
649 if err != nil {
650 t.Fatalf("ParseJSONL failed: %v", err)
651 }
652
653 if len(parsed) != len(original) {
654 t.Errorf("count mismatch: got %d, want %d", len(parsed), len(original))
655 }
656
657 // Verify RawJSON is populated
658 for i, op := range parsed {
659 if len(op.RawJSON) == 0 {
660 t.Errorf("operation %d missing RawJSON", i)
661 }
662 }
663 })
664}
665
666// ====================================================================================
667// UTILITY FUNCTION TESTS
668// ====================================================================================
669
670func TestStorageUtilities(t *testing.T) {
671 tmpDir := t.TempDir()
672 logger := &testLogger{t: t}
673 ops, err := storage.NewOperations(logger, false)
674 if err != nil {
675 t.Fatalf("NewOperations failed: %v", err)
676 }
677 defer ops.Close()
678
679 t.Run("ExtractUniqueDIDs", func(t *testing.T) {
680 operations := []plcclient.PLCOperation{
681 {DID: "did:plc:aaa"},
682 {DID: "did:plc:bbb"},
683 {DID: "did:plc:aaa"}, // Duplicate
684 {DID: "did:plc:ccc"},
685 {DID: "did:plc:bbb"}, // Duplicate
686 {DID: "did:plc:aaa"}, // Duplicate
687 }
688
689 dids := ops.ExtractUniqueDIDs(operations)
690
691 if len(dids) != 3 {
692 t.Errorf("expected 3 unique DIDs, got %d", len(dids))
693 }
694
695 // Verify all expected DIDs present
696 didSet := make(map[string]bool)
697 for _, did := range dids {
698 didSet[did] = true
699 }
700
701 for _, expectedDID := range []string{"did:plc:aaa", "did:plc:bbb", "did:plc:ccc"} {
702 if !didSet[expectedDID] {
703 t.Errorf("missing expected DID: %s", expectedDID)
704 }
705 }
706 })
707
708 t.Run("ExtractUniqueDIDs_Empty", func(t *testing.T) {
709 dids := ops.ExtractUniqueDIDs([]plcclient.PLCOperation{})
710 if len(dids) != 0 {
711 t.Error("expected empty result for empty input")
712 }
713 })
714
715 t.Run("FileExists", func(t *testing.T) {
716 existingFile := filepath.Join(tmpDir, "exists.txt")
717 os.WriteFile(existingFile, []byte("test"), 0644)
718
719 if !ops.FileExists(existingFile) {
720 t.Error("FileExists returned false for existing file")
721 }
722
723 if ops.FileExists(filepath.Join(tmpDir, "nonexistent.txt")) {
724 t.Error("FileExists returned true for nonexistent file")
725 }
726 })
727
728 t.Run("GetFileSize", func(t *testing.T) {
729 testFile := filepath.Join(tmpDir, "size_test.txt")
730 testData := []byte("exactly 12 b")
731 os.WriteFile(testFile, testData, 0644)
732
733 size, err := ops.GetFileSize(testFile)
734 if err != nil {
735 t.Fatalf("GetFileSize failed: %v", err)
736 }
737
738 if size != int64(len(testData)) {
739 t.Errorf("size mismatch: got %d, want %d", size, len(testData))
740 }
741 })
742}
743
744// ====================================================================================
745// STREAMING TESTS
746// ====================================================================================
747
748func TestStorageStreaming(t *testing.T) {
749 tmpDir := t.TempDir()
750 logger := &testLogger{t: t}
751 ops, err := storage.NewOperations(logger, false)
752 if err != nil {
753 t.Fatalf("NewOperations failed: %v", err)
754 }
755 defer ops.Close()
756
757 t.Run("StreamRaw", func(t *testing.T) {
758 operations := makeTestOperations(100)
759 path := filepath.Join(tmpDir, "stream_raw.jsonl.zst")
760 _, _, _, _, err := ops.SaveBundle(path, operations, bundleInfo)
761 if err != nil {
762 t.Fatalf("SaveBundle failed: %v", err)
763 }
764
765 reader, err := ops.StreamRaw(path)
766 if err != nil {
767 t.Fatalf("StreamRaw failed: %v", err)
768 }
769 defer reader.Close()
770
771 // Read all data
772 data := make([]byte, 1024*1024)
773 n, err := reader.Read(data)
774 if err != nil && err.Error() != "EOF" {
775 t.Fatalf("Read failed: %v", err)
776 }
777
778 if n == 0 {
779 t.Error("StreamRaw returned no data")
780 }
781 })
782
783 t.Run("StreamDecompressed", func(t *testing.T) {
784 operations := makeTestOperations(100)
785 path := filepath.Join(tmpDir, "stream_decomp.jsonl.zst")
786 ops.SaveBundle(path, operations, bundleInfo)
787
788 reader, err := ops.StreamDecompressed(path)
789 if err != nil {
790 t.Fatalf("StreamDecompressed failed: %v", err)
791 }
792 defer reader.Close()
793
794 // Count JSONL lines
795 scanner := bufio.NewScanner(reader)
796 lineCount := 0
797 for scanner.Scan() {
798 lineCount++
799 }
800
801 if lineCount != 100 {
802 t.Errorf("expected 100 lines, got %d", lineCount)
803 }
804 })
805}
806
807// ====================================================================================
808// PERFORMANCE / BENCHMARK TESTS
809// ====================================================================================
810
811func BenchmarkStorageOperations(b *testing.B) {
812 tmpDir := b.TempDir()
813 logger := &testLogger{t: &testing.T{}}
814 ops, _ := storage.NewOperations(logger, false)
815 defer ops.Close()
816
817 operations := makeTestOperations(10000)
818
819 b.Run("SaveBundle", func(b *testing.B) {
820 for i := 0; i < b.N; i++ {
821 path := filepath.Join(tmpDir, fmt.Sprintf("bench_%d.jsonl.zst", i))
822 ops.SaveBundle(path, operations, bundleInfo)
823 }
824 })
825
826 // Create bundle for read benchmarks
827 testPath := filepath.Join(tmpDir, "bench_read.jsonl.zst")
828 ops.SaveBundle(testPath, operations, nil)
829
830 b.Run("LoadBundle", func(b *testing.B) {
831 for i := 0; i < b.N; i++ {
832 ops.LoadBundle(testPath)
833 }
834 })
835
836 b.Run("LoadOperationAtPosition", func(b *testing.B) {
837 for i := 0; i < b.N; i++ {
838 ops.LoadOperationAtPosition(testPath, i%10000)
839 }
840 })
841
842 b.Run("Hash", func(b *testing.B) {
843 data := ops.SerializeJSONL(operations)
844 b.ResetTimer()
845 for i := 0; i < b.N; i++ {
846 ops.Hash(data)
847 }
848 })
849
850 b.Run("SerializeJSONL", func(b *testing.B) {
851 for i := 0; i < b.N; i++ {
852 ops.SerializeJSONL(operations)
853 }
854 })
855}
856
857// ====================================================================================
858// HELPER FUNCTIONS
859// ====================================================================================
860
861func makeTestOperations(count int) []plcclient.PLCOperation {
862 ops := make([]plcclient.PLCOperation, count)
863 baseTime := time.Now().Add(-time.Hour)
864
865 for i := 0; i < count; i++ {
866 ops[i] = plcclient.PLCOperation{
867 DID: fmt.Sprintf("did:plc:test%06d", i),
868 CID: fmt.Sprintf("bafy%06d", i),
869 CreatedAt: baseTime.Add(time.Duration(i) * time.Second),
870 }
871 }
872
873 return ops
874}
875
// containsBytes reports whether needle occurs anywhere within haystack.
func containsBytes(haystack, needle []byte) bool {
	return bytes.Index(haystack, needle) != -1
}