Signed-off-by: oppiliappan me@oppi.li
+16
-14
spindle/engine/engine.go
+16
-14
spindle/engine/engine.go
···
30
30
}
31
31
}
32
32
33
+
secretValues := make([]string, len(allSecrets))
34
+
for i, s := range allSecrets {
35
+
secretValues[i] = s.Value
36
+
}
37
+
33
38
var wg sync.WaitGroup
34
39
for eng, wfs := range pipeline.Workflows {
35
40
workflowTimeout := eng.WorkflowTimeout()
···
45
50
Name: w.Name,
46
51
}
47
52
48
-
err := db.StatusRunning(wid, n)
53
+
wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues)
54
+
if err != nil {
55
+
l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
56
+
wfLogger = models.NullLogger{}
57
+
} else {
58
+
l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid)
59
+
defer wfLogger.Close()
60
+
}
61
+
62
+
err = db.StatusRunning(wid, n)
49
63
if err != nil {
50
64
l.Error("failed to set workflow status to running", "wid", wid, "err", err)
51
65
return
52
66
}
53
67
54
-
err = eng.SetupWorkflow(ctx, wid, &w)
68
+
err = eng.SetupWorkflow(ctx, wid, &w, wfLogger)
55
69
if err != nil {
56
70
// TODO(winter): Should this always set StatusFailed?
57
71
// In the original, we only do in a subset of cases.
···
70
84
}
71
85
defer eng.DestroyWorkflow(ctx, wid)
72
86
73
-
secretValues := make([]string, len(allSecrets))
74
-
for i, s := range allSecrets {
75
-
secretValues[i] = s.Value
76
-
}
77
-
wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid, secretValues)
78
-
if err != nil {
79
-
l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
80
-
wfLogger = nil
81
-
} else {
82
-
defer wfLogger.Close()
83
-
}
84
-
85
87
ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
86
88
defer cancel()
87
89
+49
-9
spindle/engines/nixery/engine.go
+49
-9
spindle/engines/nixery/engine.go
···
1
1
package nixery
2
2
3
3
import (
4
+
"bufio"
4
5
"context"
5
6
"errors"
6
7
"fmt"
7
8
"io"
8
9
"log/slog"
9
-
"os"
10
10
"path"
11
11
"runtime"
12
12
"sync"
···
169
169
return e, nil
170
170
}
171
171
172
-
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
173
-
e.l.Info("setting up workflow", "workflow", wid)
172
+
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
173
+
/// -------------------------INITIAL SETUP------------------------------------------
174
+
l := e.l.With("workflow", wid)
175
+
l.Info("setting up workflow")
174
176
177
+
setupStep := Step{
178
+
name: "nixery image pull",
179
+
kind: models.StepKindSystem,
180
+
}
181
+
setupStepIdx := -1
182
+
183
+
wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0})
184
+
defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0})
185
+
186
+
/// -------------------------NETWORK CREATION---------------------------------------
175
187
_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
176
188
Driver: "bridge",
177
189
})
178
190
if err != nil {
179
191
return err
180
192
}
193
+
181
194
e.registerCleanup(wid, func(ctx context.Context) error {
182
195
if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil {
183
196
return fmt.Errorf("removing network: %w", err)
···
185
198
return nil
186
199
})
187
200
201
+
/// -------------------------IMAGE PULL---------------------------------------------
188
202
addl := wf.Data.(addlFields)
203
+
l.Info("pulling image", "image", addl.image)
204
+
fmt.Fprintf(
205
+
wfLogger.DataWriter(setupStepIdx, "stdout"),
206
+
"pulling image: %s",
207
+
addl.image,
208
+
)
189
209
190
210
reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
191
211
if err != nil {
192
-
e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
193
-
212
+
l.Error("pipeline image pull failed!", "error", err.Error())
213
+
fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err)
194
214
return fmt.Errorf("pulling image: %w", err)
195
215
}
196
216
defer reader.Close()
197
-
io.Copy(os.Stdout, reader)
217
+
218
+
scanner := bufio.NewScanner(reader)
219
+
for scanner.Scan() {
220
+
line := scanner.Text()
221
+
wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line))
222
+
l.Info("image pull progress", "stdout", line)
223
+
}
224
+
225
+
/// -------------------------CONTAINER CREATION-------------------------------------
226
+
l.Info("creating container")
227
+
wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("creating container..."))
198
228
199
229
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
200
230
Image: addl.image,
···
229
259
ExtraHosts: []string{"host.docker.internal:host-gateway"},
230
260
}, nil, nil, "")
231
261
if err != nil {
262
+
fmt.Fprintf(
263
+
wfLogger.DataWriter(setupStepIdx, "stderr"),
264
+
"container creation failed: %s",
265
+
err,
266
+
)
232
267
return fmt.Errorf("creating container: %w", err)
233
268
}
269
+
234
270
e.registerCleanup(wid, func(ctx context.Context) error {
235
271
if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil {
236
272
return fmt.Errorf("stopping container: %w", err)
···
244
280
if err != nil {
245
281
return fmt.Errorf("removing container: %w", err)
246
282
}
283
+
247
284
return nil
248
285
})
249
286
287
+
/// -------------------------CONTAINER START----------------------------------------
288
+
wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("starting container..."))
250
289
if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
251
290
return fmt.Errorf("starting container: %w", err)
252
291
}
···
273
312
return err
274
313
}
275
314
315
+
/// -----------------------------------FINISH---------------------------------------
276
316
execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
277
317
if err != nil {
278
318
return err
···
290
330
return nil
291
331
}
292
332
293
-
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
333
+
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
294
334
addl := w.Data.(addlFields)
295
335
workflowEnvs := ConstructEnvs(w.Environment)
296
336
// TODO(winter): should SetupWorkflow also have secret access?
···
331
371
// start tailing logs in background
332
372
tailDone := make(chan error, 1)
333
373
go func() {
334
-
tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
374
+
tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx)
335
375
}()
336
376
337
377
select {
···
377
417
return nil
378
418
}
379
419
380
-
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
420
+
func (e *Engine) tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error {
381
421
if wfLogger == nil {
382
422
return nil
383
423
}
+2
-2
spindle/models/engine.go
+2
-2
spindle/models/engine.go
···
10
10
11
11
type Engine interface {
12
12
InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*Workflow, error)
13
-
SetupWorkflow(ctx context.Context, wid WorkflowId, wf *Workflow) error
13
+
SetupWorkflow(ctx context.Context, wid WorkflowId, wf *Workflow, wfLogger WorkflowLogger) error
14
14
WorkflowTimeout() time.Duration
15
15
DestroyWorkflow(ctx context.Context, wid WorkflowId) error
16
-
RunStep(ctx context.Context, wid WorkflowId, w *Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *WorkflowLogger) error
16
+
RunStep(ctx context.Context, wid WorkflowId, w *Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger WorkflowLogger) error
17
17
}
History
1 round
1 comment
oppi.li
submitted
#0
1 commit
expand
collapse
spindle/models: stream nixery image pull errors to clients
Signed-off-by: oppiliappan <me@oppi.li>
3/3 success
expand
collapse
expand 1 comment
pull request successfully merged
some sample logs: