this repo has no description
// Package nixery implements a spindle pipeline engine that runs workflow
// steps inside Docker containers whose images are assembled on the fly by
// a Nixery instance.
package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"runtime"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/engine"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	// workspaceDir is the directory inside the workflow container where
	// steps execute (created by SetupWorkflow, exported as HOME).
	workspaceDir = "/tangled/workspace"
)

// cleanupFunc tears down a single resource (network, container, ...)
// created for a workflow. Registered via Engine.registerCleanup and run
// by Engine.DestroyWorkflow.
type cleanupFunc func(context.Context) error

// Engine executes workflows by exec'ing each step inside a long-lived
// Docker container built from a Nixery image.
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	// cleanupMu guards cleanup.
	cleanupMu sync.Mutex
	// cleanup maps a workflow id (string form) to the teardown
	// functions registered for it, in registration order.
	cleanup map[string][]cleanupFunc
}

// Step is a single shell command executed in the workflow container,
// with an optional per-step environment.
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

// Name returns the step's display name.
func (s Step) Name() string {
	return s.name
}

// Command returns the shell command the step runs.
func (s Step) Command() string {
	return s.command
}

// Kind reports whether the step is user-defined or engine setup.
func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps collects engine-provided steps that get added to the start
// of a workflow's Steps.
type setupSteps []models.Step

