Monorepo for Tangled
at b0d4690d3c26f8b272a5ad299fea52b84d3df4fa 657 lines 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/mentions" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/notify" 21 dbnotify "tangled.org/core/appview/notify/db" 22 phnotify "tangled.org/core/appview/notify/posthog" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/reporesolver" 26 "tangled.org/core/appview/validator" 27 xrpcclient "tangled.org/core/appview/xrpcclient" 28 "tangled.org/core/eventconsumer" 29 "tangled.org/core/idresolver" 30 "tangled.org/core/jetstream" 31 "tangled.org/core/log" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35 "tangled.org/core/tid" 36 37 comatproto "github.com/bluesky-social/indigo/api/atproto" 38 atpclient "github.com/bluesky-social/indigo/atproto/client" 39 "github.com/bluesky-social/indigo/atproto/syntax" 40 lexutil "github.com/bluesky-social/indigo/lex/util" 41 securejoin "github.com/cyphar/filepath-securejoin" 42 "github.com/go-chi/chi/v5" 43 "github.com/posthog/posthog-go" 44) 45 46type State struct { 47 db *db.DB 48 notifier notify.Notifier 49 indexer *indexer.Indexer 50 oauth *oauth.OAuth 51 enforcer *rbac.Enforcer 52 pages *pages.Pages 53 idResolver *idresolver.Resolver 54 mentionsResolver *mentions.Resolver 55 posthog posthog.Client 56 jc *jetstream.JetstreamClient 57 config *config.Config 58 repoResolver *reporesolver.RepoResolver 59 knotstream *eventconsumer.Consumer 60 spindlestream *eventconsumer.Consumer 61 logger *slog.Logger 62 validator *validator.Validator 63} 64 65func Make(ctx context.Context, config *config.Config) (*State, error) { 66 logger := tlog.FromContext(ctx) 67 68 d, err := db.Make(ctx, config.Core.DbPath) 69 if err != nil { 70 return nil, fmt.Errorf("failed to create db: %w", err) 71 } 72 73 indexer := indexer.New(log.SubLogger(logger, "indexer")) 74 err = indexer.Init(ctx, d) 75 if err != nil { 76 return nil, fmt.Errorf("failed to create indexer: %w", err) 77 } 78 79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create enforcer: %w", err) 82 } 83 84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 85 if err != nil { 86 logger.Error("failed to create redis resolver", "err", err) 87 res = idresolver.DefaultResolver(config.Plc.PLCURL) 88 } 89 90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create posthog client: %w", err) 93 } 94 95 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 97 if err != nil { 98 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 99 } 100 validator := validator.New(d, res, enforcer) 101 102 repoResolver := reporesolver.New(config, enforcer, d) 103 104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 105 106 wrapper := db.DbWrapper{Execer: d} 107 jc, err := jetstream.NewJetstreamClient( 108 config.Jetstream.Endpoint, 109 "appview", 110 []string{ 111 tangled.GraphFollowNSID, 112 tangled.FeedStarNSID, 113 tangled.PublicKeyNSID, 114 tangled.RepoArtifactNSID, 115 tangled.ActorProfileNSID, 116 tangled.SpindleMemberNSID, 117 tangled.SpindleNSID, 118 tangled.StringNSID, 119 tangled.RepoIssueNSID, 120 tangled.RepoIssueCommentNSID, 121 tangled.LabelDefinitionNSID, 122 tangled.LabelOpNSID, 123 }, 124 nil, 125 tlog.SubLogger(logger, "jetstream"), 126 wrapper, 127 false, 128 129 // in-memory filter is inapplicalble to appview so 130 // we'll never log dids anyway. 131 false, 132 ) 133 if err != nil { 134 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 135 } 136 137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 139 } 140 141 ingester := appview.Ingester{ 142 Db: wrapper, 143 Enforcer: enforcer, 144 IdResolver: res, 145 Config: config, 146 Logger: log.SubLogger(logger, "ingester"), 147 Validator: validator, 148 } 149 err = jc.StartJetstream(ctx, ingester.Ingest()) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 155 if err != nil { 156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 157 } 158 knotstream.Start(ctx) 159 160 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 161 if err != nil { 162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 163 } 164 spindlestream.Start(ctx) 165 166 var notifiers []notify.Notifier 167 168 // Always add the database notifier 169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 170 171 // Add other notifiers in production only 172 if !config.Core.Dev { 173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 174 } 175 notifiers = append(notifiers, indexer) 176 notifier := notify.NewMergedNotifier(notifiers) 177 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 178 179 state := &State{ 180 d, 181 notifier, 182 indexer, 183 oauth, 184 enforcer, 185 pages, 186 res, 187 mentionsResolver, 188 posthog, 189 jc, 190 config, 191 repoResolver, 192 knotstream, 193 spindlestream, 194 logger, 195 validator, 196 } 197 198 return state, nil 199} 200 201func (s *State) Close() error { 202 // other close up logic goes here 203 return s.db.Close() 204} 205 206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 207 w.Header().Set("Content-Type", "text/plain") 208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 209 210 robotsTxt := `User-agent: * 211Allow: / 212` 213 w.Write([]byte(robotsTxt)) 214} 215 216func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 217 user := s.oauth.GetMultiAccountUser(r) 218 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 219 LoggedInUser: user, 220 }) 221} 222 223func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 224 user := s.oauth.GetMultiAccountUser(r) 225 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 226 LoggedInUser: user, 227 }) 228} 229 230func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 231 user := s.oauth.GetMultiAccountUser(r) 232 s.pages.Brand(w, pages.BrandParams{ 233 LoggedInUser: user, 234 }) 235} 236 237func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 238 if s.oauth.GetMultiAccountUser(r) != nil { 239 s.Timeline(w, r) 240 return 241 } 242 s.Home(w, r) 243} 244 245func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 246 user := s.oauth.GetMultiAccountUser(r) 247 248 // TODO: set this flag based on the UI 249 filtered := false 250 251 var userDid string 252 if user != nil && user.Active != nil { 253 userDid = user.Active.Did 254 } 255 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 256 if err != nil { 257 s.logger.Error("failed to make timeline", "err", err) 258 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 259 } 260 261 repos, err := db.GetTopStarredReposLastWeek(s.db) 262 if err != nil { 263 s.logger.Error("failed to get top starred repos", "err", err) 264 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 265 return 266 } 267 268 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 269 if err != nil { 270 // non-fatal 271 } 272 273 s.pages.Timeline(w, pages.TimelineParams{ 274 LoggedInUser: user, 275 Timeline: timeline, 276 Repos: repos, 277 GfiLabel: gfiLabel, 278 }) 279} 280 281func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 282 user := s.oauth.GetMultiAccountUser(r) 283 if user == nil { 284 return 285 } 286 287 l := s.logger.With("handler", "UpgradeBanner") 288 l = l.With("did", user.Active.Did) 289 290 regs, err := db.GetRegistrations( 291 s.db, 292 orm.FilterEq("did", user.Active.Did), 293 orm.FilterEq("needs_upgrade", 1), 294 ) 295 if err != nil { 296 l.Error("non-fatal: failed to get registrations", "err", err) 297 } 298 299 spindles, err := db.GetSpindles( 300 s.db, 301 orm.FilterEq("owner", user.Active.Did), 302 orm.FilterEq("needs_upgrade", 1), 303 ) 304 if err != nil { 305 l.Error("non-fatal: failed to get spindles", "err", err) 306 } 307 308 if regs == nil && spindles == nil { 309 return 310 } 311 312 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 313 Registrations: regs, 314 Spindles: spindles, 315 }) 316} 317 318func (s *State) Home(w http.ResponseWriter, r *http.Request) { 319 // TODO: set this flag based on the UI 320 filtered := false 321 322 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 323 if err != nil { 324 s.logger.Error("failed to make timeline", "err", err) 325 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 326 return 327 } 328 329 repos, err := db.GetTopStarredReposLastWeek(s.db) 330 if err != nil { 331 s.logger.Error("failed to get top starred repos", "err", err) 332 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 333 return 334 } 335 336 s.pages.Home(w, pages.TimelineParams{ 337 LoggedInUser: nil, 338 Timeline: timeline, 339 Repos: repos, 340 }) 341} 342 343func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 344 user := chi.URLParam(r, "user") 345 user = strings.TrimPrefix(user, "@") 346 347 if user == "" { 348 w.WriteHeader(http.StatusBadRequest) 349 return 350 } 351 352 id, err := s.idResolver.ResolveIdent(r.Context(), user) 353 if err != nil { 354 w.WriteHeader(http.StatusInternalServerError) 355 return 356 } 357 358 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 359 if err != nil { 360 s.logger.Error("failed to get public keys", "err", err) 361 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 362 return 363 } 364 365 if len(pubKeys) == 0 { 366 w.WriteHeader(http.StatusNoContent) 367 return 368 } 369 370 for _, k := range pubKeys { 371 key := strings.TrimRight(k.Key, "\n") 372 fmt.Fprintln(w, key) 373 } 374} 375 376func validateRepoName(name string) error { 377 // check for path traversal attempts 378 if name == "." || name == ".." || 379 strings.Contains(name, "/") || strings.Contains(name, "\\") { 380 return fmt.Errorf("Repository name contains invalid path characters") 381 } 382 383 // check for sequences that could be used for traversal when normalized 384 if strings.Contains(name, "./") || strings.Contains(name, "../") || 385 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 386 return fmt.Errorf("Repository name contains invalid path sequence") 387 } 388 389 // then continue with character validation 390 for _, char := range name { 391 if !((char >= 'a' && char <= 'z') || 392 (char >= 'A' && char <= 'Z') || 393 (char >= '0' && char <= '9') || 394 char == '-' || char == '_' || char == '.') { 395 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 396 } 397 } 398 399 // additional check to prevent multiple sequential dots 400 if strings.Contains(name, "..") { 401 return fmt.Errorf("Repository name cannot contain sequential dots") 402 } 403 404 // if all checks pass 405 return nil 406} 407 408func stripGitExt(name string) string { 409 return strings.TrimSuffix(name, ".git") 410} 411 412func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 413 switch r.Method { 414 case http.MethodGet: 415 user := s.oauth.GetMultiAccountUser(r) 416 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 417 if err != nil { 418 s.pages.Notice(w, "repo", "Invalid user account.") 419 return 420 } 421 422 s.pages.NewRepo(w, pages.NewRepoParams{ 423 LoggedInUser: user, 424 Knots: knots, 425 }) 426 427 case http.MethodPost: 428 l := s.logger.With("handler", "NewRepo") 429 430 user := s.oauth.GetMultiAccountUser(r) 431 l = l.With("did", user.Active.Did) 432 433 // form validation 434 domain := r.FormValue("domain") 435 if domain == "" { 436 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 437 return 438 } 439 l = l.With("knot", domain) 440 441 repoName := r.FormValue("name") 442 if repoName == "" { 443 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 444 return 445 } 446 447 if err := validateRepoName(repoName); err != nil { 448 s.pages.Notice(w, "repo", err.Error()) 449 return 450 } 451 repoName = stripGitExt(repoName) 452 l = l.With("repoName", repoName) 453 454 defaultBranch := r.FormValue("branch") 455 if defaultBranch == "" { 456 defaultBranch = "main" 457 } 458 l = l.With("defaultBranch", defaultBranch) 459 460 description := r.FormValue("description") 461 462 // ACL validation 463 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 464 if err != nil || !ok { 465 l.Info("unauthorized") 466 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 467 return 468 } 469 470 // Check for existing repos 471 existingRepo, err := db.GetRepo( 472 s.db, 473 orm.FilterEq("did", user.Active.Did), 474 orm.FilterEq("name", repoName), 475 ) 476 if err == nil && existingRepo != nil { 477 l.Info("repo exists") 478 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 479 return 480 } 481 482 // create atproto record for this repo 483 rkey := tid.TID() 484 repo := &models.Repo{ 485 Did: user.Active.Did, 486 Name: repoName, 487 Knot: domain, 488 Rkey: rkey, 489 Description: description, 490 Created: time.Now(), 491 Labels: s.config.Label.DefaultLabelDefs, 492 } 493 record := repo.AsRecord() 494 495 atpClient, err := s.oauth.AuthorizedClient(r) 496 if err != nil { 497 l.Info("PDS write failed", "err", err) 498 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 499 return 500 } 501 502 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 503 Collection: tangled.RepoNSID, 504 Repo: user.Active.Did, 505 Rkey: rkey, 506 Record: &lexutil.LexiconTypeDecoder{ 507 Val: &record, 508 }, 509 }) 510 if err != nil { 511 l.Info("PDS write failed", "err", err) 512 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 513 return 514 } 515 516 aturi := atresp.Uri 517 l = l.With("aturi", aturi) 518 l.Info("wrote to PDS") 519 520 tx, err := s.db.BeginTx(r.Context(), nil) 521 if err != nil { 522 l.Info("txn failed", "err", err) 523 s.pages.Notice(w, "repo", "Failed to save repository information.") 524 return 525 } 526 527 // The rollback function reverts a few things on failure: 528 // - the pending txn 529 // - the ACLs 530 // - the atproto record created 531 rollback := func() { 532 err1 := tx.Rollback() 533 err2 := s.enforcer.E.LoadPolicy() 534 err3 := rollbackRecord(context.Background(), aturi, atpClient) 535 536 // ignore txn complete errors, this is okay 537 if errors.Is(err1, sql.ErrTxDone) { 538 err1 = nil 539 } 540 541 if errs := errors.Join(err1, err2, err3); errs != nil { 542 l.Error("failed to rollback changes", "errs", errs) 543 return 544 } 545 } 546 defer rollback() 547 548 client, err := s.oauth.ServiceClient( 549 r, 550 oauth.WithService(domain), 551 oauth.WithLxm(tangled.RepoCreateNSID), 552 oauth.WithDev(s.config.Core.Dev), 553 ) 554 if err != nil { 555 l.Error("service auth failed", "err", err) 556 s.pages.Notice(w, "repo", "Failed to reach PDS.") 557 return 558 } 559 560 xe := tangled.RepoCreate( 561 r.Context(), 562 client, 563 &tangled.RepoCreate_Input{ 564 Rkey: rkey, 565 }, 566 ) 567 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 568 l.Error("xrpc error", "xe", xe) 569 s.pages.Notice(w, "repo", err.Error()) 570 return 571 } 572 573 err = db.AddRepo(tx, repo) 574 if err != nil { 575 l.Error("db write failed", "err", err) 576 s.pages.Notice(w, "repo", "Failed to save repository information.") 577 return 578 } 579 580 // acls 581 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 582 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 583 if err != nil { 584 l.Error("acl setup failed", "err", err) 585 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 586 return 587 } 588 589 err = tx.Commit() 590 if err != nil { 591 l.Error("txn commit failed", "err", err) 592 http.Error(w, err.Error(), http.StatusInternalServerError) 593 return 594 } 595 596 err = s.enforcer.E.SavePolicy() 597 if err != nil { 598 l.Error("acl save failed", "err", err) 599 http.Error(w, err.Error(), http.StatusInternalServerError) 600 return 601 } 602 603 // reset the ATURI because the transaction completed successfully 604 aturi = "" 605 606 s.notifier.NewRepo(r.Context(), repo) 607 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 608 } 609} 610 611// this is used to rollback changes made to the PDS 612// 613// it is a no-op if the provided ATURI is empty 614func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 615 if aturi == "" { 616 return nil 617 } 618 619 parsed := syntax.ATURI(aturi) 620 621 collection := parsed.Collection().String() 622 repo := parsed.Authority().String() 623 rkey := parsed.RecordKey().String() 624 625 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 626 Collection: collection, 627 Repo: repo, 628 Rkey: rkey, 629 }) 630 return err 631} 632 633func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 634 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 635 if err != nil { 636 return err 637 } 638 // already present 639 if len(defaultLabels) == len(defaults) { 640 return nil 641 } 642 643 labelDefs, err := models.FetchLabelDefs(r, defaults) 644 if err != nil { 645 return err 646 } 647 648 // Insert each label definition to the database 649 for _, labelDef := range labelDefs { 650 _, err = db.AddLabelDefinition(e, &labelDef) 651 if err != nil { 652 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 653 } 654 } 655 656 return nil 657}