Monorepo for Tangled

spindle/models: stream nixery image pull errors to clients

Signed-off-by: oppiliappan <me@oppi.li>

authored by

oppiliappan and committed by tangled.org e3285ce3 46115499

+67 -25
+16 -14
spindle/engine/engine.go
··· 30 30 } 31 31 } 32 32 33 + secretValues := make([]string, len(allSecrets)) 34 + for i, s := range allSecrets { 35 + secretValues[i] = s.Value 36 + } 37 + 33 38 var wg sync.WaitGroup 34 39 for eng, wfs := range pipeline.Workflows { 35 40 workflowTimeout := eng.WorkflowTimeout() ··· 45 50 Name: w.Name, 46 51 } 47 52 48 - err := db.StatusRunning(wid, n) 53 + wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues) 54 + if err != nil { 55 + l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 56 + wfLogger = models.NullLogger{} 57 + } else { 58 + l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid) 59 + defer wfLogger.Close() 60 + } 61 + 62 + err = db.StatusRunning(wid, n) 49 63 if err != nil { 50 64 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 51 65 return 52 66 } 53 67 54 - err = eng.SetupWorkflow(ctx, wid, &w) 68 + err = eng.SetupWorkflow(ctx, wid, &w, wfLogger) 55 69 if err != nil { 56 70 // TODO(winter): Should this always set StatusFailed? 57 71 // In the original, we only do in a subset of cases. ··· 69 83 return 70 84 } 71 85 defer eng.DestroyWorkflow(ctx, wid) 72 - 73 - secretValues := make([]string, len(allSecrets)) 74 - for i, s := range allSecrets { 75 - secretValues[i] = s.Value 76 - } 77 - wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid, secretValues) 78 - if err != nil { 79 - l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 80 - wfLogger = nil 81 - } else { 82 - defer wfLogger.Close() 83 - } 84 86 85 87 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 86 88 defer cancel()
+49 -9
spindle/engines/nixery/engine.go
··· 1 1 package nixery 2 2 3 3 import ( 4 + "bufio" 4 5 "context" 5 6 "errors" 6 7 "fmt" 7 8 "io" 8 9 "log/slog" 9 - "os" 10 10 "path" 11 11 "runtime" 12 12 "sync" ··· 169 169 return e, nil 170 170 } 171 171 172 - func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 173 - e.l.Info("setting up workflow", "workflow", wid) 172 + func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 173 + /// -------------------------INITIAL SETUP------------------------------------------ 174 + l := e.l.With("workflow", wid) 175 + l.Info("setting up workflow") 176 + 177 + setupStep := Step{ 178 + name: "nixery image pull", 179 + kind: models.StepKindSystem, 180 + } 181 + setupStepIdx := -1 182 + 183 + wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0}) 184 + defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0}) 174 185 186 + /// -------------------------NETWORK CREATION--------------------------------------- 175 187 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 176 188 Driver: "bridge", 177 189 }) 178 190 if err != nil { 179 191 return err 180 192 } 193 + 181 194 e.registerCleanup(wid, func(ctx context.Context) error { 182 195 if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil { 183 196 return fmt.Errorf("removing network: %w", err) ··· 185 198 return nil 186 199 }) 187 200 201 + /// -------------------------IMAGE PULL--------------------------------------------- 188 202 addl := wf.Data.(addlFields) 203 + l.Info("pulling image", "image", addl.image) 204 + fmt.Fprintf( 205 + wfLogger.DataWriter(setupStepIdx, "stdout"), 206 + "pulling image: %s", 207 + addl.image, 208 + ) 189 209 190 210 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 191 211 if err != nil { 192 - e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) 193 - 212 + l.Error("pipeline image pull failed!", "error", err.Error()) 213 + fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err) 194 214 return fmt.Errorf("pulling image: %w", err) 195 215 } 196 216 defer reader.Close() 197 - io.Copy(os.Stdout, reader) 217 + 218 + scanner := bufio.NewScanner(reader) 219 + for scanner.Scan() { 220 + line := scanner.Text() 221 + wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line)) 222 + l.Info("image pull progress", "stdout", line) 223 + } 224 + 225 + /// -------------------------CONTAINER CREATION------------------------------------- 226 + l.Info("creating container") 227 + wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("creating container...")) 198 228 199 229 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 200 230 Image: addl.image, ··· 229 259 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 230 260 }, nil, nil, "") 231 261 if err != nil { 262 + fmt.Fprintf( 263 + wfLogger.DataWriter(setupStepIdx, "stderr"), 264 + "container creation failed: %s", 265 + err, 266 + ) 232 267 return fmt.Errorf("creating container: %w", err) 233 268 } 269 + 234 270 e.registerCleanup(wid, func(ctx context.Context) error { 235 271 if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil { 236 272 return fmt.Errorf("stopping container: %w", err) ··· 244 280 if err != nil { 245 281 return fmt.Errorf("removing container: %w", err) 246 282 } 283 + 247 284 return nil 248 285 }) 249 286 287 + /// -------------------------CONTAINER START---------------------------------------- 288 + wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("starting container...")) 250 289 if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { 251 290 return fmt.Errorf("starting container: %w", err) 252 291 } ··· 273 312 return err 274 313 } 275 314 315 + /// -----------------------------------FINISH--------------------------------------- 276 316 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 277 317 if err != nil { 278 318 return err ··· 290 330 return nil 291 331 } 292 332 293 - func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 333 + func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 294 334 addl := w.Data.(addlFields) 295 335 workflowEnvs := ConstructEnvs(w.Environment) 296 336 // TODO(winter): should SetupWorkflow also have secret access? ··· 331 371 // start tailing logs in background 332 372 tailDone := make(chan error, 1) 333 373 go func() { 334 - tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step) 374 + tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx) 335 375 }() 336 376 337 377 select { ··· 377 417 return nil 378 418 } 379 419 380 - func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 420 + func (e *Engine) tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error { 381 421 if wfLogger == nil { 382 422 return nil 383 423 }
+2 -2
spindle/models/engine.go
··· 10 10 11 11 type Engine interface { 12 12 InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*Workflow, error) 13 - SetupWorkflow(ctx context.Context, wid WorkflowId, wf *Workflow) error 13 + SetupWorkflow(ctx context.Context, wid WorkflowId, wf *Workflow, wfLogger WorkflowLogger) error 14 14 WorkflowTimeout() time.Duration 15 15 DestroyWorkflow(ctx context.Context, wid WorkflowId) error 16 - RunStep(ctx context.Context, wid WorkflowId, w *Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *WorkflowLogger) error 16 + RunStep(ctx context.Context, wid WorkflowId, w *Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger WorkflowLogger) error 17 17 }