Monorepo for Tangled
at push-zpskmntwpyxz 389 lines 11 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "time" 11 12 "tangled.org/core/appview/notify" 13 14 "tangled.org/core/api/tangled" 15 knotdb "tangled.org/core/knotserver/db" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/models" 20 ec "tangled.org/core/eventconsumer" 21 "tangled.org/core/eventconsumer/cursor" 22 "tangled.org/core/log" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25 "tangled.org/core/workflow" 26 27 "github.com/bluesky-social/indigo/atproto/syntax" 28 "github.com/go-git/go-git/v5/plumbing" 29 "github.com/posthog/posthog-go" 30) 31 32func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) { 33 logger := log.FromContext(ctx) 34 logger = log.SubLogger(logger, "knotstream") 35 36 knots, err := db.GetRegistrations( 37 d, 38 orm.FilterIsNot("registered", "null"), 39 ) 40 if err != nil { 41 return nil, err 42 } 43 44 srcs := make(map[ec.Source]struct{}) 45 for _, k := range knots { 46 s := ec.NewKnotSource(k.Domain) 47 srcs[s] = struct{}{} 48 } 49 50 cache := cache.New(c.Redis.Addr) 51 cursorStore := cursor.NewRedisCursorStore(cache) 52 53 cfg := ec.ConsumerConfig{ 54 Sources: srcs, 55 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev), 56 RetryInterval: c.Knotstream.RetryInterval, 57 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 58 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 59 WorkerCount: c.Knotstream.WorkerCount, 60 QueueSize: c.Knotstream.QueueSize, 61 Logger: logger, 62 Dev: c.Core.Dev, 63 CursorStore: &cursorStore, 64 } 65 66 return ec.NewConsumer(cfg), nil 67} 68 69func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc { 70 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 71 switch msg.Nsid { 72 case tangled.GitRefUpdateNSID: 73 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 74 case tangled.PipelineNSID: 75 return ingestPipeline(d, source, msg) 76 case knotdb.RepoDIDAssignNSID: 77 return ingestDIDAssign(d, enforcer, source, msg, ctx) 78 } 79 80 return nil 81 } 82} 83 84func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error { 85 logger := log.FromContext(ctx) 86 87 var record tangled.GitRefUpdate 88 err := json.Unmarshal(msg.EventJson, &record) 89 if err != nil { 90 return err 91 } 92 93 if record.RepoDid == "" { 94 logger.Error("gitRefUpdate missing repoDid, skipping", "owner_did", record.OwnerDid, "repo_name", record.RepoName) 95 return fmt.Errorf("gitRefUpdate missing repoDid") 96 } 97 98 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 99 if err != nil { 100 return err 101 } 102 if !slices.Contains(knownKnots, source.Key()) { 103 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 104 } 105 106 logger.Info("processing gitRefUpdate event", 107 "repo_did", record.RepoDid, 108 "ref", record.Ref, 109 "old_sha", record.OldSha, 110 "new_sha", record.NewSha) 111 112 var errWebhook error 113 114 repo, lookupErr := db.GetRepoByDid(d, record.RepoDid) 115 if lookupErr != nil && !errors.Is(lookupErr, sql.ErrNoRows) { 116 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr) 117 } 118 119 var repos []models.Repo 120 if lookupErr == nil { 121 repos = []models.Repo{*repo} 122 } 123 124 if errWebhook == nil && len(repos) == 1 { 125 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 126 } 127 128 errPunchcard := populatePunchcard(d, record) 129 errLanguages := updateRepoLanguages(d, record) 130 131 var errPosthog error 132 if !dev && record.CommitterDid != "" { 133 errPosthog = pc.Enqueue(posthog.Capture{ 134 DistinctId: record.CommitterDid, 135 Event: "git_ref_update", 136 }) 137 } 138 139 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 140} 141 142func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 143 if record.CommitterDid == "" { 144 return nil 145 } 146 147 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 148 if err != nil { 149 return err 150 } 151 152 count := 0 153 for _, ke := range knownEmails { 154 if record.Meta == nil { 155 continue 156 } 157 if record.Meta.CommitCount == nil { 158 continue 159 } 160 for _, ce := range record.Meta.CommitCount.ByEmail { 161 if ce == nil { 162 continue 163 } 164 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 165 count += int(ce.Count) 166 } 167 } 168 } 169 170 punch := models.Punch{ 171 Did: record.CommitterDid, 172 Date: time.Now(), 173 Count: count, 174 } 175 return db.AddPunch(d, punch) 176} 177 178func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 179 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 180 return fmt.Errorf("empty language data for repo: %s/%s", record.OwnerDid, record.RepoName) 181 } 182 183 if record.RepoDid == "" { 184 return fmt.Errorf("gitRefUpdate missing repoDid for language update") 185 } 186 187 r, lookupErr := db.GetRepoByDid(d, record.RepoDid) 188 if lookupErr != nil { 189 return fmt.Errorf("failed to look up repo by DID %s: %w", record.RepoDid, lookupErr) 190 } 191 repo := *r 192 193 ref := plumbing.ReferenceName(record.Ref) 194 if !ref.IsBranch() { 195 return fmt.Errorf("%s is not a valid reference name", ref) 196 } 197 198 var langs []models.RepoLanguage 199 for _, l := range record.Meta.LangBreakdown.Inputs { 200 if l == nil { 201 continue 202 } 203 204 langs = append(langs, models.RepoLanguage{ 205 RepoAt: repo.RepoAt(), 206 RepoDid: repo.RepoDid, 207 Ref: ref.Short(), 208 IsDefaultRef: record.Meta.IsDefaultRef, 209 Language: l.Lang, 210 Bytes: l.Size, 211 }) 212 } 213 214 tx, err := d.Begin() 215 if err != nil { 216 return err 217 } 218 defer tx.Rollback() 219 220 // update appview's cache 221 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 222 if err != nil { 223 fmt.Printf("failed; %s\n", err) 224 // non-fatal 225 } 226 227 return tx.Commit() 228} 229 230func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 231 var record tangled.Pipeline 232 err := json.Unmarshal(msg.EventJson, &record) 233 if err != nil { 234 return err 235 } 236 237 if record.TriggerMetadata == nil { 238 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 239 } 240 241 if record.TriggerMetadata.Repo == nil { 242 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 243 } 244 245 if record.TriggerMetadata.Repo.RepoDid == "" { 246 return fmt.Errorf("pipeline missing repoDid: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 247 } 248 249 repo, lookupErr := db.GetRepoByDid(d, record.TriggerMetadata.Repo.RepoDid) 250 if lookupErr != nil { 251 return fmt.Errorf("failed to look up repo by DID %s: %w", record.TriggerMetadata.Repo.RepoDid, lookupErr) 252 } 253 repos := []models.Repo{*repo} 254 if repos[0].Spindle == "" { 255 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 256 } 257 258 // trigger info 259 var trigger models.Trigger 260 var sha string 261 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 262 switch trigger.Kind { 263 case workflow.TriggerKindPush: 264 trigger.PushRef = &record.TriggerMetadata.Push.Ref 265 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 266 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 267 sha = *trigger.PushNewSha 268 case workflow.TriggerKindPullRequest: 269 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 270 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 271 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 272 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 273 sha = *trigger.PRSourceSha 274 } 275 276 tx, err := d.Begin() 277 if err != nil { 278 return fmt.Errorf("failed to start txn: %w", err) 279 } 280 281 triggerId, err := db.AddTrigger(tx, trigger) 282 if err != nil { 283 return fmt.Errorf("failed to add trigger entry: %w", err) 284 } 285 286 pipeline := models.Pipeline{ 287 Rkey: msg.Rkey, 288 Knot: source.Key(), 289 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 290 RepoName: record.TriggerMetadata.Repo.Repo, 291 RepoDid: repos[0].RepoDid, 292 TriggerId: int(triggerId), 293 Sha: sha, 294 } 295 296 err = db.AddPipeline(tx, pipeline) 297 if err != nil { 298 return fmt.Errorf("failed to add pipeline: %w", err) 299 } 300 301 err = tx.Commit() 302 if err != nil { 303 return fmt.Errorf("failed to commit txn: %w", err) 304 } 305 306 return nil 307} 308 309func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error { 310 logger := log.FromContext(ctx) 311 312 var record knotdb.RepoDIDAssign 313 if err := json.Unmarshal(msg.EventJson, &record); err != nil { 314 return fmt.Errorf("unmarshal didAssign: %w", err) 315 } 316 317 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 318 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 319 record.RepoDid, record.OwnerDid, record.RepoName) 320 } 321 322 logger.Info("processing didAssign event", 323 "repo_did", record.RepoDid, 324 "owner_did", record.OwnerDid, 325 "repo_name", record.RepoName) 326 327 repos, err := db.GetRepos(d, 1, 328 orm.FilterEq("did", record.OwnerDid), 329 orm.FilterEq("name", record.RepoName), 330 ) 331 if err != nil || len(repos) == 0 { 332 logger.Warn("didAssign for unknown repo, skipping", 333 "owner_did", record.OwnerDid, 334 "repo_name", record.RepoName) 335 return nil 336 } 337 repo := repos[0] 338 repoAtUri := repo.RepoAt().String() 339 knot := source.Key() 340 legacyResource := record.OwnerDid + "/" + record.RepoName 341 342 if repo.RepoDid != record.RepoDid { 343 tx, err := d.Begin() 344 if err != nil { 345 return fmt.Errorf("begin didAssign txn: %w", err) 346 } 347 defer tx.Rollback() 348 349 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 350 return fmt.Errorf("cascade repo_did: %w", err) 351 } 352 353 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 354 return fmt.Errorf("enqueue pds rewrites: %w", err) 355 } 356 357 if err := tx.Commit(); err != nil { 358 return fmt.Errorf("commit didAssign txn: %w", err) 359 } 360 } 361 362 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 363 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 364 } 365 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 366 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 367 } 368 369 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri)) 370 if collabErr != nil { 371 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 372 } 373 for _, c := range collabs { 374 collabDid := c.SubjectDid.String() 375 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 376 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 377 } 378 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 379 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 380 } 381 } 382 383 logger.Info("didAssign processed successfully", 384 "repo_did", record.RepoDid, 385 "owner_did", record.OwnerDid, 386 "repo_name", record.RepoName) 387 388 return nil 389}