package engine

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

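// New builds an Engine. The Docker client is configured from the standard
// environment variables (DOCKER_HOST and friends) with API version
// negotiation enabled, so it works against both older and newer daemons.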
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:  dcli,
		l:       l,
		db:      db,
		n:       n,
		cfg:     cfg,
		cleanup: make(map[string][]cleanupFunc),
	}

	return e, nil
}

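// StartWorkflows runs every workflow in the pipeline concurrently and blocks
// until all of them have finished. A typical call site looks roughly like
// this (a sketch; the surrounding wiring and variable names are assumed):
//
//	eng, err := engine.New(ctx, cfg, database, notif)
//	if err != nil {
//		// handle error
//	}
//	eng.StartWorkflows(ctx, pipeline, pipelineId)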
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	wg := sync.WaitGroup{}
	for _, w := range pipeline.Workflows {
		w := w // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
		wg.Add(1)
		// errors here are terminal for this workflow only, so they are logged
		// rather than returned
		go func() {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			if err := e.db.StatusRunning(wid, e.n); err != nil {
				e.l.Error("marking workflow as running", "wid", wid, "err", err)
				return
			}

			if err := e.SetupWorkflow(ctx, wid); err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pulling image", "workflowId", wid, "error", err.Error())
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}
			defer reader.Close()
			// the pull is only complete once the progress stream hits EOF
			if _, err := io.Copy(os.Stdout, reader); err != nil {
				e.l.Warn("draining image pull output", "wid", wid, "err", err)
			}

			workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
			workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
			if err != nil {
				e.l.Error("failed to parse workflow timeout; falling back to 5m", "error", err, "timeout", workflowTimeoutStr)
				workflowTimeout = 5 * time.Minute
			}
			e.l.Info("using workflow timeout", "timeout", workflowTimeout)
			ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
			defer cancel()

			if err := e.StartSteps(ctx, w.Steps, wid, w.Image); err != nil {
				e.l.Error("workflow failed!", "workflowId", wid, "error", err.Error())
				if errors.Is(err, ErrTimedOut) {
					if dbErr := e.db.StatusTimeout(wid, e.n); dbErr != nil {
						e.l.Error("marking workflow as timed out", "wid", wid, "err", dbErr)
					}
				} else if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}

			if err := e.db.StatusSuccess(wid, e.n); err != nil {
				e.l.Error("marking workflow as successful", "wid", wid, "err", err)
			}
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}
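
// As an illustrative sketch, a workflow with id <wid> ends up with the
// following Docker resources (names come from the helpers at the bottom of
// this file):
//
//	volume  workspace-<wid>          mounted at /tangled/workspace
//	volume  nix-<wid>                mounted at /nix
//	network workflow-network-<wid>   shared by all step containers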

// StartSteps runs all steps sequentially in the same base image. It treats a
// non-zero container exit code as a workflow failure (ErrWorkflowFailed, or
// ErrOOMKilled when the step was OOM-killed); all other errors are bubbled up
// to the caller as-is.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
	for stepIdx, step := range steps {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}
		// deferred as a safety net for early returns; the happy path removes
		// the container explicitly below, and DestroyStep tolerates containers
		// that are already gone
		defer e.DestroyStep(ctx, resp.ID)

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
		if err != nil {
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in the background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(ctx, resp.ID)
		}()

		select {
		case <-waitDone:
			// wait for tailing to complete
			<-tailDone

		case <-ctx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
			// the step context is already done, so tear down with a fresh one
			err = e.DestroyStep(context.Background(), resp.ID)
			if err != nil {
				e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
			}

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			return ErrTimedOut
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			if state.OOMKilled {
				return ErrOOMKilled
			}
			return ErrWorkflowFailed
		}
	}

	return nil
}

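// WaitStep blocks until the container is no longer running, then inspects it
// and returns its final state.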
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

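// TailStep follows the container's log stream, demultiplexing Docker's
// combined stdout/stderr stream into the workflow's persistent log files.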
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
	if err != nil {
		e.l.Warn("failed to set up step logger; logs will not be persisted", "error", err)
		return err
	}
	defer wfLogger.Close()

	_, err = stdcopy.StdCopy(wfLogger.Stdout(), wfLogger.Stderr(), logs)
	if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

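// DestroyStep kills the container with SIGKILL and removes it along with its
// anonymous volumes, tolerating containers that are already gone or stopped.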
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

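// DestroyWorkflow runs every cleanup function registered for the workflow and
// clears the registrations. Individual cleanup failures are logged rather than
// returned, so one failure does not block the remaining teardown.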
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

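// registerCleanup records a teardown function for the workflow; DestroyWorkflow
// later invokes the recorded functions in registration order.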
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

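// hostConfig builds the per-step container configuration: the shared workspace
// and Nix volumes, a world-writable tmpfs for /tmp, and a hardened capability
// set (everything dropped except CAP_DAC_OVERRIDE, with no-new-privileges).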
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable, sticky bit
				},
			},
			{
				// not created in SetupWorkflow; the daemon auto-creates named
				// volumes on first use, and SetupWorkflow registers no cleanup
				// for this one
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running"
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") ||
		strings.Contains(err.Error(), "is not running") ||
		strings.Contains(err.Error(), "can only kill running containers"))
}