Monorepo for Tangled — https://tangled.org
at loom 423 lines 11 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "os" 12 13 "github.com/go-chi/chi/v5" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/eventconsumer/cursor" 17 "tangled.org/core/idresolver" 18 "tangled.org/core/jetstream" 19 "tangled.org/core/log" 20 "tangled.org/core/notifier" 21 "tangled.org/core/rbac" 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/engine" 25 "tangled.org/core/spindle/engines/nixery" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/queue" 28 "tangled.org/core/spindle/secrets" 29 "tangled.org/core/spindle/xrpc" 30 "tangled.org/core/xrpc/serviceauth" 31) 32 33//go:embed motd 34var motd []byte 35 36const ( 37 rbacDomain = "thisserver" 38) 39 40type Spindle struct { 41 jc *jetstream.JetstreamClient 42 db *db.DB 43 e *rbac.Enforcer 44 l *slog.Logger 45 n *notifier.Notifier 46 engs map[string]models.Engine 47 jq *queue.Queue 48 cfg *config.Config 49 ks *eventconsumer.Consumer 50 res *idresolver.Resolver 51 vault secrets.Manager 52} 53 54// New creates a new Spindle server with the provided configuration and engines. 55func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 56 logger := log.FromContext(ctx) 57 58 d, err := db.Make(cfg.Server.DBPath) 59 if err != nil { 60 return nil, fmt.Errorf("failed to setup db: %w", err) 61 } 62 63 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 64 if err != nil { 65 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 66 } 67 e.E.EnableAutoSave(true) 68 69 n := notifier.New() 70 71 var vault secrets.Manager 72 switch cfg.Server.Secrets.Provider { 73 case "openbao": 74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 75 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 76 } 77 vault, err = secrets.NewOpenBaoManager( 78 cfg.Server.Secrets.OpenBao.ProxyAddr, 79 logger, 80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 81 ) 82 if err != nil { 83 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 84 } 85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 86 case "sqlite", "": 87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 88 if err != nil { 89 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 90 } 91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 92 default: 93 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 94 } 95 96 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 97 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 98 99 collections := []string{ 100 tangled.SpindleMemberNSID, 101 tangled.RepoNSID, 102 tangled.RepoCollaboratorNSID, 103 } 104 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 105 if err != nil { 106 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 107 } 108 jc.AddDid(cfg.Server.Owner) 109 110 // Check if the spindle knows about any Dids; 111 dids, err := d.GetAllDids() 112 if err != nil { 113 return nil, fmt.Errorf("failed to get all dids: %w", err) 114 } 115 for _, d := range dids { 116 jc.AddDid(d) 117 } 118 119 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 120 121 spindle := &Spindle{ 122 jc: jc, 123 e: e, 124 db: d, 125 l: logger, 126 n: &n, 127 engs: engines, 128 jq: jq, 129 cfg: cfg, 130 res: resolver, 131 vault: vault, 132 } 133 134 err = e.AddSpindle(rbacDomain) 135 if err != nil { 136 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 137 } 138 err = spindle.configureOwner() 139 if err != nil { 140 return nil, err 141 } 142 logger.Info("owner set", "did", cfg.Server.Owner) 143 144 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 145 if err != nil { 146 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 147 } 148 149 err = jc.StartJetstream(ctx, spindle.ingest()) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 152 } 153 154 // for each incoming sh.tangled.pipeline, we execute 155 // spindle.processPipeline, which in turn enqueues the pipeline 156 // job in the above registered queue. 157 ccfg := eventconsumer.NewConsumerConfig() 158 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 159 ccfg.Dev = cfg.Server.Dev 160 ccfg.ProcessFunc = spindle.processPipeline 161 ccfg.CursorStore = cursorStore 162 knownKnots, err := d.Knots() 163 if err != nil { 164 return nil, err 165 } 166 for _, knot := range knownKnots { 167 logger.Info("adding source start", "knot", knot) 168 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 169 } 170 spindle.ks = eventconsumer.NewConsumer(*ccfg) 171 172 return spindle, nil 173} 174 175// DB returns the database instance. 176func (s *Spindle) DB() *db.DB { 177 return s.db 178} 179 180// Queue returns the job queue instance. 181func (s *Spindle) Queue() *queue.Queue { 182 return s.jq 183} 184 185// Engines returns the map of available engines. 186func (s *Spindle) Engines() map[string]models.Engine { 187 return s.engs 188} 189 190// Vault returns the secrets manager instance. 191func (s *Spindle) Vault() secrets.Manager { 192 return s.vault 193} 194 195// Notifier returns the notifier instance. 196func (s *Spindle) Notifier() *notifier.Notifier { 197 return s.n 198} 199 200// Enforcer returns the RBAC enforcer instance. 201func (s *Spindle) Enforcer() *rbac.Enforcer { 202 return s.e 203} 204 205// Start starts the Spindle server (blocking). 206func (s *Spindle) Start(ctx context.Context) error { 207 // starts a job queue runner in the background 208 s.jq.Start() 209 defer s.jq.Stop() 210 211 // Stop vault token renewal if it implements Stopper 212 if stopper, ok := s.vault.(secrets.Stopper); ok { 213 defer stopper.Stop() 214 } 215 216 go func() { 217 s.l.Info("starting knot event consumer") 218 s.ks.Start(ctx) 219 }() 220 221 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 222 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 223} 224 225func Run(ctx context.Context) error { 226 cfg, err := config.Load(ctx) 227 if err != nil { 228 return fmt.Errorf("failed to load config: %w", err) 229 } 230 231 nixeryEng, err := nixery.New(ctx, cfg) 232 if err != nil { 233 return err 234 } 235 236 s, err := New(ctx, cfg, map[string]models.Engine{ 237 "nixery": nixeryEng, 238 }) 239 if err != nil { 240 return err 241 } 242 243 return s.Start(ctx) 244} 245 246func (s *Spindle) Router() http.Handler { 247 mux := chi.NewRouter() 248 249 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 250 if s.cfg.Server.MOTDFile != "" { 251 if content, err := os.ReadFile(s.cfg.Server.MOTDFile); err == nil { 252 w.Write(content) 253 return 254 } 255 s.l.Warn("failed to read custom MOTD file, using default", "path", s.cfg.Server.MOTDFile) 256 } 257 w.Write(motd) 258 }) 259 mux.HandleFunc("/events", s.Events) 260 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 261 262 mux.Mount("/xrpc", s.XrpcRouter()) 263 return mux 264} 265 266func (s *Spindle) XrpcRouter() http.Handler { 267 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 268 269 l := log.SubLogger(s.l, "xrpc") 270 271 x := xrpc.Xrpc{ 272 Logger: l, 273 Db: s.db, 274 Enforcer: s.e, 275 Engines: s.engs, 276 Config: s.cfg, 277 Resolver: s.res, 278 Vault: s.vault, 279 ServiceAuth: serviceAuth, 280 } 281 282 return x.Router() 283} 284 285func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 286 if msg.Nsid == tangled.PipelineNSID { 287 tpl := tangled.Pipeline{} 288 err := json.Unmarshal(msg.EventJson, &tpl) 289 if err != nil { 290 fmt.Println("error unmarshalling", err) 291 return err 292 } 293 294 if tpl.TriggerMetadata == nil { 295 return fmt.Errorf("no trigger metadata found") 296 } 297 298 if tpl.TriggerMetadata.Repo == nil { 299 return fmt.Errorf("no repo data found") 300 } 301 302 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 303 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 304 } 305 306 // filter by repos 307 _, err = s.db.GetRepo( 308 tpl.TriggerMetadata.Repo.Knot, 309 tpl.TriggerMetadata.Repo.Did, 310 tpl.TriggerMetadata.Repo.Repo, 311 ) 312 if err != nil { 313 return err 314 } 315 316 pipelineId := models.PipelineId{ 317 Knot: src.Key(), 318 Rkey: msg.Rkey, 319 } 320 321 workflows := make(map[models.Engine][]models.Workflow) 322 323 // Build pipeline environment variables once for all workflows 324 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev, s.res) 325 326 for _, w := range tpl.Workflows { 327 if w != nil { 328 if _, ok := s.engs[w.Engine]; !ok { 329 err = s.db.StatusFailed(models.WorkflowId{ 330 PipelineId: pipelineId, 331 Name: w.Name, 332 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 333 if err != nil { 334 return err 335 } 336 337 continue 338 } 339 340 eng := s.engs[w.Engine] 341 342 if _, ok := workflows[eng]; !ok { 343 workflows[eng] = []models.Workflow{} 344 } 345 346 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 347 if err != nil { 348 return err 349 } 350 351 // inject TANGLED_* env vars after InitWorkflow 352 // This prevents user-defined env vars from overriding them 353 if ewf.Environment == nil { 354 ewf.Environment = make(map[string]string) 355 } 356 maps.Copy(ewf.Environment, pipelineEnv) 357 358 workflows[eng] = append(workflows[eng], *ewf) 359 360 err = s.db.StatusPending(models.WorkflowId{ 361 PipelineId: pipelineId, 362 Name: w.Name, 363 }, s.n) 364 if err != nil { 365 return err 366 } 367 } 368 } 369 370 ok := s.jq.Enqueue(queue.Job{ 371 Run: func() error { 372 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 373 RepoOwner: tpl.TriggerMetadata.Repo.Did, 374 RepoName: tpl.TriggerMetadata.Repo.Repo, 375 Workflows: workflows, 376 }, pipelineId) 377 return nil 378 }, 379 OnFail: func(jobError error) { 380 s.l.Error("pipeline run failed", "error", jobError) 381 }, 382 }) 383 if ok { 384 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 385 } else { 386 s.l.Error("failed to enqueue pipeline: queue is full") 387 } 388 } 389 390 return nil 391} 392 393func (s *Spindle) configureOwner() error { 394 cfgOwner := s.cfg.Server.Owner 395 396 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 397 if err != nil { 398 return err 399 } 400 401 switch len(existing) { 402 case 0: 403 // no owner configured, continue 404 case 1: 405 // find existing owner 406 existingOwner := existing[0] 407 408 // no ownership change, this is okay 409 if existingOwner == s.cfg.Server.Owner { 410 break 411 } 412 413 // remove existing owner 414 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 415 if err != nil { 416 return nil 417 } 418 default: 419 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 420 } 421 422 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 423}