Monorepo for Tangled tangled.org

appview: listen for pipeline events from spindlestream #987

open opened by boltless.me targeting master from sl/spindle-rewrite
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mckguakflj22
+89 -86
Diff #4
-86
appview/state/knotstream.go
··· 20 20 "tangled.org/core/log" 21 21 "tangled.org/core/orm" 22 22 "tangled.org/core/rbac" 23 - "tangled.org/core/workflow" 24 23 25 - "github.com/bluesky-social/indigo/atproto/syntax" 26 24 "github.com/go-git/go-git/v5/plumbing" 27 25 "github.com/posthog/posthog-go" 28 26 ) ··· 69 67 switch msg.Nsid { 70 68 case tangled.GitRefUpdateNSID: 71 69 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 72 - case tangled.PipelineNSID: 73 - return ingestPipeline(d, source, msg) 74 70 } 75 71 76 72 return nil ··· 221 217 222 218 return tx.Commit() 223 219 } 224 - 225 - func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 226 - var record tangled.Pipeline 227 - err := json.Unmarshal(msg.EventJson, &record) 228 - if err != nil { 229 - return err 230 - } 231 - 232 - if record.TriggerMetadata == nil { 233 - return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 234 - } 235 - 236 - if record.TriggerMetadata.Repo == nil { 237 - return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 238 - } 239 - 240 - // does this repo have a spindle configured? 241 - repos, err := db.GetRepos( 242 - d, 243 - 0, 244 - orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 245 - orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 246 - ) 247 - if err != nil { 248 - return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 249 - } 250 - if len(repos) != 1 { 251 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 252 - } 253 - if repos[0].Spindle == "" { 254 - return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 255 - } 256 - 257 - // trigger info 258 - var trigger models.Trigger 259 - var sha string 260 - trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 261 - switch trigger.Kind { 262 - case workflow.TriggerKindPush: 263 - trigger.PushRef = &record.TriggerMetadata.Push.Ref 264 - trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 265 - trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 266 - sha = *trigger.PushNewSha 267 - case workflow.TriggerKindPullRequest: 268 - trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 269 - trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 270 - trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 271 - trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 272 - sha = *trigger.PRSourceSha 273 - } 274 - 275 - tx, err := d.Begin() 276 - if err != nil { 277 - return fmt.Errorf("failed to start txn: %w", err) 278 - } 279 - 280 - triggerId, err := db.AddTrigger(tx, trigger) 281 - if err != nil { 282 - return fmt.Errorf("failed to add trigger entry: %w", err) 283 - } 284 - 285 - pipeline := models.Pipeline{ 286 - Rkey: msg.Rkey, 287 - Knot: source.Key(), 288 - RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 289 - RepoName: record.TriggerMetadata.Repo.Repo, 290 - TriggerId: int(triggerId), 291 - Sha: sha, 292 - } 293 - 294 - err = db.AddPipeline(tx, pipeline) 295 - if err != nil { 296 - return fmt.Errorf("failed to add pipeline: %w", err) 297 - } 298 - 299 - err = tx.Commit() 300 - if err != nil { 301 - return fmt.Errorf("failed to commit txn: %w", err) 302 - } 303 - 304 - return nil 305 - }
+89
appview/state/spindlestream.go
··· 20 20 "tangled.org/core/orm" 21 21 "tangled.org/core/rbac" 22 22 spindle "tangled.org/core/spindle/models" 23 + "tangled.org/core/workflow" 23 24 ) 24 25 25 26 func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { ··· 62 63 func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 63 64 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 64 65 switch msg.Nsid { 66 + case tangled.PipelineNSID: 67 + return ingestPipeline(logger, d, source, msg) 65 68 case tangled.PipelineStatusNSID: 66 69 return ingestPipelineStatus(ctx, logger, d, source, msg) 67 70 } ··· 70 73 } 71 74 } 72 75 76 + func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 77 + var record tangled.Pipeline 78 + err := json.Unmarshal(msg.EventJson, &record) 79 + if err != nil { 80 + return err 81 + } 82 + 83 + if record.TriggerMetadata == nil { 84 + return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 85 + } 86 + 87 + if record.TriggerMetadata.Repo == nil { 88 + return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 89 + } 90 + 91 + // does this repo have a spindle configured? 92 + repos, err := db.GetRepos( 93 + d, 94 + 0, 95 + orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 96 + orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 97 + ) 98 + if err != nil { 99 + return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 100 + } 101 + if len(repos) != 1 { 102 + return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 103 + } 104 + if repos[0].Spindle == "" { 105 + return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 106 + } 107 + 108 + // trigger info 109 + var trigger models.Trigger 110 + var sha string 111 + trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 112 + switch trigger.Kind { 113 + case workflow.TriggerKindPush: 114 + trigger.PushRef = &record.TriggerMetadata.Push.Ref 115 + trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 116 + trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 117 + sha = *trigger.PushNewSha 118 + case workflow.TriggerKindPullRequest: 119 + trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 120 + trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 121 + trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 122 + trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 123 + sha = *trigger.PRSourceSha 124 + } 125 + 126 + tx, err := d.Begin() 127 + if err != nil { 128 + return fmt.Errorf("failed to start txn: %w", err) 129 + } 130 + 131 + triggerId, err := db.AddTrigger(tx, trigger) 132 + if err != nil { 133 + return fmt.Errorf("failed to add trigger entry: %w", err) 134 + } 135 + 136 + // TODO: we shouldn't even use knot to identify pipelines 137 + knot := record.TriggerMetadata.Repo.Knot 138 + pipeline := models.Pipeline{ 139 + Rkey: msg.Rkey, 140 + Knot: knot, 141 + RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 142 + RepoName: record.TriggerMetadata.Repo.Repo, 143 + TriggerId: int(triggerId), 144 + Sha: sha, 145 + } 146 + 147 + err = db.AddPipeline(tx, pipeline) 148 + if err != nil { 149 + return fmt.Errorf("failed to add pipeline: %w", err) 150 + } 151 + 152 + err = tx.Commit() 153 + if err != nil { 154 + return fmt.Errorf("failed to commit txn: %w", err) 155 + } 156 + 157 + l.Info("added pipeline", "pipeline", pipeline) 158 + 159 + return nil 160 + } 161 + 73 162 func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 74 163 var record tangled.PipelineStatus 75 164 err := json.Unmarshal(msg.EventJson, &record)

History

5 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
appview: listen for pipeline events from spindlestream
3/3 success
expand
merge conflicts detected
expand
  • appview/state/state.go:35
  • go.mod:9
  • go.sum:483
  • nix/gomod2nix.toml:510
expand 0 comments
1 commit
expand
appview: listen for pipeline events from spindlestream
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
appview: listen for pipeline events from spindlestream
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
appview: listen for pipeline events from spindlestream
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
appview: listen for pipeline events from spindlestream
1/3 failed, 2/3 success
expand
expand 0 comments