package engine

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

const (
	workspaceDir = "/tangled/workspace"
)
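
// cleanupFunc releases a single per-workflow Docker resource (a volume or a
// network); SetupWorkflow registers one for every resource it creates.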
type cleanupFunc func(context.Context) error
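
// Engine runs pipelines against a Docker-compatible daemon. It keeps
// per-workflow stdout/stderr channels for live log streaming, plus the
// cleanup functions for each workflow's volumes and network, keyed by
// workflow id.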
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}
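
// New builds an Engine with a Docker client configured from the environment
// (DOCKER_HOST and friends) and API version negotiation enabled.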
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		db:     db,
		n:      n,
		cfg:    cfg,
	}

	e.stdoutChans = make(map[string]chan string, 100)
	e.stderrChans = make(map[string]chan string, 100)

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}
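
// StartWorkflows runs every workflow in the pipeline concurrently and blocks
// until all of them have finished. Each workflow gets its own network and
// volumes, pulls its base image, runs its steps, and reports status updates
// to the database along the way.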
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() error {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			err := e.db.StatusRunning(wid, e.n)
			if err != nil {
				return err
			}

			err = e.SetupWorkflow(ctx, wid)
			if err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return err
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())

				dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
				if dbErr != nil {
					return dbErr
				}

				return fmt.Errorf("pulling image: %w", err)
			}
			defer reader.Close()
			io.Copy(os.Stdout, reader)

			err = e.StartSteps(ctx, w.Steps, wid, w.Image)
			if err != nil {
				e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())

				dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
				if dbErr != nil {
					return dbErr
				}

				return fmt.Errorf("starting steps: %w", err)
			}

			err = e.db.StatusSuccess(wid, e.n)
			if err != nil {
				return err
			}

			return nil
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These persist across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}
// StartSteps runs all steps sequentially in the same base image. It only
// marks the workflow as failed when a container exits with a non-zero code;
// all other errors are bubbled up to the caller.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
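	// the per-step timeout comes from config as a Go duration string
	// (e.g. "5m"); if it cannot be parsed, fall back to five minutes.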
	stepTimeoutStr := e.cfg.Pipelines.StepTimeout
	stepTimeout, err := time.ParseDuration(stepTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr)
		stepTimeout = 5 * time.Minute
	}
	e.l.Info("using step timeout", "timeout", stepTimeout)

	key := wid.String()
	e.chanMu.Lock()
	if _, exists := e.stdoutChans[key]; !exists {
		e.stdoutChans[key] = make(chan string, 100)
	}
	if _, exists := e.stderrChans[key]; !exists {
		e.stderrChans[key] = make(chan string, 100)
	}
	stdoutCh := e.stdoutChans[key]
	stderrCh := e.stderrChans[key]
	e.chanMu.Unlock()

	// close channels after all steps are complete; close the channels
	// captured under the lock rather than re-reading the maps without it.
	defer func() {
		close(stdoutCh)
		close(stderrCh)
	}()

	for stepIdx, step := range steps {
		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)

		err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
		if err != nil {
			stepCancel()
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(stepCtx, resp.ID)
		}()

		select {
		case <-waitDone:

			// wait for tailing to complete
			<-tailDone
			stepCancel()

		case <-stepCtx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)

			_ = e.DestroyStep(ctx, resp.ID)

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			stepCancel()
			return fmt.Errorf("step timed out after %v", stepTimeout)
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
			if err != nil {
				return err
			}
			return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
		}
	}

	return nil
}
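
// WaitStep blocks until the container is no longer running, then returns its
// final state as reported by the daemon.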
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}
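
// TailStep follows the container's combined log stream, demultiplexes it into
// stdout and stderr, persists both through the step logger, and forwards each
// line to the workflow's log channels.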
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    true,
		Timestamps: false,
	})
	if err != nil {
		return err
	}

	stepLogger, err := NewStepLogger(e.cfg.Pipelines.LogDir, wid.String(), stepIdx)
	if err != nil {
		e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
	}

	var logOutput io.Writer = io.Discard

	if e.cfg.Server.Dev {
		logOutput = &ansiStrippingWriter{underlying: os.Stdout}
	}

	tee := io.TeeReader(logs, logOutput)

	// StdCopy demultiplexes the combined stream: stdout and stderr are each
	// written to their own pipe, and each pipe is scanned line by line into
	// its own channel.
	//
	//   stdout -> wpipeOut | rpipeOut -> stdoutCh
	//   stderr -> wpipeErr | rpipeErr -> stderrCh

	rpipeOut, wpipeOut := io.Pipe()
	rpipeErr, wpipeErr := io.Pipe()

	// an io.MultiWriter writes to both the pipe and the file-based logger.
	multiOut := io.MultiWriter(wpipeOut, stepLogger.Stdout())
	multiErr := io.MultiWriter(wpipeErr, stepLogger.Stderr())

	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer wpipeOut.Close()
		defer wpipeErr.Close()
		defer stepLogger.Close()
		_, err := stdcopy.StdCopy(multiOut, multiErr, tee)
		if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
			e.l.Error("failed to copy logs", "error", err)
		}
	}()

	// read from the stdout pipe and send each line to the stdout channel.
	// NOTE: the stdoutCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stdoutCh := e.stdoutChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeOut)
		for scanner.Scan() {
			stdoutCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stdout", "error", err)
		}
	}()

	// read from the stderr pipe and send each line to the stderr channel.
	// NOTE: the stderrCh channel is closed further up in StartSteps
	// once all steps are done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		e.chanMu.RLock()
		stderrCh := e.stderrChans[wid.String()]
		e.chanMu.RUnlock()

		scanner := bufio.NewScanner(rpipeErr)
		for scanner.Scan() {
			stderrCh <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			e.l.Error("failed to scan stderr", "error", err)
		}
	}()

	wg.Wait()

	return nil
}
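
// DestroyStep kills the step container with SIGKILL and removes it together
// with its anonymous volumes; "no such container" and "not running" errors
// are treated as success.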
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}
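
// DestroyWorkflow runs every cleanup function registered for the workflow
// (workspace volume, nix volume, network) and drops them from the map.
// Individual failures are logged but do not stop the remaining cleanups.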
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}
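
// LogChannels returns the live stdout and stderr channels for a workflow, or
// ok=false if the workflow is not currently streaming. Both channels are
// closed by StartSteps once all steps have completed. A consumer might drain
// them roughly like this (illustrative sketch only; eng and wid are assumed
// to be an *Engine and a models.WorkflowId):
//
//	stdout, stderr, ok := eng.LogChannels(wid)
//	if ok {
//		go func() {
//			for line := range stderr {
//				fmt.Println("err:", line)
//			}
//		}()
//		for line := range stdout {
//			fmt.Println("out:", line)
//		}
//	}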
func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()

	stdoutCh, ok1 := e.stdoutChans[wid.String()]
	stderrCh, ok2 := e.stderrChans[wid.String()]

	if !ok1 || !ok2 {
		return nil, nil, false
	}
	return stdoutCh, stderrCh, true
}
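
// registerCleanup appends a cleanup function for the workflow; DestroyWorkflow
// later runs the registered functions in order.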
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}
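
// hostConfig mounts the shared workspace volume, the /nix volume, a
// per-workflow /etc/nix volume, and a world-writable tmpfs /tmp into each
// step container, and locks the container down: all capabilities dropped
// except CAP_DAC_OVERRIDE, with no-new-privileges set.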
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable with the sticky bit set
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
}