this repo has no description
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/knotclient/cursor"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
// rbacDomain is the domain name under which this server registers itself
// with the rbac enforcer and looks up its owner.
const rbacDomain = "thisserver"
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38 ks *knotclient.EventConsumer
39}
40
41func Run(ctx context.Context) error {
42 logger := log.FromContext(ctx)
43
44 cfg, err := config.Load(ctx)
45 if err != nil {
46 return fmt.Errorf("failed to load config: %w", err)
47 }
48
49 d, err := db.Make(cfg.Server.DBPath)
50 if err != nil {
51 return fmt.Errorf("failed to setup db: %w", err)
52 }
53
54 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
55 if err != nil {
56 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
57 }
58 e.E.EnableAutoSave(true)
59
60 n := notifier.New()
61
62 eng, err := engine.New(ctx, d, &n)
63 if err != nil {
64 return err
65 }
66
67 jq := queue.NewQueue(100, 2)
68
69 collections := []string{tangled.SpindleMemberNSID}
70 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false)
71 if err != nil {
72 return fmt.Errorf("failed to setup jetstream client: %w", err)
73 }
74
75 spindle := Spindle{
76 jc: jc,
77 e: e,
78 db: d,
79 l: logger,
80 n: &n,
81 eng: eng,
82 jq: jq,
83 cfg: cfg,
84 }
85
86 err = e.AddKnot(rbacDomain)
87 if err != nil {
88 return fmt.Errorf("failed to set rbac domain: %w", err)
89 }
90 err = spindle.configureOwner()
91 if err != nil {
92 return err
93 }
94 logger.Info("owner set", "did", cfg.Server.Owner)
95
96 // starts a job queue runner in the background
97 jq.Start()
98 defer jq.Stop()
99
100 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
101 if err != nil {
102 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
103 }
104
105 err = jc.StartJetstream(ctx, spindle.ingest())
106 if err != nil {
107 return fmt.Errorf("failed to start jetstream consumer: %w", err)
108 }
109
110 // for each incoming sh.tangled.pipeline, we execute
111 // spindle.processPipeline, which in turn enqueues the pipeline
112 // job in the above registered queue.
113 ccfg := knotclient.NewConsumerConfig()
114 ccfg.Logger = logger
115 ccfg.Dev = cfg.Server.Dev
116 ccfg.ProcessFunc = spindle.processPipeline
117 ccfg.CursorStore = cursorStore
118 knotstream := knotclient.NewEventConsumer(*ccfg)
119 knownKnots, err := d.Knots()
120 if err != nil {
121 return err
122 }
123 for _, knot := range knownKnots {
124 knotstream.AddSource(ctx, knotclient.NewEventSource(knot))
125 }
126 spindle.ks = knotstream
127
128 go func() {
129 logger.Info("starting knot event consumer", "knots")
130 knotstream.Start(ctx)
131 }()
132
133 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
134 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
135
136 return nil
137}
138
139func (s *Spindle) Router() http.Handler {
140 mux := chi.NewRouter()
141
142 mux.HandleFunc("/events", s.Events)
143 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
144 return mux
145}
146
147func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
148 if msg.Nsid == tangled.PipelineNSID {
149 pipeline := tangled.Pipeline{}
150 err := json.Unmarshal(msg.EventJson, &pipeline)
151 if err != nil {
152 fmt.Println("error unmarshalling", err)
153 return err
154 }
155
156 pipelineId := models.PipelineId{
157 Knot: src.Knot,
158 Rkey: msg.Rkey,
159 }
160
161 for _, w := range pipeline.Workflows {
162 if w != nil {
163 err := s.db.StatusPending(models.WorkflowId{
164 PipelineId: pipelineId,
165 Name: w.Name,
166 }, s.n)
167 if err != nil {
168 return err
169 }
170 }
171 }
172
173 ok := s.jq.Enqueue(queue.Job{
174 Run: func() error {
175 s.eng.StartWorkflows(ctx, &pipeline, pipelineId)
176 return nil
177 },
178 OnFail: func(jobError error) {
179 s.l.Error("pipeline run failed", "error", jobError)
180 },
181 })
182 if ok {
183 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
184 } else {
185 s.l.Error("failed to enqueue pipeline: queue is full")
186 }
187 }
188
189 return nil
190}
191
192func (s *Spindle) configureOwner() error {
193 cfgOwner := s.cfg.Server.Owner
194 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
195 if err != nil {
196 return fmt.Errorf("failed to fetch server:owner: %w", err)
197 }
198
199 if len(serverOwner) == 0 {
200 s.e.AddKnotOwner(rbacDomain, cfgOwner)
201 } else {
202 if serverOwner[0] != cfgOwner {
203 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
204 }
205 }
206 return nil
207}