this repo has no description
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7
8 "tangled.sh/tangled.sh/core/api/tangled"
9)
10
11func (s *Spindle) exec(ctx context.Context, src string, msg []byte) error {
12 pipeline := tangled.Pipeline{}
13 data := map[string]any{}
14 err := json.Unmarshal(msg, &data)
15 if err != nil {
16 fmt.Println("error unmarshalling", err)
17 return err
18 }
19
20 if data["nsid"] == tangled.PipelineNSID {
21 event, ok := data["event"]
22 if !ok {
23 s.l.Error("no event in message")
24 return nil
25 }
26
27 rawEvent, err := json.Marshal(event)
28 if err != nil {
29 return err
30 }
31
32 err = json.Unmarshal(rawEvent, &pipeline)
33 if err != nil {
34 return err
35 }
36
37 rkey, ok := data["rkey"].(string)
38 if !ok {
39 s.l.Error("no rkey in message")
40 return nil
41 }
42
43 err = s.eng.SetupPipeline(ctx, &pipeline, rkey)
44 if err != nil {
45 return err
46 }
47 err = s.eng.StartWorkflows(ctx, &pipeline, rkey)
48 if err != nil {
49 return err
50 }
51 }
52
53 return nil
54}