this repo has no description
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 11 "github.com/go-chi/chi/v5" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/eventconsumer" 14 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 15 "tangled.sh/tangled.sh/core/idresolver" 16 "tangled.sh/tangled.sh/core/jetstream" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/notifier" 19 "tangled.sh/tangled.sh/core/rbac" 20 "tangled.sh/tangled.sh/core/spindle/config" 21 "tangled.sh/tangled.sh/core/spindle/db" 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 "tangled.sh/tangled.sh/core/spindle/models" 24 "tangled.sh/tangled.sh/core/spindle/queue" 25 "tangled.sh/tangled.sh/core/spindle/secrets" 26 "tangled.sh/tangled.sh/core/spindle/xrpc" 27) 28 29//go:embed motd 30var motd []byte 31 32const ( 33 rbacDomain = "thisserver" 34) 35 36type Spindle struct { 37 jc *jetstream.JetstreamClient 38 db *db.DB 39 e *rbac.Enforcer 40 l *slog.Logger 41 n *notifier.Notifier 42 eng *engine.Engine 43 jq *queue.Queue 44 cfg *config.Config 45 ks *eventconsumer.Consumer 46 res *idresolver.Resolver 47 vault secrets.Manager 48} 49 50func Run(ctx context.Context) error { 51 logger := log.FromContext(ctx) 52 53 cfg, err := config.Load(ctx) 54 if err != nil { 55 return fmt.Errorf("failed to load config: %w", err) 56 } 57 58 d, err := db.Make(cfg.Server.DBPath) 59 if err != nil { 60 return fmt.Errorf("failed to setup db: %w", err) 61 } 62 63 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 64 if err != nil { 65 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 66 } 67 e.E.EnableAutoSave(true) 68 69 n := notifier.New() 70 71 eng, err := engine.New(ctx, cfg, d, &n) 72 if err != nil { 73 return err 74 } 75 76 jq := queue.NewQueue(100, 2) 77 78 collections := []string{ 79 tangled.SpindleMemberNSID, 80 tangled.RepoNSID, 81 } 82 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 83 if err != nil { 84 return fmt.Errorf("failed to setup jetstream client: %w", err) 85 } 86 jc.AddDid(cfg.Server.Owner) 87 88 resolver := idresolver.DefaultResolver() 89 90 spindle := Spindle{ 91 jc: jc, 92 e: e, 93 db: d, 94 l: logger, 95 n: &n, 96 eng: eng, 97 jq: jq, 98 cfg: cfg, 99 res: resolver, 100 vault: vault, 101 } 102 103 err = e.AddSpindle(rbacDomain) 104 if err != nil { 105 return fmt.Errorf("failed to set rbac domain: %w", err) 106 } 107 err = spindle.configureOwner() 108 if err != nil { 109 return err 110 } 111 logger.Info("owner set", "did", cfg.Server.Owner) 112 113 // starts a job queue runner in the background 114 jq.Start() 115 defer jq.Stop() 116 117 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 118 if err != nil { 119 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 120 } 121 122 err = jc.StartJetstream(ctx, spindle.ingest()) 123 if err != nil { 124 return fmt.Errorf("failed to start jetstream consumer: %w", err) 125 } 126 127 // for each incoming sh.tangled.pipeline, we execute 128 // spindle.processPipeline, which in turn enqueues the pipeline 129 // job in the above registered queue. 130 ccfg := eventconsumer.NewConsumerConfig() 131 ccfg.Logger = logger 132 ccfg.Dev = cfg.Server.Dev 133 ccfg.ProcessFunc = spindle.processPipeline 134 ccfg.CursorStore = cursorStore 135 knownKnots, err := d.Knots() 136 if err != nil { 137 return err 138 } 139 for _, knot := range knownKnots { 140 logger.Info("adding source start", "knot", knot) 141 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 142 } 143 spindle.ks = eventconsumer.NewConsumer(*ccfg) 144 145 go func() { 146 logger.Info("starting knot event consumer") 147 spindle.ks.Start(ctx) 148 }() 149 150 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 151 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 152 153 return nil 154} 155 156func (s *Spindle) Router() http.Handler { 157 mux := chi.NewRouter() 158 159 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 160 w.Write(motd) 161 }) 162 mux.HandleFunc("/events", s.Events) 163 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) { 164 w.Write([]byte(s.cfg.Server.Owner)) 165 }) 166 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 167 168 mux.Mount("/xrpc", s.XrpcRouter()) 169 return mux 170} 171 172func (s *Spindle) XrpcRouter() http.Handler { 173 logger := s.l.With("route", "xrpc") 174 175 x := xrpc.Xrpc{ 176 Logger: logger, 177 Db: s.db, 178 Enforcer: s.e, 179 Engine: s.eng, 180 Config: s.cfg, 181 Resolver: s.res, 182 Vault: s.vault, 183 } 184 185 return x.Router() 186} 187 188func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 189 if msg.Nsid == tangled.PipelineNSID { 190 tpl := tangled.Pipeline{} 191 err := json.Unmarshal(msg.EventJson, &tpl) 192 if err != nil { 193 fmt.Println("error unmarshalling", err) 194 return err 195 } 196 197 if tpl.TriggerMetadata == nil { 198 return fmt.Errorf("no trigger metadata found") 199 } 200 201 if tpl.TriggerMetadata.Repo == nil { 202 return fmt.Errorf("no repo data found") 203 } 204 205 // filter by repos 206 _, err = s.db.GetRepo( 207 tpl.TriggerMetadata.Repo.Knot, 208 tpl.TriggerMetadata.Repo.Did, 209 tpl.TriggerMetadata.Repo.Repo, 210 ) 211 if err != nil { 212 return err 213 } 214 215 pipelineId := models.PipelineId{ 216 Knot: src.Key(), 217 Rkey: msg.Rkey, 218 } 219 220 for _, w := range tpl.Workflows { 221 if w != nil { 222 err := s.db.StatusPending(models.WorkflowId{ 223 PipelineId: pipelineId, 224 Name: w.Name, 225 }, s.n) 226 if err != nil { 227 return err 228 } 229 } 230 } 231 232 spl := models.ToPipeline(tpl, *s.cfg) 233 234 ok := s.jq.Enqueue(queue.Job{ 235 Run: func() error { 236 s.eng.StartWorkflows(ctx, spl, pipelineId) 237 return nil 238 }, 239 OnFail: func(jobError error) { 240 s.l.Error("pipeline run failed", "error", jobError) 241 }, 242 }) 243 if ok { 244 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 245 } else { 246 s.l.Error("failed to enqueue pipeline: queue is full") 247 } 248 } 249 250 return nil 251} 252 253func (s *Spindle) configureOwner() error { 254 cfgOwner := s.cfg.Server.Owner 255 256 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 257 if err != nil { 258 return err 259 } 260 261 switch len(existing) { 262 case 0: 263 // no owner configured, continue 264 case 1: 265 // find existing owner 266 existingOwner := existing[0] 267 268 // no ownership change, this is okay 269 if existingOwner == s.cfg.Server.Owner { 270 break 271 } 272 273 // remove existing owner 274 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 275 if err != nil { 276 return nil 277 } 278 default: 279 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 280 } 281 282 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 283}