package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/engine"
	"tangled.sh/tangled.sh/core/spindle/models"
	"tangled.sh/tangled.sh/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
)

type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

func (s Step) Name() string {
	return s.name
}

func (s Step) Command() string {
	return s.command
}

func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps are prepended to a workflow's Steps.
type setupSteps []models.Step

// addStep appends a setup step; the collected steps are later prepended, as a
// block, to the workflow's user-defined steps.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}

type addlFields struct {
	image string
	env   map[string]string
}

func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	addl.env = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// dependencyStep may return nil when there are no dependencies
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// prepend the setup steps, in order, to the workflow's steps
	swf.Steps = append(*setup, swf.Steps...)
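	// stash the engine-specific fields (resolved Nixery image, workflow-level
	// env) on Data; SetupWorkflow and RunStep read them back via a type
	// assertion to addlFields.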
	swf.Data = addl

	return swf, nil
}

func (e *Engine) WorkflowTimeout() time.Duration {
	workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}

	return workflowTimeout
}

func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// load defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	return path.Join(nixery, dependencies)
}

func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		cfg:    cfg,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	io.Copy(os.Stdout, reader)

	return nil
}

func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
	workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	step := w.Steps[idx].(Step)

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	envs := append(EnvVars(nil), workflowEnvs...)
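	// step-level environment entries are added after the workflow-level ones,
	// and HOME is pointed at the shared workspace volume so anything a step
	// writes under $HOME persists into later steps.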
	for k, v := range step.environment {
		envs.AddEnv(k, v)
	}
	envs.AddEnv("HOME", workspaceDir)
	e.l.Debug("envs for step", "step", step.name, "envs", envs.Slice())

	hostConfig := hostConfig(wid)
	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      w.Data.(addlFields).image,
		Cmd:        []string{"bash", "-c", step.command},
		WorkingDir: workspaceDir,
		Tty:        false,
		Hostname:   "spindle",
		Env:        envs.Slice(),
	}, hostConfig, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	// ensure the container is cleaned up on every exit path; DestroyStep
	// tolerates containers that are already gone
	defer e.DestroyStep(ctx, resp.ID)

	err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
	if err != nil {
		return fmt.Errorf("connecting network: %w", err)
	}

	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
	if err != nil {
		return err
	}
	e.l.Info("started container", "name", resp.ID, "step", step.name)

	// start tailing logs in background
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step)
	}()

	// wait for container completion or timeout
	waitDone := make(chan struct{})
	var state *container.State
	var waitErr error

	go func() {
		defer close(waitDone)
		state, waitErr = e.WaitStep(ctx, resp.ID)
	}()

	select {
	case <-waitDone:

		// wait for tailing to complete
		<-tailDone

	case <-ctx.Done():
		e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.name)
		err = e.DestroyStep(context.Background(), resp.ID)
		if err != nil {
			e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
		}

		// wait for both goroutines to finish
		<-waitDone
		<-tailDone

		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	if waitErr != nil {
		return waitErr
	}

	err = e.DestroyStep(ctx, resp.ID)
	if err != nil {
		return err
	}

	if state.ExitCode != 0 {
		e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
		if state.OOMKilled {
			return ErrOOMKilled
		}
		return engine.ErrWorkflowFailed
	}

	return nil
}

func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	if wfLogger == nil {
		return nil
	}

	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter("stdout"),
		wfLogger.DataWriter("stderr"),
		logs,
	)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}

func hostConfig(wid models.WorkflowId) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
				Source: workspaceVolume(wid),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
				Source: nixVolume(wid),
				Target: "/nix",
			},
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable, with sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
			{
				Type:   mount.TypeVolume,
				Source: "etc-nix-" + wid.String(),
				Target: "/etc/nix",
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}

	return hostConfig
}

// thanks woodpecker
func isErrContainerNotFoundOrNotRunning(err error) bool {
	// Error response from daemon: Cannot kill container: ...: No such container: ...
	// Error response from daemon: Cannot kill container: ...: Container ... is not running"
	// Error response from podman daemon: can only kill running containers. ... is in state exited
	// Error: No such container: ...
	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
}
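// Call sequence, as assumed from the methods above (a sketch of how the
// spindle runner is expected to drive this engine, not a guaranteed contract):
// InitWorkflow parses the workflow YAML, SetupWorkflow creates the per-workflow
// network and volumes and pulls the Nixery image, RunStep runs each step in a
// fresh container sharing those volumes, and DestroyWorkflow invokes the
// cleanup functions registered along the way.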