forked from
tangled.org/core
Monorepo for Tangled
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/config"
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/middleware"
17 "tangled.org/core/appview/models"
18 "tangled.org/core/appview/oauth"
19 "tangled.org/core/appview/pages"
20 "tangled.org/core/appview/reporesolver"
21 "tangled.org/core/eventconsumer"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25 spindlemodel "tangled.org/core/spindle/models"
26
27 "github.com/go-chi/chi/v5"
28 "github.com/gorilla/websocket"
29)
30
31type Pipelines struct {
32 repoResolver *reporesolver.RepoResolver
33 idResolver *idresolver.Resolver
34 config *config.Config
35 oauth *oauth.OAuth
36 pages *pages.Pages
37 spindlestream *eventconsumer.Consumer
38 db *db.DB
39 enforcer *rbac.Enforcer
40 logger *slog.Logger
41}
42
43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler {
44 r := chi.NewRouter()
45 r.Get("/", p.Index)
46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
48 r.
49 With(mw.RepoPermissionMiddleware("repo:owner")).
50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel)
51
52 return r
53}
54
55func New(
56 oauth *oauth.OAuth,
57 repoResolver *reporesolver.RepoResolver,
58 pages *pages.Pages,
59 spindlestream *eventconsumer.Consumer,
60 idResolver *idresolver.Resolver,
61 db *db.DB,
62 config *config.Config,
63 enforcer *rbac.Enforcer,
64 logger *slog.Logger,
65) *Pipelines {
66 return &Pipelines{
67 oauth: oauth,
68 repoResolver: repoResolver,
69 pages: pages,
70 idResolver: idResolver,
71 config: config,
72 spindlestream: spindlestream,
73 db: db,
74 enforcer: enforcer,
75 logger: logger,
76 }
77}
78
79func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
80 user := p.oauth.GetMultiAccountUser(r)
81 l := p.logger.With("handler", "Index")
82
83 f, err := p.repoResolver.Resolve(r)
84 if err != nil {
85 l.Error("failed to get repo and knot", "err", err)
86 return
87 }
88
89 ps, err := db.GetPipelineStatuses(
90 p.db,
91 30,
92 orm.FilterEq("repo_owner", f.Did),
93 orm.FilterEq("repo_name", f.Name),
94 orm.FilterEq("knot", f.Knot),
95 )
96 if err != nil {
97 l.Error("failed to query db", "err", err)
98 return
99 }
100
101 // Filter by trigger
102 filterTrigger := r.URL.Query().Get("trigger")
103 var filtered []models.Pipeline
104 for _, pipeline := range ps {
105 if filterTrigger == "push" && pipeline.Trigger != nil && pipeline.Trigger.IsPush() {
106 filtered = append(filtered, pipeline)
107 } else if filterTrigger == "pr" && pipeline.Trigger != nil && pipeline.Trigger.IsPullRequest() {
108 filtered = append(filtered, pipeline)
109 } else if filterTrigger == "" || filterTrigger == "all" {
110 filtered = append(filtered, pipeline)
111 }
112 }
113
114 filteringByPush := filterTrigger == "push"
115 filteringByPR := filterTrigger == "pr"
116
117 p.pages.Pipelines(w, pages.PipelinesParams{
118 LoggedInUser: user,
119 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
120 Pipelines: filtered,
121 FilteringByPush: filteringByPush,
122 FilteringByPR: filteringByPR,
123 })
124}
125
126func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
127 user := p.oauth.GetMultiAccountUser(r)
128 l := p.logger.With("handler", "Workflow")
129
130 f, err := p.repoResolver.Resolve(r)
131 if err != nil {
132 l.Error("failed to get repo and knot", "err", err)
133 return
134 }
135
136 pipelineId := chi.URLParam(r, "pipeline")
137 if pipelineId == "" {
138 l.Error("empty pipeline ID")
139 return
140 }
141
142 workflow := chi.URLParam(r, "workflow")
143 if workflow == "" {
144 l.Error("empty workflow name")
145 return
146 }
147
148 ps, err := db.GetPipelineStatuses(
149 p.db,
150 1,
151 orm.FilterEq("repo_owner", f.Did),
152 orm.FilterEq("repo_name", f.Name),
153 orm.FilterEq("knot", f.Knot),
154 orm.FilterEq("id", pipelineId),
155 )
156 if err != nil {
157 l.Error("failed to query db", "err", err)
158 return
159 }
160
161 if len(ps) != 1 {
162 l.Error("invalid number of pipelines", "len", len(ps))
163 return
164 }
165
166 singlePipeline := ps[0]
167
168 p.pages.Workflow(w, pages.WorkflowParams{
169 LoggedInUser: user,
170 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
171 Pipeline: singlePipeline,
172 Workflow: workflow,
173 })
174}
175
176var upgrader = websocket.Upgrader{
177 ReadBufferSize: 1024,
178 WriteBufferSize: 1024,
179}
180
181func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
182 l := p.logger.With("handler", "logs")
183
184 clientConn, err := upgrader.Upgrade(w, r, nil)
185 if err != nil {
186 l.Error("websocket upgrade failed", "err", err)
187 return
188 }
189 defer func() {
190 _ = clientConn.WriteControl(
191 websocket.CloseMessage,
192 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
193 time.Now().Add(time.Second),
194 )
195 clientConn.Close()
196 }()
197
198 ctx, cancel := context.WithCancel(r.Context())
199 defer cancel()
200
201 f, err := p.repoResolver.Resolve(r)
202 if err != nil {
203 l.Error("failed to get repo and knot", "err", err)
204 http.Error(w, "bad repo/knot", http.StatusBadRequest)
205 return
206 }
207
208 pipelineId := chi.URLParam(r, "pipeline")
209 workflow := chi.URLParam(r, "workflow")
210 if pipelineId == "" || workflow == "" {
211 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
212 return
213 }
214
215 ps, err := db.GetPipelineStatuses(
216 p.db,
217 1,
218 orm.FilterEq("repo_owner", f.Did),
219 orm.FilterEq("repo_name", f.Name),
220 orm.FilterEq("knot", f.Knot),
221 orm.FilterEq("id", pipelineId),
222 )
223 if err != nil || len(ps) != 1 {
224 l.Error("pipeline query failed", "err", err, "count", len(ps))
225 http.Error(w, "pipeline not found", http.StatusNotFound)
226 return
227 }
228
229 singlePipeline := ps[0]
230 spindle := f.Spindle
231 knot := f.Knot
232 rkey := singlePipeline.Rkey
233
234 if spindle == "" || knot == "" || rkey == "" {
235 http.Error(w, "invalid repo info", http.StatusBadRequest)
236 return
237 }
238
239 scheme := "wss"
240 if p.config.Core.Dev {
241 scheme = "ws"
242 }
243
244 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
245 l = l.With("url", url)
246 l.Info("logs endpoint hit")
247
248 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
249 if err != nil {
250 l.Error("websocket dial failed", "err", err)
251 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
252 return
253 }
254 defer spindleConn.Close()
255
256 // create a channel for incoming messages
257 evChan := make(chan logEvent, 100)
258 // start a goroutine to read from spindle
259 go readLogs(spindleConn, evChan)
260
261 stepStartTimes := make(map[int]time.Time)
262 var fragment bytes.Buffer
263 for {
264 select {
265 case <-ctx.Done():
266 l.Info("client disconnected")
267 return
268
269 case ev, ok := <-evChan:
270 if !ok {
271 continue
272 }
273
274 if ev.err != nil && ev.isCloseError() {
275 l.Debug("graceful shutdown, tail complete", "err", err)
276 return
277 }
278 if ev.err != nil {
279 l.Error("error reading from spindle", "err", err)
280 return
281 }
282
283 var logLine spindlemodel.LogLine
284 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
285 l.Error("failed to parse logline", "err", err)
286 continue
287 }
288
289 fragment.Reset()
290
291 switch logLine.Kind {
292 case spindlemodel.LogKindControl:
293 switch logLine.StepStatus {
294 case spindlemodel.StepStatusStart:
295 stepStartTimes[logLine.StepId] = logLine.Time
296 collapsed := false
297 if logLine.StepKind == spindlemodel.StepKindSystem {
298 collapsed = true
299 }
300 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
301 Id: logLine.StepId,
302 Name: logLine.Content,
303 Command: logLine.StepCommand,
304 Collapsed: collapsed,
305 StartTime: logLine.Time,
306 })
307 case spindlemodel.StepStatusEnd:
308 startTime := stepStartTimes[logLine.StepId]
309 endTime := logLine.Time
310 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
311 Id: logLine.StepId,
312 StartTime: startTime,
313 EndTime: endTime,
314 })
315 }
316
317 case spindlemodel.LogKindData:
318 // data messages simply insert new log lines into current step
319 err = p.pages.LogLine(&fragment, pages.LogLineParams{
320 Id: logLine.StepId,
321 Content: logLine.Content,
322 })
323 }
324 if err != nil {
325 l.Error("failed to render log line", "err", err)
326 return
327 }
328
329 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
330 l.Error("error writing to client", "err", err)
331 return
332 }
333
334 case <-time.After(30 * time.Second):
335 l.Debug("sent keepalive")
336 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
337 l.Error("failed to write control", "err", err)
338 return
339 }
340 }
341 }
342}
343
344func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) {
345 l := p.logger.With("handler", "Cancel")
346
347 var (
348 pipelineId = chi.URLParam(r, "pipeline")
349 workflow = chi.URLParam(r, "workflow")
350 )
351 if pipelineId == "" || workflow == "" {
352 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
353 return
354 }
355
356 f, err := p.repoResolver.Resolve(r)
357 if err != nil {
358 l.Error("failed to get repo and knot", "err", err)
359 http.Error(w, "bad repo/knot", http.StatusBadRequest)
360 return
361 }
362
363 pipeline, err := func() (models.Pipeline, error) {
364 ps, err := db.GetPipelineStatuses(
365 p.db,
366 1,
367 orm.FilterEq("repo_owner", f.Did),
368 orm.FilterEq("repo_name", f.Name),
369 orm.FilterEq("knot", f.Knot),
370 orm.FilterEq("id", pipelineId),
371 )
372 if err != nil {
373 return models.Pipeline{}, err
374 }
375 if len(ps) != 1 {
376 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps))
377 }
378 return ps[0], nil
379 }()
380 if err != nil {
381 l.Error("pipeline query failed", "err", err)
382 http.Error(w, "pipeline not found", http.StatusNotFound)
383 }
384 var (
385 spindle = f.Spindle
386 knot = f.Knot
387 rkey = pipeline.Rkey
388 )
389
390 if spindle == "" || knot == "" || rkey == "" {
391 http.Error(w, "invalid repo info", http.StatusBadRequest)
392 return
393 }
394
395 spindleClient, err := p.oauth.ServiceClient(
396 r,
397 oauth.WithService(f.Spindle),
398 oauth.WithLxm(tangled.PipelineCancelPipelineNSID),
399 oauth.WithDev(p.config.Core.Dev),
400 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time
401 )
402
403 err = tangled.PipelineCancelPipeline(
404 r.Context(),
405 spindleClient,
406 &tangled.PipelineCancelPipeline_Input{
407 Repo: string(f.RepoAt()),
408 Pipeline: pipeline.AtUri().String(),
409 Workflow: workflow,
410 },
411 )
412 errorId := "workflow-error"
413 if err != nil {
414 l.Error("failed to cancel workflow", "err", err)
415 p.pages.Notice(w, errorId, "Failed to cancel workflow")
416 return
417 }
418 l.Debug("canceled pipeline", "uri", pipeline.AtUri())
419}
420
421// either a message or an error
422type logEvent struct {
423 msg []byte
424 err error
425}
426
427func (ev *logEvent) isCloseError() bool {
428 return websocket.IsCloseError(
429 ev.err,
430 websocket.CloseNormalClosure,
431 websocket.CloseGoingAway,
432 websocket.CloseAbnormalClosure,
433 )
434}
435
436// read logs from spindle and pass through to chan
437func readLogs(conn *websocket.Conn, ch chan logEvent) {
438 defer close(ch)
439
440 for {
441 if conn == nil {
442 return
443 }
444
445 _, msg, err := conn.ReadMessage()
446 if err != nil {
447 ch <- logEvent{err: err}
448 return
449 }
450 ch <- logEvent{msg: msg}
451 }
452}