// Package engine executes spindle CI pipelines inside Docker containers.
1package engine
2
3import (
4 "bufio"
5 "context"
6 "fmt"
7 "io"
8 "log/slog"
9 "os"
10 "path"
11 "sync"
12
13 "github.com/docker/docker/api/types/container"
14 "github.com/docker/docker/api/types/image"
15 "github.com/docker/docker/api/types/mount"
16 "github.com/docker/docker/api/types/network"
17 "github.com/docker/docker/api/types/volume"
18 "github.com/docker/docker/client"
19 "github.com/docker/docker/pkg/stdcopy"
20 "golang.org/x/sync/errgroup"
21 "tangled.sh/tangled.sh/core/api/tangled"
22 "tangled.sh/tangled.sh/core/knotserver/notifier"
23 "tangled.sh/tangled.sh/core/log"
24 "tangled.sh/tangled.sh/core/spindle/db"
25)
26
const (
	// workspaceDir is the working directory mounted into every step
	// container and used as HOME for step commands.
	workspaceDir = "/tangled/workspace"
)
30
// Engine runs CI pipelines as Docker containers: one volume pair and one
// bridge network per pipeline, one container per step.
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	db     *db.DB
	n      *notifier.Notifier

	// chanMu guards the two channel maps below.
	chanMu sync.RWMutex
	// stdoutChans and stderrChans carry per-pipeline log lines,
	// keyed by pipeline id. Created in StartSteps, closed when all
	// steps finish, consumed via LogChannels.
	stdoutChans map[string]chan string
	stderrChans map[string]chan string
}
41
42func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
43 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
44 if err != nil {
45 return nil, err
46 }
47
48 l := log.FromContext(ctx).With("component", "spindle")
49
50 e := &Engine{
51 docker: dcli,
52 l: l,
53 db: db,
54 n: n,
55 }
56
57 e.stdoutChans = make(map[string]chan string, 100)
58 e.stderrChans = make(map[string]chan string, 100)
59
60 return e, nil
61}
62
63// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc.
64// in the future. In here also goes other setup steps.
65func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error {
66 e.l.Info("setting up pipeline", "pipeline", id)
67
68 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
69 Name: workspaceVolume(id),
70 Driver: "local",
71 })
72 if err != nil {
73 return err
74 }
75
76 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
77 Name: nixVolume(id),
78 Driver: "local",
79 })
80 if err != nil {
81 return err
82 }
83
84 _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{
85 Driver: "bridge",
86 })
87 if err != nil {
88 return err
89 }
90
91 err = e.db.CreatePipeline(id, atUri, e.n)
92 return err
93}
94
95func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
96 e.l.Info("starting all workflows in parallel", "pipeline", id)
97
98 err := e.db.MarkPipelineRunning(id, e.n)
99 if err != nil {
100 return err
101 }
102
103 g := errgroup.Group{}
104 for _, w := range pipeline.Workflows {
105 g.Go(func() error {
106 // TODO: actual checks for image/registry etc.
107 var deps string
108 for _, d := range w.Dependencies {
109 if d.Registry == "nixpkgs" {
110 deps = path.Join(d.Packages...)
111 }
112 }
113
114 // load defaults from somewhere else
115 deps = path.Join(deps, "bash", "git", "coreutils", "nix")
116
117 cimg := path.Join("nixery.dev", deps)
118 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
119 if err != nil {
120 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
121 err := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
122 if err != nil {
123 return err
124 }
125 return fmt.Errorf("pulling image: %w", err)
126 }
127 defer reader.Close()
128 io.Copy(os.Stdout, reader)
129
130 err = e.StartSteps(ctx, w.Steps, id, cimg)
131 if err != nil {
132 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
133 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
134 }
135
136 return nil
137 })
138 }
139
140 err = g.Wait()
141 if err != nil {
142 e.l.Error("pipeline failed!", "id", id, "error", err.Error())
143 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
144 }
145
146 e.l.Info("pipeline success!", "id", id)
147 return e.db.MarkPipelineSuccess(id, e.n)
148}
149
150// StartSteps starts all steps sequentially with the same base image.
151// ONLY marks pipeline as failed if container's exit code is non-zero.
152// All other errors are bubbled up.
153func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error {
154 // set up logging channels
155 e.chanMu.Lock()
156 if _, exists := e.stdoutChans[id]; !exists {
157 e.stdoutChans[id] = make(chan string, 100)
158 }
159 if _, exists := e.stderrChans[id]; !exists {
160 e.stderrChans[id] = make(chan string, 100)
161 }
162 e.chanMu.Unlock()
163
164 // close channels after all steps are complete
165 defer func() {
166 close(e.stdoutChans[id])
167 close(e.stderrChans[id])
168 }()
169
170 for _, step := range steps {
171 hostConfig := hostConfig(id)
172 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
173 Image: image,
174 Cmd: []string{"bash", "-c", step.Command},
175 WorkingDir: workspaceDir,
176 Tty: false,
177 Hostname: "spindle",
178 Env: []string{"HOME=" + workspaceDir},
179 }, hostConfig, nil, nil, "")
180 if err != nil {
181 return fmt.Errorf("creating container: %w", err)
182 }
183
184 err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil)
185 if err != nil {
186 return fmt.Errorf("connecting network: %w", err)
187 }
188
189 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
190 if err != nil {
191 return err
192 }
193 e.l.Info("started container", "name", resp.ID, "step", step.Name)
194
195 wg := sync.WaitGroup{}
196
197 wg.Add(1)
198 go func() {
199 defer wg.Done()
200 err := e.TailStep(ctx, resp.ID, id)
201 if err != nil {
202 e.l.Error("failed to tail container", "container", resp.ID)
203 return
204 }
205 }()
206
207 // wait until all logs are piped
208 wg.Wait()
209
210 state, err := e.WaitStep(ctx, resp.ID)
211 if err != nil {
212 return err
213 }
214
215 if state.ExitCode != 0 {
216 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
217 return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
218 }
219 }
220
221 return nil
222
223}
224
225func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
226 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
227 select {
228 case err := <-errCh:
229 if err != nil {
230 return nil, err
231 }
232 case <-wait:
233 }
234
235 e.l.Info("waited for container", "name", containerID)
236
237 info, err := e.docker.ContainerInspect(ctx, containerID)
238 if err != nil {
239 return nil, err
240 }
241
242 return info.State, nil
243}
244
245func (e *Engine) TailStep(ctx context.Context, containerID, pipelineID string) error {
246 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
247 Follow: true,
248 ShowStdout: true,
249 ShowStderr: true,
250 Details: false,
251 Timestamps: false,
252 })
253 if err != nil {
254 return err
255 }
256
257 // using StdCopy we demux logs and stream stdout and stderr to different
258 // channels.
259 //
260 // stdout w||r stdoutCh
261 // stderr w||r stderrCh
262 //
263
264 rpipeOut, wpipeOut := io.Pipe()
265 rpipeErr, wpipeErr := io.Pipe()
266
267 go func() {
268 defer wpipeOut.Close()
269 defer wpipeErr.Close()
270 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs)
271 if err != nil && err != io.EOF {
272 e.l.Error("failed to copy logs", "error", err)
273 }
274 }()
275
276 // read from stdout and send to stdout pipe
277 // NOTE: the stdoutCh channnel is closed further up in StartSteps
278 // once all steps are done.
279 go func() {
280 e.chanMu.RLock()
281 stdoutCh := e.stdoutChans[pipelineID]
282 e.chanMu.RUnlock()
283
284 scanner := bufio.NewScanner(rpipeOut)
285 for scanner.Scan() {
286 stdoutCh <- scanner.Text()
287 }
288 if err := scanner.Err(); err != nil {
289 e.l.Error("failed to scan stdout", "error", err)
290 }
291 }()
292
293 // read from stderr and send to stderr pipe
294 // NOTE: the stderrCh channnel is closed further up in StartSteps
295 // once all steps are done.
296 go func() {
297 e.chanMu.RLock()
298 stderrCh := e.stderrChans[pipelineID]
299 e.chanMu.RUnlock()
300
301 scanner := bufio.NewScanner(rpipeErr)
302 for scanner.Scan() {
303 stderrCh <- scanner.Text()
304 }
305 if err := scanner.Err(); err != nil {
306 e.l.Error("failed to scan stderr", "error", err)
307 }
308 }()
309
310 return nil
311}
312
313func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
314 e.chanMu.RLock()
315 defer e.chanMu.RUnlock()
316
317 stdoutCh, ok1 := e.stdoutChans[pipelineID]
318 stderrCh, ok2 := e.stderrChans[pipelineID]
319
320 if !ok1 || !ok2 {
321 return nil, nil, false
322 }
323 return stdoutCh, stderrCh, true
324}
325
// workspaceVolume names the per-pipeline workspace volume.
func workspaceVolume(id string) string {
	return fmt.Sprintf("workspace-%s", id)
}
329
// nixVolume names the per-pipeline /nix store volume.
func nixVolume(id string) string {
	return fmt.Sprintf("nix-%s", id)
}
333
// pipelineName names the per-pipeline bridge network.
func pipelineName(id string) string {
	return fmt.Sprintf("pipeline-%s", id)
}
337
338func hostConfig(id string) *container.HostConfig {
339 hostConfig := &container.HostConfig{
340 Mounts: []mount.Mount{
341 {
342 Type: mount.TypeVolume,
343 Source: workspaceVolume(id),
344 Target: workspaceDir,
345 },
346 {
347 Type: mount.TypeVolume,
348 Source: nixVolume(id),
349 Target: "/nix",
350 },
351 },
352 ReadonlyRootfs: true,
353 CapDrop: []string{"ALL"},
354 SecurityOpt: []string{"no-new-privileges"},
355 }
356
357 return hostConfig
358}