this repo has no description
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "strconv"
10 "time"
11
12 "tangled.sh/tangled.sh/core/spindle/models"
13
14 "github.com/go-chi/chi/v5"
15 "github.com/gorilla/websocket"
16 "github.com/hpcloud/tail"
17)
18
19var upgrader = websocket.Upgrader{
20 ReadBufferSize: 1024,
21 WriteBufferSize: 1024,
22}
23
24func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
25 l := s.l.With("handler", "Events")
26 l.Debug("received new connection")
27
28 conn, err := upgrader.Upgrade(w, r, nil)
29 if err != nil {
30 l.Error("websocket upgrade failed", "err", err)
31 w.WriteHeader(http.StatusInternalServerError)
32 return
33 }
34 defer conn.Close()
35 l.Debug("upgraded http to wss")
36
37 ch := s.n.Subscribe()
38 defer s.n.Unsubscribe(ch)
39
40 ctx, cancel := context.WithCancel(r.Context())
41 defer cancel()
42 go func() {
43 for {
44 if _, _, err := conn.NextReader(); err != nil {
45 l.Error("failed to read", "err", err)
46 cancel()
47 return
48 }
49 }
50 }()
51
52 defaultCursor := time.Now().UnixNano()
53 cursorStr := r.URL.Query().Get("cursor")
54 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
55 if err != nil {
56 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
57 }
58 if cursor == 0 {
59 cursor = defaultCursor
60 }
61
62 // complete backfill first before going to live data
63 l.Debug("going through backfill", "cursor", cursor)
64 if err := s.streamPipelines(conn, &cursor); err != nil {
65 l.Error("failed to backfill", "err", err)
66 return
67 }
68
69 for {
70 // wait for new data or timeout
71 select {
72 case <-ctx.Done():
73 l.Debug("stopping stream: client closed connection")
74 return
75 case <-ch:
76 // we have been notified of new data
77 l.Debug("going through live data", "cursor", cursor)
78 if err := s.streamPipelines(conn, &cursor); err != nil {
79 l.Error("failed to stream", "err", err)
80 return
81 }
82 case <-time.After(30 * time.Second):
83 // send a keep-alive
84 l.Debug("sent keepalive")
85 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
86 l.Error("failed to write control", "err", err)
87 }
88 }
89 }
90}
91
92func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
93 wid, err := getWorkflowID(r)
94 if err != nil {
95 http.Error(w, err.Error(), http.StatusBadRequest)
96 return
97 }
98
99 l := s.l.With("handler", "Logs")
100 l = s.l.With("wid", wid)
101
102 conn, err := upgrader.Upgrade(w, r, nil)
103 if err != nil {
104 l.Error("websocket upgrade failed", "err", err)
105 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
106 return
107 }
108 defer func() {
109 _ = conn.WriteControl(
110 websocket.CloseMessage,
111 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
112 time.Now().Add(time.Second),
113 )
114 conn.Close()
115 }()
116 l.Debug("upgraded http to wss")
117
118 ctx, cancel := context.WithCancel(r.Context())
119 defer cancel()
120
121 go func() {
122 for {
123 if _, _, err := conn.NextReader(); err != nil {
124 l.Debug("client disconnected", "err", err)
125 cancel()
126 return
127 }
128 }
129 }()
130
131 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
132 l.Info("log stream ended", "err", err)
133 }
134
135 l.Info("logs connection closed")
136}
137
138func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
139 status, err := s.db.GetStatus(wid)
140 if err != nil {
141 return err
142 }
143 isFinished := models.StatusKind(status.Status).IsFinish()
144
145 filePath := models.LogFilePath(s.cfg.Server.LogDir, wid)
146
147 config := tail.Config{
148 Follow: !isFinished,
149 ReOpen: !isFinished,
150 MustExist: false,
151 Location: &tail.SeekInfo{
152 Offset: 0,
153 Whence: io.SeekStart,
154 },
155 // Logger: tail.DiscardingLogger,
156 }
157
158 t, err := tail.TailFile(filePath, config)
159 if err != nil {
160 return fmt.Errorf("failed to tail log file: %w", err)
161 }
162 defer t.Stop()
163
164 for {
165 select {
166 case <-ctx.Done():
167 return ctx.Err()
168 case line := <-t.Lines:
169 if line == nil && isFinished {
170 return fmt.Errorf("tail completed")
171 }
172
173 if line == nil {
174 return fmt.Errorf("tail channel closed unexpectedly")
175 }
176
177 if line.Err != nil {
178 return fmt.Errorf("error tailing log file: %w", line.Err)
179 }
180
181 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil {
182 return fmt.Errorf("failed to write to websocket: %w", err)
183 }
184 }
185 }
186}
187
188func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
189 events, err := s.db.GetEvents(*cursor)
190 if err != nil {
191 s.l.Debug("err", "err", err)
192 return err
193 }
194 s.l.Debug("ops", "ops", events)
195
196 for _, event := range events {
197 // first extract the inner json into a map
198 var eventJson map[string]any
199 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
200 if err != nil {
201 s.l.Error("failed to unmarshal event", "err", err)
202 return err
203 }
204
205 jsonMsg, err := json.Marshal(map[string]any{
206 "rkey": event.Rkey,
207 "nsid": event.Nsid,
208 "event": eventJson,
209 })
210 if err != nil {
211 s.l.Error("failed to marshal record", "err", err)
212 return err
213 }
214
215 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
216 s.l.Debug("err", "err", err)
217 return err
218 }
219 *cursor = event.Created
220 }
221
222 return nil
223}
224
225func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
226 knot := chi.URLParam(r, "knot")
227 rkey := chi.URLParam(r, "rkey")
228 name := chi.URLParam(r, "name")
229
230 if knot == "" || rkey == "" || name == "" {
231 return models.WorkflowId{}, fmt.Errorf("missing required parameters")
232 }
233
234 return models.WorkflowId{
235 PipelineId: models.PipelineId{
236 Knot: knot,
237 Rkey: rkey,
238 },
239 Name: name,
240 }, nil
241}