this repo has no description
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "log/slog"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/appview/cache"
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 ec "tangled.sh/tangled.sh/core/eventconsumer"
15 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
16 "tangled.sh/tangled.sh/core/log"
17 "tangled.sh/tangled.sh/core/rbac"
18)
19
20func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
21 spindles, err := db.GetSpindles(d)
22 if err != nil {
23 return nil, err
24 }
25
26 srcs := make(map[ec.Source]struct{})
27 for _, s := range spindles {
28 src := ec.NewSpindleSource(s.Instance)
29 srcs[src] = struct{}{}
30 }
31
32 logger := log.New("spindlestream")
33 cache := cache.New(c.Redis.Addr)
34 cursorStore := cursor.NewRedisCursorStore(cache)
35
36 cfg := ec.ConsumerConfig{
37 Sources: srcs,
38 ProcessFunc: spindleIngester(ctx, logger, d),
39 RetryInterval: c.Spindlestream.RetryInterval,
40 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
41 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
42 WorkerCount: c.Spindlestream.WorkerCount,
43 QueueSize: c.Spindlestream.QueueSize,
44 Logger: logger,
45 Dev: c.Core.Dev,
46 CursorStore: &cursorStore,
47 }
48
49 return ec.NewConsumer(cfg), nil
50}
51
52func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
53 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
54 switch msg.Nsid {
55 case tangled.PipelineStatusNSID:
56 return ingestPipelineStatus(ctx, logger, d, source, msg)
57 }
58
59 return nil
60 }
61}
62
63func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
64 var record tangled.PipelineStatus
65 err := json.Unmarshal(msg.EventJson, &record)
66 if err != nil {
67 return err
68 }
69
70 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
71 if err != nil {
72 return err
73 }
74
75 exitCode := 0
76 if record.ExitCode != nil {
77 exitCode = int(*record.ExitCode)
78 }
79
80 status := db.PipelineStatus{
81 Spindle: source.Key(),
82 Rkey: msg.Rkey,
83 PipelineKnot: pipelineUri.Authority().String(),
84 PipelineRkey: pipelineUri.RecordKey().String(),
85 Created: time.Now(),
86 Workflow: record.Workflow,
87 Status: record.Status,
88 Error: record.Error,
89 ExitCode: exitCode,
90 }
91
92 return db.AddPipelineStatus(d, status)
93}