this repo has no description
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
29//go:embed motd
30var motd []byte
31
// rbacDomain is the domain string used for every RBAC enforcer call this
// spindle makes about itself (AddSpindle, owner lookup, owner add/remove).
const rbacDomain = "thisserver"
35
36type Spindle struct {
37 jc *jetstream.JetstreamClient
38 db *db.DB
39 e *rbac.Enforcer
40 l *slog.Logger
41 n *notifier.Notifier
42 eng *engine.Engine
43 jq *queue.Queue
44 cfg *config.Config
45 ks *eventconsumer.Consumer
46 res *idresolver.Resolver
47 vault secrets.Manager
48}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 eng, err := engine.New(ctx, cfg, d, &n)
72 if err != nil {
73 return err
74 }
75
76 jq := queue.NewQueue(100, 2)
77
78 collections := []string{
79 tangled.SpindleMemberNSID,
80 tangled.RepoNSID,
81 }
82 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
83 if err != nil {
84 return fmt.Errorf("failed to setup jetstream client: %w", err)
85 }
86 jc.AddDid(cfg.Server.Owner)
87
88 resolver := idresolver.DefaultResolver()
89
90 spindle := Spindle{
91 jc: jc,
92 e: e,
93 db: d,
94 l: logger,
95 n: &n,
96 eng: eng,
97 jq: jq,
98 cfg: cfg,
99 res: resolver,
100 vault: vault,
101 }
102
103 err = e.AddSpindle(rbacDomain)
104 if err != nil {
105 return fmt.Errorf("failed to set rbac domain: %w", err)
106 }
107 err = spindle.configureOwner()
108 if err != nil {
109 return err
110 }
111 logger.Info("owner set", "did", cfg.Server.Owner)
112
113 // starts a job queue runner in the background
114 jq.Start()
115 defer jq.Stop()
116
117 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
118 if err != nil {
119 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
120 }
121
122 err = jc.StartJetstream(ctx, spindle.ingest())
123 if err != nil {
124 return fmt.Errorf("failed to start jetstream consumer: %w", err)
125 }
126
127 // for each incoming sh.tangled.pipeline, we execute
128 // spindle.processPipeline, which in turn enqueues the pipeline
129 // job in the above registered queue.
130 ccfg := eventconsumer.NewConsumerConfig()
131 ccfg.Logger = logger
132 ccfg.Dev = cfg.Server.Dev
133 ccfg.ProcessFunc = spindle.processPipeline
134 ccfg.CursorStore = cursorStore
135 knownKnots, err := d.Knots()
136 if err != nil {
137 return err
138 }
139 for _, knot := range knownKnots {
140 logger.Info("adding source start", "knot", knot)
141 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
142 }
143 spindle.ks = eventconsumer.NewConsumer(*ccfg)
144
145 go func() {
146 logger.Info("starting knot event consumer")
147 spindle.ks.Start(ctx)
148 }()
149
150 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
151 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
152
153 return nil
154}
155
156func (s *Spindle) Router() http.Handler {
157 mux := chi.NewRouter()
158
159 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
160 w.Write(motd)
161 })
162 mux.HandleFunc("/events", s.Events)
163 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
164 w.Write([]byte(s.cfg.Server.Owner))
165 })
166 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
167
168 mux.Mount("/xrpc", s.XrpcRouter())
169 return mux
170}
171
172func (s *Spindle) XrpcRouter() http.Handler {
173 logger := s.l.With("route", "xrpc")
174
175 x := xrpc.Xrpc{
176 Logger: logger,
177 Db: s.db,
178 Enforcer: s.e,
179 Engine: s.eng,
180 Config: s.cfg,
181 Resolver: s.res,
182 Vault: s.vault,
183 }
184
185 return x.Router()
186}
187
188func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
189 if msg.Nsid == tangled.PipelineNSID {
190 tpl := tangled.Pipeline{}
191 err := json.Unmarshal(msg.EventJson, &tpl)
192 if err != nil {
193 fmt.Println("error unmarshalling", err)
194 return err
195 }
196
197 if tpl.TriggerMetadata == nil {
198 return fmt.Errorf("no trigger metadata found")
199 }
200
201 if tpl.TriggerMetadata.Repo == nil {
202 return fmt.Errorf("no repo data found")
203 }
204
205 // filter by repos
206 _, err = s.db.GetRepo(
207 tpl.TriggerMetadata.Repo.Knot,
208 tpl.TriggerMetadata.Repo.Did,
209 tpl.TriggerMetadata.Repo.Repo,
210 )
211 if err != nil {
212 return err
213 }
214
215 pipelineId := models.PipelineId{
216 Knot: src.Key(),
217 Rkey: msg.Rkey,
218 }
219
220 for _, w := range tpl.Workflows {
221 if w != nil {
222 err := s.db.StatusPending(models.WorkflowId{
223 PipelineId: pipelineId,
224 Name: w.Name,
225 }, s.n)
226 if err != nil {
227 return err
228 }
229 }
230 }
231
232 spl := models.ToPipeline(tpl, *s.cfg)
233
234 ok := s.jq.Enqueue(queue.Job{
235 Run: func() error {
236 s.eng.StartWorkflows(ctx, spl, pipelineId)
237 return nil
238 },
239 OnFail: func(jobError error) {
240 s.l.Error("pipeline run failed", "error", jobError)
241 },
242 })
243 if ok {
244 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
245 } else {
246 s.l.Error("failed to enqueue pipeline: queue is full")
247 }
248 }
249
250 return nil
251}
252
253func (s *Spindle) configureOwner() error {
254 cfgOwner := s.cfg.Server.Owner
255
256 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
257 if err != nil {
258 return err
259 }
260
261 switch len(existing) {
262 case 0:
263 // no owner configured, continue
264 case 1:
265 // find existing owner
266 existingOwner := existing[0]
267
268 // no ownership change, this is okay
269 if existingOwner == s.cfg.Server.Owner {
270 break
271 }
272
273 // remove existing owner
274 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
275 if err != nil {
276 return nil
277 }
278 default:
279 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
280 }
281
282 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
283}