package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

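// motd is the embedded message-of-the-day banner served at the root path.
//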
//go:embed motd
var motd []byte

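// Spindle is the CI server. It consumes knot and tap event streams,
// compiles workflow definitions from repositories, and schedules the
// resulting pipelines onto the configured engines via the job queue.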
type Spindle struct {
	tap   *tap.Client
	db    *db.DB
	e     *rbac2.Enforcer
	l     *slog.Logger
	n     *notifier.Notifier
	engs  map[string]models.Engine
	jq    *queue.Queue
	cfg   *config.Config
	ks    *eventconsumer.Consumer
	res   *idresolver.Resolver
	vault secrets.Manager
}

// New creates a new Spindle server with the provided configuration and engines.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	tap := tap.NewClient(cfg.Server.TapUrl, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
	}

	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// spindle listens to the knot stream for sh.tangled.git.refUpdate,
	// which syncs the local workflow files in spindle and enqueues the
	// pipeline job for on-push workflows
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	for _, knot := range knownKnots {
		logger.Info("adding knot source", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}

// DB returns the database instance.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available engines.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
func (s *Spindle) Enforcer() *rbac2.Enforcer {
	return s.e
}

// Start starts the Spindle server (blocking).
func (s *Spindle) Start(ctx context.Context) error {
	// starts a job queue runner in the background
	s.jq.Start()
	defer s.jq.Stop()

	// Stop vault token renewal if it implements Stopper
	if stopper, ok := s.vault.(secrets.Stopper); ok {
		defer stopper.Stop()
	}

	go func() {
		s.l.Info("starting knot event consumer")
		s.ks.Start(ctx)
	}()

	// ensure server owner is tracked
	if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
		return err
	}

	go func() {
		s.l.Info("starting tap stream consumer")
		s.tap.Connect(ctx, &tap.SimpleIndexer{
			EventHandler: s.processEvent,
		})
	}()

	s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
	return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
}

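// Run loads the spindle configuration, constructs the default engine set
// (currently just nixery), and starts the server. It blocks until the HTTP
// listener returns.
//
// A minimal caller sketch, assuming a signal-aware context:
//
//	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer stop()
//	if err := spindle.Run(ctx); err != nil {
//		// handle startup/shutdown error
//	}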
func Run(ctx context.Context) error {
	cfg, err := config.Load(ctx)
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}

	nixeryEng, err := nixery.New(ctx, cfg)
	if err != nil {
		return err
	}

	s, err := New(ctx, cfg, map[string]models.Engine{
		"nixery": nixeryEng,
	})
	if err != nil {
		return err
	}

	return s.Start(ctx)
}

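// Router returns the top-level HTTP handler: the embedded MOTD at the root,
// the event and log streaming endpoints, and the XRPC routes mounted under /xrpc.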
func (s *Spindle) Router() http.Handler {
	mux := chi.NewRouter()

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write(motd)
	})
	mux.HandleFunc("/events", s.Events)
	mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)

	mux.Mount("/xrpc", s.XrpcRouter())
	return mux
}

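// XrpcRouter builds the XRPC sub-router, wiring service auth (keyed on the
// spindle's own DID) and the shared dependencies into the xrpc handlers.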
func (s *Spindle) XrpcRouter() http.Handler {
	serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())

	l := log.SubLogger(s.l, "xrpc")

	x := xrpc.Xrpc{
		Logger:      l,
		Db:          s.db,
		Enforcer:    s.e,
		Engines:     s.engs,
		Config:      s.cfg,
		Resolver:    s.res,
		Vault:       s.vault,
		Notifier:    s.Notifier(),
		ServiceAuth: serviceAuth,
	}

	return x.Router()
}

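// processKnotStream handles messages from the subscribed knot event streams.
// sh.tangled.pipeline records are validated against their source knot and the
// local repo table, then enqueued directly; sh.tangled.git.refUpdate events
// trigger a sparse sync of the repo, compile its workflow definitions, and
// enqueue an on-push pipeline.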
func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
	l := log.FromContext(ctx).With("handler", "processKnotStream")
	l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
	if msg.Nsid == tangled.PipelineNSID {
		tpl := tangled.Pipeline{}
		err := json.Unmarshal(msg.EventJson, &tpl)
		if err != nil {
			l.Error("error unmarshalling", "err", err)
			return err
		}

		if tpl.TriggerMetadata == nil {
			return fmt.Errorf("no trigger metadata found")
		}

		if tpl.TriggerMetadata.Repo == nil {
			return fmt.Errorf("no repo data found")
		}

		if src.Key() != tpl.TriggerMetadata.Repo.Knot {
			return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
		}

		// filter by repos
		_, err = s.db.GetRepoWithName(
			syntax.DID(tpl.TriggerMetadata.Repo.Did),
			tpl.TriggerMetadata.Repo.Repo,
		)
		if err != nil {
			return fmt.Errorf("failed to get repo: %w", err)
		}

		pipelineId := models.PipelineId{
			Knot: src.Key(),
			Rkey: msg.Rkey,
		}

		err = s.processPipeline(ctx, tpl, pipelineId)
		if err != nil {
			return err
		}
	} else if msg.Nsid == tangled.GitRefUpdateNSID {
		event := tangled.GitRefUpdate{}
		if err := json.Unmarshal(msg.EventJson, &event); err != nil {
			l.Error("error unmarshalling", "err", err)
			return err
		}
		l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)

		// resolve repo name to rkey
		// TODO: git.refUpdate should respond with rkey instead of repo name
		repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
		if err != nil {
			return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
		}

		// NOTE: we are blindly trusting the knot to return only repos it owns
		repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
		repoPath := s.newRepoPath(repo.Did, repo.Rkey)
		if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
			return fmt.Errorf("sync git repo: %w", err)
		}
		l.Info("synced git repo")

		compiler := workflow.Compiler{
			Trigger: tangled.Pipeline_TriggerMetadata{
				Kind: string(workflow.TriggerKindPush),
				Push: &tangled.Pipeline_PushTriggerData{
					Ref:    event.Ref,
					OldSha: event.OldSha,
					NewSha: event.NewSha,
				},
				Repo: &tangled.Pipeline_TriggerRepo{
					Did:  repo.Did.String(),
					Knot: repo.Knot,
					Repo: repo.Name,
				},
			},
		}

		// load workflow definitions from rev (without spindle context)
		rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha)
		if err != nil {
			return fmt.Errorf("loading pipeline: %w", err)
		}
		if len(rawPipeline) == 0 {
			l.Info("no workflow definition found for the repo, skipping the event")
			return nil
		}
		tpl := compiler.Compile(compiler.Parse(rawPipeline))
		// TODO: pass compile error to workflow log
		for _, w := range compiler.Diagnostics.Errors {
			l.Error(w.String())
		}
		for _, w := range compiler.Diagnostics.Warnings {
			l.Warn(w.String())
		}

		pipelineId := models.PipelineId{
			Knot: tpl.TriggerMetadata.Repo.Knot,
			Rkey: tid.TID(),
		}
		if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil {
			l.Error("failed to create pipeline event", "err", err)
			return nil
		}
		err = s.processPipeline(ctx, tpl, pipelineId)
		if err != nil {
			return err
		}
	}

	return nil
}

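// loadPipeline syncs the repository to the given revision and reads every file
// under the workflow directory into a RawPipeline. It returns an empty pipeline
// when the directory does not exist.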
func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) {
	if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil {
		return nil, fmt.Errorf("syncing git repo: %w", err)
	}
	gr, err := kgit.Open(repoPath, rev)
	if err != nil {
		return nil, fmt.Errorf("opening git repo: %w", err)
	}

	workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
	if errors.Is(err, object.ErrDirectoryNotFound) {
		// return empty RawPipeline when directory doesn't exist
		return nil, nil
	} else if err != nil {
		return nil, fmt.Errorf("loading file tree: %w", err)
	}

	var rawPipeline workflow.RawPipeline
	for _, e := range workflowDir {
		if !e.IsFile() {
			continue
		}

		fpath := filepath.Join(workflow.WorkflowDir, e.Name)
		contents, err := gr.RawContent(fpath)
		if err != nil {
			return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err)
		}

		rawPipeline = append(rawPipeline, workflow.RawWorkflow{
			Name:     e.Name,
			Contents: contents,
		})
	}

	return rawPipeline, nil
}

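// processPipeline groups a pipeline's workflows by engine, initializes each one,
// injects the TANGLED_* environment variables, and enqueues the pipeline on the
// job queue, marking every workflow as pending once the enqueue succeeds.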
func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error {
	// Build pipeline environment variables once for all workflows
	pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)

	// filter & init workflows
	workflows := make(map[models.Engine][]models.Workflow)
	for _, w := range tpl.Workflows {
		if w == nil {
			continue
		}
		if _, ok := s.engs[w.Engine]; !ok {
			err := s.db.StatusFailed(models.WorkflowId{
				PipelineId: pipelineId,
				Name:       w.Name,
			}, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
			if err != nil {
				return fmt.Errorf("db.StatusFailed: %w", err)
			}

			continue
		}

		eng := s.engs[w.Engine]

		if _, ok := workflows[eng]; !ok {
			workflows[eng] = []models.Workflow{}
		}

		ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
		if err != nil {
			return fmt.Errorf("init workflow: %w", err)
		}

		// inject TANGLED_* env vars after InitWorkflow
		// This prevents user-defined env vars from overriding them
		if ewf.Environment == nil {
			ewf.Environment = make(map[string]string)
		}
		maps.Copy(ewf.Environment, pipelineEnv)

		workflows[eng] = append(workflows[eng], *ewf)
	}

	// enqueue pipeline
	ok := s.jq.Enqueue(queue.Job{
		Run: func() error {
			engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
				RepoOwner: tpl.TriggerMetadata.Repo.Did,
				RepoName:  tpl.TriggerMetadata.Repo.Repo,
				Workflows: workflows,
			}, pipelineId)
			return nil
		},
		OnFail: func(jobError error) {
			s.l.Error("pipeline run failed", "error", jobError)
		},
	})
	if !ok {
		return fmt.Errorf("failed to enqueue pipeline: queue is full")
	}
	s.l.Info("pipeline enqueued successfully", "id", pipelineId)

	// emit StatusPending for all workflows here (after successful enqueue)
	for _, ewfs := range workflows {
		for _, ewf := range ewfs {
			err := s.db.StatusPending(models.WorkflowId{
				PipelineId: pipelineId,
				Name:       ewf.Name,
			}, s.n)
			if err != nil {
				return fmt.Errorf("db.StatusPending: %w", err)
			}
		}
	}
	return nil
}

// newRepoPath builds the on-disk path for a repository from its did and rkey.
// The path has the form `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey`.
func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
	return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
}

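// newRepoCloneUrl builds the clone URL for a repo hosted on the given knot,
// using plain HTTP in dev mode and HTTPS otherwise.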
func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
	scheme := "https://"
	if s.cfg.Server.Dev {
		scheme = "http://"
	}
	return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
}

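// RequiredVersion is the minimum git version spindle requires on the host.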
const RequiredVersion = "2.49.0"

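// ensureGitVersion verifies that the installed git binary is at least RequiredVersion.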
func ensureGitVersion() error {
	v, err := git.Version()
	if err != nil {
		return fmt.Errorf("fetching git version: %w", err)
	}
	if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
		return fmt.Errorf("installed git version %q is not supported; spindle requires git version >= %q", v, RequiredVersion)
	}
	return nil
}