Monorepo for Tangled

wip: spindle: docker engine

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.org>

anirudh.fi de80ec95 3638909d

verified
+423 -11
+5
spindle/config/config.go
··· 41 WorkflowTimeout string `env:"WORKFLOW_TIMEOUT, default=5m"` 42 } 43 44 type Config struct { 45 Server Server `env:",prefix=SPINDLE_SERVER_"` 46 NixeryPipelines NixeryPipelines `env:",prefix=SPINDLE_NIXERY_PIPELINES_"` 47 } 48 49 func Load(ctx context.Context) (*Config, error) {
··· 41 WorkflowTimeout string `env:"WORKFLOW_TIMEOUT, default=5m"` 42 } 43 44 + type DockerPipelines struct { 45 + WorkflowTimeout string `env:"WORKFLOW_TIMEOUT, default=5m"` 46 + } 47 + 48 type Config struct { 49 Server Server `env:",prefix=SPINDLE_SERVER_"` 50 NixeryPipelines NixeryPipelines `env:",prefix=SPINDLE_NIXERY_PIPELINES_"` 51 + DockerPipelines DockerPipelines `env:",prefix=SPINDLE_DOCKER_PIPELINES_"` 52 } 53 54 func Load(ctx context.Context) (*Config, error) {
+398
spindle/engines/docker/engine.go
···
··· 1 + package docker 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "io" 8 + "log/slog" 9 + "os" 10 + "sync" 11 + "time" 12 + 13 + "github.com/docker/docker/api/types/container" 14 + "github.com/docker/docker/api/types/image" 15 + "github.com/docker/docker/api/types/mount" 16 + "github.com/docker/docker/api/types/network" 17 + "github.com/docker/docker/client" 18 + "github.com/docker/docker/pkg/stdcopy" 19 + "gopkg.in/yaml.v3" 20 + "tangled.org/core/api/tangled" 21 + "tangled.org/core/log" 22 + "tangled.org/core/spindle/config" 23 + "tangled.org/core/spindle/engine" 24 + "tangled.org/core/spindle/engines/common" 25 + engineerrors "tangled.org/core/spindle/engines/errors" 26 + "tangled.org/core/spindle/models" 27 + "tangled.org/core/spindle/secrets" 28 + ) 29 + 30 + const ( 31 + workspaceDir = "/tangled/workspace" 32 + homeDir = "/tangled/home" 33 + defaultImage = "ubuntu:22.04" 34 + ) 35 + 36 + type cleanupFunc func(context.Context) error 37 + 38 + type Engine struct { 39 + docker client.APIClient 40 + l *slog.Logger 41 + cfg *config.Config 42 + 43 + cleanupMu sync.Mutex 44 + cleanup map[string][]cleanupFunc 45 + } 46 + 47 + type Step struct { 48 + name string 49 + kind models.StepKind 50 + command string 51 + environment map[string]string 52 + } 53 + 54 + func (s Step) Name() string { 55 + return s.name 56 + } 57 + 58 + func (s Step) Command() string { 59 + return s.command 60 + } 61 + 62 + func (s Step) Kind() models.StepKind { 63 + return s.kind 64 + } 65 + 66 + // setupSteps get added to start of Steps 67 + type setupSteps []models.Step 68 + 69 + // addStep adds a step to the beginning of the workflow's steps. 70 + func (ss *setupSteps) addStep(step models.Step) { 71 + *ss = append(*ss, step) 72 + } 73 + 74 + type addlFields struct { 75 + image string 76 + container string 77 + } 78 + 79 + func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 80 + swf := &models.Workflow{} 81 + addl := addlFields{} 82 + 83 + dwf := &struct { 84 + Steps []struct { 85 + Command string `yaml:"command"` 86 + Name string `yaml:"name"` 87 + Environment map[string]string `yaml:"environment"` 88 + } `yaml:"steps"` 89 + Environment map[string]string `yaml:"environment"` 90 + Image string `yaml:"image"` 91 + }{} 92 + err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 93 + if err != nil { 94 + return nil, err 95 + } 96 + 97 + for _, dstep := range dwf.Steps { 98 + sstep := Step{} 99 + sstep.environment = dstep.Environment 100 + sstep.command = dstep.Command 101 + sstep.name = dstep.Name 102 + sstep.kind = models.StepKindUser 103 + swf.Steps = append(swf.Steps, sstep) 104 + } 105 + swf.Name = twf.Name 106 + swf.Environment = dwf.Environment 107 + 108 + // Use specified image or default 109 + if dwf.Image != "" { 110 + addl.image = dwf.Image 111 + } else { 112 + addl.image = defaultImage 113 + } 114 + 115 + setup := &setupSteps{} 116 + 117 + // Add clone step 118 + setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 119 + 120 + // append setup steps in order to the start of workflow steps 121 + swf.Steps = append(*setup, swf.Steps...) 122 + swf.Data = addl 123 + 124 + return swf, nil 125 + } 126 + 127 + func (e *Engine) WorkflowTimeout() time.Duration { 128 + workflowTimeoutStr := e.cfg.DockerPipelines.WorkflowTimeout 129 + workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 130 + if err != nil { 131 + e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 132 + workflowTimeout = 5 * time.Minute 133 + } 134 + 135 + return workflowTimeout 136 + } 137 + 138 + func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 139 + dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 140 + if err != nil { 141 + return nil, err 142 + } 143 + 144 + l := log.FromContext(ctx).With("component", "spindle-docker") 145 + 146 + e := &Engine{ 147 + docker: dcli, 148 + l: l, 149 + cfg: cfg, 150 + } 151 + 152 + e.cleanup = make(map[string][]cleanupFunc) 153 + 154 + return e, nil 155 + } 156 + 157 + func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 158 + e.l.Info("setting up workflow", "workflow", wid) 159 + 160 + _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 161 + Driver: "bridge", 162 + }) 163 + if err != nil { 164 + return err 165 + } 166 + e.registerCleanup(wid, func(ctx context.Context) error { 167 + return e.docker.NetworkRemove(ctx, networkName(wid)) 168 + }) 169 + 170 + addl := wf.Data.(addlFields) 171 + 172 + reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 173 + if err != nil { 174 + e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) 175 + 176 + return fmt.Errorf("pulling image: %w", err) 177 + } 178 + defer reader.Close() 179 + io.Copy(os.Stdout, reader) 180 + 181 + resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 182 + Image: addl.image, 183 + Cmd: []string{"sleep", "infinity"}, // Keep container running 184 + OpenStdin: true, 185 + Tty: false, 186 + Hostname: "spindle", 187 + WorkingDir: workspaceDir, 188 + Labels: map[string]string{ 189 + "sh.tangled.pipeline/workflow_id": wid.String(), 190 + }, 191 + }, &container.HostConfig{ 192 + Mounts: []mount.Mount{ 193 + { 194 + Type: mount.TypeTmpfs, 195 + Target: "/tmp", 196 + ReadOnly: false, 197 + TmpfsOptions: &mount.TmpfsOptions{ 198 + Mode: 0o1777, // world-writeable sticky bit 199 + Options: [][]string{ 200 + {"exec"}, 201 + }, 202 + }, 203 + }, 204 + }, 205 + ReadonlyRootfs: false, 206 + CapDrop: []string{"ALL"}, 207 + CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 208 + SecurityOpt: []string{"no-new-privileges"}, 209 + ExtraHosts: []string{"host.docker.internal:host-gateway"}, 210 + }, nil, nil, "") 211 + if err != nil { 212 + return fmt.Errorf("creating container: %w", err) 213 + } 214 + e.registerCleanup(wid, func(ctx context.Context) error { 215 + err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 216 + if err != nil { 217 + return err 218 + } 219 + 220 + return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 221 + RemoveVolumes: true, 222 + RemoveLinks: false, 223 + Force: false, 224 + }) 225 + }) 226 + 227 + err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 228 + if err != nil { 229 + return fmt.Errorf("starting container: %w", err) 230 + } 231 + 232 + // Create necessary directories 233 + mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 234 + Cmd: []string{"mkdir", "-p", workspaceDir, homeDir}, 235 + AttachStdout: true, 236 + AttachStderr: true, 237 + }) 238 + if err != nil { 239 + return err 240 + } 241 + 242 + // This actually *starts* the command. Thanks, Docker! 243 + execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 244 + if err != nil { 245 + return err 246 + } 247 + defer execResp.Close() 248 + 249 + // This is apparently best way to wait for the command to complete. 250 + _, err = io.ReadAll(execResp.Reader) 251 + if err != nil { 252 + return err 253 + } 254 + 255 + execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 256 + if err != nil { 257 + return err 258 + } 259 + 260 + if execInspectResp.ExitCode != 0 { 261 + return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 262 + } else if execInspectResp.Running { 263 + return errors.New("mkdir is somehow still running??") 264 + } 265 + 266 + addl.container = resp.ID 267 + wf.Data = addl 268 + 269 + return nil 270 + } 271 + 272 + func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 273 + addl := w.Data.(addlFields) 274 + workflowEnvs := common.ConstructEnvs(w.Environment) 275 + for _, s := range secrets { 276 + workflowEnvs.AddEnv(s.Key, s.Value) 277 + } 278 + 279 + step := w.Steps[idx] 280 + 281 + select { 282 + case <-ctx.Done(): 283 + return ctx.Err() 284 + default: 285 + } 286 + 287 + envs := append(common.EnvVars(nil), workflowEnvs...) 288 + if dockerStep, ok := step.(Step); ok { 289 + for k, v := range dockerStep.environment { 290 + envs.AddEnv(k, v) 291 + } 292 + } 293 + envs.AddEnv("HOME", homeDir) 294 + 295 + mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 296 + Cmd: []string{"sh", "-c", step.Command()}, 297 + AttachStdout: true, 298 + AttachStderr: true, 299 + Env: envs, 300 + }) 301 + if err != nil { 302 + return fmt.Errorf("creating exec: %w", err) 303 + } 304 + 305 + // start tailing logs in background 306 + tailDone := make(chan error, 1) 307 + go func() { 308 + tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step) 309 + }() 310 + 311 + select { 312 + case <-tailDone: 313 + 314 + case <-ctx.Done(): 315 + e.l.Warn("step timed out", "step", step.Name()) 316 + <-tailDone 317 + return engine.ErrTimedOut 318 + } 319 + 320 + select { 321 + case <-ctx.Done(): 322 + return ctx.Err() 323 + default: 324 + } 325 + 326 + execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 327 + if err != nil { 328 + return err 329 + } 330 + 331 + if execInspectResp.ExitCode != 0 { 332 + inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 333 + if err != nil { 334 + return err 335 + } 336 + 337 + e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 338 + 339 + if inspectResp.State.OOMKilled { 340 + return engineerrors.ErrOOMKilled 341 + } 342 + return engine.ErrWorkflowFailed 343 + } 344 + 345 + return nil 346 + } 347 + 348 + func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 349 + if wfLogger == nil { 350 + return nil 351 + } 352 + 353 + // This actually *starts* the command. Thanks, Docker! 354 + logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 355 + if err != nil { 356 + return err 357 + } 358 + defer logs.Close() 359 + 360 + _, err = stdcopy.StdCopy( 361 + wfLogger.DataWriter(stepIdx, "stdout"), 362 + wfLogger.DataWriter(stepIdx, "stderr"), 363 + logs.Reader, 364 + ) 365 + if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 366 + return fmt.Errorf("failed to copy logs: %w", err) 367 + } 368 + 369 + return nil 370 + } 371 + 372 + func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 373 + e.cleanupMu.Lock() 374 + key := wid.String() 375 + 376 + fns := e.cleanup[key] 377 + delete(e.cleanup, key) 378 + e.cleanupMu.Unlock() 379 + 380 + for _, fn := range fns { 381 + if err := fn(ctx); err != nil { 382 + e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 383 + } 384 + } 385 + return nil 386 + } 387 + 388 + func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 389 + e.cleanupMu.Lock() 390 + defer e.cleanupMu.Unlock() 391 + 392 + key := wid.String() 393 + e.cleanup[key] = append(e.cleanup[key], fn) 394 + } 395 + 396 + func networkName(wid models.WorkflowId) string { 397 + return fmt.Sprintf("workflow-network-%s", wid) 398 + }
+5 -3
spindle/engines/nixery/engine.go
··· 23 "tangled.org/core/log" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/secrets" 28 ) ··· 287 288 func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 289 addl := w.Data.(addlFields) 290 - workflowEnvs := ConstructEnvs(w.Environment) 291 // TODO(winter): should SetupWorkflow also have secret access? 292 // IMO yes, but probably worth thinking on. 293 for _, s := range secrets { ··· 302 default: 303 } 304 305 - envs := append(EnvVars(nil), workflowEnvs...) 306 if nixStep, ok := step.(Step); ok { 307 for k, v := range nixStep.environment { 308 envs.AddEnv(k, v) ··· 361 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 362 363 if inspectResp.State.OOMKilled { 364 - return ErrOOMKilled 365 } 366 return engine.ErrWorkflowFailed 367 }
··· 23 "tangled.org/core/log" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/engine" 26 + "tangled.org/core/spindle/engines/common" 27 + engineerrors "tangled.org/core/spindle/engines/errors" 28 "tangled.org/core/spindle/models" 29 "tangled.org/core/spindle/secrets" 30 ) ··· 289 290 func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 291 addl := w.Data.(addlFields) 292 + workflowEnvs := common.ConstructEnvs(w.Environment) 293 // TODO(winter): should SetupWorkflow also have secret access? 294 // IMO yes, but probably worth thinking on. 295 for _, s := range secrets { ··· 304 default: 305 } 306 307 + envs := append(common.EnvVars(nil), workflowEnvs...) 308 if nixStep, ok := step.(Step); ok { 309 for k, v := range nixStep.environment { 310 envs.AddEnv(k, v) ··· 363 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 364 365 if inspectResp.State.OOMKilled { 366 + return engineerrors.ErrOOMKilled 367 } 368 return engine.ErrWorkflowFailed 369 }
+4 -4
spindle/engines/nixery/envs.go spindle/engines/common/envs.go
··· 1 - package nixery 2 3 import ( 4 "fmt" ··· 6 7 type EnvVars []string 8 9 - // ConstructEnvs converts a tangled.Pipeline_Step_Environment_Elem.{Key,Value} 10 - // representation into a docker-friendly []string{"KEY=value", ...} slice. 11 func ConstructEnvs(envs map[string]string) EnvVars { 12 var dockerEnvs EnvVars 13 for k, v := range envs { ··· 25 // AddEnv adds a key=value string to the EnvVar. 26 func (ev *EnvVars) AddEnv(key, value string) { 27 *ev = append(*ev, fmt.Sprintf("%s=%s", key, value)) 28 - }
··· 1 + package common 2 3 import ( 4 "fmt" ··· 6 7 type EnvVars []string 8 9 + // ConstructEnvs converts a map[string]string representation into a 10 + // docker-friendly []string{"KEY=value", ...} slice. 11 func ConstructEnvs(envs map[string]string) EnvVars { 12 var dockerEnvs EnvVars 13 for k, v := range envs { ··· 25 // AddEnv adds a key=value string to the EnvVar. 26 func (ev *EnvVars) AddEnv(key, value string) { 27 *ev = append(*ev, fmt.Sprintf("%s=%s", key, value)) 28 + }
+2 -2
spindle/engines/nixery/envs_test.go spindle/engines/common/envs_test.go
··· 1 - package nixery 2 3 import ( 4 "testing" ··· 45 ev.AddEnv("BAZ", "qux") 46 want := EnvVars{"FOO=bar", "BAZ=qux"} 47 assert.ElementsMatch(t, want, ev) 48 - }
··· 1 + package common 2 3 import ( 4 "testing" ··· 45 ev.AddEnv("BAZ", "qux") 46 want := EnvVars{"FOO=bar", "BAZ=qux"} 47 assert.ElementsMatch(t, want, ev) 48 + }
+2 -2
spindle/engines/nixery/errors.go spindle/engines/errors/errors.go
··· 1 - package nixery 2 3 import "errors" 4 5 var ( 6 ErrOOMKilled = errors.New("oom killed") 7 - )
··· 1 + package errors 2 3 import "errors" 4 5 var ( 6 ErrOOMKilled = errors.New("oom killed") 7 + )
+7
spindle/server.go
··· 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/engine" 25 "tangled.org/core/spindle/engines/nixery" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/queue" ··· 250 return err 251 } 252 253 s, err := New(ctx, cfg, map[string]models.Engine{ 254 "nixery": nixeryEng, 255 }) 256 if err != nil { 257 return err
··· 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/engine" 25 + "tangled.org/core/spindle/engines/docker" 26 "tangled.org/core/spindle/engines/nixery" 27 "tangled.org/core/spindle/models" 28 "tangled.org/core/spindle/queue" ··· 251 return err 252 } 253 254 + dockerEng, err := docker.New(ctx, cfg) 255 + if err != nil { 256 + return err 257 + } 258 + 259 s, err := New(ctx, cfg, map[string]models.Engine{ 260 "nixery": nixeryEng, 261 + "docker": dockerEng, 262 }) 263 if err != nil { 264 return err