package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/engine"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

// Engine executes spindle workflows as Docker containers whose images are
// served by a Nixery instance.
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

func (s Step) Name() string {
	return s.name
}

func (s Step) Command() string {
	return s.command
}

func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps are engine-generated steps that run before a workflow's
// user-defined steps.
type setupSteps []models.Step

// addStep appends a step to the setup list; InitWorkflow later prepends the
// whole list to the workflow's steps.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}

// addlFields is engine-specific data attached to a workflow: the resolved
// Nixery image reference and the workflow-level environment.
type addlFields struct {
	image string
	env   map[string]string
}

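// For illustration only: a raw workflow document that unmarshals into the
// anonymous struct used by InitWorkflow could look like the sketch below.
// Field names follow the yaml tags; only the "nixpkgs" key of dependencies is
// consulted by workflowImage, and the package and command names are made up.
//
//	environment:
//	  LOG_LEVEL: debug
//	dependencies:
//	  nixpkgs:
//	    - go
//	    - gnumake
//	steps:
//	  - name: build
//	    command: go build ./...
//	    environment:
//	      CGO_ENABLED: "0"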
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	addl.env = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// dependencyStep returns nil when there are no dependencies to set up
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// prepend the setup steps, in order, to the workflow's user-defined steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}

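// WorkflowTimeout returns the configured workflow timeout, parsed with
// time.ParseDuration (e.g. "10m" or "1h30m"), falling back to 5 minutes when
// the configured value does not parse.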
func (e *Engine) WorkflowTimeout() time.Duration {
	workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}

	return workflowTimeout
}

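// workflowImage builds the image reference for a workflow by joining the
// requested nixpkgs packages, plus a default set (bash, git, coreutils, nix),
// onto the configured Nixery host. As a sketch with a hypothetical host:
//
//	workflowImage(map[string][]string{"nixpkgs": {"go", "gnumake"}}, "nixery.example.com")
//
// yields "nixery.example.com/go/gnumake/bash/git/coreutils/nix".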
func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// TODO: load these default dependencies from configuration instead of
	// hardcoding them here
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	return path.Join(nixery, dependencies)
}

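// New constructs an Engine backed by a Docker client configured from the
// environment (DOCKER_HOST and friends) with API version negotiation enabled.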
func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:  dcli,
		l:       l,
		cfg:     cfg,
		cleanup: make(map[string][]cleanupFunc),
	}

	return e, nil
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// the pull only completes once its progress stream has been consumed
	if _, err := io.Copy(os.Stdout, reader); err != nil {
		e.l.Error("failed to read image pull output", "error", err)
	}

	return nil
}

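// RunStep runs a single workflow step in its own container using the
// workflow's Nixery image. The command is executed with `bash -c` inside the
// shared workspace volume, and the container environment is assembled from
// the workflow environment, the unlocked secrets, the step environment, and
// finally HOME pointed at the workspace directory.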
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
	workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	step := w.Steps[idx].(Step)

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	envs := append(EnvVars(nil), workflowEnvs...)
	for k, v := range step.environment {
		envs.AddEnv(k, v)
	}
	envs.AddEnv("HOME", workspaceDir)
	e.l.Debug("envs for step", "step", step.Name(), "envs", envs.Slice())

	hostConfig := hostConfig(wid)
	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      w.Data.(addlFields).image,
		Cmd:        []string{"bash", "-c", step.command},
		WorkingDir: workspaceDir,
		Tty:        false,
		Hostname:   "spindle",
		Env:        envs.Slice(),
	}, hostConfig, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	defer e.DestroyStep(ctx, resp.ID)

	err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
	if err != nil {
		return fmt.Errorf("connecting network: %w", err)
	}

	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
	if err != nil {
		return err
	}
	e.l.Info("started container", "name", resp.ID, "step", step.Name())

	// start tailing logs in background
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step)
	}()

	// wait for container completion or timeout
	waitDone := make(chan struct{})
	var state *container.State
	var waitErr error

	go func() {
		defer close(waitDone)
		state, waitErr = e.WaitStep(ctx, resp.ID)
	}()

	select {
	case <-waitDone:

		// wait for tailing to complete
		<-tailDone

	case <-ctx.Done():
		e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name())
		err = e.DestroyStep(context.Background(), resp.ID)
		if err != nil {
			e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
		}

		// wait for both goroutines to finish
		<-waitDone
		<-tailDone

		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	if waitErr != nil {
		return waitErr
	}

	err = e.DestroyStep(ctx, resp.ID)
	if err != nil {
		return err
	}

	if state.ExitCode != 0 {
		e.l.Error("workflow failed", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
		if state.OOMKilled {
			return ErrOOMKilled
		}
		return engine.ErrWorkflowFailed
	}

	return nil
}

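// WaitStep blocks until the container is no longer running, then inspects it
// and returns its final state.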
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

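// tailStep follows the container's combined log stream and demultiplexes it
// into the workflow logger's stdout and stderr writers. It is a no-op when no
// logger is provided.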
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	if wfLogger == nil {
		return nil
	}

	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs,
	)
	if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

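// DestroyStep force-kills the step container and removes it together with its
// anonymous volumes, tolerating containers that are already gone or stopped.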
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

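// DestroyWorkflow runs and clears every cleanup function registered for the
// workflow (workspace and nix volumes, the workflow network), logging rather
// than aborting when an individual cleanup fails.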
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

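// registerCleanup records a cleanup function to be run by DestroyWorkflow.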
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

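// hostConfig builds the per-step container host configuration: the shared
// workspace and /nix volumes, an exec-enabled tmpfs on /tmp, a per-workflow
// /etc/nix volume, all capabilities dropped except CAP_DAC_OVERRIDE added
// back, no-new-privileges, and a host-gateway alias for host.docker.internal.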
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable with the sticky bit set
					Options: [][]string{
						{"exec"},
					},
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") ||
		strings.Contains(err.Error(), "is not running") ||
		strings.Contains(err.Error(), "can only kill running containers"))
}