this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/knotclient/cursor" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/notifier" 17 "tangled.sh/tangled.sh/core/rbac" 18 "tangled.sh/tangled.sh/core/spindle/config" 19 "tangled.sh/tangled.sh/core/spindle/db" 20 "tangled.sh/tangled.sh/core/spindle/engine" 21 "tangled.sh/tangled.sh/core/spindle/models" 22 "tangled.sh/tangled.sh/core/spindle/queue" 23) 24 25const ( 26 rbacDomain = "thisserver" 27) 28 29type Spindle struct { 30 jc *jetstream.JetstreamClient 31 db *db.DB 32 e *rbac.Enforcer 33 l *slog.Logger 34 n *notifier.Notifier 35 eng *engine.Engine 36 jq *queue.Queue 37 cfg *config.Config 38 ks *knotclient.EventConsumer 39} 40 41func Run(ctx context.Context) error { 42 logger := log.FromContext(ctx) 43 44 cfg, err := config.Load(ctx) 45 if err != nil { 46 return fmt.Errorf("failed to load config: %w", err) 47 } 48 49 d, err := db.Make(cfg.Server.DBPath) 50 if err != nil { 51 return fmt.Errorf("failed to setup db: %w", err) 52 } 53 54 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 55 if err != nil { 56 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 57 } 58 e.E.EnableAutoSave(true) 59 60 n := notifier.New() 61 62 eng, err := engine.New(ctx, d, &n) 63 if err != nil { 64 return err 65 } 66 67 jq := queue.NewQueue(100, 2) 68 69 collections := []string{tangled.SpindleMemberNSID} 70 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false) 71 if err != nil { 72 return fmt.Errorf("failed to setup jetstream client: %w", err) 73 } 74 75 spindle := Spindle{ 76 jc: jc, 77 e: e, 78 db: d, 79 l: logger, 80 n: &n, 81 eng: eng, 82 jq: jq, 83 cfg: cfg, 84 } 85 86 err = e.AddKnot(rbacDomain) 87 if err != nil { 88 return fmt.Errorf("failed to set rbac domain: %w", err) 89 } 90 err = spindle.configureOwner() 91 if err != nil { 92 return err 93 } 94 logger.Info("owner set", "did", cfg.Server.Owner) 95 96 // starts a job queue runner in the background 97 jq.Start() 98 defer jq.Stop() 99 100 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 101 if err != nil { 102 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 103 } 104 105 err = jc.StartJetstream(ctx, spindle.ingest()) 106 if err != nil { 107 return fmt.Errorf("failed to start jetstream consumer: %w", err) 108 } 109 110 // for each incoming sh.tangled.pipeline, we execute 111 // spindle.processPipeline, which in turn enqueues the pipeline 112 // job in the above registered queue. 113 ccfg := knotclient.NewConsumerConfig() 114 ccfg.Logger = logger 115 ccfg.Dev = cfg.Server.Dev 116 ccfg.ProcessFunc = spindle.processPipeline 117 ccfg.CursorStore = cursorStore 118 knotstream := knotclient.NewEventConsumer(*ccfg) 119 knownKnots, err := d.Knots() 120 if err != nil { 121 return err 122 } 123 for _, knot := range knownKnots { 124 knotstream.AddSource(ctx, knotclient.NewEventSource(knot)) 125 } 126 spindle.ks = knotstream 127 128 go func() { 129 logger.Info("starting knot event consumer", "knots") 130 knotstream.Start(ctx) 131 }() 132 133 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 134 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 135 136 return nil 137} 138 139func (s *Spindle) Router() http.Handler { 140 mux := chi.NewRouter() 141 142 mux.HandleFunc("/events", s.Events) 143 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 144 return mux 145} 146 147func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 148 if msg.Nsid == tangled.PipelineNSID { 149 pipeline := tangled.Pipeline{} 150 err := json.Unmarshal(msg.EventJson, &pipeline) 151 if err != nil { 152 fmt.Println("error unmarshalling", err) 153 return err 154 } 155 156 pipelineId := models.PipelineId{ 157 Knot: src.Knot, 158 Rkey: msg.Rkey, 159 } 160 161 for _, w := range pipeline.Workflows { 162 if w != nil { 163 err := s.db.StatusPending(models.WorkflowId{ 164 PipelineId: pipelineId, 165 Name: w.Name, 166 }, s.n) 167 if err != nil { 168 return err 169 } 170 } 171 } 172 173 ok := s.jq.Enqueue(queue.Job{ 174 Run: func() error { 175 s.eng.StartWorkflows(ctx, &pipeline, pipelineId) 176 return nil 177 }, 178 OnFail: func(jobError error) { 179 s.l.Error("pipeline run failed", "error", jobError) 180 }, 181 }) 182 if ok { 183 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 184 } else { 185 s.l.Error("failed to enqueue pipeline: queue is full") 186 } 187 } 188 189 return nil 190} 191 192func (s *Spindle) configureOwner() error { 193 cfgOwner := s.cfg.Server.Owner 194 serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain) 195 if err != nil { 196 return fmt.Errorf("failed to fetch server:owner: %w", err) 197 } 198 199 if len(serverOwner) == 0 { 200 s.e.AddKnotOwner(rbacDomain, cfgOwner) 201 } else { 202 if serverOwner[0] != cfgOwner { 203 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0]) 204 } 205 } 206 return nil 207}