forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/appview/cloudflare"
12 "tangled.org/core/appview/notify"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview/cache"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/sites"
20 ec "tangled.org/core/eventconsumer"
21 "tangled.org/core/eventconsumer/cursor"
22 "tangled.org/core/log"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25 "tangled.org/core/workflow"
26
27 "github.com/bluesky-social/indigo/atproto/syntax"
28 "github.com/go-git/go-git/v5/plumbing"
29 "github.com/posthog/posthog-go"
30)
31
32func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
33 logger := log.FromContext(ctx)
34 logger = log.SubLogger(logger, "knotstream")
35
36 knots, err := db.GetRegistrations(
37 d,
38 orm.FilterIsNot("registered", "null"),
39 )
40 if err != nil {
41 return nil, err
42 }
43
44 srcs := make(map[ec.Source]struct{})
45 for _, k := range knots {
46 s := ec.NewKnotSource(k.Domain)
47 srcs[s] = struct{}{}
48 }
49
50 cache := cache.New(c.Redis.Addr)
51 cursorStore := cursor.NewRedisCursorStore(cache)
52
53 cfg := ec.ConsumerConfig{
54 Sources: srcs,
55 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
56 RetryInterval: c.Knotstream.RetryInterval,
57 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
58 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
59 WorkerCount: c.Knotstream.WorkerCount,
60 QueueSize: c.Knotstream.QueueSize,
61 Logger: logger,
62 Dev: c.Core.Dev,
63 CursorStore: &cursorStore,
64 }
65
66 return ec.NewConsumer(cfg), nil
67}
68
69func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
70 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
71 switch msg.Nsid {
72 case tangled.GitRefUpdateNSID:
73 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
74 case tangled.PipelineNSID:
75 return ingestPipeline(d, source, msg)
76 }
77
78 return nil
79 }
80}
81
82func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error {
83 logger := log.FromContext(ctx)
84
85 var record tangled.GitRefUpdate
86 err := json.Unmarshal(msg.EventJson, &record)
87 if err != nil {
88 return err
89 }
90
91 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
92 if err != nil {
93 return err
94 }
95 if !slices.Contains(knownKnots, source.Key()) {
96 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
97 }
98
99 logger.Info("processing gitRefUpdate event",
100 "repo_did", record.RepoDid,
101 "repo_name", record.RepoName,
102 "ref", record.Ref,
103 "old_sha", record.OldSha,
104 "new_sha", record.NewSha)
105
106 // trigger webhook notifications first (before other ops that might fail)
107 var errWebhook error
108 repos, err := db.GetRepos(
109 d,
110 0,
111 orm.FilterEq("did", record.RepoDid),
112 orm.FilterEq("name", record.RepoName),
113 )
114 if err != nil {
115 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err)
116 } else if len(repos) == 1 {
117 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
118 }
119
120 errPunchcard := populatePunchcard(d, record)
121 errLanguages := updateRepoLanguages(d, record)
122
123 var errPosthog error
124 if !dev && record.CommitterDid != "" {
125 errPosthog = pc.Enqueue(posthog.Capture{
126 DistinctId: record.CommitterDid,
127 Event: "git_ref_update",
128 })
129 }
130
131 // Trigger a sites redeploy if this push is to the configured sites branch.
132 if cfClient.Enabled() {
133 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
134 }
135
136 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog)
137}
138
139// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
140// branch configured for this repo and, if so, syncs the site to R2
141func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, c *config.Config, record tangled.GitRefUpdate, source ec.Source) {
142 logger := log.FromContext(ctx)
143
144 ref := plumbing.ReferenceName(record.Ref)
145 if !ref.IsBranch() {
146 return
147 }
148 pushedBranch := ref.Short()
149
150 repos, err := db.GetRepos(
151 d,
152 0,
153 orm.FilterEq("did", record.RepoDid),
154 orm.FilterEq("name", record.RepoName),
155 )
156 if err != nil || len(repos) != 1 {
157 return
158 }
159 repo := repos[0]
160
161 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String())
162 if err != nil || siteConfig == nil {
163 return
164 }
165 if siteConfig.Branch != pushedBranch {
166 return
167 }
168
169 scheme := "https"
170 if c.Core.Dev {
171 scheme = "http"
172 }
173 knotHost := fmt.Sprintf("%s://%s", scheme, source.Key())
174
175 deploy := &models.SiteDeploy{
176 RepoAt: repo.RepoAt().String(),
177 Branch: siteConfig.Branch,
178 Dir: siteConfig.Dir,
179 CommitSHA: record.NewSha,
180 Trigger: models.SiteDeployTriggerPush,
181 }
182
183 deployErr := sites.Deploy(ctx, cfClient, knotHost, record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir)
184 if deployErr != nil {
185 logger.Error("sites: R2 sync failed on push", "repo", record.RepoDid+"/"+record.RepoName, "err", deployErr)
186 deploy.Status = models.SiteDeployStatusFailure
187 deploy.Error = deployErr.Error()
188 } else {
189 deploy.Status = models.SiteDeployStatusSuccess
190 }
191
192 if err := db.AddSiteDeploy(d, deploy); err != nil {
193 logger.Error("sites: failed to record deploy", "repo", record.RepoDid+"/"+record.RepoName, "err", err)
194 }
195
196 if deployErr == nil {
197 logger.Info("site deployed to r2", "repo", record.RepoDid+"/"+record.RepoName)
198 }
199}
200
201func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
202 if record.CommitterDid == "" {
203 return nil
204 }
205
206 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
207 if err != nil {
208 return err
209 }
210
211 count := 0
212 for _, ke := range knownEmails {
213 if record.Meta == nil {
214 continue
215 }
216 if record.Meta.CommitCount == nil {
217 continue
218 }
219 for _, ce := range record.Meta.CommitCount.ByEmail {
220 if ce == nil {
221 continue
222 }
223 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
224 count += int(ce.Count)
225 }
226 }
227 }
228
229 punch := models.Punch{
230 Did: record.CommitterDid,
231 Date: time.Now(),
232 Count: count,
233 }
234 return db.AddPunch(d, punch)
235}
236
237func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
238 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
239 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
240 }
241
242 repos, err := db.GetRepos(
243 d,
244 0,
245 orm.FilterEq("did", record.RepoDid),
246 orm.FilterEq("name", record.RepoName),
247 )
248 if err != nil {
249 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
250 }
251 if len(repos) != 1 {
252 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
253 }
254 repo := repos[0]
255
256 ref := plumbing.ReferenceName(record.Ref)
257 if !ref.IsBranch() {
258 return fmt.Errorf("%s is not a valid reference name", ref)
259 }
260
261 var langs []models.RepoLanguage
262 for _, l := range record.Meta.LangBreakdown.Inputs {
263 if l == nil {
264 continue
265 }
266
267 langs = append(langs, models.RepoLanguage{
268 RepoAt: repo.RepoAt(),
269 Ref: ref.Short(),
270 IsDefaultRef: record.Meta.IsDefaultRef,
271 Language: l.Lang,
272 Bytes: l.Size,
273 })
274 }
275
276 tx, err := d.Begin()
277 if err != nil {
278 return err
279 }
280 defer tx.Rollback()
281
282 // update appview's cache
283 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
284 if err != nil {
285 fmt.Printf("failed; %s\n", err)
286 // non-fatal
287 }
288
289 return tx.Commit()
290}
291
292func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
293 var record tangled.Pipeline
294 err := json.Unmarshal(msg.EventJson, &record)
295 if err != nil {
296 return err
297 }
298
299 if record.TriggerMetadata == nil {
300 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
301 }
302
303 if record.TriggerMetadata.Repo == nil {
304 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
305 }
306
307 // does this repo have a spindle configured?
308 repos, err := db.GetRepos(
309 d,
310 0,
311 orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
312 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
313 )
314 if err != nil {
315 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
316 }
317 if len(repos) != 1 {
318 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
319 }
320 if repos[0].Spindle == "" {
321 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
322 }
323
324 // trigger info
325 var trigger models.Trigger
326 var sha string
327 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
328 switch trigger.Kind {
329 case workflow.TriggerKindPush:
330 trigger.PushRef = &record.TriggerMetadata.Push.Ref
331 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
332 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
333 sha = *trigger.PushNewSha
334 case workflow.TriggerKindPullRequest:
335 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
336 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
337 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
338 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
339 sha = *trigger.PRSourceSha
340 }
341
342 tx, err := d.Begin()
343 if err != nil {
344 return fmt.Errorf("failed to start txn: %w", err)
345 }
346
347 triggerId, err := db.AddTrigger(tx, trigger)
348 if err != nil {
349 return fmt.Errorf("failed to add trigger entry: %w", err)
350 }
351
352 pipeline := models.Pipeline{
353 Rkey: msg.Rkey,
354 Knot: source.Key(),
355 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
356 RepoName: record.TriggerMetadata.Repo.Repo,
357 TriggerId: int(triggerId),
358 Sha: sha,
359 }
360
361 err = db.AddPipeline(tx, pipeline)
362 if err != nil {
363 return fmt.Errorf("failed to add pipeline: %w", err)
364 }
365
366 err = tx.Commit()
367 if err != nil {
368 return fmt.Errorf("failed to commit txn: %w", err)
369 }
370
371 return nil
372}