forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "os"
12
13 "github.com/go-chi/chi/v5"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/eventconsumer/cursor"
17 "tangled.org/core/idresolver"
18 "tangled.org/core/jetstream"
19 "tangled.org/core/log"
20 "tangled.org/core/notifier"
21 "tangled.org/core/rbac"
22 "tangled.org/core/spindle/config"
23 "tangled.org/core/spindle/db"
24 "tangled.org/core/spindle/engine"
25 "tangled.org/core/spindle/engines/nixery"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/queue"
28 "tangled.org/core/spindle/secrets"
29 "tangled.org/core/spindle/xrpc"
30 "tangled.org/core/xrpc/serviceauth"
31)
32
33//go:embed motd
34var motd []byte
35
36const (
37 rbacDomain = "thisserver"
38)
39
40type Spindle struct {
41 jc *jetstream.JetstreamClient
42 db *db.DB
43 e *rbac.Enforcer
44 l *slog.Logger
45 n *notifier.Notifier
46 engs map[string]models.Engine
47 jq *queue.Queue
48 cfg *config.Config
49 ks *eventconsumer.Consumer
50 res *idresolver.Resolver
51 vault secrets.Manager
52}
53
54// New creates a new Spindle server with the provided configuration and engines.
55func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
56 logger := log.FromContext(ctx)
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return nil, fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
75 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
76 }
77 vault, err = secrets.NewOpenBaoManager(
78 cfg.Server.Secrets.OpenBao.ProxyAddr,
79 logger,
80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
81 )
82 if err != nil {
83 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
84 }
85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
86 case "sqlite", "":
87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
88 if err != nil {
89 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
90 }
91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
92 default:
93 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
94 }
95
96 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
97 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
98
99 collections := []string{
100 tangled.SpindleMemberNSID,
101 tangled.RepoNSID,
102 tangled.RepoCollaboratorNSID,
103 }
104 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
105 if err != nil {
106 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
107 }
108 jc.AddDid(cfg.Server.Owner)
109
110 // Check if the spindle knows about any Dids;
111 dids, err := d.GetAllDids()
112 if err != nil {
113 return nil, fmt.Errorf("failed to get all dids: %w", err)
114 }
115 for _, d := range dids {
116 jc.AddDid(d)
117 }
118
119 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
120
121 spindle := &Spindle{
122 jc: jc,
123 e: e,
124 db: d,
125 l: logger,
126 n: &n,
127 engs: engines,
128 jq: jq,
129 cfg: cfg,
130 res: resolver,
131 vault: vault,
132 }
133
134 err = e.AddSpindle(rbacDomain)
135 if err != nil {
136 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
137 }
138 err = spindle.configureOwner()
139 if err != nil {
140 return nil, err
141 }
142 logger.Info("owner set", "did", cfg.Server.Owner)
143
144 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
145 if err != nil {
146 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
147 }
148
149 err = jc.StartJetstream(ctx, spindle.ingest())
150 if err != nil {
151 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
152 }
153
154 // for each incoming sh.tangled.pipeline, we execute
155 // spindle.processPipeline, which in turn enqueues the pipeline
156 // job in the above registered queue.
157 ccfg := eventconsumer.NewConsumerConfig()
158 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
159 ccfg.Dev = cfg.Server.Dev
160 ccfg.ProcessFunc = spindle.processPipeline
161 ccfg.CursorStore = cursorStore
162 knownKnots, err := d.Knots()
163 if err != nil {
164 return nil, err
165 }
166 for _, knot := range knownKnots {
167 logger.Info("adding source start", "knot", knot)
168 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
169 }
170 spindle.ks = eventconsumer.NewConsumer(*ccfg)
171
172 return spindle, nil
173}
174
175// DB returns the database instance.
176func (s *Spindle) DB() *db.DB {
177 return s.db
178}
179
180// Queue returns the job queue instance.
181func (s *Spindle) Queue() *queue.Queue {
182 return s.jq
183}
184
185// Engines returns the map of available engines.
186func (s *Spindle) Engines() map[string]models.Engine {
187 return s.engs
188}
189
190// Vault returns the secrets manager instance.
191func (s *Spindle) Vault() secrets.Manager {
192 return s.vault
193}
194
195// Notifier returns the notifier instance.
196func (s *Spindle) Notifier() *notifier.Notifier {
197 return s.n
198}
199
200// Enforcer returns the RBAC enforcer instance.
201func (s *Spindle) Enforcer() *rbac.Enforcer {
202 return s.e
203}
204
205// Start starts the Spindle server (blocking).
206func (s *Spindle) Start(ctx context.Context) error {
207 // starts a job queue runner in the background
208 s.jq.Start()
209 defer s.jq.Stop()
210
211 // Stop vault token renewal if it implements Stopper
212 if stopper, ok := s.vault.(secrets.Stopper); ok {
213 defer stopper.Stop()
214 }
215
216 go func() {
217 s.l.Info("starting knot event consumer")
218 s.ks.Start(ctx)
219 }()
220
221 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
222 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
223}
224
225func Run(ctx context.Context) error {
226 cfg, err := config.Load(ctx)
227 if err != nil {
228 return fmt.Errorf("failed to load config: %w", err)
229 }
230
231 nixeryEng, err := nixery.New(ctx, cfg)
232 if err != nil {
233 return err
234 }
235
236 s, err := New(ctx, cfg, map[string]models.Engine{
237 "nixery": nixeryEng,
238 })
239 if err != nil {
240 return err
241 }
242
243 return s.Start(ctx)
244}
245
246func (s *Spindle) Router() http.Handler {
247 mux := chi.NewRouter()
248
249 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
250 if s.cfg.Server.MOTDFile != "" {
251 if content, err := os.ReadFile(s.cfg.Server.MOTDFile); err == nil {
252 w.Write(content)
253 return
254 }
255 s.l.Warn("failed to read custom MOTD file, using default", "path", s.cfg.Server.MOTDFile)
256 }
257 w.Write(motd)
258 })
259 mux.HandleFunc("/events", s.Events)
260 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
261
262 mux.Mount("/xrpc", s.XrpcRouter())
263 return mux
264}
265
266func (s *Spindle) XrpcRouter() http.Handler {
267 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
268
269 l := log.SubLogger(s.l, "xrpc")
270
271 x := xrpc.Xrpc{
272 Logger: l,
273 Db: s.db,
274 Enforcer: s.e,
275 Engines: s.engs,
276 Config: s.cfg,
277 Resolver: s.res,
278 Vault: s.vault,
279 ServiceAuth: serviceAuth,
280 }
281
282 return x.Router()
283}
284
285func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
286 if msg.Nsid == tangled.PipelineNSID {
287 tpl := tangled.Pipeline{}
288 err := json.Unmarshal(msg.EventJson, &tpl)
289 if err != nil {
290 fmt.Println("error unmarshalling", err)
291 return err
292 }
293
294 if tpl.TriggerMetadata == nil {
295 return fmt.Errorf("no trigger metadata found")
296 }
297
298 if tpl.TriggerMetadata.Repo == nil {
299 return fmt.Errorf("no repo data found")
300 }
301
302 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
303 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
304 }
305
306 // filter by repos
307 _, err = s.db.GetRepo(
308 tpl.TriggerMetadata.Repo.Knot,
309 tpl.TriggerMetadata.Repo.Did,
310 tpl.TriggerMetadata.Repo.Repo,
311 )
312 if err != nil {
313 return err
314 }
315
316 pipelineId := models.PipelineId{
317 Knot: src.Key(),
318 Rkey: msg.Rkey,
319 }
320
321 workflows := make(map[models.Engine][]models.Workflow)
322
323 // Build pipeline environment variables once for all workflows
324 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev, s.res)
325
326 for _, w := range tpl.Workflows {
327 if w != nil {
328 if _, ok := s.engs[w.Engine]; !ok {
329 err = s.db.StatusFailed(models.WorkflowId{
330 PipelineId: pipelineId,
331 Name: w.Name,
332 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
333 if err != nil {
334 return err
335 }
336
337 continue
338 }
339
340 eng := s.engs[w.Engine]
341
342 if _, ok := workflows[eng]; !ok {
343 workflows[eng] = []models.Workflow{}
344 }
345
346 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
347 if err != nil {
348 return err
349 }
350
351 // inject TANGLED_* env vars after InitWorkflow
352 // This prevents user-defined env vars from overriding them
353 if ewf.Environment == nil {
354 ewf.Environment = make(map[string]string)
355 }
356 maps.Copy(ewf.Environment, pipelineEnv)
357
358 workflows[eng] = append(workflows[eng], *ewf)
359
360 err = s.db.StatusPending(models.WorkflowId{
361 PipelineId: pipelineId,
362 Name: w.Name,
363 }, s.n)
364 if err != nil {
365 return err
366 }
367 }
368 }
369
370 ok := s.jq.Enqueue(queue.Job{
371 Run: func() error {
372 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
373 RepoOwner: tpl.TriggerMetadata.Repo.Did,
374 RepoName: tpl.TriggerMetadata.Repo.Repo,
375 Workflows: workflows,
376 }, pipelineId)
377 return nil
378 },
379 OnFail: func(jobError error) {
380 s.l.Error("pipeline run failed", "error", jobError)
381 },
382 })
383 if ok {
384 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
385 } else {
386 s.l.Error("failed to enqueue pipeline: queue is full")
387 }
388 }
389
390 return nil
391}
392
393func (s *Spindle) configureOwner() error {
394 cfgOwner := s.cfg.Server.Owner
395
396 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
397 if err != nil {
398 return err
399 }
400
401 switch len(existing) {
402 case 0:
403 // no owner configured, continue
404 case 1:
405 // find existing owner
406 existingOwner := existing[0]
407
408 // no ownership change, this is okay
409 if existingOwner == s.cfg.Server.Owner {
410 break
411 }
412
413 // remove existing owner
414 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
415 if err != nil {
416 return nil
417 }
418 default:
419 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
420 }
421
422 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
423}