Monorepo for Tangled tangled.org
at sl/knotmirror 544 lines 15 kB view raw
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/service/tap"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tapc"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the embedded fallback message-of-the-day, served at "/"
// until SetMotdContent replaces it.
//
//go:embed motd
var defaultMotd []byte

// Spindle is the CI server. It consumes knot and tap event streams,
// compiles repository workflow definitions into pipelines, and executes
// them through a bounded job queue.
type Spindle struct {
	tap    *tapc.Client             // client for the embedded tap server
	db     *db.DB                   // spindle state: repos, knots, pipeline events
	e      *rbac2.Enforcer         // RBAC enforcer (owner/member checks)
	l      *slog.Logger
	n      *notifier.Notifier      // fan-out for pipeline status notifications
	engs   map[string]models.Engine // execution engines keyed by name (e.g. "nixery")
	jq     *queue.Queue            // bounded job queue for pipeline runs
	cfg    *config.Config
	ks     *eventconsumer.Consumer // knot event stream consumer
	res    *idresolver.Resolver    // AT identity resolver
	vault  secrets.Manager         // secrets backend (sqlite or openbao)
	motd   []byte                  // current MOTD content; guarded by motdMu
	motdMu sync.RWMutex            // protects motd for concurrent HTTP reads
}

// New creates a new Spindle server with the provided configuration and engines.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// Refuse to start if the installed git is older than RequiredVersion.
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	// The enforcer shares the same sqlite database file as the main DB.
	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// Select the secrets backend. An empty provider defaults to sqlite.
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	// Client for the embedded tap server started later in Start.
	tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd,
	}

	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// Spindle listens to the knot stream for sh.tangled.git.refUpdate,
	// which syncs the local workflow files in spindle and enqueues the
	// pipeline job for on-push workflows.
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	// Seed the consumer with every knot already known to the database.
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}

