package spindle import ( "context" "encoding/json" "fmt" "time" "github.com/bluesky-social/indigo/atproto/syntax" "tangled.org/core/api/tangled" "tangled.org/core/eventconsumer" "tangled.org/core/spindle/db" "tangled.org/core/spindle/git" "tangled.org/core/spindle/models" "tangled.org/core/tap" "tangled.org/core/tid" "tangled.org/core/workflow" ) func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { l := s.l.With("component", "tapIndexer") var err error switch evt.Type { case tap.EvtRecord: switch evt.Record.Collection.String() { case tangled.SpindleMemberNSID: err = s.processMember(ctx, evt) case tangled.RepoNSID: err = s.processRepo(ctx, evt) case tangled.RepoCollaboratorNSID: err = s.processCollaborator(ctx, evt) case tangled.RepoPullNSID: err = s.processPull(ctx, evt) } case tap.EvtIdentity: // no-op } if err != nil { l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) return err } return nil } // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) l.Info("processing spindle.member record") // only listen to members if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err) return nil } switch evt.Record.Action { case tap.RecordCreateAction, tap.RecordUpdateAction: record := tangled.SpindleMember{} if err := json.Unmarshal(evt.Record.Record, &record); err != nil { return fmt.Errorf("parsing record: %w", err) } domain := s.cfg.Server.Hostname if record.Instance != domain { l.Info("domain mismatch", "domain", record.Instance, "expected", domain) return nil } created, err := time.Parse(record.CreatedAt, time.RFC3339) if err != nil { created = time.Now() } if err := db.AddSpindleMember(s.db, db.SpindleMember{ Did: evt.Record.Did, Rkey: evt.Record.Rkey.String(), Instance: record.Instance, Subject: syntax.DID(record.Subject), Created: created, }); err != nil { l.Error("failed to add member", "error", err) return fmt.Errorf("adding member to db: %w", err) } if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { return fmt.Errorf("adding member to rbac: %w", err) } if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { return fmt.Errorf("adding did to tap", err) } l.Info("added member", "member", record.Subject) return nil case tap.RecordDeleteAction: var ( did = evt.Record.Did.String() rkey = evt.Record.Rkey.String() ) member, err := db.GetSpindleMember(s.db, did, rkey) if err != nil { return fmt.Errorf("finding member: %w", err) } if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { return fmt.Errorf("removing member from db: %w", err) } if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { return fmt.Errorf("removing member from rbac: %w", err) } if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { return fmt.Errorf("removing did from tap: %w", err) } l.Info("removed member", "member", member.Subject) return nil } return nil } func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) l.Info("processing repo.collaborator record") // only listen to members if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) return nil } switch evt.Record.Action { case tap.RecordCreateAction, tap.RecordUpdateAction: record := tangled.RepoCollaborator{} if err := json.Unmarshal(evt.Record.Record, &record); err != nil { l.Error("invalid record", "err", err) return fmt.Errorf("parsing record: %w", err) } // retry later if target repo is not ingested yet if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil { l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err) return fmt.Errorf("target repo is unknown") } // check perms for this user if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) return nil } if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ Did: evt.Record.Did, Rkey: evt.Record.Rkey, Repo: syntax.ATURI(record.Repo), Subject: syntax.DID(record.Subject), }); err != nil { return fmt.Errorf("adding collaborator to db: %w", err) } if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { return fmt.Errorf("adding collaborator to rbac: %w", err) } if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { return fmt.Errorf("adding did to tap: %w", err) } l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) return nil case tap.RecordDeleteAction: // get existing collaborator collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) if err != nil { return fmt.Errorf("failed to get existing collaborator info: %w", err) } // check perms for this user if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) return nil } if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { return fmt.Errorf("removing collaborator from db: %w", err) } if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { return fmt.Errorf("removing collaborator from rbac: %w", err) } if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { return fmt.Errorf("removing did from tap: %w", err) } l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) return nil } return nil } func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) l.Info("processing repo record") // only listen to members if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) return nil } switch evt.Record.Action { case tap.RecordCreateAction, tap.RecordUpdateAction: record := tangled.Repo{} if err := json.Unmarshal(evt.Record.Record, &record); err != nil { return fmt.Errorf("parsing record: %w", err) } domain := s.cfg.Server.Hostname if record.Spindle == nil || *record.Spindle != domain { if record.Spindle == nil { l.Info("spindle isn't configured", "name", record.Name) } else { l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) } if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { return fmt.Errorf("deleting repo from db: %w", err) } return nil } repo := &db.Repo{ Did: evt.Record.Did, Rkey: evt.Record.Rkey, Name: record.Name, Knot: record.Knot, } if err := s.db.PutRepo(repo); err != nil { return fmt.Errorf("adding repo to db: %w", err) } if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { return fmt.Errorf("adding repo to rbac") } // add this knot to the event consumer src := eventconsumer.NewKnotSource(record.Knot) s.ks.AddSource(context.Background(), src) // setup sparse sync repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) repoPath := s.newRepoPath(repo.Did, repo.Rkey) if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil { return fmt.Errorf("setting up sparse-clone git repo: %w", err) } l.Info("added repo", "repo", evt.Record.AtUri()) return nil case tap.RecordDeleteAction: // check perms for this user if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) return nil } if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { return fmt.Errorf("deleting repo from db: %w", err) } if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { return fmt.Errorf("deleting repo from rbac: %w", err) } l.Info("deleted repo", "repo", evt.Record.AtUri()) return nil } return nil } func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) l.Info("processing pull record") // only listen to live events if !evt.Record.Live { l.Info("skipping backfill event", "event", evt.Record.AtUri()) return nil } switch evt.Record.Action { case tap.RecordCreateAction, tap.RecordUpdateAction: record := tangled.RepoPull{} if err := json.Unmarshal(evt.Record.Record, &record); err != nil { l.Error("invalid record", "err", err) return fmt.Errorf("parsing record: %w", err) } // ignore legacy records if record.Target == nil { l.Info("ignoring pull record: target repo is nil") return nil } // ignore patch-based and fork-based PRs if record.Source == nil || record.Source.Repo != nil { l.Info("ignoring pull record: not a branch-based pull request") return nil } // skip if target repo is unknown repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo)) if err != nil { l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) return fmt.Errorf("target repo is unknown") } compiler := workflow.Compiler{ Trigger: tangled.Pipeline_TriggerMetadata{ Kind: string(workflow.TriggerKindPullRequest), PullRequest: &tangled.Pipeline_PullRequestTriggerData{ Action: "create", SourceBranch: record.Source.Branch, SourceSha: record.Source.Sha, TargetBranch: record.Target.Branch, }, Repo: &tangled.Pipeline_TriggerRepo{ Did: repo.Did.String(), Knot: repo.Knot, Repo: repo.Name, }, }, } repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) repoPath := s.newRepoPath(repo.Did, repo.Rkey) // load workflow definitions from rev (without spindle context) rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha) if err != nil { // don't retry l.Error("failed loading pipeline", "err", err) return nil } if len(rawPipeline) == 0 { l.Info("no workflow definition find for the repo. skipping the event") return nil } tpl := compiler.Compile(compiler.Parse(rawPipeline)) // TODO: pass compile error to workflow log for _, w := range compiler.Diagnostics.Errors { l.Error(w.String()) } for _, w := range compiler.Diagnostics.Warnings { l.Warn(w.String()) } pipelineId := models.PipelineId{ Knot: tpl.TriggerMetadata.Repo.Knot, Rkey: tid.TID(), } if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { l.Error("failed to create pipeline event", "err", err) return nil } err = s.processPipeline(ctx, tpl, pipelineId) if err != nil { // don't retry l.Error("failed processing pipeline", "err", err) return nil } case tap.RecordDeleteAction: // no-op } return nil } func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { known, err := s.db.IsKnownDid(syntax.DID(did)) if err != nil { return fmt.Errorf("ensuring did known state: %w", err) } if !known { if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { return fmt.Errorf("removing did from tap: %w", err) } } return nil }