package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"runtime"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/engine"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
	homeDir      = "/tangled/home"
)

type cleanupFunc func(context.Context) error

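// Engine runs pipeline workflows inside Docker containers whose images are
// resolved from a Nixery instance based on the workflow's dependencies.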
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

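// Step is a single workflow step: a named shell command together with its
// per-step environment.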
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

func (s Step) Name() string {
	return s.name
}

func (s Step) Command() string {
	return s.command
}

func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps are collected separately and prepended to the workflow's steps.
type setupSteps []models.Step

// addStep appends a setup step; the collected setup steps are later
// prepended, in order, to the workflow's steps.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}

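// addlFields is engine-specific state carried in models.Workflow.Data: the
// resolved Nixery image, the running container ID, and the workflow-level
// environment variables.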
type addlFields struct {
	image     string
	container string
	env       map[string]string
}

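// InitWorkflow parses a workflow's raw YAML into a models.Workflow, resolves
// the Nixery image for its dependencies, and prepends the setup steps (nix
// configuration, repository clone, and an optional dependency step).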
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	addl.env = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// this step may be nil when there is nothing to set up
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// prepend the setup steps, in order, to the workflow's steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}

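// WorkflowTimeout returns the configured workflow timeout, falling back to
// five minutes if the configured value cannot be parsed.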
func (e *Engine) WorkflowTimeout() time.Duration {
	workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}

	return workflowTimeout
}

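// workflowImage builds the Nixery image reference for a workflow by joining
// the requested nixpkgs dependencies with a set of default packages (bash,
// git, coreutils, nix), prefixed with "arm64" on arm64 hosts.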
func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// TODO: load defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}

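// New creates an Engine backed by a Docker client configured from the
// environment.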
func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		cfg:    cfg,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

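// SetupWorkflow pulls the workflow's Nixery image, creates a dedicated bridge
// network and a long-lived container, and prepares the workspace and home
// directories inside it. Cleanup of these resources is registered for
// DestroyWorkflow.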
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// the pull only completes once its progress stream has been drained
	io.Copy(os.Stdout, reader)

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      addl.image,
		Cmd:        []string{"cat"},
		OpenStdin:  true, // so cat stays alive :3
		Tty:        false,
		Hostname:   "spindle",
		WorkingDir: workspaceDir,
		Labels: map[string]string{
			"sh.tangled.pipeline/workflow_id": wid.String(),
		},
		// TODO(winter): investigate whether environment variables passed here
		// get propagated to ContainerExec processes
	}, &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable with the sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
		if err != nil {
			return err
		}

		return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
			RemoveVolumes: true,
			RemoveLinks:   false,
			Force:         false,
		})
	})

	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
	if err != nil {
		return fmt.Errorf("starting container: %w", err)
	}

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
		Cmd:          []string{"mkdir", "-p", workspaceDir, homeDir},
		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
	})
	if err != nil {
		return err
	}

	// This actually *starts* the command. Thanks, Docker!
	execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer execResp.Close()

	// This is apparently the best way to wait for the command to complete.
	_, err = io.ReadAll(execResp.Reader)
	if err != nil {
		return err
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
	} else if execInspectResp.Running {
		return errors.New("mkdir is somehow still running??")
	}

	addl.container = resp.ID
	wf.Data = addl

	return nil
}

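// RunStep executes step idx of the workflow inside the workflow's container
// via docker exec, injecting the workflow environment, unlocked secrets, and
// the step's own environment, and streams its output to wfLogger.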
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
	addl := w.Data.(addlFields)
	workflowEnvs := ConstructEnvs(addl.env)
	// TODO(winter): should SetupWorkflow also have secret access?
	// IMO yes, but probably worth thinking on.
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}
	step := w.Steps[idx].(Step)
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	envs := append(EnvVars(nil), workflowEnvs...)
	for k, v := range step.environment {
		envs.AddEnv(k, v)
	}
	envs.AddEnv("HOME", homeDir)

	e.l.Info("executing step",
		"workflow_id", wid.String(),
		"step_index", idx,
		"step_name", step.name,
		"command", step.command,
	)

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
		Cmd:          []string{"bash", "-c", step.command},
		AttachStdout: true,
		AttachStderr: true,
		Env:          envs,
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}

	// start tailing logs in background
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
	}()

	select {
	case <-tailDone:
	case <-ctx.Done():
		// cleanup will be handled by DestroyWorkflow, since
		// Docker doesn't provide an API to kill an exec run
		// (sure, we could grab the PID and kill it ourselves,
		// but that's wasted effort)
		e.l.Warn("step timed out", "step", step.name)
		<-tailDone
		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	if err = e.handleStepFailure(ctx, wid, w, idx, mkExecResp.ID); err != nil {
		return err
	}

	e.l.Info("step completed successfully",
		"workflow_id", wid.String(),
		"step_index", idx,
		"step_name", step.name,
	)

	return nil
}

// handleStepFailure inspects the container and exec state after a step
// finishes, logs detailed information if the step failed, and maps the
// failure to an engine error.
func (e *Engine) handleStepFailure(
	ctx context.Context,
	wid models.WorkflowId,
	w *models.Workflow,
	idx int,
	execID string,
) error {
	addl := w.Data.(addlFields)
	step := w.Steps[idx].(Step)

	inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
	if err != nil {
		return err
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, execID)
	if err != nil {
		return err
	}

	// no error
	if execInspectResp.ExitCode == 0 {
		return nil
	}

	logFields := []any{
		"workflow_id", wid.String(),
		"step_index", idx,
		"step_name", step.name,
		"command", step.command,
		"exec_exit_code", execInspectResp.ExitCode,
	}

	// Add container state information (guard against a nil State before
	// dereferencing it)
	if inspectResp.State != nil {
		logFields = append(logFields,
			"container_exit_code", inspectResp.State.ExitCode,
			"container_oom_killed", inspectResp.State.OOMKilled,
			"container_status", inspectResp.State.Status,
			"container_running", inspectResp.State.Running,
			"container_paused", inspectResp.State.Paused,
			"container_restarting", inspectResp.State.Restarting,
			"container_dead", inspectResp.State.Dead,
		)

		if inspectResp.State.Error != "" {
			logFields = append(logFields, "container_error", inspectResp.State.Error)
		}

		if inspectResp.State.StartedAt != "" {
			logFields = append(logFields, "container_started_at", inspectResp.State.StartedAt)
		}

		if inspectResp.State.FinishedAt != "" {
			logFields = append(logFields, "container_finished_at", inspectResp.State.FinishedAt)
		}
	}

	// Add resource usage if available
	if inspectResp.HostConfig != nil && inspectResp.HostConfig.Memory > 0 {
		logFields = append(logFields, "memory_limit", inspectResp.HostConfig.Memory)
	}

	e.l.Error("workflow step failed!", logFields...)

	if inspectResp.State != nil && inspectResp.State.OOMKilled {
		return ErrOOMKilled
	}
	return engine.ErrWorkflowFailed
}

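// tailStep attaches to a running exec and copies its demultiplexed stdout and
// stderr into the workflow logger. It is a no-op when no logger is provided.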
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	if wfLogger == nil {
		return nil
	}

	// This actually *starts* the command. Thanks, Docker!
	logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs.Reader,
	)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

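// DestroyWorkflow runs, and then forgets, every cleanup function registered
// for the workflow; individual cleanup failures are logged rather than
// aborting the remaining cleanups.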
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

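// registerCleanup records a cleanup function to be run by DestroyWorkflow for
// the given workflow.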
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

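// networkName derives the per-workflow Docker network name from the workflow id.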
func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}