this repo has no description
1package engine
2
3import (
4 "bufio"
5 "context"
6 "fmt"
7 "io"
8 "log/slog"
9 "os"
10 "strings"
11 "sync"
12
13 "github.com/docker/docker/api/types/container"
14 "github.com/docker/docker/api/types/image"
15 "github.com/docker/docker/api/types/mount"
16 "github.com/docker/docker/api/types/network"
17 "github.com/docker/docker/api/types/volume"
18 "github.com/docker/docker/client"
19 "github.com/docker/docker/pkg/stdcopy"
20 "tangled.sh/tangled.sh/core/log"
21 "tangled.sh/tangled.sh/core/notifier"
22 "tangled.sh/tangled.sh/core/spindle/config"
23 "tangled.sh/tangled.sh/core/spindle/db"
24 "tangled.sh/tangled.sh/core/spindle/models"
25)
26
// workspaceDir is the in-container path where the workflow's workspace
// volume is mounted; every step also uses it as its working directory and
// as $HOME.
const workspaceDir = "/tangled/workspace"
30
// cleanupFunc releases one per-workflow resource. Functions of this type are
// queued by registerCleanup and invoked by DestroyWorkflow with the context
// to use for the removal call.
type cleanupFunc func(context.Context) error
32
// Engine runs pipeline workflows as sequences of Docker containers and
// exposes each workflow's output as line-oriented log channels.
type Engine struct {
	// docker is the Docker API client used for images, containers,
	// volumes and networks.
	docker client.APIClient
	l      *slog.Logger
	// db records workflow status (running / failed / success).
	db *db.DB
	// n is passed along with every status update written to db.
	n   *notifier.Notifier
	cfg *config.Config

	// chanMu guards stdoutChans and stderrChans. Both maps are keyed by
	// WorkflowId.String(); each message on a channel is one output line.
	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string

	// cleanupMu guards cleanup, which maps WorkflowId.String() to the
	// functions queued by registerCleanup and run by DestroyWorkflow.
	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}
