package engine

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

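// workspaceDir is the path inside every step container where the shared
// workspace volume is mounted; it doubles as HOME and the working directory.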
const (
	workspaceDir = "/tangled/workspace"
)

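// cleanupFunc tears down one workflow-scoped resource (volume, network, ...).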
type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

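// New builds an Engine backed by a Docker client configured from the
// environment (DOCKER_HOST et al.), with API version negotiation enabled.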
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	return &Engine{
		docker:  dcli,
		l:       l,
		db:      db,
		n:       n,
		cfg:     cfg,
		cleanup: make(map[string][]cleanupFunc),
	}, nil
}

func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			if err := e.db.StatusRunning(wid, e.n); err != nil {
				e.l.Error("marking workflow as running", "wid", wid, "err", err)
				return
			}

			if err := e.SetupWorkflow(ctx, wid); err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pulling image failed", "workflowId", wid, "error", err.Error())
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}
			defer reader.Close()
			// the pull is only complete once the progress stream has been drained
			if _, err := io.Copy(os.Stdout, reader); err != nil {
				e.l.Warn("draining image pull stream", "wid", wid, "err", err)
			}

			if err := e.StartSteps(ctx, w.Steps, wid, w.Image); err != nil {
				if errors.Is(err, ErrTimedOut) {
					if dbErr := e.db.StatusTimeout(wid, e.n); dbErr != nil {
						e.l.Error("marking workflow as timed out", "wid", wid, "err", dbErr)
					}
				} else {
					if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
						e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
					}
				}
				return
			}

			if err := e.db.StatusSuccess(wid, e.n); err != nil {
				e.l.Error("marking workflow as successful", "wid", wid, "err", err)
			}
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
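//
// A typical lifecycle, as driven by StartWorkflows (a sketch, not a
// prescribed API):
//
//	if err := e.SetupWorkflow(ctx, wid); err != nil {
//		return err
//	}
//	defer e.DestroyWorkflow(ctx, wid)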
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps sequentially, each in a fresh container built
// from the same base image and sharing the workflow's volumes and network.
// A non-zero exit code maps to ErrWorkflowFailed (or ErrOOMKilled), a step
// exceeding the configured timeout maps to ErrTimedOut, and all other
// errors are bubbled up unchanged.
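//
// Callers typically translate these sentinel errors into workflow status,
// along the lines of (sketch):
//
//	if err := e.StartSteps(ctx, w.Steps, wid, w.Image); errors.Is(err, ErrTimedOut) {
//		_ = e.db.StatusTimeout(wid, e.n)
//	}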
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
	stepTimeoutStr := e.cfg.Pipelines.StepTimeout
	stepTimeout, err := time.ParseDuration(stepTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse step timeout; falling back to 5m", "error", err, "timeout", stepTimeoutStr)
		stepTimeout = 5 * time.Minute
	}
	e.l.Info("using step timeout", "timeout", stepTimeout)

	for stepIdx, step := range steps {
		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hc := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hc, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

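		// the per-step context bounds container start, wait, and log tailing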
		stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)

		err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
		if err != nil {
			stepCancel()
			return fmt.Errorf("starting container: %w", err)
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(stepCtx, resp.ID)
		}()

		select {
		case <-waitDone:
			// wait for tailing to complete
			<-tailDone
			stepCancel()

		case <-stepCtx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)

			_ = e.DestroyStep(ctx, resp.ID)

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			stepCancel()
			return ErrTimedOut
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			if state.OOMKilled {
				return ErrOOMKilled
			}
			return ErrWorkflowFailed
		}
	}

	return nil
}

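// WaitStep blocks until the container is no longer running, then inspects
// it and returns its final state (exit code, OOM flag, error string).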
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

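// TailStep follows the container's combined output, demultiplexing the
// Docker stream into per-workflow stdout/stderr log files on disk.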
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
	if err != nil {
		e.l.Warn("failed to set up step logger; logs will not be persisted", "error", err)
		return err
	}
	defer wfLogger.Close()

	_, err = stdcopy.StdCopy(wfLogger.Stdout(), wfLogger.Stderr(), logs)
	if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

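// DestroyStep force-kills (SIGKILL) and removes a step container. A container
// that is already gone or not running is not treated as an error.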
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

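// DestroyWorkflow runs and clears every cleanup function registered for the
// workflow (volumes, network). Individual failures are logged, not returned,
// so one failed removal does not strand the remaining resources.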
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

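// registerCleanup records a teardown function to be run by DestroyWorkflow.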
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

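// hostConfig builds the sandbox for step containers: all capabilities are
// dropped except CAP_DAC_OVERRIDE, privilege escalation is disabled, /tmp is
// a tmpfs, and the workspace and Nix volumes are mounted read-write. The
// etc-nix volume is not created in SetupWorkflow; Docker creates named
// volumes referenced in a mount on demand.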
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable, sticky bit set
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
}