Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/appview/notify"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/cache"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/models"
18 ec "tangled.org/core/eventconsumer"
19 "tangled.org/core/eventconsumer/cursor"
20 "tangled.org/core/log"
21 "tangled.org/core/orm"
22 "tangled.org/core/rbac"
23 "tangled.org/core/workflow"
24
25 "github.com/bluesky-social/indigo/atproto/syntax"
26 "github.com/go-git/go-git/v5/plumbing"
27 "github.com/posthog/posthog-go"
28)
29
30func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) {
31 logger := log.FromContext(ctx)
32 logger = log.SubLogger(logger, "knotstream")
33
34 knots, err := db.GetRegistrations(
35 d,
36 orm.FilterIsNot("registered", "null"),
37 )
38 if err != nil {
39 return nil, err
40 }
41
42 srcs := make(map[ec.Source]struct{})
43 for _, k := range knots {
44 s := ec.NewKnotSource(k.Domain)
45 srcs[s] = struct{}{}
46 }
47
48 cache := cache.New(c.Redis.Addr)
49 cursorStore := cursor.NewRedisCursorStore(cache)
50
51 cfg := ec.ConsumerConfig{
52 Sources: srcs,
53 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev),
54 RetryInterval: c.Knotstream.RetryInterval,
55 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
56 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
57 WorkerCount: c.Knotstream.WorkerCount,
58 QueueSize: c.Knotstream.QueueSize,
59 Logger: logger,
60 Dev: c.Core.Dev,
61 CursorStore: &cursorStore,
62 }
63
64 return ec.NewConsumer(cfg), nil
65}
66
67func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc {
68 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
69 switch msg.Nsid {
70 case tangled.GitRefUpdateNSID:
71 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx)
72 case tangled.PipelineNSID:
73 return ingestPipeline(d, source, msg)
74 }
75
76 return nil
77 }
78}
79
80func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error {
81 logger := log.FromContext(ctx)
82
83 var record tangled.GitRefUpdate
84 err := json.Unmarshal(msg.EventJson, &record)
85 if err != nil {
86 return err
87 }
88
89 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
90 if err != nil {
91 return err
92 }
93 if !slices.Contains(knownKnots, source.Key()) {
94 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
95 }
96
97 logger.Info("processing gitRefUpdate event",
98 "repo_did", record.RepoDid,
99 "repo_name", record.RepoName,
100 "ref", record.Ref,
101 "old_sha", record.OldSha,
102 "new_sha", record.NewSha)
103
104 // trigger webhook notifications first (before other ops that might fail)
105 var errWebhook error
106 repos, err := db.GetRepos(
107 d,
108 0,
109 orm.FilterEq("did", record.RepoDid),
110 orm.FilterEq("name", record.RepoName),
111 )
112 if err != nil {
113 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err)
114 } else if len(repos) == 1 {
115 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
116 } else if len(repos) == 0 {
117 errWebhook = fmt.Errorf("no repo found for webhooks: %s/%s", record.RepoDid, record.RepoName)
118 }
119
120 errPunchcard := populatePunchcard(d, record)
121 errLanguages := updateRepoLanguages(d, record)
122
123 var errPosthog error
124 if !dev && record.CommitterDid != "" {
125 errPosthog = pc.Enqueue(posthog.Capture{
126 DistinctId: record.CommitterDid,
127 Event: "git_ref_update",
128 })
129 }
130
131 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog)
132}
133
134func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
135 if record.CommitterDid == "" {
136 return nil
137 }
138
139 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
140 if err != nil {
141 return err
142 }
143
144 count := 0
145 for _, ke := range knownEmails {
146 if record.Meta == nil {
147 continue
148 }
149 if record.Meta.CommitCount == nil {
150 continue
151 }
152 for _, ce := range record.Meta.CommitCount.ByEmail {
153 if ce == nil {
154 continue
155 }
156 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
157 count += int(ce.Count)
158 }
159 }
160 }
161
162 punch := models.Punch{
163 Did: record.CommitterDid,
164 Date: time.Now(),
165 Count: count,
166 }
167 return db.AddPunch(d, punch)
168}
169
170func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
171 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
172 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
173 }
174
175 repos, err := db.GetRepos(
176 d,
177 0,
178 orm.FilterEq("did", record.RepoDid),
179 orm.FilterEq("name", record.RepoName),
180 )
181 if err != nil {
182 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
183 }
184 if len(repos) != 1 {
185 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
186 }
187 repo := repos[0]
188
189 ref := plumbing.ReferenceName(record.Ref)
190 if !ref.IsBranch() {
191 return fmt.Errorf("%s is not a valid reference name", ref)
192 }
193
194 var langs []models.RepoLanguage
195 for _, l := range record.Meta.LangBreakdown.Inputs {
196 if l == nil {
197 continue
198 }
199
200 langs = append(langs, models.RepoLanguage{
201 RepoAt: repo.RepoAt(),
202 Ref: ref.Short(),
203 IsDefaultRef: record.Meta.IsDefaultRef,
204 Language: l.Lang,
205 Bytes: l.Size,
206 })
207 }
208
209 tx, err := d.Begin()
210 if err != nil {
211 return err
212 }
213 defer tx.Rollback()
214
215 // update appview's cache
216 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
217 if err != nil {
218 fmt.Printf("failed; %s\n", err)
219 // non-fatal
220 }
221
222 return tx.Commit()
223}
224
225func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
226 var record tangled.Pipeline
227 err := json.Unmarshal(msg.EventJson, &record)
228 if err != nil {
229 return err
230 }
231
232 if record.TriggerMetadata == nil {
233 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
234 }
235
236 if record.TriggerMetadata.Repo == nil {
237 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
238 }
239
240 // does this repo have a spindle configured?
241 repos, err := db.GetRepos(
242 d,
243 0,
244 orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
245 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
246 )
247 if err != nil {
248 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
249 }
250 if len(repos) != 1 {
251 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
252 }
253 if repos[0].Spindle == "" {
254 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
255 }
256
257 // trigger info
258 var trigger models.Trigger
259 var sha string
260 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
261 switch trigger.Kind {
262 case workflow.TriggerKindPush:
263 trigger.PushRef = &record.TriggerMetadata.Push.Ref
264 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
265 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
266 sha = *trigger.PushNewSha
267 case workflow.TriggerKindPullRequest:
268 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
269 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
270 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
271 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
272 sha = *trigger.PRSourceSha
273 }
274
275 tx, err := d.Begin()
276 if err != nil {
277 return fmt.Errorf("failed to start txn: %w", err)
278 }
279
280 triggerId, err := db.AddTrigger(tx, trigger)
281 if err != nil {
282 return fmt.Errorf("failed to add trigger entry: %w", err)
283 }
284
285 pipeline := models.Pipeline{
286 Rkey: msg.Rkey,
287 Knot: source.Key(),
288 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
289 RepoName: record.TriggerMetadata.Repo.Repo,
290 TriggerId: int(triggerId),
291 Sha: sha,
292 }
293
294 err = db.AddPipeline(tx, pipeline)
295 if err != nil {
296 return fmt.Errorf("failed to add pipeline: %w", err)
297 }
298
299 err = tx.Commit()
300 if err != nil {
301 return fmt.Errorf("failed to commit txn: %w", err)
302 }
303
304 return nil
305}