A simple HTTPS ingress for Kubernetes clusters, designed to work well with Anubis.
at main 154 lines 3.9 kB view raw
1package telemetry 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "math/rand/v2" 10 "net" 11 "net/http" 12 "time" 13 14 "git.xeserv.us/Techaro/hythlodaeus/bundler" 15 "git.xeserv.us/Techaro/hythlodaeus/proto/relayd" 16 awsConfig "github.com/aws/aws-sdk-go-v2/config" 17 "github.com/aws/aws-sdk-go-v2/service/s3" 18 "github.com/aws/aws-sdk-go/aws" 19 "github.com/exaring/ja4plus" 20 "github.com/felixge/httpsnoop" 21 "github.com/google/uuid" 22 "github.com/prometheus/client_golang/prometheus" 23 "github.com/prometheus/client_golang/prometheus/promauto" 24 "google.golang.org/protobuf/types/known/durationpb" 25) 26 27var ( 28 responseTime = promauto.NewHistogramVec(prometheus.HistogramOpts{ 29 Name: "hythlodaeus_response_time", 30 Help: "response time per domain proxied", 31 Buckets: prometheus.ExponentialBuckets(float64(time.Microsecond), 2, 24), 32 }, []string{"host"}) 33 responseCodes = promauto.NewCounterVec(prometheus.CounterOpts{ 34 Name: "hythlodaeus_response_codes", 35 Help: "response codes per domain proxied", 36 }, []string{"host", "code"}) 37) 38 39type Config struct { 40 Bucket string 41 PathStyle bool 42 Host string 43 BundleCount int 44 ContextDeadline time.Duration 45 DelayThreshold time.Duration 46} 47 48type Sink struct { 49 sink *bundler.Bundler[*relayd.RequestLog] 50 s3c *s3.Client 51 cfg Config 52} 53 54func New(ctx context.Context, cfg Config) (*Sink, error) { 55 slog.Info("telemetry enabled") 56 57 acfg, err := awsConfig.LoadDefaultConfig(ctx) 58 if err != nil { 59 return nil, err 60 } 61 62 s3c := s3.NewFromConfig(acfg, func(o *s3.Options) { 63 o.UsePathStyle = true 64 }) 65 66 result := &Sink{ 67 s3c: s3c, 68 cfg: cfg, 69 } 70 71 result.sink = bundler.New(result.WriteBundle) 72 result.sink.DelayThreshold = cfg.DelayThreshold 73 result.sink.BundleCountThreshold = cfg.BundleCount 74 result.sink.ContextDeadline = cfg.ContextDeadline 75 76 return result, nil 77} 78 79func (s *Sink) Add(item *relayd.RequestLog) { 80 s.sink.Add(item, 1) 81} 82 83func (s *Sink) WriteBundle(ctx context.Context, items []*relayd.RequestLog) { 84 if err := s.writeBundle(ctx, items); err != nil { 85 slog.Error("failed writing", "itemCount", len(items), "err", err) 86 for _, item := range items { 87 item := item 88 // 1 in 8 chance to drop 89 if rand.IntN(8) != 4 /* chosen by fair dice roll */ { 90 go func(rl *relayd.RequestLog) { 91 s.sink.Add(rl, 1) 92 }(item) 93 } 94 } 95 } 96} 97 98func (s *Sink) writeBundle(ctx context.Context, items []*relayd.RequestLog) error { 99 buf := bytes.NewBuffer(nil) 100 enc := json.NewEncoder(buf) 101 102 for _, item := range items { 103 if err := enc.Encode(item); err != nil { 104 return err 105 } 106 } 107 108 id := uuid.Must(uuid.NewV7()).String() 109 110 if _, err := s.s3c.PutObject(ctx, &s3.PutObjectInput{ 111 Body: buf, 112 Bucket: aws.String(s.cfg.Bucket), 113 Key: aws.String(s.cfg.Host + "/" + id + ".jsonl"), 114 ContentType: aws.String("application/jsonl"), 115 }); err != nil { 116 return err 117 } 118 119 return nil 120} 121 122func (s *Sink) Middleware(next http.Handler) http.Handler { 123 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 124 host, _, _ := net.SplitHostPort(r.RemoteAddr) 125 if host != "" { 126 r.Header.Set("X-Real-Ip", host) 127 } 128 129 ja4 := ja4plus.JA4FromContext(r.Context()) 130 if ja4 != "" { 131 r.Header.Set("X-Tls-Fingerprint-Ja4", ja4) 132 } 133 134 reqID := uuid.Must(uuid.NewV7()).String() 135 rl := relayd.RequestLogFromRequest(r, host, reqID, ja4) 136 137 r.Header.Set("X-Forwarded-Host", r.Host) 138 r.Header.Set("X-Forwarded-Proto", "https") 139 r.Header.Set("X-Forwarded-Scheme", "https") 140 r.Header.Set("X-Request-Id", reqID) 141 r.Header.Set("X-Scheme", "https") 142 r.Header.Set("X-HTTP-Protocol", r.Proto) 143 144 m := httpsnoop.CaptureMetrics(next, w, r) 145 rl.ResponseTime = durationpb.New(m.Duration) 146 rl.StatusCode = int32(m.Code) 147 rl.BytesWritten = m.Written 148 149 responseTime.WithLabelValues(r.Host).Observe(float64(m.Duration)) 150 responseCodes.WithLabelValues(r.Host, fmt.Sprint(m.Code)).Add(1) 151 152 s.Add(rl) 153 }) 154}