this repo has no description
1package labels 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 lexutil "github.com/bluesky-social/indigo/lex/util" 15 "github.com/go-chi/chi/v5" 16 17 "tangled.sh/tangled.sh/core/api/tangled" 18 "tangled.sh/tangled.sh/core/appview/config" 19 "tangled.sh/tangled.sh/core/appview/db" 20 "tangled.sh/tangled.sh/core/appview/middleware" 21 "tangled.sh/tangled.sh/core/appview/oauth" 22 "tangled.sh/tangled.sh/core/appview/pages" 23 "tangled.sh/tangled.sh/core/appview/reporesolver" 24 "tangled.sh/tangled.sh/core/appview/xrpcclient" 25 "tangled.sh/tangled.sh/core/eventconsumer" 26 "tangled.sh/tangled.sh/core/idresolver" 27 "tangled.sh/tangled.sh/core/log" 28 "tangled.sh/tangled.sh/core/rbac" 29 "tangled.sh/tangled.sh/core/tid" 30) 31 32type Labels struct { 33 repoResolver *reporesolver.RepoResolver 34 idResolver *idresolver.Resolver 35 oauth *oauth.OAuth 36 pages *pages.Pages 37 db *db.DB 38 logger *slog.Logger 39} 40 41func New( 42 oauth *oauth.OAuth, 43 repoResolver *reporesolver.RepoResolver, 44 pages *pages.Pages, 45 spindlestream *eventconsumer.Consumer, 46 idResolver *idresolver.Resolver, 47 db *db.DB, 48 config *config.Config, 49 enforcer *rbac.Enforcer, 50) *Labels { 51 logger := log.New("labels") 52 53 return &Labels{ 54 oauth: oauth, 55 repoResolver: repoResolver, 56 pages: pages, 57 idResolver: idResolver, 58 db: db, 59 logger: logger, 60 } 61} 62 63func (l *Labels) Router(mw *middleware.Middleware) http.Handler { 64 r := chi.NewRouter() 65 66 r.With(middleware.AuthMiddleware(l.oauth)).Put("/perform", l.PerformLabelOp) 67 68 return r 69} 70 71func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) { 72 user := l.oauth.GetUser(r) 73 74 if err := r.ParseForm(); err != nil { 75 l.logger.Error("failed to parse form data", "error", err) 76 http.Error(w, "Invalid form data", http.StatusBadRequest) 77 return 78 } 79 80 did := user.Did 81 rkey := tid.TID() 82 performedAt := time.Now() 83 indexedAt := time.Now() 84 repoAt := r.Form.Get("repo") 85 subjectUri := r.Form.Get("subject") 86 keys := r.Form["operand-key"] 87 vals := r.Form["operand-val"] 88 89 var labelOps []db.LabelOp 90 for i := range len(keys) { 91 op := r.FormValue(fmt.Sprintf("op-%d", i)) 92 if op == "" { 93 op = string(db.LabelOperationDel) 94 } 95 key := keys[i] 96 val := vals[i] 97 98 labelOps = append(labelOps, db.LabelOp{ 99 Did: did, 100 Rkey: rkey, 101 Subject: syntax.ATURI(subjectUri), 102 Operation: db.LabelOperation(op), 103 OperandKey: key, 104 OperandValue: val, 105 PerformedAt: performedAt, 106 IndexedAt: indexedAt, 107 }) 108 } 109 110 // TODO: validate the operations 111 112 // find all the labels that this repo subscribes to 113 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt)) 114 if err != nil { 115 http.Error(w, "Invalid form data", http.StatusBadRequest) 116 return 117 } 118 119 var labelAts []string 120 for _, rl := range repoLabels { 121 labelAts = append(labelAts, rl.LabelAt.String()) 122 } 123 124 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts)) 125 if err != nil { 126 http.Error(w, "Invalid form data", http.StatusBadRequest) 127 return 128 } 129 130 // calculate the start state by applying already known labels 131 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri)) 132 if err != nil { 133 http.Error(w, "Invalid form data", http.StatusBadRequest) 134 return 135 } 136 137 labelState := db.NewLabelState() 138 actx.ApplyLabelOps(labelState, existingOps) 139 140 // next, apply all ops introduced in this request and filter out ones that are no-ops 141 validLabelOps := labelOps[:0] 142 for _, op := range labelOps { 143 if err = actx.ApplyLabelOp(labelState, op); err != db.LabelNoOpError { 144 validLabelOps = append(validLabelOps, op) 145 } 146 } 147 148 // nothing to do 149 if len(validLabelOps) == 0 { 150 l.pages.HxRefresh(w) 151 return 152 } 153 154 // create an atproto record of valid ops 155 record := db.LabelOpsAsRecord(validLabelOps) 156 157 client, err := l.oauth.AuthorizedClient(r) 158 if err != nil { 159 l.logger.Error("failed to create client", "error", err) 160 http.Error(w, "Invalid form data", http.StatusBadRequest) 161 return 162 } 163 164 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 165 Collection: tangled.LabelOpNSID, 166 Repo: did, 167 Rkey: rkey, 168 Record: &lexutil.LexiconTypeDecoder{ 169 Val: &record, 170 }, 171 }) 172 if err != nil { 173 l.logger.Error("failed to write to PDS", "error", err) 174 http.Error(w, "failed to write to PDS", http.StatusInternalServerError) 175 return 176 } 177 atUri := resp.Uri 178 179 tx, err := l.db.BeginTx(r.Context(), nil) 180 if err != nil { 181 l.logger.Error("failed to start tx", "error", err) 182 return 183 } 184 185 rollback := func() { 186 err1 := tx.Rollback() 187 err2 := rollbackRecord(context.Background(), atUri, client) 188 189 // ignore txn complete errors, this is okay 190 if errors.Is(err1, sql.ErrTxDone) { 191 err1 = nil 192 } 193 194 if errs := errors.Join(err1, err2); errs != nil { 195 return 196 } 197 } 198 defer rollback() 199 200 for _, o := range validLabelOps { 201 if _, err := db.AddLabelOp(l.db, &o); err != nil { 202 l.logger.Error("failed to add op", "err", err) 203 return 204 } 205 206 l.logger.Info("performed label op", "did", o.Did, "rkey", o.Rkey, "kind", o.Operation, "subjcet", o.Subject, "key", o.OperandKey) 207 } 208 209 err = tx.Commit() 210 if err != nil { 211 return 212 } 213 214 // clear aturi when everything is successful 215 atUri = "" 216 217 l.pages.HxRefresh(w) 218} 219 220// this is used to rollback changes made to the PDS 221// 222// it is a no-op if the provided ATURI is empty 223func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 224 if aturi == "" { 225 return nil 226 } 227 228 parsed := syntax.ATURI(aturi) 229 230 collection := parsed.Collection().String() 231 repo := parsed.Authority().String() 232 rkey := parsed.RecordKey().String() 233 234 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 235 Collection: collection, 236 Repo: repo, 237 Rkey: rkey, 238 }) 239 return err 240}