this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "maps" 11 "net/http" 12 "path/filepath" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/go-chi/chi/v5" 16 "github.com/go-git/go-git/v5/plumbing/object" 17 "github.com/hashicorp/go-version" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/eventconsumer" 20 "tangled.org/core/eventconsumer/cursor" 21 "tangled.org/core/idresolver" 22 kgit "tangled.org/core/knotserver/git" 23 "tangled.org/core/log" 24 "tangled.org/core/notifier" 25 "tangled.org/core/rbac2" 26 "tangled.org/core/spindle/config" 27 "tangled.org/core/spindle/db" 28 "tangled.org/core/spindle/engine" 29 "tangled.org/core/spindle/engines/nixery" 30 "tangled.org/core/spindle/git" 31 "tangled.org/core/spindle/models" 32 "tangled.org/core/spindle/queue" 33 "tangled.org/core/spindle/secrets" 34 "tangled.org/core/spindle/xrpc" 35 "tangled.org/core/tap" 36 "tangled.org/core/tid" 37 "tangled.org/core/workflow" 38 "tangled.org/core/xrpc/serviceauth" 39) 40 41//go:embed motd 42var motd []byte 43 44type Spindle struct { 45 tap *tap.Client 46 db *db.DB 47 e *rbac2.Enforcer 48 l *slog.Logger 49 n *notifier.Notifier 50 engs map[string]models.Engine 51 jq *queue.Queue 52 cfg *config.Config 53 ks *eventconsumer.Consumer 54 res *idresolver.Resolver 55 vault secrets.Manager 56} 57 58// New creates a new Spindle server with the provided configuration and engines. 59func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 60 logger := log.FromContext(ctx) 61 62 if err := ensureGitVersion(); err != nil { 63 return nil, fmt.Errorf("ensuring git version: %w", err) 64 } 65 66 d, err := db.Make(ctx, cfg.Server.DBPath()) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup db: %w", err) 69 } 70 71 e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 72 if err != nil { 73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 74 } 75 76 n := notifier.New() 77 78 var vault secrets.Manager 79 switch cfg.Server.Secrets.Provider { 80 case "openbao": 81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 83 } 84 vault, err = secrets.NewOpenBaoManager( 85 cfg.Server.Secrets.OpenBao.ProxyAddr, 86 logger, 87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 88 ) 89 if err != nil { 90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 91 } 92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 93 case "sqlite", "": 94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 95 if err != nil { 96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 97 } 98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 99 default: 100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 101 } 102 103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 105 106 tap := tap.NewClient(cfg.Server.TapUrl, "") 107 108 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 109 110 spindle := &Spindle{ 111 tap: &tap, 112 e: e, 113 db: d, 114 l: logger, 115 n: &n, 116 engs: engines, 117 jq: jq, 118 cfg: cfg, 119 res: resolver, 120 vault: vault, 121 } 122 123 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 124 if err != nil { 125 return nil, err 126 } 127 logger.Info("owner set", "did", cfg.Server.Owner) 128 129 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 130 if err != nil { 131 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 132 } 133 134 // spindle listen to knot stream for sh.tangled.git.refUpdate 135 // which will sync the local workflow files in spindle and enqueues the 136 // pipeline job for on-push workflows 137 ccfg := eventconsumer.NewConsumerConfig() 138 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 139 ccfg.Dev = cfg.Server.Dev 140 ccfg.ProcessFunc = spindle.processKnotStream 141 ccfg.CursorStore = cursorStore 142 knownKnots, err := d.Knots() 143 if err != nil { 144 return nil, err 145 } 146 for _, knot := range knownKnots { 147 logger.Info("adding source start", "knot", knot) 148 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 149 } 150 spindle.ks = eventconsumer.NewConsumer(*ccfg) 151 152 return spindle, nil 153} 154 155// DB returns the database instance. 156func (s *Spindle) DB() *db.DB { 157 return s.db 158} 159 160// Queue returns the job queue instance. 161func (s *Spindle) Queue() *queue.Queue { 162 return s.jq 163} 164 165// Engines returns the map of available engines. 166func (s *Spindle) Engines() map[string]models.Engine { 167 return s.engs 168} 169 170// Vault returns the secrets manager instance. 171func (s *Spindle) Vault() secrets.Manager { 172 return s.vault 173} 174 175// Notifier returns the notifier instance. 176func (s *Spindle) Notifier() *notifier.Notifier { 177 return s.n 178} 179 180// Enforcer returns the RBAC enforcer instance. 181func (s *Spindle) Enforcer() *rbac2.Enforcer { 182 return s.e 183} 184 185// Start starts the Spindle server (blocking). 186func (s *Spindle) Start(ctx context.Context) error { 187 // starts a job queue runner in the background 188 s.jq.Start() 189 defer s.jq.Stop() 190 191 // Stop vault token renewal if it implements Stopper 192 if stopper, ok := s.vault.(secrets.Stopper); ok { 193 defer stopper.Stop() 194 } 195 196 go func() { 197 s.l.Info("starting knot event consumer") 198 s.ks.Start(ctx) 199 }() 200 201 // ensure server owner is tracked 202 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 203 return err 204 } 205 206 go func() { 207 s.l.Info("starting tap stream consumer") 208 s.tap.Connect(ctx, &tap.SimpleIndexer{ 209 EventHandler: s.processEvent, 210 }) 211 }() 212 213 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 214 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 215} 216 217func Run(ctx context.Context) error { 218 cfg, err := config.Load(ctx) 219 if err != nil { 220 return fmt.Errorf("failed to load config: %w", err) 221 } 222 223 nixeryEng, err := nixery.New(ctx, cfg) 224 if err != nil { 225 return err 226 } 227 228 s, err := New(ctx, cfg, map[string]models.Engine{ 229 "nixery": nixeryEng, 230 }) 231 if err != nil { 232 return err 233 } 234 235 return s.Start(ctx) 236} 237 238func (s *Spindle) Router() http.Handler { 239 mux := chi.NewRouter() 240 241 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 242 w.Write(motd) 243 }) 244 mux.HandleFunc("/events", s.Events) 245 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 246 247 mux.Mount("/xrpc", s.XrpcRouter()) 248 return mux 249} 250 251func (s *Spindle) XrpcRouter() http.Handler { 252 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 253 254 l := log.SubLogger(s.l, "xrpc") 255 256 x := xrpc.Xrpc{ 257 Logger: l, 258 Db: s.db, 259 Enforcer: s.e, 260 Engines: s.engs, 261 Config: s.cfg, 262 Resolver: s.res, 263 Vault: s.vault, 264 Notifier: s.Notifier(), 265 ServiceAuth: serviceAuth, 266 } 267 268 return x.Router() 269} 270 271func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 272 l := log.FromContext(ctx).With("handler", "processKnotStream") 273 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 274 if msg.Nsid == tangled.PipelineNSID { 275 return nil 276 tpl := tangled.Pipeline{} 277 err := json.Unmarshal(msg.EventJson, &tpl) 278 if err != nil { 279 fmt.Println("error unmarshalling", err) 280 return err 281 } 282 283 if tpl.TriggerMetadata == nil { 284 return fmt.Errorf("no trigger metadata found") 285 } 286 287 if tpl.TriggerMetadata.Repo == nil { 288 return fmt.Errorf("no repo data found") 289 } 290 291 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 292 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 293 } 294 295 // filter by repos 296 _, err = s.db.GetRepoWithName( 297 syntax.DID(tpl.TriggerMetadata.Repo.Did), 298 tpl.TriggerMetadata.Repo.Repo, 299 ) 300 if err != nil { 301 return fmt.Errorf("failed to get repo: %w", err) 302 } 303 304 pipelineId := models.PipelineId{ 305 Knot: src.Key(), 306 Rkey: msg.Rkey, 307 } 308 309 err = s.processPipeline(ctx, tpl, pipelineId) 310 if err != nil { 311 return err 312 } 313 } else if msg.Nsid == tangled.GitRefUpdateNSID { 314 event := tangled.GitRefUpdate{} 315 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 316 l.Error("error unmarshalling", "err", err) 317 return err 318 } 319 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 320 321 // resolve repo name to rkey 322 // TODO: git.refUpdate should respond with rkey instead of repo name 323 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 324 if err != nil { 325 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 326 } 327 328 // NOTE: we are blindly trusting the knot that it will return only repos it own 329 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 330 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 331 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 332 return fmt.Errorf("sync git repo: %w", err) 333 } 334 l.Info("synced git repo") 335 336 compiler := workflow.Compiler{ 337 Trigger: tangled.Pipeline_TriggerMetadata{ 338 Kind: string(workflow.TriggerKindPush), 339 Push: &tangled.Pipeline_PushTriggerData{ 340 Ref: event.Ref, 341 OldSha: event.OldSha, 342 NewSha: event.NewSha, 343 }, 344 Repo: &tangled.Pipeline_TriggerRepo{ 345 Did: repo.Did.String(), 346 Knot: repo.Knot, 347 Repo: repo.Name, 348 }, 349 }, 350 } 351 352 // load workflow definitions from rev (without spindle context) 353 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 354 if err != nil { 355 return fmt.Errorf("loading pipeline: %w", err) 356 } 357 if len(rawPipeline) == 0 { 358 l.Info("no workflow definition find for the repo. skipping the event") 359 return nil 360 } 361 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 362 // TODO: pass compile error to workflow log 363 for _, w := range compiler.Diagnostics.Errors { 364 l.Error(w.String()) 365 } 366 for _, w := range compiler.Diagnostics.Warnings { 367 l.Warn(w.String()) 368 } 369 370 pipelineId := models.PipelineId{ 371 Knot: tpl.TriggerMetadata.Repo.Knot, 372 Rkey: tid.TID(), 373 } 374 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 375 l.Error("failed to create pipeline event", "err", err) 376 return nil 377 } 378 err = s.processPipeline(ctx, tpl, pipelineId) 379 if err != nil { 380 return err 381 } 382 } 383 384 return nil 385} 386 387func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 388 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 389 return nil, fmt.Errorf("syncing git repo: %w", err) 390 } 391 gr, err := kgit.Open(repoPath, rev) 392 if err != nil { 393 return nil, fmt.Errorf("opening git repo: %w", err) 394 } 395 396 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 397 if errors.Is(err, object.ErrDirectoryNotFound) { 398 // return empty RawPipeline when directory doesn't exist 399 return nil, nil 400 } else if err != nil { 401 return nil, fmt.Errorf("loading file tree: %w", err) 402 } 403 404 var rawPipeline workflow.RawPipeline 405 for _, e := range workflowDir { 406 if !e.IsFile() { 407 continue 408 } 409 410 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 411 contents, err := gr.RawContent(fpath) 412 if err != nil { 413 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 414 } 415 416 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 417 Name: e.Name, 418 Contents: contents, 419 }) 420 } 421 422 return rawPipeline, nil 423} 424 425func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 426 // Build pipeline environment variables once for all workflows 427 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 428 429 // filter & init workflows 430 workflows := make(map[models.Engine][]models.Workflow) 431 for _, w := range tpl.Workflows { 432 if w == nil { 433 continue 434 } 435 if _, ok := s.engs[w.Engine]; !ok { 436 err := s.db.StatusFailed(models.WorkflowId{ 437 PipelineId: pipelineId, 438 Name: w.Name, 439 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 440 if err != nil { 441 return fmt.Errorf("db.StatusFailed: %w", err) 442 } 443 444 continue 445 } 446 447 eng := s.engs[w.Engine] 448 449 if _, ok := workflows[eng]; !ok { 450 workflows[eng] = []models.Workflow{} 451 } 452 453 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 454 if err != nil { 455 return fmt.Errorf("init workflow: %w", err) 456 } 457 458 // inject TANGLED_* env vars after InitWorkflow 459 // This prevents user-defined env vars from overriding them 460 if ewf.Environment == nil { 461 ewf.Environment = make(map[string]string) 462 } 463 maps.Copy(ewf.Environment, pipelineEnv) 464 465 workflows[eng] = append(workflows[eng], *ewf) 466 } 467 468 // enqueue pipeline 469 ok := s.jq.Enqueue(queue.Job{ 470 Run: func() error { 471 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 472 RepoOwner: tpl.TriggerMetadata.Repo.Did, 473 RepoName: tpl.TriggerMetadata.Repo.Repo, 474 Workflows: workflows, 475 }, pipelineId) 476 return nil 477 }, 478 OnFail: func(jobError error) { 479 s.l.Error("pipeline run failed", "error", jobError) 480 }, 481 }) 482 if !ok { 483 return fmt.Errorf("failed to enqueue pipeline: queue is full") 484 } 485 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 486 487 // emit StatusPending for all workflows here (after successful enqueue) 488 for _, ewfs := range workflows { 489 for _, ewf := range ewfs { 490 err := s.db.StatusPending(models.WorkflowId{ 491 PipelineId: pipelineId, 492 Name: ewf.Name, 493 }, s.n) 494 if err != nil { 495 return fmt.Errorf("db.StatusPending: %w", err) 496 } 497 } 498 } 499 return nil 500} 501 502// newRepoPath creates a path to store repository by its did and rkey. 503// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 504func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 505 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 506} 507 508func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 509 scheme := "https://" 510 if s.cfg.Server.Dev { 511 scheme = "http://" 512 } 513 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 514} 515 516const RequiredVersion = "2.49.0" 517 518func ensureGitVersion() error { 519 v, err := git.Version() 520 if err != nil { 521 return fmt.Errorf("fetching git version: %w", err) 522 } 523 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 524 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 525 } 526 return nil 527}