This repository has no description.
1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/docker/docker/api/types/container" 15 "github.com/docker/docker/api/types/image" 16 "github.com/docker/docker/api/types/mount" 17 "github.com/docker/docker/api/types/network" 18 "github.com/docker/docker/api/types/volume" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "tangled.sh/tangled.sh/core/log" 22 "tangled.sh/tangled.sh/core/notifier" 23 "tangled.sh/tangled.sh/core/spindle/config" 24 "tangled.sh/tangled.sh/core/spindle/db" 25 "tangled.sh/tangled.sh/core/spindle/models" 26) 27 28const ( 29 workspaceDir = "/tangled/workspace" 30) 31 32type cleanupFunc func(context.Context) error 33 34type Engine struct { 35 docker client.APIClient 36 l *slog.Logger 37 db *db.DB 38 n *notifier.Notifier 39 cfg *config.Config 40 41 cleanupMu sync.Mutex 42 cleanup map[string][]cleanupFunc 43} 44 45func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) { 46 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 47 if err != nil { 48 return nil, err 49 } 50 51 l := log.FromContext(ctx).With("component", "spindle") 52 53 e := &Engine{ 54 docker: dcli, 55 l: l, 56 db: db, 57 n: n, 58 cfg: cfg, 59 } 60 61 e.cleanup = make(map[string][]cleanupFunc) 62 63 return e, nil 64} 65 66func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 67 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 68 69 wg := sync.WaitGroup{} 70 for _, w := range pipeline.Workflows { 71 wg.Add(1) 72 go func() error { 73 defer wg.Done() 74 wid := models.WorkflowId{ 75 PipelineId: pipelineId, 76 Name: w.Name, 77 } 78 79 err := e.db.StatusRunning(wid, e.n) 80 if err != nil { 81 return err 82 } 83 84 err = e.SetupWorkflow(ctx, wid) 85 if err != nil { 86 e.l.Error("setting up worklow", "wid", wid, 
"err", err) 87 return err 88 } 89 defer e.DestroyWorkflow(ctx, wid) 90 91 reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{}) 92 if err != nil { 93 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 94 95 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 96 if err != nil { 97 return err 98 } 99 100 return fmt.Errorf("pulling image: %w", err) 101 } 102 defer reader.Close() 103 io.Copy(os.Stdout, reader) 104 105 err = e.StartSteps(ctx, w.Steps, wid, w.Image) 106 if err != nil { 107 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) 108 109 dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n) 110 if dbErr != nil { 111 return dbErr 112 } 113 114 return fmt.Errorf("starting steps image: %w", err) 115 } 116 117 err = e.db.StatusSuccess(wid, e.n) 118 if err != nil { 119 return err 120 } 121 122 return nil 123 }() 124 } 125 126 wg.Wait() 127} 128 129// SetupWorkflow sets up a new network for the workflow and volumes for 130// the workspace and Nix store. These are persisted across steps and are 131// destroyed at the end of the workflow. 
132func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { 133 e.l.Info("setting up workflow", "workflow", wid) 134 135 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 136 Name: workspaceVolume(wid), 137 Driver: "local", 138 }) 139 if err != nil { 140 return err 141 } 142 e.registerCleanup(wid, func(ctx context.Context) error { 143 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 144 }) 145 146 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 147 Name: nixVolume(wid), 148 Driver: "local", 149 }) 150 if err != nil { 151 return err 152 } 153 e.registerCleanup(wid, func(ctx context.Context) error { 154 return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 155 }) 156 157 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 158 Driver: "bridge", 159 }) 160 if err != nil { 161 return err 162 } 163 e.registerCleanup(wid, func(ctx context.Context) error { 164 return e.docker.NetworkRemove(ctx, networkName(wid)) 165 }) 166 167 return nil 168} 169 170// StartSteps starts all steps sequentially with the same base image. 171// ONLY marks pipeline as failed if container's exit code is non-zero. 172// All other errors are bubbled up. 
173// Fixed version of the step execution logic 174func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { 175 stepTimeoutStr := e.cfg.Pipelines.StepTimeout 176 stepTimeout, err := time.ParseDuration(stepTimeoutStr) 177 if err != nil { 178 e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr) 179 stepTimeout = 5 * time.Minute 180 } 181 e.l.Info("using step timeout", "timeout", stepTimeout) 182 183 for stepIdx, step := range steps { 184 envs := ConstructEnvs(step.Environment) 185 envs.AddEnv("HOME", workspaceDir) 186 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 187 188 hostConfig := hostConfig(wid) 189 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 190 Image: image, 191 Cmd: []string{"bash", "-c", step.Command}, 192 WorkingDir: workspaceDir, 193 Tty: false, 194 Hostname: "spindle", 195 Env: envs.Slice(), 196 }, hostConfig, nil, nil, "") 197 if err != nil { 198 return fmt.Errorf("creating container: %w", err) 199 } 200 201 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 202 if err != nil { 203 return fmt.Errorf("connecting network: %w", err) 204 } 205 206 stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout) 207 208 err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{}) 209 if err != nil { 210 stepCancel() 211 return err 212 } 213 e.l.Info("started container", "name", resp.ID, "step", step.Name) 214 215 // start tailing logs in background 216 tailDone := make(chan error, 1) 217 go func() { 218 tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx) 219 }() 220 221 // wait for container completion or timeout 222 waitDone := make(chan struct{}) 223 var state *container.State 224 var waitErr error 225 226 go func() { 227 defer close(waitDone) 228 state, waitErr = e.WaitStep(stepCtx, resp.ID) 229 }() 230 231 select { 232 case <-waitDone: 233 234 // wait for tailing to complete 235 <-tailDone 236 
stepCancel() 237 238 case <-stepCtx.Done(): 239 e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout) 240 241 _ = e.DestroyStep(ctx, resp.ID) 242 243 // wait for both goroutines to finish 244 <-waitDone 245 <-tailDone 246 247 stepCancel() 248 return fmt.Errorf("step timed out after %v", stepTimeout) 249 } 250 251 if waitErr != nil { 252 return waitErr 253 } 254 255 err = e.DestroyStep(ctx, resp.ID) 256 if err != nil { 257 return err 258 } 259 260 if state.ExitCode != 0 { 261 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 262 return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled) 263 } 264 } 265 266 return nil 267} 268 269func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 270 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 271 select { 272 case err := <-errCh: 273 if err != nil { 274 return nil, err 275 } 276 case <-wait: 277 } 278 279 e.l.Info("waited for container", "name", containerID) 280 281 info, err := e.docker.ContainerInspect(ctx, containerID) 282 if err != nil { 283 return nil, err 284 } 285 286 return info.State, nil 287} 288 289func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error { 290 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 291 Follow: true, 292 ShowStdout: true, 293 ShowStderr: true, 294 Details: false, 295 Timestamps: false, 296 }) 297 if err != nil { 298 return err 299 } 300 301 wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid) 302 if err != nil { 303 e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 304 return err 305 } 306 defer wfLogger.Close() 307 308 _, err = stdcopy.StdCopy(wfLogger.Stdout(), wfLogger.Stderr(), logs) 309 if err != nil && err != 
io.EOF && !errors.Is(err, context.DeadlineExceeded) { 310 return fmt.Errorf("failed to copy logs: %w", err) 311 } 312 313 return nil 314} 315 316func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 317 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 318 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 319 return err 320 } 321 322 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 323 RemoveVolumes: true, 324 RemoveLinks: false, 325 Force: false, 326 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 327 return err 328 } 329 330 return nil 331} 332 333func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 334 e.cleanupMu.Lock() 335 key := wid.String() 336 337 fns := e.cleanup[key] 338 delete(e.cleanup, key) 339 e.cleanupMu.Unlock() 340 341 for _, fn := range fns { 342 if err := fn(ctx); err != nil { 343 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 344 } 345 } 346 return nil 347} 348 349func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 350 e.cleanupMu.Lock() 351 defer e.cleanupMu.Unlock() 352 353 key := wid.String() 354 e.cleanup[key] = append(e.cleanup[key], fn) 355} 356 357func workspaceVolume(wid models.WorkflowId) string { 358 return fmt.Sprintf("workspace-%s", wid) 359} 360 361func nixVolume(wid models.WorkflowId) string { 362 return fmt.Sprintf("nix-%s", wid) 363} 364 365func networkName(wid models.WorkflowId) string { 366 return fmt.Sprintf("workflow-network-%s", wid) 367} 368 369func hostConfig(wid models.WorkflowId) *container.HostConfig { 370 hostConfig := &container.HostConfig{ 371 Mounts: []mount.Mount{ 372 { 373 Type: mount.TypeVolume, 374 Source: workspaceVolume(wid), 375 Target: workspaceDir, 376 }, 377 { 378 Type: mount.TypeVolume, 379 Source: nixVolume(wid), 380 Target: "/nix", 381 }, 382 { 383 Type: mount.TypeTmpfs, 384 Target: "/tmp", 385 ReadOnly: false, 386 
TmpfsOptions: &mount.TmpfsOptions{ 387 Mode: 0o1777, // world-writeable sticky bit 388 }, 389 }, 390 { 391 Type: mount.TypeVolume, 392 Source: "etc-nix-" + wid.String(), 393 Target: "/etc/nix", 394 }, 395 }, 396 ReadonlyRootfs: false, 397 CapDrop: []string{"ALL"}, 398 CapAdd: []string{"CAP_DAC_OVERRIDE"}, 399 SecurityOpt: []string{"no-new-privileges"}, 400 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 401 } 402 403 return hostConfig 404} 405 406// thanks woodpecker 407func isErrContainerNotFoundOrNotRunning(err error) bool { 408 // Error response from daemon: Cannot kill container: ...: No such container: ... 409 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 410 // Error response from podman daemon: can only kill running containers. ... is in state exited 411 // Error: No such container: ... 412 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 413}