Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "time"
11
12 "tangled.org/core/appview/notify"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview/cache"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 ec "tangled.org/core/eventconsumer"
20 "tangled.org/core/eventconsumer/cursor"
21 "tangled.org/core/log"
22 "tangled.org/core/orm"
23 "tangled.org/core/rbac"
24 "tangled.org/core/workflow"
25
26 "github.com/bluesky-social/indigo/atproto/syntax"
27 "github.com/go-git/go-git/v5/plumbing"
28 "github.com/posthog/posthog-go"
29)
30
31func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) {
32 logger := log.FromContext(ctx)
33 logger = log.SubLogger(logger, "knotstream")
34
35 knots, err := db.GetRegistrations(
36 d,
37 orm.FilterIsNot("registered", "null"),
38 )
39 if err != nil {
40 return nil, err
41 }
42
43 srcs := make(map[ec.Source]struct{})
44 for _, k := range knots {
45 s := ec.NewKnotSource(k.Domain)
46 srcs[s] = struct{}{}
47 }
48
49 cache := cache.New(c.Redis.Addr)
50 cursorStore := cursor.NewRedisCursorStore(cache)
51
52 cfg := ec.ConsumerConfig{
53 Sources: srcs,
54 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev),
55 RetryInterval: c.Knotstream.RetryInterval,
56 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
57 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
58 WorkerCount: c.Knotstream.WorkerCount,
59 QueueSize: c.Knotstream.QueueSize,
60 Logger: logger,
61 Dev: c.Core.Dev,
62 CursorStore: &cursorStore,
63 }
64
65 return ec.NewConsumer(cfg), nil
66}
67
68func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc {
69 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
70 switch msg.Nsid {
71 case tangled.GitRefUpdateNSID:
72 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx)
73 case tangled.PipelineNSID:
74 return ingestPipeline(d, source, msg)
75 }
76
77 return nil
78 }
79}
80
81func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error {
82 logger := log.FromContext(ctx)
83
84 var record tangled.GitRefUpdate
85 err := json.Unmarshal(msg.EventJson, &record)
86 if err != nil {
87 return err
88 }
89
90 if record.RepoDid == "" {
91 logger.Error("gitRefUpdate missing repoDid, skipping", "owner_did", record.OwnerDid, "repo_name", record.RepoName)
92 return fmt.Errorf("gitRefUpdate missing repoDid")
93 }
94
95 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
96 if err != nil {
97 return err
98 }
99 if !slices.Contains(knownKnots, source.Key()) {
100 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
101 }
102
103 logger.Info("processing gitRefUpdate event",
104 "repo_did", record.RepoDid,
105 "ref", record.Ref,
106 "old_sha", record.OldSha,
107 "new_sha", record.NewSha)
108
109 var errWebhook error
110
111 repo, lookupErr := db.GetRepoByDid(d, record.RepoDid)
112 if lookupErr != nil && !errors.Is(lookupErr, sql.ErrNoRows) {
113 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr)
114 }
115
116 var repos []models.Repo
117 if lookupErr == nil {
118 repos = []models.Repo{*repo}
119 }
120
121 if errWebhook == nil && len(repos) == 1 {
122 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
123 }
124
125 errPunchcard := populatePunchcard(d, record)
126 errLanguages := updateRepoLanguages(d, record)
127
128 var errPosthog error
129 if !dev && record.CommitterDid != "" {
130 errPosthog = pc.Enqueue(posthog.Capture{
131 DistinctId: record.CommitterDid,
132 Event: "git_ref_update",
133 })
134 }
135
136 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog)
137}
138
139func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
140 if record.CommitterDid == "" {
141 return nil
142 }
143
144 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
145 if err != nil {
146 return err
147 }
148
149 count := 0
150 for _, ke := range knownEmails {
151 if record.Meta == nil {
152 continue
153 }
154 if record.Meta.CommitCount == nil {
155 continue
156 }
157 for _, ce := range record.Meta.CommitCount.ByEmail {
158 if ce == nil {
159 continue
160 }
161 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
162 count += int(ce.Count)
163 }
164 }
165 }
166
167 punch := models.Punch{
168 Did: record.CommitterDid,
169 Date: time.Now(),
170 Count: count,
171 }
172 return db.AddPunch(d, punch)
173}
174
175func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
176 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
177 return fmt.Errorf("empty language data for repo: %s/%s", record.OwnerDid, record.RepoName)
178 }
179
180 if record.RepoDid == "" {
181 return fmt.Errorf("gitRefUpdate missing repoDid for language update")
182 }
183
184 r, lookupErr := db.GetRepoByDid(d, record.RepoDid)
185 if lookupErr != nil {
186 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr)
187 }
188 repo := *r
189
190 ref := plumbing.ReferenceName(record.Ref)
191 if !ref.IsBranch() {
192 return fmt.Errorf("%s is not a valid reference name", ref)
193 }
194
195 var langs []models.RepoLanguage
196 for _, l := range record.Meta.LangBreakdown.Inputs {
197 if l == nil {
198 continue
199 }
200
201 langs = append(langs, models.RepoLanguage{
202 RepoAt: repo.RepoAt(),
203 RepoDid: repo.RepoDid,
204 Ref: ref.Short(),
205 IsDefaultRef: record.Meta.IsDefaultRef,
206 Language: l.Lang,
207 Bytes: l.Size,
208 })
209 }
210
211 tx, err := d.Begin()
212 if err != nil {
213 return err
214 }
215 defer tx.Rollback()
216
217 // update appview's cache
218 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
219 if err != nil {
220 fmt.Printf("failed; %s\n", err)
221 // non-fatal
222 }
223
224 return tx.Commit()
225}
226
227func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
228 var record tangled.Pipeline
229 err := json.Unmarshal(msg.EventJson, &record)
230 if err != nil {
231 return err
232 }
233
234 if record.TriggerMetadata == nil {
235 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
236 }
237
238 if record.TriggerMetadata.Repo == nil {
239 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
240 }
241
242 if record.TriggerMetadata.Repo.RepoDid == "" {
243 return fmt.Errorf("pipeline missing repoDid: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
244 }
245
246 repo, lookupErr := db.GetRepoByDid(d, record.TriggerMetadata.Repo.RepoDid)
247 if lookupErr != nil {
248 return fmt.Errorf("failed to look up repo by DID %s: %w", record.TriggerMetadata.Repo.RepoDid, lookupErr)
249 }
250 repos := []models.Repo{*repo}
251 if repos[0].Spindle == "" {
252 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
253 }
254
255 // trigger info
256 var trigger models.Trigger
257 var sha string
258 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
259 switch trigger.Kind {
260 case workflow.TriggerKindPush:
261 trigger.PushRef = &record.TriggerMetadata.Push.Ref
262 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
263 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
264 sha = *trigger.PushNewSha
265 case workflow.TriggerKindPullRequest:
266 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
267 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
268 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
269 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
270 sha = *trigger.PRSourceSha
271 }
272
273 tx, err := d.Begin()
274 if err != nil {
275 return fmt.Errorf("failed to start txn: %w", err)
276 }
277
278 triggerId, err := db.AddTrigger(tx, trigger)
279 if err != nil {
280 return fmt.Errorf("failed to add trigger entry: %w", err)
281 }
282
283 pipeline := models.Pipeline{
284 Rkey: msg.Rkey,
285 Knot: source.Key(),
286 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
287 RepoName: record.TriggerMetadata.Repo.Repo,
288 RepoDid: repos[0].RepoDid,
289 TriggerId: int(triggerId),
290 Sha: sha,
291 }
292
293 err = db.AddPipeline(tx, pipeline)
294 if err != nil {
295 return fmt.Errorf("failed to add pipeline: %w", err)
296 }
297
298 err = tx.Commit()
299 if err != nil {
300 return fmt.Errorf("failed to commit txn: %w", err)
301 }
302
303 return nil
304}