Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "time"
11
12 "tangled.org/core/appview/notify"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview/cache"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 ec "tangled.org/core/eventconsumer"
20 "tangled.org/core/eventconsumer/cursor"
21 "tangled.org/core/log"
22 "tangled.org/core/orm"
23 "tangled.org/core/rbac"
24 "tangled.org/core/workflow"
25
26 "github.com/bluesky-social/indigo/atproto/syntax"
27 "github.com/go-git/go-git/v5/plumbing"
28 "github.com/posthog/posthog-go"
29)
30
31func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) {
32 logger := log.FromContext(ctx)
33 logger = log.SubLogger(logger, "knotstream")
34
35 knots, err := db.GetRegistrations(
36 d,
37 orm.FilterIsNot("registered", "null"),
38 )
39 if err != nil {
40 return nil, err
41 }
42
43 srcs := make(map[ec.Source]struct{})
44 for _, k := range knots {
45 s := ec.NewKnotSource(k.Domain)
46 srcs[s] = struct{}{}
47 }
48
49 cache := cache.New(c.Redis.Addr)
50 cursorStore := cursor.NewRedisCursorStore(cache)
51
52 cfg := ec.ConsumerConfig{
53 Sources: srcs,
54 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev),
55 RetryInterval: c.Knotstream.RetryInterval,
56 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
57 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
58 WorkerCount: c.Knotstream.WorkerCount,
59 QueueSize: c.Knotstream.QueueSize,
60 Logger: logger,
61 Dev: c.Core.Dev,
62 CursorStore: &cursorStore,
63 }
64
65 return ec.NewConsumer(cfg), nil
66}
67
68func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc {
69 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
70 switch msg.Nsid {
71 case tangled.GitRefUpdateNSID:
72 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx)
73 case tangled.PipelineNSID:
74 return ingestPipeline(d, source, msg)
75 }
76
77 return nil
78 }
79}
80
81func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error {
82 logger := log.FromContext(ctx)
83
84 var record tangled.GitRefUpdate
85 err := json.Unmarshal(msg.EventJson, &record)
86 if err != nil {
87 return err
88 }
89
90 // backward compat: old knots emit owner DID as "repoDid" with no "ownerDid" field
91 if record.OwnerDid == "" && record.RepoDid != nil && *record.RepoDid != "" {
92 record.OwnerDid = *record.RepoDid
93 record.RepoDid = nil
94 }
95
96 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
97 if err != nil {
98 return err
99 }
100 if !slices.Contains(knownKnots, source.Key()) {
101 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
102 }
103
104 logger.Info("processing gitRefUpdate event",
105 "owner_did", record.OwnerDid,
106 "repo_name", record.RepoName,
107 "repo_did", record.RepoDid,
108 "ref", record.Ref,
109 "old_sha", record.OldSha,
110 "new_sha", record.NewSha)
111
112 var errWebhook error
113 var repos []models.Repo
114
115 if record.RepoDid != nil && *record.RepoDid != "" {
116 repo, lookupErr := db.GetRepoByDid(d, *record.RepoDid)
117 switch {
118 case lookupErr == nil:
119 repos = []models.Repo{*repo}
120 case !errors.Is(lookupErr, sql.ErrNoRows):
121 logger.Warn("failed to look up repo by DID", "err", lookupErr, "repoDid", *record.RepoDid)
122 }
123 }
124
125 if len(repos) == 0 {
126 repos, err = db.GetRepos(
127 d,
128 0,
129 orm.FilterEq("did", record.OwnerDid),
130 orm.FilterEq("name", record.RepoName),
131 )
132 if err != nil {
133 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err)
134 }
135 }
136
137 if errWebhook == nil && len(repos) == 1 {
138 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
139 }
140
141 errPunchcard := populatePunchcard(d, record)
142 errLanguages := updateRepoLanguages(d, record)
143
144 var errPosthog error
145 if !dev && record.CommitterDid != "" {
146 errPosthog = pc.Enqueue(posthog.Capture{
147 DistinctId: record.CommitterDid,
148 Event: "git_ref_update",
149 })
150 }
151
152 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog)
153}
154
155func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
156 if record.CommitterDid == "" {
157 return nil
158 }
159
160 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
161 if err != nil {
162 return err
163 }
164
165 count := 0
166 for _, ke := range knownEmails {
167 if record.Meta == nil {
168 continue
169 }
170 if record.Meta.CommitCount == nil {
171 continue
172 }
173 for _, ce := range record.Meta.CommitCount.ByEmail {
174 if ce == nil {
175 continue
176 }
177 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
178 count += int(ce.Count)
179 }
180 }
181 }
182
183 punch := models.Punch{
184 Did: record.CommitterDid,
185 Date: time.Now(),
186 Count: count,
187 }
188 return db.AddPunch(d, punch)
189}
190
191func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
192 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
193 return fmt.Errorf("empty language data for repo: %s/%s", record.OwnerDid, record.RepoName)
194 }
195
196 var repo models.Repo
197 if record.RepoDid != nil && *record.RepoDid != "" {
198 r, lookupErr := db.GetRepoByDid(d, *record.RepoDid)
199 switch {
200 case lookupErr == nil:
201 repo = *r
202 case !errors.Is(lookupErr, sql.ErrNoRows):
203 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, lookupErr)
204 }
205 }
206
207 if repo.Id == 0 {
208 repos, err := db.GetRepos(
209 d,
210 0,
211 orm.FilterEq("did", record.OwnerDid),
212 orm.FilterEq("name", record.RepoName),
213 )
214 if err != nil {
215 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.OwnerDid, record.RepoName, err)
216 }
217 if len(repos) != 1 {
218 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
219 }
220 repo = repos[0]
221 }
222
223 ref := plumbing.ReferenceName(record.Ref)
224 if !ref.IsBranch() {
225 return fmt.Errorf("%s is not a valid reference name", ref)
226 }
227
228 var langs []models.RepoLanguage
229 for _, l := range record.Meta.LangBreakdown.Inputs {
230 if l == nil {
231 continue
232 }
233
234 langs = append(langs, models.RepoLanguage{
235 RepoAt: repo.RepoAt(),
236 Ref: ref.Short(),
237 IsDefaultRef: record.Meta.IsDefaultRef,
238 Language: l.Lang,
239 Bytes: l.Size,
240 })
241 }
242
243 tx, err := d.Begin()
244 if err != nil {
245 return err
246 }
247 defer tx.Rollback()
248
249 // update appview's cache
250 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
251 if err != nil {
252 fmt.Printf("failed; %s\n", err)
253 // non-fatal
254 }
255
256 return tx.Commit()
257}
258
259func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
260 var record tangled.Pipeline
261 err := json.Unmarshal(msg.EventJson, &record)
262 if err != nil {
263 return err
264 }
265
266 if record.TriggerMetadata == nil {
267 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
268 }
269
270 if record.TriggerMetadata.Repo == nil {
271 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
272 }
273
274 // does this repo have a spindle configured?
275 var repos []models.Repo
276 if record.TriggerMetadata.Repo.RepoDid != nil && *record.TriggerMetadata.Repo.RepoDid != "" {
277 repo, lookupErr := db.GetRepoByDid(d, *record.TriggerMetadata.Repo.RepoDid)
278 switch {
279 case lookupErr == nil:
280 repos = []models.Repo{*repo}
281 case !errors.Is(lookupErr, sql.ErrNoRows):
282 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.TriggerMetadata.Repo.RepoDid, lookupErr)
283 }
284 }
285
286 if len(repos) == 0 {
287 var err error
288 repos, err = db.GetRepos(
289 d,
290 0,
291 orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
292 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
293 )
294 if err != nil {
295 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
296 }
297 }
298
299 if len(repos) != 1 {
300 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
301 }
302 if repos[0].Spindle == "" {
303 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
304 }
305
306 // trigger info
307 var trigger models.Trigger
308 var sha string
309 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
310 switch trigger.Kind {
311 case workflow.TriggerKindPush:
312 trigger.PushRef = &record.TriggerMetadata.Push.Ref
313 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
314 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
315 sha = *trigger.PushNewSha
316 case workflow.TriggerKindPullRequest:
317 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
318 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
319 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
320 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
321 sha = *trigger.PRSourceSha
322 }
323
324 tx, err := d.Begin()
325 if err != nil {
326 return fmt.Errorf("failed to start txn: %w", err)
327 }
328
329 triggerId, err := db.AddTrigger(tx, trigger)
330 if err != nil {
331 return fmt.Errorf("failed to add trigger entry: %w", err)
332 }
333
334 pipeline := models.Pipeline{
335 Rkey: msg.Rkey,
336 Knot: source.Key(),
337 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
338 RepoName: record.TriggerMetadata.Repo.Repo,
339 TriggerId: int(triggerId),
340 Sha: sha,
341 }
342
343 err = db.AddPipeline(tx, pipeline)
344 if err != nil {
345 return fmt.Errorf("failed to add pipeline: %w", err)
346 }
347
348 err = tx.Commit()
349 if err != nil {
350 return fmt.Errorf("failed to commit txn: %w", err)
351 }
352
353 return nil
354}