Monorepo for Tangled
at push-zpskmntwpyxz 1291 lines 32 kB view raw
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 "tangled.org/core/appview/oauth" 21 "tangled.org/core/appview/pages" 22 "tangled.org/core/appview/reporesolver" 23 "tangled.org/core/appview/validator" 24 xrpcclient "tangled.org/core/appview/xrpcclient" 25 "tangled.org/core/eventconsumer" 26 "tangled.org/core/idresolver" 27 "tangled.org/core/orm" 28 "tangled.org/core/rbac" 29 "tangled.org/core/tid" 30 "tangled.org/core/xrpc/serviceauth" 31 32 comatproto "github.com/bluesky-social/indigo/api/atproto" 33 atpclient "github.com/bluesky-social/indigo/atproto/client" 34 "github.com/bluesky-social/indigo/atproto/syntax" 35 lexutil "github.com/bluesky-social/indigo/lex/util" 36 37 "github.com/go-chi/chi/v5" 38) 39 40type Repo struct { 41 repoResolver *reporesolver.RepoResolver 42 idResolver *idresolver.Resolver 43 config *config.Config 44 oauth *oauth.OAuth 45 pages *pages.Pages 46 spindlestream *eventconsumer.Consumer 47 db *db.DB 48 enforcer *rbac.Enforcer 49 notifier notify.Notifier 50 logger *slog.Logger 51 serviceAuth *serviceauth.ServiceAuth 52 validator *validator.Validator 53} 54 55func New( 56 oauth *oauth.OAuth, 57 repoResolver *reporesolver.RepoResolver, 58 pages *pages.Pages, 59 spindlestream *eventconsumer.Consumer, 60 idResolver *idresolver.Resolver, 61 db *db.DB, 62 config *config.Config, 63 notifier notify.Notifier, 64 enforcer *rbac.Enforcer, 65 logger *slog.Logger, 66 validator *validator.Validator, 67) *Repo { 68 return &Repo{oauth: oauth, 69 repoResolver: repoResolver, 70 pages: pages, 71 idResolver: idResolver, 72 config: config, 73 spindlestream: spindlestream, 74 db: db, 75 notifier: notifier, 76 enforcer: enforcer, 77 logger: logger, 78 validator: validator, 79 } 80} 81 82// modify the spindle configured for this repo 83func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 84 user := rp.oauth.GetMultiAccountUser(r) 85 l := rp.logger.With("handler", "EditSpindle") 86 l = l.With("did", user.Active.Did) 87 88 errorId := "operation-error" 89 fail := func(msg string, err error) { 90 l.Error(msg, "err", err) 91 rp.pages.Notice(w, errorId, msg) 92 } 93 94 f, err := rp.repoResolver.Resolve(r) 95 if err != nil { 96 fail("Failed to resolve repo. Try again later", err) 97 return 98 } 99 100 newSpindle := r.FormValue("spindle") 101 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 102 client, err := rp.oauth.AuthorizedClient(r) 103 if err != nil { 104 fail("Failed to authorize. Try again later.", err) 105 return 106 } 107 108 if !removingSpindle { 109 // ensure that this is a valid spindle for this user 110 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did) 111 if err != nil { 112 fail("Failed to find spindles. Try again later.", err) 113 return 114 } 115 116 if !slices.Contains(validSpindles, newSpindle) { 117 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 118 return 119 } 120 } 121 122 newRepo := *f 123 newRepo.Spindle = newSpindle 124 record := newRepo.AsRecord() 125 126 spindlePtr := &newSpindle 127 if removingSpindle { 128 spindlePtr = nil 129 newRepo.Spindle = "" 130 } 131 132 // optimistic update 133 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 134 if err != nil { 135 fail("Failed to update spindle. Try again later.", err) 136 return 137 } 138 139 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 140 if err != nil { 141 fail("Failed to update spindle, no record found on PDS.", err) 142 return 143 } 144 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 145 Collection: tangled.RepoNSID, 146 Repo: newRepo.Did, 147 Rkey: newRepo.Rkey, 148 SwapRecord: ex.Cid, 149 Record: &lexutil.LexiconTypeDecoder{ 150 Val: &record, 151 }, 152 }) 153 154 if err != nil { 155 fail("Failed to update spindle, unable to save to PDS.", err) 156 return 157 } 158 159 if !removingSpindle { 160 // add this spindle to spindle stream 161 rp.spindlestream.AddSource( 162 context.Background(), 163 eventconsumer.NewSpindleSource(newSpindle), 164 ) 165 } 166 167 rp.pages.HxRefresh(w) 168} 169 170func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 171 user := rp.oauth.GetMultiAccountUser(r) 172 l := rp.logger.With("handler", "AddLabel") 173 l = l.With("did", user.Active.Did) 174 175 f, err := rp.repoResolver.Resolve(r) 176 if err != nil { 177 l.Error("failed to get repo and knot", "err", err) 178 return 179 } 180 181 errorId := "add-label-error" 182 fail := func(msg string, err error) { 183 l.Error(msg, "err", err) 184 rp.pages.Notice(w, errorId, msg) 185 } 186 187 // get form values for label definition 188 name := r.FormValue("name") 189 concreteType := r.FormValue("valueType") 190 valueFormat := r.FormValue("valueFormat") 191 enumValues := r.FormValue("enumValues") 192 scope := r.Form["scope"] 193 color := r.FormValue("color") 194 multiple := r.FormValue("multiple") == "true" 195 196 var variants []string 197 for part := range strings.SplitSeq(enumValues, ",") { 198 if part = strings.TrimSpace(part); part != "" { 199 variants = append(variants, part) 200 } 201 } 202 203 if concreteType == "" { 204 concreteType = "null" 205 } 206 207 format := models.ValueTypeFormatAny 208 if valueFormat == "did" { 209 format = models.ValueTypeFormatDid 210 } 211 212 valueType := models.ValueType{ 213 Type: models.ConcreteType(concreteType), 214 Format: format, 215 Enum: variants, 216 } 217 218 label := models.LabelDefinition{ 219 Did: user.Active.Did, 220 Rkey: tid.TID(), 221 Name: name, 222 ValueType: valueType, 223 Scope: scope, 224 Color: &color, 225 Multiple: multiple, 226 Created: time.Now(), 227 } 228 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 229 fail(err.Error(), err) 230 return 231 } 232 233 // announce this relation into the firehose, store into owners' pds 234 client, err := rp.oauth.AuthorizedClient(r) 235 if err != nil { 236 fail(err.Error(), err) 237 return 238 } 239 240 // emit a labelRecord 241 labelRecord := label.AsRecord() 242 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 243 Collection: tangled.LabelDefinitionNSID, 244 Repo: label.Did, 245 Rkey: label.Rkey, 246 Record: &lexutil.LexiconTypeDecoder{ 247 Val: &labelRecord, 248 }, 249 }) 250 // invalid record 251 if err != nil { 252 fail("Failed to write record to PDS.", err) 253 return 254 } 255 256 aturi := resp.Uri 257 l = l.With("at-uri", aturi) 258 l.Info("wrote label record to PDS") 259 260 // update the repo to subscribe to this label 261 newRepo := *f 262 newRepo.Labels = append(newRepo.Labels, aturi) 263 repoRecord := newRepo.AsRecord() 264 265 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 266 if err != nil { 267 fail("Failed to update labels, no record found on PDS.", err) 268 return 269 } 270 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 271 Collection: tangled.RepoNSID, 272 Repo: newRepo.Did, 273 Rkey: newRepo.Rkey, 274 SwapRecord: ex.Cid, 275 Record: &lexutil.LexiconTypeDecoder{ 276 Val: &repoRecord, 277 }, 278 }) 279 if err != nil { 280 fail("Failed to update labels for repo.", err) 281 return 282 } 283 284 tx, err := rp.db.BeginTx(r.Context(), nil) 285 if err != nil { 286 fail("Failed to add label.", err) 287 return 288 } 289 290 rollback := func() { 291 err1 := tx.Rollback() 292 err2 := rollbackRecord(context.Background(), aturi, client) 293 294 // ignore txn complete errors, this is okay 295 if errors.Is(err1, sql.ErrTxDone) { 296 err1 = nil 297 } 298 299 if errs := errors.Join(err1, err2); errs != nil { 300 l.Error("failed to rollback changes", "errs", errs) 301 return 302 } 303 } 304 defer rollback() 305 306 _, err = db.AddLabelDefinition(tx, &label) 307 if err != nil { 308 fail("Failed to add label.", err) 309 return 310 } 311 312 if err = db.SubscribeLabel(tx, &models.RepoLabel{ 313 RepoAt: f.RepoAt(), 314 LabelAt: label.AtUri(), 315 RepoDid: f.RepoDid, 316 }); err != nil { 317 fail("Failed to subscribe to label.", err) 318 return 319 } 320 321 err = tx.Commit() 322 if err != nil { 323 fail("Failed to add label.", err) 324 return 325 } 326 327 // clear aturi when everything is successful 328 aturi = "" 329 330 rp.pages.HxRefresh(w) 331} 332 333func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 334 user := rp.oauth.GetMultiAccountUser(r) 335 l := rp.logger.With("handler", "DeleteLabel") 336 l = l.With("did", user.Active.Did) 337 338 f, err := rp.repoResolver.Resolve(r) 339 if err != nil { 340 l.Error("failed to get repo and knot", "err", err) 341 return 342 } 343 344 errorId := "label-operation" 345 fail := func(msg string, err error) { 346 l.Error(msg, "err", err) 347 rp.pages.Notice(w, errorId, msg) 348 } 349 350 // get form values 351 labelId := r.FormValue("label-id") 352 353 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 354 if err != nil { 355 fail("Failed to find label definition.", err) 356 return 357 } 358 359 client, err := rp.oauth.AuthorizedClient(r) 360 if err != nil { 361 fail(err.Error(), err) 362 return 363 } 364 365 // delete label record from PDS 366 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 367 Collection: tangled.LabelDefinitionNSID, 368 Repo: label.Did, 369 Rkey: label.Rkey, 370 }) 371 if err != nil { 372 fail("Failed to delete label record from PDS.", err) 373 return 374 } 375 376 // update repo record to remove the label reference 377 newRepo := *f 378 var updated []string 379 removedAt := label.AtUri().String() 380 for _, l := range newRepo.Labels { 381 if l != removedAt { 382 updated = append(updated, l) 383 } 384 } 385 newRepo.Labels = updated 386 repoRecord := newRepo.AsRecord() 387 388 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 389 if err != nil { 390 fail("Failed to update labels, no record found on PDS.", err) 391 return 392 } 393 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 394 Collection: tangled.RepoNSID, 395 Repo: newRepo.Did, 396 Rkey: newRepo.Rkey, 397 SwapRecord: ex.Cid, 398 Record: &lexutil.LexiconTypeDecoder{ 399 Val: &repoRecord, 400 }, 401 }) 402 if err != nil { 403 fail("Failed to update repo record.", err) 404 return 405 } 406 407 // transaction for DB changes 408 tx, err := rp.db.BeginTx(r.Context(), nil) 409 if err != nil { 410 fail("Failed to delete label.", err) 411 return 412 } 413 defer tx.Rollback() 414 415 err = db.UnsubscribeLabel( 416 tx, 417 orm.FilterEq("repo_at", f.RepoAt()), 418 orm.FilterEq("label_at", removedAt), 419 ) 420 if err != nil { 421 fail("Failed to unsubscribe label.", err) 422 return 423 } 424 425 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 426 if err != nil { 427 fail("Failed to delete label definition.", err) 428 return 429 } 430 431 err = tx.Commit() 432 if err != nil { 433 fail("Failed to delete label.", err) 434 return 435 } 436 437 // everything succeeded 438 rp.pages.HxRefresh(w) 439} 440 441func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 442 user := rp.oauth.GetMultiAccountUser(r) 443 l := rp.logger.With("handler", "SubscribeLabel") 444 l = l.With("did", user.Active.Did) 445 446 f, err := rp.repoResolver.Resolve(r) 447 if err != nil { 448 l.Error("failed to get repo and knot", "err", err) 449 return 450 } 451 452 if err := r.ParseForm(); err != nil { 453 l.Error("invalid form", "err", err) 454 return 455 } 456 457 errorId := "default-label-operation" 458 fail := func(msg string, err error) { 459 l.Error(msg, "err", err) 460 rp.pages.Notice(w, errorId, msg) 461 } 462 463 labelAts := r.Form["label"] 464 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 465 if err != nil { 466 fail("Failed to subscribe to label.", err) 467 return 468 } 469 470 newRepo := *f 471 newRepo.Labels = append(newRepo.Labels, labelAts...) 472 473 // dedup 474 slices.Sort(newRepo.Labels) 475 newRepo.Labels = slices.Compact(newRepo.Labels) 476 477 repoRecord := newRepo.AsRecord() 478 479 client, err := rp.oauth.AuthorizedClient(r) 480 if err != nil { 481 fail(err.Error(), err) 482 return 483 } 484 485 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 486 if err != nil { 487 fail("Failed to update labels, no record found on PDS.", err) 488 return 489 } 490 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 491 Collection: tangled.RepoNSID, 492 Repo: newRepo.Did, 493 Rkey: newRepo.Rkey, 494 SwapRecord: ex.Cid, 495 Record: &lexutil.LexiconTypeDecoder{ 496 Val: &repoRecord, 497 }, 498 }) 499 500 tx, err := rp.db.Begin() 501 if err != nil { 502 fail("Failed to subscribe to label.", err) 503 return 504 } 505 defer tx.Rollback() 506 507 for _, l := range labelAts { 508 err = db.SubscribeLabel(tx, &models.RepoLabel{ 509 RepoAt: f.RepoAt(), 510 LabelAt: syntax.ATURI(l), 511 RepoDid: f.RepoDid, 512 }) 513 if err != nil { 514 fail("Failed to subscribe to label.", err) 515 return 516 } 517 } 518 519 if err := tx.Commit(); err != nil { 520 fail("Failed to subscribe to label.", err) 521 return 522 } 523 524 // everything succeeded 525 rp.pages.HxRefresh(w) 526} 527 528func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 529 user := rp.oauth.GetMultiAccountUser(r) 530 l := rp.logger.With("handler", "UnsubscribeLabel") 531 l = l.With("did", user.Active.Did) 532 533 f, err := rp.repoResolver.Resolve(r) 534 if err != nil { 535 l.Error("failed to get repo and knot", "err", err) 536 return 537 } 538 539 if err := r.ParseForm(); err != nil { 540 l.Error("invalid form", "err", err) 541 return 542 } 543 544 errorId := "default-label-operation" 545 fail := func(msg string, err error) { 546 l.Error(msg, "err", err) 547 rp.pages.Notice(w, errorId, msg) 548 } 549 550 labelAts := r.Form["label"] 551 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 552 if err != nil { 553 fail("Failed to unsubscribe to label.", err) 554 return 555 } 556 557 // update repo record to remove the label reference 558 newRepo := *f 559 var updated []string 560 for _, l := range newRepo.Labels { 561 if !slices.Contains(labelAts, l) { 562 updated = append(updated, l) 563 } 564 } 565 newRepo.Labels = updated 566 repoRecord := newRepo.AsRecord() 567 568 client, err := rp.oauth.AuthorizedClient(r) 569 if err != nil { 570 fail(err.Error(), err) 571 return 572 } 573 574 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 575 if err != nil { 576 fail("Failed to update labels, no record found on PDS.", err) 577 return 578 } 579 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 580 Collection: tangled.RepoNSID, 581 Repo: newRepo.Did, 582 Rkey: newRepo.Rkey, 583 SwapRecord: ex.Cid, 584 Record: &lexutil.LexiconTypeDecoder{ 585 Val: &repoRecord, 586 }, 587 }) 588 589 err = db.UnsubscribeLabel( 590 rp.db, 591 orm.FilterEq("repo_at", f.RepoAt()), 592 orm.FilterIn("label_at", labelAts), 593 ) 594 if err != nil { 595 fail("Failed to unsubscribe label.", err) 596 return 597 } 598 599 // everything succeeded 600 rp.pages.HxRefresh(w) 601} 602 603func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 604 l := rp.logger.With("handler", "LabelPanel") 605 606 f, err := rp.repoResolver.Resolve(r) 607 if err != nil { 608 l.Error("failed to get repo and knot", "err", err) 609 return 610 } 611 612 subjectStr := r.FormValue("subject") 613 subject, err := syntax.ParseATURI(subjectStr) 614 if err != nil { 615 l.Error("failed to get repo and knot", "err", err) 616 return 617 } 618 619 labelDefs, err := db.GetLabelDefinitions( 620 rp.db, 621 orm.FilterIn("at_uri", f.Labels), 622 orm.FilterContains("scope", subject.Collection().String()), 623 ) 624 if err != nil { 625 l.Error("failed to fetch label defs", "err", err) 626 return 627 } 628 629 defs := make(map[string]*models.LabelDefinition) 630 for _, l := range labelDefs { 631 defs[l.AtUri().String()] = &l 632 } 633 634 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 635 if err != nil { 636 l.Error("failed to build label state", "err", err) 637 return 638 } 639 state := states[subject] 640 641 user := rp.oauth.GetMultiAccountUser(r) 642 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 643 LoggedInUser: user, 644 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 645 Defs: defs, 646 Subject: subject.String(), 647 State: state, 648 }) 649} 650 651func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 652 l := rp.logger.With("handler", "EditLabelPanel") 653 654 f, err := rp.repoResolver.Resolve(r) 655 if err != nil { 656 l.Error("failed to get repo and knot", "err", err) 657 return 658 } 659 660 subjectStr := r.FormValue("subject") 661 subject, err := syntax.ParseATURI(subjectStr) 662 if err != nil { 663 l.Error("failed to get repo and knot", "err", err) 664 return 665 } 666 667 labelDefs, err := db.GetLabelDefinitions( 668 rp.db, 669 orm.FilterIn("at_uri", f.Labels), 670 orm.FilterContains("scope", subject.Collection().String()), 671 ) 672 if err != nil { 673 l.Error("failed to fetch labels", "err", err) 674 return 675 } 676 677 defs := make(map[string]*models.LabelDefinition) 678 for _, l := range labelDefs { 679 defs[l.AtUri().String()] = &l 680 } 681 682 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 683 if err != nil { 684 l.Error("failed to build label state", "err", err) 685 return 686 } 687 state := states[subject] 688 689 user := rp.oauth.GetMultiAccountUser(r) 690 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 691 LoggedInUser: user, 692 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 693 Defs: defs, 694 Subject: subject.String(), 695 State: state, 696 }) 697} 698 699func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 700 user := rp.oauth.GetMultiAccountUser(r) 701 l := rp.logger.With("handler", "AddCollaborator") 702 l = l.With("did", user.Active.Did) 703 704 f, err := rp.repoResolver.Resolve(r) 705 if err != nil { 706 l.Error("failed to get repo and knot", "err", err) 707 return 708 } 709 710 errorId := "add-collaborator-error" 711 fail := func(msg string, err error) { 712 l.Error(msg, "err", err) 713 rp.pages.Notice(w, errorId, msg) 714 } 715 716 collaborator := r.FormValue("collaborator") 717 if collaborator == "" { 718 fail("Invalid form.", nil) 719 return 720 } 721 722 // remove a single leading `@`, to make @handle work with ResolveIdent 723 collaborator = strings.TrimPrefix(collaborator, "@") 724 725 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 726 if err != nil { 727 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 728 return 729 } 730 731 if collaboratorIdent.DID.String() == user.Active.Did { 732 fail("You seem to be adding yourself as a collaborator.", nil) 733 return 734 } 735 l = l.With("collaborator", collaboratorIdent.Handle) 736 l = l.With("knot", f.Knot) 737 738 // announce this relation into the firehose, store into owners' pds 739 client, err := rp.oauth.AuthorizedClient(r) 740 if err != nil { 741 fail("Failed to write to PDS.", err) 742 return 743 } 744 745 // emit a record 746 currentUser := rp.oauth.GetMultiAccountUser(r) 747 rkey := tid.TID() 748 createdAt := time.Now() 749 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 750 Collection: tangled.RepoCollaboratorNSID, 751 Repo: currentUser.Active.Did, 752 Rkey: rkey, 753 Record: &lexutil.LexiconTypeDecoder{ 754 Val: repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt), 755 }, 756 }) 757 // invalid record 758 if err != nil { 759 fail("Failed to write record to PDS.", err) 760 return 761 } 762 763 aturi := resp.Uri 764 l = l.With("at-uri", aturi) 765 l.Info("wrote record to PDS") 766 767 tx, err := rp.db.BeginTx(r.Context(), nil) 768 if err != nil { 769 fail("Failed to add collaborator.", err) 770 return 771 } 772 773 rollback := func() { 774 err1 := tx.Rollback() 775 err2 := rp.enforcer.E.LoadPolicy() 776 err3 := rollbackRecord(context.Background(), aturi, client) 777 778 // ignore txn complete errors, this is okay 779 if errors.Is(err1, sql.ErrTxDone) { 780 err1 = nil 781 } 782 783 if errs := errors.Join(err1, err2, err3); errs != nil { 784 l.Error("failed to rollback changes", "errs", errs) 785 return 786 } 787 } 788 defer rollback() 789 790 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()) 791 if err != nil { 792 fail("Failed to add collaborator permissions.", err) 793 return 794 } 795 796 err = db.AddCollaborator(tx, models.Collaborator{ 797 Did: syntax.DID(currentUser.Active.Did), 798 Rkey: rkey, 799 SubjectDid: collaboratorIdent.DID, 800 RepoAt: f.RepoAt(), 801 RepoDid: f.RepoDid, 802 Created: createdAt, 803 }) 804 if err != nil { 805 fail("Failed to add collaborator.", err) 806 return 807 } 808 809 err = tx.Commit() 810 if err != nil { 811 fail("Failed to add collaborator.", err) 812 return 813 } 814 815 err = rp.enforcer.E.SavePolicy() 816 if err != nil { 817 fail("Failed to update collaborator permissions.", err) 818 return 819 } 820 821 // clear aturi to when everything is successful 822 aturi = "" 823 824 rp.pages.HxRefresh(w) 825} 826 827func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 828 user := rp.oauth.GetMultiAccountUser(r) 829 l := rp.logger.With("handler", "DeleteRepo") 830 831 noticeId := "operation-error" 832 f, err := rp.repoResolver.Resolve(r) 833 if err != nil { 834 l.Error("failed to get repo and knot", "err", err) 835 return 836 } 837 838 // remove record from pds 839 atpClient, err := rp.oauth.AuthorizedClient(r) 840 if err != nil { 841 l.Error("failed to get authorized client", "err", err) 842 return 843 } 844 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 845 Collection: tangled.RepoNSID, 846 Repo: user.Active.Did, 847 Rkey: f.Rkey, 848 }) 849 if err != nil { 850 l.Error("failed to delete record", "err", err) 851 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 852 return 853 } 854 l.Info("removed repo record", "aturi", f.RepoAt().String()) 855 856 client, err := rp.oauth.ServiceClient( 857 r, 858 oauth.WithService(f.Knot), 859 oauth.WithLxm(tangled.RepoDeleteNSID), 860 oauth.WithDev(rp.config.Core.Dev), 861 ) 862 if err != nil { 863 l.Error("failed to connect to knot server", "err", err) 864 return 865 } 866 867 err = tangled.RepoDelete( 868 r.Context(), 869 client, 870 &tangled.RepoDelete_Input{ 871 Did: f.Did, 872 Name: f.Name, 873 Rkey: f.Rkey, 874 }, 875 ) 876 if err := xrpcclient.HandleXrpcErr(err); err != nil { 877 rp.pages.Notice(w, noticeId, err.Error()) 878 return 879 } 880 l.Info("deleted repo from knot") 881 882 tx, err := rp.db.BeginTx(r.Context(), nil) 883 if err != nil { 884 l.Error("failed to start tx") 885 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 886 return 887 } 888 defer func() { 889 tx.Rollback() 890 err = rp.enforcer.E.LoadPolicy() 891 if err != nil { 892 l.Error("failed to rollback policies") 893 } 894 }() 895 896 // remove collaborator RBAC 897 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot) 898 if err != nil { 899 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 900 return 901 } 902 for _, c := range repoCollaborators { 903 did := c[0] 904 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier()) 905 } 906 l.Info("removed collaborators") 907 908 // remove repo RBAC 909 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier()) 910 if err != nil { 911 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 912 return 913 } 914 915 // remove repo from db 916 err = db.RemoveRepo(tx, f.Did, f.Name) 917 if err != nil { 918 rp.pages.Notice(w, noticeId, "Failed to update appview") 919 return 920 } 921 l.Info("removed repo from db") 922 923 err = tx.Commit() 924 if err != nil { 925 l.Error("failed to commit changes", "err", err) 926 http.Error(w, err.Error(), http.StatusInternalServerError) 927 return 928 } 929 930 err = rp.enforcer.E.SavePolicy() 931 if err != nil { 932 l.Error("failed to update ACLs", "err", err) 933 http.Error(w, err.Error(), http.StatusInternalServerError) 934 return 935 } 936 937 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 938} 939 940func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 941 l := rp.logger.With("handler", "SyncRepoFork") 942 943 ref := chi.URLParam(r, "ref") 944 ref, _ = url.PathUnescape(ref) 945 946 user := rp.oauth.GetMultiAccountUser(r) 947 f, err := rp.repoResolver.Resolve(r) 948 if err != nil { 949 l.Error("failed to resolve source repo", "err", err) 950 return 951 } 952 953 switch r.Method { 954 case http.MethodPost: 955 client, err := rp.oauth.ServiceClient( 956 r, 957 oauth.WithService(f.Knot), 958 oauth.WithLxm(tangled.RepoForkSyncNSID), 959 oauth.WithDev(rp.config.Core.Dev), 960 ) 961 if err != nil { 962 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 963 return 964 } 965 966 if f.Source == "" { 967 rp.pages.Notice(w, "repo", "This repository is not a fork.") 968 return 969 } 970 971 err = tangled.RepoForkSync( 972 r.Context(), 973 client, 974 &tangled.RepoForkSync_Input{ 975 Did: user.Active.Did, 976 Name: f.Name, 977 Source: f.Source, 978 Branch: ref, 979 }, 980 ) 981 if err := xrpcclient.HandleXrpcErr(err); err != nil { 982 rp.pages.Notice(w, "repo", err.Error()) 983 return 984 } 985 986 rp.pages.HxRefresh(w) 987 return 988 } 989} 990 991func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 992 l := rp.logger.With("handler", "ForkRepo") 993 994 user := rp.oauth.GetMultiAccountUser(r) 995 f, err := rp.repoResolver.Resolve(r) 996 if err != nil { 997 l.Error("failed to resolve source repo", "err", err) 998 return 999 } 1000 1001 switch r.Method { 1002 case http.MethodGet: 1003 user := rp.oauth.GetMultiAccountUser(r) 1004 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did) 1005 if err != nil { 1006 rp.pages.Notice(w, "repo", "Invalid user account.") 1007 return 1008 } 1009 1010 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1011 LoggedInUser: user, 1012 Knots: knots, 1013 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1014 }) 1015 1016 case http.MethodPost: 1017 l := rp.logger.With("handler", "ForkRepo") 1018 1019 targetKnot := r.FormValue("knot") 1020 if targetKnot == "" { 1021 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1022 return 1023 } 1024 l = l.With("targetKnot", targetKnot) 1025 1026 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create") 1027 if err != nil || !ok { 1028 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1029 return 1030 } 1031 1032 // choose a name for a fork 1033 forkName := r.FormValue("repo_name") 1034 if forkName == "" { 1035 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1036 return 1037 } 1038 1039 // this check is *only* to see if the forked repo name already exists 1040 // in the user's account. 1041 existingRepo, err := db.GetRepo( 1042 rp.db, 1043 orm.FilterEq("did", user.Active.Did), 1044 orm.FilterEq("name", forkName), 1045 ) 1046 if err != nil { 1047 if !errors.Is(err, sql.ErrNoRows) { 1048 l.Error("error fetching existing repo from db", "err", err) 1049 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1050 return 1051 } 1052 } else if existingRepo != nil { 1053 // repo with this name already exists 1054 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1055 return 1056 } 1057 l = l.With("forkName", forkName) 1058 1059 uri := "https" 1060 if rp.config.Core.Dev { 1061 uri = "http" 1062 } 1063 1064 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier()) 1065 l = l.With("cloneUrl", forkSourceUrl) 1066 1067 rkey := tid.TID() 1068 1069 // TODO: this could coordinate better with the knot to recieve a clone status 1070 client, err := rp.oauth.ServiceClient( 1071 r, 1072 oauth.WithService(targetKnot), 1073 oauth.WithLxm(tangled.RepoCreateNSID), 1074 oauth.WithDev(rp.config.Core.Dev), 1075 oauth.WithTimeout(time.Second*20), 1076 ) 1077 if err != nil { 1078 l.Error("could not create service client", "err", err) 1079 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1080 return 1081 } 1082 1083 forkInput := &tangled.RepoCreate_Input{ 1084 Rkey: rkey, 1085 Name: forkName, 1086 Source: &forkSourceUrl, 1087 } 1088 if rd := strings.TrimSpace(r.FormValue("repo_did")); rd != "" { 1089 forkInput.RepoDid = &rd 1090 } 1091 1092 createResp, createErr := tangled.RepoCreate( 1093 r.Context(), 1094 client, 1095 forkInput, 1096 ) 1097 if err := xrpcclient.HandleXrpcErr(createErr); err != nil { 1098 rp.pages.Notice(w, "repo", err.Error()) 1099 return 1100 } 1101 1102 var repoDid string 1103 if createResp != nil && createResp.RepoDid != nil { 1104 repoDid = *createResp.RepoDid 1105 } 1106 if repoDid == "" { 1107 l.Error("knot returned empty repo DID for fork") 1108 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 1109 return 1110 } 1111 1112 forkSource := f.RepoAt().String() 1113 if f.RepoDid != "" { 1114 forkSource = f.RepoDid 1115 } 1116 1117 repo := &models.Repo{ 1118 Did: user.Active.Did, 1119 Name: forkName, 1120 Knot: targetKnot, 1121 Rkey: rkey, 1122 Source: forkSource, 1123 Description: f.Description, 1124 Created: time.Now(), 1125 Labels: rp.config.Label.DefaultLabelDefs, 1126 RepoDid: repoDid, 1127 } 1128 record := repo.AsRecord() 1129 1130 cleanupKnot := func() { 1131 go func() { 1132 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 1133 for attempt, delay := range delays { 1134 time.Sleep(delay) 1135 deleteClient, dErr := rp.oauth.ServiceClient( 1136 r, 1137 oauth.WithService(targetKnot), 1138 oauth.WithLxm(tangled.RepoDeleteNSID), 1139 oauth.WithDev(rp.config.Core.Dev), 1140 ) 1141 if dErr != nil { 1142 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 1143 continue 1144 } 1145 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 1146 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 1147 Did: user.Active.Did, 1148 Name: forkName, 1149 Rkey: rkey, 1150 }); dErr != nil { 1151 cancel() 1152 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr) 1153 continue 1154 } 1155 cancel() 1156 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1) 1157 return 1158 } 1159 l.Error("exhausted retries for knot cleanup, fork may be orphaned", 1160 "did", user.Active.Did, "fork", forkName, "knot", targetKnot) 1161 }() 1162 } 1163 1164 atpClient, err := rp.oauth.AuthorizedClient(r) 1165 if err != nil { 1166 l.Error("failed to create xrpcclient", "err", err) 1167 cleanupKnot() 1168 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1169 return 1170 } 1171 1172 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1173 Collection: tangled.RepoNSID, 1174 Repo: user.Active.Did, 1175 Rkey: rkey, 1176 Record: &lexutil.LexiconTypeDecoder{ 1177 Val: &record, 1178 }, 1179 }) 1180 if err != nil { 1181 l.Error("failed to write to PDS", "err", err) 1182 cleanupKnot() 1183 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1184 return 1185 } 1186 1187 aturi := atresp.Uri 1188 l = l.With("aturi", aturi) 1189 l.Info("wrote to PDS") 1190 1191 tx, err := rp.db.BeginTx(r.Context(), nil) 1192 if err != nil { 1193 l.Info("txn failed", "err", err) 1194 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1195 return 1196 } 1197 1198 rollback := func() { 1199 err1 := tx.Rollback() 1200 err2 := rp.enforcer.E.LoadPolicy() 1201 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1202 1203 if errors.Is(err1, sql.ErrTxDone) { 1204 err1 = nil 1205 } 1206 1207 if errs := errors.Join(err1, err2, err3); errs != nil { 1208 l.Error("failed to rollback changes", "errs", errs) 1209 } 1210 1211 if aturi != "" { 1212 cleanupKnot() 1213 } 1214 } 1215 defer rollback() 1216 1217 err = db.AddRepo(tx, repo) 1218 if err != nil { 1219 l.Error("failed to AddRepo", "err", err) 1220 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1221 return 1222 } 1223 1224 rbacPath := repo.RepoIdentifier() 1225 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, rbacPath) 1226 if err != nil { 1227 l.Error("failed to add ACLs", "err", err) 1228 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1229 return 1230 } 1231 1232 err = tx.Commit() 1233 if err != nil { 1234 l.Error("failed to commit changes", "err", err) 1235 http.Error(w, err.Error(), http.StatusInternalServerError) 1236 return 1237 } 1238 1239 err = rp.enforcer.E.SavePolicy() 1240 if err != nil { 1241 l.Error("failed to update ACLs", "err", err) 1242 http.Error(w, err.Error(), http.StatusInternalServerError) 1243 return 1244 } 1245 1246 aturi = "" 1247 1248 rp.notifier.NewRepo(r.Context(), repo) 1249 if repoDid != "" { 1250 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 1251 } else { 1252 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1253 } 1254 } 1255} 1256 1257// this is used to rollback changes made to the PDS 1258// 1259// it is a no-op if the provided ATURI is empty 1260func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1261 if aturi == "" { 1262 return nil 1263 } 1264 1265 parsed := syntax.ATURI(aturi) 1266 1267 collection := parsed.Collection().String() 1268 repo := parsed.Authority().String() 1269 rkey := parsed.RecordKey().String() 1270 1271 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1272 Collection: collection, 1273 Repo: repo, 1274 Rkey: rkey, 1275 }) 1276 return err 1277} 1278 1279func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator { 1280 rec := &tangled.RepoCollaborator{ 1281 Subject: subject, 1282 CreatedAt: createdAt.Format(time.RFC3339), 1283 } 1284 if f.RepoDid != "" { 1285 rec.RepoDid = &f.RepoDid 1286 } else { 1287 s := string(f.RepoAt()) 1288 rec.Repo = &s 1289 } 1290 return rec 1291}