this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/knotclient/cursor" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/notifier" 17 "tangled.sh/tangled.sh/core/rbac" 18 "tangled.sh/tangled.sh/core/spindle/config" 19 "tangled.sh/tangled.sh/core/spindle/db" 20 "tangled.sh/tangled.sh/core/spindle/engine" 21 "tangled.sh/tangled.sh/core/spindle/queue" 22) 23 24type Spindle struct { 25 jc *jetstream.JetstreamClient 26 db *db.DB 27 e *rbac.Enforcer 28 l *slog.Logger 29 n *notifier.Notifier 30 eng *engine.Engine 31 jq *queue.Queue 32} 33 34func Run(ctx context.Context) error { 35 cfg, err := config.Load(ctx) 36 if err != nil { 37 return fmt.Errorf("failed to load config: %w", err) 38 } 39 40 d, err := db.Make(cfg.Server.DBPath) 41 if err != nil { 42 return fmt.Errorf("failed to setup db: %w", err) 43 } 44 45 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 46 if err != nil { 47 return fmt.Errorf("failed to setup rbac enforcer: %w", err) 48 } 49 50 logger := log.FromContext(ctx) 51 52 collections := []string{tangled.SpindleMemberNSID} 53 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false) 54 if err != nil { 55 return fmt.Errorf("failed to setup jetstream client: %w", err) 56 } 57 58 n := notifier.New() 59 eng, err := engine.New(ctx, d, &n) 60 if err != nil { 61 return err 62 } 63 64 jq := queue.NewQueue(100) 65 66 // starts a job queue runner in the background 67 jq.StartRunner() 68 69 spindle := Spindle{ 70 jc: jc, 71 e: e, 72 db: d, 73 l: logger, 74 n: &n, 75 eng: eng, 76 jq: jq, 77 } 78 79 // for each incoming sh.tangled.pipeline, we execute 80 // spindle.processPipeline, which in turn enqueues the pipeline 81 // job in the above registered queue. 82 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 83 if err != nil { 84 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 85 } 86 go func() { 87 logger.Info("starting event consumer") 88 knotEventSource := knotclient.NewEventSource("localhost:6000") 89 90 ccfg := knotclient.NewConsumerConfig() 91 ccfg.Logger = logger 92 ccfg.Dev = cfg.Server.Dev 93 ccfg.ProcessFunc = spindle.processPipeline 94 ccfg.CursorStore = cursorStore 95 ccfg.AddEventSource(knotEventSource) 96 97 ec := knotclient.NewEventConsumer(*ccfg) 98 99 ec.Start(ctx) 100 }() 101 102 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 103 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 104 105 return nil 106} 107 108func (s *Spindle) Router() http.Handler { 109 mux := chi.NewRouter() 110 111 mux.HandleFunc("/events", s.Events) 112 mux.HandleFunc("/logs/{pipelineID}", s.Logs) 113 return mux 114} 115 116func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 117 if msg.Nsid == tangled.PipelineNSID { 118 pipeline := tangled.Pipeline{} 119 err := json.Unmarshal(msg.EventJson, &pipeline) 120 if err != nil { 121 fmt.Println("error unmarshalling", err) 122 return err 123 } 124 125 ok := s.jq.Enqueue(queue.Job{ 126 Run: func() error { 127 // this is a "fake" at uri for now 128 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 129 130 rkey := TID() 131 132 err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n) 133 if err != nil { 134 return err 135 } 136 137 return s.eng.StartWorkflows(ctx, &pipeline, rkey) 138 }, 139 OnFail: func(error) { 140 s.l.Error("pipeline run failed", "error", err) 141 }, 142 }) 143 if ok { 144 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 145 } else { 146 s.l.Error("failed to enqueue pipeline: queue is full") 147 } 148 } 149 150 return nil 151}