Monorepo for Tangled
at default-knot 682 lines 18 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/mentions" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/notify" 21 dbnotify "tangled.org/core/appview/notify/db" 22 phnotify "tangled.org/core/appview/notify/posthog" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/reporesolver" 26 "tangled.org/core/appview/validator" 27 xrpcclient "tangled.org/core/appview/xrpcclient" 28 "tangled.org/core/eventconsumer" 29 "tangled.org/core/idresolver" 30 "tangled.org/core/jetstream" 31 "tangled.org/core/log" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35 "tangled.org/core/tid" 36 37 comatproto "github.com/bluesky-social/indigo/api/atproto" 38 atpclient "github.com/bluesky-social/indigo/atproto/client" 39 "github.com/bluesky-social/indigo/atproto/syntax" 40 lexutil "github.com/bluesky-social/indigo/lex/util" 41 securejoin "github.com/cyphar/filepath-securejoin" 42 "github.com/go-chi/chi/v5" 43 "github.com/posthog/posthog-go" 44) 45 46type State struct { 47 db *db.DB 48 notifier notify.Notifier 49 indexer *indexer.Indexer 50 oauth *oauth.OAuth 51 enforcer *rbac.Enforcer 52 pages *pages.Pages 53 idResolver *idresolver.Resolver 54 mentionsResolver *mentions.Resolver 55 posthog posthog.Client 56 jc *jetstream.JetstreamClient 57 config *config.Config 58 repoResolver *reporesolver.RepoResolver 59 knotstream *eventconsumer.Consumer 60 spindlestream *eventconsumer.Consumer 61 logger *slog.Logger 62 validator *validator.Validator 63} 64 65func Make(ctx context.Context, config *config.Config) (*State, error) { 66 logger := tlog.FromContext(ctx) 67 68 d, err := db.Make(ctx, config.Core.DbPath) 69 if err != nil { 70 return nil, fmt.Errorf("failed to create db: %w", err) 71 } 72 73 indexer := indexer.New(log.SubLogger(logger, "indexer")) 74 err = indexer.Init(ctx, d) 75 if err != nil { 76 return nil, fmt.Errorf("failed to create indexer: %w", err) 77 } 78 79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create enforcer: %w", err) 82 } 83 84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 85 if err != nil { 86 logger.Error("failed to create redis resolver", "err", err) 87 res = idresolver.DefaultResolver(config.Plc.PLCURL) 88 } 89 90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create posthog client: %w", err) 93 } 94 95 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 97 if err != nil { 98 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 99 } 100 validator := validator.New(d, res, enforcer) 101 102 repoResolver := reporesolver.New(config, enforcer, d) 103 104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 105 106 wrapper := db.DbWrapper{Execer: d} 107 jc, err := jetstream.NewJetstreamClient( 108 config.Jetstream.Endpoint, 109 "appview", 110 []string{ 111 tangled.GraphFollowNSID, 112 tangled.FeedStarNSID, 113 tangled.PublicKeyNSID, 114 tangled.RepoArtifactNSID, 115 tangled.ActorProfileNSID, 116 tangled.SpindleMemberNSID, 117 tangled.SpindleNSID, 118 tangled.StringNSID, 119 tangled.RepoIssueNSID, 120 tangled.RepoIssueCommentNSID, 121 tangled.LabelDefinitionNSID, 122 tangled.LabelOpNSID, 123 }, 124 nil, 125 tlog.SubLogger(logger, "jetstream"), 126 wrapper, 127 false, 128 129 // in-memory filter is inapplicable to appview so 130 // we'll never log dids anyway. 131 false, 132 ) 133 if err != nil { 134 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 135 } 136 137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 139 } 140 141 ingester := appview.Ingester{ 142 Db: wrapper, 143 Enforcer: enforcer, 144 IdResolver: res, 145 Config: config, 146 Logger: log.SubLogger(logger, "ingester"), 147 Validator: validator, 148 } 149 err = jc.StartJetstream(ctx, ingester.Ingest()) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 var notifiers []notify.Notifier 155 156 // Always add the database notifier 157 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 158 159 // Add other notifiers in production only 160 if !config.Core.Dev { 161 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 162 } 163 notifiers = append(notifiers, indexer) 164 165 // Add webhook notifier 166 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 167 168 notifier := notify.NewMergedNotifier(notifiers) 169 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 170 171 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 172 if err != nil { 173 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 174 } 175 knotstream.Start(ctx) 176 177 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 178 if err != nil { 179 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 180 } 181 spindlestream.Start(ctx) 182 183 state := &State{ 184 d, 185 notifier, 186 indexer, 187 oauth, 188 enforcer, 189 pages, 190 res, 191 mentionsResolver, 192 posthog, 193 jc, 194 config, 195 repoResolver, 196 knotstream, 197 spindlestream, 198 logger, 199 validator, 200 } 201 202 return state, nil 203} 204 205func (s *State) Close() error { 206 // other close up logic goes here 207 return s.db.Close() 208} 209 210func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 211 w.Header().Set("Content-Type", "text/plain") 212 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 213 214 robotsTxt := `# Hello, Tanglers! 215User-agent: * 216Allow: / 217Disallow: /*/*/settings 218Disallow: /settings 219Disallow: /*/*/compare 220Disallow: /*/*/fork 221 222Crawl-delay: 1 223` 224 w.Write([]byte(robotsTxt)) 225} 226 227func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 228 user := s.oauth.GetMultiAccountUser(r) 229 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 230 LoggedInUser: user, 231 }) 232} 233 234func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 235 user := s.oauth.GetMultiAccountUser(r) 236 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 237 LoggedInUser: user, 238 }) 239} 240 241func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 242 user := s.oauth.GetMultiAccountUser(r) 243 s.pages.Brand(w, pages.BrandParams{ 244 LoggedInUser: user, 245 }) 246} 247 248func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 249 if s.oauth.GetMultiAccountUser(r) != nil { 250 s.Timeline(w, r) 251 return 252 } 253 s.Home(w, r) 254} 255 256func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 257 user := s.oauth.GetMultiAccountUser(r) 258 259 // TODO: set this flag based on the UI 260 filtered := false 261 262 var userDid string 263 if user != nil && user.Active != nil { 264 userDid = user.Active.Did 265 } 266 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 267 if err != nil { 268 s.logger.Error("failed to make timeline", "err", err) 269 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 270 } 271 272 repos, err := db.GetTopStarredReposLastWeek(s.db) 273 if err != nil { 274 s.logger.Error("failed to get top starred repos", "err", err) 275 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 276 return 277 } 278 279 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 280 if err != nil { 281 // non-fatal 282 } 283 284 s.pages.Timeline(w, pages.TimelineParams{ 285 LoggedInUser: user, 286 Timeline: timeline, 287 Repos: repos, 288 GfiLabel: gfiLabel, 289 }) 290} 291 292func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 293 user := s.oauth.GetMultiAccountUser(r) 294 if user == nil { 295 return 296 } 297 298 l := s.logger.With("handler", "UpgradeBanner") 299 l = l.With("did", user.Active.Did) 300 301 regs, err := db.GetRegistrations( 302 s.db, 303 orm.FilterEq("did", user.Active.Did), 304 orm.FilterEq("needs_upgrade", 1), 305 ) 306 if err != nil { 307 l.Error("non-fatal: failed to get registrations", "err", err) 308 } 309 310 spindles, err := db.GetSpindles( 311 s.db, 312 orm.FilterEq("owner", user.Active.Did), 313 orm.FilterEq("needs_upgrade", 1), 314 ) 315 if err != nil { 316 l.Error("non-fatal: failed to get spindles", "err", err) 317 } 318 319 if regs == nil && spindles == nil { 320 return 321 } 322 323 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 324 Registrations: regs, 325 Spindles: spindles, 326 }) 327} 328 329func (s *State) Home(w http.ResponseWriter, r *http.Request) { 330 // TODO: set this flag based on the UI 331 filtered := false 332 333 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 334 if err != nil { 335 s.logger.Error("failed to make timeline", "err", err) 336 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 337 return 338 } 339 340 repos, err := db.GetTopStarredReposLastWeek(s.db) 341 if err != nil { 342 s.logger.Error("failed to get top starred repos", "err", err) 343 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 344 return 345 } 346 347 s.pages.Home(w, pages.TimelineParams{ 348 LoggedInUser: nil, 349 Timeline: timeline, 350 Repos: repos, 351 }) 352} 353 354func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 355 user := chi.URLParam(r, "user") 356 user = strings.TrimPrefix(user, "@") 357 358 if user == "" { 359 w.WriteHeader(http.StatusBadRequest) 360 return 361 } 362 363 id, err := s.idResolver.ResolveIdent(r.Context(), user) 364 if err != nil { 365 w.WriteHeader(http.StatusInternalServerError) 366 return 367 } 368 369 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 370 if err != nil { 371 s.logger.Error("failed to get public keys", "err", err) 372 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 373 return 374 } 375 376 if len(pubKeys) == 0 { 377 w.WriteHeader(http.StatusNoContent) 378 return 379 } 380 381 for _, k := range pubKeys { 382 key := strings.TrimRight(k.Key, "\n") 383 fmt.Fprintln(w, key) 384 } 385} 386 387func validateRepoName(name string) error { 388 // check for path traversal attempts 389 if name == "." || name == ".." || 390 strings.Contains(name, "/") || strings.Contains(name, "\\") { 391 return fmt.Errorf("Repository name contains invalid path characters") 392 } 393 394 // check for sequences that could be used for traversal when normalized 395 if strings.Contains(name, "./") || strings.Contains(name, "../") || 396 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 397 return fmt.Errorf("Repository name contains invalid path sequence") 398 } 399 400 // then continue with character validation 401 for _, char := range name { 402 if !((char >= 'a' && char <= 'z') || 403 (char >= 'A' && char <= 'Z') || 404 (char >= '0' && char <= '9') || 405 char == '-' || char == '_' || char == '.') { 406 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 407 } 408 } 409 410 // additional check to prevent multiple sequential dots 411 if strings.Contains(name, "..") { 412 return fmt.Errorf("Repository name cannot contain sequential dots") 413 } 414 415 // if all checks pass 416 return nil 417} 418 419func stripGitExt(name string) string { 420 return strings.TrimSuffix(name, ".git") 421} 422 423func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 424 switch r.Method { 425 case http.MethodGet: 426 user := s.oauth.GetMultiAccountUser(r) 427 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 428 if err != nil { 429 s.pages.Notice(w, "repo", "Invalid user account.") 430 return 431 } 432 433 defaultKnot := "" 434 knotPrefence, err := db.GetKnotPreference(s.db, user.Did()) 435 if err != nil { 436 s.logger.Warn("gettings users knot preferences", "error", err) 437 } 438 if knotPrefence != nil { 439 defaultKnot = knotPrefence.DefaultKnot 440 } 441 442 s.pages.NewRepo(w, pages.NewRepoParams{ 443 LoggedInUser: user, 444 Knots: knots, 445 DefaultKnot: defaultKnot, 446 }) 447 448 case http.MethodPost: 449 l := s.logger.With("handler", "NewRepo") 450 451 user := s.oauth.GetMultiAccountUser(r) 452 l = l.With("did", user.Active.Did) 453 454 // form validation 455 domain := r.FormValue("domain") 456 if domain == "" { 457 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 458 return 459 } 460 l = l.With("knot", domain) 461 462 repoName := r.FormValue("name") 463 if repoName == "" { 464 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 465 return 466 } 467 468 if err := validateRepoName(repoName); err != nil { 469 s.pages.Notice(w, "repo", err.Error()) 470 return 471 } 472 repoName = stripGitExt(repoName) 473 l = l.With("repoName", repoName) 474 475 defaultBranch := r.FormValue("branch") 476 if defaultBranch == "" { 477 defaultBranch = "main" 478 } 479 l = l.With("defaultBranch", defaultBranch) 480 481 description := r.FormValue("description") 482 if len([]rune(description)) > 140 { 483 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 484 return 485 } 486 487 // ACL validation 488 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 489 if err != nil || !ok { 490 l.Info("unauthorized") 491 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 492 return 493 } 494 495 // Check for existing repos 496 existingRepo, err := db.GetRepo( 497 s.db, 498 orm.FilterEq("did", user.Active.Did), 499 orm.FilterEq("name", repoName), 500 ) 501 if err == nil && existingRepo != nil { 502 l.Info("repo exists") 503 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 504 return 505 } 506 507 // create atproto record for this repo 508 rkey := tid.TID() 509 repo := &models.Repo{ 510 Did: user.Active.Did, 511 Name: repoName, 512 Knot: domain, 513 Rkey: rkey, 514 Description: description, 515 Created: time.Now(), 516 Labels: s.config.Label.DefaultLabelDefs, 517 } 518 record := repo.AsRecord() 519 520 atpClient, err := s.oauth.AuthorizedClient(r) 521 if err != nil { 522 l.Info("PDS write failed", "err", err) 523 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 524 return 525 } 526 527 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 528 Collection: tangled.RepoNSID, 529 Repo: user.Active.Did, 530 Rkey: rkey, 531 Record: &lexutil.LexiconTypeDecoder{ 532 Val: &record, 533 }, 534 }) 535 if err != nil { 536 l.Info("PDS write failed", "err", err) 537 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 538 return 539 } 540 541 aturi := atresp.Uri 542 l = l.With("aturi", aturi) 543 l.Info("wrote to PDS") 544 545 tx, err := s.db.BeginTx(r.Context(), nil) 546 if err != nil { 547 l.Info("txn failed", "err", err) 548 s.pages.Notice(w, "repo", "Failed to save repository information.") 549 return 550 } 551 552 // The rollback function reverts a few things on failure: 553 // - the pending txn 554 // - the ACLs 555 // - the atproto record created 556 rollback := func() { 557 err1 := tx.Rollback() 558 err2 := s.enforcer.E.LoadPolicy() 559 err3 := rollbackRecord(context.Background(), aturi, atpClient) 560 561 // ignore txn complete errors, this is okay 562 if errors.Is(err1, sql.ErrTxDone) { 563 err1 = nil 564 } 565 566 if errs := errors.Join(err1, err2, err3); errs != nil { 567 l.Error("failed to rollback changes", "errs", errs) 568 return 569 } 570 } 571 defer rollback() 572 573 client, err := s.oauth.ServiceClient( 574 r, 575 oauth.WithService(domain), 576 oauth.WithLxm(tangled.RepoCreateNSID), 577 oauth.WithDev(s.config.Core.Dev), 578 ) 579 if err != nil { 580 l.Error("service auth failed", "err", err) 581 s.pages.Notice(w, "repo", "Failed to reach PDS.") 582 return 583 } 584 585 xe := tangled.RepoCreate( 586 r.Context(), 587 client, 588 &tangled.RepoCreate_Input{ 589 Rkey: rkey, 590 }, 591 ) 592 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 593 l.Error("xrpc error", "xe", xe) 594 s.pages.Notice(w, "repo", err.Error()) 595 return 596 } 597 598 err = db.AddRepo(tx, repo) 599 if err != nil { 600 l.Error("db write failed", "err", err) 601 s.pages.Notice(w, "repo", "Failed to save repository information.") 602 return 603 } 604 605 // acls 606 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 607 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 608 if err != nil { 609 l.Error("acl setup failed", "err", err) 610 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 611 return 612 } 613 614 err = tx.Commit() 615 if err != nil { 616 l.Error("txn commit failed", "err", err) 617 http.Error(w, err.Error(), http.StatusInternalServerError) 618 return 619 } 620 621 err = s.enforcer.E.SavePolicy() 622 if err != nil { 623 l.Error("acl save failed", "err", err) 624 http.Error(w, err.Error(), http.StatusInternalServerError) 625 return 626 } 627 628 // reset the ATURI because the transaction completed successfully 629 aturi = "" 630 631 s.notifier.NewRepo(r.Context(), repo) 632 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 633 } 634} 635 636// this is used to rollback changes made to the PDS 637// 638// it is a no-op if the provided ATURI is empty 639func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 640 if aturi == "" { 641 return nil 642 } 643 644 parsed := syntax.ATURI(aturi) 645 646 collection := parsed.Collection().String() 647 repo := parsed.Authority().String() 648 rkey := parsed.RecordKey().String() 649 650 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 651 Collection: collection, 652 Repo: repo, 653 Rkey: rkey, 654 }) 655 return err 656} 657 658func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 659 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 660 if err != nil { 661 return err 662 } 663 // already present 664 if len(defaultLabels) == len(defaults) { 665 return nil 666 } 667 668 labelDefs, err := models.FetchLabelDefs(r, defaults) 669 if err != nil { 670 return err 671 } 672 673 // Insert each label definition to the database 674 for _, labelDef := range labelDefs { 675 _, err = db.AddLabelDefinition(e, &labelDef) 676 if err != nil { 677 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 678 } 679 } 680 681 return nil 682}