// Monorepo for Tangled
// at eb3e271982e4e6c4133a0fa35e0a208b78b22875 (476 lines, 13 kB)

1package nixery 2 3import ( 4 "bufio" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "path" 11 "runtime" 12 "sync" 13 "time" 14 15 "github.com/docker/docker/api/types/container" 16 "github.com/docker/docker/api/types/image" 17 "github.com/docker/docker/api/types/mount" 18 "github.com/docker/docker/api/types/network" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "gopkg.in/yaml.v3" 22 "tangled.org/core/api/tangled" 23 "tangled.org/core/log" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/secrets" 28) 29 30const ( 31 workspaceDir = "/tangled/workspace" 32 homeDir = "/tangled/home" 33) 34 35type cleanupFunc func(context.Context) error 36 37type Engine struct { 38 docker client.APIClient 39 l *slog.Logger 40 cfg *config.Config 41 42 cleanupMu sync.Mutex 43 cleanup map[string][]cleanupFunc 44} 45 46type Step struct { 47 name string 48 kind models.StepKind 49 command string 50 environment map[string]string 51} 52 53func (s Step) Name() string { 54 return s.name 55} 56 57func (s Step) Command() string { 58 return s.command 59} 60 61func (s Step) Kind() models.StepKind { 62 return s.kind 63} 64 65// setupSteps get added to start of Steps 66type setupSteps []models.Step 67 68// addStep adds a step to the beginning of the workflow's steps. 
69func (ss *setupSteps) addStep(step models.Step) { 70 *ss = append(*ss, step) 71} 72 73type addlFields struct { 74 image string 75 container string 76} 77 78func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 79 swf := &models.Workflow{} 80 addl := addlFields{} 81 82 dwf := &struct { 83 Steps []struct { 84 Command string `yaml:"command"` 85 Name string `yaml:"name"` 86 Environment map[string]string `yaml:"environment"` 87 } `yaml:"steps"` 88 Dependencies map[string][]string `yaml:"dependencies"` 89 Environment map[string]string `yaml:"environment"` 90 }{} 91 err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 92 if err != nil { 93 return nil, err 94 } 95 96 for _, dstep := range dwf.Steps { 97 sstep := Step{} 98 sstep.environment = dstep.Environment 99 sstep.command = dstep.Command 100 sstep.name = dstep.Name 101 sstep.kind = models.StepKindUser 102 swf.Steps = append(swf.Steps, sstep) 103 } 104 swf.Name = twf.Name 105 swf.Environment = dwf.Environment 106 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) 107 108 setup := &setupSteps{} 109 110 setup.addStep(nixConfStep()) 111 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 112 // this step could be empty 113 if s := dependencyStep(dwf.Dependencies); s != nil { 114 setup.addStep(*s) 115 } 116 117 // append setup steps in order to the start of workflow steps 118 swf.Steps = append(*setup, swf.Steps...) 
119 swf.Data = addl 120 121 return swf, nil 122} 123 124func (e *Engine) WorkflowTimeout() time.Duration { 125 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout 126 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 127 if err != nil { 128 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 129 workflowTimeout = 5 * time.Minute 130 } 131 132 return workflowTimeout 133} 134 135func workflowImage(deps map[string][]string, nixery string) string { 136 var dependencies string 137 for reg, ds := range deps { 138 if reg == "nixpkgs" { 139 dependencies = path.Join(ds...) 140 } 141 } 142 143 // load defaults from somewhere else 144 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 145 146 if runtime.GOARCH == "arm64" { 147 dependencies = path.Join("arm64", dependencies) 148 } 149 150 return path.Join(nixery, dependencies) 151} 152 153func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 154 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 155 if err != nil { 156 return nil, err 157 } 158 159 l := log.FromContext(ctx).With("component", "spindle") 160 161 e := &Engine{ 162 docker: dcli, 163 l: l, 164 cfg: cfg, 165 } 166 167 e.cleanup = make(map[string][]cleanupFunc) 168 169 return e, nil 170} 171 172func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 173 /// -------------------------INITIAL SETUP------------------------------------------ 174 l := e.l.With("workflow", wid) 175 l.Info("setting up workflow") 176 177 setupStep := Step{ 178 name: "nixery image pull", 179 kind: models.StepKindSystem, 180 } 181 setupStepIdx := -1 182 183 wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0}) 184 defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0}) 185 186 /// 
-------------------------NETWORK CREATION--------------------------------------- 187 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 188 Driver: "bridge", 189 }) 190 if err != nil { 191 return err 192 } 193 194 e.registerCleanup(wid, func(ctx context.Context) error { 195 if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil { 196 return fmt.Errorf("removing network: %w", err) 197 } 198 return nil 199 }) 200 201 /// -------------------------IMAGE PULL--------------------------------------------- 202 addl := wf.Data.(addlFields) 203 l.Info("pulling image", "image", addl.image) 204 fmt.Fprintf( 205 wfLogger.DataWriter(setupStepIdx, "stdout"), 206 "pulling image: %s", 207 addl.image, 208 ) 209 210 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 211 if err != nil { 212 l.Error("pipeline image pull failed!", "error", err.Error()) 213 fmt.Fprintf(wfLogger.DataWriter(setupStepIdx, "stderr"), "image pull failed: %s", err) 214 return fmt.Errorf("pulling image: %w", err) 215 } 216 defer reader.Close() 217 218 scanner := bufio.NewScanner(reader) 219 for scanner.Scan() { 220 line := scanner.Text() 221 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte(line)) 222 l.Info("image pull progress", "stdout", line) 223 } 224 225 /// -------------------------CONTAINER CREATION------------------------------------- 226 l.Info("creating container") 227 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("creating container...")) 228 229 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 230 Image: addl.image, 231 Cmd: []string{"cat"}, 232 OpenStdin: true, // so cat stays alive :3 233 Tty: false, 234 Hostname: "spindle", 235 WorkingDir: workspaceDir, 236 Labels: map[string]string{ 237 "sh.tangled.pipeline/workflow_id": wid.String(), 238 }, 239 // TODO(winter): investigate whether environment variables passed here 240 // get propagated to ContainerExec processes 241 }, &container.HostConfig{ 242 
Mounts: []mount.Mount{ 243 { 244 Type: mount.TypeTmpfs, 245 Target: "/tmp", 246 ReadOnly: false, 247 TmpfsOptions: &mount.TmpfsOptions{ 248 Mode: 0o1777, // world-writeable sticky bit 249 Options: [][]string{ 250 {"exec"}, 251 }, 252 }, 253 }, 254 }, 255 ReadonlyRootfs: false, 256 CapDrop: []string{"ALL"}, 257 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 258 SecurityOpt: []string{"no-new-privileges"}, 259 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 260 }, nil, nil, "") 261 if err != nil { 262 fmt.Fprintf( 263 wfLogger.DataWriter(setupStepIdx, "stderr"), 264 "container creation failed: %s", 265 err, 266 ) 267 return fmt.Errorf("creating container: %w", err) 268 } 269 270 e.registerCleanup(wid, func(ctx context.Context) error { 271 if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil { 272 return fmt.Errorf("stopping container: %w", err) 273 } 274 275 err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 276 RemoveVolumes: true, 277 RemoveLinks: false, 278 Force: false, 279 }) 280 if err != nil { 281 return fmt.Errorf("removing container: %w", err) 282 } 283 284 return nil 285 }) 286 287 /// -------------------------CONTAINER START---------------------------------------- 288 wfLogger.DataWriter(setupStepIdx, "stdout").Write([]byte("starting container...")) 289 if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { 290 return fmt.Errorf("starting container: %w", err) 291 } 292 293 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 294 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir}, 295 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe?? 
296 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 297 }) 298 if err != nil { 299 return err 300 } 301 302 // This actually *starts* the command. Thanks, Docker! 303 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 304 if err != nil { 305 return err 306 } 307 defer execResp.Close() 308 309 // This is apparently best way to wait for the command to complete. 310 _, err = io.ReadAll(execResp.Reader) 311 if err != nil { 312 return err 313 } 314 315 /// -----------------------------------FINISH--------------------------------------- 316 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 317 if err != nil { 318 return err 319 } 320 321 if execInspectResp.ExitCode != 0 { 322 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 323 } else if execInspectResp.Running { 324 return errors.New("mkdir is somehow still running??") 325 } 326 327 addl.container = resp.ID 328 wf.Data = addl 329 330 return nil 331} 332 333func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 334 addl := w.Data.(addlFields) 335 workflowEnvs := ConstructEnvs(w.Environment) 336 // TODO(winter): should SetupWorkflow also have secret access? 337 // IMO yes, but probably worth thinking on. 338 for _, s := range secrets { 339 workflowEnvs.AddEnv(s.Key, s.Value) 340 } 341 342 step := w.Steps[idx] 343 344 select { 345 case <-ctx.Done(): 346 return ctx.Err() 347 default: 348 } 349 350 envs := append(EnvVars(nil), workflowEnvs...) 
351 if nixStep, ok := step.(Step); ok { 352 for k, v := range nixStep.environment { 353 envs.AddEnv(k, v) 354 } 355 } 356 357 envs.AddEnv("HOME", homeDir) 358 existingPath := "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" 359 envs.AddEnv("PATH", fmt.Sprintf("%s/.nix-profile/bin:/nix/var/nix/profiles/default/bin:%s", homeDir, existingPath)) 360 361 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 362 Cmd: []string{"bash", "-c", step.Command()}, 363 AttachStdout: true, 364 AttachStderr: true, 365 Env: envs, 366 }) 367 if err != nil { 368 return fmt.Errorf("creating exec: %w", err) 369 } 370 371 // start tailing logs in background 372 tailDone := make(chan error, 1) 373 go func() { 374 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, idx) 375 }() 376 377 select { 378 case <-tailDone: 379 380 case <-ctx.Done(): 381 // cleanup will be handled by DestroyWorkflow, since 382 // Docker doesn't provide an API to kill an exec run 383 // (sure, we could grab the PID and kill it ourselves, 384 // but that's wasted effort) 385 e.l.Warn("step timed out", "step", step.Name()) 386 387 <-tailDone 388 389 return engine.ErrTimedOut 390 } 391 392 select { 393 case <-ctx.Done(): 394 return ctx.Err() 395 default: 396 } 397 398 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 399 if err != nil { 400 return err 401 } 402 403 if execInspectResp.ExitCode != 0 { 404 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 405 if err != nil { 406 return err 407 } 408 409 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 410 411 if inspectResp.State.OOMKilled { 412 return ErrOOMKilled 413 } 414 return engine.ErrWorkflowFailed 415 } 416 417 return nil 418} 419 420func (e *Engine) tailStep(ctx context.Context, wfLogger models.WorkflowLogger, execID string, stepIdx int) error { 421 if wfLogger == nil { 422 
return nil 423 } 424 425 // This actually *starts* the command. Thanks, Docker! 426 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 427 if err != nil { 428 return err 429 } 430 defer logs.Close() 431 432 _, err = stdcopy.StdCopy( 433 wfLogger.DataWriter(stepIdx, "stdout"), 434 wfLogger.DataWriter(stepIdx, "stderr"), 435 logs.Reader, 436 ) 437 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 438 return fmt.Errorf("failed to copy logs: %w", err) 439 } 440 441 return nil 442} 443 444func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 445 fns := e.drainCleanups(wid) 446 447 for _, fn := range fns { 448 if err := fn(ctx); err != nil { 449 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 450 } 451 } 452 return nil 453} 454 455func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 456 e.cleanupMu.Lock() 457 defer e.cleanupMu.Unlock() 458 459 key := wid.String() 460 e.cleanup[key] = append(e.cleanup[key], fn) 461} 462 463func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 464 e.cleanupMu.Lock() 465 key := wid.String() 466 467 fns := e.cleanup[key] 468 delete(e.cleanup, key) 469 e.cleanupMu.Unlock() 470 471 return fns 472} 473 474func networkName(wid models.WorkflowId) string { 475 return fmt.Sprintf("workflow-network-%s", wid) 476}