47
48func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
49 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
50 if err != nil {
51 return nil, err
52 }
53
54 l := log.FromContext(ctx).With("component", "spindle")
55
56 e := &Engine{
57 docker: dcli,
58 l: l,
59 db: db,
60 n: n,
61 cfg: cfg,
62 }
63
64 e.stdoutChans = make(map[string]chan string, 100)
65 e.stderrChans = make(map[string]chan string, 100)
66
67 e.cleanup = make(map[string][]cleanupFunc)
68
69 return e, nil
70}
71
72func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
73 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
74
75 wg := sync.WaitGroup{}
76 for _, w := range pipeline.Workflows {
77 wg.Add(1)
78 go func() error {
79 defer wg.Done()
80 wid := models.WorkflowId{
81 PipelineId: pipelineId,
82 Name: w.Name,
83 }
84
85 err := e.db.StatusRunning(wid, e.n)
86 if err != nil {
87 return err
88 }
89
90 err = e.SetupWorkflow(ctx, wid)
91 if err != nil {
92 e.l.Error("setting up worklow", "wid", wid, "err", err)
93 return err
94 }
95 defer e.DestroyWorkflow(ctx, wid)
96
97 reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
98 if err != nil {
99 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
100
101 err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
102 if err != nil {
103 return err
104 }
105
106 return fmt.Errorf("pulling image: %w", err)
107 }
108 defer reader.Close()
109 io.Copy(os.Stdout, reader)
110
111 err = e.StartSteps(ctx, w.Steps, wid, w.Image)
112 if err != nil {
113 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())
114
115 dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
116 if dbErr != nil {
117 return dbErr
118 }
119
120 return fmt.Errorf("starting steps image: %w", err)
121 }
122
123 err = e.db.StatusSuccess(wid, e.n)
124 if err != nil {
125 return err
126 }
127
128 return nil
129 }()
130 }
131
132 wg.Wait()
133}
134
135// SetupWorkflow sets up a new network for the workflow and volumes for
136// the workspace and Nix store. These are persisted across steps and are
137// destroyed at the end of the workflow.
138func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
139 e.l.Info("setting up workflow", "workflow", wid)
140
141 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
142 Name: workspaceVolume(wid),
143 Driver: "local",
144 })
145 if err != nil {
146 return err
147 }
148 e.registerCleanup(wid, func(ctx context.Context) error {
149 return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
150 })
151
152 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
153 Name: nixVolume(wid),
154 Driver: "local",
155 })
156 if err != nil {
157 return err
158 }
159 e.registerCleanup(wid, func(ctx context.Context) error {
160 return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
161 })
162
163 _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
164 Driver: "bridge",
165 })
166 if err != nil {
167 return err
168 }
169 e.registerCleanup(wid, func(ctx context.Context) error {
170 return e.docker.NetworkRemove(ctx, networkName(wid))
171 })
172
173 return nil
174}
175
176// StartSteps starts all steps sequentially with the same base image.
177// ONLY marks pipeline as failed if container's exit code is non-zero.
178// All other errors are bubbled up.
179func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
180 // set up logging channels
181 e.chanMu.Lock()
182 if _, exists := e.stdoutChans[wid.String()]; !exists {
183 e.stdoutChans[wid.String()] = make(chan string, 100)
184 }
185 if _, exists := e.stderrChans[wid.String()]; !exists {
186 e.stderrChans[wid.String()] = make(chan string, 100)
187 }
188 e.chanMu.Unlock()
189
190 // close channels after all steps are complete
191 defer func() {
192 close(e.stdoutChans[wid.String()])
193 close(e.stderrChans[wid.String()])
194 }()
195
196 for _, step := range steps {
197 envs := ConstructEnvs(step.Environment)
198 envs.AddEnv("HOME", workspaceDir)
199 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
200
201 hostConfig := hostConfig(wid)
202 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
203 Image: image,
204 Cmd: []string{"bash", "-c", step.Command},
205 WorkingDir: workspaceDir,
206 Tty: false,
207 Hostname: "spindle",
208 Env: envs.Slice(),
209 }, hostConfig, nil, nil, "")
210 if err != nil {
211 return fmt.Errorf("creating container: %w", err)
212 }
213
214 err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
215 if err != nil {
216 return fmt.Errorf("connecting network: %w", err)
217 }
218
219 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
220 if err != nil {
221 return err
222 }
223 e.l.Info("started container", "name", resp.ID, "step", step.Name)
224
225 wg := sync.WaitGroup{}
226
227 wg.Add(1)
228 go func() {
229 defer wg.Done()
230 err := e.TailStep(ctx, resp.ID, wid)
231 if err != nil {
232 e.l.Error("failed to tail container", "container", resp.ID)
233 return
234 }
235 }()
236
237 // wait until all logs are piped
238 wg.Wait()
239
240 state, err := e.WaitStep(ctx, resp.ID)
241 if err != nil {
242 return err
243 }
244
245 err = e.DestroyStep(ctx, resp.ID)
246 if err != nil {
247 return err
248 }
249
250 if state.ExitCode != 0 {
251 e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
252 err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
253 if err != nil {
254 return err
255 }
256 return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled)
257 }
258 }
259
260 return nil
261
262}
263
264func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
265 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
266 select {
267 case err := <-errCh:
268 if err != nil {
269 return nil, err
270 }
271 case <-wait:
272 }
273
274 e.l.Info("waited for container", "name", containerID)
275
276 info, err := e.docker.ContainerInspect(ctx, containerID)
277 if err != nil {
278 return nil, err
279 }
280
281 return info.State, nil
282}
283
284func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error {
285 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
286 Follow: true,
287 ShowStdout: true,
288 ShowStderr: true,
289 Details: true,
290 Timestamps: false,
291 })
292 if err != nil {
293 return err
294 }
295
296 var devOutput io.Writer = io.Discard
297 if e.cfg.Server.Dev {
298 devOutput = &ansiStrippingWriter{underlying: os.Stdout}
299 }
300
301 tee := io.TeeReader(logs, devOutput)
302
303 // using StdCopy we demux logs and stream stdout and stderr to different
304 // channels.
305 //
306 // stdout w||r stdoutCh
307 // stderr w||r stderrCh
308 //
309
310 rpipeOut, wpipeOut := io.Pipe()
311 rpipeErr, wpipeErr := io.Pipe()
312
313 wg := sync.WaitGroup{}
314
315 wg.Add(1)
316 go func() {
317 defer wg.Done()
318 defer wpipeOut.Close()
319 defer wpipeErr.Close()
320 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
321 if err != nil && err != io.EOF {
322 e.l.Error("failed to copy logs", "error", err)
323 }
324 }()
325
326 // read from stdout and send to stdout pipe
327 // NOTE: the stdoutCh channnel is closed further up in StartSteps
328 // once all steps are done.
329 wg.Add(1)
330 go func() {
331 defer wg.Done()
332 e.chanMu.RLock()
333 stdoutCh := e.stdoutChans[wid.String()]
334 e.chanMu.RUnlock()
335
336 scanner := bufio.NewScanner(rpipeOut)
337 for scanner.Scan() {
338 stdoutCh <- scanner.Text()
339 }
340 if err := scanner.Err(); err != nil {
341 e.l.Error("failed to scan stdout", "error", err)
342 }
343 }()
344
345 // read from stderr and send to stderr pipe
346 // NOTE: the stderrCh channnel is closed further up in StartSteps
347 // once all steps are done.
348 wg.Add(1)
349 go func() {
350 defer wg.Done()
351 e.chanMu.RLock()
352 stderrCh := e.stderrChans[wid.String()]
353 e.chanMu.RUnlock()
354
355 scanner := bufio.NewScanner(rpipeErr)
356 for scanner.Scan() {
357 stderrCh <- scanner.Text()
358 }
359 if err := scanner.Err(); err != nil {
360 e.l.Error("failed to scan stderr", "error", err)
361 }
362 }()
363
364 wg.Wait()
365
366 return nil
367}
368
369func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
370 err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
371 if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
372 return err
373 }
374
375 if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
376 RemoveVolumes: true,
377 RemoveLinks: false,
378 Force: false,
379 }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
380 return err
381 }
382
383 return nil
384}
385
386func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
387 e.cleanupMu.Lock()
388 key := wid.String()
389
390 fns := e.cleanup[key]
391 delete(e.cleanup, key)
392 e.cleanupMu.Unlock()
393
394 for _, fn := range fns {
395 if err := fn(ctx); err != nil {
396 e.l.Error("failed to cleanup workflow resource", "workflowId", wid)
397 }
398 }
399 return nil
400}
401
402func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
403 e.chanMu.RLock()
404 defer e.chanMu.RUnlock()
405
406 stdoutCh, ok1 := e.stdoutChans[wid.String()]
407 stderrCh, ok2 := e.stderrChans[wid.String()]
408
409 if !ok1 || !ok2 {
410 return nil, nil, false
411 }
412 return stdoutCh, stderrCh, true
413}
414
415func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
416 e.cleanupMu.Lock()
417 defer e.cleanupMu.Unlock()
418
419 key := wid.String()
420 e.cleanup[key] = append(e.cleanup[key], fn)
421}
422
423func workspaceVolume(wid models.WorkflowId) string {
424 return fmt.Sprintf("workspace-%s", wid)
425}
426
427func nixVolume(wid models.WorkflowId) string {
428 return fmt.Sprintf("nix-%s", wid)
429}
430
431func networkName(wid models.WorkflowId) string {
432 return fmt.Sprintf("workflow-network-%s", wid)
433}
434
435func hostConfig(wid models.WorkflowId) *container.HostConfig {
436 hostConfig := &container.HostConfig{
437 Mounts: []mount.Mount{
438 {
439 Type: mount.TypeVolume,
440 Source: workspaceVolume(wid),
441 Target: workspaceDir,
442 },
443 {
444 Type: mount.TypeVolume,
445 Source: nixVolume(wid),
446 Target: "/nix",
447 },
448 {
449 Type: mount.TypeTmpfs,
450 Target: "/tmp",
451 },
452 },
453 ReadonlyRootfs: false,
454 CapDrop: []string{"ALL"},
455 SecurityOpt: []string{"seccomp=unconfined"},
456 }
457
458 return hostConfig
459}
460
// isErrContainerNotFoundOrNotRunning reports whether err's message says the
// container does not exist or is not running. Matching is done on the error
// text, covering messages such as:
//
//	Error response from daemon: Cannot kill container: ...: No such container: ...
//	Error response from daemon: Cannot kill container: ...: Container ... is not running
//	Error response from podman daemon: can only kill running containers. ... is in state exited
//	Error: No such container: ...
//
// A nil error yields false. Credit: Woodpecker.
func isErrContainerNotFoundOrNotRunning(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range []string{
		"No such container",
		"is not running",
		"can only kill running containers",
	} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}