A simple HTTPS ingress for Kubernetes clusters, designed to work well with Anubis.
1package telemetry
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "math/rand/v2"
10 "net"
11 "net/http"
12 "time"
13
14 "git.xeserv.us/Techaro/hythlodaeus/bundler"
15 "git.xeserv.us/Techaro/hythlodaeus/proto/relayd"
16 awsConfig "github.com/aws/aws-sdk-go-v2/config"
17 "github.com/aws/aws-sdk-go-v2/service/s3"
18 "github.com/aws/aws-sdk-go/aws"
19 "github.com/exaring/ja4plus"
20 "github.com/felixge/httpsnoop"
21 "github.com/google/uuid"
22 "github.com/prometheus/client_golang/prometheus"
23 "github.com/prometheus/client_golang/prometheus/promauto"
24 "google.golang.org/protobuf/types/known/durationpb"
25)
26
27var (
28 responseTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
29 Name: "hythlodaeus_response_time",
30 Help: "response time per domain proxied",
31 Buckets: prometheus.ExponentialBuckets(float64(time.Microsecond), 2, 24),
32 }, []string{"host"})
33 responseCodes = promauto.NewCounterVec(prometheus.CounterOpts{
34 Name: "hythlodaeus_response_codes",
35 Help: "response codes per domain proxied",
36 }, []string{"host", "code"})
37)
38
39type Config struct {
40 Bucket string
41 PathStyle bool
42 Host string
43 BundleCount int
44 ContextDeadline time.Duration
45 DelayThreshold time.Duration
46}
47
48type Sink struct {
49 sink *bundler.Bundler[*relayd.RequestLog]
50 s3c *s3.Client
51 cfg Config
52}
53
54func New(ctx context.Context, cfg Config) (*Sink, error) {
55 slog.Info("telemetry enabled")
56
57 acfg, err := awsConfig.LoadDefaultConfig(ctx)
58 if err != nil {
59 return nil, err
60 }
61
62 s3c := s3.NewFromConfig(acfg, func(o *s3.Options) {
63 o.UsePathStyle = true
64 })
65
66 result := &Sink{
67 s3c: s3c,
68 cfg: cfg,
69 }
70
71 result.sink = bundler.New(result.WriteBundle)
72 result.sink.DelayThreshold = cfg.DelayThreshold
73 result.sink.BundleCountThreshold = cfg.BundleCount
74 result.sink.ContextDeadline = cfg.ContextDeadline
75
76 return result, nil
77}
78
79func (s *Sink) Add(item *relayd.RequestLog) {
80 s.sink.Add(item, 1)
81}
82
83func (s *Sink) WriteBundle(ctx context.Context, items []*relayd.RequestLog) {
84 if err := s.writeBundle(ctx, items); err != nil {
85 slog.Error("failed writing", "itemCount", len(items), "err", err)
86 for _, item := range items {
87 item := item
88 // 1 in 8 chance to drop
89 if rand.IntN(8) != 4 /* chosen by fair dice roll */ {
90 go func(rl *relayd.RequestLog) {
91 s.sink.Add(rl, 1)
92 }(item)
93 }
94 }
95 }
96}
97
98func (s *Sink) writeBundle(ctx context.Context, items []*relayd.RequestLog) error {
99 buf := bytes.NewBuffer(nil)
100 enc := json.NewEncoder(buf)
101
102 for _, item := range items {
103 if err := enc.Encode(item); err != nil {
104 return err
105 }
106 }
107
108 id := uuid.Must(uuid.NewV7()).String()
109
110 if _, err := s.s3c.PutObject(ctx, &s3.PutObjectInput{
111 Body: buf,
112 Bucket: aws.String(s.cfg.Bucket),
113 Key: aws.String(s.cfg.Host + "/" + id + ".jsonl"),
114 ContentType: aws.String("application/jsonl"),
115 }); err != nil {
116 return err
117 }
118
119 return nil
120}
121
122func (s *Sink) Middleware(next http.Handler) http.Handler {
123 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
124 host, _, _ := net.SplitHostPort(r.RemoteAddr)
125 if host != "" {
126 r.Header.Set("X-Real-Ip", host)
127 }
128
129 ja4 := ja4plus.JA4FromContext(r.Context())
130 if ja4 != "" {
131 r.Header.Set("X-Tls-Fingerprint-Ja4", ja4)
132 }
133
134 reqID := uuid.Must(uuid.NewV7()).String()
135 rl := relayd.RequestLogFromRequest(r, host, reqID, ja4)
136
137 r.Header.Set("X-Forwarded-Host", r.Host)
138 r.Header.Set("X-Forwarded-Proto", "https")
139 r.Header.Set("X-Forwarded-Scheme", "https")
140 r.Header.Set("X-Request-Id", reqID)
141 r.Header.Set("X-Scheme", "https")
142 r.Header.Set("X-HTTP-Protocol", r.Proto)
143
144 m := httpsnoop.CaptureMetrics(next, w, r)
145 rl.ResponseTime = durationpb.New(m.Duration)
146 rl.StatusCode = int32(m.Code)
147 rl.BytesWritten = m.Written
148
149 responseTime.WithLabelValues(r.Host).Observe(float64(m.Duration))
150 responseCodes.WithLabelValues(r.Host, fmt.Sprint(m.Code)).Add(1)
151
152 s.Add(rl)
153 })
154}