Monorepo for Tangled
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/notify"
20 "tangled.org/core/appview/oauth"
21 "tangled.org/core/appview/pages"
22 "tangled.org/core/appview/reporesolver"
23 "tangled.org/core/appview/validator"
24 xrpcclient "tangled.org/core/appview/xrpcclient"
25 "tangled.org/core/eventconsumer"
26 "tangled.org/core/idresolver"
27 "tangled.org/core/orm"
28 "tangled.org/core/rbac"
29 "tangled.org/core/tid"
30 "tangled.org/core/xrpc/serviceauth"
31
32 comatproto "github.com/bluesky-social/indigo/api/atproto"
33 atpclient "github.com/bluesky-social/indigo/atproto/client"
34 "github.com/bluesky-social/indigo/atproto/syntax"
35 lexutil "github.com/bluesky-social/indigo/lex/util"
36 securejoin "github.com/cyphar/filepath-securejoin"
37 "github.com/go-chi/chi/v5"
38)
39
40type Repo struct {
41 repoResolver *reporesolver.RepoResolver
42 idResolver *idresolver.Resolver
43 config *config.Config
44 oauth *oauth.OAuth
45 pages *pages.Pages
46 spindlestream *eventconsumer.Consumer
47 db *db.DB
48 enforcer *rbac.Enforcer
49 notifier notify.Notifier
50 logger *slog.Logger
51 serviceAuth *serviceauth.ServiceAuth
52 validator *validator.Validator
53}
54
55func New(
56 oauth *oauth.OAuth,
57 repoResolver *reporesolver.RepoResolver,
58 pages *pages.Pages,
59 spindlestream *eventconsumer.Consumer,
60 idResolver *idresolver.Resolver,
61 db *db.DB,
62 config *config.Config,
63 notifier notify.Notifier,
64 enforcer *rbac.Enforcer,
65 logger *slog.Logger,
66 validator *validator.Validator,
67) *Repo {
68 return &Repo{oauth: oauth,
69 repoResolver: repoResolver,
70 pages: pages,
71 idResolver: idResolver,
72 config: config,
73 spindlestream: spindlestream,
74 db: db,
75 notifier: notifier,
76 enforcer: enforcer,
77 logger: logger,
78 validator: validator,
79 }
80}
81
82// modify the spindle configured for this repo
83func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
84 user := rp.oauth.GetMultiAccountUser(r)
85 l := rp.logger.With("handler", "EditSpindle")
86 l = l.With("did", user.Active.Did)
87
88 errorId := "operation-error"
89 fail := func(msg string, err error) {
90 l.Error(msg, "err", err)
91 rp.pages.Notice(w, errorId, msg)
92 }
93
94 f, err := rp.repoResolver.Resolve(r)
95 if err != nil {
96 fail("Failed to resolve repo. Try again later", err)
97 return
98 }
99
100 newSpindle := r.FormValue("spindle")
101 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
102 client, err := rp.oauth.AuthorizedClient(r)
103 if err != nil {
104 fail("Failed to authorize. Try again later.", err)
105 return
106 }
107
108 if !removingSpindle {
109 // ensure that this is a valid spindle for this user
110 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did)
111 if err != nil {
112 fail("Failed to find spindles. Try again later.", err)
113 return
114 }
115
116 if !slices.Contains(validSpindles, newSpindle) {
117 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
118 return
119 }
120 }
121
122 newRepo := *f
123 newRepo.Spindle = newSpindle
124 record := newRepo.AsRecord()
125
126 spindlePtr := &newSpindle
127 if removingSpindle {
128 spindlePtr = nil
129 newRepo.Spindle = ""
130 }
131
132 // optimistic update
133 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
134 if err != nil {
135 fail("Failed to update spindle. Try again later.", err)
136 return
137 }
138
139 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
140 if err != nil {
141 fail("Failed to update spindle, no record found on PDS.", err)
142 return
143 }
144 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
145 Collection: tangled.RepoNSID,
146 Repo: newRepo.Did,
147 Rkey: newRepo.Rkey,
148 SwapRecord: ex.Cid,
149 Record: &lexutil.LexiconTypeDecoder{
150 Val: &record,
151 },
152 })
153
154 if err != nil {
155 fail("Failed to update spindle, unable to save to PDS.", err)
156 return
157 }
158
159 if !removingSpindle {
160 // add this spindle to spindle stream
161 rp.spindlestream.AddSource(
162 context.Background(),
163 eventconsumer.NewSpindleSource(newSpindle),
164 )
165 }
166
167 rp.pages.HxRefresh(w)
168}
169
170func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
171 user := rp.oauth.GetMultiAccountUser(r)
172 l := rp.logger.With("handler", "AddLabel")
173 l = l.With("did", user.Active.Did)
174
175 f, err := rp.repoResolver.Resolve(r)
176 if err != nil {
177 l.Error("failed to get repo and knot", "err", err)
178 return
179 }
180
181 errorId := "add-label-error"
182 fail := func(msg string, err error) {
183 l.Error(msg, "err", err)
184 rp.pages.Notice(w, errorId, msg)
185 }
186
187 // get form values for label definition
188 name := r.FormValue("name")
189 concreteType := r.FormValue("valueType")
190 valueFormat := r.FormValue("valueFormat")
191 enumValues := r.FormValue("enumValues")
192 scope := r.Form["scope"]
193 color := r.FormValue("color")
194 multiple := r.FormValue("multiple") == "true"
195
196 var variants []string
197 for part := range strings.SplitSeq(enumValues, ",") {
198 if part = strings.TrimSpace(part); part != "" {
199 variants = append(variants, part)
200 }
201 }
202
203 if concreteType == "" {
204 concreteType = "null"
205 }
206
207 format := models.ValueTypeFormatAny
208 if valueFormat == "did" {
209 format = models.ValueTypeFormatDid
210 }
211
212 valueType := models.ValueType{
213 Type: models.ConcreteType(concreteType),
214 Format: format,
215 Enum: variants,
216 }
217
218 label := models.LabelDefinition{
219 Did: user.Active.Did,
220 Rkey: tid.TID(),
221 Name: name,
222 ValueType: valueType,
223 Scope: scope,
224 Color: &color,
225 Multiple: multiple,
226 Created: time.Now(),
227 }
228 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
229 fail(err.Error(), err)
230 return
231 }
232
233 // announce this relation into the firehose, store into owners' pds
234 client, err := rp.oauth.AuthorizedClient(r)
235 if err != nil {
236 fail(err.Error(), err)
237 return
238 }
239
240 // emit a labelRecord
241 labelRecord := label.AsRecord()
242 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
243 Collection: tangled.LabelDefinitionNSID,
244 Repo: label.Did,
245 Rkey: label.Rkey,
246 Record: &lexutil.LexiconTypeDecoder{
247 Val: &labelRecord,
248 },
249 })
250 // invalid record
251 if err != nil {
252 fail("Failed to write record to PDS.", err)
253 return
254 }
255
256 aturi := resp.Uri
257 l = l.With("at-uri", aturi)
258 l.Info("wrote label record to PDS")
259
260 // update the repo to subscribe to this label
261 newRepo := *f
262 newRepo.Labels = append(newRepo.Labels, aturi)
263 repoRecord := newRepo.AsRecord()
264
265 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
266 if err != nil {
267 fail("Failed to update labels, no record found on PDS.", err)
268 return
269 }
270 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
271 Collection: tangled.RepoNSID,
272 Repo: newRepo.Did,
273 Rkey: newRepo.Rkey,
274 SwapRecord: ex.Cid,
275 Record: &lexutil.LexiconTypeDecoder{
276 Val: &repoRecord,
277 },
278 })
279 if err != nil {
280 fail("Failed to update labels for repo.", err)
281 return
282 }
283
284 tx, err := rp.db.BeginTx(r.Context(), nil)
285 if err != nil {
286 fail("Failed to add label.", err)
287 return
288 }
289
290 rollback := func() {
291 err1 := tx.Rollback()
292 err2 := rollbackRecord(context.Background(), aturi, client)
293
294 // ignore txn complete errors, this is okay
295 if errors.Is(err1, sql.ErrTxDone) {
296 err1 = nil
297 }
298
299 if errs := errors.Join(err1, err2); errs != nil {
300 l.Error("failed to rollback changes", "errs", errs)
301 return
302 }
303 }
304 defer rollback()
305
306 _, err = db.AddLabelDefinition(tx, &label)
307 if err != nil {
308 fail("Failed to add label.", err)
309 return
310 }
311
312 err = db.SubscribeLabel(tx, &models.RepoLabel{
313 RepoAt: f.RepoAt(),
314 LabelAt: label.AtUri(),
315 RepoDid: f.RepoDid,
316 })
317
318 err = tx.Commit()
319 if err != nil {
320 fail("Failed to add label.", err)
321 return
322 }
323
324 // clear aturi when everything is successful
325 aturi = ""
326
327 rp.pages.HxRefresh(w)
328}
329
330func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
331 user := rp.oauth.GetMultiAccountUser(r)
332 l := rp.logger.With("handler", "DeleteLabel")
333 l = l.With("did", user.Active.Did)
334
335 f, err := rp.repoResolver.Resolve(r)
336 if err != nil {
337 l.Error("failed to get repo and knot", "err", err)
338 return
339 }
340
341 errorId := "label-operation"
342 fail := func(msg string, err error) {
343 l.Error(msg, "err", err)
344 rp.pages.Notice(w, errorId, msg)
345 }
346
347 // get form values
348 labelId := r.FormValue("label-id")
349
350 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
351 if err != nil {
352 fail("Failed to find label definition.", err)
353 return
354 }
355
356 client, err := rp.oauth.AuthorizedClient(r)
357 if err != nil {
358 fail(err.Error(), err)
359 return
360 }
361
362 // delete label record from PDS
363 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
364 Collection: tangled.LabelDefinitionNSID,
365 Repo: label.Did,
366 Rkey: label.Rkey,
367 })
368 if err != nil {
369 fail("Failed to delete label record from PDS.", err)
370 return
371 }
372
373 // update repo record to remove the label reference
374 newRepo := *f
375 var updated []string
376 removedAt := label.AtUri().String()
377 for _, l := range newRepo.Labels {
378 if l != removedAt {
379 updated = append(updated, l)
380 }
381 }
382 newRepo.Labels = updated
383 repoRecord := newRepo.AsRecord()
384
385 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
386 if err != nil {
387 fail("Failed to update labels, no record found on PDS.", err)
388 return
389 }
390 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
391 Collection: tangled.RepoNSID,
392 Repo: newRepo.Did,
393 Rkey: newRepo.Rkey,
394 SwapRecord: ex.Cid,
395 Record: &lexutil.LexiconTypeDecoder{
396 Val: &repoRecord,
397 },
398 })
399 if err != nil {
400 fail("Failed to update repo record.", err)
401 return
402 }
403
404 // transaction for DB changes
405 tx, err := rp.db.BeginTx(r.Context(), nil)
406 if err != nil {
407 fail("Failed to delete label.", err)
408 return
409 }
410 defer tx.Rollback()
411
412 err = db.UnsubscribeLabel(
413 tx,
414 orm.FilterEq("repo_at", f.RepoAt()),
415 orm.FilterEq("label_at", removedAt),
416 )
417 if err != nil {
418 fail("Failed to unsubscribe label.", err)
419 return
420 }
421
422 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
423 if err != nil {
424 fail("Failed to delete label definition.", err)
425 return
426 }
427
428 err = tx.Commit()
429 if err != nil {
430 fail("Failed to delete label.", err)
431 return
432 }
433
434 // everything succeeded
435 rp.pages.HxRefresh(w)
436}
437
438func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
439 user := rp.oauth.GetMultiAccountUser(r)
440 l := rp.logger.With("handler", "SubscribeLabel")
441 l = l.With("did", user.Active.Did)
442
443 f, err := rp.repoResolver.Resolve(r)
444 if err != nil {
445 l.Error("failed to get repo and knot", "err", err)
446 return
447 }
448
449 if err := r.ParseForm(); err != nil {
450 l.Error("invalid form", "err", err)
451 return
452 }
453
454 errorId := "default-label-operation"
455 fail := func(msg string, err error) {
456 l.Error(msg, "err", err)
457 rp.pages.Notice(w, errorId, msg)
458 }
459
460 labelAts := r.Form["label"]
461 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
462 if err != nil {
463 fail("Failed to subscribe to label.", err)
464 return
465 }
466
467 newRepo := *f
468 newRepo.Labels = append(newRepo.Labels, labelAts...)
469
470 // dedup
471 slices.Sort(newRepo.Labels)
472 newRepo.Labels = slices.Compact(newRepo.Labels)
473
474 repoRecord := newRepo.AsRecord()
475
476 client, err := rp.oauth.AuthorizedClient(r)
477 if err != nil {
478 fail(err.Error(), err)
479 return
480 }
481
482 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
483 if err != nil {
484 fail("Failed to update labels, no record found on PDS.", err)
485 return
486 }
487 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
488 Collection: tangled.RepoNSID,
489 Repo: newRepo.Did,
490 Rkey: newRepo.Rkey,
491 SwapRecord: ex.Cid,
492 Record: &lexutil.LexiconTypeDecoder{
493 Val: &repoRecord,
494 },
495 })
496
497 tx, err := rp.db.Begin()
498 if err != nil {
499 fail("Failed to subscribe to label.", err)
500 return
501 }
502 defer tx.Rollback()
503
504 for _, l := range labelAts {
505 err = db.SubscribeLabel(tx, &models.RepoLabel{
506 RepoAt: f.RepoAt(),
507 LabelAt: syntax.ATURI(l),
508 RepoDid: f.RepoDid,
509 })
510 if err != nil {
511 fail("Failed to subscribe to label.", err)
512 return
513 }
514 }
515
516 if err := tx.Commit(); err != nil {
517 fail("Failed to subscribe to label.", err)
518 return
519 }
520
521 // everything succeeded
522 rp.pages.HxRefresh(w)
523}
524
525func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
526 user := rp.oauth.GetMultiAccountUser(r)
527 l := rp.logger.With("handler", "UnsubscribeLabel")
528 l = l.With("did", user.Active.Did)
529
530 f, err := rp.repoResolver.Resolve(r)
531 if err != nil {
532 l.Error("failed to get repo and knot", "err", err)
533 return
534 }
535
536 if err := r.ParseForm(); err != nil {
537 l.Error("invalid form", "err", err)
538 return
539 }
540
541 errorId := "default-label-operation"
542 fail := func(msg string, err error) {
543 l.Error(msg, "err", err)
544 rp.pages.Notice(w, errorId, msg)
545 }
546
547 labelAts := r.Form["label"]
548 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
549 if err != nil {
550 fail("Failed to unsubscribe to label.", err)
551 return
552 }
553
554 // update repo record to remove the label reference
555 newRepo := *f
556 var updated []string
557 for _, l := range newRepo.Labels {
558 if !slices.Contains(labelAts, l) {
559 updated = append(updated, l)
560 }
561 }
562 newRepo.Labels = updated
563 repoRecord := newRepo.AsRecord()
564
565 client, err := rp.oauth.AuthorizedClient(r)
566 if err != nil {
567 fail(err.Error(), err)
568 return
569 }
570
571 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
572 if err != nil {
573 fail("Failed to update labels, no record found on PDS.", err)
574 return
575 }
576 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
577 Collection: tangled.RepoNSID,
578 Repo: newRepo.Did,
579 Rkey: newRepo.Rkey,
580 SwapRecord: ex.Cid,
581 Record: &lexutil.LexiconTypeDecoder{
582 Val: &repoRecord,
583 },
584 })
585
586 err = db.UnsubscribeLabel(
587 rp.db,
588 orm.FilterEq("repo_at", f.RepoAt()),
589 orm.FilterIn("label_at", labelAts),
590 )
591 if err != nil {
592 fail("Failed to unsubscribe label.", err)
593 return
594 }
595
596 // everything succeeded
597 rp.pages.HxRefresh(w)
598}
599
600func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
601 l := rp.logger.With("handler", "LabelPanel")
602
603 f, err := rp.repoResolver.Resolve(r)
604 if err != nil {
605 l.Error("failed to get repo and knot", "err", err)
606 return
607 }
608
609 subjectStr := r.FormValue("subject")
610 subject, err := syntax.ParseATURI(subjectStr)
611 if err != nil {
612 l.Error("failed to get repo and knot", "err", err)
613 return
614 }
615
616 labelDefs, err := db.GetLabelDefinitions(
617 rp.db,
618 orm.FilterIn("at_uri", f.Labels),
619 orm.FilterContains("scope", subject.Collection().String()),
620 )
621 if err != nil {
622 l.Error("failed to fetch label defs", "err", err)
623 return
624 }
625
626 defs := make(map[string]*models.LabelDefinition)
627 for _, l := range labelDefs {
628 defs[l.AtUri().String()] = &l
629 }
630
631 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
632 if err != nil {
633 l.Error("failed to build label state", "err", err)
634 return
635 }
636 state := states[subject]
637
638 user := rp.oauth.GetMultiAccountUser(r)
639 rp.pages.LabelPanel(w, pages.LabelPanelParams{
640 LoggedInUser: user,
641 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
642 Defs: defs,
643 Subject: subject.String(),
644 State: state,
645 })
646}
647
648func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
649 l := rp.logger.With("handler", "EditLabelPanel")
650
651 f, err := rp.repoResolver.Resolve(r)
652 if err != nil {
653 l.Error("failed to get repo and knot", "err", err)
654 return
655 }
656
657 subjectStr := r.FormValue("subject")
658 subject, err := syntax.ParseATURI(subjectStr)
659 if err != nil {
660 l.Error("failed to get repo and knot", "err", err)
661 return
662 }
663
664 labelDefs, err := db.GetLabelDefinitions(
665 rp.db,
666 orm.FilterIn("at_uri", f.Labels),
667 orm.FilterContains("scope", subject.Collection().String()),
668 )
669 if err != nil {
670 l.Error("failed to fetch labels", "err", err)
671 return
672 }
673
674 defs := make(map[string]*models.LabelDefinition)
675 for _, l := range labelDefs {
676 defs[l.AtUri().String()] = &l
677 }
678
679 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
680 if err != nil {
681 l.Error("failed to build label state", "err", err)
682 return
683 }
684 state := states[subject]
685
686 user := rp.oauth.GetMultiAccountUser(r)
687 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
688 LoggedInUser: user,
689 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
690 Defs: defs,
691 Subject: subject.String(),
692 State: state,
693 })
694}
695
696func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
697 user := rp.oauth.GetMultiAccountUser(r)
698 l := rp.logger.With("handler", "AddCollaborator")
699 l = l.With("did", user.Active.Did)
700
701 f, err := rp.repoResolver.Resolve(r)
702 if err != nil {
703 l.Error("failed to get repo and knot", "err", err)
704 return
705 }
706
707 errorId := "add-collaborator-error"
708 fail := func(msg string, err error) {
709 l.Error(msg, "err", err)
710 rp.pages.Notice(w, errorId, msg)
711 }
712
713 collaborator := r.FormValue("collaborator")
714 if collaborator == "" {
715 fail("Invalid form.", nil)
716 return
717 }
718
719 // remove a single leading `@`, to make @handle work with ResolveIdent
720 collaborator = strings.TrimPrefix(collaborator, "@")
721
722 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
723 if err != nil {
724 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
725 return
726 }
727
728 if collaboratorIdent.DID.String() == user.Active.Did {
729 fail("You seem to be adding yourself as a collaborator.", nil)
730 return
731 }
732 l = l.With("collaborator", collaboratorIdent.Handle)
733 l = l.With("knot", f.Knot)
734
735 // announce this relation into the firehose, store into owners' pds
736 client, err := rp.oauth.AuthorizedClient(r)
737 if err != nil {
738 fail("Failed to write to PDS.", err)
739 return
740 }
741
742 // emit a record
743 currentUser := rp.oauth.GetMultiAccountUser(r)
744 rkey := tid.TID()
745 createdAt := time.Now()
746 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
747 Collection: tangled.RepoCollaboratorNSID,
748 Repo: currentUser.Active.Did,
749 Rkey: rkey,
750 Record: &lexutil.LexiconTypeDecoder{
751 Val: &tangled.RepoCollaborator{
752 Subject: collaboratorIdent.DID.String(),
753 Repo: string(f.RepoAt()),
754 CreatedAt: createdAt.Format(time.RFC3339),
755 }},
756 })
757 // invalid record
758 if err != nil {
759 fail("Failed to write record to PDS.", err)
760 return
761 }
762
763 aturi := resp.Uri
764 l = l.With("at-uri", aturi)
765 l.Info("wrote record to PDS")
766
767 tx, err := rp.db.BeginTx(r.Context(), nil)
768 if err != nil {
769 fail("Failed to add collaborator.", err)
770 return
771 }
772
773 rollback := func() {
774 err1 := tx.Rollback()
775 err2 := rp.enforcer.E.LoadPolicy()
776 err3 := rollbackRecord(context.Background(), aturi, client)
777
778 // ignore txn complete errors, this is okay
779 if errors.Is(err1, sql.ErrTxDone) {
780 err1 = nil
781 }
782
783 if errs := errors.Join(err1, err2, err3); errs != nil {
784 l.Error("failed to rollback changes", "errs", errs)
785 return
786 }
787 }
788 defer rollback()
789
790 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
791 if err != nil {
792 fail("Failed to add collaborator permissions.", err)
793 return
794 }
795
796 err = db.AddCollaborator(tx, models.Collaborator{
797 Did: syntax.DID(currentUser.Active.Did),
798 Rkey: rkey,
799 SubjectDid: collaboratorIdent.DID,
800 RepoAt: f.RepoAt(),
801 Created: createdAt,
802 })
803 if err != nil {
804 fail("Failed to add collaborator.", err)
805 return
806 }
807
808 err = tx.Commit()
809 if err != nil {
810 fail("Failed to add collaborator.", err)
811 return
812 }
813
814 err = rp.enforcer.E.SavePolicy()
815 if err != nil {
816 fail("Failed to update collaborator permissions.", err)
817 return
818 }
819
820 // clear aturi to when everything is successful
821 aturi = ""
822
823 rp.pages.HxRefresh(w)
824}
825
826func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
827 user := rp.oauth.GetMultiAccountUser(r)
828 l := rp.logger.With("handler", "DeleteRepo")
829
830 noticeId := "operation-error"
831 f, err := rp.repoResolver.Resolve(r)
832 if err != nil {
833 l.Error("failed to get repo and knot", "err", err)
834 return
835 }
836
837 // remove record from pds
838 atpClient, err := rp.oauth.AuthorizedClient(r)
839 if err != nil {
840 l.Error("failed to get authorized client", "err", err)
841 return
842 }
843 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
844 Collection: tangled.RepoNSID,
845 Repo: user.Active.Did,
846 Rkey: f.Rkey,
847 })
848 if err != nil {
849 l.Error("failed to delete record", "err", err)
850 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
851 return
852 }
853 l.Info("removed repo record", "aturi", f.RepoAt().String())
854
855 client, err := rp.oauth.ServiceClient(
856 r,
857 oauth.WithService(f.Knot),
858 oauth.WithLxm(tangled.RepoDeleteNSID),
859 oauth.WithDev(rp.config.Core.Dev),
860 )
861 if err != nil {
862 l.Error("failed to connect to knot server", "err", err)
863 return
864 }
865
866 err = tangled.RepoDelete(
867 r.Context(),
868 client,
869 &tangled.RepoDelete_Input{
870 Did: f.Did,
871 Name: f.Name,
872 Rkey: f.Rkey,
873 },
874 )
875 if err := xrpcclient.HandleXrpcErr(err); err != nil {
876 rp.pages.Notice(w, noticeId, err.Error())
877 return
878 }
879 l.Info("deleted repo from knot")
880
881 tx, err := rp.db.BeginTx(r.Context(), nil)
882 if err != nil {
883 l.Error("failed to start tx")
884 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
885 return
886 }
887 defer func() {
888 tx.Rollback()
889 err = rp.enforcer.E.LoadPolicy()
890 if err != nil {
891 l.Error("failed to rollback policies")
892 }
893 }()
894
895 // remove collaborator RBAC
896 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
897 if err != nil {
898 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
899 return
900 }
901 for _, c := range repoCollaborators {
902 did := c[0]
903 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
904 }
905 l.Info("removed collaborators")
906
907 // remove repo RBAC
908 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo())
909 if err != nil {
910 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
911 return
912 }
913
914 // remove repo from db
915 err = db.RemoveRepo(tx, f.Did, f.Name)
916 if err != nil {
917 rp.pages.Notice(w, noticeId, "Failed to update appview")
918 return
919 }
920 l.Info("removed repo from db")
921
922 err = tx.Commit()
923 if err != nil {
924 l.Error("failed to commit changes", "err", err)
925 http.Error(w, err.Error(), http.StatusInternalServerError)
926 return
927 }
928
929 err = rp.enforcer.E.SavePolicy()
930 if err != nil {
931 l.Error("failed to update ACLs", "err", err)
932 http.Error(w, err.Error(), http.StatusInternalServerError)
933 return
934 }
935
936 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
937}
938
939func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
940 l := rp.logger.With("handler", "SyncRepoFork")
941
942 ref := chi.URLParam(r, "ref")
943 ref, _ = url.PathUnescape(ref)
944
945 user := rp.oauth.GetMultiAccountUser(r)
946 f, err := rp.repoResolver.Resolve(r)
947 if err != nil {
948 l.Error("failed to resolve source repo", "err", err)
949 return
950 }
951
952 switch r.Method {
953 case http.MethodPost:
954 client, err := rp.oauth.ServiceClient(
955 r,
956 oauth.WithService(f.Knot),
957 oauth.WithLxm(tangled.RepoForkSyncNSID),
958 oauth.WithDev(rp.config.Core.Dev),
959 )
960 if err != nil {
961 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
962 return
963 }
964
965 if f.Source == "" {
966 rp.pages.Notice(w, "repo", "This repository is not a fork.")
967 return
968 }
969
970 err = tangled.RepoForkSync(
971 r.Context(),
972 client,
973 &tangled.RepoForkSync_Input{
974 Did: user.Active.Did,
975 Name: f.Name,
976 Source: f.Source,
977 Branch: ref,
978 },
979 )
980 if err := xrpcclient.HandleXrpcErr(err); err != nil {
981 rp.pages.Notice(w, "repo", err.Error())
982 return
983 }
984
985 rp.pages.HxRefresh(w)
986 return
987 }
988}
989
990func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
991 l := rp.logger.With("handler", "ForkRepo")
992
993 user := rp.oauth.GetMultiAccountUser(r)
994 f, err := rp.repoResolver.Resolve(r)
995 if err != nil {
996 l.Error("failed to resolve source repo", "err", err)
997 return
998 }
999
1000 switch r.Method {
1001 case http.MethodGet:
1002 user := rp.oauth.GetMultiAccountUser(r)
1003 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did)
1004 if err != nil {
1005 rp.pages.Notice(w, "repo", "Invalid user account.")
1006 return
1007 }
1008
1009 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1010 LoggedInUser: user,
1011 Knots: knots,
1012 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1013 })
1014
1015 case http.MethodPost:
1016 l := rp.logger.With("handler", "ForkRepo")
1017
1018 targetKnot := r.FormValue("knot")
1019 if targetKnot == "" {
1020 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1021 return
1022 }
1023 l = l.With("targetKnot", targetKnot)
1024
1025 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create")
1026 if err != nil || !ok {
1027 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1028 return
1029 }
1030
1031 // choose a name for a fork
1032 forkName := r.FormValue("repo_name")
1033 if forkName == "" {
1034 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1035 return
1036 }
1037
1038 // this check is *only* to see if the forked repo name already exists
1039 // in the user's account.
1040 existingRepo, err := db.GetRepo(
1041 rp.db,
1042 orm.FilterEq("did", user.Active.Did),
1043 orm.FilterEq("name", forkName),
1044 )
1045 if err != nil {
1046 if !errors.Is(err, sql.ErrNoRows) {
1047 l.Error("error fetching existing repo from db", "err", err)
1048 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1049 return
1050 }
1051 } else if existingRepo != nil {
1052 // repo with this name already exists
1053 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1054 return
1055 }
1056 l = l.With("forkName", forkName)
1057
1058 uri := "https"
1059 if rp.config.Core.Dev {
1060 uri = "http"
1061 }
1062
1063 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name)
1064 l = l.With("cloneUrl", forkSourceUrl)
1065
1066 sourceAt := f.RepoAt().String()
1067
1068 // create an atproto record for this fork
1069 rkey := tid.TID()
1070 repo := &models.Repo{
1071 Did: user.Active.Did,
1072 Name: forkName,
1073 Knot: targetKnot,
1074 Rkey: rkey,
1075 Source: sourceAt,
1076 Description: f.Description,
1077 Created: time.Now(),
1078 Labels: rp.config.Label.DefaultLabelDefs,
1079 }
1080 record := repo.AsRecord()
1081
1082 atpClient, err := rp.oauth.AuthorizedClient(r)
1083 if err != nil {
1084 l.Error("failed to create xrpcclient", "err", err)
1085 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1086 return
1087 }
1088
1089 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1090 Collection: tangled.RepoNSID,
1091 Repo: user.Active.Did,
1092 Rkey: rkey,
1093 Record: &lexutil.LexiconTypeDecoder{
1094 Val: &record,
1095 },
1096 })
1097 if err != nil {
1098 l.Error("failed to write to PDS", "err", err)
1099 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1100 return
1101 }
1102
1103 aturi := atresp.Uri
1104 l = l.With("aturi", aturi)
1105 l.Info("wrote to PDS")
1106
1107 tx, err := rp.db.BeginTx(r.Context(), nil)
1108 if err != nil {
1109 l.Info("txn failed", "err", err)
1110 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1111 return
1112 }
1113
1114 // The rollback function reverts a few things on failure:
1115 // - the pending txn
1116 // - the ACLs
1117 // - the atproto record created
1118 rollback := func() {
1119 err1 := tx.Rollback()
1120 err2 := rp.enforcer.E.LoadPolicy()
1121 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1122
1123 // ignore txn complete errors, this is okay
1124 if errors.Is(err1, sql.ErrTxDone) {
1125 err1 = nil
1126 }
1127
1128 if errs := errors.Join(err1, err2, err3); errs != nil {
1129 l.Error("failed to rollback changes", "errs", errs)
1130 return
1131 }
1132 }
1133 defer rollback()
1134
1135 // TODO: this could coordinate better with the knot to recieve a clone status
1136 client, err := rp.oauth.ServiceClient(
1137 r,
1138 oauth.WithService(targetKnot),
1139 oauth.WithLxm(tangled.RepoCreateNSID),
1140 oauth.WithDev(rp.config.Core.Dev),
1141 oauth.WithTimeout(time.Second*20), // big repos take time to clone
1142 )
1143 if err != nil {
1144 l.Error("could not create service client", "err", err)
1145 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1146 return
1147 }
1148
1149 err = tangled.RepoCreate(
1150 r.Context(),
1151 client,
1152 &tangled.RepoCreate_Input{
1153 Rkey: rkey,
1154 Source: &forkSourceUrl,
1155 },
1156 )
1157 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1158 rp.pages.Notice(w, "repo", err.Error())
1159 return
1160 }
1161
1162 err = db.AddRepo(tx, repo)
1163 if err != nil {
1164 l.Error("failed to AddRepo", "err", err)
1165 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1166 return
1167 }
1168
1169 // acls
1170 p, _ := securejoin.SecureJoin(user.Active.Did, forkName)
1171 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p)
1172 if err != nil {
1173 l.Error("failed to add ACLs", "err", err)
1174 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1175 return
1176 }
1177
1178 err = tx.Commit()
1179 if err != nil {
1180 l.Error("failed to commit changes", "err", err)
1181 http.Error(w, err.Error(), http.StatusInternalServerError)
1182 return
1183 }
1184
1185 err = rp.enforcer.E.SavePolicy()
1186 if err != nil {
1187 l.Error("failed to update ACLs", "err", err)
1188 http.Error(w, err.Error(), http.StatusInternalServerError)
1189 return
1190 }
1191
1192 // reset the ATURI because the transaction completed successfully
1193 aturi = ""
1194
1195 rp.notifier.NewRepo(r.Context(), repo)
1196 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName))
1197 }
1198}
1199
1200// this is used to rollback changes made to the PDS
1201//
1202// it is a no-op if the provided ATURI is empty
1203func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1204 if aturi == "" {
1205 return nil
1206 }
1207
1208 parsed := syntax.ATURI(aturi)
1209
1210 collection := parsed.Collection().String()
1211 repo := parsed.Authority().String()
1212 rkey := parsed.RecordKey().String()
1213
1214 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1215 Collection: collection,
1216 Repo: repo,
1217 Rkey: rkey,
1218 })
1219 return err
1220}