this repo has no description
1package state
2
3import (
4 "encoding/json"
5 "fmt"
6 "slices"
7 "time"
8
9 "tangled.sh/tangled.sh/core/api/tangled"
10 "tangled.sh/tangled.sh/core/appview/cache"
11 "tangled.sh/tangled.sh/core/appview/config"
12 "tangled.sh/tangled.sh/core/appview/db"
13 kc "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/log"
15 "tangled.sh/tangled.sh/core/rbac"
16)
17
18func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) {
19 knots, err := db.GetCompletedRegistrations(d)
20 if err != nil {
21 return nil, err
22 }
23
24 srcs := make(map[kc.EventSource]struct{})
25 for _, k := range knots {
26 s := kc.EventSource{k}
27 srcs[s] = struct{}{}
28 }
29
30 logger := log.New("knotstream")
31 cache := cache.New(c.Redis.Addr)
32 cursorStore := kc.NewRedisCursorStore(cache)
33
34 cfg := kc.ConsumerConfig{
35 Sources: srcs,
36 ProcessFunc: knotstreamIngester(d, enforcer),
37 RetryInterval: c.Knotstream.RetryInterval,
38 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
39 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
40 WorkerCount: c.Knotstream.WorkerCount,
41 QueueSize: c.Knotstream.QueueSize,
42 Logger: logger,
43 Dev: c.Core.Dev,
44 CursorStore: &cursorStore,
45 }
46
47 return kc.NewEventConsumer(cfg), nil
48}
49
50func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc {
51 return func(source kc.EventSource, msg kc.Message) error {
52 switch msg.Nsid {
53 case tangled.GitRefUpdateNSID:
54 return ingestRefUpdate(d, enforcer, source, msg)
55 case tangled.PipelineNSID:
56 // TODO
57 }
58
59 return nil
60 }
61}
62
63func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error {
64 var record tangled.GitRefUpdate
65 err := json.Unmarshal(msg.EventJson, &record)
66 if err != nil {
67 return err
68 }
69
70 knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
71 if err != nil {
72 return err
73 }
74
75 if !slices.Contains(knownKnots, source.Knot) {
76 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
77 }
78
79 punch := db.Punch{
80 Did: record.CommitterDid,
81 Date: time.Now(),
82 Count: 1,
83 }
84 if err := db.AddPunch(d, punch); err != nil {
85 return err
86 }
87
88 return nil
89}