Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2

appview/state: integrate webhooks with gitRefUpdate events

Trigger webhooks on git push events from knotstream.

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.org>

authored by anirudh.fi and committed by tangled.org 18fdecc5 6e0693f3

+58 -23
+42 -11
appview/state/knotstream.go
··· 8 "slices" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/appview/cache" 13 "tangled.org/core/appview/config" ··· 27 "github.com/posthog/posthog-go" 28 ) 29 30 - func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 31 logger := log.FromContext(ctx) 32 logger = log.SubLogger(logger, "knotstream") 33 ··· 50 51 cfg := ec.ConsumerConfig{ 52 Sources: srcs, 53 - ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev), 54 RetryInterval: c.Knotstream.RetryInterval, 55 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 56 ConnectionTimeout: c.Knotstream.ConnectionTimeout, ··· 64 return ec.NewConsumer(cfg), nil 65 } 66 67 - func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 68 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 69 switch msg.Nsid { 70 case tangled.GitRefUpdateNSID: 71 - return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 72 case tangled.PipelineNSID: 73 return ingestPipeline(d, source, msg) 74 } ··· 77 } 78 } 79 80 - func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 81 var record tangled.GitRefUpdate 82 err := json.Unmarshal(msg.EventJson, &record) 83 if err != nil { ··· 94 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 95 } 96 97 - err1 := populatePunchcard(d, record) 98 - err2 := updateRepoLanguages(d, record) 99 100 - var err3 error 101 - if !dev { 102 - err3 = pc.Enqueue(posthog.Capture{ 103 DistinctId: record.CommitterDid, 104 Event: "git_ref_update", 105 }) 106 } 107 108 - return errors.Join(err1, err2, err3) 109 } 110 111 func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 112 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 113 if err != nil { 114 return err
··· 8 "slices" 9 "time" 10 11 + "tangled.org/core/appview/notify" 12 + 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/cache" 15 "tangled.org/core/appview/config" ··· 25 "github.com/posthog/posthog-go" 26 ) 27 28 + func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) { 29 logger := log.FromContext(ctx) 30 logger = log.SubLogger(logger, "knotstream") 31 ··· 48 49 cfg := ec.ConsumerConfig{ 50 Sources: srcs, 51 + ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev), 52 RetryInterval: c.Knotstream.RetryInterval, 53 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 54 ConnectionTimeout: c.Knotstream.ConnectionTimeout, ··· 62 return ec.NewConsumer(cfg), nil 63 } 64 65 + func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc { 66 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 67 switch msg.Nsid { 68 case tangled.GitRefUpdateNSID: 69 + return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 70 case tangled.PipelineNSID: 71 return ingestPipeline(d, source, msg) 72 } ··· 75 } 76 } 77 78 + func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error { 79 + logger := log.FromContext(ctx) 80 + 81 var record tangled.GitRefUpdate 82 err := json.Unmarshal(msg.EventJson, &record) 83 if err != nil { ··· 90 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 91 } 92 93 + logger.Info("processing gitRefUpdate event", 94 + "repo_did", record.RepoDid, 95 + "repo_name", record.RepoName, 96 + "ref", record.Ref, 97 + "old_sha", record.OldSha, 98 + "new_sha", record.NewSha) 99 100 + // trigger webhook notifications first (before other ops that might fail) 101 + var errWebhook error 102 + repos, err := db.GetRepos( 103 + d, 104 + 0, 105 + orm.FilterEq("did", record.RepoDid), 106 + orm.FilterEq("name", record.RepoName), 107 + ) 108 + if err != nil { 109 + errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 110 + } else if len(repos) == 1 { 111 + notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 112 + } else if len(repos) == 0 { 113 + errWebhook = fmt.Errorf("no repo found for webhooks: %s/%s", record.RepoDid, record.RepoName) 114 + } 115 + 116 + errPunchcard := populatePunchcard(d, record) 117 + errLanguages := updateRepoLanguages(d, record) 118 + 119 + var errPosthog error 120 + if !dev && record.CommitterDid != "" { 121 + errPosthog = pc.Enqueue(posthog.Capture{ 122 DistinctId: record.CommitterDid, 123 Event: "git_ref_update", 124 }) 125 } 126 127 + return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 128 } 129 130 func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 131 + if record.CommitterDid == "" { 132 + return nil 133 + } 134 + 135 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 136 if err != nil { 137 return err
+16 -12
appview/state/state.go
··· 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 - knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 155 - if err != nil { 156 - return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 157 - } 158 - knotstream.Start(ctx) 159 - 160 - spindlestream, err := Spindlestream(ctx, config, d, enforcer) 161 - if err != nil { 162 - return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 163 - } 164 - spindlestream.Start(ctx) 165 - 166 var notifiers []notify.Notifier 167 168 // Always add the database notifier ··· 161 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 162 } 163 notifiers = append(notifiers, indexer) 164 notifier := notify.NewMergedNotifier(notifiers) 165 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 166 167 state := &State{ 168 d,
··· 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 var notifiers []notify.Notifier 155 156 // Always add the database notifier ··· 173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 174 } 175 notifiers = append(notifiers, indexer) 176 + 177 + // Add webhook notifier 178 + notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 179 + 180 notifier := notify.NewMergedNotifier(notifiers) 181 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 182 + 183 + knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 184 + if err != nil { 185 + return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 186 + } 187 + knotstream.Start(ctx) 188 + 189 + spindlestream, err := Spindlestream(ctx, config, d, enforcer) 190 + if err != nil { 191 + return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 192 + } 193 + spindlestream.Start(ctx) 194 195 state := &State{ 196 d,