Monorepo for Tangled
1package labels
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/middleware"
15 "tangled.org/core/appview/models"
16 "tangled.org/core/appview/notify"
17 "tangled.org/core/appview/oauth"
18 "tangled.org/core/appview/pages"
19 "tangled.org/core/appview/validator"
20 "tangled.org/core/orm"
21 "tangled.org/core/rbac"
22 "tangled.org/core/tid"
23
24 comatproto "github.com/bluesky-social/indigo/api/atproto"
25 atpclient "github.com/bluesky-social/indigo/atproto/client"
26 "github.com/bluesky-social/indigo/atproto/syntax"
27 lexutil "github.com/bluesky-social/indigo/lex/util"
28 "github.com/go-chi/chi/v5"
29)
30
31type Labels struct {
32 oauth *oauth.OAuth
33 pages *pages.Pages
34 db *db.DB
35 logger *slog.Logger
36 validator *validator.Validator
37 enforcer *rbac.Enforcer
38 notifier notify.Notifier
39}
40
41func New(
42 oauth *oauth.OAuth,
43 pages *pages.Pages,
44 db *db.DB,
45 validator *validator.Validator,
46 enforcer *rbac.Enforcer,
47 notifier notify.Notifier,
48 logger *slog.Logger,
49) *Labels {
50 return &Labels{
51 oauth: oauth,
52 pages: pages,
53 db: db,
54 logger: logger,
55 validator: validator,
56 enforcer: enforcer,
57 notifier: notifier,
58 }
59}
60
61func (l *Labels) Router() http.Handler {
62 r := chi.NewRouter()
63
64 r.Use(middleware.AuthMiddleware(l.oauth))
65 r.Put("/perform", l.PerformLabelOp)
66
67 return r
68}
69
70// this is a tricky handler implementation:
71// - the user selects the new state of all the labels in the label panel and hits save
72// - this handler should calculate the diff in order to create the labelop record
73// - we need the diff in order to maintain a "history" of operations performed by users
74func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) {
75 user := l.oauth.GetMultiAccountUser(r)
76
77 noticeId := "add-label-error"
78
79 fail := func(msg string, err error) {
80 l.logger.Error("failed to add label", "err", err)
81 l.pages.Notice(w, noticeId, msg)
82 }
83
84 if err := r.ParseForm(); err != nil {
85 fail("Invalid form.", err)
86 return
87 }
88
89 did := user.Active.Did
90 rkey := tid.TID()
91 performedAt := time.Now()
92 indexedAt := time.Now()
93 repoAt := r.Form.Get("repo")
94 subjectUri := r.Form.Get("subject")
95
96 repo, err := db.GetRepo(l.db, orm.FilterEq("at_uri", repoAt))
97 if err != nil {
98 fail("Failed to get repository.", err)
99 return
100 }
101
102 // find all the labels that this repo subscribes to
103 repoLabels, err := db.GetRepoLabels(l.db, orm.FilterEq("repo_at", repoAt))
104 if err != nil {
105 fail("Failed to get labels for this repository.", err)
106 return
107 }
108
109 var labelAts []string
110 for _, rl := range repoLabels {
111 labelAts = append(labelAts, rl.LabelAt.String())
112 }
113
114 actx, err := db.NewLabelApplicationCtx(l.db, orm.FilterIn("at_uri", labelAts))
115 if err != nil {
116 fail("Invalid form data.", err)
117 return
118 }
119
120 // calculate the start state by applying already known labels
121 existingOps, err := db.GetLabelOps(l.db, orm.FilterEq("subject", subjectUri))
122 if err != nil {
123 fail("Invalid form data.", err)
124 return
125 }
126
127 labelState := models.NewLabelState()
128 actx.ApplyLabelOps(labelState, existingOps)
129
130 var labelOps []models.LabelOp
131
132 // first delete all existing state
133 for key, vals := range labelState.Inner() {
134 for val := range vals {
135 labelOps = append(labelOps, models.LabelOp{
136 Did: did,
137 Rkey: rkey,
138 Subject: syntax.ATURI(subjectUri),
139 Operation: models.LabelOperationDel,
140 OperandKey: key,
141 OperandValue: val,
142 PerformedAt: performedAt,
143 IndexedAt: indexedAt,
144 })
145 }
146 }
147
148 // add all the new state the user specified
149 for key, vals := range r.Form {
150 if _, ok := actx.Defs[key]; !ok {
151 continue
152 }
153
154 for _, val := range vals {
155 labelOps = append(labelOps, models.LabelOp{
156 Did: did,
157 Rkey: rkey,
158 Subject: syntax.ATURI(subjectUri),
159 Operation: models.LabelOperationAdd,
160 OperandKey: key,
161 OperandValue: val,
162 PerformedAt: performedAt,
163 IndexedAt: indexedAt,
164 })
165 }
166 }
167
168 for i := range labelOps {
169 def := actx.Defs[labelOps[i].OperandKey]
170 if err := l.validator.ValidateLabelOp(def, repo, &labelOps[i]); err != nil {
171 fail(fmt.Sprintf("Invalid form data: %s", err), err)
172 return
173 }
174 }
175
176 // reduce the opset
177 labelOps = models.ReduceLabelOps(labelOps)
178
179 // next, apply all ops introduced in this request and filter out ones that are no-ops
180 validLabelOps := labelOps[:0]
181 for _, op := range labelOps {
182 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError {
183 validLabelOps = append(validLabelOps, op)
184 }
185 }
186
187 // nothing to do
188 if len(validLabelOps) == 0 {
189 l.pages.HxRefresh(w)
190 return
191 }
192
193 // create an atproto record of valid ops
194 record := models.LabelOpsAsRecord(validLabelOps)
195
196 client, err := l.oauth.AuthorizedClient(r)
197 if err != nil {
198 fail("Failed to authorize user.", err)
199 return
200 }
201
202 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
203 Collection: tangled.LabelOpNSID,
204 Repo: did,
205 Rkey: rkey,
206 Record: &lexutil.LexiconTypeDecoder{
207 Val: &record,
208 },
209 })
210 if err != nil {
211 fail("Failed to create record on PDS for user.", err)
212 return
213 }
214 atUri := resp.Uri
215
216 tx, err := l.db.BeginTx(r.Context(), nil)
217 if err != nil {
218 fail("Failed to update labels. Try again later.", err)
219 return
220 }
221
222 rollback := func() {
223 err1 := tx.Rollback()
224 err2 := rollbackRecord(context.Background(), atUri, client)
225
226 // ignore txn complete errors, this is okay
227 if errors.Is(err1, sql.ErrTxDone) {
228 err1 = nil
229 }
230
231 if errs := errors.Join(err1, err2); errs != nil {
232 return
233 }
234 }
235 defer rollback()
236
237 for _, o := range validLabelOps {
238 if _, err := db.AddLabelOp(l.db, &o); err != nil {
239 fail("Failed to update labels. Try again later.", err)
240 return
241 }
242 }
243
244 err = tx.Commit()
245 if err != nil {
246 return
247 }
248
249 // clear aturi when everything is successful
250 atUri = ""
251
252 subject := syntax.ATURI(subjectUri)
253 if subject.Collection() == tangled.RepoIssueNSID {
254 issues, err := db.GetIssues(l.db, orm.FilterEq("at_uri", subjectUri))
255 if err == nil && len(issues) == 1 {
256 l.notifier.NewIssueLabelOp(r.Context(), &issues[0])
257 }
258 }
259 if subject.Collection() == tangled.RepoPullNSID {
260 pulls, err := db.GetPulls(l.db, orm.FilterEq("at_uri", subjectUri))
261 if err == nil && len(pulls) == 1 {
262 l.notifier.NewPullLabelOp(r.Context(), pulls[0])
263 }
264 }
265
266 l.pages.HxRefresh(w)
267}
268
269// this is used to rollback changes made to the PDS
270//
271// it is a no-op if the provided ATURI is empty
272func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
273 if aturi == "" {
274 return nil
275 }
276
277 parsed := syntax.ATURI(aturi)
278
279 collection := parsed.Collection().String()
280 repo := parsed.Authority().String()
281 rkey := parsed.RecordKey().String()
282
283 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
284 Collection: collection,
285 Repo: repo,
286 Rkey: rkey,
287 })
288 return err
289}