this repo has no description
1package labels
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 lexutil "github.com/bluesky-social/indigo/lex/util"
15 "github.com/go-chi/chi/v5"
16
17 "tangled.sh/tangled.sh/core/api/tangled"
18 "tangled.sh/tangled.sh/core/appview/config"
19 "tangled.sh/tangled.sh/core/appview/db"
20 "tangled.sh/tangled.sh/core/appview/middleware"
21 "tangled.sh/tangled.sh/core/appview/oauth"
22 "tangled.sh/tangled.sh/core/appview/pages"
23 "tangled.sh/tangled.sh/core/appview/reporesolver"
24 "tangled.sh/tangled.sh/core/appview/xrpcclient"
25 "tangled.sh/tangled.sh/core/eventconsumer"
26 "tangled.sh/tangled.sh/core/idresolver"
27 "tangled.sh/tangled.sh/core/log"
28 "tangled.sh/tangled.sh/core/rbac"
29 "tangled.sh/tangled.sh/core/tid"
30)
31
32type Labels struct {
33 repoResolver *reporesolver.RepoResolver
34 idResolver *idresolver.Resolver
35 oauth *oauth.OAuth
36 pages *pages.Pages
37 db *db.DB
38 logger *slog.Logger
39}
40
41func New(
42 oauth *oauth.OAuth,
43 repoResolver *reporesolver.RepoResolver,
44 pages *pages.Pages,
45 spindlestream *eventconsumer.Consumer,
46 idResolver *idresolver.Resolver,
47 db *db.DB,
48 config *config.Config,
49 enforcer *rbac.Enforcer,
50) *Labels {
51 logger := log.New("labels")
52
53 return &Labels{
54 oauth: oauth,
55 repoResolver: repoResolver,
56 pages: pages,
57 idResolver: idResolver,
58 db: db,
59 logger: logger,
60 }
61}
62
63func (l *Labels) Router(mw *middleware.Middleware) http.Handler {
64 r := chi.NewRouter()
65
66 r.With(middleware.AuthMiddleware(l.oauth)).Put("/perform", l.PerformLabelOp)
67
68 return r
69}
70
71func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) {
72 user := l.oauth.GetUser(r)
73
74 if err := r.ParseForm(); err != nil {
75 l.logger.Error("failed to parse form data", "error", err)
76 http.Error(w, "Invalid form data", http.StatusBadRequest)
77 return
78 }
79
80 did := user.Did
81 rkey := tid.TID()
82 performedAt := time.Now()
83 indexedAt := time.Now()
84 repoAt := r.Form.Get("repo")
85 subjectUri := r.Form.Get("subject")
86 keys := r.Form["operand-key"]
87 vals := r.Form["operand-val"]
88
89 var labelOps []db.LabelOp
90 for i := range len(keys) {
91 op := r.FormValue(fmt.Sprintf("op-%d", i))
92 if op == "" {
93 op = string(db.LabelOperationDel)
94 }
95 key := keys[i]
96 val := vals[i]
97
98 labelOps = append(labelOps, db.LabelOp{
99 Did: did,
100 Rkey: rkey,
101 Subject: syntax.ATURI(subjectUri),
102 Operation: db.LabelOperation(op),
103 OperandKey: key,
104 OperandValue: val,
105 PerformedAt: performedAt,
106 IndexedAt: indexedAt,
107 })
108 }
109
110 // TODO: validate the operations
111
112 // find all the labels that this repo subscribes to
113 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt))
114 if err != nil {
115 http.Error(w, "Invalid form data", http.StatusBadRequest)
116 return
117 }
118
119 var labelAts []string
120 for _, rl := range repoLabels {
121 labelAts = append(labelAts, rl.LabelAt.String())
122 }
123
124 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts))
125 if err != nil {
126 http.Error(w, "Invalid form data", http.StatusBadRequest)
127 return
128 }
129
130 // calculate the start state by applying already known labels
131 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri))
132 if err != nil {
133 http.Error(w, "Invalid form data", http.StatusBadRequest)
134 return
135 }
136
137 labelState := db.NewLabelState()
138 actx.ApplyLabelOps(labelState, existingOps)
139
140 // next, apply all ops introduced in this request and filter out ones that are no-ops
141 validLabelOps := labelOps[:0]
142 for _, op := range labelOps {
143 if err = actx.ApplyLabelOp(labelState, op); err != db.LabelNoOpError {
144 validLabelOps = append(validLabelOps, op)
145 }
146 }
147
148 // nothing to do
149 if len(validLabelOps) == 0 {
150 l.pages.HxRefresh(w)
151 return
152 }
153
154 // create an atproto record of valid ops
155 record := db.LabelOpsAsRecord(validLabelOps)
156
157 client, err := l.oauth.AuthorizedClient(r)
158 if err != nil {
159 l.logger.Error("failed to create client", "error", err)
160 http.Error(w, "Invalid form data", http.StatusBadRequest)
161 return
162 }
163
164 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
165 Collection: tangled.LabelOpNSID,
166 Repo: did,
167 Rkey: rkey,
168 Record: &lexutil.LexiconTypeDecoder{
169 Val: &record,
170 },
171 })
172 if err != nil {
173 l.logger.Error("failed to write to PDS", "error", err)
174 http.Error(w, "failed to write to PDS", http.StatusInternalServerError)
175 return
176 }
177 atUri := resp.Uri
178
179 tx, err := l.db.BeginTx(r.Context(), nil)
180 if err != nil {
181 l.logger.Error("failed to start tx", "error", err)
182 return
183 }
184
185 rollback := func() {
186 err1 := tx.Rollback()
187 err2 := rollbackRecord(context.Background(), atUri, client)
188
189 // ignore txn complete errors, this is okay
190 if errors.Is(err1, sql.ErrTxDone) {
191 err1 = nil
192 }
193
194 if errs := errors.Join(err1, err2); errs != nil {
195 return
196 }
197 }
198 defer rollback()
199
200 for _, o := range validLabelOps {
201 if _, err := db.AddLabelOp(l.db, &o); err != nil {
202 l.logger.Error("failed to add op", "err", err)
203 return
204 }
205
206 l.logger.Info("performed label op", "did", o.Did, "rkey", o.Rkey, "kind", o.Operation, "subjcet", o.Subject, "key", o.OperandKey)
207 }
208
209 err = tx.Commit()
210 if err != nil {
211 return
212 }
213
214 // clear aturi when everything is successful
215 atUri = ""
216
217 l.pages.HxRefresh(w)
218}
219
220// this is used to rollback changes made to the PDS
221//
222// it is a no-op if the provided ATURI is empty
223func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
224 if aturi == "" {
225 return nil
226 }
227
228 parsed := syntax.ATURI(aturi)
229
230 collection := parsed.Collection().String()
231 repo := parsed.Authority().String()
232 rkey := parsed.RecordKey().String()
233
234 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
235 Collection: collection,
236 Repo: repo,
237 Rkey: rkey,
238 })
239 return err
240}