package nixery

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"path"
	"runtime"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.org/core/api/tangled"
	"tangled.org/core/log"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
	homeDir      = "/tangled/home"
)

type cleanupFunc func(context.Context) error

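// Engine runs nixery-backed pipeline workflows inside Docker containers.
// Cleanup functions are tracked per workflow so DestroyWorkflow can tear
// down the resources created during setup.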
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

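// Step is a single workflow step executed inside the workflow's container.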
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

func (s Step) Name() string {
	return s.name
}

func (s Step) Command() string {
	return s.command
}

func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps are prepended to the start of a workflow's Steps.
type setupSteps []models.Step

// addStep appends a step to the setup steps, which are later prepended
// as a whole to the workflow's steps.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}

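// addlFields carries engine-specific data attached to a workflow via models.Workflow.Data.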
type addlFields struct {
	image     string
	container string
}

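// InitWorkflow parses the raw workflow definition, prepends the setup steps
// (nix configuration, repository clone, dependency installation), and
// resolves the nixery image the workflow will run in.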
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	swf.Environment = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// the dependency step may be nil if there is nothing to install
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// prepend the setup steps, in order, to the workflow's steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}

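// WorkflowTimeout returns the configured workflow timeout, falling back to
// five minutes if the configured value cannot be parsed.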
func (e *Engine) WorkflowTimeout() time.Duration {
	workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}

	return workflowTimeout
}

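// workflowImage builds the nixery image reference for a workflow by joining
// the requested nixpkgs dependencies with a set of default packages, and
// prefixing the path with "arm64" when running on an arm64 host.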
func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// TODO: load these defaults from elsewhere instead of hardcoding them here
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}

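// New constructs an Engine backed by a Docker client configured from the
// environment, with API version negotiation enabled.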
func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		cfg:    cfg,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

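// SetupWorkflow prepares a workflow for execution: it creates a dedicated
// bridge network, pulls the workflow's nixery image, creates and starts the
// workflow container, and creates the workspace and home directories inside
// it. Created resources are registered for cleanup by DestroyWorkflow.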
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
	/// -------------------------INITIAL SETUP------------------------------------------
	l := e.l.With("workflow", wid)
	l.Info("setting up workflow")

	setupStep := Step{
		name: "nixery image pull",
		kind: models.StepKindSystem,
	}
	setupStepIdx := -1

	wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0})
	defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0})

	/// -------------------------NETWORK CREATION---------------------------------------
	_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}

	e.registerCleanup(wid, func(ctx context.Context) error {
		if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil {
			return fmt.Errorf("removing network: %w", err)
		}
		return nil
	})

	/// -------------------------IMAGE PULL---------------------------------------------
	addl := wf.Data.(addlFields)
	l.Info("pulling image", "image", addl.image)
	fmt.Fprintf(
		wfLogger.DataWriter(setupStepIdx, "stdout"),
		"pulling image: %s",
		addl.image,
	)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		l.Error("pipeline image pull failed!", "error", err.Error())
		fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err)
		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()

	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		line := scanner.Text()
		wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line))
		l.Info("image pull progress", "stdout", line)
	}

	/// -------------------------CONTAINER CREATION-------------------------------------
	l.Info("creating container")
	wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("creating container..."))

	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      addl.image,
		Cmd:        []string{"cat"},
		OpenStdin:  true, // so cat stays alive :3
		Tty:        false,
		Hostname:   "spindle",
		WorkingDir: workspaceDir,
		Labels: map[string]string{
			"sh.tangled.pipeline/workflow_id": wid.String(),
		},
		// TODO(winter): investigate whether environment variables passed here
		// get propagated to ContainerExec processes
	}, &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable, sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}, nil, nil, "")
	if err != nil {
		fmt.Fprintf(
			wfLogger.DataWriter(setupStepIdx, "stderr"),
			"container creation failed: %s",
			err,
		)
		return fmt.Errorf("creating container: %w", err)
	}

	e.registerCleanup(wid, func(ctx context.Context) error {
		if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil {
			return fmt.Errorf("stopping container: %w", err)
		}

		err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
			RemoveVolumes: true,
			RemoveLinks:   false,
			Force:         false,
		})
		if err != nil {
			return fmt.Errorf("removing container: %w", err)
		}

		return nil
	})

	/// -------------------------CONTAINER START----------------------------------------
	wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("starting container..."))
	if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
		return fmt.Errorf("starting container: %w", err)
	}

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
		Cmd:          []string{"mkdir", "-p", workspaceDir, homeDir},
		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
	})
	if err != nil {
		return err
	}

	// This actually *starts* the command. Thanks, Docker!
	execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer execResp.Close()

	// This is apparently the best way to wait for the command to complete.
	_, err = io.ReadAll(execResp.Reader)
	if err != nil {
		return err
	}

	/// -----------------------------------FINISH---------------------------------------
	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
	} else if execInspectResp.Running {
		return errors.New("mkdir is somehow still running??")
	}

	addl.container = resp.ID
	wf.Data = addl

	return nil
}

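// RunStep executes the step at idx inside the workflow's container via bash,
// applying the workflow environment, unlocked secrets, and any step-level
// environment variables. Output is streamed to wfLogger, and failures are
// mapped to engine errors (timeout, OOM kill, or generic workflow failure).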
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
	addl := w.Data.(addlFields)
	workflowEnvs := ConstructEnvs(w.Environment)
	// TODO(winter): should SetupWorkflow also have secret access?
	// IMO yes, but probably worth thinking on.
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	step := w.Steps[idx]

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	envs := append(EnvVars(nil), workflowEnvs...)
	if nixStep, ok := step.(Step); ok {
		for k, v := range nixStep.environment {
			envs.AddEnv(k, v)
		}
	}

	envs.AddEnv("HOME", homeDir)
	existingPath := "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
	envs.AddEnv("PATH", fmt.Sprintf("%s/.nix-profile/bin:/nix/var/nix/profiles/default/bin:%s", homeDir, existingPath))

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
		Cmd:          []string{"bash", "-c", step.Command()},
		AttachStdout: true,
		AttachStderr: true,
		Env:          envs,
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}

	// start tailing logs in background
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx)
	}()

	select {
	case <-tailDone:

	case <-ctx.Done():
		// cleanup will be handled by DestroyWorkflow, since
		// Docker doesn't provide an API to kill an exec run
		// (sure, we could grab the PID and kill it ourselves,
		// but that's wasted effort)
		e.l.Warn("step timed out", "step", step.Name())

		<-tailDone

		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
		if err != nil {
			return err
		}

		e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)

		if inspectResp.State.OOMKilled {
			return ErrOOMKilled
		}
		return engine.ErrWorkflowFailed
	}

	return nil
}

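// tailStep attaches to the exec (which also starts it) and copies its
// demultiplexed stdout and stderr into the workflow logger. It returns
// immediately when no logger is provided.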
func (e *Engine) tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error {
	if wfLogger == nil {
		return nil
	}

	// This actually *starts* the command. Thanks, Docker!
	logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter(stepIdx, "stdout"),
		wfLogger.DataWriter(stepIdx, "stderr"),
		logs.Reader,
	)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

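// DestroyWorkflow runs and clears all cleanup functions registered for the
// workflow, logging (but not returning) individual cleanup failures.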
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	fns := e.drainCleanups(wid)

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

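// registerCleanup records a cleanup function to run when the workflow is destroyed.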
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

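// drainCleanups removes and returns the cleanup functions registered for a workflow.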
func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	return fns
}

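// networkName returns the per-workflow Docker network name.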
func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}