this repo has no description
1package nixery
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "log/slog"
9 "os"
10 "path"
11 "runtime"
12 "sync"
13 "time"
14
15 "github.com/docker/docker/api/types/container"
16 "github.com/docker/docker/api/types/image"
17 "github.com/docker/docker/api/types/mount"
18 "github.com/docker/docker/api/types/network"
19 "github.com/docker/docker/client"
20 "github.com/docker/docker/pkg/stdcopy"
21 "gopkg.in/yaml.v3"
22 "tangled.sh/tangled.sh/core/api/tangled"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/spindle/config"
25 "tangled.sh/tangled.sh/core/spindle/engine"
26 "tangled.sh/tangled.sh/core/spindle/models"
27 "tangled.sh/tangled.sh/core/spindle/secrets"
28)
29
const (
	// workspaceDir is the working directory inside the workflow container;
	// the repository is cloned here and HOME points at it while steps run.
	workspaceDir = "/tangled/workspace"
)

// cleanupFunc tears down one resource (network, container, ...) that was
// created while setting up a workflow. Registered via registerCleanup and
// invoked by DestroyWorkflow.
type cleanupFunc func(context.Context) error
35
// Engine runs workflows as Docker containers built from nixery images.
// The zero value is not usable; construct with New.
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	// cleanupMu guards cleanup, which maps a workflow id (stringified) to
	// the teardown functions registered while that workflow was set up.
	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}
44
// Step is a single unit of work in a workflow: either a user-authored
// command decoded from the pipeline YAML or an internal setup step.
// It satisfies the accessor methods below (Name, Command, Kind) —
// presumably the models.Step interface; confirm against the models package.
type Step struct {
	name    string
	kind    models.StepKind
	command string
	// environment holds step-level variables, applied on top of the
	// workflow-level environment in RunStep.
	environment map[string]string
}
51
// Name returns the step's display name.
func (s Step) Name() string {
	return s.name
}
55
// Command returns the shell command the step executes.
func (s Step) Command() string {
	return s.command
}
59
// Kind reports whether the step is user-authored or an internal setup step.
func (s Step) Kind() models.StepKind {
	return s.kind
}
63
// setupSteps collects internal steps (nix config, clone, dependency
// install) that InitWorkflow prepends, in order, to a workflow's user steps.
type setupSteps []models.Step

