this repo has no description
1package xrpc
2
3import (
4 "encoding/json"
5 "fmt"
6 "net/http"
7 "strings"
8
9 "github.com/bluesky-social/indigo/api/atproto"
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/indigo/xrpc"
12 securejoin "github.com/cyphar/filepath-securejoin"
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/rbac"
15 "tangled.org/core/spindle/models"
16 xrpcerr "tangled.org/core/xrpc/errors"
17)
18
19func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) {
20 l := x.Logger
21 fail := func(e xrpcerr.XrpcError) {
22 l.Error("failed", "kind", e.Tag, "error", e.Message)
23 writeError(w, e, http.StatusBadRequest)
24 }
25 l.Debug("cancel pipeline")
26
27 actorDid, ok := r.Context().Value(ActorDid).(syntax.DID)
28 if !ok {
29 fail(xrpcerr.MissingActorDidError)
30 return
31 }
32
33 var input tangled.PipelineCancelPipeline_Input
34 if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
35 fail(xrpcerr.GenericError(err))
36 return
37 }
38
39 aturi := syntax.ATURI(input.Pipeline)
40 wid := models.WorkflowId{
41 PipelineId: models.PipelineId{
42 Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"),
43 Rkey: aturi.RecordKey().String(),
44 },
45 Name: input.Workflow,
46 }
47 l.Debug("cancel pipeline", "wid", wid)
48
49 // unfortunately we have to resolve repo-at here
50 repoAt, err := syntax.ParseATURI(input.Repo)
51 if err != nil {
52 fail(xrpcerr.InvalidRepoError(input.Repo))
53 return
54 }
55
56 ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String())
57 if err != nil || ident.Handle.IsInvalidHandle() {
58 fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err)))
59 return
60 }
61
62 xrpcc := xrpc.Client{Host: ident.PDSEndpoint()}
63 resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
64 if err != nil {
65 fail(xrpcerr.GenericError(err))
66 return
67 }
68
69 repo := resp.Value.Val.(*tangled.Repo)
70 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name)
71 if err != nil {
72 fail(xrpcerr.GenericError(err))
73 return
74 }
75
76 // TODO: fine-grained role based control
77 isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo)
78 if err != nil || !isRepoOwner {
79 fail(xrpcerr.AccessControlError(actorDid.String()))
80 return
81 }
82 for _, engine := range x.Engines {
83 l.Debug("destorying workflow", "wid", wid)
84 err = engine.DestroyWorkflow(r.Context(), wid)
85 if err != nil {
86 fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err)))
87 return
88 }
89 err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier)
90 if err != nil {
91 fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err)))
92 return
93 }
94 }
95
96 w.WriteHeader(http.StatusOK)
97}