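// Package engine runs spindle CI pipelines on Docker: each workflow
// gets its own bridge network and named volumes, and each step runs as
// a container sharing those resources.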
package engine

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/notifier"
	"tangled.sh/tangled.sh/core/spindle/config"
	"tangled.sh/tangled.sh/core/spindle/db"
	"tangled.sh/tangled.sh/core/spindle/models"
)

const (
	workspaceDir = "/tangled/workspace"
)

// cleanupFunc tears down a single resource provisioned for a workflow.
type cleanupFunc func(context.Context) error

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

// New builds an Engine backed by the Docker client resolved from the
// environment, with API version negotiation enabled.
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:  dcli,
		l:       l,
		db:      db,
		n:       n,
		cfg:     cfg,
		cleanup: make(map[string][]cleanupFunc),
	}

	return e, nil
}
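// StartWorkflows runs every workflow in the pipeline concurrently and
// blocks until all of them have finished. Each workflow's status is
// reported through the db/notifier pair as it progresses.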
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
	e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)

	var wg sync.WaitGroup
	for _, w := range pipeline.Workflows {
		wg.Add(1)
		go func() {
			defer wg.Done()
			wid := models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}

			if err := e.db.StatusRunning(wid, e.n); err != nil {
				e.l.Error("marking workflow as running", "wid", wid, "err", err)
				return
			}

			if err := e.SetupWorkflow(ctx, wid); err != nil {
				e.l.Error("setting up workflow", "wid", wid, "err", err)
				return
			}
			defer e.DestroyWorkflow(ctx, wid)

			reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
			if err != nil {
				e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
				if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
					e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
				}
				return
			}
			defer reader.Close()
			// drain the pull progress stream; the pull only completes
			// once the reader has been consumed
			_, _ = io.Copy(os.Stdout, reader)

			if err := e.StartSteps(ctx, w.Steps, wid, w.Image); err != nil {
				if errors.Is(err, ErrTimedOut) {
					if dbErr := e.db.StatusTimeout(wid, e.n); dbErr != nil {
						e.l.Error("marking workflow as timed out", "wid", wid, "err", dbErr)
					}
				} else {
					if dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n); dbErr != nil {
						e.l.Error("marking workflow as failed", "wid", wid, "err", dbErr)
					}
				}
				return
			}

			if err := e.db.StatusSuccess(wid, e.n); err != nil {
				e.l.Error("marking workflow as successful", "wid", wid, "err", err)
			}
		}()
	}

	wg.Wait()
}

// SetupWorkflow sets up a new network for the workflow and volumes for
// the workspace and Nix store. These are persisted across steps and are
// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   workspaceVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
	})

	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
		Name:   nixVolume(wid),
		Driver: "local",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
	})

	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	return nil
}

// StartSteps runs all steps sequentially in the same base image. The
// workflow is marked failed only when a container exits with a non-zero
// code; all other errors are bubbled up to the caller.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
	stepTimeoutStr := e.cfg.Pipelines.StepTimeout
	stepTimeout, err := time.ParseDuration(stepTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse step timeout; falling back to default", "error", err, "timeout", stepTimeoutStr)
		stepTimeout = 5 * time.Minute
	}
	e.l.Info("using step timeout", "timeout", stepTimeout)

	for stepIdx, step := range steps {
		envs := ConstructEnvs(step.Environment)
		envs.AddEnv("HOME", workspaceDir)
		e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

		hostConfig := hostConfig(wid)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image:      image,
			Cmd:        []string{"bash", "-c", step.Command},
			WorkingDir: workspaceDir,
			Tty:        false,
			Hostname:   "spindle",
			Env:        envs.Slice(),
		}, hostConfig, nil, nil, "")
		if err != nil {
			return fmt.Errorf("creating container: %w", err)
		}

		err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}

		stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)

		err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
		if err != nil {
			stepCancel()
			return err
		}
		e.l.Info("started container", "name", resp.ID, "step", step.Name)

		// start tailing logs in the background
		tailDone := make(chan error, 1)
		go func() {
			tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
		}()

		// wait for container completion or timeout
		waitDone := make(chan struct{})
		var state *container.State
		var waitErr error

		go func() {
			defer close(waitDone)
			state, waitErr = e.WaitStep(stepCtx, resp.ID)
		}()

		select {
		case <-waitDone:
			// wait for tailing to complete
			<-tailDone
			stepCancel()

		case <-stepCtx.Done():
			e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)

			_ = e.DestroyStep(ctx, resp.ID)

			// wait for both goroutines to finish
			<-waitDone
			<-tailDone

			stepCancel()
			return ErrTimedOut
		}

		if waitErr != nil {
			return waitErr
		}

		err = e.DestroyStep(ctx, resp.ID)
		if err != nil {
			return err
		}

		if state.ExitCode != 0 {
			e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
			if state.OOMKilled {
				return ErrOOMKilled
			}
			return ErrWorkflowFailed
		}
	}

	return nil
}
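// WaitStep blocks until the container is no longer running and returns
// its final state, from which the caller reads the exit code and
// OOM-killed flag.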
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return nil, err
		}
	case <-wait:
	}

	e.l.Info("waited for container", "name", containerID)

	info, err := e.docker.ContainerInspect(ctx, containerID)
	if err != nil {
		return nil, err
	}

	return info.State, nil
}

// TailStep follows the container's stdout and stderr and demultiplexes
// them into the workflow's persistent log files.
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
		Follow:     true,
		ShowStdout: true,
		ShowStderr: true,
		Details:    false,
		Timestamps: false,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
	if err != nil {
		e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
		return err
	}
	defer wfLogger.Close()

	_, err = stdcopy.StdCopy(wfLogger.Stdout(), wfLogger.Stderr(), logs)
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

// DestroyStep kills and removes a step container, tolerating containers
// that are already gone or stopped.
func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
		RemoveVolumes: true,
		RemoveLinks:   false,
		Force:         false,
	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
		return err
	}

	return nil
}

// DestroyWorkflow runs all cleanup callbacks registered for the
// workflow. Failures are logged rather than returned so that one failed
// removal does not prevent the rest from running.
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

func workspaceVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("workspace-%s", wid)
}

func nixVolume(wid models.WorkflowId) string {
	return fmt.Sprintf("nix-%s", wid)
}

func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}
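// hostConfig mounts the shared workspace and Nix store volumes into the
// step container and locks it down: every capability is dropped except
// CAP_DAC_OVERRIDE, and privilege escalation is disabled.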
"/tmp", 393 ReadOnly: false, 394 TmpfsOptions: &mount.TmpfsOptions{ 395 Mode: 0o1777, // world-writeable sticky bit 396 }, 397 }, 398 { 399 Type: mount.TypeVolume, 400 Source: "etc-nix-" + wid.String(), 401 Target: "/etc/nix", 402 }, 403 }, 404 ReadonlyRootfs: false, 405 CapDrop: []string{"ALL"}, 406 CapAdd: []string{"CAP_DAC_OVERRIDE"}, 407 SecurityOpt: []string{"no-new-privileges"}, 408 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 409 } 410 411 return hostConfig 412} 413 414// thanks woodpecker 415func isErrContainerNotFoundOrNotRunning(err error) bool { 416 // Error response from daemon: Cannot kill container: ...: No such container: ... 417 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 418 // Error response from podman daemon: can only kill running containers. ... is in state exited 419 // Error: No such container: ... 420 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 421}