Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2

lexicons,appview,spindle: add workflow cancel

Signed-off-by: Seongmin Lee <git@boltless.me>

authored by boltless.me and committed by tangled.org 3aeaff84 b18bfee9

+240 -4
api/tangled/pipelinecancelPipeline.go

This is a binary file and will not be displayed.

+14
appview/pages/templates/repo/pipelines/workflow.html
··· 12 12 {{ block "sidebar" . }} {{ end }} 13 13 </div> 14 14 <div class="col-span-1 md:col-span-3"> 15 + <!-- TODO(boltless): explictly check for pipeline cancel permission --> 16 + {{ if $.RepoInfo.Roles.IsOwner }} 17 + <div class="flex justify-between mb-2"> 18 + <div id="workflow-error" class="text-red-500 dark:text-red-400"></div> 19 + <button 20 + class="btn" 21 + hx-post="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/cancel" 22 + hx-swap="none" 23 + {{ if (index .Pipeline.Statuses .Workflow).Latest.Status.IsFinish -}} 24 + disabled 25 + {{- end }} 26 + >Cancel</button> 27 + </div> 28 + {{ end }} 15 29 {{ block "logs" . }} {{ end }} 16 30 </div> 17 31 </section>
+85 -1
appview/pipelines/pipelines.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 + "fmt" 7 8 "log/slog" 8 9 "net/http" 9 10 "strings" 10 11 "time" 11 12 13 + "tangled.org/core/api/tangled" 12 14 "tangled.org/core/appview/config" 13 15 "tangled.org/core/appview/db" 16 + "tangled.org/core/appview/middleware" 17 + "tangled.org/core/appview/models" 14 18 "tangled.org/core/appview/oauth" 15 19 "tangled.org/core/appview/pages" 16 20 "tangled.org/core/appview/reporesolver" ··· 40 36 logger *slog.Logger 41 37 } 42 38 43 - func (p *Pipelines) Router() http.Handler { 39 + func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { 44 40 r := chi.NewRouter() 45 41 r.Get("/", p.Index) 46 42 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 47 43 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 44 + r. 45 + With(mw.RepoPermissionMiddleware("repo:owner")). 46 + Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 48 47 49 48 return r 50 49 } ··· 321 314 } 322 315 } 323 316 } 317 + } 318 + 319 + func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 320 + l := p.logger.With("handler", "Cancel") 321 + 322 + var ( 323 + pipelineId = chi.URLParam(r, "pipeline") 324 + workflow = chi.URLParam(r, "workflow") 325 + ) 326 + if pipelineId == "" || workflow == "" { 327 + http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 328 + return 329 + } 330 + 331 + f, err := p.repoResolver.Resolve(r) 332 + if err != nil { 333 + l.Error("failed to get repo and knot", "err", err) 334 + http.Error(w, "bad repo/knot", http.StatusBadRequest) 335 + return 336 + } 337 + 338 + pipeline, err := func() (models.Pipeline, error) { 339 + ps, err := db.GetPipelineStatuses( 340 + p.db, 341 + 1, 342 + orm.FilterEq("repo_owner", f.Did), 343 + orm.FilterEq("repo_name", f.Name), 344 + orm.FilterEq("knot", f.Knot), 345 + orm.FilterEq("id", pipelineId), 346 + ) 347 + if err != nil { 348 + return models.Pipeline{}, err 349 + } 350 + if len(ps) != 1 { 351 + return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 352 + } 353 + return ps[0], nil 354 + }() 355 + if err != nil { 356 + l.Error("pipeline query failed", "err", err) 357 + http.Error(w, "pipeline not found", http.StatusNotFound) 358 + } 359 + var ( 360 + spindle = f.Spindle 361 + knot = f.Knot 362 + rkey = pipeline.Rkey 363 + ) 364 + 365 + if spindle == "" || knot == "" || rkey == "" { 366 + http.Error(w, "invalid repo info", http.StatusBadRequest) 367 + return 368 + } 369 + 370 + spindleClient, err := p.oauth.ServiceClient( 371 + r, 372 + oauth.WithService(f.Spindle), 373 + oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 374 + oauth.WithDev(p.config.Core.Dev), 375 + oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 376 + ) 377 + 378 + err = tangled.PipelineCancelPipeline( 379 + r.Context(), 380 + spindleClient, 381 + &tangled.PipelineCancelPipeline_Input{ 382 + Repo: string(f.RepoAt()), 383 + Pipeline: pipeline.AtUri().String(), 384 + Workflow: workflow, 385 + }, 386 + ) 387 + errorId := "workflow-error" 388 + if err != nil { 389 + l.Error("failed to cancel workflow", "err", err) 390 + p.pages.Notice(w, errorId, "Failed to cancel workflow") 391 + return 392 + } 393 + l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 324 394 } 325 395 326 396 // either a message or an error
+3 -3
appview/state/router.go
··· 94 94 r.Mount("/", s.RepoRouter(mw)) 95 95 r.Mount("/issues", s.IssuesRouter(mw)) 96 96 r.Mount("/pulls", s.PullsRouter(mw)) 97 - r.Mount("/pipelines", s.PipelinesRouter()) 97 + r.Mount("/pipelines", s.PipelinesRouter(mw)) 98 98 r.Mount("/labels", s.LabelsRouter()) 99 99 100 100 // These routes get proxied to the knot ··· 313 313 return repo.Router(mw) 314 314 } 315 315 316 - func (s *State) PipelinesRouter() http.Handler { 316 + func (s *State) PipelinesRouter(mw *middleware.Middleware) http.Handler { 317 317 pipes := pipelines.New( 318 318 s.oauth, 319 319 s.repoResolver, ··· 325 325 s.enforcer, 326 326 log.SubLogger(s.logger, "pipelines"), 327 327 ) 328 - return pipes.Router() 328 + return pipes.Router(mw) 329 329 } 330 330 331 331 func (s *State) LabelsRouter() http.Handler {
+33
lexicons/pipeline/cancelPipeline.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "sh.tangled.pipeline.cancelPipeline", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Cancel a running pipeline", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["repo", "pipeline", "workflow"], 13 + "properties": { 14 + "repo": { 15 + "type": "string", 16 + "format": "at-uri", 17 + "description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet" 18 + }, 19 + "pipeline": { 20 + "type": "string", 21 + "format": "at-uri", 22 + "description": "pipeline at-uri" 23 + }, 24 + "workflow": { 25 + "type": "string", 26 + "description": "workflow name" 27 + } 28 + } 29 + } 30 + } 31 + } 32 + } 33 + }
+4
spindle/db/events.go
··· 150 150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151 151 } 152 152 153 + func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 154 + return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 155 + } 156 + 153 157 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 154 158 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 155 159 }
+1
spindle/server.go
··· 286 286 Config: s.cfg, 287 287 Resolver: s.res, 288 288 Vault: s.vault, 289 + Notifier: s.Notifier(), 289 290 ServiceAuth: serviceAuth, 290 291 } 291 292
+97
spindle/xrpc/pipeline_cancelPipeline.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/bluesky-social/indigo/xrpc" 12 + securejoin "github.com/cyphar/filepath-securejoin" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/rbac" 15 + "tangled.org/core/spindle/models" 16 + xrpcerr "tangled.org/core/xrpc/errors" 17 + ) 18 + 19 + func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 20 + l := x.Logger 21 + fail := func(e xrpcerr.XrpcError) { 22 + l.Error("failed", "kind", e.Tag, "error", e.Message) 23 + writeError(w, e, http.StatusBadRequest) 24 + } 25 + l.Debug("cancel pipeline") 26 + 27 + actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 28 + if !ok { 29 + fail(xrpcerr.MissingActorDidError) 30 + return 31 + } 32 + 33 + var input tangled.PipelineCancelPipeline_Input 34 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 35 + fail(xrpcerr.GenericError(err)) 36 + return 37 + } 38 + 39 + aturi := syntax.ATURI(input.Pipeline) 40 + wid := models.WorkflowId{ 41 + PipelineId: models.PipelineId{ 42 + Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 43 + Rkey: aturi.RecordKey().String(), 44 + }, 45 + Name: input.Workflow, 46 + } 47 + l.Debug("cancel pipeline", "wid", wid) 48 + 49 + // unfortunately we have to resolve repo-at here 50 + repoAt, err := syntax.ParseATURI(input.Repo) 51 + if err != nil { 52 + fail(xrpcerr.InvalidRepoError(input.Repo)) 53 + return 54 + } 55 + 56 + ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) 57 + if err != nil || ident.Handle.IsInvalidHandle() { 58 + fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) 59 + return 60 + } 61 + 62 + xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 63 + resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 64 + if err != nil { 65 + fail(xrpcerr.GenericError(err)) 66 + return 67 + } 68 + 69 + repo := resp.Value.Val.(*tangled.Repo) 70 + didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 71 + if err != nil { 72 + fail(xrpcerr.GenericError(err)) 73 + return 74 + } 75 + 76 + // TODO: fine-grained role based control 77 + isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) 78 + if err != nil || !isRepoOwner { 79 + fail(xrpcerr.AccessControlError(actorDid.String())) 80 + return 81 + } 82 + for _, engine := range x.Engines { 83 + l.Debug("destorying workflow", "wid", wid) 84 + err = engine.DestroyWorkflow(r.Context(), wid) 85 + if err != nil { 86 + fail(xrpcerr.GenericError(fmt.Errorf("failed to destroy workflow: %w", err))) 87 + return 88 + } 89 + err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 90 + if err != nil { 91 + fail(xrpcerr.GenericError(fmt.Errorf("failed to emit status failed: %w", err))) 92 + return 93 + } 94 + } 95 + 96 + w.WriteHeader(http.StatusOK) 97 + }
+3
spindle/xrpc/xrpc.go
··· 10 10 11 11 "tangled.org/core/api/tangled" 12 12 "tangled.org/core/idresolver" 13 + "tangled.org/core/notifier" 13 14 "tangled.org/core/rbac" 14 15 "tangled.org/core/spindle/config" 15 16 "tangled.org/core/spindle/db" ··· 30 29 Config *config.Config 31 30 Resolver *idresolver.Resolver 32 31 Vault secrets.Manager 32 + Notifier *notifier.Notifier 33 33 ServiceAuth *serviceauth.ServiceAuth 34 34 } 35 35 ··· 43 41 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 44 42 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 45 43 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 44 + r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline) 46 45 }) 47 46 48 47 // service query endpoints (no auth required)