this repo has no description
1package knotclient 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "math/rand" 9 "net/url" 10 "sync" 11 "time" 12 13 "tangled.sh/tangled.sh/core/knotclient/cursor" 14 "tangled.sh/tangled.sh/core/log" 15 16 "github.com/gorilla/websocket" 17) 18 19type ProcessFunc func(ctx context.Context, source EventSource, message Message) error 20 21type Message struct { 22 Rkey string 23 Nsid string 24 // do not full deserialize this portion of the message, processFunc can do that 25 EventJson json.RawMessage `json:"event"` 26} 27 28type ConsumerConfig struct { 29 Sources map[EventSource]struct{} 30 ProcessFunc ProcessFunc 31 RetryInterval time.Duration 32 MaxRetryInterval time.Duration 33 ConnectionTimeout time.Duration 34 WorkerCount int 35 QueueSize int 36 Logger *slog.Logger 37 Dev bool 38 CursorStore cursor.Store 39} 40 41func NewConsumerConfig() *ConsumerConfig { 42 return &ConsumerConfig{ 43 Sources: make(map[EventSource]struct{}), 44 } 45} 46 47type EventSource struct { 48 Knot string 49} 50 51func NewEventSource(knot string) EventSource { 52 return EventSource{ 53 Knot: knot, 54 } 55} 56 57type EventConsumer struct { 58 wg sync.WaitGroup 59 dialer *websocket.Dialer 60 connMap sync.Map 61 jobQueue chan job 62 logger *slog.Logger 63 randSource *rand.Rand 64 65 // rw lock over edits to ConsumerConfig 66 cfgMu sync.RWMutex 67 cfg ConsumerConfig 68} 69 70func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) { 71 scheme := "wss" 72 if e.cfg.Dev { 73 scheme = "ws" 74 } 75 76 u, err := url.Parse(scheme + "://" + s.Knot + "/events") 77 if err != nil { 78 return nil, err 79 } 80 81 if cursor != 0 { 82 query := url.Values{} 83 query.Add("cursor", fmt.Sprintf("%d", cursor)) 84 u.RawQuery = query.Encode() 85 } 86 return u, nil 87} 88 89type job struct { 90 source EventSource 91 message []byte 92} 93 94func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { 95 if cfg.RetryInterval == 0 { 96 cfg.RetryInterval = 15 * time.Minute 97 } 98 if cfg.ConnectionTimeout == 0 { 99 cfg.ConnectionTimeout = 10 * time.Second 100 } 101 if cfg.WorkerCount <= 0 { 102 cfg.WorkerCount = 5 103 } 104 if cfg.MaxRetryInterval == 0 { 105 cfg.MaxRetryInterval = 1 * time.Hour 106 } 107 if cfg.Logger == nil { 108 cfg.Logger = log.New("eventconsumer") 109 } 110 if cfg.QueueSize == 0 { 111 cfg.QueueSize = 100 112 } 113 if cfg.CursorStore == nil { 114 cfg.CursorStore = &cursor.MemoryStore{} 115 } 116 return &EventConsumer{ 117 cfg: cfg, 118 dialer: websocket.DefaultDialer, 119 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue 120 logger: cfg.Logger, 121 randSource: rand.New(rand.NewSource(time.Now().UnixNano())), 122 } 123} 124 125func (c *EventConsumer) Start(ctx context.Context) { 126 c.cfg.Logger.Info("starting consumer", "config", c.cfg) 127 128 // start workers 129 for range c.cfg.WorkerCount { 130 c.wg.Add(1) 131 go c.worker(ctx) 132 } 133 134 // start streaming 135 for source := range c.cfg.Sources { 136 c.wg.Add(1) 137 go c.startConnectionLoop(ctx, source) 138 } 139} 140 141func (c *EventConsumer) Stop() { 142 c.connMap.Range(func(_, val any) bool { 143 if conn, ok := val.(*websocket.Conn); ok { 144 conn.Close() 145 } 146 return true 147 }) 148 c.wg.Wait() 149 close(c.jobQueue) 150} 151 152func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) { 153 // we are already listening to this source 154 if _, ok := c.cfg.Sources[s]; ok { 155 c.logger.Info("source already present", "source", s) 156 return 157 } 158 159 c.cfgMu.Lock() 160 c.cfg.Sources[s] = struct{}{} 161 c.wg.Add(1) 162 go c.startConnectionLoop(ctx, s) 163 c.cfgMu.Unlock() 164} 165 166func (c *EventConsumer) worker(ctx context.Context) { 167 defer c.wg.Done() 168 for { 169 select { 170 case <-ctx.Done(): 171 return 172 case j, ok := <-c.jobQueue: 173 if !ok { 174 return 175 } 176 177 var msg Message 178 err := json.Unmarshal(j.message, &msg) 179 if err != nil { 180 c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) 181 return 182 } 183 184 // update cursor 185 c.cfg.CursorStore.Set(j.source.Knot, time.Now().UnixNano()) 186 187 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 188 c.logger.Error("error processing message", "source", j.source, "err", err) 189 } 190 } 191 } 192} 193 194func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) { 195 defer c.wg.Done() 196 retryInterval := c.cfg.RetryInterval 197 for { 198 select { 199 case <-ctx.Done(): 200 return 201 default: 202 err := c.runConnection(ctx, source) 203 if err != nil { 204 c.logger.Error("connection failed", "source", source, "err", err) 205 } 206 207 // apply jitter 208 jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5)) 209 delay := retryInterval + jitter 210 211 if retryInterval < c.cfg.MaxRetryInterval { 212 retryInterval *= 2 213 if retryInterval > c.cfg.MaxRetryInterval { 214 retryInterval = c.cfg.MaxRetryInterval 215 } 216 } 217 c.logger.Info("retrying connection", "source", source, "delay", delay) 218 select { 219 case <-time.After(delay): 220 case <-ctx.Done(): 221 return 222 } 223 } 224 } 225} 226 227func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error { 228 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 229 defer cancel() 230 231 cursor := c.cfg.CursorStore.Get(source.Knot) 232 233 u, err := c.buildUrl(source, cursor) 234 if err != nil { 235 return err 236 } 237 238 c.logger.Info("connecting", "url", u.String()) 239 conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) 240 if err != nil { 241 return err 242 } 243 defer conn.Close() 244 c.connMap.Store(source, conn) 245 defer c.connMap.Delete(source) 246 247 c.logger.Info("connected", "source", source) 248 249 for { 250 select { 251 case <-ctx.Done(): 252 return nil 253 default: 254 msgType, msg, err := conn.ReadMessage() 255 if err != nil { 256 return err 257 } 258 if msgType != websocket.TextMessage { 259 continue 260 } 261 select { 262 case c.jobQueue <- job{source: source, message: msg}: 263 case <-ctx.Done(): 264 return nil 265 } 266 } 267 } 268}