Monorepo for Tangled
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/config"
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/middleware"
17 "tangled.org/core/appview/models"
18 "tangled.org/core/appview/oauth"
19 "tangled.org/core/appview/pages"
20 "tangled.org/core/appview/reporesolver"
21 "tangled.org/core/eventconsumer"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25 spindlemodel "tangled.org/core/spindle/models"
26
27 "github.com/go-chi/chi/v5"
28 "github.com/gorilla/websocket"
29)
30
31type Pipelines struct {
32 repoResolver *reporesolver.RepoResolver
33 idResolver *idresolver.Resolver
34 config *config.Config
35 oauth *oauth.OAuth
36 pages *pages.Pages
37 spindlestream *eventconsumer.Consumer
38 db *db.DB
39 enforcer *rbac.Enforcer
40 logger *slog.Logger
41}
42
43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler {
44 r := chi.NewRouter()
45 r.Get("/", p.Index)
46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
48 r.
49 With(mw.RepoPermissionMiddleware("repo:owner")).
50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel)
51
52 return r
53}
54
55func New(
56 oauth *oauth.OAuth,
57 repoResolver *reporesolver.RepoResolver,
58 pages *pages.Pages,
59 spindlestream *eventconsumer.Consumer,
60 idResolver *idresolver.Resolver,
61 db *db.DB,
62 config *config.Config,
63 enforcer *rbac.Enforcer,
64 logger *slog.Logger,
65) *Pipelines {
66 return &Pipelines{
67 oauth: oauth,
68 repoResolver: repoResolver,
69 pages: pages,
70 idResolver: idResolver,
71 config: config,
72 spindlestream: spindlestream,
73 db: db,
74 enforcer: enforcer,
75 logger: logger,
76 }
77}
78
79func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
80 user := p.oauth.GetMultiAccountUser(r)
81 l := p.logger.With("handler", "Index")
82
83 f, err := p.repoResolver.Resolve(r)
84 if err != nil {
85 l.Error("failed to get repo and knot", "err", err)
86 return
87 }
88
89 filterKind := r.URL.Query().Get("trigger")
90 filters := []orm.Filter{
91 orm.FilterEq("p.repo_owner", f.Did),
92 orm.FilterEq("p.repo_name", f.Name),
93 orm.FilterEq("p.knot", f.Knot),
94 }
95 switch filterKind {
96 case "push":
97 filters = append(filters, orm.FilterEq("t.kind", "push"))
98 case "pull_request":
99 filters = append(filters, orm.FilterEq("t.kind", "pull_request"))
100 default:
101 // no filters otherwise, default to "all"
102 filterKind = "all"
103 }
104
105 ps, err := db.GetPipelineStatuses(
106 p.db,
107 30,
108 filters...,
109 )
110 if err != nil {
111 l.Error("failed to query db", "err", err)
112 return
113 }
114
115 total, err := db.GetTotalPipelineStatuses(p.db, filters...)
116 if err != nil {
117 l.Error("failed to query db", "err", err)
118 return
119 }
120
121 p.pages.Pipelines(w, pages.PipelinesParams{
122 LoggedInUser: user,
123 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
124 Pipelines: ps,
125 FilterKind: filterKind,
126 Total: total,
127 })
128}
129
130func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
131 user := p.oauth.GetMultiAccountUser(r)
132 l := p.logger.With("handler", "Workflow")
133
134 f, err := p.repoResolver.Resolve(r)
135 if err != nil {
136 l.Error("failed to get repo and knot", "err", err)
137 return
138 }
139
140 pipelineId := chi.URLParam(r, "pipeline")
141 if pipelineId == "" {
142 l.Error("empty pipeline ID")
143 return
144 }
145
146 workflow := chi.URLParam(r, "workflow")
147 if workflow == "" {
148 l.Error("empty workflow name")
149 return
150 }
151
152 ps, err := db.GetPipelineStatuses(
153 p.db,
154 1,
155 orm.FilterEq("p.repo_owner", f.Did),
156 orm.FilterEq("p.repo_name", f.Name),
157 orm.FilterEq("p.knot", f.Knot),
158 orm.FilterEq("p.id", pipelineId),
159 )
160 if err != nil {
161 l.Error("failed to query db", "err", err)
162 return
163 }
164
165 if len(ps) != 1 {
166 l.Error("invalid number of pipelines", "len", len(ps))
167 return
168 }
169
170 singlePipeline := ps[0]
171
172 p.pages.Workflow(w, pages.WorkflowParams{
173 LoggedInUser: user,
174 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
175 Pipeline: singlePipeline,
176 Workflow: workflow,
177 })
178}
179
180var upgrader = websocket.Upgrader{
181 ReadBufferSize: 1024,
182 WriteBufferSize: 1024,
183}
184
185func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
186 l := p.logger.With("handler", "logs")
187
188 clientConn, err := upgrader.Upgrade(w, r, nil)
189 if err != nil {
190 l.Error("websocket upgrade failed", "err", err)
191 return
192 }
193 defer func() {
194 _ = clientConn.WriteControl(
195 websocket.CloseMessage,
196 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
197 time.Now().Add(time.Second),
198 )
199 clientConn.Close()
200 }()
201
202 ctx, cancel := context.WithCancel(r.Context())
203 defer cancel()
204
205 f, err := p.repoResolver.Resolve(r)
206 if err != nil {
207 l.Error("failed to get repo and knot", "err", err)
208 http.Error(w, "bad repo/knot", http.StatusBadRequest)
209 return
210 }
211
212 pipelineId := chi.URLParam(r, "pipeline")
213 workflow := chi.URLParam(r, "workflow")
214 if pipelineId == "" || workflow == "" {
215 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
216 return
217 }
218
219 ps, err := db.GetPipelineStatuses(
220 p.db,
221 1,
222 orm.FilterEq("p.repo_owner", f.Did),
223 orm.FilterEq("p.repo_name", f.Name),
224 orm.FilterEq("p.knot", f.Knot),
225 orm.FilterEq("p.id", pipelineId),
226 )
227 if err != nil || len(ps) != 1 {
228 l.Error("pipeline query failed", "err", err, "count", len(ps))
229 http.Error(w, "pipeline not found", http.StatusNotFound)
230 return
231 }
232
233 singlePipeline := ps[0]
234 spindle := f.Spindle
235 knot := f.Knot
236 rkey := singlePipeline.Rkey
237
238 if spindle == "" || knot == "" || rkey == "" {
239 http.Error(w, "invalid repo info", http.StatusBadRequest)
240 return
241 }
242
243 scheme := "wss"
244 if p.config.Core.Dev {
245 scheme = "ws"
246 }
247
248 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
249 l = l.With("url", url)
250 l.Info("logs endpoint hit")
251
252 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
253 if err != nil {
254 l.Error("websocket dial failed", "err", err)
255 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
256 return
257 }
258 defer spindleConn.Close()
259
260 // create a channel for incoming messages
261 evChan := make(chan logEvent, 100)
262 // start a goroutine to read from spindle
263 go readLogs(spindleConn, evChan)
264
265 stepStartTimes := make(map[int]time.Time)
266 var fragment bytes.Buffer
267 for {
268 select {
269 case <-ctx.Done():
270 l.Info("client disconnected")
271 return
272
273 case ev, ok := <-evChan:
274 if !ok {
275 continue
276 }
277
278 if ev.err != nil && ev.isCloseError() {
279 l.Debug("graceful shutdown, tail complete", "err", err)
280 return
281 }
282 if ev.err != nil {
283 l.Error("error reading from spindle", "err", err)
284 return
285 }
286
287 var logLine spindlemodel.LogLine
288 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
289 l.Error("failed to parse logline", "err", err)
290 continue
291 }
292
293 fragment.Reset()
294
295 switch logLine.Kind {
296 case spindlemodel.LogKindControl:
297 switch logLine.StepStatus {
298 case spindlemodel.StepStatusStart:
299 stepStartTimes[logLine.StepId] = logLine.Time
300 collapsed := false
301 if logLine.StepKind == spindlemodel.StepKindSystem {
302 collapsed = true
303 }
304 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
305 Id: logLine.StepId,
306 Name: logLine.Content,
307 Command: logLine.StepCommand,
308 Collapsed: collapsed,
309 StartTime: logLine.Time,
310 })
311 case spindlemodel.StepStatusEnd:
312 startTime := stepStartTimes[logLine.StepId]
313 endTime := logLine.Time
314 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
315 Id: logLine.StepId,
316 StartTime: startTime,
317 EndTime: endTime,
318 })
319 }
320
321 case spindlemodel.LogKindData:
322 // data messages simply insert new log lines into current step
323 err = p.pages.LogLine(&fragment, pages.LogLineParams{
324 Id: logLine.StepId,
325 Content: logLine.Content,
326 })
327 }
328 if err != nil {
329 l.Error("failed to render log line", "err", err)
330 return
331 }
332
333 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
334 l.Error("error writing to client", "err", err)
335 return
336 }
337
338 case <-time.After(30 * time.Second):
339 l.Debug("sent keepalive")
340 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
341 l.Error("failed to write control", "err", err)
342 return
343 }
344 }
345 }
346}
347
348func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) {
349 l := p.logger.With("handler", "Cancel")
350
351 var (
352 pipelineId = chi.URLParam(r, "pipeline")
353 workflow = chi.URLParam(r, "workflow")
354 )
355 if pipelineId == "" || workflow == "" {
356 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
357 return
358 }
359
360 f, err := p.repoResolver.Resolve(r)
361 if err != nil {
362 l.Error("failed to get repo and knot", "err", err)
363 http.Error(w, "bad repo/knot", http.StatusBadRequest)
364 return
365 }
366
367 pipeline, err := func() (models.Pipeline, error) {
368 ps, err := db.GetPipelineStatuses(
369 p.db,
370 1,
371 orm.FilterEq("p.repo_owner", f.Did),
372 orm.FilterEq("p.repo_name", f.Name),
373 orm.FilterEq("p.knot", f.Knot),
374 orm.FilterEq("p.id", pipelineId),
375 )
376 if err != nil {
377 return models.Pipeline{}, err
378 }
379 if len(ps) != 1 {
380 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps))
381 }
382 return ps[0], nil
383 }()
384 if err != nil {
385 l.Error("pipeline query failed", "err", err)
386 http.Error(w, "pipeline not found", http.StatusNotFound)
387 }
388 var (
389 spindle = f.Spindle
390 knot = f.Knot
391 rkey = pipeline.Rkey
392 )
393
394 if spindle == "" || knot == "" || rkey == "" {
395 http.Error(w, "invalid repo info", http.StatusBadRequest)
396 return
397 }
398
399 spindleClient, err := p.oauth.ServiceClient(
400 r,
401 oauth.WithService(f.Spindle),
402 oauth.WithLxm(tangled.PipelineCancelPipelineNSID),
403 oauth.WithDev(p.config.Core.Dev),
404 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time
405 )
406
407 err = tangled.PipelineCancelPipeline(
408 r.Context(),
409 spindleClient,
410 &tangled.PipelineCancelPipeline_Input{
411 Repo: string(f.RepoAt()),
412 Pipeline: pipeline.AtUri().String(),
413 Workflow: workflow,
414 },
415 )
416 errorId := "workflow-error"
417 if err != nil {
418 l.Error("failed to cancel workflow", "err", err)
419 p.pages.Notice(w, errorId, "Failed to cancel workflow")
420 return
421 }
422 l.Debug("canceled pipeline", "uri", pipeline.AtUri())
423}
424
// logEvent is what readLogs sends for every read from the spindle
// connection: msg carries the payload of a successful read, err carries the
// read error that ended the stream. readLogs never sets both.
type logEvent struct {
	msg []byte // raw websocket message payload
	err error  // read error, if the read failed
}
430
431func (ev *logEvent) isCloseError() bool {
432 return websocket.IsCloseError(
433 ev.err,
434 websocket.CloseNormalClosure,
435 websocket.CloseGoingAway,
436 websocket.CloseAbnormalClosure,
437 )
438}
439
440// read logs from spindle and pass through to chan
441func readLogs(conn *websocket.Conn, ch chan logEvent) {
442 defer close(ch)
443
444 for {
445 if conn == nil {
446 return
447 }
448
449 _, msg, err := conn.ReadMessage()
450 if err != nil {
451 ch <- logEvent{err: err}
452 return
453 }
454 ch <- logEvent{msg: msg}
455 }
456}