// Monorepo for Tangled
// at 67c0e9bc7d93ce141284b0554feacb3c1d57f6d0 305 lines 8.3 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.org/core/appview/notify" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/cache" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/models" 18 ec "tangled.org/core/eventconsumer" 19 "tangled.org/core/eventconsumer/cursor" 20 "tangled.org/core/log" 21 "tangled.org/core/orm" 22 "tangled.org/core/rbac" 23 "tangled.org/core/workflow" 24 25 "github.com/bluesky-social/indigo/atproto/syntax" 26 "github.com/go-git/go-git/v5/plumbing" 27 "github.com/posthog/posthog-go" 28) 29 30func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) { 31 logger := log.FromContext(ctx) 32 logger = log.SubLogger(logger, "knotstream") 33 34 knots, err := db.GetRegistrations( 35 d, 36 orm.FilterIsNot("registered", "null"), 37 ) 38 if err != nil { 39 return nil, err 40 } 41 42 srcs := make(map[ec.Source]struct{}) 43 for _, k := range knots { 44 s := ec.NewKnotSource(k.Domain) 45 srcs[s] = struct{}{} 46 } 47 48 cache := cache.New(c.Redis.Addr) 49 cursorStore := cursor.NewRedisCursorStore(cache) 50 51 cfg := ec.ConsumerConfig{ 52 Sources: srcs, 53 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev), 54 RetryInterval: c.Knotstream.RetryInterval, 55 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 56 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 57 WorkerCount: c.Knotstream.WorkerCount, 58 QueueSize: c.Knotstream.QueueSize, 59 Logger: logger, 60 Dev: c.Core.Dev, 61 CursorStore: &cursorStore, 62 } 63 64 return ec.NewConsumer(cfg), nil 65} 66 67func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc { 68 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 69 switch msg.Nsid { 70 case tangled.GitRefUpdateNSID: 
71 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 72 case tangled.PipelineNSID: 73 return ingestPipeline(d, source, msg) 74 } 75 76 return nil 77 } 78} 79 80func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error { 81 logger := log.FromContext(ctx) 82 83 var record tangled.GitRefUpdate 84 err := json.Unmarshal(msg.EventJson, &record) 85 if err != nil { 86 return err 87 } 88 89 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 90 if err != nil { 91 return err 92 } 93 if !slices.Contains(knownKnots, source.Key()) { 94 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 95 } 96 97 logger.Info("processing gitRefUpdate event", 98 "repo_did", record.RepoDid, 99 "repo_name", record.RepoName, 100 "ref", record.Ref, 101 "old_sha", record.OldSha, 102 "new_sha", record.NewSha) 103 104 // trigger webhook notifications first (before other ops that might fail) 105 var errWebhook error 106 repos, err := db.GetRepos( 107 d, 108 0, 109 orm.FilterEq("did", record.RepoDid), 110 orm.FilterEq("name", record.RepoName), 111 ) 112 if err != nil { 113 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 114 } else if len(repos) == 1 { 115 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 116 } else if len(repos) == 0 { 117 errWebhook = fmt.Errorf("no repo found for webhooks: %s/%s", record.RepoDid, record.RepoName) 118 } 119 120 errPunchcard := populatePunchcard(d, record) 121 errLanguages := updateRepoLanguages(d, record) 122 123 var errPosthog error 124 if !dev && record.CommitterDid != "" { 125 errPosthog = pc.Enqueue(posthog.Capture{ 126 DistinctId: record.CommitterDid, 127 Event: "git_ref_update", 128 }) 129 } 130 131 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 132} 133 134func 
populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 135 if record.CommitterDid == "" { 136 return nil 137 } 138 139 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 140 if err != nil { 141 return err 142 } 143 144 count := 0 145 for _, ke := range knownEmails { 146 if record.Meta == nil { 147 continue 148 } 149 if record.Meta.CommitCount == nil { 150 continue 151 } 152 for _, ce := range record.Meta.CommitCount.ByEmail { 153 if ce == nil { 154 continue 155 } 156 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 157 count += int(ce.Count) 158 } 159 } 160 } 161 162 punch := models.Punch{ 163 Did: record.CommitterDid, 164 Date: time.Now(), 165 Count: count, 166 } 167 return db.AddPunch(d, punch) 168} 169 170func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 171 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 172 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 173 } 174 175 repos, err := db.GetRepos( 176 d, 177 0, 178 orm.FilterEq("did", record.RepoDid), 179 orm.FilterEq("name", record.RepoName), 180 ) 181 if err != nil { 182 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 183 } 184 if len(repos) != 1 { 185 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 186 } 187 repo := repos[0] 188 189 ref := plumbing.ReferenceName(record.Ref) 190 if !ref.IsBranch() { 191 return fmt.Errorf("%s is not a valid reference name", ref) 192 } 193 194 var langs []models.RepoLanguage 195 for _, l := range record.Meta.LangBreakdown.Inputs { 196 if l == nil { 197 continue 198 } 199 200 langs = append(langs, models.RepoLanguage{ 201 RepoAt: repo.RepoAt(), 202 Ref: ref.Short(), 203 IsDefaultRef: record.Meta.IsDefaultRef, 204 Language: l.Lang, 205 Bytes: l.Size, 206 }) 207 } 208 209 tx, err := d.Begin() 210 if err != nil { 211 return err 212 } 213 defer 
tx.Rollback() 214 215 // update appview's cache 216 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 217 if err != nil { 218 fmt.Printf("failed; %s\n", err) 219 // non-fatal 220 } 221 222 return tx.Commit() 223} 224 225func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 226 var record tangled.Pipeline 227 err := json.Unmarshal(msg.EventJson, &record) 228 if err != nil { 229 return err 230 } 231 232 if record.TriggerMetadata == nil { 233 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 234 } 235 236 if record.TriggerMetadata.Repo == nil { 237 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 238 } 239 240 // does this repo have a spindle configured? 241 repos, err := db.GetRepos( 242 d, 243 0, 244 orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 245 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 246 ) 247 if err != nil { 248 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 249 } 250 if len(repos) != 1 { 251 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 252 } 253 if repos[0].Spindle == "" { 254 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 255 } 256 257 // trigger info 258 var trigger models.Trigger 259 var sha string 260 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 261 switch trigger.Kind { 262 case workflow.TriggerKindPush: 263 trigger.PushRef = &record.TriggerMetadata.Push.Ref 264 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 265 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 266 sha = *trigger.PushNewSha 267 case workflow.TriggerKindPullRequest: 268 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 269 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 270 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 271 
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 272 sha = *trigger.PRSourceSha 273 } 274 275 tx, err := d.Begin() 276 if err != nil { 277 return fmt.Errorf("failed to start txn: %w", err) 278 } 279 280 triggerId, err := db.AddTrigger(tx, trigger) 281 if err != nil { 282 return fmt.Errorf("failed to add trigger entry: %w", err) 283 } 284 285 pipeline := models.Pipeline{ 286 Rkey: msg.Rkey, 287 Knot: source.Key(), 288 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 289 RepoName: record.TriggerMetadata.Repo.Repo, 290 TriggerId: int(triggerId), 291 Sha: sha, 292 } 293 294 err = db.AddPipeline(tx, pipeline) 295 if err != nil { 296 return fmt.Errorf("failed to add pipeline: %w", err) 297 } 298 299 err = tx.Commit() 300 if err != nil { 301 return fmt.Errorf("failed to commit txn: %w", err) 302 } 303 304 return nil 305}