this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/go-chi/chi/v5" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/eventconsumer/cursor" 17 "tangled.org/core/idresolver" 18 "tangled.org/core/log" 19 "tangled.org/core/notifier" 20 "tangled.org/core/rbac2" 21 "tangled.org/core/spindle/config" 22 "tangled.org/core/spindle/db" 23 "tangled.org/core/spindle/engine" 24 "tangled.org/core/spindle/engines/nixery" 25 "tangled.org/core/spindle/models" 26 "tangled.org/core/spindle/queue" 27 "tangled.org/core/spindle/secrets" 28 "tangled.org/core/spindle/xrpc" 29 "tangled.org/core/tap" 30 "tangled.org/core/xrpc/serviceauth" 31) 32 33//go:embed motd 34var motd []byte 35 36type Spindle struct { 37 tap *tap.Client 38 db *db.DB 39 e *rbac2.Enforcer 40 l *slog.Logger 41 n *notifier.Notifier 42 engs map[string]models.Engine 43 jq *queue.Queue 44 cfg *config.Config 45 ks *eventconsumer.Consumer 46 res *idresolver.Resolver 47 vault secrets.Manager 48} 49 50// New creates a new Spindle server with the provided configuration and engines. 51func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 52 logger := log.FromContext(ctx) 53 54 d, err := db.Make(ctx, cfg.Server.DBPath) 55 if err != nil { 56 return nil, fmt.Errorf("failed to setup db: %w", err) 57 } 58 59 e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 60 if err != nil { 61 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 62 } 63 64 n := notifier.New() 65 66 var vault secrets.Manager 67 switch cfg.Server.Secrets.Provider { 68 case "openbao": 69 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 70 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 71 } 72 vault, err = secrets.NewOpenBaoManager( 73 cfg.Server.Secrets.OpenBao.ProxyAddr, 74 logger, 75 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 76 ) 77 if err != nil { 78 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 79 } 80 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 81 case "sqlite", "": 82 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 83 if err != nil { 84 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 85 } 86 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 87 default: 88 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 89 } 90 91 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 92 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 93 94 tap := tap.NewClient(cfg.Server.TapUrl, "") 95 96 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 97 98 spindle := &Spindle{ 99 tap: &tap, 100 e: e, 101 db: d, 102 l: logger, 103 n: &n, 104 engs: engines, 105 jq: jq, 106 cfg: cfg, 107 res: resolver, 108 vault: vault, 109 } 110 111 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 112 if err != nil { 113 return nil, err 114 } 115 logger.Info("owner set", "did", cfg.Server.Owner) 116 117 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 118 if err != nil { 119 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 120 } 121 122 // for each incoming sh.tangled.pipeline, we execute 123 // spindle.processPipeline, which in turn enqueues the pipeline 124 // job in the above registered queue. 125 ccfg := eventconsumer.NewConsumerConfig() 126 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 127 ccfg.Dev = cfg.Server.Dev 128 ccfg.ProcessFunc = spindle.processPipeline 129 ccfg.CursorStore = cursorStore 130 knownKnots, err := d.Knots() 131 if err != nil { 132 return nil, err 133 } 134 for _, knot := range knownKnots { 135 logger.Info("adding source start", "knot", knot) 136 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 137 } 138 spindle.ks = eventconsumer.NewConsumer(*ccfg) 139 140 return spindle, nil 141} 142 143// DB returns the database instance. 144func (s *Spindle) DB() *db.DB { 145 return s.db 146} 147 148// Queue returns the job queue instance. 149func (s *Spindle) Queue() *queue.Queue { 150 return s.jq 151} 152 153// Engines returns the map of available engines. 154func (s *Spindle) Engines() map[string]models.Engine { 155 return s.engs 156} 157 158// Vault returns the secrets manager instance. 159func (s *Spindle) Vault() secrets.Manager { 160 return s.vault 161} 162 163// Notifier returns the notifier instance. 164func (s *Spindle) Notifier() *notifier.Notifier { 165 return s.n 166} 167 168// Enforcer returns the RBAC enforcer instance. 169func (s *Spindle) Enforcer() *rbac2.Enforcer { 170 return s.e 171} 172 173// Start starts the Spindle server (blocking). 174func (s *Spindle) Start(ctx context.Context) error { 175 // starts a job queue runner in the background 176 s.jq.Start() 177 defer s.jq.Stop() 178 179 // Stop vault token renewal if it implements Stopper 180 if stopper, ok := s.vault.(secrets.Stopper); ok { 181 defer stopper.Stop() 182 } 183 184 go func() { 185 s.l.Info("starting knot event consumer") 186 s.ks.Start(ctx) 187 }() 188 189 // ensure server owner is tracked 190 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 191 return err 192 } 193 194 go func() { 195 s.l.Info("starting tap stream consumer") 196 s.tap.Connect(ctx, &tap.SimpleIndexer{ 197 EventHandler: s.processEvent, 198 }) 199 }() 200 201 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 202 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 203} 204 205func Run(ctx context.Context) error { 206 cfg, err := config.Load(ctx) 207 if err != nil { 208 return fmt.Errorf("failed to load config: %w", err) 209 } 210 211 nixeryEng, err := nixery.New(ctx, cfg) 212 if err != nil { 213 return err 214 } 215 216 s, err := New(ctx, cfg, map[string]models.Engine{ 217 "nixery": nixeryEng, 218 }) 219 if err != nil { 220 return err 221 } 222 223 return s.Start(ctx) 224} 225 226func (s *Spindle) Router() http.Handler { 227 mux := chi.NewRouter() 228 229 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 230 w.Write(motd) 231 }) 232 mux.HandleFunc("/events", s.Events) 233 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 234 235 mux.Mount("/xrpc", s.XrpcRouter()) 236 return mux 237} 238 239func (s *Spindle) XrpcRouter() http.Handler { 240 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 241 242 l := log.SubLogger(s.l, "xrpc") 243 244 x := xrpc.Xrpc{ 245 Logger: l, 246 Db: s.db, 247 Enforcer: s.e, 248 Engines: s.engs, 249 Config: s.cfg, 250 Resolver: s.res, 251 Vault: s.vault, 252 Notifier: s.Notifier(), 253 ServiceAuth: serviceAuth, 254 } 255 256 return x.Router() 257} 258 259func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 260 if msg.Nsid == tangled.PipelineNSID { 261 tpl := tangled.Pipeline{} 262 err := json.Unmarshal(msg.EventJson, &tpl) 263 if err != nil { 264 fmt.Println("error unmarshalling", err) 265 return err 266 } 267 268 if tpl.TriggerMetadata == nil { 269 return fmt.Errorf("no trigger metadata found") 270 } 271 272 if tpl.TriggerMetadata.Repo == nil { 273 return fmt.Errorf("no repo data found") 274 } 275 276 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 277 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 278 } 279 280 // filter by repos 281 _, err = s.db.GetRepoWithName( 282 syntax.DID(tpl.TriggerMetadata.Repo.Did), 283 tpl.TriggerMetadata.Repo.Repo, 284 ) 285 if err != nil { 286 return fmt.Errorf("failed to get repo: %w", err) 287 } 288 289 pipelineId := models.PipelineId{ 290 Knot: src.Key(), 291 Rkey: msg.Rkey, 292 } 293 294 workflows := make(map[models.Engine][]models.Workflow) 295 296 // Build pipeline environment variables once for all workflows 297 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 298 299 for _, w := range tpl.Workflows { 300 if w != nil { 301 if _, ok := s.engs[w.Engine]; !ok { 302 err = s.db.StatusFailed(models.WorkflowId{ 303 PipelineId: pipelineId, 304 Name: w.Name, 305 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 306 if err != nil { 307 return fmt.Errorf("db.StatusFailed: %w", err) 308 } 309 310 continue 311 } 312 313 eng := s.engs[w.Engine] 314 315 if _, ok := workflows[eng]; !ok { 316 workflows[eng] = []models.Workflow{} 317 } 318 319 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 320 if err != nil { 321 return fmt.Errorf("init workflow: %w", err) 322 } 323 324 // inject TANGLED_* env vars after InitWorkflow 325 // This prevents user-defined env vars from overriding them 326 if ewf.Environment == nil { 327 ewf.Environment = make(map[string]string) 328 } 329 maps.Copy(ewf.Environment, pipelineEnv) 330 331 workflows[eng] = append(workflows[eng], *ewf) 332 333 err = s.db.StatusPending(models.WorkflowId{ 334 PipelineId: pipelineId, 335 Name: w.Name, 336 }, s.n) 337 if err != nil { 338 return fmt.Errorf("db.StatusPending: %w", err) 339 } 340 } 341 } 342 343 ok := s.jq.Enqueue(queue.Job{ 344 Run: func() error { 345 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 346 RepoOwner: tpl.TriggerMetadata.Repo.Did, 347 RepoName: tpl.TriggerMetadata.Repo.Repo, 348 Workflows: workflows, 349 }, pipelineId) 350 return nil 351 }, 352 OnFail: func(jobError error) { 353 s.l.Error("pipeline run failed", "error", jobError) 354 }, 355 }) 356 if ok { 357 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 358 } else { 359 s.l.Error("failed to enqueue pipeline: queue is full") 360 } 361 } 362 363 return nil 364}