Monorepo for Tangled — https://tangled.org
at buildah-engine 365 lines 9.1 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/eventconsumer" 14 "tangled.org/core/eventconsumer/cursor" 15 "tangled.org/core/idresolver" 16 "tangled.org/core/jetstream" 17 "tangled.org/core/log" 18 "tangled.org/core/notifier" 19 "tangled.org/core/rbac" 20 "tangled.org/core/spindle/config" 21 "tangled.org/core/spindle/db" 22 "tangled.org/core/spindle/engine" 23 "tangled.org/core/spindle/engines/buildah" 24 "tangled.org/core/spindle/engines/nixery" 25 "tangled.org/core/spindle/models" 26 "tangled.org/core/spindle/queue" 27 "tangled.org/core/spindle/secrets" 28 "tangled.org/core/spindle/xrpc" 29 "tangled.org/core/xrpc/serviceauth" 30) 31 32//go:embed motd 33var motd []byte 34 35const ( 36 rbacDomain = "thisserver" 37) 38 39type Spindle struct { 40 jc *jetstream.JetstreamClient 41 db *db.DB 42 e *rbac.Enforcer 43 l *slog.Logger 44 n *notifier.Notifier 45 engs map[string]models.Engine 46 jq *queue.Queue 47 cfg *config.Config 48 ks *eventconsumer.Consumer 49 res *idresolver.Resolver 50 vault secrets.Manager 51} 52 53func Run(ctx context.Context) error { 54 logger := log.FromContext(ctx) 55 56 cfg, err := config.Load(ctx) 57 if err != nil { 58 return fmt.Errorf("failed to load config: %w", err) 59 } 60 61 d, err := db.Make(cfg.Server.DBPath) 62 if err != nil { 63 return fmt.Errorf("failed to setup db: %w", err) 64 } 65 66 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 67 if err != nil { 68 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 69 } 70 e.E.EnableAutoSave(true) 71 72 n := notifier.New() 73 74 var vault secrets.Manager 75 switch cfg.Server.Secrets.Provider { 76 case "openbao": 77 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 78 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 79 } 80 vault, err = secrets.NewOpenBaoManager( 81 cfg.Server.Secrets.OpenBao.ProxyAddr, 82 logger, 83 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 84 ) 85 if err != nil { 86 return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 87 } 88 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 89 case "sqlite", "": 90 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 91 if err != nil { 92 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 93 } 94 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 95 default: 96 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 97 } 98 99 nixeryEng, err := nixery.New(ctx, cfg) 100 if err != nil { 101 return err 102 } 103 104 buildahEng, err := buildah.New(ctx, cfg) 105 if err != nil { 106 return err 107 } 108 109 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 110 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 111 112 collections := []string{ 113 tangled.SpindleMemberNSID, 114 tangled.RepoNSID, 115 tangled.RepoCollaboratorNSID, 116 } 117 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 118 if err != nil { 119 return fmt.Errorf("failed to setup jetstream client: %w", err) 120 } 121 jc.AddDid(cfg.Server.Owner) 122 123 // Check if the spindle knows about any Dids; 124 dids, err := d.GetAllDids() 125 if err != nil { 126 return fmt.Errorf("failed to get all dids: %w", err) 127 } 128 for _, d := range dids { 129 jc.AddDid(d) 130 } 131 132 resolver := idresolver.DefaultResolver() 133 134 spindle := Spindle{ 135 jc: jc, 136 e: e, 137 db: d, 138 l: logger, 139 n: &n, 140 engs: map[string]models.Engine{"nixery": nixeryEng, "buildah": buildahEng}, 141 jq: jq, 142 cfg: cfg, 143 res: resolver, 144 vault: vault, 145 } 146 147 err = e.AddSpindle(rbacDomain) 148 if err != nil { 149 return fmt.Errorf("failed to set rbac domain: %w", err) 150 } 151 err = spindle.configureOwner() 152 if err != nil { 153 return err 154 } 155 logger.Info("owner set", "did", cfg.Server.Owner) 156 157 // starts a job queue runner in the background 158 jq.Start() 159 defer jq.Stop() 160 161 // Stop vault token renewal if it implements Stopper 162 if stopper, ok := vault.(secrets.Stopper); ok { 163 defer stopper.Stop() 164 } 165 166 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 167 if err != nil { 168 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 169 } 170 171 err = jc.StartJetstream(ctx, spindle.ingest()) 172 if err != nil { 173 return fmt.Errorf("failed to start jetstream consumer: %w", err) 174 } 175 176 // for each incoming sh.tangled.pipeline, we execute 177 // spindle.processPipeline, which in turn enqueues the pipeline 178 // job in the above registered queue. 179 ccfg := eventconsumer.NewConsumerConfig() 180 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 181 ccfg.Dev = cfg.Server.Dev 182 ccfg.ProcessFunc = spindle.processPipeline 183 ccfg.CursorStore = cursorStore 184 knownKnots, err := d.Knots() 185 if err != nil { 186 return err 187 } 188 for _, knot := range knownKnots { 189 logger.Info("adding source start", "knot", knot) 190 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 191 } 192 spindle.ks = eventconsumer.NewConsumer(*ccfg) 193 194 go func() { 195 logger.Info("starting knot event consumer") 196 spindle.ks.Start(ctx) 197 }() 198 199 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 200 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 201 202 return nil 203} 204 205func (s *Spindle) Router() http.Handler { 206 mux := chi.NewRouter() 207 208 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 209 w.Write(motd) 210 }) 211 mux.HandleFunc("/events", s.Events) 212 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 213 214 mux.Mount("/xrpc", s.XrpcRouter()) 215 return mux 216} 217 218func (s *Spindle) XrpcRouter() http.Handler { 219 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 220 221 l := log.SubLogger(s.l, "xrpc") 222 223 x := xrpc.Xrpc{ 224 Logger: l, 225 Db: s.db, 226 Enforcer: s.e, 227 Engines: s.engs, 228 Config: s.cfg, 229 Resolver: s.res, 230 Vault: s.vault, 231 ServiceAuth: serviceAuth, 232 } 233 234 return x.Router() 235} 236 237func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 238 if msg.Nsid == tangled.PipelineNSID { 239 tpl := tangled.Pipeline{} 240 err := json.Unmarshal(msg.EventJson, &tpl) 241 if err != nil { 242 fmt.Println("error unmarshalling", err) 243 return err 244 } 245 246 if tpl.TriggerMetadata == nil { 247 return fmt.Errorf("no trigger metadata found") 248 } 249 250 if tpl.TriggerMetadata.Repo == nil { 251 return fmt.Errorf("no repo data found") 252 } 253 254 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 255 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 256 } 257 258 // filter by repos 259 _, err = s.db.GetRepo( 260 tpl.TriggerMetadata.Repo.Knot, 261 tpl.TriggerMetadata.Repo.Did, 262 tpl.TriggerMetadata.Repo.Repo, 263 ) 264 if err != nil { 265 return err 266 } 267 268 pipelineId := models.PipelineId{ 269 Knot: src.Key(), 270 Rkey: msg.Rkey, 271 } 272 273 workflows := make(map[models.Engine][]models.Workflow) 274 275 for _, w := range tpl.Workflows { 276 if w != nil { 277 if _, ok := s.engs[w.Engine]; !ok { 278 err = s.db.StatusFailed(models.WorkflowId{ 279 PipelineId: pipelineId, 280 Name: w.Name, 281 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 282 if err != nil { 283 return err 284 } 285 286 continue 287 } 288 289 eng := s.engs[w.Engine] 290 291 if _, ok := workflows[eng]; !ok { 292 workflows[eng] = []models.Workflow{} 293 } 294 295 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 296 if err != nil { 297 return err 298 } 299 300 workflows[eng] = append(workflows[eng], *ewf) 301 302 err = s.db.StatusPending(models.WorkflowId{ 303 PipelineId: pipelineId, 304 Name: w.Name, 305 }, s.n) 306 if err != nil { 307 return err 308 } 309 } 310 } 311 312 ok := s.jq.Enqueue(queue.Job{ 313 Run: func() error { 314 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 315 RepoOwner: tpl.TriggerMetadata.Repo.Did, 316 RepoName: tpl.TriggerMetadata.Repo.Repo, 317 Workflows: workflows, 318 }, pipelineId) 319 return nil 320 }, 321 OnFail: func(jobError error) { 322 s.l.Error("pipeline run failed", "error", jobError) 323 }, 324 }) 325 if ok { 326 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 327 } else { 328 s.l.Error("failed to enqueue pipeline: queue is full") 329 } 330 } 331 332 return nil 333} 334 335func (s *Spindle) configureOwner() error { 336 cfgOwner := s.cfg.Server.Owner 337 338 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 339 if err != nil { 340 return err 341 } 342 343 switch len(existing) { 344 case 0: 345 // no owner configured, continue 346 case 1: 347 // find existing owner 348 existingOwner := existing[0] 349 350 // no ownership change, this is okay 351 if existingOwner == s.cfg.Server.Owner { 352 break 353 } 354 355 // remove existing owner 356 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 357 if err != nil { 358 return nil 359 } 360 default: 361 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 362 } 363 364 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 365}