Webhook-to-SSE gateway with hierarchical topic routing and signature verification
at main 119 lines 3.1 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 "time" 8) 9 10func TestMemoryBackend_publishAndSubscribe(t *testing.T) { 11 backend := NewMemoryBackend(100) 12 ctx, cancel := context.WithCancel(context.Background()) 13 defer cancel() 14 15 ch := backend.Subscribe(ctx) 16 event := &Event{ID: "e1", Path: "test", Payload: map[string]any{}} 17 backend.Publish(event) 18 19 select { 20 case got := <-ch: 21 if got.ID != "e1" { 22 t.Errorf("expected e1, got %s", got.ID) 23 } 24 case <-time.After(time.Second): 25 t.Fatal("timed out waiting for event") 26 } 27} 28 29func TestMemoryBackend_multipleListeners(t *testing.T) { 30 backend := NewMemoryBackend(100) 31 ctx, cancel := context.WithCancel(context.Background()) 32 defer cancel() 33 34 ch1 := backend.Subscribe(ctx) 35 ch2 := backend.Subscribe(ctx) 36 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 37 38 for i, ch := range []<-chan *Event{ch1, ch2} { 39 select { 40 case got := <-ch: 41 if got.ID != "e1" { 42 t.Errorf("listener %d: expected e1, got %s", i, got.ID) 43 } 44 case <-time.After(time.Second): 45 t.Fatalf("listener %d: timed out", i) 46 } 47 } 48} 49 50func TestMemoryBackend_cancelRemovesListener(t *testing.T) { 51 backend := NewMemoryBackend(100) 52 ctx, cancel := context.WithCancel(context.Background()) 53 backend.Subscribe(ctx) 54 cancel() 55 56 // Give the cleanup goroutine time to run 57 time.Sleep(50 * time.Millisecond) 58 59 backend.mu.RLock() 60 n := len(backend.listeners) 61 backend.mu.RUnlock() 62 if n != 0 { 63 t.Errorf("expected 0 listeners after cancel, got %d", n) 64 } 65} 66 67func TestMemoryBackend_since(t *testing.T) { 68 backend := NewMemoryBackend(100) 69 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 70 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 71 backend.Publish(&Event{ID: "e3", Path: "other", Payload: map[string]any{}}) 72 73 events := backend.Since("e1", "test") 74 if len(events) != 1 { 75 t.Fatalf("expected 1 event, got %d", len(events)) 76 } 77 if events[0].ID != "e2" { 78 t.Errorf("expected e2, got %s", events[0].ID) 79 } 80} 81 82func TestMemoryBackend_sinceEmptyPath(t *testing.T) { 83 backend := NewMemoryBackend(100) 84 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 85 backend.Publish(&Event{ID: "e2", Path: "other", Payload: map[string]any{}}) 86 87 events := backend.Since("e1", "") 88 if len(events) != 1 { 89 t.Fatalf("expected 1 event, got %d", len(events)) 90 } 91 if events[0].ID != "e2" { 92 t.Errorf("expected e2, got %s", events[0].ID) 93 } 94} 95 96func TestMemoryBackend_sinceExpiredID(t *testing.T) { 97 backend := NewMemoryBackend(3) 98 for i := range 6 { 99 backend.Publish(&Event{ID: fmt.Sprintf("e%d", i), Path: "test", Payload: map[string]any{}}) 100 } 101 102 events := backend.Since("e0", "test") 103 if len(events) != 3 { 104 t.Fatalf("expected 3 events (full buffer), got %d", len(events)) 105 } 106 if events[0].ID != "e3" { 107 t.Errorf("expected e3, got %s", events[0].ID) 108 } 109} 110 111func TestMemoryBackend_closeIsIdempotent(t *testing.T) { 112 backend := NewMemoryBackend(100) 113 if err := backend.Close(); err != nil { 114 t.Errorf("first close: %v", err) 115 } 116 if err := backend.Close(); err != nil { 117 t.Errorf("second close: %v", err) 118 } 119}