this repo has no description
1package nixery 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "path" 11 "runtime" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/docker/docker/api/types/container" 17 "github.com/docker/docker/api/types/image" 18 "github.com/docker/docker/api/types/mount" 19 "github.com/docker/docker/api/types/network" 20 "github.com/docker/docker/api/types/volume" 21 "github.com/docker/docker/client" 22 "github.com/docker/docker/pkg/stdcopy" 23 "gopkg.in/yaml.v3" 24 "tangled.sh/tangled.sh/core/api/tangled" 25 "tangled.sh/tangled.sh/core/log" 26 "tangled.sh/tangled.sh/core/spindle/config" 27 "tangled.sh/tangled.sh/core/spindle/engine" 28 "tangled.sh/tangled.sh/core/spindle/models" 29 "tangled.sh/tangled.sh/core/spindle/secrets" 30) 31 32const ( 33 workspaceDir = "/tangled/workspace" 34) 35 36type cleanupFunc func(context.Context) error 37 38type Engine struct { 39 docker client.APIClient 40 l *slog.Logger 41 cfg *config.Config 42 43 cleanupMu sync.Mutex 44 cleanup map[string][]cleanupFunc 45} 46 47type Step struct { 48 name string 49 kind models.StepKind 50 command string 51 environment map[string]string 52} 53 54func (s Step) Name() string { 55 return s.name 56} 57 58func (s Step) Command() string { 59 return s.command 60} 61 62func (s Step) Kind() models.StepKind { 63 return s.kind 64} 65 66// setupSteps get added to start of Steps 67type setupSteps []models.Step 68 69// addStep adds a step to the beginning of the workflow's steps. 70func (ss *setupSteps) addStep(step models.Step) { 71 *ss = append(*ss, step) 72} 73 74type addlFields struct { 75 image string 76 env map[string]string 77} 78 79func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 80 swf := &models.Workflow{} 81 addl := addlFields{} 82 83 dwf := &struct { 84 Steps []struct { 85 Command string `yaml:"command"` 86 Name string `yaml:"name"` 87 Environment map[string]string `yaml:"environment"` 88 } `yaml:"steps"` 89 Dependencies map[string][]string `yaml:"dependencies"` 90 Environment map[string]string `yaml:"environment"` 91 }{} 92 err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 93 if err != nil { 94 return nil, err 95 } 96 97 for _, dstep := range dwf.Steps { 98 sstep := Step{} 99 sstep.environment = dstep.Environment 100 sstep.command = dstep.Command 101 sstep.name = dstep.Name 102 sstep.kind = models.StepKindUser 103 swf.Steps = append(swf.Steps, sstep) 104 } 105 swf.Name = twf.Name 106 addl.env = dwf.Environment 107 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) 108 109 setup := &setupSteps{} 110 111 setup.addStep(nixConfStep()) 112 setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 113 // this step could be empty 114 if s := dependencyStep(dwf.Dependencies); s != nil { 115 setup.addStep(*s) 116 } 117 118 // append setup steps in order to the start of workflow steps 119 swf.Steps = append(*setup, swf.Steps...) 120 swf.Data = addl 121 122 return swf, nil 123} 124 125func (e *Engine) WorkflowTimeout() time.Duration { 126 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout 127 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 128 if err != nil { 129 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 130 workflowTimeout = 5 * time.Minute 131 } 132 133 return workflowTimeout 134} 135 136func workflowImage(deps map[string][]string, nixery string) string { 137 var dependencies string 138 for reg, ds := range deps { 139 if reg == "nixpkgs" { 140 dependencies = path.Join(ds...) 141 } 142 } 143 144 // load defaults from somewhere else 145 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 146 147 if runtime.GOARCH == "arm64" { 148 dependencies = path.Join("arm64", dependencies) 149 } 150 151 return path.Join(nixery, dependencies) 152} 153 154func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 155 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 156 if err != nil { 157 return nil, err 158 } 159 160 l := log.FromContext(ctx).With("component", "spindle") 161 162 e := &Engine{ 163 docker: dcli, 164 l: l, 165 cfg: cfg, 166 } 167 168 e.cleanup = make(map[string][]cleanupFunc) 169 170 return e, nil 171} 172 173// SetupWorkflow sets up a new network for the workflow and volumes for 174// the workspace and Nix store. These are persisted across steps and are 175// destroyed at the end of the workflow. 176func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 177 e.l.Info("setting up workflow", "workflow", wid) 178 179 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 180 Name: workspaceVolume(wid), 181 Driver: "local", 182 }) 183 if err != nil { 184 return err 185 } 186 e.registerCleanup(wid, func(ctx context.Context) error { 187 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 188 }) 189 190 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 191 Name: nixVolume(wid), 192 Driver: "local", 193 }) 194 if err != nil { 195 return err 196 } 197 e.registerCleanup(wid, func(ctx context.Context) error { 198 return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 199 }) 200 201 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 202 Driver: "bridge", 203 }) 204 if err != nil { 205 return err 206 } 207 e.registerCleanup(wid, func(ctx context.Context) error { 208 return e.docker.NetworkRemove(ctx, networkName(wid)) 209 }) 210 211 addl := wf.Data.(addlFields) 212 213 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 214 if err != nil { 215 e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) 216 217 return fmt.Errorf("pulling image: %w", err) 218 } 219 defer reader.Close() 220 io.Copy(os.Stdout, reader) 221 222 return nil 223} 224 225func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 226 workflowEnvs := ConstructEnvs(w.Data.(addlFields).env) 227 for _, s := range secrets { 228 workflowEnvs.AddEnv(s.Key, s.Value) 229 } 230 231 step := w.Steps[idx].(Step) 232 233 select { 234 case <-ctx.Done(): 235 return ctx.Err() 236 default: 237 } 238 239 envs := append(EnvVars(nil), workflowEnvs...) 240 for k, v := range step.environment { 241 envs.AddEnv(k, v) 242 } 243 envs.AddEnv("HOME", workspaceDir) 244 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 245 246 hostConfig := hostConfig(wid) 247 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 248 Image: w.Data.(addlFields).image, 249 Cmd: []string{"bash", "-c", step.command}, 250 WorkingDir: workspaceDir, 251 Tty: false, 252 Hostname: "spindle", 253 Env: envs.Slice(), 254 }, hostConfig, nil, nil, "") 255 defer e.DestroyStep(ctx, resp.ID) 256 if err != nil { 257 return fmt.Errorf("creating container: %w", err) 258 } 259 260 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 261 if err != nil { 262 return fmt.Errorf("connecting network: %w", err) 263 } 264 265 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 266 if err != nil { 267 return err 268 } 269 e.l.Info("started container", "name", resp.ID, "step", step.Name) 270 271 // start tailing logs in background 272 tailDone := make(chan error, 1) 273 go func() { 274 tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step) 275 }() 276 277 // wait for container completion or timeout 278 waitDone := make(chan struct{}) 279 var state *container.State 280 var waitErr error 281 282 go func() { 283 defer close(waitDone) 284 state, waitErr = e.WaitStep(ctx, resp.ID) 285 }() 286 287 select { 288 case <-waitDone: 289 290 // wait for tailing to complete 291 <-tailDone 292 293 case <-ctx.Done(): 294 e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name) 295 err = e.DestroyStep(context.Background(), resp.ID) 296 if err != nil { 297 e.l.Error("failed to destroy step", "container", resp.ID, "error", err) 298 } 299 300 // wait for both goroutines to finish 301 <-waitDone 302 <-tailDone 303 304 return engine.ErrTimedOut 305 } 306 307 select { 308 case <-ctx.Done(): 309 return ctx.Err() 310 default: 311 } 312 313 if waitErr != nil { 314 return waitErr 315 } 316 317 err = e.DestroyStep(ctx, resp.ID) 318 if err != nil { 319 return err 320 } 321 322 if state.ExitCode != 0 { 323 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 324 if state.OOMKilled { 325 return ErrOOMKilled 326 } 327 return engine.ErrWorkflowFailed 328 } 329 330 return nil 331} 332 333func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 334 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 335 select { 336 case err := <-errCh: 337 if err != nil { 338 return nil, err 339 } 340 case <-wait: 341 } 342 343 e.l.Info("waited for container", "name", containerID) 344 345 info, err := e.docker.ContainerInspect(ctx, containerID) 346 if err != nil { 347 return nil, err 348 } 349 350 return info.State, nil 351} 352 353func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 354 if wfLogger == nil { 355 return nil 356 } 357 358 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 359 Follow: true, 360 ShowStdout: true, 361 ShowStderr: true, 362 Details: false, 363 Timestamps: false, 364 }) 365 if err != nil { 366 return err 367 } 368 369 _, err = stdcopy.StdCopy( 370 wfLogger.DataWriter("stdout"), 371 wfLogger.DataWriter("stderr"), 372 logs, 373 ) 374 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 375 return fmt.Errorf("failed to copy logs: %w", err) 376 } 377 378 return nil 379} 380 381func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 382 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 383 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 384 return err 385 } 386 387 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 388 RemoveVolumes: true, 389 RemoveLinks: false, 390 Force: false, 391 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 392 return err 393 } 394 395 return nil 396} 397 398func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 399 e.cleanupMu.Lock() 400 key := wid.String() 401 402 fns := e.cleanup[key] 403 delete(e.cleanup, key) 404 e.cleanupMu.Unlock() 405 406 for _, fn := range fns { 407 if err := fn(ctx); err != nil { 408 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 409 } 410 } 411 return nil 412} 413 414func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 415 e.cleanupMu.Lock() 416 defer e.cleanupMu.Unlock() 417 418 key := wid.String() 419 e.cleanup[key] = append(e.cleanup[key], fn) 420} 421 422func workspaceVolume(wid models.WorkflowId) string { 423 return fmt.Sprintf("workspace-%s", wid) 424} 425 426func nixVolume(wid models.WorkflowId) string { 427 return fmt.Sprintf("nix-%s", wid) 428} 429 430func networkName(wid models.WorkflowId) string { 431 return fmt.Sprintf("workflow-network-%s", wid) 432} 433 434func hostConfig(wid models.WorkflowId) *container.HostConfig { 435 hostConfig := &container.HostConfig{ 436 Mounts: []mount.Mount{ 437 { 438 Type: mount.TypeVolume, 439 Source: workspaceVolume(wid), 440 Target: workspaceDir, 441 }, 442 { 443 Type: mount.TypeVolume, 444 Source: nixVolume(wid), 445 Target: "/nix", 446 }, 447 { 448 Type: mount.TypeTmpfs, 449 Target: "/tmp", 450 ReadOnly: false, 451 TmpfsOptions: &mount.TmpfsOptions{ 452 Mode: 0o1777, // world-writeable sticky bit 453 Options: [][]string{ 454 {"exec"}, 455 }, 456 }, 457 }, 458 { 459 Type: mount.TypeVolume, 460 Source: "etc-nix-" + wid.String(), 461 Target: "/etc/nix", 462 }, 463 }, 464 ReadonlyRootfs: false, 465 CapDrop: []string{"ALL"}, 466 CapAdd: []string{"CAP_DAC_OVERRIDE"}, 467 SecurityOpt: []string{"no-new-privileges"}, 468 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 469 } 470 471 return hostConfig 472} 473 474// thanks woodpecker 475func isErrContainerNotFoundOrNotRunning(err error) bool { 476 // Error response from daemon: Cannot kill container: ...: No such container: ... 477 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 478 // Error response from podman daemon: can only kill running containers. ... is in state exited 479 // Error: No such container: ... 480 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 481}