// Package engine runs spindle CI pipelines as Docker containers.
1package engine 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "path" 11 "sync" 12 13 "github.com/docker/docker/api/types/container" 14 "github.com/docker/docker/api/types/image" 15 "github.com/docker/docker/api/types/mount" 16 "github.com/docker/docker/api/types/network" 17 "github.com/docker/docker/api/types/volume" 18 "github.com/docker/docker/client" 19 "github.com/docker/docker/pkg/stdcopy" 20 "golang.org/x/sync/errgroup" 21 "tangled.sh/tangled.sh/core/api/tangled" 22 "tangled.sh/tangled.sh/core/knotserver/notifier" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/spindle/db" 25) 26 27const ( 28 workspaceDir = "/tangled/workspace" 29) 30 31type Engine struct { 32 docker client.APIClient 33 l *slog.Logger 34 db *db.DB 35 n *notifier.Notifier 36 37 chanMu sync.RWMutex 38 stdoutChans map[string]chan string 39 stderrChans map[string]chan string 40} 41 42func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 43 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 44 if err != nil { 45 return nil, err 46 } 47 48 l := log.FromContext(ctx).With("component", "spindle") 49 50 e := &Engine{ 51 docker: dcli, 52 l: l, 53 db: db, 54 n: n, 55 } 56 57 e.stdoutChans = make(map[string]chan string, 100) 58 e.stderrChans = make(map[string]chan string, 100) 59 60 return e, nil 61} 62 63// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. 64// in the future. In here also goes other setup steps. 
65func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error { 66 e.l.Info("setting up pipeline", "pipeline", id) 67 68 _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 69 Name: workspaceVolume(id), 70 Driver: "local", 71 }) 72 if err != nil { 73 return err 74 } 75 76 _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 77 Name: nixVolume(id), 78 Driver: "local", 79 }) 80 if err != nil { 81 return err 82 } 83 84 _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{ 85 Driver: "bridge", 86 }) 87 if err != nil { 88 return err 89 } 90 91 err = e.db.CreatePipeline(id, atUri, e.n) 92 return err 93} 94 95func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 96 e.l.Info("starting all workflows in parallel", "pipeline", id) 97 98 err := e.db.MarkPipelineRunning(id, e.n) 99 if err != nil { 100 return err 101 } 102 103 g := errgroup.Group{} 104 for _, w := range pipeline.Workflows { 105 g.Go(func() error { 106 // TODO: actual checks for image/registry etc. 107 var deps string 108 for _, d := range w.Dependencies { 109 if d.Registry == "nixpkgs" { 110 deps = path.Join(d.Packages...) 
111 } 112 } 113 114 // load defaults from somewhere else 115 deps = path.Join(deps, "bash", "git", "coreutils", "nix") 116 117 cimg := path.Join("nixery.dev", deps) 118 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 119 if err != nil { 120 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 121 err := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 122 if err != nil { 123 return err 124 } 125 return fmt.Errorf("pulling image: %w", err) 126 } 127 defer reader.Close() 128 io.Copy(os.Stdout, reader) 129 130 err = e.StartSteps(ctx, w.Steps, id, cimg) 131 if err != nil { 132 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 133 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 134 } 135 136 return nil 137 }) 138 } 139 140 err = g.Wait() 141 if err != nil { 142 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 143 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 144 } 145 146 e.l.Info("pipeline success!", "id", id) 147 return e.db.MarkPipelineSuccess(id, e.n) 148} 149 150// StartSteps starts all steps sequentially with the same base image. 151// ONLY marks pipeline as failed if container's exit code is non-zero. 152// All other errors are bubbled up. 
153func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error { 154 // set up logging channels 155 e.chanMu.Lock() 156 if _, exists := e.stdoutChans[id]; !exists { 157 e.stdoutChans[id] = make(chan string, 100) 158 } 159 if _, exists := e.stderrChans[id]; !exists { 160 e.stderrChans[id] = make(chan string, 100) 161 } 162 e.chanMu.Unlock() 163 164 // close channels after all steps are complete 165 defer func() { 166 close(e.stdoutChans[id]) 167 close(e.stderrChans[id]) 168 }() 169 170 for _, step := range steps { 171 hostConfig := hostConfig(id) 172 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 173 Image: image, 174 Cmd: []string{"bash", "-c", step.Command}, 175 WorkingDir: workspaceDir, 176 Tty: false, 177 Hostname: "spindle", 178 Env: []string{"HOME=" + workspaceDir}, 179 }, hostConfig, nil, nil, "") 180 if err != nil { 181 return fmt.Errorf("creating container: %w", err) 182 } 183 184 err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil) 185 if err != nil { 186 return fmt.Errorf("connecting network: %w", err) 187 } 188 189 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 190 if err != nil { 191 return err 192 } 193 e.l.Info("started container", "name", resp.ID, "step", step.Name) 194 195 wg := sync.WaitGroup{} 196 197 wg.Add(1) 198 go func() { 199 defer wg.Done() 200 err := e.TailStep(ctx, resp.ID, id) 201 if err != nil { 202 e.l.Error("failed to tail container", "container", resp.ID) 203 return 204 } 205 }() 206 207 // wait until all logs are piped 208 wg.Wait() 209 210 state, err := e.WaitStep(ctx, resp.ID) 211 if err != nil { 212 return err 213 } 214 215 if state.ExitCode != 0 { 216 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) 217 return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 218 } 219 } 220 221 return nil 222 223} 224 225func (e *Engine) WaitStep(ctx context.Context, containerID string) 
(*container.State, error) { 226 wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 227 select { 228 case err := <-errCh: 229 if err != nil { 230 return nil, err 231 } 232 case <-wait: 233 } 234 235 e.l.Info("waited for container", "name", containerID) 236 237 info, err := e.docker.ContainerInspect(ctx, containerID) 238 if err != nil { 239 return nil, err 240 } 241 242 return info.State, nil 243} 244 245func (e *Engine) TailStep(ctx context.Context, containerID, pipelineID string) error { 246 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 247 Follow: true, 248 ShowStdout: true, 249 ShowStderr: true, 250 Details: false, 251 Timestamps: false, 252 }) 253 if err != nil { 254 return err 255 } 256 257 // using StdCopy we demux logs and stream stdout and stderr to different 258 // channels. 259 // 260 // stdout w||r stdoutCh 261 // stderr w||r stderrCh 262 // 263 264 rpipeOut, wpipeOut := io.Pipe() 265 rpipeErr, wpipeErr := io.Pipe() 266 267 go func() { 268 defer wpipeOut.Close() 269 defer wpipeErr.Close() 270 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) 271 if err != nil && err != io.EOF { 272 e.l.Error("failed to copy logs", "error", err) 273 } 274 }() 275 276 // read from stdout and send to stdout pipe 277 // NOTE: the stdoutCh channnel is closed further up in StartSteps 278 // once all steps are done. 279 go func() { 280 e.chanMu.RLock() 281 stdoutCh := e.stdoutChans[pipelineID] 282 e.chanMu.RUnlock() 283 284 scanner := bufio.NewScanner(rpipeOut) 285 for scanner.Scan() { 286 stdoutCh <- scanner.Text() 287 } 288 if err := scanner.Err(); err != nil { 289 e.l.Error("failed to scan stdout", "error", err) 290 } 291 }() 292 293 // read from stderr and send to stderr pipe 294 // NOTE: the stderrCh channnel is closed further up in StartSteps 295 // once all steps are done. 
296 go func() { 297 e.chanMu.RLock() 298 stderrCh := e.stderrChans[pipelineID] 299 e.chanMu.RUnlock() 300 301 scanner := bufio.NewScanner(rpipeErr) 302 for scanner.Scan() { 303 stderrCh <- scanner.Text() 304 } 305 if err := scanner.Err(); err != nil { 306 e.l.Error("failed to scan stderr", "error", err) 307 } 308 }() 309 310 return nil 311} 312 313func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) { 314 e.chanMu.RLock() 315 defer e.chanMu.RUnlock() 316 317 stdoutCh, ok1 := e.stdoutChans[pipelineID] 318 stderrCh, ok2 := e.stderrChans[pipelineID] 319 320 if !ok1 || !ok2 { 321 return nil, nil, false 322 } 323 return stdoutCh, stderrCh, true 324} 325 326func workspaceVolume(id string) string { 327 return "workspace-" + id 328} 329 330func nixVolume(id string) string { 331 return "nix-" + id 332} 333 334func pipelineName(id string) string { 335 return "pipeline-" + id 336} 337 338func hostConfig(id string) *container.HostConfig { 339 hostConfig := &container.HostConfig{ 340 Mounts: []mount.Mount{ 341 { 342 Type: mount.TypeVolume, 343 Source: workspaceVolume(id), 344 Target: workspaceDir, 345 }, 346 { 347 Type: mount.TypeVolume, 348 Source: nixVolume(id), 349 Target: "/nix", 350 }, 351 }, 352 ReadonlyRootfs: true, 353 CapDrop: []string{"ALL"}, 354 SecurityOpt: []string{"no-new-privileges"}, 355 } 356 357 return hostConfig 358}