Signed-off-by: Seongmin Lee git@boltless.me
-86
appview/state/knotstream.go
-86
appview/state/knotstream.go
···
20
20
"tangled.org/core/log"
21
21
"tangled.org/core/orm"
22
22
"tangled.org/core/rbac"
23
-
"tangled.org/core/workflow"
24
23
25
-
"github.com/bluesky-social/indigo/atproto/syntax"
26
24
"github.com/go-git/go-git/v5/plumbing"
27
25
"github.com/posthog/posthog-go"
28
26
)
···
69
67
switch msg.Nsid {
70
68
case tangled.GitRefUpdateNSID:
71
69
return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx)
72
-
case tangled.PipelineNSID:
73
-
return ingestPipeline(d, source, msg)
74
70
}
75
71
76
72
return nil
···
221
217
222
218
return tx.Commit()
223
219
}
224
-
225
-
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
226
-
var record tangled.Pipeline
227
-
err := json.Unmarshal(msg.EventJson, &record)
228
-
if err != nil {
229
-
return err
230
-
}
231
-
232
-
if record.TriggerMetadata == nil {
233
-
return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
234
-
}
235
-
236
-
if record.TriggerMetadata.Repo == nil {
237
-
return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
238
-
}
239
-
240
-
// does this repo have a spindle configured?
241
-
repos, err := db.GetRepos(
242
-
d,
243
-
0,
244
-
orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
245
-
orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
246
-
)
247
-
if err != nil {
248
-
return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
249
-
}
250
-
if len(repos) != 1 {
251
-
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
252
-
}
253
-
if repos[0].Spindle == "" {
254
-
return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
255
-
}
256
-
257
-
// trigger info
258
-
var trigger models.Trigger
259
-
var sha string
260
-
trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
261
-
switch trigger.Kind {
262
-
case workflow.TriggerKindPush:
263
-
trigger.PushRef = &record.TriggerMetadata.Push.Ref
264
-
trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
265
-
trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
266
-
sha = *trigger.PushNewSha
267
-
case workflow.TriggerKindPullRequest:
268
-
trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
269
-
trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
270
-
trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
271
-
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
272
-
sha = *trigger.PRSourceSha
273
-
}
274
-
275
-
tx, err := d.Begin()
276
-
if err != nil {
277
-
return fmt.Errorf("failed to start txn: %w", err)
278
-
}
279
-
280
-
triggerId, err := db.AddTrigger(tx, trigger)
281
-
if err != nil {
282
-
return fmt.Errorf("failed to add trigger entry: %w", err)
283
-
}
284
-
285
-
pipeline := models.Pipeline{
286
-
Rkey: msg.Rkey,
287
-
Knot: source.Key(),
288
-
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
289
-
RepoName: record.TriggerMetadata.Repo.Repo,
290
-
TriggerId: int(triggerId),
291
-
Sha: sha,
292
-
}
293
-
294
-
err = db.AddPipeline(tx, pipeline)
295
-
if err != nil {
296
-
return fmt.Errorf("failed to add pipeline: %w", err)
297
-
}
298
-
299
-
err = tx.Commit()
300
-
if err != nil {
301
-
return fmt.Errorf("failed to commit txn: %w", err)
302
-
}
303
-
304
-
return nil
305
-
}
+89
appview/state/spindlestream.go
+89
appview/state/spindlestream.go
···
20
20
"tangled.org/core/orm"
21
21
"tangled.org/core/rbac"
22
22
spindle "tangled.org/core/spindle/models"
23
+
"tangled.org/core/workflow"
23
24
)
24
25
25
26
func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
···
62
63
func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
63
64
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
64
65
switch msg.Nsid {
66
+
case tangled.PipelineNSID:
67
+
return ingestPipeline(logger, d, source, msg)
65
68
case tangled.PipelineStatusNSID:
66
69
return ingestPipelineStatus(ctx, logger, d, source, msg)
67
70
}
···
70
73
}
71
74
}
72
75
76
+
func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
77
+
var record tangled.Pipeline
78
+
err := json.Unmarshal(msg.EventJson, &record)
79
+
if err != nil {
80
+
return err
81
+
}
82
+
83
+
if record.TriggerMetadata == nil {
84
+
return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
85
+
}
86
+
87
+
if record.TriggerMetadata.Repo == nil {
88
+
return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
89
+
}
90
+
91
+
// does this repo have a spindle configured?
92
+
repos, err := db.GetRepos(
93
+
d,
94
+
0,
95
+
orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
96
+
orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
97
+
)
98
+
if err != nil {
99
+
return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
100
+
}
101
+
if len(repos) != 1 {
102
+
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
103
+
}
104
+
if repos[0].Spindle == "" {
105
+
return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
106
+
}
107
+
108
+
// trigger info
109
+
var trigger models.Trigger
110
+
var sha string
111
+
trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
112
+
switch trigger.Kind {
113
+
case workflow.TriggerKindPush:
114
+
trigger.PushRef = &record.TriggerMetadata.Push.Ref
115
+
trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
116
+
trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
117
+
sha = *trigger.PushNewSha
118
+
case workflow.TriggerKindPullRequest:
119
+
trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
120
+
trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
121
+
trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
122
+
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
123
+
sha = *trigger.PRSourceSha
124
+
}
125
+
126
+
tx, err := d.Begin()
127
+
if err != nil {
128
+
return fmt.Errorf("failed to start txn: %w", err)
129
+
}
130
+
131
+
triggerId, err := db.AddTrigger(tx, trigger)
132
+
if err != nil {
133
+
return fmt.Errorf("failed to add trigger entry: %w", err)
134
+
}
135
+
136
+
// TODO: we shouldn't even use knot to identify pipelines
137
+
knot := record.TriggerMetadata.Repo.Knot
138
+
pipeline := models.Pipeline{
139
+
Rkey: msg.Rkey,
140
+
Knot: knot,
141
+
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
142
+
RepoName: record.TriggerMetadata.Repo.Repo,
143
+
TriggerId: int(triggerId),
144
+
Sha: sha,
145
+
}
146
+
147
+
err = db.AddPipeline(tx, pipeline)
148
+
if err != nil {
149
+
return fmt.Errorf("failed to add pipeline: %w", err)
150
+
}
151
+
152
+
err = tx.Commit()
153
+
if err != nil {
154
+
return fmt.Errorf("failed to commit txn: %w", err)
155
+
}
156
+
157
+
l.Info("added pipeline", "pipeline", pipeline)
158
+
159
+
return nil
160
+
}
161
+
73
162
func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
74
163
var record tangled.PipelineStatus
75
164
err := json.Unmarshal(msg.EventJson, &record)
History
5 rounds
0 comments
boltless.me
submitted
#4
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
3/3 success
expand
collapse
merge conflicts detected
expand
collapse
expand
collapse
- appview/state/state.go:35
- go.mod:9
- go.sum:483
- nix/gomod2nix.toml:510
expand 0 comments
boltless.me
submitted
#3
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
1/3 failed, 2/3 success
expand
collapse
expand 0 comments
boltless.me
submitted
#2
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
1/3 failed, 2/3 success
expand
collapse
expand 0 comments
boltless.me
submitted
#1
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>
1/3 failed, 2/3 success
expand
collapse
expand 0 comments
boltless.me
submitted
#0
1 commit
expand
collapse
appview: listen for pipeline events from spindlestream
Signed-off-by: Seongmin Lee <git@boltless.me>