this repo has no description
1package state
2
3import (
4 "encoding/json"
5 "fmt"
6 "slices"
7 "time"
8
9 "tangled.sh/tangled.sh/core/api/tangled"
10 "tangled.sh/tangled.sh/core/appview/cache"
11 "tangled.sh/tangled.sh/core/appview/config"
12 "tangled.sh/tangled.sh/core/appview/db"
13 kc "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/log"
15 "tangled.sh/tangled.sh/core/rbac"
16)
17
18func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) {
19 knots, err := db.GetCompletedRegistrations(d)
20 if err != nil {
21 return nil, err
22 }
23
24 srcs := make(map[kc.EventSource]struct{})
25 for _, k := range knots {
26 s := kc.EventSource{k}
27 srcs[s] = struct{}{}
28 }
29
30 logger := log.New("knotstream")
31 cache := cache.New(c.Redis.Addr)
32 cursorStore := kc.NewRedisCursorStore(cache)
33
34 cfg := kc.ConsumerConfig{
35 Sources: srcs,
36 ProcessFunc: knotstreamIngester(d, enforcer),
37 RetryInterval: c.Knotstream.RetryInterval,
38 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
39 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
40 WorkerCount: c.Knotstream.WorkerCount,
41 QueueSize: c.Knotstream.QueueSize,
42 Logger: logger,
43 Dev: c.Core.Dev,
44 CursorStore: &cursorStore,
45 }
46
47 return kc.NewEventConsumer(cfg), nil
48}
49
50func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc {
51 return func(source kc.EventSource, msg kc.Message) error {
52 switch msg.Nsid {
53 case tangled.GitRefUpdateNSID:
54 return ingestRefUpdate(d, enforcer, source, msg)
55 case tangled.PipelineNSID:
56 // TODO
57 }
58
59 return nil
60 }
61}
62
63func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error {
64 var record tangled.GitRefUpdate
65 err := json.Unmarshal(msg.EventJson, &record)
66 if err != nil {
67 return err
68 }
69
70 knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
71 if err != nil {
72 return err
73 }
74 if !slices.Contains(knownKnots, source.Knot) {
75 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
76 }
77
78 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
79 if err != nil {
80 return err
81 }
82 count := 0
83 for _, ke := range knownEmails {
84 if record.Meta == nil {
85 continue
86 }
87 if record.Meta.CommitCount == nil {
88 continue
89 }
90 for _, ce := range record.Meta.CommitCount.ByEmail {
91 if ce == nil {
92 continue
93 }
94 if ce.Email == ke.Address {
95 count += int(ce.Count)
96 }
97 }
98 }
99
100 punch := db.Punch{
101 Did: record.CommitterDid,
102 Date: time.Now(),
103 Count: count,
104 }
105 if err := db.AddPunch(d, punch); err != nil {
106 return err
107 }
108
109 return nil
110}