this repo has no description
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/knotclient/cursor"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
// rbacDomain is the domain name under which this spindle registers itself
// with the RBAC enforcer and under which the "server:owner" role is looked up.
const rbacDomain = "thisserver"
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38 ks *knotclient.EventConsumer
39}
40
41func Run(ctx context.Context) error {
42 logger := log.FromContext(ctx)
43
44 cfg, err := config.Load(ctx)
45 if err != nil {
46 return fmt.Errorf("failed to load config: %w", err)
47 }
48
49 d, err := db.Make(cfg.Server.DBPath)
50 if err != nil {
51 return fmt.Errorf("failed to setup db: %w", err)
52 }
53
54 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
55 if err != nil {
56 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
57 }
58 e.E.EnableAutoSave(true)
59
60 n := notifier.New()
61
62 eng, err := engine.New(ctx, d, &n)
63 if err != nil {
64 return err
65 }
66
67 jq := queue.NewQueue(100, 2)
68
69 collections := []string{
70 tangled.SpindleMemberNSID,
71 tangled.RepoNSID,
72 }
73 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false)
74 if err != nil {
75 return fmt.Errorf("failed to setup jetstream client: %w", err)
76 }
77
78 spindle := Spindle{
79 jc: jc,
80 e: e,
81 db: d,
82 l: logger,
83 n: &n,
84 eng: eng,
85 jq: jq,
86 cfg: cfg,
87 }
88
89 err = e.AddKnot(rbacDomain)
90 if err != nil {
91 return fmt.Errorf("failed to set rbac domain: %w", err)
92 }
93 err = spindle.configureOwner()
94 if err != nil {
95 return err
96 }
97 logger.Info("owner set", "did", cfg.Server.Owner)
98
99 // starts a job queue runner in the background
100 jq.Start()
101 defer jq.Stop()
102
103 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
104 if err != nil {
105 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
106 }
107
108 err = jc.StartJetstream(ctx, spindle.ingest())
109 if err != nil {
110 return fmt.Errorf("failed to start jetstream consumer: %w", err)
111 }
112
113 // for each incoming sh.tangled.pipeline, we execute
114 // spindle.processPipeline, which in turn enqueues the pipeline
115 // job in the above registered queue.
116 ccfg := knotclient.NewConsumerConfig()
117 ccfg.Logger = logger
118 ccfg.Dev = cfg.Server.Dev
119 ccfg.ProcessFunc = spindle.processPipeline
120 ccfg.CursorStore = cursorStore
121 knotstream := knotclient.NewEventConsumer(*ccfg)
122 knownKnots, err := d.Knots()
123 if err != nil {
124 return err
125 }
126 for _, knot := range knownKnots {
127 knotstream.AddSource(ctx, knotclient.NewEventSource(knot))
128 }
129 spindle.ks = knotstream
130
131 go func() {
132 logger.Info("starting knot event consumer", "knots")
133 knotstream.Start(ctx)
134 }()
135
136 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
137 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
138
139 return nil
140}
141
142func (s *Spindle) Router() http.Handler {
143 mux := chi.NewRouter()
144
145 mux.HandleFunc("/events", s.Events)
146 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
147 w.Write([]byte(s.cfg.Server.Owner))
148 })
149 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
150 return mux
151}
152
153func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
154 if msg.Nsid == tangled.PipelineNSID {
155 pipeline := tangled.Pipeline{}
156 err := json.Unmarshal(msg.EventJson, &pipeline)
157 if err != nil {
158 fmt.Println("error unmarshalling", err)
159 return err
160 }
161
162 if pipeline.TriggerMetadata == nil {
163 return fmt.Errorf("no trigger metadata found")
164 }
165
166 if pipeline.TriggerMetadata.Repo == nil {
167 return fmt.Errorf("no repo data found")
168 }
169
170 // filter by repos
171 _, err = s.db.GetRepo(
172 pipeline.TriggerMetadata.Repo.Knot,
173 pipeline.TriggerMetadata.Repo.Did,
174 pipeline.TriggerMetadata.Repo.Repo,
175 )
176 if err != nil {
177 return err
178 }
179
180 pipelineId := models.PipelineId{
181 Knot: src.Knot,
182 Rkey: msg.Rkey,
183 }
184
185 for _, w := range pipeline.Workflows {
186 if w != nil {
187 err := s.db.StatusPending(models.WorkflowId{
188 PipelineId: pipelineId,
189 Name: w.Name,
190 }, s.n)
191 if err != nil {
192 return err
193 }
194 }
195 }
196
197 ok := s.jq.Enqueue(queue.Job{
198 Run: func() error {
199 s.eng.StartWorkflows(ctx, &pipeline, pipelineId)
200 return nil
201 },
202 OnFail: func(jobError error) {
203 s.l.Error("pipeline run failed", "error", jobError)
204 },
205 })
206 if ok {
207 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
208 } else {
209 s.l.Error("failed to enqueue pipeline: queue is full")
210 }
211 }
212
213 return nil
214}
215
216func (s *Spindle) configureOwner() error {
217 cfgOwner := s.cfg.Server.Owner
218 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
219 if err != nil {
220 return fmt.Errorf("failed to fetch server:owner: %w", err)
221 }
222
223 if len(serverOwner) == 0 {
224 s.e.AddKnotOwner(rbacDomain, cfgOwner)
225 } else {
226 if serverOwner[0] != cfgOwner {
227 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
228 }
229 }
230 return nil
231}