this repo has no description
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/go-chi/chi/v5"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/eventconsumer/cursor"
17 "tangled.org/core/idresolver"
18 "tangled.org/core/log"
19 "tangled.org/core/notifier"
20 "tangled.org/core/rbac2"
21 "tangled.org/core/spindle/config"
22 "tangled.org/core/spindle/db"
23 "tangled.org/core/spindle/engine"
24 "tangled.org/core/spindle/engines/nixery"
25 "tangled.org/core/spindle/models"
26 "tangled.org/core/spindle/queue"
27 "tangled.org/core/spindle/secrets"
28 "tangled.org/core/spindle/xrpc"
29 "tangled.org/core/tap"
30 "tangled.org/core/xrpc/serviceauth"
31)
32
// motd holds the bytes of the embedded "motd" file. Router writes it verbatim
// as the response body for the "/" route.
//
//go:embed motd
var motd []byte
35
// Spindle bundles the long-lived dependencies of a spindle server. Values are
// assembled by New and run with Start.
type Spindle struct {
	// tap is the tap client; Start uses it to track the owner's repos and to
	// consume the tap event stream.
	tap *tap.Client

	// db is the server's database handle.
	db *db.DB

	// e is the RBAC enforcer.
	e *rbac2.Enforcer

	// l is the base logger; component sub-loggers are derived from it.
	l *slog.Logger

	// n is the notifier handed to the db status helpers and to the engine.
	n *notifier.Notifier

	// engs maps an engine name (as referenced by a workflow) to its engine.
	engs map[string]models.Engine

	// jq is the job queue pipeline runs are enqueued on.
	jq *queue.Queue

	// cfg is the loaded server configuration.
	cfg *config.Config

	// ks is the knot event consumer; it feeds events to processPipeline.
	ks *eventconsumer.Consumer

	// res is the identity resolver.
	res *idresolver.Resolver

	// vault is the secrets manager selected by the configured provider.
	vault secrets.Manager
}
49
50// New creates a new Spindle server with the provided configuration and engines.
51func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
52 logger := log.FromContext(ctx)
53
54 d, err := db.Make(ctx, cfg.Server.DBPath)
55 if err != nil {
56 return nil, fmt.Errorf("failed to setup db: %w", err)
57 }
58
59 e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
60 if err != nil {
61 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
62 }
63
64 n := notifier.New()
65
66 var vault secrets.Manager
67 switch cfg.Server.Secrets.Provider {
68 case "openbao":
69 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
70 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
71 }
72 vault, err = secrets.NewOpenBaoManager(
73 cfg.Server.Secrets.OpenBao.ProxyAddr,
74 logger,
75 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
76 )
77 if err != nil {
78 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
79 }
80 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
81 case "sqlite", "":
82 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
83 if err != nil {
84 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
85 }
86 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
87 default:
88 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
89 }
90
91 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
92 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
93
94 tap := tap.NewClient(cfg.Server.TapUrl, "")
95
96 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
97
98 spindle := &Spindle{
99 tap: &tap,
100 e: e,
101 db: d,
102 l: logger,
103 n: &n,
104 engs: engines,
105 jq: jq,
106 cfg: cfg,
107 res: resolver,
108 vault: vault,
109 }
110
111 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
112 if err != nil {
113 return nil, err
114 }
115 logger.Info("owner set", "did", cfg.Server.Owner)
116
117 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
118 if err != nil {
119 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
120 }
121
122 // for each incoming sh.tangled.pipeline, we execute
123 // spindle.processPipeline, which in turn enqueues the pipeline
124 // job in the above registered queue.
125 ccfg := eventconsumer.NewConsumerConfig()
126 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
127 ccfg.Dev = cfg.Server.Dev
128 ccfg.ProcessFunc = spindle.processPipeline
129 ccfg.CursorStore = cursorStore
130 knownKnots, err := d.Knots()
131 if err != nil {
132 return nil, err
133 }
134 for _, knot := range knownKnots {
135 logger.Info("adding source start", "knot", knot)
136 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
137 }
138 spindle.ks = eventconsumer.NewConsumer(*ccfg)
139
140 return spindle, nil
141}
142
143// DB returns the database instance.
144func (s *Spindle) DB() *db.DB {
145 return s.db
146}
147
148// Queue returns the job queue instance.
149func (s *Spindle) Queue() *queue.Queue {
150 return s.jq
151}
152
153// Engines returns the map of available engines.
154func (s *Spindle) Engines() map[string]models.Engine {
155 return s.engs
156}
157
158// Vault returns the secrets manager instance.
159func (s *Spindle) Vault() secrets.Manager {
160 return s.vault
161}
162
163// Notifier returns the notifier instance.
164func (s *Spindle) Notifier() *notifier.Notifier {
165 return s.n
166}
167
168// Enforcer returns the RBAC enforcer instance.
169func (s *Spindle) Enforcer() *rbac2.Enforcer {
170 return s.e
171}
172
173// Start starts the Spindle server (blocking).
174func (s *Spindle) Start(ctx context.Context) error {
175 // starts a job queue runner in the background
176 s.jq.Start()
177 defer s.jq.Stop()
178
179 // Stop vault token renewal if it implements Stopper
180 if stopper, ok := s.vault.(secrets.Stopper); ok {
181 defer stopper.Stop()
182 }
183
184 go func() {
185 s.l.Info("starting knot event consumer")
186 s.ks.Start(ctx)
187 }()
188
189 // ensure server owner is tracked
190 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
191 return err
192 }
193
194 go func() {
195 s.l.Info("starting tap stream consumer")
196 s.tap.Connect(ctx, &tap.SimpleIndexer{
197 EventHandler: s.processEvent,
198 })
199 }()
200
201 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
202 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
203}
204
205func Run(ctx context.Context) error {
206 cfg, err := config.Load(ctx)
207 if err != nil {
208 return fmt.Errorf("failed to load config: %w", err)
209 }
210
211 nixeryEng, err := nixery.New(ctx, cfg)
212 if err != nil {
213 return err
214 }
215
216 s, err := New(ctx, cfg, map[string]models.Engine{
217 "nixery": nixeryEng,
218 })
219 if err != nil {
220 return err
221 }
222
223 return s.Start(ctx)
224}
225
226func (s *Spindle) Router() http.Handler {
227 mux := chi.NewRouter()
228
229 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
230 w.Write(motd)
231 })
232 mux.HandleFunc("/events", s.Events)
233 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
234
235 mux.Mount("/xrpc", s.XrpcRouter())
236 return mux
237}
238
239func (s *Spindle) XrpcRouter() http.Handler {
240 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
241
242 l := log.SubLogger(s.l, "xrpc")
243
244 x := xrpc.Xrpc{
245 Logger: l,
246 Db: s.db,
247 Enforcer: s.e,
248 Engines: s.engs,
249 Config: s.cfg,
250 Resolver: s.res,
251 Vault: s.vault,
252 Notifier: s.Notifier(),
253 ServiceAuth: serviceAuth,
254 }
255
256 return x.Router()
257}
258
259func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
260 if msg.Nsid == tangled.PipelineNSID {
261 tpl := tangled.Pipeline{}
262 err := json.Unmarshal(msg.EventJson, &tpl)
263 if err != nil {
264 fmt.Println("error unmarshalling", err)
265 return err
266 }
267
268 if tpl.TriggerMetadata == nil {
269 return fmt.Errorf("no trigger metadata found")
270 }
271
272 if tpl.TriggerMetadata.Repo == nil {
273 return fmt.Errorf("no repo data found")
274 }
275
276 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
277 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
278 }
279
280 // filter by repos
281 _, err = s.db.GetRepoWithName(
282 syntax.DID(tpl.TriggerMetadata.Repo.Did),
283 tpl.TriggerMetadata.Repo.Repo,
284 )
285 if err != nil {
286 return fmt.Errorf("failed to get repo: %w", err)
287 }
288
289 pipelineId := models.PipelineId{
290 Knot: src.Key(),
291 Rkey: msg.Rkey,
292 }
293
294 workflows := make(map[models.Engine][]models.Workflow)
295
296 // Build pipeline environment variables once for all workflows
297 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
298
299 for _, w := range tpl.Workflows {
300 if w != nil {
301 if _, ok := s.engs[w.Engine]; !ok {
302 err = s.db.StatusFailed(models.WorkflowId{
303 PipelineId: pipelineId,
304 Name: w.Name,
305 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
306 if err != nil {
307 return fmt.Errorf("db.StatusFailed: %w", err)
308 }
309
310 continue
311 }
312
313 eng := s.engs[w.Engine]
314
315 if _, ok := workflows[eng]; !ok {
316 workflows[eng] = []models.Workflow{}
317 }
318
319 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
320 if err != nil {
321 return fmt.Errorf("init workflow: %w", err)
322 }
323
324 // inject TANGLED_* env vars after InitWorkflow
325 // This prevents user-defined env vars from overriding them
326 if ewf.Environment == nil {
327 ewf.Environment = make(map[string]string)
328 }
329 maps.Copy(ewf.Environment, pipelineEnv)
330
331 workflows[eng] = append(workflows[eng], *ewf)
332
333 err = s.db.StatusPending(models.WorkflowId{
334 PipelineId: pipelineId,
335 Name: w.Name,
336 }, s.n)
337 if err != nil {
338 return fmt.Errorf("db.StatusPending: %w", err)
339 }
340 }
341 }
342
343 ok := s.jq.Enqueue(queue.Job{
344 Run: func() error {
345 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
346 RepoOwner: tpl.TriggerMetadata.Repo.Did,
347 RepoName: tpl.TriggerMetadata.Repo.Repo,
348 Workflows: workflows,
349 }, pipelineId)
350 return nil
351 },
352 OnFail: func(jobError error) {
353 s.l.Error("pipeline run failed", "error", jobError)
354 },
355 })
356 if ok {
357 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
358 } else {
359 s.l.Error("failed to enqueue pipeline: queue is full")
360 }
361 }
362
363 return nil
364}