this repo has no description
1package engine 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "strings" 11 "sync" 12 13 "github.com/docker/docker/api/types/container" 14 "github.com/docker/docker/api/types/image" 15 "github.com/docker/docker/api/types/mount" 16 "github.com/docker/docker/api/types/network" 17 "github.com/docker/docker/api/types/volume" 18 "github.com/docker/docker/client" 19 "github.com/docker/docker/pkg/stdcopy" 20 "tangled.sh/tangled.sh/core/log" 21 "tangled.sh/tangled.sh/core/notifier" 22 "tangled.sh/tangled.sh/core/spindle/config" 23 "tangled.sh/tangled.sh/core/spindle/db" 24 "tangled.sh/tangled.sh/core/spindle/models" 25) 26 27const ( 28 workspaceDir = "/tangled/workspace" 29) 30 31type cleanupFunc func(context.Context) error 32 33type Engine struct { 34 docker client.APIClient 35 l *slog.Logger 36 db *db.DB 37 n *notifier.Notifier 38 cfg *config.Config 39 40 chanMu sync.RWMutex 41 stdoutChans map[string]chan string 42 stderrChans map[string]chan string 43 44 cleanupMu sync.Mutex 45 cleanup map[string][]cleanupFunc 46} 47 48func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) { 49 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 50 if err != nil { 51 return nil, err 52 } 53 54 l := log.FromContext(ctx).With("component", "spindle") 55 56 e := &Engine{ 57 docker: dcli, 58 l: l, 59 db: db, 60 n: n, 61 cfg: cfg, 62 } 63 64 e.stdoutChans = make(map[string]chan string, 100) 65 e.stderrChans = make(map[string]chan string, 100) 66 67 e.cleanup = make(map[string][]cleanupFunc) 68 69 return e, nil 70} 71 72func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 73 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 74 75 wg := sync.WaitGroup{} 76 for _, w := range pipeline.Workflows { 77 wg.Add(1) 78 go func() error { 79 defer wg.Done() 80 wid := models.WorkflowId{ 81 PipelineId: pipelineId, 82 Name: w.Name, 83 } 84 85 err := e.db.StatusRunning(wid, e.n) 86 if err != nil { 87 return err 88 } 89 90 err = e.SetupWorkflow(ctx, wid) 91 if err != nil { 92 e.l.Error("setting up worklow", "wid", wid, "err", err) 93 return err 94 } 95 defer e.DestroyWorkflow(ctx, wid) 96 97 reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{}) 98 if err != nil { 99 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 100 101 err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 102 if err != nil { 103 return err 104 } 105 106 return fmt.Errorf("pulling image: %w", err) 107 } 108 defer reader.Close() 109 io.Copy(os.Stdout, reader) 110 111 err = e.StartSteps(ctx, w.Steps, wid, w.Image) 112 if err != nil { 113 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) 114 115 dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n) 116 if dbErr != nil { 117 return dbErr 118 } 119 120 return fmt.Errorf("starting steps image: %w", err) 121 } 122 123 err = e.db.StatusSuccess(wid, e.n) 124 if err != nil { 125 return err 126 } 127 128 return nil 129 }() 130 } 131 132 wg.Wait() 133} 134 135// SetupWorkflow sets up a new network for the workflow and volumes for 136// the workspace and Nix store. These are persisted across steps and are 137// destroyed at the end of the workflow. 138func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { 139 e.l.Info("setting up workflow", "workflow", wid) 140 141 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 142 Name: workspaceVolume(wid), 143 Driver: "local", 144 }) 145 if err != nil { 146 return err 147 } 148 e.registerCleanup(wid, func(ctx context.Context) error { 149 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 150 }) 151 152 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 153 Name: nixVolume(wid), 154 Driver: "local", 155 }) 156 if err != nil { 157 return err 158 } 159 e.registerCleanup(wid, func(ctx context.Context) error { 160 return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 161 }) 162 163 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 164 Driver: "bridge", 165 }) 166 if err != nil { 167 return err 168 } 169 e.registerCleanup(wid, func(ctx context.Context) error { 170 return e.docker.NetworkRemove(ctx, networkName(wid)) 171 }) 172 173 return nil 174} 175 176// StartSteps starts all steps sequentially with the same base image. 177// ONLY marks pipeline as failed if container's exit code is non-zero. 178// All other errors are bubbled up. 179func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { 180 // set up logging channels 181 e.chanMu.Lock() 182 if _, exists := e.stdoutChans[wid.String()]; !exists { 183 e.stdoutChans[wid.String()] = make(chan string, 100) 184 } 185 if _, exists := e.stderrChans[wid.String()]; !exists { 186 e.stderrChans[wid.String()] = make(chan string, 100) 187 } 188 e.chanMu.Unlock() 189 190 // close channels after all steps are complete 191 defer func() { 192 close(e.stdoutChans[wid.String()]) 193 close(e.stderrChans[wid.String()]) 194 }() 195 196 for _, step := range steps { 197 envs := ConstructEnvs(step.Environment) 198 envs.AddEnv("HOME", workspaceDir) 199 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 200 201 hostConfig := hostConfig(wid) 202 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 203 Image: image, 204 Cmd: []string{"bash", "-c", step.Command}, 205 WorkingDir: workspaceDir, 206 Tty: false, 207 Hostname: "spindle", 208 Env: envs.Slice(), 209 }, hostConfig, nil, nil, "") 210 if err != nil { 211 return fmt.Errorf("creating container: %w", err) 212 } 213 214 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 215 if err != nil { 216 return fmt.Errorf("connecting network: %w", err) 217 } 218 219 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 220 if err != nil { 221 return err 222 } 223 e.l.Info("started container", "name", resp.ID, "step", step.Name) 224 225 wg := sync.WaitGroup{} 226 227 wg.Add(1) 228 go func() { 229 defer wg.Done() 230 err := e.TailStep(ctx, resp.ID, wid) 231 if err != nil { 232 e.l.Error("failed to tail container", "container", resp.ID) 233 return 234 } 235 }() 236 237 // wait until all logs are piped 238 wg.Wait() 239 240 state, err := e.WaitStep(ctx, resp.ID) 241 if err != nil { 242 return err 243 } 244 245 err = e.DestroyStep(ctx, resp.ID) 246 if err != nil { 247 return err 248 } 249 250 if state.ExitCode != 0 { 251 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 252 err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n) 253 if err != nil { 254 return err 255 } 256 return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled) 257 } 258 } 259 260 return nil 261 262} 263 264func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 265 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 266 select { 267 case err := <-errCh: 268 if err != nil { 269 return nil, err 270 } 271 case <-wait: 272 } 273 274 e.l.Info("waited for container", "name", containerID) 275 276 info, err := e.docker.ContainerInspect(ctx, containerID) 277 if err != nil { 278 return nil, err 279 } 280 281 return info.State, nil 282} 283 284func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error { 285 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 286 Follow: true, 287 ShowStdout: true, 288 ShowStderr: true, 289 Details: true, 290 Timestamps: false, 291 }) 292 if err != nil { 293 return err 294 } 295 296 var devOutput io.Writer = io.Discard 297 if e.cfg.Server.Dev { 298 devOutput = &ansiStrippingWriter{underlying: os.Stdout} 299 } 300 301 tee := io.TeeReader(logs, devOutput) 302 303 // using StdCopy we demux logs and stream stdout and stderr to different 304 // channels. 305 // 306 // stdout w||r stdoutCh 307 // stderr w||r stderrCh 308 // 309 310 rpipeOut, wpipeOut := io.Pipe() 311 rpipeErr, wpipeErr := io.Pipe() 312 313 wg := sync.WaitGroup{} 314 315 wg.Add(1) 316 go func() { 317 defer wg.Done() 318 defer wpipeOut.Close() 319 defer wpipeErr.Close() 320 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee) 321 if err != nil && err != io.EOF { 322 e.l.Error("failed to copy logs", "error", err) 323 } 324 }() 325 326 // read from stdout and send to stdout pipe 327 // NOTE: the stdoutCh channnel is closed further up in StartSteps 328 // once all steps are done. 329 wg.Add(1) 330 go func() { 331 defer wg.Done() 332 e.chanMu.RLock() 333 stdoutCh := e.stdoutChans[wid.String()] 334 e.chanMu.RUnlock() 335 336 scanner := bufio.NewScanner(rpipeOut) 337 for scanner.Scan() { 338 stdoutCh <- scanner.Text() 339 } 340 if err := scanner.Err(); err != nil { 341 e.l.Error("failed to scan stdout", "error", err) 342 } 343 }() 344 345 // read from stderr and send to stderr pipe 346 // NOTE: the stderrCh channnel is closed further up in StartSteps 347 // once all steps are done. 348 wg.Add(1) 349 go func() { 350 defer wg.Done() 351 e.chanMu.RLock() 352 stderrCh := e.stderrChans[wid.String()] 353 e.chanMu.RUnlock() 354 355 scanner := bufio.NewScanner(rpipeErr) 356 for scanner.Scan() { 357 stderrCh <- scanner.Text() 358 } 359 if err := scanner.Err(); err != nil { 360 e.l.Error("failed to scan stderr", "error", err) 361 } 362 }() 363 364 wg.Wait() 365 366 return nil 367} 368 369func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 370 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 371 if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 372 return err 373 } 374 375 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 376 RemoveVolumes: true, 377 RemoveLinks: false, 378 Force: false, 379 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 380 return err 381 } 382 383 return nil 384} 385 386func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 387 e.cleanupMu.Lock() 388 key := wid.String() 389 390 fns := e.cleanup[key] 391 delete(e.cleanup, key) 392 e.cleanupMu.Unlock() 393 394 for _, fn := range fns { 395 if err := fn(ctx); err != nil { 396 e.l.Error("failed to cleanup workflow resource", "workflowId", wid) 397 } 398 } 399 return nil 400} 401 402func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) { 403 e.chanMu.RLock() 404 defer e.chanMu.RUnlock() 405 406 stdoutCh, ok1 := e.stdoutChans[wid.String()] 407 stderrCh, ok2 := e.stderrChans[wid.String()] 408 409 if !ok1 || !ok2 { 410 return nil, nil, false 411 } 412 return stdoutCh, stderrCh, true 413} 414 415func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 416 e.cleanupMu.Lock() 417 defer e.cleanupMu.Unlock() 418 419 key := wid.String() 420 e.cleanup[key] = append(e.cleanup[key], fn) 421} 422 423func workspaceVolume(wid models.WorkflowId) string { 424 return fmt.Sprintf("workspace-%s", wid) 425} 426 427func nixVolume(wid models.WorkflowId) string { 428 return fmt.Sprintf("nix-%s", wid) 429} 430 431func networkName(wid models.WorkflowId) string { 432 return fmt.Sprintf("workflow-network-%s", wid) 433} 434 435func hostConfig(wid models.WorkflowId) *container.HostConfig { 436 hostConfig := &container.HostConfig{ 437 Mounts: []mount.Mount{ 438 { 439 Type: mount.TypeVolume, 440 Source: workspaceVolume(wid), 441 Target: workspaceDir, 442 }, 443 { 444 Type: mount.TypeVolume, 445 Source: nixVolume(wid), 446 Target: "/nix", 447 }, 448 { 449 Type: mount.TypeTmpfs, 450 Target: "/tmp", 451 }, 452 }, 453 ReadonlyRootfs: false, 454 CapDrop: []string{"ALL"}, 455 SecurityOpt: []string{"seccomp=unconfined"}, 456 } 457 458 return hostConfig 459} 460 461// thanks woodpecker 462func isErrContainerNotFoundOrNotRunning(err error) bool { 463 // Error response from daemon: Cannot kill container: ...: No such container: ... 464 // Error response from daemon: Cannot kill container: ...: Container ... is not running" 465 // Error response from podman daemon: can only kill running containers. ... is in state exited 466 // Error: No such container: ... 467 return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 468}