Monorepo for Tangled
at ef3dbf02ea0d133dffb20583ac15c761951b9c8f 452 lines 11 kB view raw
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/middleware" 17 "tangled.org/core/appview/models" 18 "tangled.org/core/appview/oauth" 19 "tangled.org/core/appview/pages" 20 "tangled.org/core/appview/reporesolver" 21 "tangled.org/core/eventconsumer" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25 spindlemodel "tangled.org/core/spindle/models" 26 27 "github.com/go-chi/chi/v5" 28 "github.com/gorilla/websocket" 29) 30 31type Pipelines struct { 32 repoResolver *reporesolver.RepoResolver 33 idResolver *idresolver.Resolver 34 config *config.Config 35 oauth *oauth.OAuth 36 pages *pages.Pages 37 spindlestream *eventconsumer.Consumer 38 db *db.DB 39 enforcer *rbac.Enforcer 40 logger *slog.Logger 41} 42 43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { 44 r := chi.NewRouter() 45 r.Get("/", p.Index) 46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 48 r. 49 With(mw.RepoPermissionMiddleware("repo:owner")). 50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 51 52 return r 53} 54 55func New( 56 oauth *oauth.OAuth, 57 repoResolver *reporesolver.RepoResolver, 58 pages *pages.Pages, 59 spindlestream *eventconsumer.Consumer, 60 idResolver *idresolver.Resolver, 61 db *db.DB, 62 config *config.Config, 63 enforcer *rbac.Enforcer, 64 logger *slog.Logger, 65) *Pipelines { 66 return &Pipelines{ 67 oauth: oauth, 68 repoResolver: repoResolver, 69 pages: pages, 70 idResolver: idResolver, 71 config: config, 72 spindlestream: spindlestream, 73 db: db, 74 enforcer: enforcer, 75 logger: logger, 76 } 77} 78 79func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 80 user := p.oauth.GetMultiAccountUser(r) 81 l := p.logger.With("handler", "Index") 82 83 f, err := p.repoResolver.Resolve(r) 84 if err != nil { 85 l.Error("failed to get repo and knot", "err", err) 86 return 87 } 88 89 ps, err := db.GetPipelineStatuses( 90 p.db, 91 30, 92 orm.FilterEq("repo_owner", f.Did), 93 orm.FilterEq("repo_name", f.Name), 94 orm.FilterEq("knot", f.Knot), 95 ) 96 if err != nil { 97 l.Error("failed to query db", "err", err) 98 return 99 } 100 101 // Filter by trigger 102 filterTrigger := r.URL.Query().Get("trigger") 103 var filtered []models.Pipeline 104 for _, pipeline := range ps { 105 if filterTrigger == "push" && pipeline.Trigger != nil && pipeline.Trigger.IsPush() { 106 filtered = append(filtered, pipeline) 107 } else if filterTrigger == "pr" && pipeline.Trigger != nil && pipeline.Trigger.IsPullRequest() { 108 filtered = append(filtered, pipeline) 109 } else if filterTrigger == "" || filterTrigger == "all" { 110 filtered = append(filtered, pipeline) 111 } 112 } 113 114 filteringByPush := filterTrigger == "push" 115 filteringByPR := filterTrigger == "pr" 116 117 p.pages.Pipelines(w, pages.PipelinesParams{ 118 LoggedInUser: user, 119 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 120 Pipelines: filtered, 121 FilteringByPush: filteringByPush, 122 FilteringByPR: filteringByPR, 123 }) 124} 125 126func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 127 user := p.oauth.GetMultiAccountUser(r) 128 l := p.logger.With("handler", "Workflow") 129 130 f, err := p.repoResolver.Resolve(r) 131 if err != nil { 132 l.Error("failed to get repo and knot", "err", err) 133 return 134 } 135 136 pipelineId := chi.URLParam(r, "pipeline") 137 if pipelineId == "" { 138 l.Error("empty pipeline ID") 139 return 140 } 141 142 workflow := chi.URLParam(r, "workflow") 143 if workflow == "" { 144 l.Error("empty workflow name") 145 return 146 } 147 148 ps, err := db.GetPipelineStatuses( 149 p.db, 150 1, 151 orm.FilterEq("repo_owner", f.Did), 152 orm.FilterEq("repo_name", f.Name), 153 orm.FilterEq("knot", f.Knot), 154 orm.FilterEq("id", pipelineId), 155 ) 156 if err != nil { 157 l.Error("failed to query db", "err", err) 158 return 159 } 160 161 if len(ps) != 1 { 162 l.Error("invalid number of pipelines", "len", len(ps)) 163 return 164 } 165 166 singlePipeline := ps[0] 167 168 p.pages.Workflow(w, pages.WorkflowParams{ 169 LoggedInUser: user, 170 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 171 Pipeline: singlePipeline, 172 Workflow: workflow, 173 }) 174} 175 176var upgrader = websocket.Upgrader{ 177 ReadBufferSize: 1024, 178 WriteBufferSize: 1024, 179} 180 181func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 182 l := p.logger.With("handler", "logs") 183 184 clientConn, err := upgrader.Upgrade(w, r, nil) 185 if err != nil { 186 l.Error("websocket upgrade failed", "err", err) 187 return 188 } 189 defer func() { 190 _ = clientConn.WriteControl( 191 websocket.CloseMessage, 192 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 193 time.Now().Add(time.Second), 194 ) 195 clientConn.Close() 196 }() 197 198 ctx, cancel := context.WithCancel(r.Context()) 199 defer cancel() 200 201 f, err := p.repoResolver.Resolve(r) 202 if err != nil { 203 l.Error("failed to get repo and knot", "err", err) 204 http.Error(w, "bad repo/knot", http.StatusBadRequest) 205 return 206 } 207 208 pipelineId := chi.URLParam(r, "pipeline") 209 workflow := chi.URLParam(r, "workflow") 210 if pipelineId == "" || workflow == "" { 211 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 212 return 213 } 214 215 ps, err := db.GetPipelineStatuses( 216 p.db, 217 1, 218 orm.FilterEq("repo_owner", f.Did), 219 orm.FilterEq("repo_name", f.Name), 220 orm.FilterEq("knot", f.Knot), 221 orm.FilterEq("id", pipelineId), 222 ) 223 if err != nil || len(ps) != 1 { 224 l.Error("pipeline query failed", "err", err, "count", len(ps)) 225 http.Error(w, "pipeline not found", http.StatusNotFound) 226 return 227 } 228 229 singlePipeline := ps[0] 230 spindle := f.Spindle 231 knot := f.Knot 232 rkey := singlePipeline.Rkey 233 234 if spindle == "" || knot == "" || rkey == "" { 235 http.Error(w, "invalid repo info", http.StatusBadRequest) 236 return 237 } 238 239 scheme := "wss" 240 if p.config.Core.Dev { 241 scheme = "ws" 242 } 243 244 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 245 l = l.With("url", url) 246 l.Info("logs endpoint hit") 247 248 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 249 if err != nil { 250 l.Error("websocket dial failed", "err", err) 251 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 252 return 253 } 254 defer spindleConn.Close() 255 256 // create a channel for incoming messages 257 evChan := make(chan logEvent, 100) 258 // start a goroutine to read from spindle 259 go readLogs(spindleConn, evChan) 260 261 stepStartTimes := make(map[int]time.Time) 262 var fragment bytes.Buffer 263 for { 264 select { 265 case <-ctx.Done(): 266 l.Info("client disconnected") 267 return 268 269 case ev, ok := <-evChan: 270 if !ok { 271 continue 272 } 273 274 if ev.err != nil && ev.isCloseError() { 275 l.Debug("graceful shutdown, tail complete", "err", err) 276 return 277 } 278 if ev.err != nil { 279 l.Error("error reading from spindle", "err", err) 280 return 281 } 282 283 var logLine spindlemodel.LogLine 284 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 285 l.Error("failed to parse logline", "err", err) 286 continue 287 } 288 289 fragment.Reset() 290 291 switch logLine.Kind { 292 case spindlemodel.LogKindControl: 293 switch logLine.StepStatus { 294 case spindlemodel.StepStatusStart: 295 stepStartTimes[logLine.StepId] = logLine.Time 296 collapsed := false 297 if logLine.StepKind == spindlemodel.StepKindSystem { 298 collapsed = true 299 } 300 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 301 Id: logLine.StepId, 302 Name: logLine.Content, 303 Command: logLine.StepCommand, 304 Collapsed: collapsed, 305 StartTime: logLine.Time, 306 }) 307 case spindlemodel.StepStatusEnd: 308 startTime := stepStartTimes[logLine.StepId] 309 endTime := logLine.Time 310 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 311 Id: logLine.StepId, 312 StartTime: startTime, 313 EndTime: endTime, 314 }) 315 } 316 317 case spindlemodel.LogKindData: 318 // data messages simply insert new log lines into current step 319 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 320 Id: logLine.StepId, 321 Content: logLine.Content, 322 }) 323 } 324 if err != nil { 325 l.Error("failed to render log line", "err", err) 326 return 327 } 328 329 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 330 l.Error("error writing to client", "err", err) 331 return 332 } 333 334 case <-time.After(30 * time.Second): 335 l.Debug("sent keepalive") 336 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 337 l.Error("failed to write control", "err", err) 338 return 339 } 340 } 341 } 342} 343 344func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 345 l := p.logger.With("handler", "Cancel") 346 347 var ( 348 pipelineId = chi.URLParam(r, "pipeline") 349 workflow = chi.URLParam(r, "workflow") 350 ) 351 if pipelineId == "" || workflow == "" { 352 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 353 return 354 } 355 356 f, err := p.repoResolver.Resolve(r) 357 if err != nil { 358 l.Error("failed to get repo and knot", "err", err) 359 http.Error(w, "bad repo/knot", http.StatusBadRequest) 360 return 361 } 362 363 pipeline, err := func() (models.Pipeline, error) { 364 ps, err := db.GetPipelineStatuses( 365 p.db, 366 1, 367 orm.FilterEq("repo_owner", f.Did), 368 orm.FilterEq("repo_name", f.Name), 369 orm.FilterEq("knot", f.Knot), 370 orm.FilterEq("id", pipelineId), 371 ) 372 if err != nil { 373 return models.Pipeline{}, err 374 } 375 if len(ps) != 1 { 376 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 377 } 378 return ps[0], nil 379 }() 380 if err != nil { 381 l.Error("pipeline query failed", "err", err) 382 http.Error(w, "pipeline not found", http.StatusNotFound) 383 } 384 var ( 385 spindle = f.Spindle 386 knot = f.Knot 387 rkey = pipeline.Rkey 388 ) 389 390 if spindle == "" || knot == "" || rkey == "" { 391 http.Error(w, "invalid repo info", http.StatusBadRequest) 392 return 393 } 394 395 spindleClient, err := p.oauth.ServiceClient( 396 r, 397 oauth.WithService(f.Spindle), 398 oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 399 oauth.WithDev(p.config.Core.Dev), 400 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 401 ) 402 403 err = tangled.PipelineCancelPipeline( 404 r.Context(), 405 spindleClient, 406 &tangled.PipelineCancelPipeline_Input{ 407 Repo: string(f.RepoAt()), 408 Pipeline: pipeline.AtUri().String(), 409 Workflow: workflow, 410 }, 411 ) 412 errorId := "workflow-error" 413 if err != nil { 414 l.Error("failed to cancel workflow", "err", err) 415 p.pages.Notice(w, errorId, "Failed to cancel workflow") 416 return 417 } 418 l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 419} 420 421// either a message or an error 422type logEvent struct { 423 msg []byte 424 err error 425} 426 427func (ev *logEvent) isCloseError() bool { 428 return websocket.IsCloseError( 429 ev.err, 430 websocket.CloseNormalClosure, 431 websocket.CloseGoingAway, 432 websocket.CloseAbnormalClosure, 433 ) 434} 435 436// read logs from spindle and pass through to chan 437func readLogs(conn *websocket.Conn, ch chan logEvent) { 438 defer close(ch) 439 440 for { 441 if conn == nil { 442 return 443 } 444 445 _, msg, err := conn.ReadMessage() 446 if err != nil { 447 ch <- logEvent{err: err} 448 return 449 } 450 ch <- logEvent{msg: msg} 451 } 452}