Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "fmt"
6 "testing"
7 "time"
8)
9
10func TestMemoryBackend_publishAndSubscribe(t *testing.T) {
11 backend := NewMemoryBackend(100)
12 ctx, cancel := context.WithCancel(context.Background())
13 defer cancel()
14
15 ch := backend.Subscribe(ctx)
16 event := &Event{ID: "e1", Path: "test", Payload: map[string]any{}}
17 backend.Publish(event)
18
19 select {
20 case got := <-ch:
21 if got.ID != "e1" {
22 t.Errorf("expected e1, got %s", got.ID)
23 }
24 case <-time.After(time.Second):
25 t.Fatal("timed out waiting for event")
26 }
27}
28
29func TestMemoryBackend_multipleListeners(t *testing.T) {
30 backend := NewMemoryBackend(100)
31 ctx, cancel := context.WithCancel(context.Background())
32 defer cancel()
33
34 ch1 := backend.Subscribe(ctx)
35 ch2 := backend.Subscribe(ctx)
36 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
37
38 for i, ch := range []<-chan *Event{ch1, ch2} {
39 select {
40 case got := <-ch:
41 if got.ID != "e1" {
42 t.Errorf("listener %d: expected e1, got %s", i, got.ID)
43 }
44 case <-time.After(time.Second):
45 t.Fatalf("listener %d: timed out", i)
46 }
47 }
48}
49
50func TestMemoryBackend_cancelRemovesListener(t *testing.T) {
51 backend := NewMemoryBackend(100)
52 ctx, cancel := context.WithCancel(context.Background())
53 backend.Subscribe(ctx)
54 cancel()
55
56 // Give the cleanup goroutine time to run
57 time.Sleep(50 * time.Millisecond)
58
59 backend.mu.RLock()
60 n := len(backend.listeners)
61 backend.mu.RUnlock()
62 if n != 0 {
63 t.Errorf("expected 0 listeners after cancel, got %d", n)
64 }
65}
66
67func TestMemoryBackend_since(t *testing.T) {
68 backend := NewMemoryBackend(100)
69 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
70 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}})
71 backend.Publish(&Event{ID: "e3", Path: "other", Payload: map[string]any{}})
72
73 events := backend.Since("e1", "test")
74 if len(events) != 1 {
75 t.Fatalf("expected 1 event, got %d", len(events))
76 }
77 if events[0].ID != "e2" {
78 t.Errorf("expected e2, got %s", events[0].ID)
79 }
80}
81
82func TestMemoryBackend_sinceEmptyPath(t *testing.T) {
83 backend := NewMemoryBackend(100)
84 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
85 backend.Publish(&Event{ID: "e2", Path: "other", Payload: map[string]any{}})
86
87 events := backend.Since("e1", "")
88 if len(events) != 1 {
89 t.Fatalf("expected 1 event, got %d", len(events))
90 }
91 if events[0].ID != "e2" {
92 t.Errorf("expected e2, got %s", events[0].ID)
93 }
94}
95
96func TestMemoryBackend_sinceExpiredID(t *testing.T) {
97 backend := NewMemoryBackend(3)
98 for i := range 6 {
99 backend.Publish(&Event{ID: fmt.Sprintf("e%d", i), Path: "test", Payload: map[string]any{}})
100 }
101
102 events := backend.Since("e0", "test")
103 if len(events) != 3 {
104 t.Fatalf("expected 3 events (full buffer), got %d", len(events))
105 }
106 if events[0].ID != "e3" {
107 t.Errorf("expected e3, got %s", events[0].ID)
108 }
109}
110
111func TestMemoryBackend_closeIsIdempotent(t *testing.T) {
112 backend := NewMemoryBackend(100)
113 if err := backend.Close(); err != nil {
114 t.Errorf("first close: %v", err)
115 }
116 if err := backend.Close(); err != nil {
117 t.Errorf("second close: %v", err)
118 }
119}