Monorepo for Tangled
at master 304 lines 8.3 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "time" 11 12 "tangled.org/core/appview/notify" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/appview/cache" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 ec "tangled.org/core/eventconsumer" 20 "tangled.org/core/eventconsumer/cursor" 21 "tangled.org/core/log" 22 "tangled.org/core/orm" 23 "tangled.org/core/rbac" 24 "tangled.org/core/workflow" 25 26 "github.com/bluesky-social/indigo/atproto/syntax" 27 "github.com/go-git/go-git/v5/plumbing" 28 "github.com/posthog/posthog-go" 29) 30 31func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) { 32 logger := log.FromContext(ctx) 33 logger = log.SubLogger(logger, "knotstream") 34 35 knots, err := db.GetRegistrations( 36 d, 37 orm.FilterIsNot("registered", "null"), 38 ) 39 if err != nil { 40 return nil, err 41 } 42 43 srcs := make(map[ec.Source]struct{}) 44 for _, k := range knots { 45 s := ec.NewKnotSource(k.Domain) 46 srcs[s] = struct{}{} 47 } 48 49 cache := cache.New(c.Redis.Addr) 50 cursorStore := cursor.NewRedisCursorStore(cache) 51 52 cfg := ec.ConsumerConfig{ 53 Sources: srcs, 54 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev), 55 RetryInterval: c.Knotstream.RetryInterval, 56 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 57 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 58 WorkerCount: c.Knotstream.WorkerCount, 59 QueueSize: c.Knotstream.QueueSize, 60 Logger: logger, 61 Dev: c.Core.Dev, 62 CursorStore: &cursorStore, 63 } 64 65 return ec.NewConsumer(cfg), nil 66} 67 68func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc { 69 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 70 switch msg.Nsid { 71 case tangled.GitRefUpdateNSID: 72 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 73 case tangled.PipelineNSID: 74 return ingestPipeline(d, source, msg) 75 } 76 77 return nil 78 } 79} 80 81func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error { 82 logger := log.FromContext(ctx) 83 84 var record tangled.GitRefUpdate 85 err := json.Unmarshal(msg.EventJson, &record) 86 if err != nil { 87 return err 88 } 89 90 if record.RepoDid == "" { 91 logger.Error("gitRefUpdate missing repoDid, skipping", "owner_did", record.OwnerDid, "repo_name", record.RepoName) 92 return fmt.Errorf("gitRefUpdate missing repoDid") 93 } 94 95 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 96 if err != nil { 97 return err 98 } 99 if !slices.Contains(knownKnots, source.Key()) { 100 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 101 } 102 103 logger.Info("processing gitRefUpdate event", 104 "repo_did", record.RepoDid, 105 "ref", record.Ref, 106 "old_sha", record.OldSha, 107 "new_sha", record.NewSha) 108 109 var errWebhook error 110 111 repo, lookupErr := db.GetRepoByDid(d, record.RepoDid) 112 if lookupErr != nil && !errors.Is(lookupErr, sql.ErrNoRows) { 113 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr) 114 } 115 116 var repos []models.Repo 117 if lookupErr == nil { 118 repos = []models.Repo{*repo} 119 } 120 121 if errWebhook == nil && len(repos) == 1 { 122 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 123 } 124 125 errPunchcard := populatePunchcard(d, record) 126 errLanguages := updateRepoLanguages(d, record) 127 128 var errPosthog error 129 if !dev && record.CommitterDid != "" { 130 errPosthog = pc.Enqueue(posthog.Capture{ 131 DistinctId: record.CommitterDid, 132 Event: "git_ref_update", 133 }) 134 } 135 136 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 137} 138 139func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 140 if record.CommitterDid == "" { 141 return nil 142 } 143 144 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 145 if err != nil { 146 return err 147 } 148 149 count := 0 150 for _, ke := range knownEmails { 151 if record.Meta == nil { 152 continue 153 } 154 if record.Meta.CommitCount == nil { 155 continue 156 } 157 for _, ce := range record.Meta.CommitCount.ByEmail { 158 if ce == nil { 159 continue 160 } 161 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 162 count += int(ce.Count) 163 } 164 } 165 } 166 167 punch := models.Punch{ 168 Did: record.CommitterDid, 169 Date: time.Now(), 170 Count: count, 171 } 172 return db.AddPunch(d, punch) 173} 174 175func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 176 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 177 return fmt.Errorf("empty language data for repo: %s/%s", record.OwnerDid, record.RepoName) 178 } 179 180 if record.RepoDid == "" { 181 return fmt.Errorf("gitRefUpdate missing repoDid for language update") 182 } 183 184 r, lookupErr := db.GetRepoByDid(d, record.RepoDid) 185 if lookupErr != nil { 186 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr) 187 } 188 repo := *r 189 190 ref := plumbing.ReferenceName(record.Ref) 191 if !ref.IsBranch() { 192 return fmt.Errorf("%s is not a valid reference name", ref) 193 } 194 195 var langs []models.RepoLanguage 196 for _, l := range record.Meta.LangBreakdown.Inputs { 197 if l == nil { 198 continue 199 } 200 201 langs = append(langs, models.RepoLanguage{ 202 RepoAt: repo.RepoAt(), 203 RepoDid: repo.RepoDid, 204 Ref: ref.Short(), 205 IsDefaultRef: record.Meta.IsDefaultRef, 206 Language: l.Lang, 207 Bytes: l.Size, 208 }) 209 } 210 211 tx, err := d.Begin() 212 if err != nil { 213 return err 214 } 215 defer tx.Rollback() 216 217 // update appview's cache 218 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 219 if err != nil { 220 fmt.Printf("failed; %s\n", err) 221 // non-fatal 222 } 223 224 return tx.Commit() 225} 226 227func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 228 var record tangled.Pipeline 229 err := json.Unmarshal(msg.EventJson, &record) 230 if err != nil { 231 return err 232 } 233 234 if record.TriggerMetadata == nil { 235 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 236 } 237 238 if record.TriggerMetadata.Repo == nil { 239 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 240 } 241 242 if record.TriggerMetadata.Repo.RepoDid == "" { 243 return fmt.Errorf("pipeline missing repoDid: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 244 } 245 246 repo, lookupErr := db.GetRepoByDid(d, record.TriggerMetadata.Repo.RepoDid) 247 if lookupErr != nil { 248 return fmt.Errorf("failed to look up repo by DID %s: %w", record.TriggerMetadata.Repo.RepoDid, lookupErr) 249 } 250 repos := []models.Repo{*repo} 251 if repos[0].Spindle == "" { 252 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 253 } 254 255 // trigger info 256 var trigger models.Trigger 257 var sha string 258 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 259 switch trigger.Kind { 260 case workflow.TriggerKindPush: 261 trigger.PushRef = &record.TriggerMetadata.Push.Ref 262 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 263 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 264 sha = *trigger.PushNewSha 265 case workflow.TriggerKindPullRequest: 266 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 267 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 268 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 269 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 270 sha = *trigger.PRSourceSha 271 } 272 273 tx, err := d.Begin() 274 if err != nil { 275 return fmt.Errorf("failed to start txn: %w", err) 276 } 277 278 triggerId, err := db.AddTrigger(tx, trigger) 279 if err != nil { 280 return fmt.Errorf("failed to add trigger entry: %w", err) 281 } 282 283 pipeline := models.Pipeline{ 284 Rkey: msg.Rkey, 285 Knot: source.Key(), 286 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 287 RepoName: record.TriggerMetadata.Repo.Repo, 288 RepoDid: repos[0].RepoDid, 289 TriggerId: int(triggerId), 290 Sha: sha, 291 } 292 293 err = db.AddPipeline(tx, pipeline) 294 if err != nil { 295 return fmt.Errorf("failed to add pipeline: %w", err) 296 } 297 298 err = tx.Commit() 299 if err != nil { 300 return fmt.Errorf("failed to commit txn: %w", err) 301 } 302 303 return nil 304}