package main import ( "context" "fmt" "testing" "time" ) func TestMemoryBackend_publishAndSubscribe(t *testing.T) { backend := NewMemoryBackend(100) ctx, cancel := context.WithCancel(context.Background()) defer cancel() ch := backend.Subscribe(ctx) event := &Event{ID: "e1", Path: "test", Payload: map[string]any{}} backend.Publish(event) select { case got := <-ch: if got.ID != "e1" { t.Errorf("expected e1, got %s", got.ID) } case <-time.After(time.Second): t.Fatal("timed out waiting for event") } } func TestMemoryBackend_multipleListeners(t *testing.T) { backend := NewMemoryBackend(100) ctx, cancel := context.WithCancel(context.Background()) defer cancel() ch1 := backend.Subscribe(ctx) ch2 := backend.Subscribe(ctx) backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) for i, ch := range []<-chan *Event{ch1, ch2} { select { case got := <-ch: if got.ID != "e1" { t.Errorf("listener %d: expected e1, got %s", i, got.ID) } case <-time.After(time.Second): t.Fatalf("listener %d: timed out", i) } } } func TestMemoryBackend_cancelRemovesListener(t *testing.T) { backend := NewMemoryBackend(100) ctx, cancel := context.WithCancel(context.Background()) backend.Subscribe(ctx) cancel() // Give the cleanup goroutine time to run time.Sleep(50 * time.Millisecond) backend.mu.RLock() n := len(backend.listeners) backend.mu.RUnlock() if n != 0 { t.Errorf("expected 0 listeners after cancel, got %d", n) } } func TestMemoryBackend_since(t *testing.T) { backend := NewMemoryBackend(100) backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) backend.Publish(&Event{ID: "e3", Path: "other", Payload: map[string]any{}}) events := backend.Since("e1", "test") if len(events) != 1 { t.Fatalf("expected 1 event, got %d", len(events)) } if events[0].ID != "e2" { t.Errorf("expected e2, got %s", events[0].ID) } } func TestMemoryBackend_sinceEmptyPath(t *testing.T) { backend := NewMemoryBackend(100) backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) backend.Publish(&Event{ID: "e2", Path: "other", Payload: map[string]any{}}) events := backend.Since("e1", "") if len(events) != 1 { t.Fatalf("expected 1 event, got %d", len(events)) } if events[0].ID != "e2" { t.Errorf("expected e2, got %s", events[0].ID) } } func TestMemoryBackend_sinceExpiredID(t *testing.T) { backend := NewMemoryBackend(3) for i := range 6 { backend.Publish(&Event{ID: fmt.Sprintf("e%d", i), Path: "test", Payload: map[string]any{}}) } events := backend.Since("e0", "test") if len(events) != 3 { t.Fatalf("expected 3 events (full buffer), got %d", len(events)) } if events[0].ID != "e3" { t.Errorf("expected e3, got %s", events[0].ID) } } func TestMemoryBackend_closeIsIdempotent(t *testing.T) { backend := NewMemoryBackend(100) if err := backend.Close(); err != nil { t.Errorf("first close: %v", err) } if err := backend.Close(); err != nil { t.Errorf("second close: %v", err) } }