this repo has no description
1package state 2 3import ( 4 "encoding/json" 5 "fmt" 6 "slices" 7 "time" 8 9 "tangled.sh/tangled.sh/core/api/tangled" 10 "tangled.sh/tangled.sh/core/appview/cache" 11 "tangled.sh/tangled.sh/core/appview/config" 12 "tangled.sh/tangled.sh/core/appview/db" 13 kc "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/log" 15 "tangled.sh/tangled.sh/core/rbac" 16) 17 18func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) { 19 knots, err := db.GetCompletedRegistrations(d) 20 if err != nil { 21 return nil, err 22 } 23 24 srcs := make(map[kc.EventSource]struct{}) 25 for _, k := range knots { 26 s := kc.EventSource{k} 27 srcs[s] = struct{}{} 28 } 29 30 logger := log.New("knotstream") 31 cache := cache.New(c.Redis.Addr) 32 cursorStore := kc.NewRedisCursorStore(cache) 33 34 cfg := kc.ConsumerConfig{ 35 Sources: srcs, 36 ProcessFunc: knotstreamIngester(d, enforcer), 37 RetryInterval: c.Knotstream.RetryInterval, 38 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 39 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 40 WorkerCount: c.Knotstream.WorkerCount, 41 QueueSize: c.Knotstream.QueueSize, 42 Logger: logger, 43 Dev: c.Core.Dev, 44 CursorStore: &cursorStore, 45 } 46 47 return kc.NewEventConsumer(cfg), nil 48} 49 50func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc { 51 return func(source kc.EventSource, msg kc.Message) error { 52 switch msg.Nsid { 53 case tangled.GitRefUpdateNSID: 54 return ingestRefUpdate(d, enforcer, source, msg) 55 case tangled.PipelineNSID: 56 // TODO 57 } 58 59 return nil 60 } 61} 62 63func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error { 64 var record tangled.GitRefUpdate 65 err := json.Unmarshal(msg.EventJson, &record) 66 if err != nil { 67 return err 68 } 69 70 knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid) 71 if err != nil { 72 return err 73 } 74 75 if !slices.Contains(knownKnots, source.Knot) { 76 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) 77 } 78 79 punch := db.Punch{ 80 Did: record.CommitterDid, 81 Date: time.Now(), 82 Count: 1, 83 } 84 if err := db.AddPunch(d, punch); err != nil { 85 return err 86 } 87 88 return nil 89}