[DEPRECATED] Go implementation of plcbundle
at main 907 lines 23 kB view raw
1package mempool_test 2 3import ( 4 "fmt" 5 "os" 6 "path/filepath" 7 "sync" 8 "testing" 9 "time" 10 11 "tangled.org/atscan.net/plcbundle-go/internal/mempool" 12 "tangled.org/atscan.net/plcbundle-go/internal/plcclient" 13 "tangled.org/atscan.net/plcbundle-go/internal/types" 14) 15 16type testLogger struct { 17 t *testing.T 18} 19 20func (l *testLogger) Printf(format string, v ...interface{}) { 21 l.t.Logf(format, v...) 22} 23 24func (l *testLogger) Println(v ...interface{}) { 25 l.t.Log(v...) 26} 27 28// ==================================================================================== 29// CHRONOLOGICAL VALIDATION - MOST CRITICAL 30// ==================================================================================== 31 32func TestMempoolChronologicalStrict(t *testing.T) { 33 tmpDir := t.TempDir() 34 logger := &testLogger{t: t} 35 baseTime := time.Now().Add(-time.Hour) 36 37 t.Run("RejectOutOfOrder", func(t *testing.T) { 38 minTime := baseTime 39 m, err := mempool.NewMempool(tmpDir, 1, minTime, logger, false) 40 if err != nil { 41 t.Fatalf("NewMempool failed: %v", err) 42 } 43 44 // Add operations in order: 1, 2, 4 45 ops := []plcclient.PLCOperation{ 46 {CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)}, 47 {CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)}, 48 {CID: "cid4", CreatedAt: baseTime.Add(4 * time.Second)}, 49 } 50 51 _, err = m.Add(ops) 52 if err != nil { 53 t.Fatalf("Add failed: %v", err) 54 } 55 56 // Now try to add operation 3 (out of order) 57 outOfOrder := []plcclient.PLCOperation{ 58 {CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second)}, 59 } 60 61 _, err = m.Add(outOfOrder) 62 if err == nil { 63 t.Error("expected chronological validation error, got nil") 64 } 65 66 if m.Count() != 3 { 67 t.Errorf("count should still be 3, got %d", m.Count()) 68 } 69 }) 70 71 t.Run("RejectBeforeMinTimestamp", func(t *testing.T) { 72 minTime := baseTime.Add(10 * time.Second) 73 m, err := mempool.NewMempool(tmpDir, 2, minTime, logger, false) 74 if err != nil { 75 t.Fatalf("NewMempool failed: %v", err) 76 } 77 78 // Try to add operation before min timestamp 79 tooEarly := []plcclient.PLCOperation{ 80 {CID: "cid1", CreatedAt: baseTime}, // Before minTime 81 } 82 83 _, err = m.Add(tooEarly) 84 if err == nil { 85 t.Error("expected error for operation before min timestamp") 86 } 87 }) 88 89 t.Run("AllowEqualTimestamps", func(t *testing.T) { 90 minTime := baseTime 91 m, err := mempool.NewMempool(tmpDir, 3, minTime, logger, false) 92 if err != nil { 93 t.Fatalf("NewMempool failed: %v", err) 94 } 95 96 // Multiple operations with same timestamp (happens in real PLC data) 97 sameTime := baseTime.Add(5 * time.Second) 98 ops := []plcclient.PLCOperation{ 99 {CID: "cid1", CreatedAt: sameTime}, 100 {CID: "cid2", CreatedAt: sameTime}, 101 {CID: "cid3", CreatedAt: sameTime}, 102 } 103 104 added, err := m.Add(ops) 105 if err != nil { 106 t.Fatalf("should allow equal timestamps: %v", err) 107 } 108 109 if added != 3 { 110 t.Errorf("expected 3 added, got %d", added) 111 } 112 }) 113 114 t.Run("ChronologicalAfterReload", func(t *testing.T) { 115 minTime := baseTime 116 m, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false) 117 if err != nil { 118 t.Fatalf("NewMempool failed: %v", err) 119 } 120 121 // Add some operations 122 ops1 := []plcclient.PLCOperation{ 123 {CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)}, 124 {CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)}, 125 } 126 m.Add(ops1) 127 m.Save() 128 129 // Reload mempool 130 m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false) 131 if err != nil { 132 t.Fatalf("NewMempool reload failed: %v", err) 133 } 134 135 // Try to add out-of-order operation 136 outOfOrder := []plcclient.PLCOperation{ 137 {CID: "cid0", CreatedAt: baseTime}, // Before loaded ops 138 } 139 140 _, err = m2.Add(outOfOrder) 141 if err == nil { 142 t.Error("should reject out-of-order after reload") 143 } 144 145 // Add valid operation after loaded ones 146 validOps := []plcclient.PLCOperation{ 147 {CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second)}, 148 } 149 150 added, err := m2.Add(validOps) 151 if err != nil { 152 t.Fatalf("should accept in-order operation: %v", err) 153 } 154 155 if added != 1 { 156 t.Error("should have added 1 operation") 157 } 158 }) 159 160 t.Run("StrictIncreasingOrder", func(t *testing.T) { 161 minTime := baseTime 162 m, err := mempool.NewMempool(tmpDir, 5, minTime, logger, false) 163 if err != nil { 164 t.Fatalf("NewMempool failed: %v", err) 165 } 166 167 // Each operation must be >= previous timestamp 168 ops := []plcclient.PLCOperation{ 169 {CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)}, 170 {CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)}, 171 {CID: "cid3", CreatedAt: baseTime.Add(2 * time.Second)}, // Equal - OK 172 {CID: "cid4", CreatedAt: baseTime.Add(3 * time.Second)}, 173 } 174 175 added, err := m.Add(ops) 176 if err != nil { 177 t.Fatalf("should allow non-decreasing timestamps: %v", err) 178 } 179 180 if added != 4 { 181 t.Errorf("expected 4 added, got %d", added) 182 } 183 }) 184} 185 186// ==================================================================================== 187// DUPLICATE PREVENTION 188// ==================================================================================== 189 190func TestMempoolDuplicatePrevention(t *testing.T) { 191 tmpDir := t.TempDir() 192 logger := &testLogger{t: t} 193 baseTime := time.Now().Add(-time.Hour) 194 195 t.Run("SameCIDTwice", func(t *testing.T) { 196 minTime := baseTime 197 m, err := mempool.NewMempool(tmpDir, 6, minTime, logger, false) 198 if err != nil { 199 t.Fatalf("NewMempool failed: %v", err) 200 } 201 202 op := plcclient.PLCOperation{ 203 CID: "duplicate_cid", 204 DID: "did:plc:test", 205 CreatedAt: baseTime.Add(1 * time.Second), 206 } 207 208 // Add first time 209 added, err := m.Add([]plcclient.PLCOperation{op}) 210 if err != nil { 211 t.Fatalf("first add failed: %v", err) 212 } 213 if added != 1 { 214 t.Error("first add should succeed") 215 } 216 217 // Add same CID again (should be silently skipped) 218 added, err = m.Add([]plcclient.PLCOperation{op}) 219 if err != nil { 220 t.Fatalf("duplicate add should not error: %v", err) 221 } 222 if added != 0 { 223 t.Errorf("duplicate should be skipped, but added=%d", added) 224 } 225 226 if m.Count() != 1 { 227 t.Errorf("count should be 1, got %d", m.Count()) 228 } 229 }) 230 231 t.Run("DuplicateAcrossSaveLoad", func(t *testing.T) { 232 minTime := baseTime 233 m, err := mempool.NewMempool(tmpDir, 7, minTime, logger, false) 234 if err != nil { 235 t.Fatalf("NewMempool failed: %v", err) 236 } 237 238 op := plcclient.PLCOperation{ 239 CID: "persistent_cid", 240 DID: "did:plc:test", 241 CreatedAt: baseTime.Add(1 * time.Second), 242 } 243 244 // Add and save 245 m.Add([]plcclient.PLCOperation{op}) 246 m.Save() 247 248 // Reload 249 m2, err := mempool.NewMempool(tmpDir, 7, minTime, logger, false) 250 if err != nil { 251 t.Fatalf("reload failed: %v", err) 252 } 253 254 // Try to add same operation 255 added, err := m2.Add([]plcclient.PLCOperation{op}) 256 if err != nil { 257 t.Fatalf("add after reload failed: %v", err) 258 } 259 260 if added != 0 { 261 t.Errorf("duplicate should be skipped after reload, added=%d", added) 262 } 263 264 if m2.Count() != 1 { 265 t.Errorf("count should be 1, got %d", m2.Count()) 266 } 267 }) 268 269 t.Run("DuplicatesInBatch", func(t *testing.T) { 270 minTime := baseTime 271 m, err := mempool.NewMempool(tmpDir, 8, minTime, logger, false) 272 if err != nil { 273 t.Fatalf("NewMempool failed: %v", err) 274 } 275 276 // Batch contains duplicates 277 ops := []plcclient.PLCOperation{ 278 {CID: "cid1", DID: "did:plc:001", CreatedAt: baseTime.Add(1 * time.Second)}, 279 {CID: "cid2", DID: "did:plc:002", CreatedAt: baseTime.Add(2 * time.Second)}, 280 {CID: "cid1", DID: "did:plc:001", CreatedAt: baseTime.Add(3 * time.Second)}, // Duplicate CID 281 } 282 283 added, err := m.Add(ops) 284 if err != nil { 285 t.Fatalf("Add failed: %v", err) 286 } 287 288 // Should only add 2 (skip duplicate) 289 if added != 2 { 290 t.Errorf("expected 2 unique operations, added %d", added) 291 } 292 293 if m.Count() != 2 { 294 t.Errorf("count should be 2, got %d", m.Count()) 295 } 296 }) 297} 298 299// ==================================================================================== 300// PERSISTENCE & CORRUPTION HANDLING 301// ==================================================================================== 302 303func TestMempoolPersistence(t *testing.T) { 304 tmpDir := t.TempDir() 305 logger := &testLogger{t: t} 306 baseTime := time.Now().Add(-time.Hour) 307 308 t.Run("SaveAndLoad", func(t *testing.T) { 309 minTime := baseTime 310 m, err := mempool.NewMempool(tmpDir, 9, minTime, logger, false) 311 if err != nil { 312 t.Fatalf("NewMempool failed: %v", err) 313 } 314 315 ops := makeTestOperations(50) 316 m.Add(ops) 317 318 if err := m.Save(); err != nil { 319 t.Fatalf("Save failed: %v", err) 320 } 321 322 // Reload 323 m2, err := mempool.NewMempool(tmpDir, 9, minTime, logger, false) 324 if err != nil { 325 t.Fatalf("reload failed: %v", err) 326 } 327 328 if m2.Count() != 50 { 329 t.Errorf("after reload, expected 50 ops, got %d", m2.Count()) 330 } 331 332 // Verify data integrity 333 loaded := m2.Peek(50) 334 for i := 0; i < 50; i++ { 335 if loaded[i].CID != ops[i].CID { 336 t.Errorf("op %d CID mismatch after reload", i) 337 } 338 } 339 }) 340 341 // Fix the IncrementalSave test - line ~353 342 t.Run("IncrementalSave", func(t *testing.T) { 343 minTime := baseTime 344 m, err := mempool.NewMempool(tmpDir, 10, minTime, logger, false) 345 if err != nil { 346 t.Fatalf("NewMempool failed: %v", err) 347 } 348 349 // Add 10 ops and save 350 ops1 := makeTestOperations(10) 351 m.Add(ops1) 352 m.Save() 353 354 // Add 10 more and save 355 ops2 := makeTestOperationsFrom(10, 10) 356 m.Add(ops2) 357 m.Save() 358 359 // Reload - should have all 20 360 m2, err := mempool.NewMempool(tmpDir, 10, minTime, logger, false) 361 if err != nil { 362 t.Fatalf("reload failed: %v", err) 363 } 364 365 if m2.Count() != 20 { 366 t.Errorf("expected 20 ops after incremental saves, got %d", m2.Count()) 367 } 368 }) 369 370 t.Run("CorruptedMempoolFile", func(t *testing.T) { 371 minTime := baseTime 372 mempoolFile := filepath.Join(tmpDir, "plc_mempool_000011.jsonl") 373 374 // Write corrupted data 375 os.WriteFile(mempoolFile, []byte("{invalid json\n{also bad"), 0644) 376 377 // Should error on load 378 _, err := mempool.NewMempool(tmpDir, 11, minTime, logger, false) 379 if err == nil { 380 t.Error("expected error loading corrupted mempool") 381 } 382 }) 383 384 t.Run("DeleteMempool", func(t *testing.T) { 385 minTime := baseTime 386 m, err := mempool.NewMempool(tmpDir, 12, minTime, logger, false) 387 if err != nil { 388 t.Fatalf("NewMempool failed: %v", err) 389 } 390 391 ops := makeTestOperations(10) 392 m.Add(ops) 393 m.Save() 394 395 // Verify file exists 396 mempoolFile := filepath.Join(tmpDir, "plc_mempool_000012.jsonl") 397 if _, err := os.Stat(mempoolFile); os.IsNotExist(err) { 398 t.Fatal("mempool file should exist after save") 399 } 400 401 // Delete 402 if err := m.Delete(); err != nil { 403 t.Fatalf("Delete failed: %v", err) 404 } 405 406 // Verify file gone 407 if _, err := os.Stat(mempoolFile); !os.IsNotExist(err) { 408 t.Error("mempool file should be deleted") 409 } 410 }) 411} 412 413// ==================================================================================== 414// TAKE OPERATIONS - CRITICAL FOR BUNDLING 415// ==================================================================================== 416 417func TestMempoolTakeOperations(t *testing.T) { 418 tmpDir := t.TempDir() 419 logger := &testLogger{t: t} 420 baseTime := time.Now().Add(-time.Hour) 421 422 t.Run("TakeExact", func(t *testing.T) { 423 minTime := baseTime 424 m, err := mempool.NewMempool(tmpDir, 13, minTime, logger, false) 425 if err != nil { 426 t.Fatalf("NewMempool failed: %v", err) 427 } 428 429 m.Add(makeTestOperations(100)) 430 431 taken, err := m.Take(50) 432 if err != nil { 433 t.Fatalf("Take failed: %v", err) 434 } 435 436 if len(taken) != 50 { 437 t.Errorf("expected 50 operations, got %d", len(taken)) 438 } 439 440 if m.Count() != 50 { 441 t.Errorf("expected 50 remaining, got %d", m.Count()) 442 } 443 }) 444 445 t.Run("TakeMoreThanAvailable", func(t *testing.T) { 446 minTime := baseTime 447 m, err := mempool.NewMempool(tmpDir, 14, minTime, logger, false) 448 if err != nil { 449 t.Fatalf("NewMempool failed: %v", err) 450 } 451 452 m.Add(makeTestOperations(30)) 453 454 // Try to take 100 (only 30 available) 455 taken, err := m.Take(100) 456 if err != nil { 457 t.Fatalf("Take failed: %v", err) 458 } 459 460 if len(taken) != 30 { 461 t.Errorf("expected 30 operations (all available), got %d", len(taken)) 462 } 463 464 if m.Count() != 0 { 465 t.Errorf("mempool should be empty, got %d", m.Count()) 466 } 467 }) 468 469 t.Run("TakePreservesOrder", func(t *testing.T) { 470 minTime := baseTime 471 m, err := mempool.NewMempool(tmpDir, 15, minTime, logger, false) 472 if err != nil { 473 t.Fatalf("NewMempool failed: %v", err) 474 } 475 476 ops := makeTestOperations(100) 477 m.Add(ops) 478 479 taken, err := m.Take(50) 480 if err != nil { 481 t.Fatalf("Take failed: %v", err) 482 } 483 484 // Verify first 50 match 485 for i := 0; i < 50; i++ { 486 if taken[i].CID != ops[i].CID { 487 t.Errorf("operation %d mismatch: got %s, want %s", i, taken[i].CID, ops[i].CID) 488 } 489 } 490 491 // Remaining should be ops[50:100] 492 remaining := m.Peek(50) 493 for i := 0; i < 50; i++ { 494 if remaining[i].CID != ops[50+i].CID { 495 t.Errorf("remaining op %d mismatch", i) 496 } 497 } 498 }) 499 500 t.Run("TakeFromEmpty", func(t *testing.T) { 501 minTime := baseTime 502 m, err := mempool.NewMempool(tmpDir, 16, minTime, logger, false) 503 if err != nil { 504 t.Fatalf("NewMempool failed: %v", err) 505 } 506 507 taken, err := m.Take(10) 508 if err != nil { 509 t.Fatalf("Take from empty failed: %v", err) 510 } 511 512 if len(taken) != 0 { 513 t.Errorf("expected 0 operations from empty mempool, got %d", len(taken)) 514 } 515 }) 516} 517 518// ==================================================================================== 519// VALIDATION TESTS 520// ==================================================================================== 521 522func TestMempoolValidation(t *testing.T) { 523 tmpDir := t.TempDir() 524 logger := &testLogger{t: t} 525 baseTime := time.Now().Add(-time.Hour) 526 527 t.Run("ValidateChronological", func(t *testing.T) { 528 minTime := baseTime 529 m, err := mempool.NewMempool(tmpDir, 17, minTime, logger, false) 530 if err != nil { 531 t.Fatalf("NewMempool failed: %v", err) 532 } 533 534 ops := makeTestOperations(100) 535 m.Add(ops) 536 537 if err := m.Validate(); err != nil { 538 t.Errorf("Validate failed on valid mempool: %v", err) 539 } 540 }) 541 542 t.Run("ValidateDetectsMinTimestampViolation", func(t *testing.T) { 543 minTime := baseTime.Add(10 * time.Second) 544 _, err := mempool.NewMempool(tmpDir, 18, minTime, logger, false) 545 if err != nil { 546 t.Fatalf("NewMempool failed: %v", err) 547 } 548 549 // Manually add operation before min (bypassing Add validation) 550 // This simulates corrupted state 551 ops := makeTestOperations(10) 552 ops[0].CreatedAt = baseTime // Before minTime 553 554 // Note: This is hard to test since Add enforces validation 555 // Better to test through file corruption 556 }) 557 558 t.Run("ValidateDetectsDuplicateCIDs", func(t *testing.T) { 559 // Test for duplicate CID detection 560 // Similar challenge - Add prevents duplicates 561 // Would need to manually construct corrupted state 562 }) 563} 564 565// ==================================================================================== 566// CONCURRENCY TESTS 567// ==================================================================================== 568 569func TestMempoolConcurrency(t *testing.T) { 570 tmpDir := t.TempDir() 571 logger := &testLogger{t: t} 572 baseTime := time.Now().Add(-time.Hour) 573 574 t.Run("ConcurrentReads", func(t *testing.T) { 575 minTime := baseTime 576 m, err := mempool.NewMempool(tmpDir, 19, minTime, logger, false) 577 if err != nil { 578 t.Fatalf("NewMempool failed: %v", err) 579 } 580 581 m.Add(makeTestOperations(1000)) 582 583 // 100 concurrent readers 584 var wg sync.WaitGroup 585 for i := 0; i < 100; i++ { 586 wg.Add(1) 587 go func() { 588 defer wg.Done() 589 count := m.Count() 590 if count != 1000 { 591 t.Errorf("count mismatch: got %d", count) 592 } 593 594 peek := m.Peek(10) 595 if len(peek) != 10 { 596 t.Errorf("peek mismatch: got %d", len(peek)) 597 } 598 }() 599 } 600 wg.Wait() 601 }) 602 603 t.Run("ConcurrentAddAndRead", func(t *testing.T) { 604 minTime := baseTime 605 m, err := mempool.NewMempool(tmpDir, 20, minTime, logger, false) 606 if err != nil { 607 t.Fatalf("NewMempool failed: %v", err) 608 } 609 610 var wg sync.WaitGroup 611 errors := make(chan error, 100) 612 613 // Writer goroutine 614 wg.Add(1) 615 go func() { 616 defer wg.Done() 617 for i := 0; i < 10; i++ { 618 ops := []plcclient.PLCOperation{ 619 {CID: fmt.Sprintf("cid%d", i*100), CreatedAt: baseTime.Add(time.Duration(i*100) * time.Second)}, 620 } 621 if _, err := m.Add(ops); err != nil { 622 errors <- err 623 } 624 time.Sleep(10 * time.Millisecond) 625 } 626 }() 627 628 // Reader goroutines 629 for i := 0; i < 10; i++ { 630 wg.Add(1) 631 go func() { 632 defer wg.Done() 633 for j := 0; j < 20; j++ { 634 m.Count() 635 m.Peek(5) 636 time.Sleep(5 * time.Millisecond) 637 } 638 }() 639 } 640 641 wg.Wait() 642 close(errors) 643 644 for err := range errors { 645 t.Errorf("concurrent operation error: %v", err) 646 } 647 }) 648} 649 650// ==================================================================================== 651// STATS & METADATA TESTS 652// ==================================================================================== 653 654func TestMempoolStats(t *testing.T) { 655 tmpDir := t.TempDir() 656 logger := &testLogger{t: t} 657 baseTime := time.Now().Add(-time.Hour) 658 659 t.Run("StatsEmpty", func(t *testing.T) { 660 minTime := baseTime 661 m, err := mempool.NewMempool(tmpDir, 21, minTime, logger, false) 662 if err != nil { 663 t.Fatalf("NewMempool failed: %v", err) 664 } 665 666 stats := m.Stats() 667 668 if stats["count"].(int) != 0 { 669 t.Error("empty mempool should have count 0") 670 } 671 672 if stats["can_create_bundle"].(bool) { 673 t.Error("empty mempool cannot create bundle") 674 } 675 676 if stats["target_bundle"].(int) != 21 { 677 t.Error("target bundle mismatch") 678 } 679 }) 680 681 t.Run("StatsPopulated", func(t *testing.T) { 682 minTime := baseTime 683 m, err := mempool.NewMempool(tmpDir, 22, minTime, logger, false) 684 if err != nil { 685 t.Fatalf("NewMempool failed: %v", err) 686 } 687 688 ops := makeTestOperations(100) 689 m.Add(ops) 690 691 stats := m.Stats() 692 693 if stats["count"].(int) != 100 { 694 t.Error("count mismatch in stats") 695 } 696 697 if _, ok := stats["first_time"]; !ok { 698 t.Error("stats missing first_time") 699 } 700 701 if _, ok := stats["last_time"]; !ok { 702 t.Error("stats missing last_time") 703 } 704 705 if _, ok := stats["size_bytes"]; !ok { 706 t.Error("stats missing size_bytes") 707 } 708 709 if stats["did_count"].(int) != 100 { 710 t.Error("did_count should match operation count for unique DIDs") 711 } 712 }) 713 714 t.Run("StatsCanCreateBundle", func(t *testing.T) { 715 minTime := baseTime 716 m, err := mempool.NewMempool(tmpDir, 23, minTime, logger, false) 717 if err != nil { 718 t.Fatalf("NewMempool failed: %v", err) 719 } 720 721 // Add exactly BUNDLE_SIZE operations 722 m.Add(makeTestOperations(types.BUNDLE_SIZE)) 723 724 stats := m.Stats() 725 726 if !stats["can_create_bundle"].(bool) { 727 t.Error("should be able to create bundle with BUNDLE_SIZE operations") 728 } 729 }) 730} 731 732// ==================================================================================== 733// DID SEARCH TESTS 734// ==================================================================================== 735 736func TestMempoolDIDSearch(t *testing.T) { 737 tmpDir := t.TempDir() 738 logger := &testLogger{t: t} 739 baseTime := time.Now().Add(-time.Hour) 740 741 t.Run("FindDIDOperations", func(t *testing.T) { 742 minTime := baseTime 743 m, err := mempool.NewMempool(tmpDir, 24, minTime, logger, false) 744 if err != nil { 745 t.Fatalf("NewMempool failed: %v", err) 746 } 747 748 targetDID := "did:plc:target" 749 750 ops := []plcclient.PLCOperation{ 751 {DID: "did:plc:other1", CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second)}, 752 {DID: targetDID, CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second)}, 753 {DID: "did:plc:other2", CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second)}, 754 {DID: targetDID, CID: "cid4", CreatedAt: baseTime.Add(4 * time.Second)}, 755 {DID: "did:plc:other3", CID: "cid5", CreatedAt: baseTime.Add(5 * time.Second)}, 756 } 757 758 m.Add(ops) 759 760 // Search 761 found := m.FindDIDOperations(targetDID) 762 763 if len(found) != 2 { 764 t.Errorf("expected 2 operations for %s, got %d", targetDID, len(found)) 765 } 766 767 if found[0].CID != "cid2" || found[1].CID != "cid4" { 768 t.Error("wrong operations returned") 769 } 770 }) 771 772 t.Run("FindLatestDIDOperation", func(t *testing.T) { 773 minTime := baseTime 774 m, err := mempool.NewMempool(tmpDir, 25, minTime, logger, false) 775 if err != nil { 776 t.Fatalf("NewMempool failed: %v", err) 777 } 778 779 targetDID := "did:plc:target" 780 781 ops := []plcclient.PLCOperation{ 782 {DID: targetDID, CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second), Nullified: false}, 783 {DID: targetDID, CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second), Nullified: false}, 784 {DID: targetDID, CID: "cid3", CreatedAt: baseTime.Add(3 * time.Second), Nullified: true}, // Nullified 785 } 786 787 m.Add(ops) 788 789 // Should return cid2 (latest non-nullified) 790 latest := m.FindLatestDIDOperation(targetDID) 791 792 if latest == nil { 793 t.Fatal("expected to find operation, got nil") 794 } 795 796 if latest.CID != "cid2" { 797 t.Errorf("expected cid2 (latest non-nullified), got %s", latest.CID) 798 } 799 }) 800 801 t.Run("FindLatestDIDOperation_AllNullified", func(t *testing.T) { 802 minTime := baseTime 803 m, err := mempool.NewMempool(tmpDir, 26, minTime, logger, false) 804 if err != nil { 805 t.Fatalf("NewMempool failed: %v", err) 806 } 807 808 targetDID := "did:plc:target" 809 810 ops := []plcclient.PLCOperation{ 811 {DID: targetDID, CID: "cid1", CreatedAt: baseTime.Add(1 * time.Second), Nullified: true}, 812 {DID: targetDID, CID: "cid2", CreatedAt: baseTime.Add(2 * time.Second), Nullified: true}, 813 } 814 815 m.Add(ops) 816 817 latest := m.FindLatestDIDOperation(targetDID) 818 819 if latest != nil { 820 t.Error("should return nil when all operations are nullified") 821 } 822 }) 823 824 t.Run("FindDIDOperations_NotFound", func(t *testing.T) { 825 minTime := baseTime 826 m, err := mempool.NewMempool(tmpDir, 27, minTime, logger, false) 827 if err != nil { 828 t.Fatalf("NewMempool failed: %v", err) 829 } 830 831 m.Add(makeTestOperations(100)) 832 833 found := m.FindDIDOperations("did:plc:nonexistent") 834 835 if len(found) != 0 { 836 t.Errorf("expected empty result, got %d operations", len(found)) 837 } 838 }) 839} 840 841// ==================================================================================== 842// CLEAR OPERATION TESTS 843// ==================================================================================== 844 845func TestMempoolClear(t *testing.T) { 846 tmpDir := t.TempDir() 847 logger := &testLogger{t: t} 848 baseTime := time.Now().Add(-time.Hour) 849 850 t.Run("ClearPopulated", func(t *testing.T) { 851 minTime := baseTime 852 m, err := mempool.NewMempool(tmpDir, 28, minTime, logger, false) 853 if err != nil { 854 t.Fatalf("NewMempool failed: %v", err) 855 } 856 857 m.Add(makeTestOperations(100)) 858 859 if m.Count() != 100 { 860 t.Fatal("setup failed") 861 } 862 863 m.Clear() 864 865 if m.Count() != 0 { 866 t.Errorf("after clear, count should be 0, got %d", m.Count()) 867 } 868 869 // Should be able to add new operations 870 newOps := []plcclient.PLCOperation{ 871 {CID: "new1", CreatedAt: baseTime.Add(200 * time.Second)}, 872 } 873 874 added, err := m.Add(newOps) 875 if err != nil { 876 t.Fatalf("Add after clear failed: %v", err) 877 } 878 879 if added != 1 { 880 t.Error("should be able to add after clear") 881 } 882 }) 883} 884 885// ==================================================================================== 886// HELPER FUNCTIONS 887// ==================================================================================== 888 889func makeTestOperations(count int) []plcclient.PLCOperation { 890 return makeTestOperationsFrom(0, count) 891} 892 893func makeTestOperationsFrom(start, count int) []plcclient.PLCOperation { 894 ops := make([]plcclient.PLCOperation, count) 895 baseTime := time.Now().Add(-time.Hour) 896 897 for i := 0; i < count; i++ { 898 idx := start + i 899 ops[i] = plcclient.PLCOperation{ 900 DID: fmt.Sprintf("did:plc:test%06d", idx), 901 CID: fmt.Sprintf("bafy%06d", idx), 902 CreatedAt: baseTime.Add(time.Duration(idx) * time.Second), 903 } 904 } 905 906 return ops 907}