Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "time"
11
12 "tangled.org/core/appview/notify"
13
14 "tangled.org/core/api/tangled"
15 knotdb "tangled.org/core/knotserver/db"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/models"
20 ec "tangled.org/core/eventconsumer"
21 "tangled.org/core/eventconsumer/cursor"
22 "tangled.org/core/log"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25 "tangled.org/core/workflow"
26
27 "github.com/bluesky-social/indigo/atproto/syntax"
28 "github.com/go-git/go-git/v5/plumbing"
29 "github.com/posthog/posthog-go"
30)
31
32func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) {
33 logger := log.FromContext(ctx)
34 logger = log.SubLogger(logger, "knotstream")
35
36 knots, err := db.GetRegistrations(
37 d,
38 orm.FilterIsNot("registered", "null"),
39 )
40 if err != nil {
41 return nil, err
42 }
43
44 srcs := make(map[ec.Source]struct{})
45 for _, k := range knots {
46 s := ec.NewKnotSource(k.Domain)
47 srcs[s] = struct{}{}
48 }
49
50 cache := cache.New(c.Redis.Addr)
51 cursorStore := cursor.NewRedisCursorStore(cache)
52
53 cfg := ec.ConsumerConfig{
54 Sources: srcs,
55 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev),
56 RetryInterval: c.Knotstream.RetryInterval,
57 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
58 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
59 WorkerCount: c.Knotstream.WorkerCount,
60 QueueSize: c.Knotstream.QueueSize,
61 Logger: logger,
62 Dev: c.Core.Dev,
63 CursorStore: &cursorStore,
64 }
65
66 return ec.NewConsumer(cfg), nil
67}
68
69func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc {
70 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
71 switch msg.Nsid {
72 case tangled.GitRefUpdateNSID:
73 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx)
74 case tangled.PipelineNSID:
75 return ingestPipeline(d, source, msg)
76 case knotdb.RepoDIDAssignNSID:
77 return ingestDIDAssign(d, enforcer, source, msg, ctx)
78 }
79
80 return nil
81 }
82}
83
84func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error {
85 logger := log.FromContext(ctx)
86
87 var record tangled.GitRefUpdate
88 err := json.Unmarshal(msg.EventJson, &record)
89 if err != nil {
90 return err
91 }
92
93 if record.RepoDid == "" {
94 logger.Error("gitRefUpdate missing repoDid, skipping", "owner_did", record.OwnerDid, "repo_name", record.RepoName)
95 return fmt.Errorf("gitRefUpdate missing repoDid")
96 }
97
98 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
99 if err != nil {
100 return err
101 }
102 if !slices.Contains(knownKnots, source.Key()) {
103 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
104 }
105
106 logger.Info("processing gitRefUpdate event",
107 "repo_did", record.RepoDid,
108 "ref", record.Ref,
109 "old_sha", record.OldSha,
110 "new_sha", record.NewSha)
111
112 var errWebhook error
113
114 repo, lookupErr := db.GetRepoByDid(d, record.RepoDid)
115 if lookupErr != nil && !errors.Is(lookupErr, sql.ErrNoRows) {
116 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr)
117 }
118
119 var repos []models.Repo
120 if lookupErr == nil {
121 repos = []models.Repo{*repo}
122 }
123
124 if errWebhook == nil && len(repos) == 1 {
125 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
126 }
127
128 errPunchcard := populatePunchcard(d, record)
129 errLanguages := updateRepoLanguages(d, record)
130
131 var errPosthog error
132 if !dev && record.CommitterDid != "" {
133 errPosthog = pc.Enqueue(posthog.Capture{
134 DistinctId: record.CommitterDid,
135 Event: "git_ref_update",
136 })
137 }
138
139 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog)
140}
141
142func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
143 if record.CommitterDid == "" {
144 return nil
145 }
146
147 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
148 if err != nil {
149 return err
150 }
151
152 count := 0
153 for _, ke := range knownEmails {
154 if record.Meta == nil {
155 continue
156 }
157 if record.Meta.CommitCount == nil {
158 continue
159 }
160 for _, ce := range record.Meta.CommitCount.ByEmail {
161 if ce == nil {
162 continue
163 }
164 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
165 count += int(ce.Count)
166 }
167 }
168 }
169
170 punch := models.Punch{
171 Did: record.CommitterDid,
172 Date: time.Now(),
173 Count: count,
174 }
175 return db.AddPunch(d, punch)
176}
177
178func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
179 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
180 return fmt.Errorf("empty language data for repo: %s/%s", record.OwnerDid, record.RepoName)
181 }
182
183 if record.RepoDid == "" {
184 return fmt.Errorf("gitRefUpdate missing repoDid for language update")
185 }
186
187 r, lookupErr := db.GetRepoByDid(d, record.RepoDid)
188 if lookupErr != nil {
189 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr)
190 }
191 repo := *r
192
193 ref := plumbing.ReferenceName(record.Ref)
194 if !ref.IsBranch() {
195 return fmt.Errorf("%s is not a valid reference name", ref)
196 }
197
198 var langs []models.RepoLanguage
199 for _, l := range record.Meta.LangBreakdown.Inputs {
200 if l == nil {
201 continue
202 }
203
204 langs = append(langs, models.RepoLanguage{
205 RepoAt: repo.RepoAt(),
206 RepoDid: repo.RepoDid,
207 Ref: ref.Short(),
208 IsDefaultRef: record.Meta.IsDefaultRef,
209 Language: l.Lang,
210 Bytes: l.Size,
211 })
212 }
213
214 tx, err := d.Begin()
215 if err != nil {
216 return err
217 }
218 defer tx.Rollback()
219
220 // update appview's cache
221 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
222 if err != nil {
223 fmt.Printf("failed; %s\n", err)
224 // non-fatal
225 }
226
227 return tx.Commit()
228}
229
230func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
231 var record tangled.Pipeline
232 err := json.Unmarshal(msg.EventJson, &record)
233 if err != nil {
234 return err
235 }
236
237 if record.TriggerMetadata == nil {
238 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
239 }
240
241 if record.TriggerMetadata.Repo == nil {
242 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
243 }
244
245 if record.TriggerMetadata.Repo.RepoDid == "" {
246 return fmt.Errorf("pipeline missing repoDid: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
247 }
248
249 repo, lookupErr := db.GetRepoByDid(d, record.TriggerMetadata.Repo.RepoDid)
250 if lookupErr != nil {
251 return fmt.Errorf("failed to look up repo by DID %s: %w", record.TriggerMetadata.Repo.RepoDid, lookupErr)
252 }
253 repos := []models.Repo{*repo}
254 if repos[0].Spindle == "" {
255 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
256 }
257
258 // trigger info
259 var trigger models.Trigger
260 var sha string
261 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
262 switch trigger.Kind {
263 case workflow.TriggerKindPush:
264 trigger.PushRef = &record.TriggerMetadata.Push.Ref
265 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
266 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
267 sha = *trigger.PushNewSha
268 case workflow.TriggerKindPullRequest:
269 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
270 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
271 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
272 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
273 sha = *trigger.PRSourceSha
274 }
275
276 tx, err := d.Begin()
277 if err != nil {
278 return fmt.Errorf("failed to start txn: %w", err)
279 }
280
281 triggerId, err := db.AddTrigger(tx, trigger)
282 if err != nil {
283 return fmt.Errorf("failed to add trigger entry: %w", err)
284 }
285
286 pipeline := models.Pipeline{
287 Rkey: msg.Rkey,
288 Knot: source.Key(),
289 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
290 RepoName: record.TriggerMetadata.Repo.Repo,
291 RepoDid: repos[0].RepoDid,
292 TriggerId: int(triggerId),
293 Sha: sha,
294 }
295
296 err = db.AddPipeline(tx, pipeline)
297 if err != nil {
298 return fmt.Errorf("failed to add pipeline: %w", err)
299 }
300
301 err = tx.Commit()
302 if err != nil {
303 return fmt.Errorf("failed to commit txn: %w", err)
304 }
305
306 return nil
307}
308
309func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error {
310 logger := log.FromContext(ctx)
311
312 var record knotdb.RepoDIDAssign
313 if err := json.Unmarshal(msg.EventJson, &record); err != nil {
314 return fmt.Errorf("unmarshal didAssign: %w", err)
315 }
316
317 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
318 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
319 record.RepoDid, record.OwnerDid, record.RepoName)
320 }
321
322 logger.Info("processing didAssign event",
323 "repo_did", record.RepoDid,
324 "owner_did", record.OwnerDid,
325 "repo_name", record.RepoName)
326
327 repos, err := db.GetRepos(d, 1,
328 orm.FilterEq("did", record.OwnerDid),
329 orm.FilterEq("name", record.RepoName),
330 )
331 if err != nil || len(repos) == 0 {
332 logger.Warn("didAssign for unknown repo, skipping",
333 "owner_did", record.OwnerDid,
334 "repo_name", record.RepoName)
335 return nil
336 }
337 repo := repos[0]
338 repoAtUri := repo.RepoAt().String()
339 knot := source.Key()
340 legacyResource := record.OwnerDid + "/" + record.RepoName
341
342 if repo.RepoDid != record.RepoDid {
343 tx, err := d.Begin()
344 if err != nil {
345 return fmt.Errorf("begin didAssign txn: %w", err)
346 }
347 defer tx.Rollback()
348
349 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
350 return fmt.Errorf("cascade repo_did: %w", err)
351 }
352
353 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
354 return fmt.Errorf("enqueue pds rewrites: %w", err)
355 }
356
357 if err := tx.Commit(); err != nil {
358 return fmt.Errorf("commit didAssign txn: %w", err)
359 }
360 }
361
362 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
363 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
364 }
365 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
366 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
367 }
368
369 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri))
370 if collabErr != nil {
371 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
372 }
373 for _, c := range collabs {
374 collabDid := c.SubjectDid.String()
375 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
376 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
377 }
378 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
379 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
380 }
381 }
382
383 logger.Info("didAssign processed successfully",
384 "repo_did", record.RepoDid,
385 "owner_did", record.OwnerDid,
386 "repo_name", record.RepoName)
387
388 return nil
389}