Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "maps"
11 "net/http"
12 "path/filepath"
13 "sync"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/go-chi/chi/v5"
17 "github.com/go-git/go-git/v5/plumbing/object"
18 "github.com/hashicorp/go-version"
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/eventconsumer"
21 "tangled.org/core/eventconsumer/cursor"
22 "tangled.org/core/idresolver"
23 kgit "tangled.org/core/knotserver/git"
24 "tangled.org/core/log"
25 "tangled.org/core/notifier"
26 "tangled.org/core/rbac2"
27 "tangled.org/core/spindle/config"
28 "tangled.org/core/spindle/db"
29 "tangled.org/core/spindle/engine"
30 "tangled.org/core/spindle/engines/nixery"
31 "tangled.org/core/spindle/git"
32 "tangled.org/core/spindle/models"
33 "tangled.org/core/spindle/queue"
34 "tangled.org/core/spindle/secrets"
35 "tangled.org/core/spindle/xrpc"
36 "tangled.org/core/tap"
37 "tangled.org/core/tid"
38 "tangled.org/core/workflow"
39 "tangled.org/core/xrpc/serviceauth"
40)
41
// defaultMotd is the embedded default message-of-the-day. It is served at
// the HTTP root until replaced via SetMotdContent.
//go:embed motd
var defaultMotd []byte
44
// Spindle bundles the long-lived state of a spindle server: storage, access
// control, CI engines, the job queue, and the event consumers that feed it.
type Spindle struct {
	tap      *tap.Client              // tap stream client; tracks repos and delivers events to processEvent
	db       *db.DB                   // spindle database
	e        *rbac2.Enforcer          // RBAC enforcer backed by the same DB path
	l        *slog.Logger             // structured logger
	n        *notifier.Notifier       // notifier used when emitting pipeline/workflow status
	engs     map[string]models.Engine // available CI engines, keyed by engine name
	adapters map[string]models.Adapter // NOTE(review): not referenced in this file — confirm usage elsewhere
	jq       *queue.Queue             // background job queue for pipeline runs
	cfg      *config.Config           // server configuration
	ks       *eventconsumer.Consumer  // knot event stream consumer (sh.tangled.git.refUpdate)
	res      *idresolver.Resolver     // AT identity resolver
	vault    secrets.Manager          // secrets backend (sqlite or openbao)
	motd     []byte                   // current MOTD content; guarded by motdMu
	motdMu   sync.RWMutex             // guards motd
}
61
// New creates a new Spindle server with the provided configuration and engines.
// It wires up the database, RBAC enforcer, secrets manager, job queue, identity
// resolver, and the knot event consumer, but starts none of them — call Start
// to run the server.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// spindle shells out to git; bail early on unsupported versions
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	// the enforcer shares the same sqlite path as the main database
	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// select the secrets backend; an empty provider string defaults to sqlite
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	// NOTE: this local shadows the tap package for the rest of the function
	tap := tap.NewClient(cfg.Server.TapUrl, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd, // embedded default; replaceable via SetMotdContent
	}

	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	// cursor store persists stream positions so knot consumption can resume
	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// spindle listen to knot stream for sh.tangled.git.refUpdate
	// which will sync the local workflow files in spindle and enqueues the
	// pipeline job for on-push workflows
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	// register every knot already known to the DB as an event source
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}
159
// DB returns the underlying spindle database instance.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the background job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available CI engines, keyed by engine name.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the configured secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance used for status events.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
func (s *Spindle) Enforcer() *rbac2.Enforcer {
	return s.e
}
189
190// SetMotdContent sets custom MOTD content, replacing the embedded default.
191func (s *Spindle) SetMotdContent(content []byte) {
192 s.motdMu.Lock()
193 defer s.motdMu.Unlock()
194 s.motd = content
195}
196
197// GetMotdContent returns the current MOTD content.
198func (s *Spindle) GetMotdContent() []byte {
199 s.motdMu.RLock()
200 defer s.motdMu.RUnlock()
201 return s.motd
202}
203
// Start starts the Spindle server (blocking).
// It runs the job queue, the knot event consumer, and the tap stream consumer
// in the background, then serves HTTP on the configured listen address until
// the server exits.
func (s *Spindle) Start(ctx context.Context) error {
	// starts a job queue runner in the background
	s.jq.Start()
	defer s.jq.Stop()

	// Stop vault token renewal if it implements Stopper
	if stopper, ok := s.vault.(secrets.Stopper); ok {
		defer stopper.Stop()
	}

	// NOTE(review): this goroutine is cancelled only via ctx; nothing waits
	// for it to finish before Start returns — confirm intended.
	go func() {
		s.l.Info("starting knot event consumer")
		s.ks.Start(ctx)
	}()

	// ensure server owner is tracked
	if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
		return err
	}

	go func() {
		s.l.Info("starting tap stream consumer")
		s.tap.Connect(ctx, &tap.SimpleIndexer{
			EventHandler: s.processEvent,
		})
	}()

	// NOTE(review): http.ListenAndServe sets no read/write timeouts. /events
	// appears to stream responses, so adding a WriteTimeout needs care —
	// confirm before changing.
	s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
	return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
}
235
236func Run(ctx context.Context) error {
237 cfg, err := config.Load(ctx)
238 if err != nil {
239 return fmt.Errorf("failed to load config: %w", err)
240 }
241
242 nixeryEng, err := nixery.New(ctx, cfg)
243 if err != nil {
244 return err
245 }
246
247 s, err := New(ctx, cfg, map[string]models.Engine{
248 "nixery": nixeryEng,
249 })
250 if err != nil {
251 return err
252 }
253
254 return s.Start(ctx)
255}
256
257func (s *Spindle) Router() http.Handler {
258 mux := chi.NewRouter()
259
260 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
261 w.Write(s.GetMotdContent())
262 })
263 mux.HandleFunc("/events", s.Events)
264 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
265
266 mux.Mount("/xrpc", s.XrpcRouter())
267 return mux
268}
269
270func (s *Spindle) XrpcRouter() http.Handler {
271 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
272
273 l := log.SubLogger(s.l, "xrpc")
274
275 x := xrpc.Xrpc{
276 Logger: l,
277 Db: s.db,
278 Enforcer: s.e,
279 Engines: s.engs,
280 Config: s.cfg,
281 Resolver: s.res,
282 Vault: s.vault,
283 Notifier: s.Notifier(),
284 ServiceAuth: serviceAuth,
285 }
286
287 return x.Router()
288}
289
290func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
291 l := log.FromContext(ctx).With("handler", "processKnotStream")
292 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
293 if msg.Nsid == tangled.GitRefUpdateNSID {
294 event := tangled.GitRefUpdate{}
295 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
296 l.Error("error unmarshalling", "err", err)
297 return err
298 }
299 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
300
301 // resolve repo name to rkey
302 // TODO: git.refUpdate should respond with rkey instead of repo name
303 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
304 if err != nil {
305 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
306 }
307
308 // NOTE: we are blindly trusting the knot that it will return only repos it own
309 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
310 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
311 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
312 return fmt.Errorf("sync git repo: %w", err)
313 }
314 l.Info("synced git repo")
315
316 compiler := workflow.Compiler{
317 Trigger: tangled.Pipeline_TriggerMetadata{
318 Kind: string(workflow.TriggerKindPush),
319 Push: &tangled.Pipeline_PushTriggerData{
320 Ref: event.Ref,
321 OldSha: event.OldSha,
322 NewSha: event.NewSha,
323 },
324 Repo: &tangled.Pipeline_TriggerRepo{
325 Did: repo.Did.String(),
326 Knot: repo.Knot,
327 Repo: repo.Name,
328 },
329 },
330 }
331
332 // load workflow definitions from rev (without spindle context)
333 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha)
334 if err != nil {
335 return fmt.Errorf("loading pipeline: %w", err)
336 }
337 if len(rawPipeline) == 0 {
338 l.Info("no workflow definition find for the repo. skipping the event")
339 return nil
340 }
341 tpl := compiler.Compile(compiler.Parse(rawPipeline))
342 // TODO: pass compile error to workflow log
343 for _, w := range compiler.Diagnostics.Errors {
344 l.Error(w.String())
345 }
346 for _, w := range compiler.Diagnostics.Warnings {
347 l.Warn(w.String())
348 }
349
350 pipelineId := models.PipelineId{
351 Knot: tpl.TriggerMetadata.Repo.Knot,
352 Rkey: tid.TID(),
353 }
354 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil {
355 l.Error("failed to create pipeline event", "err", err)
356 return nil
357 }
358 err = s.processPipeline(ctx, tpl, pipelineId)
359 if err != nil {
360 return err
361 }
362 }
363
364 return nil
365}
366
367func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) {
368 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil {
369 return nil, fmt.Errorf("syncing git repo: %w", err)
370 }
371 gr, err := kgit.Open(repoPath, rev)
372 if err != nil {
373 return nil, fmt.Errorf("opening git repo: %w", err)
374 }
375
376 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
377 if errors.Is(err, object.ErrDirectoryNotFound) {
378 // return empty RawPipeline when directory doesn't exist
379 return nil, nil
380 } else if err != nil {
381 return nil, fmt.Errorf("loading file tree: %w", err)
382 }
383
384 var rawPipeline workflow.RawPipeline
385 for _, e := range workflowDir {
386 if !e.IsFile() {
387 continue
388 }
389
390 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
391 contents, err := gr.RawContent(fpath)
392 if err != nil {
393 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err)
394 }
395
396 rawPipeline = append(rawPipeline, workflow.RawWorkflow{
397 Name: e.Name,
398 Contents: contents,
399 })
400 }
401
402 return rawPipeline, nil
403}
404
// processPipeline groups a compiled pipeline's workflows by engine and
// enqueues the pipeline on the job queue. Workflows targeting an unknown
// engine are marked failed individually rather than aborting the whole
// pipeline. Pending statuses are emitted only after the queue accepts the job.
func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error {
	// Build pipeline environment variables once for all workflows
	pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)

	// filter & init workflows
	workflows := make(map[models.Engine][]models.Workflow)
	for _, w := range tpl.Workflows {
		if w == nil {
			continue
		}
		// unknown engine: record the failure for this workflow and keep going
		if _, ok := s.engs[w.Engine]; !ok {
			err := s.db.StatusFailed(models.WorkflowId{
				PipelineId: pipelineId,
				Name: w.Name,
			}, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
			if err != nil {
				return fmt.Errorf("db.StatusFailed: %w", err)
			}

			continue
		}

		eng := s.engs[w.Engine]

		if _, ok := workflows[eng]; !ok {
			workflows[eng] = []models.Workflow{}
		}

		ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
		if err != nil {
			return fmt.Errorf("init workflow: %w", err)
		}

		// inject TANGLED_* env vars after InitWorkflow
		// This prevents user-defined env vars from overriding them
		if ewf.Environment == nil {
			ewf.Environment = make(map[string]string)
		}
		maps.Copy(ewf.Environment, pipelineEnv)

		workflows[eng] = append(workflows[eng], *ewf)
	}

	// enqueue pipeline
	ok := s.jq.Enqueue(queue.Job{
		Run: func() error {
			// NOTE(review): ctx is captured from the caller; the job may run
			// after the triggering event handler has returned — confirm that
			// the consumer's ctx outlives queued jobs.
			engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
				RepoOwner: tpl.TriggerMetadata.Repo.Did,
				RepoName: tpl.TriggerMetadata.Repo.Repo,
				Workflows: workflows,
			}, pipelineId)
			return nil
		},
		OnFail: func(jobError error) {
			s.l.Error("pipeline run failed", "error", jobError)
		},
	})
	if !ok {
		// queue at capacity: the pipeline is rejected, not retried
		return fmt.Errorf("failed to enqueue pipeline: queue is full")
	}
	s.l.Info("pipeline enqueued successfully", "id", pipelineId)

	// emit StatusPending for all workflows here (after successful enqueue)
	for _, ewfs := range workflows {
		for _, ewf := range ewfs {
			err := s.db.StatusPending(models.WorkflowId{
				PipelineId: pipelineId,
				Name: ewf.Name,
			}, s.n)
			if err != nil {
				return fmt.Errorf("db.StatusPending: %w", err)
			}
		}
	}
	return nil
}
481
482// newRepoPath creates a path to store repository by its did and rkey.
483// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
484func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
485 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
486}
487
488func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
489 scheme := "https://"
490 if s.cfg.Server.Dev {
491 scheme = "http://"
492 }
493 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
494}
495
496const RequiredVersion = "2.49.0"
497
498func ensureGitVersion() error {
499 v, err := git.Version()
500 if err != nil {
501 return fmt.Errorf("fetching git version: %w", err)
502 }
503 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
504 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
505 }
506 return nil
507}