this repo has no description
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/cache"
14 "tangled.org/core/appview/config"
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/models"
17 ec "tangled.org/core/eventconsumer"
18 "tangled.org/core/eventconsumer/cursor"
19 "tangled.org/core/log"
20 "tangled.org/core/orm"
21 "tangled.org/core/rbac"
22 spindle "tangled.org/core/spindle/models"
23 "tangled.org/core/workflow"
24)
25
26func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
27 logger := log.FromContext(ctx)
28 logger = log.SubLogger(logger, "spindlestream")
29
30 spindles, err := db.GetSpindles(
31 d,
32 orm.FilterIsNot("verified", "null"),
33 )
34 if err != nil {
35 return nil, err
36 }
37
38 srcs := make(map[ec.Source]struct{})
39 for _, s := range spindles {
40 src := ec.NewSpindleSource(s.Instance)
41 srcs[src] = struct{}{}
42 }
43
44 cache := cache.New(c.Redis.Addr)
45 cursorStore := cursor.NewRedisCursorStore(cache)
46
47 cfg := ec.ConsumerConfig{
48 Sources: srcs,
49 ProcessFunc: spindleIngester(ctx, logger, d),
50 RetryInterval: c.Spindlestream.RetryInterval,
51 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
52 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
53 WorkerCount: c.Spindlestream.WorkerCount,
54 QueueSize: c.Spindlestream.QueueSize,
55 Logger: logger,
56 Dev: c.Core.Dev,
57 CursorStore: &cursorStore,
58 }
59
60 return ec.NewConsumer(cfg), nil
61}
62
63func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
64 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
65 switch msg.Nsid {
66 case tangled.PipelineNSID:
67 return ingestPipeline(logger, d, source, msg)
68 case tangled.PipelineStatusNSID:
69 return ingestPipelineStatus(ctx, logger, d, source, msg)
70 }
71
72 return nil
73 }
74}
75
76func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
77 var record tangled.Pipeline
78 err := json.Unmarshal(msg.EventJson, &record)
79 if err != nil {
80 return err
81 }
82
83 if record.TriggerMetadata == nil {
84 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
85 }
86
87 if record.TriggerMetadata.Repo == nil {
88 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
89 }
90
91 // does this repo have a spindle configured?
92 repos, err := db.GetRepos(
93 d,
94 0,
95 orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
96 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
97 )
98 if err != nil {
99 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
100 }
101 if len(repos) != 1 {
102 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
103 }
104 if repos[0].Spindle == "" {
105 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
106 }
107
108 // trigger info
109 var trigger models.Trigger
110 var sha string
111 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
112 switch trigger.Kind {
113 case workflow.TriggerKindPush:
114 trigger.PushRef = &record.TriggerMetadata.Push.Ref
115 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
116 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
117 sha = *trigger.PushNewSha
118 case workflow.TriggerKindPullRequest:
119 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
120 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
121 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
122 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
123 sha = *trigger.PRSourceSha
124 }
125
126 tx, err := d.Begin()
127 if err != nil {
128 return fmt.Errorf("failed to start txn: %w", err)
129 }
130
131 triggerId, err := db.AddTrigger(tx, trigger)
132 if err != nil {
133 return fmt.Errorf("failed to add trigger entry: %w", err)
134 }
135
136 // TODO: we shouldn't even use knot to identify pipelines
137 knot := record.TriggerMetadata.Repo.Knot
138 pipeline := models.Pipeline{
139 Rkey: msg.Rkey,
140 Knot: knot,
141 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
142 RepoName: record.TriggerMetadata.Repo.Repo,
143 TriggerId: int(triggerId),
144 Sha: sha,
145 }
146
147 err = db.AddPipeline(tx, pipeline)
148 if err != nil {
149 return fmt.Errorf("failed to add pipeline: %w", err)
150 }
151
152 err = tx.Commit()
153 if err != nil {
154 return fmt.Errorf("failed to commit txn: %w", err)
155 }
156
157 l.Info("added pipeline", "pipeline", pipeline)
158
159 return nil
160}
161
162func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
163 var record tangled.PipelineStatus
164 err := json.Unmarshal(msg.EventJson, &record)
165 if err != nil {
166 return err
167 }
168
169 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
170 if err != nil {
171 return err
172 }
173
174 exitCode := 0
175 if record.ExitCode != nil {
176 exitCode = int(*record.ExitCode)
177 }
178
179 // pick the record creation time if possible, or use time.Now
180 created := time.Now()
181 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
182 created = t
183 }
184
185 status := models.PipelineStatus{
186 Spindle: source.Key(),
187 Rkey: msg.Rkey,
188 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
189 PipelineRkey: pipelineUri.RecordKey().String(),
190 Created: created,
191 Workflow: record.Workflow,
192 Status: spindle.StatusKind(record.Status),
193 Error: record.Error,
194 ExitCode: exitCode,
195 }
196
197 err = db.AddPipelineStatus(d, status)
198 if err != nil {
199 return fmt.Errorf("failed to add pipeline status: %w", err)
200 }
201
202 return nil
203}