A simple HTTPS ingress for Kubernetes clusters, designed to work well with Anubis.
at main 157 lines 4.1 kB view raw
1package watcher 2 3import ( 4 "context" 5 "crypto/tls" 6 "log/slog" 7 "sync" 8 "time" 9 10 "github.com/bep/debounce" 11 networking "k8s.io/api/networking/v1" 12 "k8s.io/apimachinery/pkg/labels" 13 "k8s.io/client-go/informers" 14 "k8s.io/client-go/kubernetes" 15 "k8s.io/client-go/tools/cache" 16) 17 18// A Payload is a collection of Kubernetes data loaded by the watcher. 19type Payload struct { 20 Ingresses []IngressPayload 21 TLSCertificates map[string]*tls.Certificate 22} 23 24// An IngressPayload is an ingress + its service ports. 25type IngressPayload struct { 26 Ingress *networking.Ingress 27 ServicePorts map[string]map[string]int 28} 29 30// A Watcher watches for ingresses in the kubernetes cluster 31type Watcher struct { 32 client kubernetes.Interface 33 onChange func(*Payload) 34} 35 36// New creates a new Watcher. 37func New(client kubernetes.Interface, onChange func(*Payload)) *Watcher { 38 return &Watcher{ 39 client: client, 40 onChange: onChange, 41 } 42} 43 44// Run runs the watcher. 45func (w *Watcher) Run(ctx context.Context) error { 46 factory := informers.NewSharedInformerFactory(w.client, time.Minute) 47 secretLister := factory.Core().V1().Secrets().Lister() 48 serviceLister := factory.Core().V1().Services().Lister() 49 ingressLister := factory.Networking().V1().Ingresses().Lister() 50 51 addBackend := func(ingressPayload *IngressPayload, backend networking.IngressBackend) { 52 slog.Debug("adding backend", "namespace", ingressPayload.Ingress.Namespace, "ingress", ingressPayload.Ingress.Name) 53 svc, err := serviceLister.Services(ingressPayload.Ingress.Namespace).Get(backend.Service.Name) 54 if err != nil { 55 slog.Error("unknown service", "namespace", ingressPayload.Ingress.Namespace, "name", backend.Service.Name, "err", err) 56 } else { 57 m := make(map[string]int) 58 for _, port := range svc.Spec.Ports { 59 m[port.Name] = int(port.Port) 60 } 61 ingressPayload.ServicePorts[svc.Name] = m 62 } 63 } 64 65 onChange := func() { 66 payload := &Payload{ 67 TLSCertificates: make(map[string]*tls.Certificate), 68 } 69 70 ingresses, err := ingressLister.List(labels.Everything()) 71 if err != nil { 72 slog.Error("failed to list ingresses", "err", err) 73 return 74 } 75 76 for _, ingress := range ingresses { 77 ingressPayload := IngressPayload{ 78 Ingress: ingress, 79 ServicePorts: make(map[string]map[string]int), 80 } 81 payload.Ingresses = append(payload.Ingresses, ingressPayload) 82 83 if ingress.Spec.DefaultBackend != nil { 84 addBackend(&ingressPayload, *ingress.Spec.DefaultBackend) 85 } 86 for _, rule := range ingress.Spec.Rules { 87 if rule.HTTP != nil { 88 continue 89 } 90 for _, path := range rule.HTTP.Paths { 91 addBackend(&ingressPayload, path.Backend) 92 } 93 } 94 95 for _, rec := range ingress.Spec.TLS { 96 if rec.SecretName != "" { 97 secret, err := secretLister.Secrets(ingress.Namespace).Get(rec.SecretName) 98 if err != nil { 99 slog.Error("unknown secret", "namespace", ingress.Namespace, "name", rec.SecretName, "err", err) 100 continue 101 } 102 103 cert, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"]) 104 if err != nil { 105 slog.Error("invalid TLS certificate", "namespace", ingress.Namespace, "name", rec.SecretName, "err", err) 106 continue 107 } 108 109 payload.TLSCertificates[rec.SecretName] = &cert 110 } 111 } 112 } 113 114 w.onChange(payload) 115 } 116 117 debounced := debounce.New(time.Second) 118 handler := cache.ResourceEventHandlerFuncs{ 119 AddFunc: func(obj interface{}) { 120 debounced(onChange) 121 }, 122 UpdateFunc: func(oldObj, newObj interface{}) { 123 debounced(onChange) 124 }, 125 DeleteFunc: func(obj interface{}) { 126 debounced(onChange) 127 }, 128 } 129 130 var wg sync.WaitGroup 131 wg.Add(1) 132 go func() { 133 informer := factory.Core().V1().Secrets().Informer() 134 informer.AddEventHandler(handler) 135 informer.Run(ctx.Done()) 136 wg.Done() 137 }() 138 139 wg.Add(1) 140 go func() { 141 informer := factory.Networking().V1().Ingresses().Informer() 142 informer.AddEventHandler(handler) 143 informer.Run(ctx.Done()) 144 wg.Done() 145 }() 146 147 wg.Add(1) 148 go func() { 149 informer := factory.Core().V1().Services().Informer() 150 informer.AddEventHandler(handler) 151 informer.Run(ctx.Done()) 152 wg.Done() 153 }() 154 155 wg.Wait() 156 return nil 157}