[DEPRECATED] Go implementation of plcbundle
1package mempool_test
2
3import (
4 "fmt"
5 "os"
6 "path/filepath"
7 "sync"
8 "testing"
9 "time"
10
11 "tangled.org/atscan.net/plcbundle-go/internal/mempool"
12 "tangled.org/atscan.net/plcbundle-go/internal/plcclient"
13 "tangled.org/atscan.net/plcbundle-go/internal/types"
14)
15
16type testLogger struct {
17 t *testing.T
18}
19
20func (l *testLogger) Printf(format string, v ...interface{}) {
21 l.t.Logf(format, v...)
22}
23
24func (l *testLogger) Println(v ...interface{}) {
25 l.t.Log(v...)
26}
27
28// ====================================================================================
29// CHRONOLOGICAL VALIDATION - MOST CRITICAL
30// ====================================================================================
31
32func TestMempoolChronologicalStrict(t *testing.T) {
33 tmpDir := t.TempDir()
34 logger := &testLogger{t: t}
35 baseTime := time.Now().Add(-time.Hour)
36
37 t.Run("RejectOutOfOrder", func(t *testing.T) {
38 minTime := baseTime
39 m, err := mempool.NewMempool(tmpDir, 1, minTime, logger, false)
40 if err != nil {
41 t.Fatalf("NewMempool failed: %v", err)
42 }
43
44 // Add operations in order: 1, 2, 4
45 ops := []plcclient.PLCOperation{
46 {CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)},
47 {CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)},
48 {CID: "cid4", CreatedAt: baseTime.Add(4 * time.Second)},
49 }
50
51 _, err = m.Add(ops)
52 if err != nil {
53 t.Fatalf("Add failed: %v", err)
54 }
55
56 // Now try to add operation 3 (out of order)
57 outOfOrder := []plcclient.PLCOperation{
58 {CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second)},
59 }
60
61 _, err = m.Add(outOfOrder)
62 if err == nil {
63 t.Error("expected chronological validation error, got nil")
64 }
65
66 if m.Count() != 3 {
67 t.Errorf("count should still be 3, got %d", m.Count())
68 }
69 })
70
71 t.Run("RejectBeforeMinTimestamp", func(t *testing.T) {
72 minTime := baseTime.Add(10 * time.Second)
73 m, err := mempool.NewMempool(tmpDir, 2, minTime, logger, false)
74 if err != nil {
75 t.Fatalf("NewMempool failed: %v", err)
76 }
77
78 // Try to add operation before min timestamp
79 tooEarly := []plcclient.PLCOperation{
80 {CID: "cid1", CreatedAt: baseTime}, // Before minTime
81 }
82
83 _, err = m.Add(tooEarly)
84 if err == nil {
85 t.Error("expected error for operation before min timestamp")
86 }
87 })
88
89 t.Run("AllowEqualTimestamps", func(t *testing.T) {
90 minTime := baseTime
91 m, err := mempool.NewMempool(tmpDir, 3, minTime, logger, false)
92 if err != nil {
93 t.Fatalf("NewMempool failed: %v", err)
94 }
95
96 // Multiple operations with same timestamp (happens in real PLC data)
97 sameTime := baseTime.Add(5 * time.Second)
98 ops := []plcclient.PLCOperation{
99 {CID: "cid1", CreatedAt: sameTime},
100 {CID: "cid2", CreatedAt: sameTime},
101 {CID: "cid3", CreatedAt: sameTime},
102 }
103
104 added, err := m.Add(ops)
105 if err != nil {
106 t.Fatalf("should allow equal timestamps: %v", err)
107 }
108
109 if added != 3 {
110 t.Errorf("expected 3 added, got %d", added)
111 }
112 })
113
114 t.Run("ChronologicalAfterReload", func(t *testing.T) {
115 minTime := baseTime
116 m, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
117 if err != nil {
118 t.Fatalf("NewMempool failed: %v", err)
119 }
120
121 // Add some operations
122 ops1 := []plcclient.PLCOperation{
123 {CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)},
124 {CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)},
125 }
126 m.Add(ops1)
127 m.Save()
128
129 // Reload mempool
130 m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
131 if err != nil {
132 t.Fatalf("NewMempool reload failed: %v", err)
133 }
134
135 // Try to add out-of-order operation
136 outOfOrder := []plcclient.PLCOperation{
137 {CID: "cid0", CreatedAt: baseTime}, // Before loaded ops
138 }
139
140 _, err = m2.Add(outOfOrder)
141 if err == nil {
142 t.Error("should reject out-of-order after reload")
143 }
144
145 // Add valid operation after loaded ones
146 validOps := []plcclient.PLCOperation{
147 {CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second)},
148 }
149
150 added, err := m2.Add(validOps)
151 if err != nil {
152 t.Fatalf("should accept in-order operation: %v", err)
153 }
154
155 if added != 1 {
156 t.Error("should have added 1 operation")
157 }
158 })
159
160 t.Run("StrictIncreasingOrder", func(t *testing.T) {
161 minTime := baseTime
162 m, err := mempool.NewMempool(tmpDir, 5, minTime, logger, false)
163 if err != nil {
164 t.Fatalf("NewMempool failed: %v", err)
165 }
166
167 // Each operation must be >= previous timestamp
168 ops := []plcclient.PLCOperation{
169 {CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)},
170 {CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)},
171 {CID: "cid3", CreatedAt: baseTime.Add(2 * time.Second)}, // Equal - OK
172 {CID: "cid4", CreatedAt: baseTime.Add(3 * time.Second)},
173 }
174
175 added, err := m.Add(ops)
176 if err != nil {
177 t.Fatalf("should allow non-decreasing timestamps: %v", err)
178 }
179
180 if added != 4 {
181 t.Errorf("expected 4 added, got %d", added)
182 }
183 })
184}
185
186// ====================================================================================
187// DUPLICATE PREVENTION
188// ====================================================================================
189
190func TestMempoolDuplicatePrevention(t *testing.T) {
191 tmpDir := t.TempDir()
192 logger := &testLogger{t: t}
193 baseTime := time.Now().Add(-time.Hour)
194
195 t.Run("SameCIDTwice", func(t *testing.T) {
196 minTime := baseTime
197 m, err := mempool.NewMempool(tmpDir, 6, minTime, logger, false)
198 if err != nil {
199 t.Fatalf("NewMempool failed: %v", err)
200 }
201
202 op := plcclient.PLCOperation{
203 CID: "duplicate_cid",
204 DID: "did:plc:test",
205 CreatedAt: baseTime.Add(1 * time.Second),
206 }
207
208 // Add first time
209 added, err := m.Add([]plcclient.PLCOperation{op})
210 if err != nil {
211 t.Fatalf("first add failed: %v", err)
212 }
213 if added != 1 {
214 t.Error("first add should succeed")
215 }
216
217 // Add same CID again (should be silently skipped)
218 added, err = m.Add([]plcclient.PLCOperation{op})
219 if err != nil {
220 t.Fatalf("duplicate add should not error: %v", err)
221 }
222 if added != 0 {
223 t.Errorf("duplicate should be skipped, but added=%d", added)
224 }
225
226 if m.Count() != 1 {
227 t.Errorf("count should be 1, got %d", m.Count())
228 }
229 })
230
231 t.Run("DuplicateAcrossSaveLoad", func(t *testing.T) {
232 minTime := baseTime
233 m, err := mempool.NewMempool(tmpDir, 7, minTime, logger, false)
234 if err != nil {
235 t.Fatalf("NewMempool failed: %v", err)
236 }
237
238 op := plcclient.PLCOperation{
239 CID: "persistent_cid",
240 DID: "did:plc:test",
241 CreatedAt: baseTime.Add(1 * time.Second),
242 }
243
244 // Add and save
245 m.Add([]plcclient.PLCOperation{op})
246 m.Save()
247
248 // Reload
249 m2, err := mempool.NewMempool(tmpDir, 7, minTime, logger, false)
250 if err != nil {
251 t.Fatalf("reload failed: %v", err)
252 }
253
254 // Try to add same operation
255 added, err := m2.Add([]plcclient.PLCOperation{op})
256 if err != nil {
257 t.Fatalf("add after reload failed: %v", err)
258 }
259
260 if added != 0 {
261 t.Errorf("duplicate should be skipped after reload, added=%d", added)
262 }
263
264 if m2.Count() != 1 {
265 t.Errorf("count should be 1, got %d", m2.Count())
266 }
267 })
268
269 t.Run("DuplicatesInBatch", func(t *testing.T) {
270 minTime := baseTime
271 m, err := mempool.NewMempool(tmpDir, 8, minTime, logger, false)
272 if err != nil {
273 t.Fatalf("NewMempool failed: %v", err)
274 }
275
276 // Batch contains duplicates
277 ops := []plcclient.PLCOperation{
278 {CID: "cid1", DID: "did:plc:001", CreatedAt: baseTime.Add(1 * time.Second)},
279 {CID: "cid2", DID: "did:plc:002", CreatedAt: baseTime.Add(2 * time.Second)},
280 {CID: "cid1", DID: "did:plc:001", CreatedAt: baseTime.Add(3 * time.Second)}, // Duplicate CID
281 }
282
283 added, err := m.Add(ops)
284 if err != nil {
285 t.Fatalf("Add failed: %v", err)
286 }
287
288 // Should only add 2 (skip duplicate)
289 if added != 2 {
290 t.Errorf("expected 2 unique operations, added %d", added)
291 }
292
293 if m.Count() != 2 {
294 t.Errorf("count should be 2, got %d", m.Count())
295 }
296 })
297}
298
299// ====================================================================================
300// PERSISTENCE & CORRUPTION HANDLING
301// ====================================================================================
302
303func TestMempoolPersistence(t *testing.T) {
304 tmpDir := t.TempDir()
305 logger := &testLogger{t: t}
306 baseTime := time.Now().Add(-time.Hour)
307
308 t.Run("SaveAndLoad", func(t *testing.T) {
309 minTime := baseTime
310 m, err := mempool.NewMempool(tmpDir, 9, minTime, logger, false)
311 if err != nil {
312 t.Fatalf("NewMempool failed: %v", err)
313 }
314
315 ops := makeTestOperations(50)
316 m.Add(ops)
317
318 if err := m.Save(); err != nil {
319 t.Fatalf("Save failed: %v", err)
320 }
321
322 // Reload
323 m2, err := mempool.NewMempool(tmpDir, 9, minTime, logger, false)
324 if err != nil {
325 t.Fatalf("reload failed: %v", err)
326 }
327
328 if m2.Count() != 50 {
329 t.Errorf("after reload, expected 50 ops, got %d", m2.Count())
330 }
331
332 // Verify data integrity
333 loaded := m2.Peek(50)
334 for i := 0; i < 50; i++ {
335 if loaded[i].CID != ops[i].CID {
336 t.Errorf("op %d CID mismatch after reload", i)
337 }
338 }
339 })
340
341 // Fix the IncrementalSave test - line ~353
342 t.Run("IncrementalSave", func(t *testing.T) {
343 minTime := baseTime
344 m, err := mempool.NewMempool(tmpDir, 10, minTime, logger, false)
345 if err != nil {
346 t.Fatalf("NewMempool failed: %v", err)
347 }
348
349 // Add 10 ops and save
350 ops1 := makeTestOperations(10)
351 m.Add(ops1)
352 m.Save()
353
354 // Add 10 more and save
355 ops2 := makeTestOperationsFrom(10, 10)
356 m.Add(ops2)
357 m.Save()
358
359 // Reload - should have all 20
360 m2, err := mempool.NewMempool(tmpDir, 10, minTime, logger, false)
361 if err != nil {
362 t.Fatalf("reload failed: %v", err)
363 }
364
365 if m2.Count() != 20 {
366 t.Errorf("expected 20 ops after incremental saves, got %d", m2.Count())
367 }
368 })
369
370 t.Run("CorruptedMempoolFile", func(t *testing.T) {
371 minTime := baseTime
372 mempoolFile := filepath.Join(tmpDir, "plc_mempool_000011.jsonl")
373
374 // Write corrupted data
375 os.WriteFile(mempoolFile, []byte("{invalid json\n{also bad"), 0644)
376
377 // Should error on load
378 _, err := mempool.NewMempool(tmpDir, 11, minTime, logger, false)
379 if err == nil {
380 t.Error("expected error loading corrupted mempool")
381 }
382 })
383
384 t.Run("DeleteMempool", func(t *testing.T) {
385 minTime := baseTime
386 m, err := mempool.NewMempool(tmpDir, 12, minTime, logger, false)
387 if err != nil {
388 t.Fatalf("NewMempool failed: %v", err)
389 }
390
391 ops := makeTestOperations(10)
392 m.Add(ops)
393 m.Save()
394
395 // Verify file exists
396 mempoolFile := filepath.Join(tmpDir, "plc_mempool_000012.jsonl")
397 if _, err := os.Stat(mempoolFile); os.IsNotExist(err) {
398 t.Fatal("mempool file should exist after save")
399 }
400
401 // Delete
402 if err := m.Delete(); err != nil {
403 t.Fatalf("Delete failed: %v", err)
404 }
405
406 // Verify file gone
407 if _, err := os.Stat(mempoolFile); !os.IsNotExist(err) {
408 t.Error("mempool file should be deleted")
409 }
410 })
411}
412
413// ====================================================================================
414// TAKE OPERATIONS - CRITICAL FOR BUNDLING
415// ====================================================================================
416
417func TestMempoolTakeOperations(t *testing.T) {
418 tmpDir := t.TempDir()
419 logger := &testLogger{t: t}
420 baseTime := time.Now().Add(-time.Hour)
421
422 t.Run("TakeExact", func(t *testing.T) {
423 minTime := baseTime
424 m, err := mempool.NewMempool(tmpDir, 13, minTime, logger, false)
425 if err != nil {
426 t.Fatalf("NewMempool failed: %v", err)
427 }
428
429 m.Add(makeTestOperations(100))
430
431 taken, err := m.Take(50)
432 if err != nil {
433 t.Fatalf("Take failed: %v", err)
434 }
435
436 if len(taken) != 50 {
437 t.Errorf("expected 50 operations, got %d", len(taken))
438 }
439
440 if m.Count() != 50 {
441 t.Errorf("expected 50 remaining, got %d", m.Count())
442 }
443 })
444
445 t.Run("TakeMoreThanAvailable", func(t *testing.T) {
446 minTime := baseTime
447 m, err := mempool.NewMempool(tmpDir, 14, minTime, logger, false)
448 if err != nil {
449 t.Fatalf("NewMempool failed: %v", err)
450 }
451
452 m.Add(makeTestOperations(30))
453
454 // Try to take 100 (only 30 available)
455 taken, err := m.Take(100)
456 if err != nil {
457 t.Fatalf("Take failed: %v", err)
458 }
459
460 if len(taken) != 30 {
461 t.Errorf("expected 30 operations (all available), got %d", len(taken))
462 }
463
464 if m.Count() != 0 {
465 t.Errorf("mempool should be empty, got %d", m.Count())
466 }
467 })
468
469 t.Run("TakePreservesOrder", func(t *testing.T) {
470 minTime := baseTime
471 m, err := mempool.NewMempool(tmpDir, 15, minTime, logger, false)
472 if err != nil {
473 t.Fatalf("NewMempool failed: %v", err)
474 }
475
476 ops := makeTestOperations(100)
477 m.Add(ops)
478
479 taken, err := m.Take(50)
480 if err != nil {
481 t.Fatalf("Take failed: %v", err)
482 }
483
484 // Verify first 50 match
485 for i := 0; i < 50; i++ {
486 if taken[i].CID != ops[i].CID {
487 t.Errorf("operation %d mismatch: got %s, want %s", i, taken[i].CID, ops[i].CID)
488 }
489 }
490
491 // Remaining should be ops[50:100]
492 remaining := m.Peek(50)
493 for i := 0; i < 50; i++ {
494 if remaining[i].CID != ops[50+i].CID {
495 t.Errorf("remaining op %d mismatch", i)
496 }
497 }
498 })
499
500 t.Run("TakeFromEmpty", func(t *testing.T) {
501 minTime := baseTime
502 m, err := mempool.NewMempool(tmpDir, 16, minTime, logger, false)
503 if err != nil {
504 t.Fatalf("NewMempool failed: %v", err)
505 }
506
507 taken, err := m.Take(10)
508 if err != nil {
509 t.Fatalf("Take from empty failed: %v", err)
510 }
511
512 if len(taken) != 0 {
513 t.Errorf("expected 0 operations from empty mempool, got %d", len(taken))
514 }
515 })
516}
517
518// ====================================================================================
519// VALIDATION TESTS
520// ====================================================================================
521
522func TestMempoolValidation(t *testing.T) {
523 tmpDir := t.TempDir()
524 logger := &testLogger{t: t}
525 baseTime := time.Now().Add(-time.Hour)
526
527 t.Run("ValidateChronological", func(t *testing.T) {
528 minTime := baseTime
529 m, err := mempool.NewMempool(tmpDir, 17, minTime, logger, false)
530 if err != nil {
531 t.Fatalf("NewMempool failed: %v", err)
532 }
533
534 ops := makeTestOperations(100)
535 m.Add(ops)
536
537 if err := m.Validate(); err != nil {
538 t.Errorf("Validate failed on valid mempool: %v", err)
539 }
540 })
541
542 t.Run("ValidateDetectsMinTimestampViolation", func(t *testing.T) {
543 minTime := baseTime.Add(10 * time.Second)
544 _, err := mempool.NewMempool(tmpDir, 18, minTime, logger, false)
545 if err != nil {
546 t.Fatalf("NewMempool failed: %v", err)
547 }
548
549 // Manually add operation before min (bypassing Add validation)
550 // This simulates corrupted state
551 ops := makeTestOperations(10)
552 ops[0].CreatedAt = baseTime // Before minTime
553
554 // Note: This is hard to test since Add enforces validation
555 // Better to test through file corruption
556 })
557
558 t.Run("ValidateDetectsDuplicateCIDs", func(t *testing.T) {
559 // Test for duplicate CID detection
560 // Similar challenge - Add prevents duplicates
561 // Would need to manually construct corrupted state
562 })
563}
564
565// ====================================================================================
566// CONCURRENCY TESTS
567// ====================================================================================
568
569func TestMempoolConcurrency(t *testing.T) {
570 tmpDir := t.TempDir()
571 logger := &testLogger{t: t}
572 baseTime := time.Now().Add(-time.Hour)
573
574 t.Run("ConcurrentReads", func(t *testing.T) {
575 minTime := baseTime
576 m, err := mempool.NewMempool(tmpDir, 19, minTime, logger, false)
577 if err != nil {
578 t.Fatalf("NewMempool failed: %v", err)
579 }
580
581 m.Add(makeTestOperations(1000))
582
583 // 100 concurrent readers
584 var wg sync.WaitGroup
585 for i := 0; i < 100; i++ {
586 wg.Add(1)
587 go func() {
588 defer wg.Done()
589 count := m.Count()
590 if count != 1000 {
591 t.Errorf("count mismatch: got %d", count)
592 }
593
594 peek := m.Peek(10)
595 if len(peek) != 10 {
596 t.Errorf("peek mismatch: got %d", len(peek))
597 }
598 }()
599 }
600 wg.Wait()
601 })
602
603 t.Run("ConcurrentAddAndRead", func(t *testing.T) {
604 minTime := baseTime
605 m, err := mempool.NewMempool(tmpDir, 20, minTime, logger, false)
606 if err != nil {
607 t.Fatalf("NewMempool failed: %v", err)
608 }
609
610 var wg sync.WaitGroup
611 errors := make(chan error, 100)
612
613 // Writer goroutine
614 wg.Add(1)
615 go func() {
616 defer wg.Done()
617 for i := 0; i < 10; i++ {
618 ops := []plcclient.PLCOperation{
619 {CID: fmt.Sprintf("cid%d", i*100), CreatedAt: baseTime.Add(time.Duration(i*100) * time.Second)},
620 }
621 if _, err := m.Add(ops); err != nil {
622 errors <- err
623 }
624 time.Sleep(10 * time.Millisecond)
625 }
626 }()
627
628 // Reader goroutines
629 for i := 0; i < 10; i++ {
630 wg.Add(1)
631 go func() {
632 defer wg.Done()
633 for j := 0; j < 20; j++ {
634 m.Count()
635 m.Peek(5)
636 time.Sleep(5 * time.Millisecond)
637 }
638 }()
639 }
640
641 wg.Wait()
642 close(errors)
643
644 for err := range errors {
645 t.Errorf("concurrent operation error: %v", err)
646 }
647 })
648}
649
650// ====================================================================================
651// STATS & METADATA TESTS
652// ====================================================================================
653
654func TestMempoolStats(t *testing.T) {
655 tmpDir := t.TempDir()
656 logger := &testLogger{t: t}
657 baseTime := time.Now().Add(-time.Hour)
658
659 t.Run("StatsEmpty", func(t *testing.T) {
660 minTime := baseTime
661 m, err := mempool.NewMempool(tmpDir, 21, minTime, logger, false)
662 if err != nil {
663 t.Fatalf("NewMempool failed: %v", err)
664 }
665
666 stats := m.Stats()
667
668 if stats["count"].(int) != 0 {
669 t.Error("empty mempool should have count 0")
670 }
671
672 if stats["can_create_bundle"].(bool) {
673 t.Error("empty mempool cannot create bundle")
674 }
675
676 if stats["target_bundle"].(int) != 21 {
677 t.Error("target bundle mismatch")
678 }
679 })
680
681 t.Run("StatsPopulated", func(t *testing.T) {
682 minTime := baseTime
683 m, err := mempool.NewMempool(tmpDir, 22, minTime, logger, false)
684 if err != nil {
685 t.Fatalf("NewMempool failed: %v", err)
686 }
687
688 ops := makeTestOperations(100)
689 m.Add(ops)
690
691 stats := m.Stats()
692
693 if stats["count"].(int) != 100 {
694 t.Error("count mismatch in stats")
695 }
696
697 if _, ok := stats["first_time"]; !ok {
698 t.Error("stats missing first_time")
699 }
700
701 if _, ok := stats["last_time"]; !ok {
702 t.Error("stats missing last_time")
703 }
704
705 if _, ok := stats["size_bytes"]; !ok {
706 t.Error("stats missing size_bytes")
707 }
708
709 if stats["did_count"].(int) != 100 {
710 t.Error("did_count should match operation count for unique DIDs")
711 }
712 })
713
714 t.Run("StatsCanCreateBundle", func(t *testing.T) {
715 minTime := baseTime
716 m, err := mempool.NewMempool(tmpDir, 23, minTime, logger, false)
717 if err != nil {
718 t.Fatalf("NewMempool failed: %v", err)
719 }
720
721 // Add exactly BUNDLE_SIZE operations
722 m.Add(makeTestOperations(types.BUNDLE_SIZE))
723
724 stats := m.Stats()
725
726 if !stats["can_create_bundle"].(bool) {
727 t.Error("should be able to create bundle with BUNDLE_SIZE operations")
728 }
729 })
730}
731
732// ====================================================================================
733// DID SEARCH TESTS
734// ====================================================================================
735
736func TestMempoolDIDSearch(t *testing.T) {
737 tmpDir := t.TempDir()
738 logger := &testLogger{t: t}
739 baseTime := time.Now().Add(-time.Hour)
740
741 t.Run("FindDIDOperations", func(t *testing.T) {
742 minTime := baseTime
743 m, err := mempool.NewMempool(tmpDir, 24, minTime, logger, false)
744 if err != nil {
745 t.Fatalf("NewMempool failed: %v", err)
746 }
747
748 targetDID := "did:plc:target"
749
750 ops := []plcclient.PLCOperation{
751 {DID: "did:plc:other1", CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)},
752 {DID: targetDID, CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)},
753 {DID: "did:plc:other2", CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second)},
754 {DID: targetDID, CID: "cid4", CreatedAt: baseTime.Add(4 * time.Second)},
755 {DID: "did:plc:other3", CID: "cid5", CreatedAt: baseTime.Add(5 * time.Second)},
756 }
757
758 m.Add(ops)
759
760 // Search
761 found := m.FindDIDOperations(targetDID)
762
763 if len(found) != 2 {
764 t.Errorf("expected 2 operations for %s, got %d", targetDID, len(found))
765 }
766
767 if found[0].CID != "cid2" || found[1].CID != "cid4" {
768 t.Error("wrong operations returned")
769 }
770 })
771
772 t.Run("FindLatestDIDOperation", func(t *testing.T) {
773 minTime := baseTime
774 m, err := mempool.NewMempool(tmpDir, 25, minTime, logger, false)
775 if err != nil {
776 t.Fatalf("NewMempool failed: %v", err)
777 }
778
779 targetDID := "did:plc:target"
780
781 ops := []plcclient.PLCOperation{
782 {DID: targetDID, CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second), Nullified: false},
783 {DID: targetDID, CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second), Nullified: false},
784 {DID: targetDID, CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second), Nullified: true}, // Nullified
785 }
786
787 m.Add(ops)
788
789 // Should return cid2 (latest non-nullified)
790 latest := m.FindLatestDIDOperation(targetDID)
791
792 if latest == nil {
793 t.Fatal("expected to find operation, got nil")
794 }
795
796 if latest.CID != "cid2" {
797 t.Errorf("expected cid2 (latest non-nullified), got %s", latest.CID)
798 }
799 })
800
801 t.Run("FindLatestDIDOperation_AllNullified", func(t *testing.T) {
802 minTime := baseTime
803 m, err := mempool.NewMempool(tmpDir, 26, minTime, logger, false)
804 if err != nil {
805 t.Fatalf("NewMempool failed: %v", err)
806 }
807
808 targetDID := "did:plc:target"
809
810 ops := []plcclient.PLCOperation{
811 {DID: targetDID, CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second), Nullified: true},
812 {DID: targetDID, CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second), Nullified: true},
813 }
814
815 m.Add(ops)
816
817 latest := m.FindLatestDIDOperation(targetDID)
818
819 if latest != nil {
820 t.Error("should return nil when all operations are nullified")
821 }
822 })
823
824 t.Run("FindDIDOperations_NotFound", func(t *testing.T) {
825 minTime := baseTime
826 m, err := mempool.NewMempool(tmpDir, 27, minTime, logger, false)
827 if err != nil {
828 t.Fatalf("NewMempool failed: %v", err)
829 }
830
831 m.Add(makeTestOperations(100))
832
833 found := m.FindDIDOperations("did:plc:nonexistent")
834
835 if len(found) != 0 {
836 t.Errorf("expected empty result, got %d operations", len(found))
837 }
838 })
839}
840
841// ====================================================================================
842// CLEAR OPERATION TESTS
843// ====================================================================================
844
845func TestMempoolClear(t *testing.T) {
846 tmpDir := t.TempDir()
847 logger := &testLogger{t: t}
848 baseTime := time.Now().Add(-time.Hour)
849
850 t.Run("ClearPopulated", func(t *testing.T) {
851 minTime := baseTime
852 m, err := mempool.NewMempool(tmpDir, 28, minTime, logger, false)
853 if err != nil {
854 t.Fatalf("NewMempool failed: %v", err)
855 }
856
857 m.Add(makeTestOperations(100))
858
859 if m.Count() != 100 {
860 t.Fatal("setup failed")
861 }
862
863 m.Clear()
864
865 if m.Count() != 0 {
866 t.Errorf("after clear, count should be 0, got %d", m.Count())
867 }
868
869 // Should be able to add new operations
870 newOps := []plcclient.PLCOperation{
871 {CID: "new1", CreatedAt: baseTime.Add(200 * time.Second)},
872 }
873
874 added, err := m.Add(newOps)
875 if err != nil {
876 t.Fatalf("Add after clear failed: %v", err)
877 }
878
879 if added != 1 {
880 t.Error("should be able to add after clear")
881 }
882 })
883}
884
885// ====================================================================================
886// HELPER FUNCTIONS
887// ====================================================================================
888
889func makeTestOperations(count int) []plcclient.PLCOperation {
890 return makeTestOperationsFrom(0, count)
891}
892
893func makeTestOperationsFrom(start, count int) []plcclient.PLCOperation {
894 ops := make([]plcclient.PLCOperation, count)
895 baseTime := time.Now().Add(-time.Hour)
896
897 for i := 0; i < count; i++ {
898 idx := start + i
899 ops[i] = plcclient.PLCOperation{
900 DID: fmt.Sprintf("did:plc:test%06d", idx),
901 CID: fmt.Sprintf("bafy%06d", idx),
902 CreatedAt: baseTime.Add(time.Duration(idx) * time.Second),
903 }
904 }
905
906 return ops
907}