// addStep appends a step to the setup list; InitWorkflow later prepends
// the whole list, in order, ahead of the user-defined steps.
68func (ss *setupSteps) addStep(step models.Step) { 69 *ss = append(*ss, step) 70} 71 72type addlFields struct { 73 image string 74 container string 75 env map[string]string 76} 77 78func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 79 swf := &models.Workflow{} 80 addl := addlFields{} 81 82 dwf := &struct { 83 Steps []struct { 84 Command string `yaml:"command"` 85 Name string `yaml:"name"` 86 Environment map[string]string `yaml:"environment"` 87 } `yaml:"steps"` 88 Dependencies map[string][]string `yaml:"dependencies"` 89 Environment map[string]string `yaml:"environment"` 90 }{} 91 err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 92 if err != nil { 93 return nil, err 94 } 95 96 for _, dstep := range dwf.Steps { 97 sstep := Step{} 98 sstep.environment = dstep.Environment 99 sstep.command = dstep.Command 100 sstep.name = dstep.Name 101 sstep.kind = models.StepKindUser 102 swf.Steps = append(swf.Steps, sstep) 103 } 104 swf.Name = twf.Name 105 addl.env = dwf.Environment 106 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) 107 108 setup := &setupSteps{} 109 110 setup.addStep(nixConfStep()) 111 setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 112 // this step could be empty 113 if s := dependencyStep(dwf.Dependencies); s != nil { 114 setup.addStep(*s) 115 } 116 117 // append setup steps in order to the start of workflow steps 118 swf.Steps = append(*setup, swf.Steps...) 
119 swf.Data = addl 120 121 return swf, nil 122} 123 124func (e *Engine) WorkflowTimeout() time.Duration { 125 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout 126 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 127 if err != nil { 128 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 129 workflowTimeout = 5 * time.Minute 130 } 131 132 return workflowTimeout 133} 134 135func workflowImage(deps map[string][]string, nixery string) string { 136 var dependencies string 137 for reg, ds := range deps { 138 if reg == "nixpkgs" { 139 dependencies = path.Join(ds...) 140 } 141 } 142 143 // load defaults from somewhere else 144 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 145 146 if runtime.GOARCH == "arm64" { 147 dependencies = path.Join("arm64", dependencies) 148 } 149 150 return path.Join(nixery, dependencies) 151} 152 153func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 154 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 155 if err != nil { 156 return nil, err 157 } 158 159 l := log.FromContext(ctx).With("component", "spindle") 160 161 e := &Engine{ 162 docker: dcli, 163 l: l, 164 cfg: cfg, 165 } 166 167 e.cleanup = make(map[string][]cleanupFunc) 168 169 return e, nil 170} 171 172func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 173 e.l.Info("setting up workflow", "workflow", wid) 174 175 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 176 Driver: "bridge", 177 }) 178 if err != nil { 179 return err 180 } 181 e.registerCleanup(wid, func(ctx context.Context) error { 182 return e.docker.NetworkRemove(ctx, networkName(wid)) 183 }) 184 185 addl := wf.Data.(addlFields) 186 187 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 188 if err != nil { 189 e.l.Error("pipeline image pull failed!", "image", addl.image, 
"workflowId", wid, "error", err.Error()) 190 191 return fmt.Errorf("pulling image: %w", err) 192 } 193 defer reader.Close() 194 io.Copy(os.Stdout, reader) 195 196 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 197 Image: addl.image, 198 Cmd: []string{"cat"}, 199 OpenStdin: true, // so cat stays alive :3 200 Tty: false, 201 Hostname: "spindle", 202 // TODO(winter): investigate whether environment variables passed here 203 // get propagated to ContainerExec processes 204 }, &container.HostConfig{ 205 Mounts: []mount.Mount{ 206 { 207 Type: mount.TypeTmpfs, 208 Target: "/tmp", 209 ReadOnly: false, 210 TmpfsOptions: &mount.TmpfsOptions{ 211 Mode: 0o1777, // world-writeable sticky bit 212 Options: [][]string{ 213 {"exec"}, 214 }, 215 }, 216 }, 217 }, 218 ReadonlyRootfs: false, 219 CapDrop: []string{"ALL"}, 220 CapAdd: []string{"CAP_DAC_OVERRIDE"}, 221 SecurityOpt: []string{"no-new-privileges"}, 222 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 223 }, nil, nil, "") 224 if err != nil { 225 return fmt.Errorf("creating container: %w", err) 226 } 227 e.registerCleanup(wid, func(ctx context.Context) error { 228 err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 229 if err != nil { 230 return err 231 } 232 233 return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 234 RemoveVolumes: true, 235 RemoveLinks: false, 236 Force: false, 237 }) 238 }) 239 240 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 241 if err != nil { 242 return fmt.Errorf("starting container: %w", err) 243 } 244 245 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 246 Cmd: []string{"mkdir", "-p", workspaceDir}, 247 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe?? 
248 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 249 }) 250 if err != nil { 251 return err 252 } 253 254 // This actually *starts* the command. Thanks, Docker! 255 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 256 if err != nil { 257 return err 258 } 259 defer execResp.Close() 260 261 // This is apparently best way to wait for the command to complete. 262 _, err = io.ReadAll(execResp.Reader) 263 if err != nil { 264 return err 265 } 266 267 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 268 if err != nil { 269 return err 270 } 271 272 if execInspectResp.ExitCode != 0 { 273 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 274 } else if execInspectResp.Running { 275 return errors.New("mkdir is somehow still running??") 276 } 277 278 addl.container = resp.ID 279 wf.Data = addl 280 281 return nil 282} 283 284func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 285 addl := w.Data.(addlFields) 286 workflowEnvs := ConstructEnvs(addl.env) 287 // TODO(winter): should SetupWorkflow also have secret access? 288 // IMO yes, but probably worth thinking on. 289 for _, s := range secrets { 290 workflowEnvs.AddEnv(s.Key, s.Value) 291 } 292 293 step := w.Steps[idx].(Step) 294 295 select { 296 case <-ctx.Done(): 297 return ctx.Err() 298 default: 299 } 300 301 envs := append(EnvVars(nil), workflowEnvs...) 
302 for k, v := range step.environment { 303 envs.AddEnv(k, v) 304 } 305 envs.AddEnv("HOME", workspaceDir) 306 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 307 308 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 309 Cmd: []string{"bash", "-c", step.command}, 310 AttachStdout: true, 311 AttachStderr: true, 312 }) 313 if err != nil { 314 return fmt.Errorf("creating exec: %w", err) 315 } 316 317 // start tailing logs in background 318 tailDone := make(chan error, 1) 319 go func() { 320 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step) 321 }() 322 323 select { 324 case <-tailDone: 325 326 case <-ctx.Done(): 327 // cleanup will be handled by DestroyWorkflow, since 328 // Docker doesn't provide an API to kill an exec run 329 // (sure, we could grab the PID and kill it ourselves, 330 // but that's wasted effort) 331 e.l.Warn("step timed out", "step", step.Name) 332 333 <-tailDone 334 335 return engine.ErrTimedOut 336 } 337 338 select { 339 case <-ctx.Done(): 340 return ctx.Err() 341 default: 342 } 343 344 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 345 if err != nil { 346 return err 347 } 348 349 if execInspectResp.ExitCode != 0 { 350 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 351 if err != nil { 352 return err 353 } 354 355 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 356 357 if inspectResp.State.OOMKilled { 358 return ErrOOMKilled 359 } 360 return engine.ErrWorkflowFailed 361 } 362 363 return nil 364} 365 366func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 367 if wfLogger == nil { 368 return nil 369 } 370 371 // This actually *starts* the command. Thanks, Docker! 
372 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 373 if err != nil { 374 return err 375 } 376 defer logs.Close() 377 378 _, err = stdcopy.StdCopy( 379 wfLogger.DataWriter("stdout"), 380 wfLogger.DataWriter("stderr"), 381 logs.Reader, 382 ) 383 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 384 return fmt.Errorf("failed to copy logs: %w", err) 385 } 386 387 return nil 388} 389 390func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 391 e.cleanupMu.Lock() 392 key := wid.String() 393 394 fns := e.cleanup[key] 395 delete(e.cleanup, key) 396 e.cleanupMu.Unlock() 397 398 for _, fn := range fns { 399 if err := fn(ctx); err != nil { 400 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 401 } 402 } 403 return nil 404} 405 406func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 407 e.cleanupMu.Lock() 408 defer e.cleanupMu.Unlock() 409 410 key := wid.String() 411 e.cleanup[key] = append(e.cleanup[key], fn) 412} 413 414func networkName(wid models.WorkflowId) string { 415 return fmt.Sprintf("workflow-network-%s", wid) 416}