Monorepo for Tangled
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/notify"
20 "tangled.org/core/appview/oauth"
21 "tangled.org/core/appview/pages"
22 "tangled.org/core/appview/reporesolver"
23 "tangled.org/core/appview/validator"
24 xrpcclient "tangled.org/core/appview/xrpcclient"
25 "tangled.org/core/eventconsumer"
26 "tangled.org/core/idresolver"
27 "tangled.org/core/orm"
28 "tangled.org/core/rbac"
29 "tangled.org/core/tid"
30 "tangled.org/core/xrpc/serviceauth"
31
32 comatproto "github.com/bluesky-social/indigo/api/atproto"
33 atpclient "github.com/bluesky-social/indigo/atproto/client"
34 "github.com/bluesky-social/indigo/atproto/syntax"
35 lexutil "github.com/bluesky-social/indigo/lex/util"
36
37 "github.com/go-chi/chi/v5"
38)
39
40type Repo struct {
41 repoResolver *reporesolver.RepoResolver
42 idResolver *idresolver.Resolver
43 config *config.Config
44 oauth *oauth.OAuth
45 pages *pages.Pages
46 spindlestream *eventconsumer.Consumer
47 db *db.DB
48 enforcer *rbac.Enforcer
49 notifier notify.Notifier
50 logger *slog.Logger
51 serviceAuth *serviceauth.ServiceAuth
52 validator *validator.Validator
53}
54
55func New(
56 oauth *oauth.OAuth,
57 repoResolver *reporesolver.RepoResolver,
58 pages *pages.Pages,
59 spindlestream *eventconsumer.Consumer,
60 idResolver *idresolver.Resolver,
61 db *db.DB,
62 config *config.Config,
63 notifier notify.Notifier,
64 enforcer *rbac.Enforcer,
65 logger *slog.Logger,
66 validator *validator.Validator,
67) *Repo {
68 return &Repo{oauth: oauth,
69 repoResolver: repoResolver,
70 pages: pages,
71 idResolver: idResolver,
72 config: config,
73 spindlestream: spindlestream,
74 db: db,
75 notifier: notifier,
76 enforcer: enforcer,
77 logger: logger,
78 validator: validator,
79 }
80}
81
82// modify the spindle configured for this repo
83func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
84 user := rp.oauth.GetMultiAccountUser(r)
85 l := rp.logger.With("handler", "EditSpindle")
86 l = l.With("did", user.Active.Did)
87
88 errorId := "operation-error"
89 fail := func(msg string, err error) {
90 l.Error(msg, "err", err)
91 rp.pages.Notice(w, errorId, msg)
92 }
93
94 f, err := rp.repoResolver.Resolve(r)
95 if err != nil {
96 fail("Failed to resolve repo. Try again later", err)
97 return
98 }
99
100 newSpindle := r.FormValue("spindle")
101 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
102 client, err := rp.oauth.AuthorizedClient(r)
103 if err != nil {
104 fail("Failed to authorize. Try again later.", err)
105 return
106 }
107
108 if !removingSpindle {
109 // ensure that this is a valid spindle for this user
110 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did)
111 if err != nil {
112 fail("Failed to find spindles. Try again later.", err)
113 return
114 }
115
116 if !slices.Contains(validSpindles, newSpindle) {
117 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
118 return
119 }
120 }
121
122 newRepo := *f
123 newRepo.Spindle = newSpindle
124 record := newRepo.AsRecord()
125
126 spindlePtr := &newSpindle
127 if removingSpindle {
128 spindlePtr = nil
129 newRepo.Spindle = ""
130 }
131
132 // optimistic update
133 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
134 if err != nil {
135 fail("Failed to update spindle. Try again later.", err)
136 return
137 }
138
139 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
140 if err != nil {
141 fail("Failed to update spindle, no record found on PDS.", err)
142 return
143 }
144 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
145 Collection: tangled.RepoNSID,
146 Repo: newRepo.Did,
147 Rkey: newRepo.Rkey,
148 SwapRecord: ex.Cid,
149 Record: &lexutil.LexiconTypeDecoder{
150 Val: &record,
151 },
152 })
153
154 if err != nil {
155 fail("Failed to update spindle, unable to save to PDS.", err)
156 return
157 }
158
159 if !removingSpindle {
160 // add this spindle to spindle stream
161 rp.spindlestream.AddSource(
162 context.Background(),
163 eventconsumer.NewSpindleSource(newSpindle),
164 )
165 }
166
167 rp.pages.HxRefresh(w)
168}
169
170func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
171 user := rp.oauth.GetMultiAccountUser(r)
172 l := rp.logger.With("handler", "AddLabel")
173 l = l.With("did", user.Active.Did)
174
175 f, err := rp.repoResolver.Resolve(r)
176 if err != nil {
177 l.Error("failed to get repo and knot", "err", err)
178 return
179 }
180
181 errorId := "add-label-error"
182 fail := func(msg string, err error) {
183 l.Error(msg, "err", err)
184 rp.pages.Notice(w, errorId, msg)
185 }
186
187 // get form values for label definition
188 name := r.FormValue("name")
189 concreteType := r.FormValue("valueType")
190 valueFormat := r.FormValue("valueFormat")
191 enumValues := r.FormValue("enumValues")
192 scope := r.Form["scope"]
193 color := r.FormValue("color")
194 multiple := r.FormValue("multiple") == "true"
195
196 var variants []string
197 for part := range strings.SplitSeq(enumValues, ",") {
198 if part = strings.TrimSpace(part); part != "" {
199 variants = append(variants, part)
200 }
201 }
202
203 if concreteType == "" {
204 concreteType = "null"
205 }
206
207 format := models.ValueTypeFormatAny
208 if valueFormat == "did" {
209 format = models.ValueTypeFormatDid
210 }
211
212 valueType := models.ValueType{
213 Type: models.ConcreteType(concreteType),
214 Format: format,
215 Enum: variants,
216 }
217
218 label := models.LabelDefinition{
219 Did: user.Active.Did,
220 Rkey: tid.TID(),
221 Name: name,
222 ValueType: valueType,
223 Scope: scope,
224 Color: &color,
225 Multiple: multiple,
226 Created: time.Now(),
227 }
228 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
229 fail(err.Error(), err)
230 return
231 }
232
233 // announce this relation into the firehose, store into owners' pds
234 client, err := rp.oauth.AuthorizedClient(r)
235 if err != nil {
236 fail(err.Error(), err)
237 return
238 }
239
240 // emit a labelRecord
241 labelRecord := label.AsRecord()
242 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
243 Collection: tangled.LabelDefinitionNSID,
244 Repo: label.Did,
245 Rkey: label.Rkey,
246 Record: &lexutil.LexiconTypeDecoder{
247 Val: &labelRecord,
248 },
249 })
250 // invalid record
251 if err != nil {
252 fail("Failed to write record to PDS.", err)
253 return
254 }
255
256 aturi := resp.Uri
257 l = l.With("at-uri", aturi)
258 l.Info("wrote label record to PDS")
259
260 // update the repo to subscribe to this label
261 newRepo := *f
262 newRepo.Labels = append(newRepo.Labels, aturi)
263 repoRecord := newRepo.AsRecord()
264
265 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
266 if err != nil {
267 fail("Failed to update labels, no record found on PDS.", err)
268 return
269 }
270 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
271 Collection: tangled.RepoNSID,
272 Repo: newRepo.Did,
273 Rkey: newRepo.Rkey,
274 SwapRecord: ex.Cid,
275 Record: &lexutil.LexiconTypeDecoder{
276 Val: &repoRecord,
277 },
278 })
279 if err != nil {
280 fail("Failed to update labels for repo.", err)
281 return
282 }
283
284 tx, err := rp.db.BeginTx(r.Context(), nil)
285 if err != nil {
286 fail("Failed to add label.", err)
287 return
288 }
289
290 rollback := func() {
291 err1 := tx.Rollback()
292 err2 := rollbackRecord(context.Background(), aturi, client)
293
294 // ignore txn complete errors, this is okay
295 if errors.Is(err1, sql.ErrTxDone) {
296 err1 = nil
297 }
298
299 if errs := errors.Join(err1, err2); errs != nil {
300 l.Error("failed to rollback changes", "errs", errs)
301 return
302 }
303 }
304 defer rollback()
305
306 _, err = db.AddLabelDefinition(tx, &label)
307 if err != nil {
308 fail("Failed to add label.", err)
309 return
310 }
311
312 if err = db.SubscribeLabel(tx, &models.RepoLabel{
313 RepoAt: f.RepoAt(),
314 LabelAt: label.AtUri(),
315 RepoDid: f.RepoDid,
316 }); err != nil {
317 fail("Failed to subscribe to label.", err)
318 return
319 }
320
321 err = tx.Commit()
322 if err != nil {
323 fail("Failed to add label.", err)
324 return
325 }
326
327 // clear aturi when everything is successful
328 aturi = ""
329
330 rp.pages.HxRefresh(w)
331}
332
333func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
334 user := rp.oauth.GetMultiAccountUser(r)
335 l := rp.logger.With("handler", "DeleteLabel")
336 l = l.With("did", user.Active.Did)
337
338 f, err := rp.repoResolver.Resolve(r)
339 if err != nil {
340 l.Error("failed to get repo and knot", "err", err)
341 return
342 }
343
344 errorId := "label-operation"
345 fail := func(msg string, err error) {
346 l.Error(msg, "err", err)
347 rp.pages.Notice(w, errorId, msg)
348 }
349
350 // get form values
351 labelId := r.FormValue("label-id")
352
353 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
354 if err != nil {
355 fail("Failed to find label definition.", err)
356 return
357 }
358
359 client, err := rp.oauth.AuthorizedClient(r)
360 if err != nil {
361 fail(err.Error(), err)
362 return
363 }
364
365 // delete label record from PDS
366 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
367 Collection: tangled.LabelDefinitionNSID,
368 Repo: label.Did,
369 Rkey: label.Rkey,
370 })
371 if err != nil {
372 fail("Failed to delete label record from PDS.", err)
373 return
374 }
375
376 // update repo record to remove the label reference
377 newRepo := *f
378 var updated []string
379 removedAt := label.AtUri().String()
380 for _, l := range newRepo.Labels {
381 if l != removedAt {
382 updated = append(updated, l)
383 }
384 }
385 newRepo.Labels = updated
386 repoRecord := newRepo.AsRecord()
387
388 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
389 if err != nil {
390 fail("Failed to update labels, no record found on PDS.", err)
391 return
392 }
393 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
394 Collection: tangled.RepoNSID,
395 Repo: newRepo.Did,
396 Rkey: newRepo.Rkey,
397 SwapRecord: ex.Cid,
398 Record: &lexutil.LexiconTypeDecoder{
399 Val: &repoRecord,
400 },
401 })
402 if err != nil {
403 fail("Failed to update repo record.", err)
404 return
405 }
406
407 // transaction for DB changes
408 tx, err := rp.db.BeginTx(r.Context(), nil)
409 if err != nil {
410 fail("Failed to delete label.", err)
411 return
412 }
413 defer tx.Rollback()
414
415 err = db.UnsubscribeLabel(
416 tx,
417 orm.FilterEq("repo_at", f.RepoAt()),
418 orm.FilterEq("label_at", removedAt),
419 )
420 if err != nil {
421 fail("Failed to unsubscribe label.", err)
422 return
423 }
424
425 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
426 if err != nil {
427 fail("Failed to delete label definition.", err)
428 return
429 }
430
431 err = tx.Commit()
432 if err != nil {
433 fail("Failed to delete label.", err)
434 return
435 }
436
437 // everything succeeded
438 rp.pages.HxRefresh(w)
439}
440
441func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
442 user := rp.oauth.GetMultiAccountUser(r)
443 l := rp.logger.With("handler", "SubscribeLabel")
444 l = l.With("did", user.Active.Did)
445
446 f, err := rp.repoResolver.Resolve(r)
447 if err != nil {
448 l.Error("failed to get repo and knot", "err", err)
449 return
450 }
451
452 if err := r.ParseForm(); err != nil {
453 l.Error("invalid form", "err", err)
454 return
455 }
456
457 errorId := "default-label-operation"
458 fail := func(msg string, err error) {
459 l.Error(msg, "err", err)
460 rp.pages.Notice(w, errorId, msg)
461 }
462
463 labelAts := r.Form["label"]
464 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
465 if err != nil {
466 fail("Failed to subscribe to label.", err)
467 return
468 }
469
470 newRepo := *f
471 newRepo.Labels = append(newRepo.Labels, labelAts...)
472
473 // dedup
474 slices.Sort(newRepo.Labels)
475 newRepo.Labels = slices.Compact(newRepo.Labels)
476
477 repoRecord := newRepo.AsRecord()
478
479 client, err := rp.oauth.AuthorizedClient(r)
480 if err != nil {
481 fail(err.Error(), err)
482 return
483 }
484
485 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
486 if err != nil {
487 fail("Failed to update labels, no record found on PDS.", err)
488 return
489 }
490 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
491 Collection: tangled.RepoNSID,
492 Repo: newRepo.Did,
493 Rkey: newRepo.Rkey,
494 SwapRecord: ex.Cid,
495 Record: &lexutil.LexiconTypeDecoder{
496 Val: &repoRecord,
497 },
498 })
499
500 tx, err := rp.db.Begin()
501 if err != nil {
502 fail("Failed to subscribe to label.", err)
503 return
504 }
505 defer tx.Rollback()
506
507 for _, l := range labelAts {
508 err = db.SubscribeLabel(tx, &models.RepoLabel{
509 RepoAt: f.RepoAt(),
510 LabelAt: syntax.ATURI(l),
511 RepoDid: f.RepoDid,
512 })
513 if err != nil {
514 fail("Failed to subscribe to label.", err)
515 return
516 }
517 }
518
519 if err := tx.Commit(); err != nil {
520 fail("Failed to subscribe to label.", err)
521 return
522 }
523
524 // everything succeeded
525 rp.pages.HxRefresh(w)
526}
527
528func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
529 user := rp.oauth.GetMultiAccountUser(r)
530 l := rp.logger.With("handler", "UnsubscribeLabel")
531 l = l.With("did", user.Active.Did)
532
533 f, err := rp.repoResolver.Resolve(r)
534 if err != nil {
535 l.Error("failed to get repo and knot", "err", err)
536 return
537 }
538
539 if err := r.ParseForm(); err != nil {
540 l.Error("invalid form", "err", err)
541 return
542 }
543
544 errorId := "default-label-operation"
545 fail := func(msg string, err error) {
546 l.Error(msg, "err", err)
547 rp.pages.Notice(w, errorId, msg)
548 }
549
550 labelAts := r.Form["label"]
551 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
552 if err != nil {
553 fail("Failed to unsubscribe to label.", err)
554 return
555 }
556
557 // update repo record to remove the label reference
558 newRepo := *f
559 var updated []string
560 for _, l := range newRepo.Labels {
561 if !slices.Contains(labelAts, l) {
562 updated = append(updated, l)
563 }
564 }
565 newRepo.Labels = updated
566 repoRecord := newRepo.AsRecord()
567
568 client, err := rp.oauth.AuthorizedClient(r)
569 if err != nil {
570 fail(err.Error(), err)
571 return
572 }
573
574 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
575 if err != nil {
576 fail("Failed to update labels, no record found on PDS.", err)
577 return
578 }
579 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
580 Collection: tangled.RepoNSID,
581 Repo: newRepo.Did,
582 Rkey: newRepo.Rkey,
583 SwapRecord: ex.Cid,
584 Record: &lexutil.LexiconTypeDecoder{
585 Val: &repoRecord,
586 },
587 })
588
589 err = db.UnsubscribeLabel(
590 rp.db,
591 orm.FilterEq("repo_at", f.RepoAt()),
592 orm.FilterIn("label_at", labelAts),
593 )
594 if err != nil {
595 fail("Failed to unsubscribe label.", err)
596 return
597 }
598
599 // everything succeeded
600 rp.pages.HxRefresh(w)
601}
602
603func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
604 l := rp.logger.With("handler", "LabelPanel")
605
606 f, err := rp.repoResolver.Resolve(r)
607 if err != nil {
608 l.Error("failed to get repo and knot", "err", err)
609 return
610 }
611
612 subjectStr := r.FormValue("subject")
613 subject, err := syntax.ParseATURI(subjectStr)
614 if err != nil {
615 l.Error("failed to get repo and knot", "err", err)
616 return
617 }
618
619 labelDefs, err := db.GetLabelDefinitions(
620 rp.db,
621 orm.FilterIn("at_uri", f.Labels),
622 orm.FilterContains("scope", subject.Collection().String()),
623 )
624 if err != nil {
625 l.Error("failed to fetch label defs", "err", err)
626 return
627 }
628
629 defs := make(map[string]*models.LabelDefinition)
630 for _, l := range labelDefs {
631 defs[l.AtUri().String()] = &l
632 }
633
634 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
635 if err != nil {
636 l.Error("failed to build label state", "err", err)
637 return
638 }
639 state := states[subject]
640
641 user := rp.oauth.GetMultiAccountUser(r)
642 rp.pages.LabelPanel(w, pages.LabelPanelParams{
643 LoggedInUser: user,
644 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
645 Defs: defs,
646 Subject: subject.String(),
647 State: state,
648 })
649}
650
651func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
652 l := rp.logger.With("handler", "EditLabelPanel")
653
654 f, err := rp.repoResolver.Resolve(r)
655 if err != nil {
656 l.Error("failed to get repo and knot", "err", err)
657 return
658 }
659
660 subjectStr := r.FormValue("subject")
661 subject, err := syntax.ParseATURI(subjectStr)
662 if err != nil {
663 l.Error("failed to get repo and knot", "err", err)
664 return
665 }
666
667 labelDefs, err := db.GetLabelDefinitions(
668 rp.db,
669 orm.FilterIn("at_uri", f.Labels),
670 orm.FilterContains("scope", subject.Collection().String()),
671 )
672 if err != nil {
673 l.Error("failed to fetch labels", "err", err)
674 return
675 }
676
677 defs := make(map[string]*models.LabelDefinition)
678 for _, l := range labelDefs {
679 defs[l.AtUri().String()] = &l
680 }
681
682 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
683 if err != nil {
684 l.Error("failed to build label state", "err", err)
685 return
686 }
687 state := states[subject]
688
689 user := rp.oauth.GetMultiAccountUser(r)
690 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
691 LoggedInUser: user,
692 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
693 Defs: defs,
694 Subject: subject.String(),
695 State: state,
696 })
697}
698
699func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
700 user := rp.oauth.GetMultiAccountUser(r)
701 l := rp.logger.With("handler", "AddCollaborator")
702 l = l.With("did", user.Active.Did)
703
704 f, err := rp.repoResolver.Resolve(r)
705 if err != nil {
706 l.Error("failed to get repo and knot", "err", err)
707 return
708 }
709
710 errorId := "add-collaborator-error"
711 fail := func(msg string, err error) {
712 l.Error(msg, "err", err)
713 rp.pages.Notice(w, errorId, msg)
714 }
715
716 collaborator := r.FormValue("collaborator")
717 if collaborator == "" {
718 fail("Invalid form.", nil)
719 return
720 }
721
722 // remove a single leading `@`, to make @handle work with ResolveIdent
723 collaborator = strings.TrimPrefix(collaborator, "@")
724
725 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
726 if err != nil {
727 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
728 return
729 }
730
731 if collaboratorIdent.DID.String() == user.Active.Did {
732 fail("You seem to be adding yourself as a collaborator.", nil)
733 return
734 }
735 l = l.With("collaborator", collaboratorIdent.Handle)
736 l = l.With("knot", f.Knot)
737
738 // announce this relation into the firehose, store into owners' pds
739 client, err := rp.oauth.AuthorizedClient(r)
740 if err != nil {
741 fail("Failed to write to PDS.", err)
742 return
743 }
744
745 // emit a record
746 currentUser := rp.oauth.GetMultiAccountUser(r)
747 rkey := tid.TID()
748 createdAt := time.Now()
749 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
750 Collection: tangled.RepoCollaboratorNSID,
751 Repo: currentUser.Active.Did,
752 Rkey: rkey,
753 Record: &lexutil.LexiconTypeDecoder{
754 Val: repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt),
755 },
756 })
757 // invalid record
758 if err != nil {
759 fail("Failed to write record to PDS.", err)
760 return
761 }
762
763 aturi := resp.Uri
764 l = l.With("at-uri", aturi)
765 l.Info("wrote record to PDS")
766
767 tx, err := rp.db.BeginTx(r.Context(), nil)
768 if err != nil {
769 fail("Failed to add collaborator.", err)
770 return
771 }
772
773 rollback := func() {
774 err1 := tx.Rollback()
775 err2 := rp.enforcer.E.LoadPolicy()
776 err3 := rollbackRecord(context.Background(), aturi, client)
777
778 // ignore txn complete errors, this is okay
779 if errors.Is(err1, sql.ErrTxDone) {
780 err1 = nil
781 }
782
783 if errs := errors.Join(err1, err2, err3); errs != nil {
784 l.Error("failed to rollback changes", "errs", errs)
785 return
786 }
787 }
788 defer rollback()
789
790 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier())
791 if err != nil {
792 fail("Failed to add collaborator permissions.", err)
793 return
794 }
795
796 err = db.AddCollaborator(tx, models.Collaborator{
797 Did: syntax.DID(currentUser.Active.Did),
798 Rkey: rkey,
799 SubjectDid: collaboratorIdent.DID,
800 RepoAt: f.RepoAt(),
801 RepoDid: f.RepoDid,
802 Created: createdAt,
803 })
804 if err != nil {
805 fail("Failed to add collaborator.", err)
806 return
807 }
808
809 err = tx.Commit()
810 if err != nil {
811 fail("Failed to add collaborator.", err)
812 return
813 }
814
815 err = rp.enforcer.E.SavePolicy()
816 if err != nil {
817 fail("Failed to update collaborator permissions.", err)
818 return
819 }
820
821 // clear aturi to when everything is successful
822 aturi = ""
823
824 rp.pages.HxRefresh(w)
825}
826
827func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
828 user := rp.oauth.GetMultiAccountUser(r)
829 l := rp.logger.With("handler", "DeleteRepo")
830
831 noticeId := "operation-error"
832 f, err := rp.repoResolver.Resolve(r)
833 if err != nil {
834 l.Error("failed to get repo and knot", "err", err)
835 return
836 }
837
838 // remove record from pds
839 atpClient, err := rp.oauth.AuthorizedClient(r)
840 if err != nil {
841 l.Error("failed to get authorized client", "err", err)
842 return
843 }
844 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
845 Collection: tangled.RepoNSID,
846 Repo: user.Active.Did,
847 Rkey: f.Rkey,
848 })
849 if err != nil {
850 l.Error("failed to delete record", "err", err)
851 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
852 return
853 }
854 l.Info("removed repo record", "aturi", f.RepoAt().String())
855
856 client, err := rp.oauth.ServiceClient(
857 r,
858 oauth.WithService(f.Knot),
859 oauth.WithLxm(tangled.RepoDeleteNSID),
860 oauth.WithDev(rp.config.Core.Dev),
861 )
862 if err != nil {
863 l.Error("failed to connect to knot server", "err", err)
864 return
865 }
866
867 err = tangled.RepoDelete(
868 r.Context(),
869 client,
870 &tangled.RepoDelete_Input{
871 Did: f.Did,
872 Name: f.Name,
873 Rkey: f.Rkey,
874 },
875 )
876 if err := xrpcclient.HandleXrpcErr(err); err != nil {
877 rp.pages.Notice(w, noticeId, err.Error())
878 return
879 }
880 l.Info("deleted repo from knot")
881
882 tx, err := rp.db.BeginTx(r.Context(), nil)
883 if err != nil {
884 l.Error("failed to start tx")
885 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
886 return
887 }
888 defer func() {
889 tx.Rollback()
890 err = rp.enforcer.E.LoadPolicy()
891 if err != nil {
892 l.Error("failed to rollback policies")
893 }
894 }()
895
896 // remove collaborator RBAC
897 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot)
898 if err != nil {
899 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
900 return
901 }
902 for _, c := range repoCollaborators {
903 did := c[0]
904 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier())
905 }
906 l.Info("removed collaborators")
907
908 // remove repo RBAC
909 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier())
910 if err != nil {
911 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
912 return
913 }
914
915 // remove repo from db
916 err = db.RemoveRepo(tx, f.Did, f.Name)
917 if err != nil {
918 rp.pages.Notice(w, noticeId, "Failed to update appview")
919 return
920 }
921 l.Info("removed repo from db")
922
923 err = tx.Commit()
924 if err != nil {
925 l.Error("failed to commit changes", "err", err)
926 http.Error(w, err.Error(), http.StatusInternalServerError)
927 return
928 }
929
930 err = rp.enforcer.E.SavePolicy()
931 if err != nil {
932 l.Error("failed to update ACLs", "err", err)
933 http.Error(w, err.Error(), http.StatusInternalServerError)
934 return
935 }
936
937 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
938}
939
940func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
941 l := rp.logger.With("handler", "SyncRepoFork")
942
943 ref := chi.URLParam(r, "ref")
944 ref, _ = url.PathUnescape(ref)
945
946 user := rp.oauth.GetMultiAccountUser(r)
947 f, err := rp.repoResolver.Resolve(r)
948 if err != nil {
949 l.Error("failed to resolve source repo", "err", err)
950 return
951 }
952
953 switch r.Method {
954 case http.MethodPost:
955 client, err := rp.oauth.ServiceClient(
956 r,
957 oauth.WithService(f.Knot),
958 oauth.WithLxm(tangled.RepoForkSyncNSID),
959 oauth.WithDev(rp.config.Core.Dev),
960 )
961 if err != nil {
962 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
963 return
964 }
965
966 if f.Source == "" {
967 rp.pages.Notice(w, "repo", "This repository is not a fork.")
968 return
969 }
970
971 err = tangled.RepoForkSync(
972 r.Context(),
973 client,
974 &tangled.RepoForkSync_Input{
975 Did: user.Active.Did,
976 Name: f.Name,
977 Source: f.Source,
978 Branch: ref,
979 },
980 )
981 if err := xrpcclient.HandleXrpcErr(err); err != nil {
982 rp.pages.Notice(w, "repo", err.Error())
983 return
984 }
985
986 rp.pages.HxRefresh(w)
987 return
988 }
989}
990
991func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
992 l := rp.logger.With("handler", "ForkRepo")
993
994 user := rp.oauth.GetMultiAccountUser(r)
995 f, err := rp.repoResolver.Resolve(r)
996 if err != nil {
997 l.Error("failed to resolve source repo", "err", err)
998 return
999 }
1000
1001 switch r.Method {
1002 case http.MethodGet:
1003 user := rp.oauth.GetMultiAccountUser(r)
1004 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did)
1005 if err != nil {
1006 rp.pages.Notice(w, "repo", "Invalid user account.")
1007 return
1008 }
1009
1010 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1011 LoggedInUser: user,
1012 Knots: knots,
1013 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1014 })
1015
1016 case http.MethodPost:
1017 l := rp.logger.With("handler", "ForkRepo")
1018
1019 targetKnot := r.FormValue("knot")
1020 if targetKnot == "" {
1021 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1022 return
1023 }
1024 l = l.With("targetKnot", targetKnot)
1025
1026 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create")
1027 if err != nil || !ok {
1028 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1029 return
1030 }
1031
1032 // choose a name for a fork
1033 forkName := r.FormValue("repo_name")
1034 if forkName == "" {
1035 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1036 return
1037 }
1038
1039 // this check is *only* to see if the forked repo name already exists
1040 // in the user's account.
1041 existingRepo, err := db.GetRepo(
1042 rp.db,
1043 orm.FilterEq("did", user.Active.Did),
1044 orm.FilterEq("name", forkName),
1045 )
1046 if err != nil {
1047 if !errors.Is(err, sql.ErrNoRows) {
1048 l.Error("error fetching existing repo from db", "err", err)
1049 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1050 return
1051 }
1052 } else if existingRepo != nil {
1053 // repo with this name already exists
1054 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1055 return
1056 }
1057 l = l.With("forkName", forkName)
1058
1059 uri := "https"
1060 if rp.config.Core.Dev {
1061 uri = "http"
1062 }
1063
1064 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier())
1065 l = l.With("cloneUrl", forkSourceUrl)
1066
1067 rkey := tid.TID()
1068
1069 // TODO: this could coordinate better with the knot to recieve a clone status
1070 client, err := rp.oauth.ServiceClient(
1071 r,
1072 oauth.WithService(targetKnot),
1073 oauth.WithLxm(tangled.RepoCreateNSID),
1074 oauth.WithDev(rp.config.Core.Dev),
1075 oauth.WithTimeout(time.Second*20),
1076 )
1077 if err != nil {
1078 l.Error("could not create service client", "err", err)
1079 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1080 return
1081 }
1082
1083 forkInput := &tangled.RepoCreate_Input{
1084 Rkey: rkey,
1085 Name: forkName,
1086 Source: &forkSourceUrl,
1087 }
1088 if rd := strings.TrimSpace(r.FormValue("repo_did")); rd != "" {
1089 forkInput.RepoDid = &rd
1090 }
1091
1092 createResp, createErr := tangled.RepoCreate(
1093 r.Context(),
1094 client,
1095 forkInput,
1096 )
1097 if err := xrpcclient.HandleXrpcErr(createErr); err != nil {
1098 rp.pages.Notice(w, "repo", err.Error())
1099 return
1100 }
1101
1102 var repoDid string
1103 if createResp != nil && createResp.RepoDid != nil {
1104 repoDid = *createResp.RepoDid
1105 }
1106 if repoDid == "" {
1107 l.Error("knot returned empty repo DID for fork")
1108 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
1109 return
1110 }
1111
1112 forkSource := f.RepoAt().String()
1113 if f.RepoDid != "" {
1114 forkSource = f.RepoDid
1115 }
1116
1117 repo := &models.Repo{
1118 Did: user.Active.Did,
1119 Name: forkName,
1120 Knot: targetKnot,
1121 Rkey: rkey,
1122 Source: forkSource,
1123 Description: f.Description,
1124 Created: time.Now(),
1125 Labels: rp.config.Label.DefaultLabelDefs,
1126 RepoDid: repoDid,
1127 }
1128 record := repo.AsRecord()
1129
1130 cleanupKnot := func() {
1131 go func() {
1132 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
1133 for attempt, delay := range delays {
1134 time.Sleep(delay)
1135 deleteClient, dErr := rp.oauth.ServiceClient(
1136 r,
1137 oauth.WithService(targetKnot),
1138 oauth.WithLxm(tangled.RepoDeleteNSID),
1139 oauth.WithDev(rp.config.Core.Dev),
1140 )
1141 if dErr != nil {
1142 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
1143 continue
1144 }
1145 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1146 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
1147 Did: user.Active.Did,
1148 Name: forkName,
1149 Rkey: rkey,
1150 }); dErr != nil {
1151 cancel()
1152 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr)
1153 continue
1154 }
1155 cancel()
1156 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1)
1157 return
1158 }
1159 l.Error("exhausted retries for knot cleanup, fork may be orphaned",
1160 "did", user.Active.Did, "fork", forkName, "knot", targetKnot)
1161 }()
1162 }
1163
1164 atpClient, err := rp.oauth.AuthorizedClient(r)
1165 if err != nil {
1166 l.Error("failed to create xrpcclient", "err", err)
1167 cleanupKnot()
1168 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1169 return
1170 }
1171
1172 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1173 Collection: tangled.RepoNSID,
1174 Repo: user.Active.Did,
1175 Rkey: rkey,
1176 Record: &lexutil.LexiconTypeDecoder{
1177 Val: &record,
1178 },
1179 })
1180 if err != nil {
1181 l.Error("failed to write to PDS", "err", err)
1182 cleanupKnot()
1183 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1184 return
1185 }
1186
1187 aturi := atresp.Uri
1188 l = l.With("aturi", aturi)
1189 l.Info("wrote to PDS")
1190
1191 tx, err := rp.db.BeginTx(r.Context(), nil)
1192 if err != nil {
1193 l.Info("txn failed", "err", err)
1194 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1195 return
1196 }
1197
1198 rollback := func() {
1199 err1 := tx.Rollback()
1200 err2 := rp.enforcer.E.LoadPolicy()
1201 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1202
1203 if errors.Is(err1, sql.ErrTxDone) {
1204 err1 = nil
1205 }
1206
1207 if errs := errors.Join(err1, err2, err3); errs != nil {
1208 l.Error("failed to rollback changes", "errs", errs)
1209 }
1210
1211 if aturi != "" {
1212 cleanupKnot()
1213 }
1214 }
1215 defer rollback()
1216
1217 err = db.AddRepo(tx, repo)
1218 if err != nil {
1219 l.Error("failed to AddRepo", "err", err)
1220 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1221 return
1222 }
1223
1224 rbacPath := repo.RepoIdentifier()
1225 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, rbacPath)
1226 if err != nil {
1227 l.Error("failed to add ACLs", "err", err)
1228 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1229 return
1230 }
1231
1232 err = tx.Commit()
1233 if err != nil {
1234 l.Error("failed to commit changes", "err", err)
1235 http.Error(w, err.Error(), http.StatusInternalServerError)
1236 return
1237 }
1238
1239 err = rp.enforcer.E.SavePolicy()
1240 if err != nil {
1241 l.Error("failed to update ACLs", "err", err)
1242 http.Error(w, err.Error(), http.StatusInternalServerError)
1243 return
1244 }
1245
1246 aturi = ""
1247
1248 rp.notifier.NewRepo(r.Context(), repo)
1249 if repoDid != "" {
1250 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
1251 } else {
1252 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName))
1253 }
1254 }
1255}
1256
1257// this is used to rollback changes made to the PDS
1258//
1259// it is a no-op if the provided ATURI is empty
1260func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1261 if aturi == "" {
1262 return nil
1263 }
1264
1265 parsed := syntax.ATURI(aturi)
1266
1267 collection := parsed.Collection().String()
1268 repo := parsed.Authority().String()
1269 rkey := parsed.RecordKey().String()
1270
1271 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1272 Collection: collection,
1273 Repo: repo,
1274 Rkey: rkey,
1275 })
1276 return err
1277}
1278
1279func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator {
1280 rec := &tangled.RepoCollaborator{
1281 Subject: subject,
1282 CreatedAt: createdAt.Format(time.RFC3339),
1283 }
1284 if f.RepoDid != "" {
1285 rec.RepoDid = &f.RepoDid
1286 } else {
1287 s := string(f.RepoAt())
1288 rec.Repo = &s
1289 }
1290 return rec
1291}