// DB returns the database instance.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available engines.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
187func (s *Spindle) Enforcer() *rbac2.Enforcer { 188 return s.e 189} 190 191// SetMotdContent sets custom MOTD content, replacing the embedded default. 192func (s *Spindle) SetMotdContent(content []byte) { 193 s.motdMu.Lock() 194 defer s.motdMu.Unlock() 195 s.motd = content 196} 197 198// GetMotdContent returns the current MOTD content. 199func (s *Spindle) GetMotdContent() []byte { 200 s.motdMu.RLock() 201 defer s.motdMu.RUnlock() 202 return s.motd 203} 204 205// Start starts the Spindle server (blocking). 206func (s *Spindle) Start(ctx context.Context) error { 207 svcErr := make(chan error, 1) 208 209 // starts a job queue runner in the background 210 s.jq.Start() 211 defer s.jq.Stop() 212 213 // Stop vault token renewal if it implements Stopper 214 if stopper, ok := s.vault.(secrets.Stopper); ok { 215 defer stopper.Stop() 216 } 217 218 go func() { 219 s.l.Info("starting knot event consumer") 220 s.ks.Start(ctx) 221 }() 222 223 tap, err := tap.New(tap.Config{ 224 DatabaseURL: s.cfg.Server.TapDBUrl, 225 DBMaxConns: 32, 226 PLCURL: s.cfg.Server.PlcUrl, 227 RelayUrl: s.cfg.Server.RelayUrl, 228 FirehoseParallelism: 10, 229 ResyncParallelism: 5, 230 OutboxParallelism: 1, 231 FirehoseCursorSaveInterval: 1 * time.Second, 232 RepoFetchTimeout: 300 * time.Second, 233 IdentityCacheSize: 2_000_000, 234 EventCacheSize: 100_000, 235 SignalCollection: tangled.RepoPullNSID, // HACK: listen for repo.pull from non-tangled users 236 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.RepoPullNSID, tangled.SpindleMemberNSID}, 237 RetryTimeout: 3 * time.Second, 238 }) 239 if err != nil { 240 return err 241 } 242 go func() { 243 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil { 244 svcErr <- err 245 } 246 }() 247 go func() { 248 s.l.Info("starting embedded tap server") 249 250 s.l.Info("starting tap stream consumer") 251 s.tap.Connect(ctx, &tapc.SimpleIndexer{ 252 EventHandler: s.processEvent, 253 }) 254 }() 255 256 s.l.Debug("waiting 
for tap readiness") 257 tapReadyCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 258 defer cancel() 259 if err := s.tap.WaitReady(tapReadyCtx); err != nil { 260 return fmt.Errorf("tap not ready: %w", err) 261 } 262 263 // ensure server owner is tracked 264 s.l.Debug("adding server owner to tap") 265 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 266 return err 267 } 268 269 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 270 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 271} 272 273func Run(ctx context.Context) error { 274 cfg, err := config.Load(ctx) 275 if err != nil { 276 return fmt.Errorf("failed to load config: %w", err) 277 } 278 279 nixeryEng, err := nixery.New(ctx, cfg) 280 if err != nil { 281 return err 282 } 283 284 s, err := New(ctx, cfg, map[string]models.Engine{ 285 "nixery": nixeryEng, 286 }) 287 if err != nil { 288 return err 289 } 290 291 return s.Start(ctx) 292} 293 294func (s *Spindle) Router() http.Handler { 295 mux := chi.NewRouter() 296 297 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 298 w.Write(s.GetMotdContent()) 299 }) 300 mux.HandleFunc("/events", s.Events) 301 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 302 303 mux.Mount("/xrpc", s.XrpcRouter()) 304 return mux 305} 306 307func (s *Spindle) XrpcRouter() http.Handler { 308 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 309 310 l := log.SubLogger(s.l, "xrpc") 311 312 x := xrpc.Xrpc{ 313 Logger: l, 314 Db: s.db, 315 Enforcer: s.e, 316 Engines: s.engs, 317 Config: s.cfg, 318 Resolver: s.res, 319 Vault: s.vault, 320 Notifier: s.Notifier(), 321 ServiceAuth: serviceAuth, 322 } 323 324 return x.Router() 325} 326 327func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 328 l := log.FromContext(ctx).With("handler", "processKnotStream") 329 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, 
"msg.Rkey", msg.Rkey) 330 if msg.Nsid == tangled.GitRefUpdateNSID { 331 event := tangled.GitRefUpdate{} 332 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 333 l.Error("error unmarshalling", "err", err) 334 return err 335 } 336 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 337 338 // resolve repo name to rkey 339 // TODO: git.refUpdate should respond with rkey instead of repo name 340 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 341 if err != nil { 342 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 343 } 344 345 // NOTE: we are blindly trusting the knot that it will return only repos it own 346 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 347 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 348 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 349 return fmt.Errorf("sync git repo: %w", err) 350 } 351 l.Info("synced git repo") 352 353 compiler := workflow.Compiler{ 354 Trigger: tangled.Pipeline_TriggerMetadata{ 355 Kind: string(workflow.TriggerKindPush), 356 Push: &tangled.Pipeline_PushTriggerData{ 357 Ref: event.Ref, 358 OldSha: event.OldSha, 359 NewSha: event.NewSha, 360 }, 361 Repo: &tangled.Pipeline_TriggerRepo{ 362 Did: repo.Did.String(), 363 Knot: repo.Knot, 364 Repo: repo.Name, 365 }, 366 }, 367 } 368 369 // load workflow definitions from rev (without spindle context) 370 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 371 if err != nil { 372 return fmt.Errorf("loading pipeline: %w", err) 373 } 374 if len(rawPipeline) == 0 { 375 l.Info("no workflow definition find for the repo. 
skipping the event") 376 return nil 377 } 378 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 379 // TODO: pass compile error to workflow log 380 for _, w := range compiler.Diagnostics.Errors { 381 l.Error(w.String()) 382 } 383 for _, w := range compiler.Diagnostics.Warnings { 384 l.Warn(w.String()) 385 } 386 387 pipelineId := models.PipelineId{ 388 Knot: tpl.TriggerMetadata.Repo.Knot, 389 Rkey: tid.TID(), 390 } 391 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 392 l.Error("failed to create pipeline event", "err", err) 393 return nil 394 } 395 err = s.processPipeline(ctx, tpl, pipelineId) 396 if err != nil { 397 return err 398 } 399 } 400 401 return nil 402} 403 404func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 405 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 406 return nil, fmt.Errorf("syncing git repo: %w", err) 407 } 408 gr, err := kgit.Open(repoPath, rev) 409 if err != nil { 410 return nil, fmt.Errorf("opening git repo: %w", err) 411 } 412 413 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 414 if errors.Is(err, object.ErrDirectoryNotFound) { 415 // return empty RawPipeline when directory doesn't exist 416 return nil, nil 417 } else if err != nil { 418 return nil, fmt.Errorf("loading file tree: %w", err) 419 } 420 421 var rawPipeline workflow.RawPipeline 422 for _, e := range workflowDir { 423 if !e.IsFile() { 424 continue 425 } 426 427 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 428 contents, err := gr.RawContent(fpath) 429 if err != nil { 430 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 431 } 432 433 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 434 Name: e.Name, 435 Contents: contents, 436 }) 437 } 438 439 return rawPipeline, nil 440} 441 442func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 443 // Build 
pipeline environment variables once for all workflows 444 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 445 446 // filter & init workflows 447 workflows := make(map[models.Engine][]models.Workflow) 448 for _, w := range tpl.Workflows { 449 if w == nil { 450 continue 451 } 452 if _, ok := s.engs[w.Engine]; !ok { 453 err := s.db.StatusFailed(models.WorkflowId{ 454 PipelineId: pipelineId, 455 Name: w.Name, 456 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 457 if err != nil { 458 return fmt.Errorf("db.StatusFailed: %w", err) 459 } 460 461 continue 462 } 463 464 eng := s.engs[w.Engine] 465 466 if _, ok := workflows[eng]; !ok { 467 workflows[eng] = []models.Workflow{} 468 } 469 470 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 471 if err != nil { 472 return fmt.Errorf("init workflow: %w", err) 473 } 474 475 // inject TANGLED_* env vars after InitWorkflow 476 // This prevents user-defined env vars from overriding them 477 if ewf.Environment == nil { 478 ewf.Environment = make(map[string]string) 479 } 480 maps.Copy(ewf.Environment, pipelineEnv) 481 482 workflows[eng] = append(workflows[eng], *ewf) 483 } 484 485 // enqueue pipeline 486 ok := s.jq.Enqueue(queue.Job{ 487 Run: func() error { 488 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 489 RepoOwner: tpl.TriggerMetadata.Repo.Did, 490 RepoName: tpl.TriggerMetadata.Repo.Repo, 491 Workflows: workflows, 492 }, pipelineId) 493 return nil 494 }, 495 OnFail: func(jobError error) { 496 s.l.Error("pipeline run failed", "error", jobError) 497 }, 498 }) 499 if !ok { 500 return fmt.Errorf("failed to enqueue pipeline: queue is full") 501 } 502 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 503 504 // emit StatusPending for all workflows here (after successful enqueue) 505 for _, ewfs := range workflows { 506 for _, ewf := range ewfs { 507 err := s.db.StatusPending(models.WorkflowId{ 508 PipelineId: 
pipelineId, 509 Name: ewf.Name, 510 }, s.n) 511 if err != nil { 512 return fmt.Errorf("db.StatusPending: %w", err) 513 } 514 } 515 } 516 return nil 517} 518 519// newRepoPath creates a path to store repository by its did and rkey. 520// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 521func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 522 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 523} 524 525func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 526 scheme := "https://" 527 if s.cfg.Server.Dev { 528 scheme = "http://" 529 } 530 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 531} 532 533const RequiredVersion = "2.49.0" 534 535func ensureGitVersion() error { 536 v, err := git.Version() 537 if err != nil { 538 return fmt.Errorf("fetching git version: %w", err) 539 } 540 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 541 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 542 } 543 return nil 544}