Monorepo for Tangled tangled.org
at sl/spindle-adapters 507 lines 14 kB view raw
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the embedded message-of-the-day served on "/" until
// overridden via SetMotdContent.
//
//go:embed motd
var defaultMotd []byte

// Spindle is the CI server: it consumes ref-update events from knots,
// compiles workflow definitions into pipelines, and runs them through the
// configured engines via a bounded job queue.
type Spindle struct {
	tap      *tap.Client              // tap firehose client (owner-repo tracking + event stream)
	db       *db.DB                   // spindle persistence (repos, pipelines, statuses)
	e        *rbac2.Enforcer          // RBAC enforcer; also stores the spindle owner
	l        *slog.Logger             // root structured logger
	n        *notifier.Notifier       // fan-out for pipeline status events
	engs     map[string]models.Engine // execution engines keyed by name (e.g. "nixery")
	adapters map[string]models.Adapter // NOTE(review): never populated in this file — confirm it is set elsewhere
	jq       *queue.Queue             // bounded job queue for pipeline runs
	cfg      *config.Config           // server configuration
	ks       *eventconsumer.Consumer  // knot event stream consumer
	res      *idresolver.Resolver     // AT-proto identity resolver
	vault    secrets.Manager          // secrets backend (sqlite or openbao)
	motd     []byte                   // current MOTD content; guarded by motdMu
	motdMu   sync.RWMutex             // guards motd
}

// New creates a new Spindle server with the provided configuration and engines.
// New creates a new Spindle server with the provided configuration and engines.
//
// It verifies the installed git version, opens the database, RBAC enforcer,
// secrets backend (sqlite by default, openbao when configured), job queue,
// and identity resolver, registers the configured owner, and prepares — but
// does not start — the knot event consumer. Call Start to run the server.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// Refuse to start on a git older than RequiredVersion.
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	// The enforcer shares the same sqlite file as the main database.
	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// Select the secrets backend. An empty provider string falls back to
	// sqlite, so existing configs keep working.
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	tap := tap.NewClient(cfg.Server.TapUrl, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd,
	}

	// Record the configured owner DID in the enforcer.
	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// Spindle listens to the knot stream for sh.tangled.git.refUpdate events,
	// which sync the local workflow files in spindle and enqueue the
	// pipeline job for on-push workflows (see processKnotStream).
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	// Seed the consumer with every knot already known to the database.
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}

