this repo has no description
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/api/tangled"
12 "tangled.org/core/appview/cache"
13 "tangled.org/core/appview/config"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/models"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/log"
19 "tangled.org/core/orm"
20 "tangled.org/core/rbac"
21
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/posthog/posthog-go"
24)
25
26func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
27 logger := log.FromContext(ctx)
28 logger = log.SubLogger(logger, "knotstream")
29
30 knots, err := db.GetRegistrations(
31 d,
32 orm.FilterIsNot("registered", "null"),
33 )
34 if err != nil {
35 return nil, err
36 }
37
38 srcs := make(map[ec.Source]struct{})
39 for _, k := range knots {
40 s := ec.NewKnotSource(k.Domain)
41 srcs[s] = struct{}{}
42 }
43
44 cache := cache.New(c.Redis.Addr)
45 cursorStore := cursor.NewRedisCursorStore(cache)
46
47 cfg := ec.ConsumerConfig{
48 Sources: srcs,
49 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
50 RetryInterval: c.Knotstream.RetryInterval,
51 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
52 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
53 WorkerCount: c.Knotstream.WorkerCount,
54 QueueSize: c.Knotstream.QueueSize,
55 Logger: logger,
56 Dev: c.Core.Dev,
57 CursorStore: &cursorStore,
58 }
59
60 return ec.NewConsumer(cfg), nil
61}
62
63func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
64 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
65 switch msg.Nsid {
66 case tangled.GitRefUpdateNSID:
67 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
68 }
69
70 return nil
71 }
72}
73
74func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
75 var record tangled.GitRefUpdate
76 err := json.Unmarshal(msg.EventJson, &record)
77 if err != nil {
78 return err
79 }
80
81 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
82 if err != nil {
83 return err
84 }
85 if !slices.Contains(knownKnots, source.Key()) {
86 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
87 }
88
89 err1 := populatePunchcard(d, record)
90 err2 := updateRepoLanguages(d, record)
91
92 var err3 error
93 if !dev {
94 err3 = pc.Enqueue(posthog.Capture{
95 DistinctId: record.CommitterDid,
96 Event: "git_ref_update",
97 })
98 }
99
100 return errors.Join(err1, err2, err3)
101}
102
103func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
104 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
105 if err != nil {
106 return err
107 }
108
109 count := 0
110 for _, ke := range knownEmails {
111 if record.Meta == nil {
112 continue
113 }
114 if record.Meta.CommitCount == nil {
115 continue
116 }
117 for _, ce := range record.Meta.CommitCount.ByEmail {
118 if ce == nil {
119 continue
120 }
121 if ce.Email == ke.Address {
122 count += int(ce.Count)
123 }
124 }
125 }
126
127 punch := models.Punch{
128 Did: record.CommitterDid,
129 Date: time.Now(),
130 Count: count,
131 }
132 return db.AddPunch(d, punch)
133}
134
135func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
136 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
137 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
138 }
139
140 repos, err := db.GetRepos(
141 d,
142 0,
143 orm.FilterEq("did", record.RepoDid),
144 orm.FilterEq("name", record.RepoName),
145 )
146 if err != nil {
147 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
148 }
149 if len(repos) != 1 {
150 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
151 }
152 repo := repos[0]
153
154 ref := plumbing.ReferenceName(record.Ref)
155 if !ref.IsBranch() {
156 return fmt.Errorf("%s is not a valid reference name", ref)
157 }
158
159 var langs []models.RepoLanguage
160 for _, l := range record.Meta.LangBreakdown.Inputs {
161 if l == nil {
162 continue
163 }
164
165 langs = append(langs, models.RepoLanguage{
166 RepoAt: repo.RepoAt(),
167 Ref: ref.Short(),
168 IsDefaultRef: record.Meta.IsDefaultRef,
169 Language: l.Lang,
170 Bytes: l.Size,
171 })
172 }
173
174 tx, err := d.Begin()
175 if err != nil {
176 return err
177 }
178 defer tx.Rollback()
179
180 // update appview's cache
181 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
182 if err != nil {
183 fmt.Printf("failed; %s\n", err)
184 // non-fatal
185 }
186
187 return tx.Commit()
188}