// Package engine executes spindle CI pipelines by running each workflow's
// steps inside Docker containers with per-workflow networks and volumes.
1package engine
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "log/slog"
9 "os"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/docker/docker/api/types/container"
15 "github.com/docker/docker/api/types/image"
16 "github.com/docker/docker/api/types/mount"
17 "github.com/docker/docker/api/types/network"
18 "github.com/docker/docker/api/types/volume"
19 "github.com/docker/docker/client"
20 "github.com/docker/docker/pkg/stdcopy"
21 "tangled.sh/tangled.sh/core/log"
22 "tangled.sh/tangled.sh/core/notifier"
23 "tangled.sh/tangled.sh/core/spindle/config"
24 "tangled.sh/tangled.sh/core/spindle/db"
25 "tangled.sh/tangled.sh/core/spindle/models"
26)
27
const (
	// workspaceDir is the in-container working directory; it is backed by
	// the per-workflow workspace volume and is also used as HOME for steps.
	workspaceDir = "/tangled/workspace"
)

// cleanupFunc releases one resource (volume, network, ...) created for a
// workflow. Registered via registerCleanup and run by DestroyWorkflow.
type cleanupFunc func(context.Context) error

// Engine runs pipeline workflows in Docker containers.
type Engine struct {
	docker client.APIClient   // Docker daemon client
	l      *slog.Logger       // structured logger, tagged component=spindle
	db     *db.DB             // workflow status persistence
	n      *notifier.Notifier // notified on status transitions
	cfg    *config.Config     // pipeline settings (timeouts, log dir, ...)

	// cleanupMu guards cleanup; workflows run concurrently.
	cleanupMu sync.Mutex
	// cleanup maps WorkflowId.String() to the teardown funcs registered
	// while setting that workflow up.
	cleanup map[string][]cleanupFunc
}
44
45func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
46 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
47 if err != nil {
48 return nil, err
49 }
50
51 l := log.FromContext(ctx).With("component", "spindle")
52
53 e := &Engine{
54 docker: dcli,
55 l: l,
56 db: db,
57 n: n,
58 cfg: cfg,
59 }
60
61 e.cleanup = make(map[string][]cleanupFunc)
62
63 return e, nil
64}
65
66func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
67 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
68
69 wg := sync.WaitGroup{}
70 for _, w := range pipeline.Workflows {
71 wg.Add(1)
72 go func() error {
73 defer wg.Done()
74 wid := models.WorkflowId{
75 PipelineId: pipelineId,
76 Name: w.Name,
77 }
78
79 err := e.db.StatusRunning(wid, e.n)
80 if err != nil {
81 return err
82 }
83
84 err = e.SetupWorkflow(ctx, wid)
85 if err != nil {
86 e.l.Error("setting up worklow", "wid", wid, "err", err)
87 return err
88 }
89 defer e.DestroyWorkflow(ctx, wid)
90
91 reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
92 if err != nil {
93 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
94
95 err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
96 if err != nil {
97 return err
98 }
99
100 return fmt.Errorf("pulling image: %w", err)
101 }
102 defer reader.Close()
103 io.Copy(os.Stdout, reader)
104
105 err = e.StartSteps(ctx, w.Steps, wid, w.Image)
106 if err != nil {
107 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())
108
109 dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
110 if dbErr != nil {
111 return dbErr
112 }
113
114 return fmt.Errorf("starting steps image: %w", err)
115 }
116
117 err = e.db.StatusSuccess(wid, e.n)
118 if err != nil {
119 return err
120 }
121
122 return nil
123 }()
124 }
125
126 wg.Wait()
127}
128
129// SetupWorkflow sets up a new network for the workflow and volumes for
130// the workspace and Nix store. These are persisted across steps and are
131// destroyed at the end of the workflow.
132func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
133 e.l.Info("setting up workflow", "workflow", wid)
134
135 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
136 Name: workspaceVolume(wid),
137 Driver: "local",
138 })
139 if err != nil {
140 return err
141 }
142 e.registerCleanup(wid, func(ctx context.Context) error {
143 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
144 })
145
146 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
147 Name: nixVolume(wid),
148 Driver: "local",
149 })
150 if err != nil {
151 return err
152 }
153 e.registerCleanup(wid, func(ctx context.Context) error {
154 return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
155 })
156
157 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
158 Driver: "bridge",
159 })
160 if err != nil {
161 return err
162 }
163 e.registerCleanup(wid, func(ctx context.Context) error {
164 return e.docker.NetworkRemove(ctx, networkName(wid))
165 })
166
167 return nil
168}
169
170// StartSteps starts all steps sequentially with the same base image.
171// ONLY marks pipeline as failed if container's exit code is non-zero.
172// All other errors are bubbled up.
173// Fixed version of the step execution logic
174func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
175 stepTimeoutStr := e.cfg.Pipelines.StepTimeout
176 stepTimeout, err := time.ParseDuration(stepTimeoutStr)
177 if err != nil {
178 e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr)
179 stepTimeout = 5 * time.Minute
180 }
181 e.l.Info("using step timeout", "timeout", stepTimeout)
182
183 for stepIdx, step := range steps {
184 envs := ConstructEnvs(step.Environment)
185 envs.AddEnv("HOME", workspaceDir)
186 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
187
188 hostConfig := hostConfig(wid)
189 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
190 Image: image,
191 Cmd: []string{"bash", "-c", step.Command},
192 WorkingDir: workspaceDir,
193 Tty: false,
194 Hostname: "spindle",
195 Env: envs.Slice(),
196 }, hostConfig, nil, nil, "")
197 if err != nil {
198 return fmt.Errorf("creating container: %w", err)
199 }
200
201 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
202 if err != nil {
203 return fmt.Errorf("connecting network: %w", err)
204 }
205
206 stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)
207
208 err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
209 if err != nil {
210 stepCancel()
211 return err
212 }
213 e.l.Info("started container", "name", resp.ID, "step", step.Name)
214
215 // start tailing logs in background
216 tailDone := make(chan error, 1)
217 go func() {
218 tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
219 }()
220
221 // wait for container completion or timeout
222 waitDone := make(chan struct{})
223 var state *container.State
224 var waitErr error
225
226 go func() {
227 defer close(waitDone)
228 state, waitErr = e.WaitStep(stepCtx, resp.ID)
229 }()
230
231 select {
232 case <-waitDone:
233
234 // wait for tailing to complete
235 <-tailDone
236 stepCancel()
237
238 case <-stepCtx.Done():
239 e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)
240
241 _ = e.DestroyStep(ctx, resp.ID)
242
243 // wait for both goroutines to finish
244 <-waitDone
245 <-tailDone
246
247 stepCancel()
248 return fmt.Errorf("step timed out after %v", stepTimeout)
249 }
250
251 if waitErr != nil {
252 return waitErr
253 }
254
255 err = e.DestroyStep(ctx, resp.ID)
256 if err != nil {
257 return err
258 }
259
260 if state.ExitCode != 0 {
261 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
262 return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
263 }
264 }
265
266 return nil
267}
268
269func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
270 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
271 select {
272 case err := <-errCh:
273 if err != nil {
274 return nil, err
275 }
276 case <-wait:
277 }
278
279 e.l.Info("waited for container", "name", containerID)
280
281 info, err := e.docker.ContainerInspect(ctx, containerID)
282 if err != nil {
283 return nil, err
284 }
285
286 return info.State, nil
287}
288
289func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
290 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
291 Follow: true,
292 ShowStdout: true,
293 ShowStderr: true,
294 Details: false,
295 Timestamps: false,
296 })
297 if err != nil {
298 return err
299 }
300
301 wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
302 if err != nil {
303 e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
304 return err
305 }
306 defer wfLogger.Close()
307
308 _, err = stdcopy.StdCopy(wfLogger.Stdout(), wfLogger.Stderr(), logs)
309 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
310 return fmt.Errorf("failed to copy logs: %w", err)
311 }
312
313 return nil
314}
315
316func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
317 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
318 if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
319 return err
320 }
321
322 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
323 RemoveVolumes: true,
324 RemoveLinks: false,
325 Force: false,
326 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
327 return err
328 }
329
330 return nil
331}
332
333func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
334 e.cleanupMu.Lock()
335 key := wid.String()
336
337 fns := e.cleanup[key]
338 delete(e.cleanup, key)
339 e.cleanupMu.Unlock()
340
341 for _, fn := range fns {
342 if err := fn(ctx); err != nil {
343 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
344 }
345 }
346 return nil
347}
348
349func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
350 e.cleanupMu.Lock()
351 defer e.cleanupMu.Unlock()
352
353 key := wid.String()
354 e.cleanup[key] = append(e.cleanup[key], fn)
355}
356
357func workspaceVolume(wid models.WorkflowId) string {
358 return fmt.Sprintf("workspace-%s", wid)
359}
360
361func nixVolume(wid models.WorkflowId) string {
362 return fmt.Sprintf("nix-%s", wid)
363}
364
365func networkName(wid models.WorkflowId) string {
366 return fmt.Sprintf("workflow-network-%s", wid)
367}
368
369func hostConfig(wid models.WorkflowId) *container.HostConfig {
370 hostConfig := &container.HostConfig{
371 Mounts: []mount.Mount{
372 {
373 Type: mount.TypeVolume,
374 Source: workspaceVolume(wid),
375 Target: workspaceDir,
376 },
377 {
378 Type: mount.TypeVolume,
379 Source: nixVolume(wid),
380 Target: "/nix",
381 },
382 {
383 Type: mount.TypeTmpfs,
384 Target: "/tmp",
385 ReadOnly: false,
386 TmpfsOptions: &mount.TmpfsOptions{
387 Mode: 0o1777, // world-writeable sticky bit
388 },
389 },
390 {
391 Type: mount.TypeVolume,
392 Source: "etc-nix-" + wid.String(),
393 Target: "/etc/nix",
394 },
395 },
396 ReadonlyRootfs: false,
397 CapDrop: []string{"ALL"},
398 CapAdd: []string{"CAP_DAC_OVERRIDE"},
399 SecurityOpt: []string{"no-new-privileges"},
400 ExtraHosts: []string{"host.docker.internal:host-gateway"},
401 }
402
403 return hostConfig
404}
405
// isErrContainerNotFoundOrNotRunning reports whether err is a Docker or
// Podman daemon error that means the container is already gone or has
// already stopped, e.g.:
//
//	Error response from daemon: Cannot kill container: ...: No such container: ...
//	Error response from daemon: Cannot kill container: ...: Container ... is not running"
//	Error response from podman daemon: can only kill running containers. ... is in state exited
//	Error: No such container: ...
//
// (thanks woodpecker)
func isErrContainerNotFoundOrNotRunning(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range []string{
		"No such container",
		"is not running",
		"can only kill running containers",
	} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
413}