Monorepo for Tangled
at eb3e271982e4e6c4133a0fa35e0a208b78b22875 664 lines 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/mentions" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/notify" 21 dbnotify "tangled.org/core/appview/notify/db" 22 phnotify "tangled.org/core/appview/notify/posthog" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/reporesolver" 26 "tangled.org/core/appview/validator" 27 xrpcclient "tangled.org/core/appview/xrpcclient" 28 "tangled.org/core/eventconsumer" 29 "tangled.org/core/idresolver" 30 "tangled.org/core/jetstream" 31 "tangled.org/core/log" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35 "tangled.org/core/tid" 36 37 comatproto "github.com/bluesky-social/indigo/api/atproto" 38 atpclient "github.com/bluesky-social/indigo/atproto/client" 39 "github.com/bluesky-social/indigo/atproto/syntax" 40 lexutil "github.com/bluesky-social/indigo/lex/util" 41 securejoin "github.com/cyphar/filepath-securejoin" 42 "github.com/go-chi/chi/v5" 43 "github.com/posthog/posthog-go" 44) 45 46type State struct { 47 db *db.DB 48 notifier notify.Notifier 49 indexer *indexer.Indexer 50 oauth *oauth.OAuth 51 enforcer *rbac.Enforcer 52 pages *pages.Pages 53 idResolver *idresolver.Resolver 54 mentionsResolver *mentions.Resolver 55 posthog posthog.Client 56 jc *jetstream.JetstreamClient 57 config *config.Config 58 repoResolver *reporesolver.RepoResolver 59 knotstream *eventconsumer.Consumer 60 spindlestream *eventconsumer.Consumer 61 logger *slog.Logger 62 validator *validator.Validator 63} 64 65func Make(ctx context.Context, config *config.Config) (*State, error) { 66 logger := tlog.FromContext(ctx) 67 68 d, err := db.Make(ctx, config.Core.DbPath) 69 if err != nil { 70 return nil, fmt.Errorf("failed to create db: %w", err) 71 } 72 73 indexer := indexer.New(log.SubLogger(logger, "indexer")) 74 err = indexer.Init(ctx, d) 75 if err != nil { 76 return nil, fmt.Errorf("failed to create indexer: %w", err) 77 } 78 79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create enforcer: %w", err) 82 } 83 84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 85 if err != nil { 86 logger.Error("failed to create redis resolver", "err", err) 87 res = idresolver.DefaultResolver(config.Plc.PLCURL) 88 } 89 90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create posthog client: %w", err) 93 } 94 95 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 97 if err != nil { 98 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 99 } 100 validator := validator.New(d, res, enforcer) 101 102 repoResolver := reporesolver.New(config, enforcer, d) 103 104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 105 106 wrapper := db.DbWrapper{Execer: d} 107 jc, err := jetstream.NewJetstreamClient( 108 config.Jetstream.Endpoint, 109 "appview", 110 []string{ 111 tangled.GraphFollowNSID, 112 tangled.FeedStarNSID, 113 tangled.PublicKeyNSID, 114 tangled.RepoArtifactNSID, 115 tangled.ActorProfileNSID, 116 tangled.SpindleMemberNSID, 117 tangled.SpindleNSID, 118 tangled.StringNSID, 119 tangled.RepoIssueNSID, 120 tangled.RepoIssueCommentNSID, 121 tangled.LabelDefinitionNSID, 122 tangled.LabelOpNSID, 123 }, 124 nil, 125 tlog.SubLogger(logger, "jetstream"), 126 wrapper, 127 false, 128 129 // in-memory filter is inapplicable to appview so 130 // we'll never log dids anyway. 131 false, 132 ) 133 if err != nil { 134 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 135 } 136 137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 139 } 140 141 ingester := appview.Ingester{ 142 Db: wrapper, 143 Enforcer: enforcer, 144 IdResolver: res, 145 Config: config, 146 Logger: log.SubLogger(logger, "ingester"), 147 Validator: validator, 148 } 149 err = jc.StartJetstream(ctx, ingester.Ingest()) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 155 if err != nil { 156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 157 } 158 knotstream.Start(ctx) 159 160 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 161 if err != nil { 162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 163 } 164 spindlestream.Start(ctx) 165 166 var notifiers []notify.Notifier 167 168 // Always add the database notifier 169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 170 171 // Add other notifiers in production only 172 if !config.Core.Dev { 173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 174 } 175 notifiers = append(notifiers, indexer) 176 notifier := notify.NewMergedNotifier(notifiers) 177 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 178 179 state := &State{ 180 d, 181 notifier, 182 indexer, 183 oauth, 184 enforcer, 185 pages, 186 res, 187 mentionsResolver, 188 posthog, 189 jc, 190 config, 191 repoResolver, 192 knotstream, 193 spindlestream, 194 logger, 195 validator, 196 } 197 198 return state, nil 199} 200 201func (s *State) Close() error { 202 // other close up logic goes here 203 return s.db.Close() 204} 205 206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 207 w.Header().Set("Content-Type", "text/plain") 208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 209 210 robotsTxt := `# Hello, Tanglers! 211User-agent: * 212Allow: / 213Disallow: /*/*/settings 214Disallow: /settings 215Disallow: /*/*/compare 216Disallow: /*/*/fork 217 218Crawl-delay: 1 219` 220 w.Write([]byte(robotsTxt)) 221} 222 223func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 224 user := s.oauth.GetMultiAccountUser(r) 225 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 226 LoggedInUser: user, 227 }) 228} 229 230func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 231 user := s.oauth.GetMultiAccountUser(r) 232 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 233 LoggedInUser: user, 234 }) 235} 236 237func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 238 user := s.oauth.GetMultiAccountUser(r) 239 s.pages.Brand(w, pages.BrandParams{ 240 LoggedInUser: user, 241 }) 242} 243 244func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 245 if s.oauth.GetMultiAccountUser(r) != nil { 246 s.Timeline(w, r) 247 return 248 } 249 s.Home(w, r) 250} 251 252func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 253 user := s.oauth.GetMultiAccountUser(r) 254 255 // TODO: set this flag based on the UI 256 filtered := false 257 258 var userDid string 259 if user != nil && user.Active != nil { 260 userDid = user.Active.Did 261 } 262 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 263 if err != nil { 264 s.logger.Error("failed to make timeline", "err", err) 265 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 266 } 267 268 repos, err := db.GetTopStarredReposLastWeek(s.db) 269 if err != nil { 270 s.logger.Error("failed to get top starred repos", "err", err) 271 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 272 return 273 } 274 275 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 276 if err != nil { 277 // non-fatal 278 } 279 280 s.pages.Timeline(w, pages.TimelineParams{ 281 LoggedInUser: user, 282 Timeline: timeline, 283 Repos: repos, 284 GfiLabel: gfiLabel, 285 }) 286} 287 288func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 289 user := s.oauth.GetMultiAccountUser(r) 290 if user == nil { 291 return 292 } 293 294 l := s.logger.With("handler", "UpgradeBanner") 295 l = l.With("did", user.Active.Did) 296 297 regs, err := db.GetRegistrations( 298 s.db, 299 orm.FilterEq("did", user.Active.Did), 300 orm.FilterEq("needs_upgrade", 1), 301 ) 302 if err != nil { 303 l.Error("non-fatal: failed to get registrations", "err", err) 304 } 305 306 spindles, err := db.GetSpindles( 307 s.db, 308 orm.FilterEq("owner", user.Active.Did), 309 orm.FilterEq("needs_upgrade", 1), 310 ) 311 if err != nil { 312 l.Error("non-fatal: failed to get spindles", "err", err) 313 } 314 315 if regs == nil && spindles == nil { 316 return 317 } 318 319 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 320 Registrations: regs, 321 Spindles: spindles, 322 }) 323} 324 325func (s *State) Home(w http.ResponseWriter, r *http.Request) { 326 // TODO: set this flag based on the UI 327 filtered := false 328 329 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 330 if err != nil { 331 s.logger.Error("failed to make timeline", "err", err) 332 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 333 return 334 } 335 336 repos, err := db.GetTopStarredReposLastWeek(s.db) 337 if err != nil { 338 s.logger.Error("failed to get top starred repos", "err", err) 339 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 340 return 341 } 342 343 s.pages.Home(w, pages.TimelineParams{ 344 LoggedInUser: nil, 345 Timeline: timeline, 346 Repos: repos, 347 }) 348} 349 350func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 351 user := chi.URLParam(r, "user") 352 user = strings.TrimPrefix(user, "@") 353 354 if user == "" { 355 w.WriteHeader(http.StatusBadRequest) 356 return 357 } 358 359 id, err := s.idResolver.ResolveIdent(r.Context(), user) 360 if err != nil { 361 w.WriteHeader(http.StatusInternalServerError) 362 return 363 } 364 365 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 366 if err != nil { 367 s.logger.Error("failed to get public keys", "err", err) 368 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 369 return 370 } 371 372 if len(pubKeys) == 0 { 373 w.WriteHeader(http.StatusNoContent) 374 return 375 } 376 377 for _, k := range pubKeys { 378 key := strings.TrimRight(k.Key, "\n") 379 fmt.Fprintln(w, key) 380 } 381} 382 383func validateRepoName(name string) error { 384 // check for path traversal attempts 385 if name == "." || name == ".." || 386 strings.Contains(name, "/") || strings.Contains(name, "\\") { 387 return fmt.Errorf("Repository name contains invalid path characters") 388 } 389 390 // check for sequences that could be used for traversal when normalized 391 if strings.Contains(name, "./") || strings.Contains(name, "../") || 392 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 393 return fmt.Errorf("Repository name contains invalid path sequence") 394 } 395 396 // then continue with character validation 397 for _, char := range name { 398 if !((char >= 'a' && char <= 'z') || 399 (char >= 'A' && char <= 'Z') || 400 (char >= '0' && char <= '9') || 401 char == '-' || char == '_' || char == '.') { 402 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 403 } 404 } 405 406 // additional check to prevent multiple sequential dots 407 if strings.Contains(name, "..") { 408 return fmt.Errorf("Repository name cannot contain sequential dots") 409 } 410 411 // if all checks pass 412 return nil 413} 414 415func stripGitExt(name string) string { 416 return strings.TrimSuffix(name, ".git") 417} 418 419func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 420 switch r.Method { 421 case http.MethodGet: 422 user := s.oauth.GetMultiAccountUser(r) 423 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 424 if err != nil { 425 s.pages.Notice(w, "repo", "Invalid user account.") 426 return 427 } 428 429 s.pages.NewRepo(w, pages.NewRepoParams{ 430 LoggedInUser: user, 431 Knots: knots, 432 }) 433 434 case http.MethodPost: 435 l := s.logger.With("handler", "NewRepo") 436 437 user := s.oauth.GetMultiAccountUser(r) 438 l = l.With("did", user.Active.Did) 439 440 // form validation 441 domain := r.FormValue("domain") 442 if domain == "" { 443 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 444 return 445 } 446 l = l.With("knot", domain) 447 448 repoName := r.FormValue("name") 449 if repoName == "" { 450 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 451 return 452 } 453 454 if err := validateRepoName(repoName); err != nil { 455 s.pages.Notice(w, "repo", err.Error()) 456 return 457 } 458 repoName = stripGitExt(repoName) 459 l = l.With("repoName", repoName) 460 461 defaultBranch := r.FormValue("branch") 462 if defaultBranch == "" { 463 defaultBranch = "main" 464 } 465 l = l.With("defaultBranch", defaultBranch) 466 467 description := r.FormValue("description") 468 469 // ACL validation 470 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 471 if err != nil || !ok { 472 l.Info("unauthorized") 473 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 474 return 475 } 476 477 // Check for existing repos 478 existingRepo, err := db.GetRepo( 479 s.db, 480 orm.FilterEq("did", user.Active.Did), 481 orm.FilterEq("name", repoName), 482 ) 483 if err == nil && existingRepo != nil { 484 l.Info("repo exists") 485 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 486 return 487 } 488 489 // create atproto record for this repo 490 rkey := tid.TID() 491 repo := &models.Repo{ 492 Did: user.Active.Did, 493 Name: repoName, 494 Knot: domain, 495 Rkey: rkey, 496 Description: description, 497 Created: time.Now(), 498 Labels: s.config.Label.DefaultLabelDefs, 499 } 500 record := repo.AsRecord() 501 502 atpClient, err := s.oauth.AuthorizedClient(r) 503 if err != nil { 504 l.Info("PDS write failed", "err", err) 505 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 506 return 507 } 508 509 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 510 Collection: tangled.RepoNSID, 511 Repo: user.Active.Did, 512 Rkey: rkey, 513 Record: &lexutil.LexiconTypeDecoder{ 514 Val: &record, 515 }, 516 }) 517 if err != nil { 518 l.Info("PDS write failed", "err", err) 519 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 520 return 521 } 522 523 aturi := atresp.Uri 524 l = l.With("aturi", aturi) 525 l.Info("wrote to PDS") 526 527 tx, err := s.db.BeginTx(r.Context(), nil) 528 if err != nil { 529 l.Info("txn failed", "err", err) 530 s.pages.Notice(w, "repo", "Failed to save repository information.") 531 return 532 } 533 534 // The rollback function reverts a few things on failure: 535 // - the pending txn 536 // - the ACLs 537 // - the atproto record created 538 rollback := func() { 539 err1 := tx.Rollback() 540 err2 := s.enforcer.E.LoadPolicy() 541 err3 := rollbackRecord(context.Background(), aturi, atpClient) 542 543 // ignore txn complete errors, this is okay 544 if errors.Is(err1, sql.ErrTxDone) { 545 err1 = nil 546 } 547 548 if errs := errors.Join(err1, err2, err3); errs != nil { 549 l.Error("failed to rollback changes", "errs", errs) 550 return 551 } 552 } 553 defer rollback() 554 555 client, err := s.oauth.ServiceClient( 556 r, 557 oauth.WithService(domain), 558 oauth.WithLxm(tangled.RepoCreateNSID), 559 oauth.WithDev(s.config.Core.Dev), 560 ) 561 if err != nil { 562 l.Error("service auth failed", "err", err) 563 s.pages.Notice(w, "repo", "Failed to reach PDS.") 564 return 565 } 566 567 xe := tangled.RepoCreate( 568 r.Context(), 569 client, 570 &tangled.RepoCreate_Input{ 571 Rkey: rkey, 572 }, 573 ) 574 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 575 l.Error("xrpc error", "xe", xe) 576 s.pages.Notice(w, "repo", err.Error()) 577 return 578 } 579 580 err = db.AddRepo(tx, repo) 581 if err != nil { 582 l.Error("db write failed", "err", err) 583 s.pages.Notice(w, "repo", "Failed to save repository information.") 584 return 585 } 586 587 // acls 588 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 589 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 590 if err != nil { 591 l.Error("acl setup failed", "err", err) 592 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 593 return 594 } 595 596 err = tx.Commit() 597 if err != nil { 598 l.Error("txn commit failed", "err", err) 599 http.Error(w, err.Error(), http.StatusInternalServerError) 600 return 601 } 602 603 err = s.enforcer.E.SavePolicy() 604 if err != nil { 605 l.Error("acl save failed", "err", err) 606 http.Error(w, err.Error(), http.StatusInternalServerError) 607 return 608 } 609 610 // reset the ATURI because the transaction completed successfully 611 aturi = "" 612 613 s.notifier.NewRepo(r.Context(), repo) 614 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 615 } 616} 617 618// this is used to rollback changes made to the PDS 619// 620// it is a no-op if the provided ATURI is empty 621func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 622 if aturi == "" { 623 return nil 624 } 625 626 parsed := syntax.ATURI(aturi) 627 628 collection := parsed.Collection().String() 629 repo := parsed.Authority().String() 630 rkey := parsed.RecordKey().String() 631 632 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 633 Collection: collection, 634 Repo: repo, 635 Rkey: rkey, 636 }) 637 return err 638} 639 640func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 641 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 642 if err != nil { 643 return err 644 } 645 // already present 646 if len(defaultLabels) == len(defaults) { 647 return nil 648 } 649 650 labelDefs, err := models.FetchLabelDefs(r, defaults) 651 if err != nil { 652 return err 653 } 654 655 // Insert each label definition to the database 656 for _, labelDef := range labelDefs { 657 _, err = db.AddLabelDefinition(e, &labelDef) 658 if err != nil { 659 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 660 } 661 } 662 663 return nil 664}