this repo has no description
1package pipelines
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 "tangled.sh/tangled.sh/core/appview/idresolver"
15 "tangled.sh/tangled.sh/core/appview/oauth"
16 "tangled.sh/tangled.sh/core/appview/pages"
17 "tangled.sh/tangled.sh/core/appview/reporesolver"
18 "tangled.sh/tangled.sh/core/eventconsumer"
19 "tangled.sh/tangled.sh/core/log"
20 "tangled.sh/tangled.sh/core/rbac"
21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25 "github.com/posthog/posthog-go"
26)
27
28type Pipelines struct {
29 repoResolver *reporesolver.RepoResolver
30 idResolver *idresolver.Resolver
31 config *config.Config
32 oauth *oauth.OAuth
33 pages *pages.Pages
34 spindlestream *eventconsumer.Consumer
35 db *db.DB
36 enforcer *rbac.Enforcer
37 posthog posthog.Client
38 logger *slog.Logger
39}
40
41func New(
42 oauth *oauth.OAuth,
43 repoResolver *reporesolver.RepoResolver,
44 pages *pages.Pages,
45 spindlestream *eventconsumer.Consumer,
46 idResolver *idresolver.Resolver,
47 db *db.DB,
48 config *config.Config,
49 posthog posthog.Client,
50 enforcer *rbac.Enforcer,
51) *Pipelines {
52 logger := log.New("pipelines")
53
54 return &Pipelines{oauth: oauth,
55 repoResolver: repoResolver,
56 pages: pages,
57 idResolver: idResolver,
58 config: config,
59 spindlestream: spindlestream,
60 db: db,
61 posthog: posthog,
62 enforcer: enforcer,
63 logger: logger,
64 }
65}
66
67func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
68 user := p.oauth.GetUser(r)
69 l := p.logger.With("handler", "Index")
70
71 f, err := p.repoResolver.Resolve(r)
72 if err != nil {
73 l.Error("failed to get repo and knot", "err", err)
74 return
75 }
76
77 repoInfo := f.RepoInfo(user)
78
79 ps, err := db.GetPipelineStatuses(
80 p.db,
81 db.FilterEq("repo_owner", repoInfo.OwnerDid),
82 db.FilterEq("repo_name", repoInfo.Name),
83 db.FilterEq("knot", repoInfo.Knot),
84 )
85 if err != nil {
86 l.Error("failed to query db", "err", err)
87 return
88 }
89
90 p.pages.Pipelines(w, pages.PipelinesParams{
91 LoggedInUser: user,
92 RepoInfo: repoInfo,
93 Pipelines: ps,
94 })
95}
96
97func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
98 user := p.oauth.GetUser(r)
99 l := p.logger.With("handler", "Workflow")
100
101 f, err := p.repoResolver.Resolve(r)
102 if err != nil {
103 l.Error("failed to get repo and knot", "err", err)
104 return
105 }
106
107 repoInfo := f.RepoInfo(user)
108
109 pipelineId := chi.URLParam(r, "pipeline")
110 if pipelineId == "" {
111 l.Error("empty pipeline ID")
112 return
113 }
114
115 workflow := chi.URLParam(r, "workflow")
116 if workflow == "" {
117 l.Error("empty workflow name")
118 return
119 }
120
121 ps, err := db.GetPipelineStatuses(
122 p.db,
123 db.FilterEq("repo_owner", repoInfo.OwnerDid),
124 db.FilterEq("repo_name", repoInfo.Name),
125 db.FilterEq("knot", repoInfo.Knot),
126 db.FilterEq("id", pipelineId),
127 )
128 if err != nil {
129 l.Error("failed to query db", "err", err)
130 return
131 }
132
133 if len(ps) != 1 {
134 l.Error("invalid number of pipelines", "len", len(ps))
135 return
136 }
137
138 singlePipeline := ps[0]
139
140 p.pages.Workflow(w, pages.WorkflowParams{
141 LoggedInUser: user,
142 RepoInfo: repoInfo,
143 Pipeline: singlePipeline,
144 Workflow: workflow,
145 })
146}
147
148var upgrader = websocket.Upgrader{
149 ReadBufferSize: 1024,
150 WriteBufferSize: 1024,
151}
152
153func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
154 l := p.logger.With("handler", "logs")
155
156 clientConn, err := upgrader.Upgrade(w, r, nil)
157 if err != nil {
158 l.Error("websocket upgrade failed", "err", err)
159 return
160 }
161 defer clientConn.Close()
162
163 ctx, cancel := context.WithCancel(r.Context())
164 defer cancel()
165 go func() {
166 for {
167 if _, _, err := clientConn.NextReader(); err != nil {
168 l.Error("failed to read", "err", err)
169 cancel()
170 return
171 }
172 }
173 }()
174
175 user := p.oauth.GetUser(r)
176 f, err := p.repoResolver.Resolve(r)
177 if err != nil {
178 l.Error("failed to get repo and knot", "err", err)
179 http.Error(w, "bad repo/knot", http.StatusBadRequest)
180 return
181 }
182
183 repoInfo := f.RepoInfo(user)
184
185 pipelineId := chi.URLParam(r, "pipeline")
186 workflow := chi.URLParam(r, "workflow")
187 if pipelineId == "" || workflow == "" {
188 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
189 return
190 }
191
192 ps, err := db.GetPipelineStatuses(
193 p.db,
194 db.FilterEq("repo_owner", repoInfo.OwnerDid),
195 db.FilterEq("repo_name", repoInfo.Name),
196 db.FilterEq("knot", repoInfo.Knot),
197 db.FilterEq("id", pipelineId),
198 )
199 if err != nil || len(ps) != 1 {
200 l.Error("pipeline query failed", "err", err, "count", len(ps))
201 http.Error(w, "pipeline not found", http.StatusNotFound)
202 return
203 }
204
205 singlePipeline := ps[0]
206 spindle := repoInfo.Spindle
207 knot := repoInfo.Knot
208 rkey := singlePipeline.Rkey
209
210 if spindle == "" || knot == "" || rkey == "" {
211 http.Error(w, "invalid repo info", http.StatusBadRequest)
212 return
213 }
214
215 scheme := "wss"
216 if p.config.Core.Dev {
217 scheme = "ws"
218 }
219
220 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
221 l = l.With("url", url)
222 l.Info("logs endpoint hit")
223
224 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
225 if err != nil {
226 l.Error("websocket dial failed", "err", err)
227 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
228 return
229 }
230 defer spindleConn.Close()
231
232 // create a channel for incoming messages
233 msgChan := make(chan []byte, 10)
234 errChan := make(chan error, 1)
235
236 // start a goroutine to read from spindle
237 go func() {
238 defer close(msgChan)
239 for {
240 _, msg, err := spindleConn.ReadMessage()
241 if err != nil {
242 errChan <- err
243 return
244 }
245 msgChan <- msg
246 }
247 }()
248
249 for {
250 select {
251 case <-ctx.Done():
252 l.Info("client disconnected")
253 return
254 case err := <-errChan:
255 l.Error("error reading from spindle", "err", err)
256 return
257 case msg := <-msgChan:
258 var logLine spindlemodel.LogLine
259 if err = json.Unmarshal(msg, &logLine); err != nil {
260 l.Error("failed to parse logline", "err", err)
261 continue
262 }
263
264 html := fmt.Appendf(nil, `
265 <div id="lines" hx-swap-oob="beforeend">
266 <p>%s: %s</p>
267 </div>
268 `, logLine.Stream, logLine.Data)
269
270 if err = clientConn.WriteMessage(websocket.TextMessage, html); err != nil {
271 l.Error("error writing to client", "err", err)
272 return
273 }
274 case <-time.After(30 * time.Second):
275 l.Debug("sent keepalive")
276 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
277 l.Error("failed to write control", "err", err)
278 }
279 }
280 }
281}