Monorepo for Tangled
at push-pkuzytwlwptp 1220 lines 30 kB view raw
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 "tangled.org/core/appview/oauth" 21 "tangled.org/core/appview/pages" 22 "tangled.org/core/appview/reporesolver" 23 "tangled.org/core/appview/validator" 24 xrpcclient "tangled.org/core/appview/xrpcclient" 25 "tangled.org/core/eventconsumer" 26 "tangled.org/core/idresolver" 27 "tangled.org/core/orm" 28 "tangled.org/core/rbac" 29 "tangled.org/core/tid" 30 "tangled.org/core/xrpc/serviceauth" 31 32 comatproto "github.com/bluesky-social/indigo/api/atproto" 33 atpclient "github.com/bluesky-social/indigo/atproto/client" 34 "github.com/bluesky-social/indigo/atproto/syntax" 35 lexutil "github.com/bluesky-social/indigo/lex/util" 36 securejoin "github.com/cyphar/filepath-securejoin" 37 "github.com/go-chi/chi/v5" 38) 39 40type Repo struct { 41 repoResolver *reporesolver.RepoResolver 42 idResolver *idresolver.Resolver 43 config *config.Config 44 oauth *oauth.OAuth 45 pages *pages.Pages 46 spindlestream *eventconsumer.Consumer 47 db *db.DB 48 enforcer *rbac.Enforcer 49 notifier notify.Notifier 50 logger *slog.Logger 51 serviceAuth *serviceauth.ServiceAuth 52 validator *validator.Validator 53} 54 55func New( 56 oauth *oauth.OAuth, 57 repoResolver *reporesolver.RepoResolver, 58 pages *pages.Pages, 59 spindlestream *eventconsumer.Consumer, 60 idResolver *idresolver.Resolver, 61 db *db.DB, 62 config *config.Config, 63 notifier notify.Notifier, 64 enforcer *rbac.Enforcer, 65 logger *slog.Logger, 66 validator *validator.Validator, 67) *Repo { 68 return &Repo{oauth: oauth, 69 repoResolver: repoResolver, 70 pages: pages, 71 idResolver: idResolver, 72 config: config, 73 spindlestream: spindlestream, 74 db: db, 75 notifier: notifier, 76 enforcer: enforcer, 77 logger: logger, 78 validator: validator, 79 } 80} 81 82// modify the spindle configured for this repo 83func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 84 user := rp.oauth.GetMultiAccountUser(r) 85 l := rp.logger.With("handler", "EditSpindle") 86 l = l.With("did", user.Active.Did) 87 88 errorId := "operation-error" 89 fail := func(msg string, err error) { 90 l.Error(msg, "err", err) 91 rp.pages.Notice(w, errorId, msg) 92 } 93 94 f, err := rp.repoResolver.Resolve(r) 95 if err != nil { 96 fail("Failed to resolve repo. Try again later", err) 97 return 98 } 99 100 newSpindle := r.FormValue("spindle") 101 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 102 client, err := rp.oauth.AuthorizedClient(r) 103 if err != nil { 104 fail("Failed to authorize. Try again later.", err) 105 return 106 } 107 108 if !removingSpindle { 109 // ensure that this is a valid spindle for this user 110 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did) 111 if err != nil { 112 fail("Failed to find spindles. Try again later.", err) 113 return 114 } 115 116 if !slices.Contains(validSpindles, newSpindle) { 117 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 118 return 119 } 120 } 121 122 newRepo := *f 123 newRepo.Spindle = newSpindle 124 record := newRepo.AsRecord() 125 126 spindlePtr := &newSpindle 127 if removingSpindle { 128 spindlePtr = nil 129 newRepo.Spindle = "" 130 } 131 132 // optimistic update 133 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 134 if err != nil { 135 fail("Failed to update spindle. Try again later.", err) 136 return 137 } 138 139 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 140 if err != nil { 141 fail("Failed to update spindle, no record found on PDS.", err) 142 return 143 } 144 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 145 Collection: tangled.RepoNSID, 146 Repo: newRepo.Did, 147 Rkey: newRepo.Rkey, 148 SwapRecord: ex.Cid, 149 Record: &lexutil.LexiconTypeDecoder{ 150 Val: &record, 151 }, 152 }) 153 154 if err != nil { 155 fail("Failed to update spindle, unable to save to PDS.", err) 156 return 157 } 158 159 if !removingSpindle { 160 // add this spindle to spindle stream 161 rp.spindlestream.AddSource( 162 context.Background(), 163 eventconsumer.NewSpindleSource(newSpindle), 164 ) 165 } 166 167 rp.pages.HxRefresh(w) 168} 169 170func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 171 user := rp.oauth.GetMultiAccountUser(r) 172 l := rp.logger.With("handler", "AddLabel") 173 l = l.With("did", user.Active.Did) 174 175 f, err := rp.repoResolver.Resolve(r) 176 if err != nil { 177 l.Error("failed to get repo and knot", "err", err) 178 return 179 } 180 181 errorId := "add-label-error" 182 fail := func(msg string, err error) { 183 l.Error(msg, "err", err) 184 rp.pages.Notice(w, errorId, msg) 185 } 186 187 // get form values for label definition 188 name := r.FormValue("name") 189 concreteType := r.FormValue("valueType") 190 valueFormat := r.FormValue("valueFormat") 191 enumValues := r.FormValue("enumValues") 192 scope := r.Form["scope"] 193 color := r.FormValue("color") 194 multiple := r.FormValue("multiple") == "true" 195 196 var variants []string 197 for part := range strings.SplitSeq(enumValues, ",") { 198 if part = strings.TrimSpace(part); part != "" { 199 variants = append(variants, part) 200 } 201 } 202 203 if concreteType == "" { 204 concreteType = "null" 205 } 206 207 format := models.ValueTypeFormatAny 208 if valueFormat == "did" { 209 format = models.ValueTypeFormatDid 210 } 211 212 valueType := models.ValueType{ 213 Type: models.ConcreteType(concreteType), 214 Format: format, 215 Enum: variants, 216 } 217 218 label := models.LabelDefinition{ 219 Did: user.Active.Did, 220 Rkey: tid.TID(), 221 Name: name, 222 ValueType: valueType, 223 Scope: scope, 224 Color: &color, 225 Multiple: multiple, 226 Created: time.Now(), 227 } 228 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 229 fail(err.Error(), err) 230 return 231 } 232 233 // announce this relation into the firehose, store into owners' pds 234 client, err := rp.oauth.AuthorizedClient(r) 235 if err != nil { 236 fail(err.Error(), err) 237 return 238 } 239 240 // emit a labelRecord 241 labelRecord := label.AsRecord() 242 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 243 Collection: tangled.LabelDefinitionNSID, 244 Repo: label.Did, 245 Rkey: label.Rkey, 246 Record: &lexutil.LexiconTypeDecoder{ 247 Val: &labelRecord, 248 }, 249 }) 250 // invalid record 251 if err != nil { 252 fail("Failed to write record to PDS.", err) 253 return 254 } 255 256 aturi := resp.Uri 257 l = l.With("at-uri", aturi) 258 l.Info("wrote label record to PDS") 259 260 // update the repo to subscribe to this label 261 newRepo := *f 262 newRepo.Labels = append(newRepo.Labels, aturi) 263 repoRecord := newRepo.AsRecord() 264 265 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 266 if err != nil { 267 fail("Failed to update labels, no record found on PDS.", err) 268 return 269 } 270 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 271 Collection: tangled.RepoNSID, 272 Repo: newRepo.Did, 273 Rkey: newRepo.Rkey, 274 SwapRecord: ex.Cid, 275 Record: &lexutil.LexiconTypeDecoder{ 276 Val: &repoRecord, 277 }, 278 }) 279 if err != nil { 280 fail("Failed to update labels for repo.", err) 281 return 282 } 283 284 tx, err := rp.db.BeginTx(r.Context(), nil) 285 if err != nil { 286 fail("Failed to add label.", err) 287 return 288 } 289 290 rollback := func() { 291 err1 := tx.Rollback() 292 err2 := rollbackRecord(context.Background(), aturi, client) 293 294 // ignore txn complete errors, this is okay 295 if errors.Is(err1, sql.ErrTxDone) { 296 err1 = nil 297 } 298 299 if errs := errors.Join(err1, err2); errs != nil { 300 l.Error("failed to rollback changes", "errs", errs) 301 return 302 } 303 } 304 defer rollback() 305 306 _, err = db.AddLabelDefinition(tx, &label) 307 if err != nil { 308 fail("Failed to add label.", err) 309 return 310 } 311 312 err = db.SubscribeLabel(tx, &models.RepoLabel{ 313 RepoAt: f.RepoAt(), 314 LabelAt: label.AtUri(), 315 RepoDid: f.RepoDid, 316 }) 317 318 err = tx.Commit() 319 if err != nil { 320 fail("Failed to add label.", err) 321 return 322 } 323 324 // clear aturi when everything is successful 325 aturi = "" 326 327 rp.pages.HxRefresh(w) 328} 329 330func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 331 user := rp.oauth.GetMultiAccountUser(r) 332 l := rp.logger.With("handler", "DeleteLabel") 333 l = l.With("did", user.Active.Did) 334 335 f, err := rp.repoResolver.Resolve(r) 336 if err != nil { 337 l.Error("failed to get repo and knot", "err", err) 338 return 339 } 340 341 errorId := "label-operation" 342 fail := func(msg string, err error) { 343 l.Error(msg, "err", err) 344 rp.pages.Notice(w, errorId, msg) 345 } 346 347 // get form values 348 labelId := r.FormValue("label-id") 349 350 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 351 if err != nil { 352 fail("Failed to find label definition.", err) 353 return 354 } 355 356 client, err := rp.oauth.AuthorizedClient(r) 357 if err != nil { 358 fail(err.Error(), err) 359 return 360 } 361 362 // delete label record from PDS 363 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 364 Collection: tangled.LabelDefinitionNSID, 365 Repo: label.Did, 366 Rkey: label.Rkey, 367 }) 368 if err != nil { 369 fail("Failed to delete label record from PDS.", err) 370 return 371 } 372 373 // update repo record to remove the label reference 374 newRepo := *f 375 var updated []string 376 removedAt := label.AtUri().String() 377 for _, l := range newRepo.Labels { 378 if l != removedAt { 379 updated = append(updated, l) 380 } 381 } 382 newRepo.Labels = updated 383 repoRecord := newRepo.AsRecord() 384 385 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 386 if err != nil { 387 fail("Failed to update labels, no record found on PDS.", err) 388 return 389 } 390 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 391 Collection: tangled.RepoNSID, 392 Repo: newRepo.Did, 393 Rkey: newRepo.Rkey, 394 SwapRecord: ex.Cid, 395 Record: &lexutil.LexiconTypeDecoder{ 396 Val: &repoRecord, 397 }, 398 }) 399 if err != nil { 400 fail("Failed to update repo record.", err) 401 return 402 } 403 404 // transaction for DB changes 405 tx, err := rp.db.BeginTx(r.Context(), nil) 406 if err != nil { 407 fail("Failed to delete label.", err) 408 return 409 } 410 defer tx.Rollback() 411 412 err = db.UnsubscribeLabel( 413 tx, 414 orm.FilterEq("repo_at", f.RepoAt()), 415 orm.FilterEq("label_at", removedAt), 416 ) 417 if err != nil { 418 fail("Failed to unsubscribe label.", err) 419 return 420 } 421 422 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 423 if err != nil { 424 fail("Failed to delete label definition.", err) 425 return 426 } 427 428 err = tx.Commit() 429 if err != nil { 430 fail("Failed to delete label.", err) 431 return 432 } 433 434 // everything succeeded 435 rp.pages.HxRefresh(w) 436} 437 438func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 439 user := rp.oauth.GetMultiAccountUser(r) 440 l := rp.logger.With("handler", "SubscribeLabel") 441 l = l.With("did", user.Active.Did) 442 443 f, err := rp.repoResolver.Resolve(r) 444 if err != nil { 445 l.Error("failed to get repo and knot", "err", err) 446 return 447 } 448 449 if err := r.ParseForm(); err != nil { 450 l.Error("invalid form", "err", err) 451 return 452 } 453 454 errorId := "default-label-operation" 455 fail := func(msg string, err error) { 456 l.Error(msg, "err", err) 457 rp.pages.Notice(w, errorId, msg) 458 } 459 460 labelAts := r.Form["label"] 461 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 462 if err != nil { 463 fail("Failed to subscribe to label.", err) 464 return 465 } 466 467 newRepo := *f 468 newRepo.Labels = append(newRepo.Labels, labelAts...) 469 470 // dedup 471 slices.Sort(newRepo.Labels) 472 newRepo.Labels = slices.Compact(newRepo.Labels) 473 474 repoRecord := newRepo.AsRecord() 475 476 client, err := rp.oauth.AuthorizedClient(r) 477 if err != nil { 478 fail(err.Error(), err) 479 return 480 } 481 482 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 483 if err != nil { 484 fail("Failed to update labels, no record found on PDS.", err) 485 return 486 } 487 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 488 Collection: tangled.RepoNSID, 489 Repo: newRepo.Did, 490 Rkey: newRepo.Rkey, 491 SwapRecord: ex.Cid, 492 Record: &lexutil.LexiconTypeDecoder{ 493 Val: &repoRecord, 494 }, 495 }) 496 497 tx, err := rp.db.Begin() 498 if err != nil { 499 fail("Failed to subscribe to label.", err) 500 return 501 } 502 defer tx.Rollback() 503 504 for _, l := range labelAts { 505 err = db.SubscribeLabel(tx, &models.RepoLabel{ 506 RepoAt: f.RepoAt(), 507 LabelAt: syntax.ATURI(l), 508 RepoDid: f.RepoDid, 509 }) 510 if err != nil { 511 fail("Failed to subscribe to label.", err) 512 return 513 } 514 } 515 516 if err := tx.Commit(); err != nil { 517 fail("Failed to subscribe to label.", err) 518 return 519 } 520 521 // everything succeeded 522 rp.pages.HxRefresh(w) 523} 524 525func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 526 user := rp.oauth.GetMultiAccountUser(r) 527 l := rp.logger.With("handler", "UnsubscribeLabel") 528 l = l.With("did", user.Active.Did) 529 530 f, err := rp.repoResolver.Resolve(r) 531 if err != nil { 532 l.Error("failed to get repo and knot", "err", err) 533 return 534 } 535 536 if err := r.ParseForm(); err != nil { 537 l.Error("invalid form", "err", err) 538 return 539 } 540 541 errorId := "default-label-operation" 542 fail := func(msg string, err error) { 543 l.Error(msg, "err", err) 544 rp.pages.Notice(w, errorId, msg) 545 } 546 547 labelAts := r.Form["label"] 548 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 549 if err != nil { 550 fail("Failed to unsubscribe to label.", err) 551 return 552 } 553 554 // update repo record to remove the label reference 555 newRepo := *f 556 var updated []string 557 for _, l := range newRepo.Labels { 558 if !slices.Contains(labelAts, l) { 559 updated = append(updated, l) 560 } 561 } 562 newRepo.Labels = updated 563 repoRecord := newRepo.AsRecord() 564 565 client, err := rp.oauth.AuthorizedClient(r) 566 if err != nil { 567 fail(err.Error(), err) 568 return 569 } 570 571 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 572 if err != nil { 573 fail("Failed to update labels, no record found on PDS.", err) 574 return 575 } 576 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 577 Collection: tangled.RepoNSID, 578 Repo: newRepo.Did, 579 Rkey: newRepo.Rkey, 580 SwapRecord: ex.Cid, 581 Record: &lexutil.LexiconTypeDecoder{ 582 Val: &repoRecord, 583 }, 584 }) 585 586 err = db.UnsubscribeLabel( 587 rp.db, 588 orm.FilterEq("repo_at", f.RepoAt()), 589 orm.FilterIn("label_at", labelAts), 590 ) 591 if err != nil { 592 fail("Failed to unsubscribe label.", err) 593 return 594 } 595 596 // everything succeeded 597 rp.pages.HxRefresh(w) 598} 599 600func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 601 l := rp.logger.With("handler", "LabelPanel") 602 603 f, err := rp.repoResolver.Resolve(r) 604 if err != nil { 605 l.Error("failed to get repo and knot", "err", err) 606 return 607 } 608 609 subjectStr := r.FormValue("subject") 610 subject, err := syntax.ParseATURI(subjectStr) 611 if err != nil { 612 l.Error("failed to get repo and knot", "err", err) 613 return 614 } 615 616 labelDefs, err := db.GetLabelDefinitions( 617 rp.db, 618 orm.FilterIn("at_uri", f.Labels), 619 orm.FilterContains("scope", subject.Collection().String()), 620 ) 621 if err != nil { 622 l.Error("failed to fetch label defs", "err", err) 623 return 624 } 625 626 defs := make(map[string]*models.LabelDefinition) 627 for _, l := range labelDefs { 628 defs[l.AtUri().String()] = &l 629 } 630 631 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 632 if err != nil { 633 l.Error("failed to build label state", "err", err) 634 return 635 } 636 state := states[subject] 637 638 user := rp.oauth.GetMultiAccountUser(r) 639 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 640 LoggedInUser: user, 641 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 642 Defs: defs, 643 Subject: subject.String(), 644 State: state, 645 }) 646} 647 648func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 649 l := rp.logger.With("handler", "EditLabelPanel") 650 651 f, err := rp.repoResolver.Resolve(r) 652 if err != nil { 653 l.Error("failed to get repo and knot", "err", err) 654 return 655 } 656 657 subjectStr := r.FormValue("subject") 658 subject, err := syntax.ParseATURI(subjectStr) 659 if err != nil { 660 l.Error("failed to get repo and knot", "err", err) 661 return 662 } 663 664 labelDefs, err := db.GetLabelDefinitions( 665 rp.db, 666 orm.FilterIn("at_uri", f.Labels), 667 orm.FilterContains("scope", subject.Collection().String()), 668 ) 669 if err != nil { 670 l.Error("failed to fetch labels", "err", err) 671 return 672 } 673 674 defs := make(map[string]*models.LabelDefinition) 675 for _, l := range labelDefs { 676 defs[l.AtUri().String()] = &l 677 } 678 679 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 680 if err != nil { 681 l.Error("failed to build label state", "err", err) 682 return 683 } 684 state := states[subject] 685 686 user := rp.oauth.GetMultiAccountUser(r) 687 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 688 LoggedInUser: user, 689 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 690 Defs: defs, 691 Subject: subject.String(), 692 State: state, 693 }) 694} 695 696func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 697 user := rp.oauth.GetMultiAccountUser(r) 698 l := rp.logger.With("handler", "AddCollaborator") 699 l = l.With("did", user.Active.Did) 700 701 f, err := rp.repoResolver.Resolve(r) 702 if err != nil { 703 l.Error("failed to get repo and knot", "err", err) 704 return 705 } 706 707 errorId := "add-collaborator-error" 708 fail := func(msg string, err error) { 709 l.Error(msg, "err", err) 710 rp.pages.Notice(w, errorId, msg) 711 } 712 713 collaborator := r.FormValue("collaborator") 714 if collaborator == "" { 715 fail("Invalid form.", nil) 716 return 717 } 718 719 // remove a single leading `@`, to make @handle work with ResolveIdent 720 collaborator = strings.TrimPrefix(collaborator, "@") 721 722 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 723 if err != nil { 724 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 725 return 726 } 727 728 if collaboratorIdent.DID.String() == user.Active.Did { 729 fail("You seem to be adding yourself as a collaborator.", nil) 730 return 731 } 732 l = l.With("collaborator", collaboratorIdent.Handle) 733 l = l.With("knot", f.Knot) 734 735 // announce this relation into the firehose, store into owners' pds 736 client, err := rp.oauth.AuthorizedClient(r) 737 if err != nil { 738 fail("Failed to write to PDS.", err) 739 return 740 } 741 742 // emit a record 743 currentUser := rp.oauth.GetMultiAccountUser(r) 744 rkey := tid.TID() 745 createdAt := time.Now() 746 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 747 Collection: tangled.RepoCollaboratorNSID, 748 Repo: currentUser.Active.Did, 749 Rkey: rkey, 750 Record: &lexutil.LexiconTypeDecoder{ 751 Val: &tangled.RepoCollaborator{ 752 Subject: collaboratorIdent.DID.String(), 753 Repo: string(f.RepoAt()), 754 CreatedAt: createdAt.Format(time.RFC3339), 755 }}, 756 }) 757 // invalid record 758 if err != nil { 759 fail("Failed to write record to PDS.", err) 760 return 761 } 762 763 aturi := resp.Uri 764 l = l.With("at-uri", aturi) 765 l.Info("wrote record to PDS") 766 767 tx, err := rp.db.BeginTx(r.Context(), nil) 768 if err != nil { 769 fail("Failed to add collaborator.", err) 770 return 771 } 772 773 rollback := func() { 774 err1 := tx.Rollback() 775 err2 := rp.enforcer.E.LoadPolicy() 776 err3 := rollbackRecord(context.Background(), aturi, client) 777 778 // ignore txn complete errors, this is okay 779 if errors.Is(err1, sql.ErrTxDone) { 780 err1 = nil 781 } 782 783 if errs := errors.Join(err1, err2, err3); errs != nil { 784 l.Error("failed to rollback changes", "errs", errs) 785 return 786 } 787 } 788 defer rollback() 789 790 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 791 if err != nil { 792 fail("Failed to add collaborator permissions.", err) 793 return 794 } 795 796 err = db.AddCollaborator(tx, models.Collaborator{ 797 Did: syntax.DID(currentUser.Active.Did), 798 Rkey: rkey, 799 SubjectDid: collaboratorIdent.DID, 800 RepoAt: f.RepoAt(), 801 Created: createdAt, 802 }) 803 if err != nil { 804 fail("Failed to add collaborator.", err) 805 return 806 } 807 808 err = tx.Commit() 809 if err != nil { 810 fail("Failed to add collaborator.", err) 811 return 812 } 813 814 err = rp.enforcer.E.SavePolicy() 815 if err != nil { 816 fail("Failed to update collaborator permissions.", err) 817 return 818 } 819 820 // clear aturi to when everything is successful 821 aturi = "" 822 823 rp.pages.HxRefresh(w) 824} 825 826func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 827 user := rp.oauth.GetMultiAccountUser(r) 828 l := rp.logger.With("handler", "DeleteRepo") 829 830 noticeId := "operation-error" 831 f, err := rp.repoResolver.Resolve(r) 832 if err != nil { 833 l.Error("failed to get repo and knot", "err", err) 834 return 835 } 836 837 // remove record from pds 838 atpClient, err := rp.oauth.AuthorizedClient(r) 839 if err != nil { 840 l.Error("failed to get authorized client", "err", err) 841 return 842 } 843 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 844 Collection: tangled.RepoNSID, 845 Repo: user.Active.Did, 846 Rkey: f.Rkey, 847 }) 848 if err != nil { 849 l.Error("failed to delete record", "err", err) 850 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 851 return 852 } 853 l.Info("removed repo record", "aturi", f.RepoAt().String()) 854 855 client, err := rp.oauth.ServiceClient( 856 r, 857 oauth.WithService(f.Knot), 858 oauth.WithLxm(tangled.RepoDeleteNSID), 859 oauth.WithDev(rp.config.Core.Dev), 860 ) 861 if err != nil { 862 l.Error("failed to connect to knot server", "err", err) 863 return 864 } 865 866 err = tangled.RepoDelete( 867 r.Context(), 868 client, 869 &tangled.RepoDelete_Input{ 870 Did: f.Did, 871 Name: f.Name, 872 Rkey: f.Rkey, 873 }, 874 ) 875 if err := xrpcclient.HandleXrpcErr(err); err != nil { 876 rp.pages.Notice(w, noticeId, err.Error()) 877 return 878 } 879 l.Info("deleted repo from knot") 880 881 tx, err := rp.db.BeginTx(r.Context(), nil) 882 if err != nil { 883 l.Error("failed to start tx") 884 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 885 return 886 } 887 defer func() { 888 tx.Rollback() 889 err = rp.enforcer.E.LoadPolicy() 890 if err != nil { 891 l.Error("failed to rollback policies") 892 } 893 }() 894 895 // remove collaborator RBAC 896 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 897 if err != nil { 898 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 899 return 900 } 901 for _, c := range repoCollaborators { 902 did := c[0] 903 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 904 } 905 l.Info("removed collaborators") 906 907 // remove repo RBAC 908 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo()) 909 if err != nil { 910 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 911 return 912 } 913 914 // remove repo from db 915 err = db.RemoveRepo(tx, f.Did, f.Name) 916 if err != nil { 917 rp.pages.Notice(w, noticeId, "Failed to update appview") 918 return 919 } 920 l.Info("removed repo from db") 921 922 err = tx.Commit() 923 if err != nil { 924 l.Error("failed to commit changes", "err", err) 925 http.Error(w, err.Error(), http.StatusInternalServerError) 926 return 927 } 928 929 err = rp.enforcer.E.SavePolicy() 930 if err != nil { 931 l.Error("failed to update ACLs", "err", err) 932 http.Error(w, err.Error(), http.StatusInternalServerError) 933 return 934 } 935 936 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 937} 938 939func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 940 l := rp.logger.With("handler", "SyncRepoFork") 941 942 ref := chi.URLParam(r, "ref") 943 ref, _ = url.PathUnescape(ref) 944 945 user := rp.oauth.GetMultiAccountUser(r) 946 f, err := rp.repoResolver.Resolve(r) 947 if err != nil { 948 l.Error("failed to resolve source repo", "err", err) 949 return 950 } 951 952 switch r.Method { 953 case http.MethodPost: 954 client, err := rp.oauth.ServiceClient( 955 r, 956 oauth.WithService(f.Knot), 957 oauth.WithLxm(tangled.RepoForkSyncNSID), 958 oauth.WithDev(rp.config.Core.Dev), 959 ) 960 if err != nil { 961 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 962 return 963 } 964 965 if f.Source == "" { 966 rp.pages.Notice(w, "repo", "This repository is not a fork.") 967 return 968 } 969 970 err = tangled.RepoForkSync( 971 r.Context(), 972 client, 973 &tangled.RepoForkSync_Input{ 974 Did: user.Active.Did, 975 Name: f.Name, 976 Source: f.Source, 977 Branch: ref, 978 }, 979 ) 980 if err := xrpcclient.HandleXrpcErr(err); err != nil { 981 rp.pages.Notice(w, "repo", err.Error()) 982 return 983 } 984 985 rp.pages.HxRefresh(w) 986 return 987 } 988} 989 990func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 991 l := rp.logger.With("handler", "ForkRepo") 992 993 user := rp.oauth.GetMultiAccountUser(r) 994 f, err := rp.repoResolver.Resolve(r) 995 if err != nil { 996 l.Error("failed to resolve source repo", "err", err) 997 return 998 } 999 1000 switch r.Method { 1001 case http.MethodGet: 1002 user := rp.oauth.GetMultiAccountUser(r) 1003 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did) 1004 if err != nil { 1005 rp.pages.Notice(w, "repo", "Invalid user account.") 1006 return 1007 } 1008 1009 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1010 LoggedInUser: user, 1011 Knots: knots, 1012 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1013 }) 1014 1015 case http.MethodPost: 1016 l := rp.logger.With("handler", "ForkRepo") 1017 1018 targetKnot := r.FormValue("knot") 1019 if targetKnot == "" { 1020 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1021 return 1022 } 1023 l = l.With("targetKnot", targetKnot) 1024 1025 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create") 1026 if err != nil || !ok { 1027 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1028 return 1029 } 1030 1031 // choose a name for a fork 1032 forkName := r.FormValue("repo_name") 1033 if forkName == "" { 1034 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1035 return 1036 } 1037 1038 // this check is *only* to see if the forked repo name already exists 1039 // in the user's account. 1040 existingRepo, err := db.GetRepo( 1041 rp.db, 1042 orm.FilterEq("did", user.Active.Did), 1043 orm.FilterEq("name", forkName), 1044 ) 1045 if err != nil { 1046 if !errors.Is(err, sql.ErrNoRows) { 1047 l.Error("error fetching existing repo from db", "err", err) 1048 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1049 return 1050 } 1051 } else if existingRepo != nil { 1052 // repo with this name already exists 1053 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1054 return 1055 } 1056 l = l.With("forkName", forkName) 1057 1058 uri := "https" 1059 if rp.config.Core.Dev { 1060 uri = "http" 1061 } 1062 1063 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name) 1064 l = l.With("cloneUrl", forkSourceUrl) 1065 1066 sourceAt := f.RepoAt().String() 1067 1068 // create an atproto record for this fork 1069 rkey := tid.TID() 1070 repo := &models.Repo{ 1071 Did: user.Active.Did, 1072 Name: forkName, 1073 Knot: targetKnot, 1074 Rkey: rkey, 1075 Source: sourceAt, 1076 Description: f.Description, 1077 Created: time.Now(), 1078 Labels: rp.config.Label.DefaultLabelDefs, 1079 } 1080 record := repo.AsRecord() 1081 1082 atpClient, err := rp.oauth.AuthorizedClient(r) 1083 if err != nil { 1084 l.Error("failed to create xrpcclient", "err", err) 1085 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1086 return 1087 } 1088 1089 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1090 Collection: tangled.RepoNSID, 1091 Repo: user.Active.Did, 1092 Rkey: rkey, 1093 Record: &lexutil.LexiconTypeDecoder{ 1094 Val: &record, 1095 }, 1096 }) 1097 if err != nil { 1098 l.Error("failed to write to PDS", "err", err) 1099 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1100 return 1101 } 1102 1103 aturi := atresp.Uri 1104 l = l.With("aturi", aturi) 1105 l.Info("wrote to PDS") 1106 1107 tx, err := rp.db.BeginTx(r.Context(), nil) 1108 if err != nil { 1109 l.Info("txn failed", "err", err) 1110 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1111 return 1112 } 1113 1114 // The rollback function reverts a few things on failure: 1115 // - the pending txn 1116 // - the ACLs 1117 // - the atproto record created 1118 rollback := func() { 1119 err1 := tx.Rollback() 1120 err2 := rp.enforcer.E.LoadPolicy() 1121 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1122 1123 // ignore txn complete errors, this is okay 1124 if errors.Is(err1, sql.ErrTxDone) { 1125 err1 = nil 1126 } 1127 1128 if errs := errors.Join(err1, err2, err3); errs != nil { 1129 l.Error("failed to rollback changes", "errs", errs) 1130 return 1131 } 1132 } 1133 defer rollback() 1134 1135 // TODO: this could coordinate better with the knot to recieve a clone status 1136 client, err := rp.oauth.ServiceClient( 1137 r, 1138 oauth.WithService(targetKnot), 1139 oauth.WithLxm(tangled.RepoCreateNSID), 1140 oauth.WithDev(rp.config.Core.Dev), 1141 oauth.WithTimeout(time.Second*20), // big repos take time to clone 1142 ) 1143 if err != nil { 1144 l.Error("could not create service client", "err", err) 1145 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1146 return 1147 } 1148 1149 err = tangled.RepoCreate( 1150 r.Context(), 1151 client, 1152 &tangled.RepoCreate_Input{ 1153 Rkey: rkey, 1154 Source: &forkSourceUrl, 1155 }, 1156 ) 1157 if err := xrpcclient.HandleXrpcErr(err); err != nil { 1158 rp.pages.Notice(w, "repo", err.Error()) 1159 return 1160 } 1161 1162 err = db.AddRepo(tx, repo) 1163 if err != nil { 1164 l.Error("failed to AddRepo", "err", err) 1165 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1166 return 1167 } 1168 1169 // acls 1170 p, _ := securejoin.SecureJoin(user.Active.Did, forkName) 1171 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p) 1172 if err != nil { 1173 l.Error("failed to add ACLs", "err", err) 1174 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1175 return 1176 } 1177 1178 err = tx.Commit() 1179 if err != nil { 1180 l.Error("failed to commit changes", "err", err) 1181 http.Error(w, err.Error(), http.StatusInternalServerError) 1182 return 1183 } 1184 1185 err = rp.enforcer.E.SavePolicy() 1186 if err != nil { 1187 l.Error("failed to update ACLs", "err", err) 1188 http.Error(w, err.Error(), http.StatusInternalServerError) 1189 return 1190 } 1191 1192 // reset the ATURI because the transaction completed successfully 1193 aturi = "" 1194 1195 rp.notifier.NewRepo(r.Context(), repo) 1196 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1197 } 1198} 1199 1200// this is used to rollback changes made to the PDS 1201// 1202// it is a no-op if the provided ATURI is empty 1203func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1204 if aturi == "" { 1205 return nil 1206 } 1207 1208 parsed := syntax.ATURI(aturi) 1209 1210 collection := parsed.Collection().String() 1211 repo := parsed.Authority().String() 1212 rkey := parsed.RecordKey().String() 1213 1214 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1215 Collection: collection, 1216 Repo: repo, 1217 Rkey: rkey, 1218 }) 1219 return err 1220}