package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/engine"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

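// Engine runs workflow steps as Docker containers built from Nixery images.
// Per-workflow cleanup functions (for volumes and the network) are recorded
// under cleanupMu and executed by DestroyWorkflow.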
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

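// Step is a single workflow step, run in its own container. It implements
// models.Step via the Name, Command and Kind accessors below.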
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

func (s Step) Name() string {
	return s.name
}

func (s Step) Command() string {
	return s.command
}

func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps are engine-provided steps that are prepended to the workflow's
// user-defined steps.
type setupSteps []models.Step

// addStep appends a step to the setup steps; the whole set is later prepended
// to the workflow's steps.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}

type addlFields struct {
	image string
	env   map[string]string
}

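// InitWorkflow decodes the workflow's raw YAML into a models.Workflow,
// prepends the engine's setup steps (nix.conf, clone, and an optional
// dependency step), and stores the resolved Nixery image and workflow
// environment in Workflow.Data for later steps.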
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	addl.env = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// the dependency step is nil when the workflow declares no dependencies
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// prepend the setup steps, in order, to the workflow's steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}

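// WorkflowTimeout returns the configured workflow timeout, falling back to
// five minutes if the configured value cannot be parsed.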
func (e *Engine) WorkflowTimeout() time.Duration {
	workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}

	return workflowTimeout
}

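// workflowImage builds the Nixery image reference for a workflow by joining
// the declared nixpkgs dependencies with a default set (bash, git, coreutils,
// nix), prefixed with "arm64" on arm64 hosts. For example (illustrative
// values, assuming a Nixery host of "nixery.dev"), dependencies of
// {"nixpkgs": ["go"]} yield "nixery.dev/go/bash/git/coreutils/nix".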
func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// load defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}

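// New constructs an Engine backed by a Docker client configured from the
// environment, with API version negotiation enabled.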
func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		cfg:    cfg,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// drain the pull progress stream; the pull only completes once it is read
	_, _ = io.Copy(os.Stdout, reader)

	return nil
}

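// RunStep creates a container for the step at idx using the workflow image,
// attaches it to the workflow network, streams its logs into wfLogger, and
// waits for it to exit. It returns engine.ErrTimedOut when ctx is cancelled,
// ErrOOMKilled when the container is OOM-killed, and engine.ErrWorkflowFailed
// for any other non-zero exit code.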
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
	workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	step := w.Steps[idx].(Step)

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	envs := append(EnvVars(nil), workflowEnvs...)
	for k, v := range step.environment {
		envs.AddEnv(k, v)
	}
	envs.AddEnv("HOME", workspaceDir)
	e.l.Debug("envs for step", "step", step.Name(), "envs", envs.Slice())

	hostConfig := hostConfig(wid)
	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      w.Data.(addlFields).image,
		Cmd:        []string{"bash", "-c", step.command},
		WorkingDir: workspaceDir,
		Tty:        false,
		Hostname:   "spindle",
		Env:        envs.Slice(),
	}, hostConfig, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	defer e.DestroyStep(ctx, resp.ID)

	err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
	if err != nil {
		return fmt.Errorf("connecting network: %w", err)
	}

	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
	if err != nil {
		return err
	}
	e.l.Info("started container", "name", resp.ID, "step", step.Name())

	// start tailing logs in background
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step)
	}()

	// wait for container completion or timeout
	waitDone := make(chan struct{})
	var state *container.State
	var waitErr error

	go func() {
		defer close(waitDone)
		state, waitErr = e.WaitStep(ctx, resp.ID)
	}()

	select {
	case <-waitDone:
		// wait for tailing to complete
		<-tailDone

	case <-ctx.Done():
		e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name())
		err = e.DestroyStep(context.Background(), resp.ID)
		if err != nil {
			e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
		}

		// wait for both goroutines to finish
		<-waitDone
		<-tailDone

		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	if waitErr != nil {
		return waitErr
	}

	err = e.DestroyStep(ctx, resp.ID)
	if err != nil {
		return err
	}

	if state.ExitCode != 0 {
		e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
		if state.OOMKilled {
			return ErrOOMKilled
		}
		return engine.ErrWorkflowFailed
	}

	return nil
}

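// WaitStep blocks until the container is no longer running and returns its
// inspected state.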
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

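// tailStep follows the container's combined output and demultiplexes it into
// the workflow logger's stdout and stderr writers. It is a no-op when
// wfLogger is nil.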
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	if wfLogger == nil {
		return nil
	}

	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs,
	)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

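// DestroyStep force-kills the container and removes it along with its
// anonymous volumes, tolerating "no such container" and "not running" errors.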
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

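// DestroyWorkflow runs and clears the cleanup functions registered for the
// workflow, removing its volumes and network. Individual failures are logged
// and do not abort the remaining cleanups.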
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

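// registerCleanup records a cleanup function to be run by DestroyWorkflow for
// the given workflow.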
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

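// hostConfig mounts the per-workflow workspace and /nix volumes, an
// exec-enabled tmpfs at /tmp, and an /etc/nix volume, then hardens the
// container: all capabilities are dropped except CAP_DAC_OVERRIDE and
// no-new-privileges is set.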
func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable, with the sticky bit set
					Options: [][]string{
						{"exec"},
					},
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running"
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") ||
		strings.Contains(err.Error(), "is not running") ||
		strings.Contains(err.Error(), "can only kill running containers"))
}