this repo has no description
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/jetstream"
13 "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/knotclient/cursor"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/queue"
22)
23
24type Spindle struct {
25 jc *jetstream.JetstreamClient
26 db *db.DB
27 e *rbac.Enforcer
28 l *slog.Logger
29 n *notifier.Notifier
30 eng *engine.Engine
31 jq *queue.Queue
32}
33
34func Run(ctx context.Context) error {
35 cfg, err := config.Load(ctx)
36 if err != nil {
37 return fmt.Errorf("failed to load config: %w", err)
38 }
39
40 d, err := db.Make(cfg.Server.DBPath)
41 if err != nil {
42 return fmt.Errorf("failed to setup db: %w", err)
43 }
44
45 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
46 if err != nil {
47 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
48 }
49
50 logger := log.FromContext(ctx)
51
52 collections := []string{tangled.SpindleMemberNSID}
53 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
54 if err != nil {
55 return fmt.Errorf("failed to setup jetstream client: %w", err)
56 }
57
58 n := notifier.New()
59 eng, err := engine.New(ctx, d, &n)
60 if err != nil {
61 return err
62 }
63
64 jq := queue.NewQueue(100)
65
66 // starts a job queue runner in the background
67 jq.StartRunner()
68
69 spindle := Spindle{
70 jc: jc,
71 e: e,
72 db: d,
73 l: logger,
74 n: &n,
75 eng: eng,
76 jq: jq,
77 }
78
79 // for each incoming sh.tangled.pipeline, we execute
80 // spindle.processPipeline, which in turn enqueues the pipeline
81 // job in the above registered queue.
82 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
83 if err != nil {
84 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
85 }
86 go func() {
87 logger.Info("starting event consumer")
88 knotEventSource := knotclient.NewEventSource("localhost:6000")
89
90 ccfg := knotclient.NewConsumerConfig()
91 ccfg.Logger = logger
92 ccfg.Dev = cfg.Server.Dev
93 ccfg.ProcessFunc = spindle.processPipeline
94 ccfg.CursorStore = cursorStore
95 ccfg.AddEventSource(knotEventSource)
96
97 ec := knotclient.NewEventConsumer(*ccfg)
98
99 ec.Start(ctx)
100 }()
101
102 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
103 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
104
105 return nil
106}
107
108func (s *Spindle) Router() http.Handler {
109 mux := chi.NewRouter()
110
111 mux.HandleFunc("/events", s.Events)
112 mux.HandleFunc("/logs/{pipelineID}", s.Logs)
113 return mux
114}
115
116func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
117 if msg.Nsid == tangled.PipelineNSID {
118 pipeline := tangled.Pipeline{}
119 err := json.Unmarshal(msg.EventJson, &pipeline)
120 if err != nil {
121 fmt.Println("error unmarshalling", err)
122 return err
123 }
124
125 ok := s.jq.Enqueue(queue.Job{
126 Run: func() error {
127 // this is a "fake" at uri for now
128 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey)
129
130 rkey := TID()
131
132 err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n)
133 if err != nil {
134 return err
135 }
136
137 return s.eng.StartWorkflows(ctx, &pipeline, rkey)
138 },
139 OnFail: func(error) {
140 s.l.Error("pipeline run failed", "error", err)
141 },
142 })
143 if ok {
144 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
145 } else {
146 s.l.Error("failed to enqueue pipeline: queue is full")
147 }
148 }
149
150 return nil
151}