Monorepo for Tangled
at push-pkuzytwlwptp 354 lines 9.6 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "time" 11 12 "tangled.org/core/appview/notify" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/appview/cache" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 ec "tangled.org/core/eventconsumer" 20 "tangled.org/core/eventconsumer/cursor" 21 "tangled.org/core/log" 22 "tangled.org/core/orm" 23 "tangled.org/core/rbac" 24 "tangled.org/core/workflow" 25 26 "github.com/bluesky-social/indigo/atproto/syntax" 27 "github.com/go-git/go-git/v5/plumbing" 28 "github.com/posthog/posthog-go" 29) 30 31func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) { 32 logger := log.FromContext(ctx) 33 logger = log.SubLogger(logger, "knotstream") 34 35 knots, err := db.GetRegistrations( 36 d, 37 orm.FilterIsNot("registered", "null"), 38 ) 39 if err != nil { 40 return nil, err 41 } 42 43 srcs := make(map[ec.Source]struct{}) 44 for _, k := range knots { 45 s := ec.NewKnotSource(k.Domain) 46 srcs[s] = struct{}{} 47 } 48 49 cache := cache.New(c.Redis.Addr) 50 cursorStore := cursor.NewRedisCursorStore(cache) 51 52 cfg := ec.ConsumerConfig{ 53 Sources: srcs, 54 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev), 55 RetryInterval: c.Knotstream.RetryInterval, 56 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 57 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 58 WorkerCount: c.Knotstream.WorkerCount, 59 QueueSize: c.Knotstream.QueueSize, 60 Logger: logger, 61 Dev: c.Core.Dev, 62 CursorStore: &cursorStore, 63 } 64 65 return ec.NewConsumer(cfg), nil 66} 67 68func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc { 69 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 70 switch msg.Nsid { 71 case tangled.GitRefUpdateNSID: 72 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 73 case tangled.PipelineNSID: 74 return ingestPipeline(d, source, msg) 75 } 76 77 return nil 78 } 79} 80 81func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error { 82 logger := log.FromContext(ctx) 83 84 var record tangled.GitRefUpdate 85 err := json.Unmarshal(msg.EventJson, &record) 86 if err != nil { 87 return err 88 } 89 90 // backward compat: old knots emit owner DID as "repoDid" with no "ownerDid" field 91 if record.OwnerDid == "" && record.RepoDid != nil && *record.RepoDid != "" { 92 record.OwnerDid = *record.RepoDid 93 record.RepoDid = nil 94 } 95 96 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 97 if err != nil { 98 return err 99 } 100 if !slices.Contains(knownKnots, source.Key()) { 101 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 102 } 103 104 logger.Info("processing gitRefUpdate event", 105 "owner_did", record.OwnerDid, 106 "repo_name", record.RepoName, 107 "repo_did", record.RepoDid, 108 "ref", record.Ref, 109 "old_sha", record.OldSha, 110 "new_sha", record.NewSha) 111 112 var errWebhook error 113 var repos []models.Repo 114 115 if record.RepoDid != nil && *record.RepoDid != "" { 116 repo, lookupErr := db.GetRepoByDid(d, *record.RepoDid) 117 switch { 118 case lookupErr == nil: 119 repos = []models.Repo{*repo} 120 case !errors.Is(lookupErr, sql.ErrNoRows): 121 logger.Warn("failed to look up repo by DID", "err", lookupErr, "repoDid", *record.RepoDid) 122 } 123 } 124 125 if len(repos) == 0 { 126 repos, err = db.GetRepos( 127 d, 128 0, 129 orm.FilterEq("did", record.OwnerDid), 130 orm.FilterEq("name", record.RepoName), 131 ) 132 if err != nil { 133 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 134 } 135 } 136 137 if errWebhook == nil && len(repos) == 1 { 138 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 139 } 140 141 errPunchcard := populatePunchcard(d, record) 142 errLanguages := updateRepoLanguages(d, record) 143 144 var errPosthog error 145 if !dev && record.CommitterDid != "" { 146 errPosthog = pc.Enqueue(posthog.Capture{ 147 DistinctId: record.CommitterDid, 148 Event: "git_ref_update", 149 }) 150 } 151 152 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 153} 154 155func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 156 if record.CommitterDid == "" { 157 return nil 158 } 159 160 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 161 if err != nil { 162 return err 163 } 164 165 count := 0 166 for _, ke := range knownEmails { 167 if record.Meta == nil { 168 continue 169 } 170 if record.Meta.CommitCount == nil { 171 continue 172 } 173 for _, ce := range record.Meta.CommitCount.ByEmail { 174 if ce == nil { 175 continue 176 } 177 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 178 count += int(ce.Count) 179 } 180 } 181 } 182 183 punch := models.Punch{ 184 Did: record.CommitterDid, 185 Date: time.Now(), 186 Count: count, 187 } 188 return db.AddPunch(d, punch) 189} 190 191func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 192 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 193 return fmt.Errorf("empty language data for repo: %s/%s", record.OwnerDid, record.RepoName) 194 } 195 196 var repo models.Repo 197 if record.RepoDid != nil && *record.RepoDid != "" { 198 r, lookupErr := db.GetRepoByDid(d, *record.RepoDid) 199 switch { 200 case lookupErr == nil: 201 repo = *r 202 case !errors.Is(lookupErr, sql.ErrNoRows): 203 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, lookupErr) 204 } 205 } 206 207 if repo.Id == 0 { 208 repos, err := db.GetRepos( 209 d, 210 0, 211 orm.FilterEq("did", record.OwnerDid), 212 orm.FilterEq("name", record.RepoName), 213 ) 214 if err != nil { 215 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.OwnerDid, record.RepoName, err) 216 } 217 if len(repos) != 1 { 218 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 219 } 220 repo = repos[0] 221 } 222 223 ref := plumbing.ReferenceName(record.Ref) 224 if !ref.IsBranch() { 225 return fmt.Errorf("%s is not a valid reference name", ref) 226 } 227 228 var langs []models.RepoLanguage 229 for _, l := range record.Meta.LangBreakdown.Inputs { 230 if l == nil { 231 continue 232 } 233 234 langs = append(langs, models.RepoLanguage{ 235 RepoAt: repo.RepoAt(), 236 Ref: ref.Short(), 237 IsDefaultRef: record.Meta.IsDefaultRef, 238 Language: l.Lang, 239 Bytes: l.Size, 240 }) 241 } 242 243 tx, err := d.Begin() 244 if err != nil { 245 return err 246 } 247 defer tx.Rollback() 248 249 // update appview's cache 250 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 251 if err != nil { 252 fmt.Printf("failed; %s\n", err) 253 // non-fatal 254 } 255 256 return tx.Commit() 257} 258 259func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 260 var record tangled.Pipeline 261 err := json.Unmarshal(msg.EventJson, &record) 262 if err != nil { 263 return err 264 } 265 266 if record.TriggerMetadata == nil { 267 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 268 } 269 270 if record.TriggerMetadata.Repo == nil { 271 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 272 } 273 274 // does this repo have a spindle configured? 275 var repos []models.Repo 276 if record.TriggerMetadata.Repo.RepoDid != nil && *record.TriggerMetadata.Repo.RepoDid != "" { 277 repo, lookupErr := db.GetRepoByDid(d, *record.TriggerMetadata.Repo.RepoDid) 278 switch { 279 case lookupErr == nil: 280 repos = []models.Repo{*repo} 281 case !errors.Is(lookupErr, sql.ErrNoRows): 282 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.TriggerMetadata.Repo.RepoDid, lookupErr) 283 } 284 } 285 286 if len(repos) == 0 { 287 var err error 288 repos, err = db.GetRepos( 289 d, 290 0, 291 orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 292 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 293 ) 294 if err != nil { 295 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 296 } 297 } 298 299 if len(repos) != 1 { 300 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 301 } 302 if repos[0].Spindle == "" { 303 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 304 } 305 306 // trigger info 307 var trigger models.Trigger 308 var sha string 309 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 310 switch trigger.Kind { 311 case workflow.TriggerKindPush: 312 trigger.PushRef = &record.TriggerMetadata.Push.Ref 313 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 314 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 315 sha = *trigger.PushNewSha 316 case workflow.TriggerKindPullRequest: 317 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 318 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 319 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 320 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 321 sha = *trigger.PRSourceSha 322 } 323 324 tx, err := d.Begin() 325 if err != nil { 326 return fmt.Errorf("failed to start txn: %w", err) 327 } 328 329 triggerId, err := db.AddTrigger(tx, trigger) 330 if err != nil { 331 return fmt.Errorf("failed to add trigger entry: %w", err) 332 } 333 334 pipeline := models.Pipeline{ 335 Rkey: msg.Rkey, 336 Knot: source.Key(), 337 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 338 RepoName: record.TriggerMetadata.Repo.Repo, 339 TriggerId: int(triggerId), 340 Sha: sha, 341 } 342 343 err = db.AddPipeline(tx, pipeline) 344 if err != nil { 345 return fmt.Errorf("failed to add pipeline: %w", err) 346 } 347 348 err = tx.Commit() 349 if err != nil { 350 return fmt.Errorf("failed to commit txn: %w", err) 351 } 352 353 return nil 354}