this repo has no description
1package pipelines 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 "tangled.sh/tangled.sh/core/appview/idresolver" 15 "tangled.sh/tangled.sh/core/appview/oauth" 16 "tangled.sh/tangled.sh/core/appview/pages" 17 "tangled.sh/tangled.sh/core/appview/reporesolver" 18 "tangled.sh/tangled.sh/core/eventconsumer" 19 "tangled.sh/tangled.sh/core/log" 20 "tangled.sh/tangled.sh/core/rbac" 21 spindlemodel "tangled.sh/tangled.sh/core/spindle/models" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25 "github.com/posthog/posthog-go" 26) 27 28type Pipelines struct { 29 repoResolver *reporesolver.RepoResolver 30 idResolver *idresolver.Resolver 31 config *config.Config 32 oauth *oauth.OAuth 33 pages *pages.Pages 34 spindlestream *eventconsumer.Consumer 35 db *db.DB 36 enforcer *rbac.Enforcer 37 posthog posthog.Client 38 logger *slog.Logger 39} 40 41func New( 42 oauth *oauth.OAuth, 43 repoResolver *reporesolver.RepoResolver, 44 pages *pages.Pages, 45 spindlestream *eventconsumer.Consumer, 46 idResolver *idresolver.Resolver, 47 db *db.DB, 48 config *config.Config, 49 posthog posthog.Client, 50 enforcer *rbac.Enforcer, 51) *Pipelines { 52 logger := log.New("pipelines") 53 54 return &Pipelines{oauth: oauth, 55 repoResolver: repoResolver, 56 pages: pages, 57 idResolver: idResolver, 58 config: config, 59 spindlestream: spindlestream, 60 db: db, 61 posthog: posthog, 62 enforcer: enforcer, 63 logger: logger, 64 } 65} 66 67func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 68 user := p.oauth.GetUser(r) 69 l := p.logger.With("handler", "Index") 70 71 f, err := p.repoResolver.Resolve(r) 72 if err != nil { 73 l.Error("failed to get repo and knot", "err", err) 74 return 75 } 76 77 repoInfo := f.RepoInfo(user) 78 79 ps, err := db.GetPipelineStatuses( 80 p.db, 81 db.FilterEq("repo_owner", repoInfo.OwnerDid), 82 db.FilterEq("repo_name", repoInfo.Name), 83 db.FilterEq("knot", repoInfo.Knot), 84 ) 85 if err != nil { 86 l.Error("failed to query db", "err", err) 87 return 88 } 89 90 p.pages.Pipelines(w, pages.PipelinesParams{ 91 LoggedInUser: user, 92 RepoInfo: repoInfo, 93 Pipelines: ps, 94 }) 95} 96 97func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 98 user := p.oauth.GetUser(r) 99 l := p.logger.With("handler", "Workflow") 100 101 f, err := p.repoResolver.Resolve(r) 102 if err != nil { 103 l.Error("failed to get repo and knot", "err", err) 104 return 105 } 106 107 repoInfo := f.RepoInfo(user) 108 109 pipelineId := chi.URLParam(r, "pipeline") 110 if pipelineId == "" { 111 l.Error("empty pipeline ID") 112 return 113 } 114 115 workflow := chi.URLParam(r, "workflow") 116 if workflow == "" { 117 l.Error("empty workflow name") 118 return 119 } 120 121 ps, err := db.GetPipelineStatuses( 122 p.db, 123 db.FilterEq("repo_owner", repoInfo.OwnerDid), 124 db.FilterEq("repo_name", repoInfo.Name), 125 db.FilterEq("knot", repoInfo.Knot), 126 db.FilterEq("id", pipelineId), 127 ) 128 if err != nil { 129 l.Error("failed to query db", "err", err) 130 return 131 } 132 133 if len(ps) != 1 { 134 l.Error("invalid number of pipelines", "len", len(ps)) 135 return 136 } 137 138 singlePipeline := ps[0] 139 140 p.pages.Workflow(w, pages.WorkflowParams{ 141 LoggedInUser: user, 142 RepoInfo: repoInfo, 143 Pipeline: singlePipeline, 144 Workflow: workflow, 145 }) 146} 147 148var upgrader = websocket.Upgrader{ 149 ReadBufferSize: 1024, 150 WriteBufferSize: 1024, 151} 152 153func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 154 l := p.logger.With("handler", "logs") 155 156 clientConn, err := upgrader.Upgrade(w, r, nil) 157 if err != nil { 158 l.Error("websocket upgrade failed", "err", err) 159 return 160 } 161 defer clientConn.Close() 162 163 ctx, cancel := context.WithCancel(r.Context()) 164 defer cancel() 165 go func() { 166 for { 167 if _, _, err := clientConn.NextReader(); err != nil { 168 l.Error("failed to read", "err", err) 169 cancel() 170 return 171 } 172 } 173 }() 174 175 user := p.oauth.GetUser(r) 176 f, err := p.repoResolver.Resolve(r) 177 if err != nil { 178 l.Error("failed to get repo and knot", "err", err) 179 http.Error(w, "bad repo/knot", http.StatusBadRequest) 180 return 181 } 182 183 repoInfo := f.RepoInfo(user) 184 185 pipelineId := chi.URLParam(r, "pipeline") 186 workflow := chi.URLParam(r, "workflow") 187 if pipelineId == "" || workflow == "" { 188 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 189 return 190 } 191 192 ps, err := db.GetPipelineStatuses( 193 p.db, 194 db.FilterEq("repo_owner", repoInfo.OwnerDid), 195 db.FilterEq("repo_name", repoInfo.Name), 196 db.FilterEq("knot", repoInfo.Knot), 197 db.FilterEq("id", pipelineId), 198 ) 199 if err != nil || len(ps) != 1 { 200 l.Error("pipeline query failed", "err", err, "count", len(ps)) 201 http.Error(w, "pipeline not found", http.StatusNotFound) 202 return 203 } 204 205 singlePipeline := ps[0] 206 spindle := repoInfo.Spindle 207 knot := repoInfo.Knot 208 rkey := singlePipeline.Rkey 209 210 if spindle == "" || knot == "" || rkey == "" { 211 http.Error(w, "invalid repo info", http.StatusBadRequest) 212 return 213 } 214 215 scheme := "wss" 216 if p.config.Core.Dev { 217 scheme = "ws" 218 } 219 220 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 221 l = l.With("url", url) 222 l.Info("logs endpoint hit") 223 224 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 225 if err != nil { 226 l.Error("websocket dial failed", "err", err) 227 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 228 return 229 } 230 defer spindleConn.Close() 231 232 // create a channel for incoming messages 233 msgChan := make(chan []byte, 10) 234 errChan := make(chan error, 1) 235 236 // start a goroutine to read from spindle 237 go func() { 238 defer close(msgChan) 239 for { 240 _, msg, err := spindleConn.ReadMessage() 241 if err != nil { 242 errChan <- err 243 return 244 } 245 msgChan <- msg 246 } 247 }() 248 249 for { 250 select { 251 case <-ctx.Done(): 252 l.Info("client disconnected") 253 return 254 case err := <-errChan: 255 l.Error("error reading from spindle", "err", err) 256 return 257 case msg := <-msgChan: 258 var logLine spindlemodel.LogLine 259 if err = json.Unmarshal(msg, &logLine); err != nil { 260 l.Error("failed to parse logline", "err", err) 261 continue 262 } 263 264 html := fmt.Appendf(nil, ` 265 <div id="lines" hx-swap-oob="beforeend"> 266 <p>%s: %s</p> 267 </div> 268 `, logLine.Stream, logLine.Data) 269 270 if err = clientConn.WriteMessage(websocket.TextMessage, html); err != nil { 271 l.Error("error writing to client", "err", err) 272 return 273 } 274 case <-time.After(30 * time.Second): 275 l.Debug("sent keepalive") 276 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 277 l.Error("failed to write control", "err", err) 278 } 279 } 280 } 281}