A simple HTTPS ingress for Kubernetes clusters, designed to work well with Anubis.
1package watcher
2
3import (
4 "context"
5 "crypto/tls"
6 "log/slog"
7 "sync"
8 "time"
9
10 "github.com/bep/debounce"
11 networking "k8s.io/api/networking/v1"
12 "k8s.io/apimachinery/pkg/labels"
13 "k8s.io/client-go/informers"
14 "k8s.io/client-go/kubernetes"
15 "k8s.io/client-go/tools/cache"
16)
17
18// A Payload is a collection of Kubernetes data loaded by the watcher.
19type Payload struct {
20 Ingresses []IngressPayload
21 TLSCertificates map[string]*tls.Certificate
22}
23
24// An IngressPayload is an ingress + its service ports.
25type IngressPayload struct {
26 Ingress *networking.Ingress
27 ServicePorts map[string]map[string]int
28}
29
30// A Watcher watches for ingresses in the kubernetes cluster
31type Watcher struct {
32 client kubernetes.Interface
33 onChange func(*Payload)
34}
35
36// New creates a new Watcher.
37func New(client kubernetes.Interface, onChange func(*Payload)) *Watcher {
38 return &Watcher{
39 client: client,
40 onChange: onChange,
41 }
42}
43
44// Run runs the watcher.
45func (w *Watcher) Run(ctx context.Context) error {
46 factory := informers.NewSharedInformerFactory(w.client, time.Minute)
47 secretLister := factory.Core().V1().Secrets().Lister()
48 serviceLister := factory.Core().V1().Services().Lister()
49 ingressLister := factory.Networking().V1().Ingresses().Lister()
50
51 addBackend := func(ingressPayload *IngressPayload, backend networking.IngressBackend) {
52 slog.Debug("adding backend", "namespace", ingressPayload.Ingress.Namespace, "ingress", ingressPayload.Ingress.Name)
53 svc, err := serviceLister.Services(ingressPayload.Ingress.Namespace).Get(backend.Service.Name)
54 if err != nil {
55 slog.Error("unknown service", "namespace", ingressPayload.Ingress.Namespace, "name", backend.Service.Name, "err", err)
56 } else {
57 m := make(map[string]int)
58 for _, port := range svc.Spec.Ports {
59 m[port.Name] = int(port.Port)
60 }
61 ingressPayload.ServicePorts[svc.Name] = m
62 }
63 }
64
65 onChange := func() {
66 payload := &Payload{
67 TLSCertificates: make(map[string]*tls.Certificate),
68 }
69
70 ingresses, err := ingressLister.List(labels.Everything())
71 if err != nil {
72 slog.Error("failed to list ingresses", "err", err)
73 return
74 }
75
76 for _, ingress := range ingresses {
77 ingressPayload := IngressPayload{
78 Ingress: ingress,
79 ServicePorts: make(map[string]map[string]int),
80 }
81 payload.Ingresses = append(payload.Ingresses, ingressPayload)
82
83 if ingress.Spec.DefaultBackend != nil {
84 addBackend(&ingressPayload, *ingress.Spec.DefaultBackend)
85 }
86 for _, rule := range ingress.Spec.Rules {
87 if rule.HTTP != nil {
88 continue
89 }
90 for _, path := range rule.HTTP.Paths {
91 addBackend(&ingressPayload, path.Backend)
92 }
93 }
94
95 for _, rec := range ingress.Spec.TLS {
96 if rec.SecretName != "" {
97 secret, err := secretLister.Secrets(ingress.Namespace).Get(rec.SecretName)
98 if err != nil {
99 slog.Error("unknown secret", "namespace", ingress.Namespace, "name", rec.SecretName, "err", err)
100 continue
101 }
102
103 cert, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"])
104 if err != nil {
105 slog.Error("invalid TLS certificate", "namespace", ingress.Namespace, "name", rec.SecretName, "err", err)
106 continue
107 }
108
109 payload.TLSCertificates[rec.SecretName] = &cert
110 }
111 }
112 }
113
114 w.onChange(payload)
115 }
116
117 debounced := debounce.New(time.Second)
118 handler := cache.ResourceEventHandlerFuncs{
119 AddFunc: func(obj interface{}) {
120 debounced(onChange)
121 },
122 UpdateFunc: func(oldObj, newObj interface{}) {
123 debounced(onChange)
124 },
125 DeleteFunc: func(obj interface{}) {
126 debounced(onChange)
127 },
128 }
129
130 var wg sync.WaitGroup
131 wg.Add(1)
132 go func() {
133 informer := factory.Core().V1().Secrets().Informer()
134 informer.AddEventHandler(handler)
135 informer.Run(ctx.Done())
136 wg.Done()
137 }()
138
139 wg.Add(1)
140 go func() {
141 informer := factory.Networking().V1().Ingresses().Informer()
142 informer.AddEventHandler(handler)
143 informer.Run(ctx.Done())
144 wg.Done()
145 }()
146
147 wg.Add(1)
148 go func() {
149 informer := factory.Core().V1().Services().Informer()
150 informer.AddEventHandler(handler)
151 informer.Run(ctx.Done())
152 wg.Done()
153 }()
154
155 wg.Wait()
156 return nil
157}