// addStep appends a step to the end of the setup list. Note that the whole
// list is later prepended to the workflow's steps (see InitWorkflow), so
// setup steps run before any user step, in the order they were added here.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}
71
// addlFields is engine-private state threaded through models.Workflow.Data:
// the resolved nixery image reference, the id of the container the workflow
// runs in (populated by SetupWorkflow), and the workflow-level environment.
type addlFields struct {
	image     string
	container string
	env       map[string]string
}
77
// InitWorkflow parses the workflow's raw YAML into a models.Workflow.
// User-authored steps are decoded in document order; setup steps (nix
// config, repository clone and, when dependencies are declared, a
// dependency install step) are then prepended so they run first. The
// resolved nixery image and the workflow-level environment are stashed in
// Workflow.Data as addlFields for SetupWorkflow and RunStep.
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	// anonymous struct describing only the subset of the pipeline YAML
	// this engine consumes
	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	// user steps, in the order they appear in the YAML
	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	addl.env = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// this step could be empty
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// append setup steps in order to the start of workflow steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}
123
124func (e *Engine) WorkflowTimeout() time.Duration {
125 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
126 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
127 if err != nil {
128 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
129 workflowTimeout = 5 * time.Minute
130 }
131
132 return workflowTimeout
133}
134
// workflowImage builds the nixery image reference for a workflow: the
// registry host followed by the requested nixpkgs packages plus a baseline
// set (bash, git, coreutils, nix), all joined as URL path segments.
// On arm64 hosts the package path is prefixed with "arm64" — presumably
// nixery's convention for selecting the architecture; confirm against the
// nixery docs.
func workflowImage(deps map[string][]string, nixery string) string {
	// Only the "nixpkgs" registry is supported; a direct lookup replaces
	// the old find-the-key loop. A missing key yields a nil slice, which
	// path.Join turns into the empty string and later drops.
	dependencies := path.Join(deps["nixpkgs"]...)

	// TODO: load defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}
152
153func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
154 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
155 if err != nil {
156 return nil, err
157 }
158
159 l := log.FromContext(ctx).With("component", "spindle")
160
161 e := &Engine{
162 docker: dcli,
163 l: l,
164 cfg: cfg,
165 }
166
167 e.cleanup = make(map[string][]cleanupFunc)
168
169 return e, nil
170}
171
172func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
173 e.l.Info("setting up workflow", "workflow", wid)
174
175 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
176 Driver: "bridge",
177 })
178 if err != nil {
179 return err
180 }
181 e.registerCleanup(wid, func(ctx context.Context) error {
182 return e.docker.NetworkRemove(ctx, networkName(wid))
183 })
184
185 addl := wf.Data.(addlFields)
186
187 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
188 if err != nil {
189 e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
190
191 return fmt.Errorf("pulling image: %w", err)
192 }
193 defer reader.Close()
194 io.Copy(os.Stdout, reader)
195
196 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
197 Image: addl.image,
198 Cmd: []string{"cat"},
199 OpenStdin: true, // so cat stays alive :3
200 Tty: false,
201 Hostname: "spindle",
202 // TODO(winter): investigate whether environment variables passed here
203 // get propagated to ContainerExec processes
204 }, &container.HostConfig{
205 Mounts: []mount.Mount{
206 {
207 Type: mount.TypeTmpfs,
208 Target: "/tmp",
209 ReadOnly: false,
210 TmpfsOptions: &mount.TmpfsOptions{
211 Mode: 0o1777, // world-writeable sticky bit
212 Options: [][]string{
213 {"exec"},
214 },
215 },
216 },
217 },
218 ReadonlyRootfs: false,
219 CapDrop: []string{"ALL"},
220 CapAdd: []string{"CAP_DAC_OVERRIDE"},
221 SecurityOpt: []string{"no-new-privileges"},
222 ExtraHosts: []string{"host.docker.internal:host-gateway"},
223 }, nil, nil, "")
224 if err != nil {
225 return fmt.Errorf("creating container: %w", err)
226 }
227 e.registerCleanup(wid, func(ctx context.Context) error {
228 err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
229 if err != nil {
230 return err
231 }
232
233 return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
234 RemoveVolumes: true,
235 RemoveLinks: false,
236 Force: false,
237 })
238 })
239
240 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
241 if err != nil {
242 return fmt.Errorf("starting container: %w", err)
243 }
244
245 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
246 Cmd: []string{"mkdir", "-p", workspaceDir},
247 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
248 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
249 })
250 if err != nil {
251 return err
252 }
253
254 // This actually *starts* the command. Thanks, Docker!
255 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
256 if err != nil {
257 return err
258 }
259 defer execResp.Close()
260
261 // This is apparently best way to wait for the command to complete.
262 _, err = io.ReadAll(execResp.Reader)
263 if err != nil {
264 return err
265 }
266
267 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
268 if err != nil {
269 return err
270 }
271
272 if execInspectResp.ExitCode != 0 {
273 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
274 } else if execInspectResp.Running {
275 return errors.New("mkdir is somehow still running??")
276 }
277
278 addl.container = resp.ID
279 wf.Data = addl
280
281 return nil
282}
283
284func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
285 addl := w.Data.(addlFields)
286 workflowEnvs := ConstructEnvs(addl.env)
287 // TODO(winter): should SetupWorkflow also have secret access?
288 // IMO yes, but probably worth thinking on.
289 for _, s := range secrets {
290 workflowEnvs.AddEnv(s.Key, s.Value)
291 }
292
293 step := w.Steps[idx].(Step)
294
295 select {
296 case <-ctx.Done():
297 return ctx.Err()
298 default:
299 }
300
301 envs := append(EnvVars(nil), workflowEnvs...)
302 for k, v := range step.environment {
303 envs.AddEnv(k, v)
304 }
305 envs.AddEnv("HOME", workspaceDir)
306 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
307
308 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
309 Cmd: []string{"bash", "-c", step.command},
310 AttachStdout: true,
311 AttachStderr: true,
312 })
313 if err != nil {
314 return fmt.Errorf("creating exec: %w", err)
315 }
316
317 // start tailing logs in background
318 tailDone := make(chan error, 1)
319 go func() {
320 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
321 }()
322
323 select {
324 case <-tailDone:
325
326 case <-ctx.Done():
327 // cleanup will be handled by DestroyWorkflow, since
328 // Docker doesn't provide an API to kill an exec run
329 // (sure, we could grab the PID and kill it ourselves,
330 // but that's wasted effort)
331 e.l.Warn("step timed out", "step", step.Name)
332
333 <-tailDone
334
335 return engine.ErrTimedOut
336 }
337
338 select {
339 case <-ctx.Done():
340 return ctx.Err()
341 default:
342 }
343
344 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
345 if err != nil {
346 return err
347 }
348
349 if execInspectResp.ExitCode != 0 {
350 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
351 if err != nil {
352 return err
353 }
354
355 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
356
357 if inspectResp.State.OOMKilled {
358 return ErrOOMKilled
359 }
360 return engine.ErrWorkflowFailed
361 }
362
363 return nil
364}
365
366func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
367 if wfLogger == nil {
368 return nil
369 }
370
371 // This actually *starts* the command. Thanks, Docker!
372 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
373 if err != nil {
374 return err
375 }
376 defer logs.Close()
377
378 _, err = stdcopy.StdCopy(
379 wfLogger.DataWriter("stdout"),
380 wfLogger.DataWriter("stderr"),
381 logs.Reader,
382 )
383 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
384 return fmt.Errorf("failed to copy logs: %w", err)
385 }
386
387 return nil
388}
389
390func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
391 e.cleanupMu.Lock()
392 key := wid.String()
393
394 fns := e.cleanup[key]
395 delete(e.cleanup, key)
396 e.cleanupMu.Unlock()
397
398 for _, fn := range fns {
399 if err := fn(ctx); err != nil {
400 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
401 }
402 }
403 return nil
404}
405
406func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
407 e.cleanupMu.Lock()
408 defer e.cleanupMu.Unlock()
409
410 key := wid.String()
411 e.cleanup[key] = append(e.cleanup[key], fn)
412}
413
414func networkName(wid models.WorkflowId) string {
415 return fmt.Sprintf("workflow-network-%s", wid)
416}