forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/eventconsumer"
14 "tangled.org/core/eventconsumer/cursor"
15 "tangled.org/core/idresolver"
16 "tangled.org/core/jetstream"
17 "tangled.org/core/log"
18 "tangled.org/core/notifier"
19 "tangled.org/core/rbac"
20 "tangled.org/core/spindle/config"
21 "tangled.org/core/spindle/db"
22 "tangled.org/core/spindle/engine"
23 "tangled.org/core/spindle/engines/buildah"
24 "tangled.org/core/spindle/engines/nixery"
25 "tangled.org/core/spindle/models"
26 "tangled.org/core/spindle/queue"
27 "tangled.org/core/spindle/secrets"
28 "tangled.org/core/spindle/xrpc"
29 "tangled.org/core/xrpc/serviceauth"
30)
31
// motd holds the message-of-the-day served at the HTTP root ("/"),
// embedded from the adjacent "motd" file at build time.
//
//go:embed motd
var motd []byte

const (
	// rbacDomain is the casbin domain under which this spindle's
	// membership/ownership rules are scoped.
	rbacDomain = "thisserver"
)
38
// Spindle bundles every long-lived dependency of a running spindle
// server: storage, authorization, event plumbing, build engines and
// the job queue. It is constructed once in Run and shared by the
// HTTP handlers and event consumers.
type Spindle struct {
	jc    *jetstream.JetstreamClient // jetstream firehose client (member/repo records)
	db    *db.DB                     // sqlite-backed spindle state
	e     *rbac.Enforcer             // rbac enforcer (owner/member roles)
	l     *slog.Logger               // structured logger
	n     *notifier.Notifier         // notifies listeners of status changes
	engs  map[string]models.Engine   // build engines keyed by name (e.g. "nixery", "buildah")
	jq    *queue.Queue               // bounded pipeline job queue
	cfg   *config.Config             // loaded server configuration
	ks    *eventconsumer.Consumer    // knot event consumer (pipeline records)
	res   *idresolver.Resolver       // DID/handle resolver
	vault secrets.Manager            // secrets backend (sqlite or openbao)
}
52
53func Run(ctx context.Context) error {
54 logger := log.FromContext(ctx)
55
56 cfg, err := config.Load(ctx)
57 if err != nil {
58 return fmt.Errorf("failed to load config: %w", err)
59 }
60
61 d, err := db.Make(cfg.Server.DBPath)
62 if err != nil {
63 return fmt.Errorf("failed to setup db: %w", err)
64 }
65
66 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
67 if err != nil {
68 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
69 }
70 e.E.EnableAutoSave(true)
71
72 n := notifier.New()
73
74 var vault secrets.Manager
75 switch cfg.Server.Secrets.Provider {
76 case "openbao":
77 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
78 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
79 }
80 vault, err = secrets.NewOpenBaoManager(
81 cfg.Server.Secrets.OpenBao.ProxyAddr,
82 logger,
83 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
84 )
85 if err != nil {
86 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
87 }
88 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
89 case "sqlite", "":
90 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
91 if err != nil {
92 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
93 }
94 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
95 default:
96 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
97 }
98
99 nixeryEng, err := nixery.New(ctx, cfg)
100 if err != nil {
101 return err
102 }
103
104 buildahEng, err := buildah.New(ctx, cfg)
105 if err != nil {
106 return err
107 }
108
109 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
110 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
111
112 collections := []string{
113 tangled.SpindleMemberNSID,
114 tangled.RepoNSID,
115 tangled.RepoCollaboratorNSID,
116 }
117 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
118 if err != nil {
119 return fmt.Errorf("failed to setup jetstream client: %w", err)
120 }
121 jc.AddDid(cfg.Server.Owner)
122
123 // Check if the spindle knows about any Dids;
124 dids, err := d.GetAllDids()
125 if err != nil {
126 return fmt.Errorf("failed to get all dids: %w", err)
127 }
128 for _, d := range dids {
129 jc.AddDid(d)
130 }
131
132 resolver := idresolver.DefaultResolver()
133
134 spindle := Spindle{
135 jc: jc,
136 e: e,
137 db: d,
138 l: logger,
139 n: &n,
140 engs: map[string]models.Engine{"nixery": nixeryEng, "buildah": buildahEng},
141 jq: jq,
142 cfg: cfg,
143 res: resolver,
144 vault: vault,
145 }
146
147 err = e.AddSpindle(rbacDomain)
148 if err != nil {
149 return fmt.Errorf("failed to set rbac domain: %w", err)
150 }
151 err = spindle.configureOwner()
152 if err != nil {
153 return err
154 }
155 logger.Info("owner set", "did", cfg.Server.Owner)
156
157 // starts a job queue runner in the background
158 jq.Start()
159 defer jq.Stop()
160
161 // Stop vault token renewal if it implements Stopper
162 if stopper, ok := vault.(secrets.Stopper); ok {
163 defer stopper.Stop()
164 }
165
166 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
167 if err != nil {
168 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
169 }
170
171 err = jc.StartJetstream(ctx, spindle.ingest())
172 if err != nil {
173 return fmt.Errorf("failed to start jetstream consumer: %w", err)
174 }
175
176 // for each incoming sh.tangled.pipeline, we execute
177 // spindle.processPipeline, which in turn enqueues the pipeline
178 // job in the above registered queue.
179 ccfg := eventconsumer.NewConsumerConfig()
180 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
181 ccfg.Dev = cfg.Server.Dev
182 ccfg.ProcessFunc = spindle.processPipeline
183 ccfg.CursorStore = cursorStore
184 knownKnots, err := d.Knots()
185 if err != nil {
186 return err
187 }
188 for _, knot := range knownKnots {
189 logger.Info("adding source start", "knot", knot)
190 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
191 }
192 spindle.ks = eventconsumer.NewConsumer(*ccfg)
193
194 go func() {
195 logger.Info("starting knot event consumer")
196 spindle.ks.Start(ctx)
197 }()
198
199 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
200 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
201
202 return nil
203}
204
205func (s *Spindle) Router() http.Handler {
206 mux := chi.NewRouter()
207
208 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
209 w.Write(motd)
210 })
211 mux.HandleFunc("/events", s.Events)
212 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
213
214 mux.Mount("/xrpc", s.XrpcRouter())
215 return mux
216}
217
218func (s *Spindle) XrpcRouter() http.Handler {
219 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
220
221 l := log.SubLogger(s.l, "xrpc")
222
223 x := xrpc.Xrpc{
224 Logger: l,
225 Db: s.db,
226 Enforcer: s.e,
227 Engines: s.engs,
228 Config: s.cfg,
229 Resolver: s.res,
230 Vault: s.vault,
231 ServiceAuth: serviceAuth,
232 }
233
234 return x.Router()
235}
236
237func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
238 if msg.Nsid == tangled.PipelineNSID {
239 tpl := tangled.Pipeline{}
240 err := json.Unmarshal(msg.EventJson, &tpl)
241 if err != nil {
242 fmt.Println("error unmarshalling", err)
243 return err
244 }
245
246 if tpl.TriggerMetadata == nil {
247 return fmt.Errorf("no trigger metadata found")
248 }
249
250 if tpl.TriggerMetadata.Repo == nil {
251 return fmt.Errorf("no repo data found")
252 }
253
254 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
255 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
256 }
257
258 // filter by repos
259 _, err = s.db.GetRepo(
260 tpl.TriggerMetadata.Repo.Knot,
261 tpl.TriggerMetadata.Repo.Did,
262 tpl.TriggerMetadata.Repo.Repo,
263 )
264 if err != nil {
265 return err
266 }
267
268 pipelineId := models.PipelineId{
269 Knot: src.Key(),
270 Rkey: msg.Rkey,
271 }
272
273 workflows := make(map[models.Engine][]models.Workflow)
274
275 for _, w := range tpl.Workflows {
276 if w != nil {
277 if _, ok := s.engs[w.Engine]; !ok {
278 err = s.db.StatusFailed(models.WorkflowId{
279 PipelineId: pipelineId,
280 Name: w.Name,
281 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
282 if err != nil {
283 return err
284 }
285
286 continue
287 }
288
289 eng := s.engs[w.Engine]
290
291 if _, ok := workflows[eng]; !ok {
292 workflows[eng] = []models.Workflow{}
293 }
294
295 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
296 if err != nil {
297 return err
298 }
299
300 workflows[eng] = append(workflows[eng], *ewf)
301
302 err = s.db.StatusPending(models.WorkflowId{
303 PipelineId: pipelineId,
304 Name: w.Name,
305 }, s.n)
306 if err != nil {
307 return err
308 }
309 }
310 }
311
312 ok := s.jq.Enqueue(queue.Job{
313 Run: func() error {
314 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
315 RepoOwner: tpl.TriggerMetadata.Repo.Did,
316 RepoName: tpl.TriggerMetadata.Repo.Repo,
317 Workflows: workflows,
318 }, pipelineId)
319 return nil
320 },
321 OnFail: func(jobError error) {
322 s.l.Error("pipeline run failed", "error", jobError)
323 },
324 })
325 if ok {
326 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
327 } else {
328 s.l.Error("failed to enqueue pipeline: queue is full")
329 }
330 }
331
332 return nil
333}
334
335func (s *Spindle) configureOwner() error {
336 cfgOwner := s.cfg.Server.Owner
337
338 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
339 if err != nil {
340 return err
341 }
342
343 switch len(existing) {
344 case 0:
345 // no owner configured, continue
346 case 1:
347 // find existing owner
348 existingOwner := existing[0]
349
350 // no ownership change, this is okay
351 if existingOwner == s.cfg.Server.Owner {
352 break
353 }
354
355 // remove existing owner
356 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
357 if err != nil {
358 return nil
359 }
360 default:
361 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
362 }
363
364 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
365}