Monorepo for Tangled
at master 456 lines 11 kB view raw
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/middleware" 17 "tangled.org/core/appview/models" 18 "tangled.org/core/appview/oauth" 19 "tangled.org/core/appview/pages" 20 "tangled.org/core/appview/reporesolver" 21 "tangled.org/core/eventconsumer" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25 spindlemodel "tangled.org/core/spindle/models" 26 27 "github.com/go-chi/chi/v5" 28 "github.com/gorilla/websocket" 29) 30 31type Pipelines struct { 32 repoResolver *reporesolver.RepoResolver 33 idResolver *idresolver.Resolver 34 config *config.Config 35 oauth *oauth.OAuth 36 pages *pages.Pages 37 spindlestream *eventconsumer.Consumer 38 db *db.DB 39 enforcer *rbac.Enforcer 40 logger *slog.Logger 41} 42 43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { 44 r := chi.NewRouter() 45 r.Get("/", p.Index) 46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 48 r. 49 With(mw.RepoPermissionMiddleware("repo:owner")). 50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 51 52 return r 53} 54 55func New( 56 oauth *oauth.OAuth, 57 repoResolver *reporesolver.RepoResolver, 58 pages *pages.Pages, 59 spindlestream *eventconsumer.Consumer, 60 idResolver *idresolver.Resolver, 61 db *db.DB, 62 config *config.Config, 63 enforcer *rbac.Enforcer, 64 logger *slog.Logger, 65) *Pipelines { 66 return &Pipelines{ 67 oauth: oauth, 68 repoResolver: repoResolver, 69 pages: pages, 70 idResolver: idResolver, 71 config: config, 72 spindlestream: spindlestream, 73 db: db, 74 enforcer: enforcer, 75 logger: logger, 76 } 77} 78 79func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 80 user := p.oauth.GetMultiAccountUser(r) 81 l := p.logger.With("handler", "Index") 82 83 f, err := p.repoResolver.Resolve(r) 84 if err != nil { 85 l.Error("failed to get repo and knot", "err", err) 86 return 87 } 88 89 filterKind := r.URL.Query().Get("trigger") 90 filters := []orm.Filter{ 91 orm.FilterEq("p.repo_owner", f.Did), 92 orm.FilterEq("p.repo_name", f.Name), 93 orm.FilterEq("p.knot", f.Knot), 94 } 95 switch filterKind { 96 case "push": 97 filters = append(filters, orm.FilterEq("t.kind", "push")) 98 case "pull_request": 99 filters = append(filters, orm.FilterEq("t.kind", "pull_request")) 100 default: 101 // no filters otherwise, default to "all" 102 filterKind = "all" 103 } 104 105 ps, err := db.GetPipelineStatuses( 106 p.db, 107 30, 108 filters..., 109 ) 110 if err != nil { 111 l.Error("failed to query db", "err", err) 112 return 113 } 114 115 total, err := db.GetTotalPipelineStatuses(p.db, filters...) 116 if err != nil { 117 l.Error("failed to query db", "err", err) 118 return 119 } 120 121 p.pages.Pipelines(w, pages.PipelinesParams{ 122 LoggedInUser: user, 123 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 124 Pipelines: ps, 125 FilterKind: filterKind, 126 Total: total, 127 }) 128} 129 130func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 131 user := p.oauth.GetMultiAccountUser(r) 132 l := p.logger.With("handler", "Workflow") 133 134 f, err := p.repoResolver.Resolve(r) 135 if err != nil { 136 l.Error("failed to get repo and knot", "err", err) 137 return 138 } 139 140 pipelineId := chi.URLParam(r, "pipeline") 141 if pipelineId == "" { 142 l.Error("empty pipeline ID") 143 return 144 } 145 146 workflow := chi.URLParam(r, "workflow") 147 if workflow == "" { 148 l.Error("empty workflow name") 149 return 150 } 151 152 ps, err := db.GetPipelineStatuses( 153 p.db, 154 1, 155 orm.FilterEq("p.repo_owner", f.Did), 156 orm.FilterEq("p.repo_name", f.Name), 157 orm.FilterEq("p.knot", f.Knot), 158 orm.FilterEq("p.id", pipelineId), 159 ) 160 if err != nil { 161 l.Error("failed to query db", "err", err) 162 return 163 } 164 165 if len(ps) != 1 { 166 l.Error("invalid number of pipelines", "len", len(ps)) 167 return 168 } 169 170 singlePipeline := ps[0] 171 172 p.pages.Workflow(w, pages.WorkflowParams{ 173 LoggedInUser: user, 174 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 175 Pipeline: singlePipeline, 176 Workflow: workflow, 177 }) 178} 179 180var upgrader = websocket.Upgrader{ 181 ReadBufferSize: 1024, 182 WriteBufferSize: 1024, 183} 184 185func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 186 l := p.logger.With("handler", "logs") 187 188 clientConn, err := upgrader.Upgrade(w, r, nil) 189 if err != nil { 190 l.Error("websocket upgrade failed", "err", err) 191 return 192 } 193 defer func() { 194 _ = clientConn.WriteControl( 195 websocket.CloseMessage, 196 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 197 time.Now().Add(time.Second), 198 ) 199 clientConn.Close() 200 }() 201 202 ctx, cancel := context.WithCancel(r.Context()) 203 defer cancel() 204 205 f, err := p.repoResolver.Resolve(r) 206 if err != nil { 207 l.Error("failed to get repo and knot", "err", err) 208 http.Error(w, "bad repo/knot", http.StatusBadRequest) 209 return 210 } 211 212 pipelineId := chi.URLParam(r, "pipeline") 213 workflow := chi.URLParam(r, "workflow") 214 if pipelineId == "" || workflow == "" { 215 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 216 return 217 } 218 219 ps, err := db.GetPipelineStatuses( 220 p.db, 221 1, 222 orm.FilterEq("p.repo_owner", f.Did), 223 orm.FilterEq("p.repo_name", f.Name), 224 orm.FilterEq("p.knot", f.Knot), 225 orm.FilterEq("p.id", pipelineId), 226 ) 227 if err != nil || len(ps) != 1 { 228 l.Error("pipeline query failed", "err", err, "count", len(ps)) 229 http.Error(w, "pipeline not found", http.StatusNotFound) 230 return 231 } 232 233 singlePipeline := ps[0] 234 spindle := f.Spindle 235 knot := f.Knot 236 rkey := singlePipeline.Rkey 237 238 if spindle == "" || knot == "" || rkey == "" { 239 http.Error(w, "invalid repo info", http.StatusBadRequest) 240 return 241 } 242 243 scheme := "wss" 244 if p.config.Core.Dev { 245 scheme = "ws" 246 } 247 248 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 249 l = l.With("url", url) 250 l.Info("logs endpoint hit") 251 252 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 253 if err != nil { 254 l.Error("websocket dial failed", "err", err) 255 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 256 return 257 } 258 defer spindleConn.Close() 259 260 // create a channel for incoming messages 261 evChan := make(chan logEvent, 100) 262 // start a goroutine to read from spindle 263 go readLogs(spindleConn, evChan) 264 265 stepStartTimes := make(map[int]time.Time) 266 var fragment bytes.Buffer 267 for { 268 select { 269 case <-ctx.Done(): 270 l.Info("client disconnected") 271 return 272 273 case ev, ok := <-evChan: 274 if !ok { 275 continue 276 } 277 278 if ev.err != nil && ev.isCloseError() { 279 l.Debug("graceful shutdown, tail complete", "err", err) 280 return 281 } 282 if ev.err != nil { 283 l.Error("error reading from spindle", "err", err) 284 return 285 } 286 287 var logLine spindlemodel.LogLine 288 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 289 l.Error("failed to parse logline", "err", err) 290 continue 291 } 292 293 fragment.Reset() 294 295 switch logLine.Kind { 296 case spindlemodel.LogKindControl: 297 switch logLine.StepStatus { 298 case spindlemodel.StepStatusStart: 299 stepStartTimes[logLine.StepId] = logLine.Time 300 collapsed := false 301 if logLine.StepKind == spindlemodel.StepKindSystem { 302 collapsed = true 303 } 304 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 305 Id: logLine.StepId, 306 Name: logLine.Content, 307 Command: logLine.StepCommand, 308 Collapsed: collapsed, 309 StartTime: logLine.Time, 310 }) 311 case spindlemodel.StepStatusEnd: 312 startTime := stepStartTimes[logLine.StepId] 313 endTime := logLine.Time 314 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 315 Id: logLine.StepId, 316 StartTime: startTime, 317 EndTime: endTime, 318 }) 319 } 320 321 case spindlemodel.LogKindData: 322 // data messages simply insert new log lines into current step 323 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 324 Id: logLine.StepId, 325 Content: logLine.Content, 326 }) 327 } 328 if err != nil { 329 l.Error("failed to render log line", "err", err) 330 return 331 } 332 333 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 334 l.Error("error writing to client", "err", err) 335 return 336 } 337 338 case <-time.After(30 * time.Second): 339 l.Debug("sent keepalive") 340 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 341 l.Error("failed to write control", "err", err) 342 return 343 } 344 } 345 } 346} 347 348func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 349 l := p.logger.With("handler", "Cancel") 350 351 var ( 352 pipelineId = chi.URLParam(r, "pipeline") 353 workflow = chi.URLParam(r, "workflow") 354 ) 355 if pipelineId == "" || workflow == "" { 356 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 357 return 358 } 359 360 f, err := p.repoResolver.Resolve(r) 361 if err != nil { 362 l.Error("failed to get repo and knot", "err", err) 363 http.Error(w, "bad repo/knot", http.StatusBadRequest) 364 return 365 } 366 367 pipeline, err := func() (models.Pipeline, error) { 368 ps, err := db.GetPipelineStatuses( 369 p.db, 370 1, 371 orm.FilterEq("p.repo_owner", f.Did), 372 orm.FilterEq("p.repo_name", f.Name), 373 orm.FilterEq("p.knot", f.Knot), 374 orm.FilterEq("p.id", pipelineId), 375 ) 376 if err != nil { 377 return models.Pipeline{}, err 378 } 379 if len(ps) != 1 { 380 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 381 } 382 return ps[0], nil 383 }() 384 if err != nil { 385 l.Error("pipeline query failed", "err", err) 386 http.Error(w, "pipeline not found", http.StatusNotFound) 387 } 388 var ( 389 spindle = f.Spindle 390 knot = f.Knot 391 rkey = pipeline.Rkey 392 ) 393 394 if spindle == "" || knot == "" || rkey == "" { 395 http.Error(w, "invalid repo info", http.StatusBadRequest) 396 return 397 } 398 399 spindleClient, err := p.oauth.ServiceClient( 400 r, 401 oauth.WithService(f.Spindle), 402 oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 403 oauth.WithDev(p.config.Core.Dev), 404 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 405 ) 406 407 err = tangled.PipelineCancelPipeline( 408 r.Context(), 409 spindleClient, 410 &tangled.PipelineCancelPipeline_Input{ 411 Repo: string(f.RepoAt()), 412 Pipeline: pipeline.AtUri().String(), 413 Workflow: workflow, 414 }, 415 ) 416 errorId := "workflow-error" 417 if err != nil { 418 l.Error("failed to cancel workflow", "err", err) 419 p.pages.Notice(w, errorId, "Failed to cancel workflow") 420 return 421 } 422 l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 423} 424 425// either a message or an error 426type logEvent struct { 427 msg []byte 428 err error 429} 430 431func (ev *logEvent) isCloseError() bool { 432 return websocket.IsCloseError( 433 ev.err, 434 websocket.CloseNormalClosure, 435 websocket.CloseGoingAway, 436 websocket.CloseAbnormalClosure, 437 ) 438} 439 440// read logs from spindle and pass through to chan 441func readLogs(conn *websocket.Conn, ch chan logEvent) { 442 defer close(ch) 443 444 for { 445 if conn == nil { 446 return 447 } 448 449 _, msg, err := conn.ReadMessage() 450 if err != nil { 451 ch <- logEvent{err: err} 452 return 453 } 454 ch <- logEvent{msg: msg} 455 } 456}