this repo has no description
1package xrpc 2 3import ( 4 "encoding/json" 5 "fmt" 6 "net/http" 7 "strings" 8 9 "github.com/bluesky-social/indigo/api/atproto" 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/xrpc" 12 securejoin "github.com/cyphar/filepath-securejoin" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/rbac" 15 "tangled.org/core/spindle/models" 16 xrpcerr "tangled.org/core/xrpc/errors" 17) 18 19func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 20 l := x.Logger 21 fail := func(e xrpcerr.XrpcError) { 22 l.Error("failed", "kind", e.Tag, "error", e.Message) 23 writeError(w, e, http.StatusBadRequest) 24 } 25 l.Debug("cancel pipeline") 26 27 actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 28 if !ok { 29 fail(xrpcerr.MissingActorDidError) 30 return 31 } 32 33 var input tangled.PipelineCancelPipeline_Input 34 if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 35 fail(xrpcerr.GenericError(err)) 36 return 37 } 38 39 aturi := syntax.ATURI(input.Pipeline) 40 wid := models.WorkflowId{ 41 PipelineId: models.PipelineId{ 42 Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 43 Rkey: aturi.RecordKey().String(), 44 }, 45 Name: input.Workflow, 46 } 47 l.Debug("cancel pipeline", "wid", wid) 48 49 // unfortunately we have to resolve repo-at here 50 repoAt, err := syntax.ParseATURI(input.Repo) 51 if err != nil { 52 fail(xrpcerr.InvalidRepoError(input.Repo)) 53 return 54 } 55 56 ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) 57 if err != nil || ident.Handle.IsInvalidHandle() { 58 fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) 59 return 60 } 61 62 xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 63 resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 64 if err != nil { 65 fail(xrpcerr.GenericError(err)) 66 return 67 } 68 69 repo := resp.Value.Val.(*tangled.Repo) 70 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 71 if err != nil { 72 fail(xrpcerr.GenericError(err)) 73 return 74 } 75 76 // TODO: fine-grained role based control 77 isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) 78 if err != nil || !isRepoOwner { 79 fail(xrpcerr.AccessControlError(actorDid.String())) 80 return 81 } 82 for _, engine := range x.Engines { 83 l.Debug("destorying workflow", "wid", wid) 84 err = engine.DestroyWorkflow(r.Context(), wid) 85 if err != nil { 86 fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err))) 87 return 88 } 89 err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 90 if err != nil { 91 fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err))) 92 return 93 } 94 } 95 96 w.WriteHeader(http.StatusOK) 97}