// DB returns the database instance.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available engines.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
186func (s *Spindle) Enforcer() *rbac2.Enforcer { 187 return s.e 188} 189 190// SetMotdContent sets custom MOTD content, replacing the embedded default. 191func (s *Spindle) SetMotdContent(content []byte) { 192 s.motdMu.Lock() 193 defer s.motdMu.Unlock() 194 s.motd = content 195} 196 197// GetMotdContent returns the current MOTD content. 198func (s *Spindle) GetMotdContent() []byte { 199 s.motdMu.RLock() 200 defer s.motdMu.RUnlock() 201 return s.motd 202} 203 204// Start starts the Spindle server (blocking). 205func (s *Spindle) Start(ctx context.Context) error { 206 // starts a job queue runner in the background 207 s.jq.Start() 208 defer s.jq.Stop() 209 210 // Stop vault token renewal if it implements Stopper 211 if stopper, ok := s.vault.(secrets.Stopper); ok { 212 defer stopper.Stop() 213 } 214 215 go func() { 216 s.l.Info("starting knot event consumer") 217 s.ks.Start(ctx) 218 }() 219 220 // ensure server owner is tracked 221 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 222 return err 223 } 224 225 go func() { 226 s.l.Info("starting tap stream consumer") 227 s.tap.Connect(ctx, &tap.SimpleIndexer{ 228 EventHandler: s.processEvent, 229 }) 230 }() 231 232 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 233 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 234} 235 236func Run(ctx context.Context) error { 237 cfg, err := config.Load(ctx) 238 if err != nil { 239 return fmt.Errorf("failed to load config: %w", err) 240 } 241 242 nixeryEng, err := nixery.New(ctx, cfg) 243 if err != nil { 244 return err 245 } 246 247 s, err := New(ctx, cfg, map[string]models.Engine{ 248 "nixery": nixeryEng, 249 }) 250 if err != nil { 251 return err 252 } 253 254 return s.Start(ctx) 255} 256 257func (s *Spindle) Router() http.Handler { 258 mux := chi.NewRouter() 259 260 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 261 w.Write(s.GetMotdContent()) 262 }) 263 mux.HandleFunc("/events", s.Events) 264 
mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 265 266 mux.Mount("/xrpc", s.XrpcRouter()) 267 return mux 268} 269 270func (s *Spindle) XrpcRouter() http.Handler { 271 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 272 273 l := log.SubLogger(s.l, "xrpc") 274 275 x := xrpc.Xrpc{ 276 Logger: l, 277 Db: s.db, 278 Enforcer: s.e, 279 Engines: s.engs, 280 Config: s.cfg, 281 Resolver: s.res, 282 Vault: s.vault, 283 Notifier: s.Notifier(), 284 ServiceAuth: serviceAuth, 285 } 286 287 return x.Router() 288} 289 290func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 291 l := log.FromContext(ctx).With("handler", "processKnotStream") 292 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 293 if msg.Nsid == tangled.GitRefUpdateNSID { 294 event := tangled.GitRefUpdate{} 295 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 296 l.Error("error unmarshalling", "err", err) 297 return err 298 } 299 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 300 301 // resolve repo name to rkey 302 // TODO: git.refUpdate should respond with rkey instead of repo name 303 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 304 if err != nil { 305 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 306 } 307 308 // NOTE: we are blindly trusting the knot that it will return only repos it own 309 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 310 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 311 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 312 return fmt.Errorf("sync git repo: %w", err) 313 } 314 l.Info("synced git repo") 315 316 compiler := workflow.Compiler{ 317 Trigger: tangled.Pipeline_TriggerMetadata{ 318 Kind: string(workflow.TriggerKindPush), 319 Push: &tangled.Pipeline_PushTriggerData{ 320 Ref: 
event.Ref, 321 OldSha: event.OldSha, 322 NewSha: event.NewSha, 323 }, 324 Repo: &tangled.Pipeline_TriggerRepo{ 325 Did: repo.Did.String(), 326 Knot: repo.Knot, 327 Repo: repo.Name, 328 }, 329 }, 330 } 331 332 // load workflow definitions from rev (without spindle context) 333 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 334 if err != nil { 335 return fmt.Errorf("loading pipeline: %w", err) 336 } 337 if len(rawPipeline) == 0 { 338 l.Info("no workflow definition find for the repo. skipping the event") 339 return nil 340 } 341 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 342 // TODO: pass compile error to workflow log 343 for _, w := range compiler.Diagnostics.Errors { 344 l.Error(w.String()) 345 } 346 for _, w := range compiler.Diagnostics.Warnings { 347 l.Warn(w.String()) 348 } 349 350 pipelineId := models.PipelineId{ 351 Knot: tpl.TriggerMetadata.Repo.Knot, 352 Rkey: tid.TID(), 353 } 354 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 355 l.Error("failed to create pipeline event", "err", err) 356 return nil 357 } 358 err = s.processPipeline(ctx, tpl, pipelineId) 359 if err != nil { 360 return err 361 } 362 } 363 364 return nil 365} 366 367func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 368 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 369 return nil, fmt.Errorf("syncing git repo: %w", err) 370 } 371 gr, err := kgit.Open(repoPath, rev) 372 if err != nil { 373 return nil, fmt.Errorf("opening git repo: %w", err) 374 } 375 376 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 377 if errors.Is(err, object.ErrDirectoryNotFound) { 378 // return empty RawPipeline when directory doesn't exist 379 return nil, nil 380 } else if err != nil { 381 return nil, fmt.Errorf("loading file tree: %w", err) 382 } 383 384 var rawPipeline workflow.RawPipeline 385 for _, e := range workflowDir { 386 if !e.IsFile() { 
387 continue 388 } 389 390 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 391 contents, err := gr.RawContent(fpath) 392 if err != nil { 393 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 394 } 395 396 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 397 Name: e.Name, 398 Contents: contents, 399 }) 400 } 401 402 return rawPipeline, nil 403} 404 405func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 406 // Build pipeline environment variables once for all workflows 407 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 408 409 // filter & init workflows 410 workflows := make(map[models.Engine][]models.Workflow) 411 for _, w := range tpl.Workflows { 412 if w == nil { 413 continue 414 } 415 if _, ok := s.engs[w.Engine]; !ok { 416 err := s.db.StatusFailed(models.WorkflowId{ 417 PipelineId: pipelineId, 418 Name: w.Name, 419 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 420 if err != nil { 421 return fmt.Errorf("db.StatusFailed: %w", err) 422 } 423 424 continue 425 } 426 427 eng := s.engs[w.Engine] 428 429 if _, ok := workflows[eng]; !ok { 430 workflows[eng] = []models.Workflow{} 431 } 432 433 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 434 if err != nil { 435 return fmt.Errorf("init workflow: %w", err) 436 } 437 438 // inject TANGLED_* env vars after InitWorkflow 439 // This prevents user-defined env vars from overriding them 440 if ewf.Environment == nil { 441 ewf.Environment = make(map[string]string) 442 } 443 maps.Copy(ewf.Environment, pipelineEnv) 444 445 workflows[eng] = append(workflows[eng], *ewf) 446 } 447 448 // enqueue pipeline 449 ok := s.jq.Enqueue(queue.Job{ 450 Run: func() error { 451 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 452 RepoOwner: tpl.TriggerMetadata.Repo.Did, 453 RepoName: tpl.TriggerMetadata.Repo.Repo, 454 Workflows: workflows, 455 }, 
pipelineId) 456 return nil 457 }, 458 OnFail: func(jobError error) { 459 s.l.Error("pipeline run failed", "error", jobError) 460 }, 461 }) 462 if !ok { 463 return fmt.Errorf("failed to enqueue pipeline: queue is full") 464 } 465 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 466 467 // emit StatusPending for all workflows here (after successful enqueue) 468 for _, ewfs := range workflows { 469 for _, ewf := range ewfs { 470 err := s.db.StatusPending(models.WorkflowId{ 471 PipelineId: pipelineId, 472 Name: ewf.Name, 473 }, s.n) 474 if err != nil { 475 return fmt.Errorf("db.StatusPending: %w", err) 476 } 477 } 478 } 479 return nil 480} 481 482// newRepoPath creates a path to store repository by its did and rkey. 483// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 484func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 485 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 486} 487 488func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 489 scheme := "https://" 490 if s.cfg.Server.Dev { 491 scheme = "http://" 492 } 493 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 494} 495 496const RequiredVersion = "2.49.0" 497 498func ensureGitVersion() error { 499 v, err := git.Version() 500 if err != nil { 501 return fmt.Errorf("fetching git version: %w", err) 502 } 503 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 504 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 505 } 506 return nil 507}