Monorepo for Tangled tangled.org

lexicons,appview,spindle: add workflow cancel #889

merged opened by boltless.me targeting master from sl/okmkyytolvko
Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3m7zmleup7g22
+264
Diff #0
+34
api/tangled/pipelinecancelPipeline.go
···
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package tangled 4 + 5 + // schema: sh.tangled.pipeline.cancelPipeline 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/lex/util" 11 + ) 12 + 13 + const ( 14 + PipelineCancelPipelineNSID = "sh.tangled.pipeline.cancelPipeline" 15 + ) 16 + 17 + // PipelineCancelPipeline_Input is the input argument to a sh.tangled.pipeline.cancelPipeline call. 18 + type PipelineCancelPipeline_Input struct { 19 + // pipeline: pipeline at-uri 20 + Pipeline string `json:"pipeline" cborgen:"pipeline"` 21 + // repo: repo at-uri, spindle can't resolve repo from pipeline at-uri yet 22 + Repo string `json:"repo" cborgen:"repo"` 23 + // workflow: workflow name 24 + Workflow string `json:"workflow" cborgen:"workflow"` 25 + } 26 + 27 + // PipelineCancelPipeline calls the XRPC method "sh.tangled.pipeline.cancelPipeline". 28 + func PipelineCancelPipeline(ctx context.Context, c util.LexClient, input *PipelineCancelPipeline_Input) error { 29 + if err := c.LexDo(ctx, util.Procedure, "application/json", "sh.tangled.pipeline.cancelPipeline", nil, input, nil); err != nil { 30 + return err 31 + } 32 + 33 + return nil 34 + }
+10
appview/pages/templates/repo/pipelines/workflow.html
··· 12 {{ block "sidebar" . }} {{ end }} 13 </div> 14 <div class="col-span-1 md:col-span-3"> 15 {{ block "logs" . }} {{ end }} 16 </div> 17 </section>
··· 12 {{ block "sidebar" . }} {{ end }} 13 </div> 14 <div class="col-span-1 md:col-span-3"> 15 + <div class="flex justify-end mb-2"> 16 + <button 17 + class="btn" 18 + hx-post="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/cancel" 19 + hx-swap="none" 20 + {{ if (index .Pipeline.Statuses .Workflow).Latest.Status.IsFinish -}} 21 + disabled 22 + {{- end }} 23 + >Cancel</button> 24 + </div> 25 {{ block "logs" . }} {{ end }} 26 </div> 27 </section>
+82
appview/pipelines/pipelines.go
··· 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" ··· 41 r.Get("/", p.Index) 42 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 43 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 44 45 return r 46 } ··· 316 } 317 } 318 319 // either a message or an error 320 type logEvent struct { 321 msg []byte
··· 4 "bytes" 5 "context" 6 "encoding/json" 7 + "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 + "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 + "tangled.org/core/appview/models" 17 "tangled.org/core/appview/oauth" 18 "tangled.org/core/appview/pages" 19 "tangled.org/core/appview/reporesolver" ··· 44 r.Get("/", p.Index) 45 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 46 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 47 + r.Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 48 49 return r 50 } ··· 320 } 321 } 322 323 + func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 324 + l := p.logger.With("handler", "Cancel") 325 + 326 + var ( 327 + pipelineId = chi.URLParam(r, "pipeline") 328 + workflow = chi.URLParam(r, "workflow") 329 + ) 330 + if pipelineId == "" || workflow == "" { 331 + http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 332 + return 333 + } 334 + 335 + f, err := p.repoResolver.Resolve(r) 336 + if err != nil { 337 + l.Error("failed to get repo and knot", "err", err) 338 + http.Error(w, "bad repo/knot", http.StatusBadRequest) 339 + return 340 + } 341 + 342 + pipeline, err := func() (models.Pipeline, error) { 343 + ps, err := db.GetPipelineStatuses( 344 + p.db, 345 + 1, 346 + orm.FilterEq("repo_owner", f.Did), 347 + orm.FilterEq("repo_name", f.Name), 348 + orm.FilterEq("knot", f.Knot), 349 + orm.FilterEq("id", pipelineId), 350 + ) 351 + if err != nil { 352 + return models.Pipeline{}, err 353 + } 354 + if len(ps) != 1 { 355 + return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 356 + } 357 + return ps[0], nil 358 + }() 359 + if err != nil { 360 + l.Error("pipeline query failed", "err", err) 361 + http.Error(w, "pipeline not found", http.StatusNotFound) 362 + } 363 + var ( 364 + spindle = f.Spindle 365 + knot = f.Knot 366 + rkey = pipeline.Rkey 367 + ) 368 + 369 + if spindle == "" || knot == "" || rkey == "" { 370 + http.Error(w, "invalid repo info", http.StatusBadRequest) 371 + return 372 + } 373 + 374 + spindleClient, err := p.oauth.ServiceClient( 375 + r, 376 + oauth.WithService(f.Spindle), 377 + oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 378 + oauth.WithExp(60), 379 + oauth.WithDev(false), 380 + oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 381 + ) 382 + 383 + err = tangled.PipelineCancelPipeline( 384 + r.Context(), 385 + spindleClient, 386 + &tangled.PipelineCancelPipeline_Input{ 387 + Repo: string(f.RepoAt()), 388 + Pipeline: pipeline.AtUri().String(), 389 + Workflow: workflow, 390 + }, 391 + ) 392 + errorId := "pipeline-action" 393 + if err != nil { 394 + l.Error("failed to cancel pipeline", "err", err) 395 + p.pages.Notice(w, errorId, "Failed to add secret.") 396 + return 397 + } 398 + l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 399 + } 400 + 401 // either a message or an error 402 type logEvent struct { 403 msg []byte
+33
lexicons/pipeline/cancelPipeline.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "sh.tangled.pipeline.cancelPipeline", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Cancel a running pipeline", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["repo", "pipeline", "workflow"], 13 + "properties": { 14 + "repo": { 15 + "type": "string", 16 + "format": "at-uri", 17 + "description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet" 18 + }, 19 + "pipeline": { 20 + "type": "string", 21 + "format": "at-uri", 22 + "description": "pipeline at-uri" 23 + }, 24 + "workflow": { 25 + "type": "string", 26 + "description": "workflow name" 27 + } 28 + } 29 + } 30 + } 31 + } 32 + } 33 + }
+4
spindle/db/events.go
··· 150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151 } 152 153 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 154 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 155 }
··· 150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151 } 152 153 + func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 154 + return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 155 + } 156 + 157 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 158 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 159 }
+1
spindle/server.go
··· 268 Config: s.cfg, 269 Resolver: s.res, 270 Vault: s.vault, 271 ServiceAuth: serviceAuth, 272 } 273
··· 268 Config: s.cfg, 269 Resolver: s.res, 270 Vault: s.vault, 271 + Notifier: s.Notifier(), 272 ServiceAuth: serviceAuth, 273 } 274
+97
spindle/xrpc/pipeline_cancelPipeline.go
···
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/bluesky-social/indigo/xrpc" 12 + securejoin "github.com/cyphar/filepath-securejoin" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/rbac" 15 + "tangled.org/core/spindle/models" 16 + xrpcerr "tangled.org/core/xrpc/errors" 17 + ) 18 + 19 + func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 20 + l := x.Logger 21 + fail := func(e xrpcerr.XrpcError) { 22 + l.Error("failed", "kind", e.Tag, "error", e.Message) 23 + writeError(w, e, http.StatusBadRequest) 24 + } 25 + l.Debug("cancel pipeline") 26 + 27 + actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 28 + if !ok { 29 + fail(xrpcerr.MissingActorDidError) 30 + return 31 + } 32 + 33 + var input tangled.PipelineCancelPipeline_Input 34 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 35 + fail(xrpcerr.GenericError(err)) 36 + return 37 + } 38 + 39 + aturi := syntax.ATURI(input.Pipeline) 40 + wid := models.WorkflowId{ 41 + PipelineId: models.PipelineId{ 42 + Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 43 + Rkey: aturi.RecordKey().String(), 44 + }, 45 + Name: input.Workflow, 46 + } 47 + l.Debug("cancel pipeline", "wid", wid) 48 + 49 + // unfortunately we have to resolve repo-at here 50 + repoAt, err := syntax.ParseATURI(input.Repo) 51 + if err != nil { 52 + fail(xrpcerr.InvalidRepoError(input.Repo)) 53 + return 54 + } 55 + 56 + ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) 57 + if err != nil || ident.Handle.IsInvalidHandle() { 58 + fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) 59 + return 60 + } 61 + 62 + xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 63 + resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 64 + if err != nil { 65 + fail(xrpcerr.GenericError(err)) 66 + return 67 + } 68 + 69 + repo := resp.Value.Val.(*tangled.Repo) 70 + didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 71 + if err != nil { 72 + fail(xrpcerr.GenericError(err)) 73 + return 74 + } 75 + 76 + // TODO: fine-grained role based control 77 + isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) 78 + if err != nil || !isRepoOwner { 79 + fail(xrpcerr.AccessControlError(actorDid.String())) 80 + return 81 + } 82 + for _, engine := range x.Engines { 83 + l.Debug("destorying workflow", "wid", wid) 84 + err = engine.DestroyWorkflow(r.Context(), wid) 85 + if err != nil { 86 + fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err))) 87 + return 88 + } 89 + err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 90 + if err != nil { 91 + fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err))) 92 + return 93 + } 94 + } 95 + 96 + w.WriteHeader(http.StatusOK) 97 + }
+3
spindle/xrpc/xrpc.go
··· 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/rbac" 14 "tangled.org/core/spindle/config" 15 "tangled.org/core/spindle/db" ··· 29 Config *config.Config 30 Resolver *idresolver.Resolver 31 Vault secrets.Manager 32 ServiceAuth *serviceauth.ServiceAuth 33 } 34 ··· 41 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 42 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 43 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 44 }) 45 46 // service query endpoints (no auth required)
··· 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/idresolver" 13 + "tangled.org/core/notifier" 14 "tangled.org/core/rbac" 15 "tangled.org/core/spindle/config" 16 "tangled.org/core/spindle/db" ··· 30 Config *config.Config 31 Resolver *idresolver.Resolver 32 Vault secrets.Manager 33 + Notifier *notifier.Notifier 34 ServiceAuth *serviceauth.ServiceAuth 35 } 36 ··· 43 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 44 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 45 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 46 + r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline) 47 }) 48 49 // service query endpoints (no auth required)

