this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "strconv" 10 "time" 11 12 "tangled.sh/tangled.sh/core/spindle/models" 13 14 "github.com/go-chi/chi/v5" 15 "github.com/gorilla/websocket" 16 "github.com/hpcloud/tail" 17) 18 19var upgrader = websocket.Upgrader{ 20 ReadBufferSize: 1024, 21 WriteBufferSize: 1024, 22} 23 24func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 25 l := s.l.With("handler", "Events") 26 l.Debug("received new connection") 27 28 conn, err := upgrader.Upgrade(w, r, nil) 29 if err != nil { 30 l.Error("websocket upgrade failed", "err", err) 31 w.WriteHeader(http.StatusInternalServerError) 32 return 33 } 34 defer conn.Close() 35 l.Debug("upgraded http to wss") 36 37 ch := s.n.Subscribe() 38 defer s.n.Unsubscribe(ch) 39 40 ctx, cancel := context.WithCancel(r.Context()) 41 defer cancel() 42 go func() { 43 for { 44 if _, _, err := conn.NextReader(); err != nil { 45 l.Error("failed to read", "err", err) 46 cancel() 47 return 48 } 49 } 50 }() 51 52 defaultCursor := time.Now().UnixNano() 53 cursorStr := r.URL.Query().Get("cursor") 54 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 55 if err != nil { 56 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 57 } 58 if cursor == 0 { 59 cursor = defaultCursor 60 } 61 62 // complete backfill first before going to live data 63 l.Debug("going through backfill", "cursor", cursor) 64 if err := s.streamPipelines(conn, &cursor); err != nil { 65 l.Error("failed to backfill", "err", err) 66 return 67 } 68 69 for { 70 // wait for new data or timeout 71 select { 72 case <-ctx.Done(): 73 l.Debug("stopping stream: client closed connection") 74 return 75 case <-ch: 76 // we have been notified of new data 77 l.Debug("going through live data", "cursor", cursor) 78 if err := s.streamPipelines(conn, &cursor); err != nil { 79 l.Error("failed to stream", "err", err) 80 return 81 } 82 case <-time.After(30 * time.Second): 83 // send a keep-alive 84 l.Debug("sent keepalive") 85 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 86 l.Error("failed to write control", "err", err) 87 } 88 } 89 } 90} 91 92func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 93 wid, err := getWorkflowID(r) 94 if err != nil { 95 http.Error(w, err.Error(), http.StatusBadRequest) 96 return 97 } 98 99 l := s.l.With("handler", "Logs") 100 l = s.l.With("wid", wid) 101 102 conn, err := upgrader.Upgrade(w, r, nil) 103 if err != nil { 104 l.Error("websocket upgrade failed", "err", err) 105 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 106 return 107 } 108 defer func() { 109 _ = conn.WriteControl( 110 websocket.CloseMessage, 111 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 112 time.Now().Add(time.Second), 113 ) 114 conn.Close() 115 }() 116 l.Debug("upgraded http to wss") 117 118 ctx, cancel := context.WithCancel(r.Context()) 119 defer cancel() 120 121 go func() { 122 for { 123 if _, _, err := conn.NextReader(); err != nil { 124 l.Debug("client disconnected", "err", err) 125 cancel() 126 return 127 } 128 } 129 }() 130 131 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil { 132 l.Info("log stream ended", "err", err) 133 } 134 135 l.Info("logs connection closed") 136} 137 138func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 139 status, err := s.db.GetStatus(wid) 140 if err != nil { 141 return err 142 } 143 isFinished := models.StatusKind(status.Status).IsFinish() 144 145 filePath := models.LogFilePath(s.cfg.Server.LogDir, wid) 146 147 config := tail.Config{ 148 Follow: !isFinished, 149 ReOpen: !isFinished, 150 MustExist: false, 151 Location: &tail.SeekInfo{ 152 Offset: 0, 153 Whence: io.SeekStart, 154 }, 155 // Logger: tail.DiscardingLogger, 156 } 157 158 t, err := tail.TailFile(filePath, config) 159 if err != nil { 160 return fmt.Errorf("failed to tail log file: %w", err) 161 } 162 defer t.Stop() 163 164 for { 165 select { 166 case <-ctx.Done(): 167 return ctx.Err() 168 case line := <-t.Lines: 169 if line == nil && isFinished { 170 return fmt.Errorf("tail completed") 171 } 172 173 if line == nil { 174 return fmt.Errorf("tail channel closed unexpectedly") 175 } 176 177 if line.Err != nil { 178 return fmt.Errorf("error tailing log file: %w", line.Err) 179 } 180 181 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil { 182 return fmt.Errorf("failed to write to websocket: %w", err) 183 } 184 } 185 } 186} 187 188func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 189 events, err := s.db.GetEvents(*cursor) 190 if err != nil { 191 s.l.Debug("err", "err", err) 192 return err 193 } 194 s.l.Debug("ops", "ops", events) 195 196 for _, event := range events { 197 // first extract the inner json into a map 198 var eventJson map[string]any 199 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 200 if err != nil { 201 s.l.Error("failed to unmarshal event", "err", err) 202 return err 203 } 204 205 jsonMsg, err := json.Marshal(map[string]any{ 206 "rkey": event.Rkey, 207 "nsid": event.Nsid, 208 "event": eventJson, 209 }) 210 if err != nil { 211 s.l.Error("failed to marshal record", "err", err) 212 return err 213 } 214 215 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 216 s.l.Debug("err", "err", err) 217 return err 218 } 219 *cursor = event.Created 220 } 221 222 return nil 223} 224 225func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 226 knot := chi.URLParam(r, "knot") 227 rkey := chi.URLParam(r, "rkey") 228 name := chi.URLParam(r, "name") 229 230 if knot == "" || rkey == "" || name == "" { 231 return models.WorkflowId{}, fmt.Errorf("missing required parameters") 232 } 233 234 return models.WorkflowId{ 235 PipelineId: models.PipelineId{ 236 Knot: knot, 237 Rkey: rkey, 238 }, 239 Name: name, 240 }, nil 241}