History

7 rounds 7 comments
sign up or login to add to the discussion
1 commit
expand
lexicons,appview,spindle: add workflow cancel
expand 0 comments
pull request successfully merged
1 commit
expand
lexicons,appview,spindle: add workflow cancel
expand 1 comment
  • here, failedis spelled asdailed`
  • here, err := fmt.Errorf(...) should probably be removed.
1 commit
expand
lexicons,appview,spindle: add workflow cancel
expand 0 comments
1 commit
expand
lexicons,appview,spindle: add workflow cancel
expand 0 comments
1 commit
expand
lexicons,appview,spindle: add workflow cancel
expand 0 comments
1 commit
expand
lexicons,appview,spindle: add workflow cancel
expand 6 comments

requesting a review @oppi.li

can we allow owners to also cancel pipelines?

sorry, i mean collaborators.

@oppi.li I'm not sure how should I implement that. We don't have repo:pipeline action defined for repo owners and contributors.

For example, setting access is controlled by repo:settings.

Should I just check for both repo:owner and repo:contributor as a dirty fix?

can we allow collaborators to also cancel pipelines?

@oppi.li I think we can merge this for now. Spindle's collaborator ingesting logic is currently broken so allowing them to cancel pipelines will reveal the bug. I'll make it possible on upcoming PR I'm working on sl/spindle-rewrite branch.

reviews:

  • here, we should use config.Core.Dev
  • here, don't set expiry, the default is already 60s into the future
  • there is no auth filter for the cancel button on the appview side, the button should only be visible to repo owners + collaborators, and the /workflow/cancel endpoint should return 403 for non-collaborators and non-owners.
  • here, the error message needs to be "Failed to cancel workflow", not "Failed to add secret"

the rest of the code is brilliant, will give this a test locally as well!

boltless.me submitted #0
1 commit
expand
lexicons,appview,spindle: add workflow cancel
expand 0 comments