Monorepo for Tangled
at 62df028f2b0cc9f1d32515aed04d65473249c734 715 lines 19 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/indexer" 19 "tangled.org/core/appview/mentions" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/consts" 30 "tangled.org/core/eventconsumer" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/jetstream" 33 "tangled.org/core/log" 34 tlog "tangled.org/core/log" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 39 comatproto "github.com/bluesky-social/indigo/api/atproto" 40 atpclient "github.com/bluesky-social/indigo/atproto/client" 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47) 48 49type State struct { 50 db *db.DB 51 notifier notify.Notifier 52 indexer *indexer.Indexer 53 oauth *oauth.OAuth 54 enforcer *rbac.Enforcer 55 pages *pages.Pages 56 idResolver *idresolver.Resolver 57 mentionsResolver *mentions.Resolver 58 posthog posthog.Client 59 jc *jetstream.JetstreamClient 60 config *config.Config 61 repoResolver *reporesolver.RepoResolver 62 knotstream *eventconsumer.Consumer 63 spindlestream *eventconsumer.Consumer 64 logger *slog.Logger 65 validator *validator.Validator 66} 67 68func Make(ctx context.Context, config *config.Config) (*State, error) { 69 logger := tlog.FromContext(ctx) 70 71 d, err := db.Make(ctx, config.Core.DbPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create db: %w", err) 74 } 75 76 indexer := indexer.New(log.SubLogger(logger, "indexer")) 77 err = indexer.Init(ctx, d) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create indexer: %w", err) 80 } 81 82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create enforcer: %w", err) 85 } 86 87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 88 if err != nil { 89 logger.Error("failed to create redis resolver", "err", err) 90 res = idresolver.DefaultResolver(config.Plc.PLCURL) 91 } 92 93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create posthog client: %w", err) 96 } 97 98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 102 } 103 validator := validator.New(d, res, enforcer) 104 105 repoResolver := reporesolver.New(config, enforcer, d) 106 107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 108 109 wrapper := db.DbWrapper{Execer: d} 110 jc, err := jetstream.NewJetstreamClient( 111 config.Jetstream.Endpoint, 112 "appview", 113 []string{ 114 tangled.GraphFollowNSID, 115 tangled.FeedStarNSID, 116 tangled.PublicKeyNSID, 117 tangled.RepoArtifactNSID, 118 tangled.ActorProfileNSID, 119 tangled.KnotMemberNSID, 120 tangled.SpindleMemberNSID, 121 tangled.SpindleNSID, 122 tangled.StringNSID, 123 tangled.RepoIssueNSID, 124 tangled.RepoIssueCommentNSID, 125 tangled.LabelDefinitionNSID, 126 tangled.LabelOpNSID, 127 }, 128 nil, 129 tlog.SubLogger(logger, "jetstream"), 130 wrapper, 131 false, 132 133 // in-memory filter is inapplicable to appview so 134 // we'll never log dids anyway. 135 false, 136 ) 137 if err != nil { 138 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 139 } 140 141 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 142 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 143 } 144 145 ingester := appview.Ingester{ 146 Db: wrapper, 147 Enforcer: enforcer, 148 IdResolver: res, 149 Config: config, 150 Logger: log.SubLogger(logger, "ingester"), 151 Validator: validator, 152 } 153 err = jc.StartJetstream(ctx, ingester.Ingest()) 154 if err != nil { 155 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 156 } 157 158 var notifiers []notify.Notifier 159 160 // Always add the database notifier 161 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 162 163 // Add other notifiers in production only 164 if !config.Core.Dev { 165 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 166 } 167 notifiers = append(notifiers, indexer) 168 169 // Add webhook notifier 170 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 171 172 notifier := notify.NewMergedNotifier(notifiers) 173 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 174 175 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 176 if err != nil { 177 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 178 } 179 knotstream.Start(ctx) 180 181 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 182 if err != nil { 183 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 184 } 185 spindlestream.Start(ctx) 186 187 state := &State{ 188 d, 189 notifier, 190 indexer, 191 oauth, 192 enforcer, 193 pages, 194 res, 195 mentionsResolver, 196 posthog, 197 jc, 198 config, 199 repoResolver, 200 knotstream, 201 spindlestream, 202 logger, 203 validator, 204 } 205 206 // fetch initial bluesky posts if configured 207 go fetchBskyPosts(ctx, res, config, d, logger) 208 209 return state, nil 210} 211 212func (s *State) Close() error { 213 // other close up logic goes here 214 return s.db.Close() 215} 216 217func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 218 w.Header().Set("Content-Type", "text/plain") 219 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 220 221 robotsTxt := `# Hello, Tanglers! 222User-agent: * 223Allow: / 224Disallow: /*/*/settings 225Disallow: /settings 226Disallow: /*/*/compare 227Disallow: /*/*/fork 228 229Crawl-delay: 1 230` 231 w.Write([]byte(robotsTxt)) 232} 233 234func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 235 user := s.oauth.GetMultiAccountUser(r) 236 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 237 LoggedInUser: user, 238 }) 239} 240 241func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 242 user := s.oauth.GetMultiAccountUser(r) 243 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 244 LoggedInUser: user, 245 }) 246} 247 248func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 249 user := s.oauth.GetMultiAccountUser(r) 250 s.pages.Brand(w, pages.BrandParams{ 251 LoggedInUser: user, 252 }) 253} 254 255func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 256 user := s.oauth.GetMultiAccountUser(r) 257 if user == nil { 258 return 259 } 260 261 l := s.logger.With("handler", "UpgradeBanner") 262 l = l.With("did", user.Active.Did) 263 264 regs, err := db.GetRegistrations( 265 s.db, 266 orm.FilterEq("did", user.Active.Did), 267 orm.FilterEq("needs_upgrade", 1), 268 ) 269 if err != nil { 270 l.Error("non-fatal: failed to get registrations", "err", err) 271 } 272 273 spindles, err := db.GetSpindles( 274 s.db, 275 orm.FilterEq("owner", user.Active.Did), 276 orm.FilterEq("needs_upgrade", 1), 277 ) 278 if err != nil { 279 l.Error("non-fatal: failed to get spindles", "err", err) 280 } 281 282 if regs == nil && spindles == nil { 283 return 284 } 285 286 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 287 Registrations: regs, 288 Spindles: spindles, 289 }) 290} 291 292func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 293 user := chi.URLParam(r, "user") 294 user = strings.TrimPrefix(user, "@") 295 296 if user == "" { 297 w.WriteHeader(http.StatusBadRequest) 298 return 299 } 300 301 id, err := s.idResolver.ResolveIdent(r.Context(), user) 302 if err != nil { 303 w.WriteHeader(http.StatusInternalServerError) 304 return 305 } 306 307 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 308 if err != nil { 309 s.logger.Error("failed to get public keys", "err", err) 310 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 311 return 312 } 313 314 if len(pubKeys) == 0 { 315 w.WriteHeader(http.StatusNoContent) 316 return 317 } 318 319 for _, k := range pubKeys { 320 key := strings.TrimRight(k.Key, "\n") 321 fmt.Fprintln(w, key) 322 } 323} 324 325func validateRepoName(name string) error { 326 // check for path traversal attempts 327 if name == "." || name == ".." || 328 strings.Contains(name, "/") || strings.Contains(name, "\\") { 329 return fmt.Errorf("Repository name contains invalid path characters") 330 } 331 332 // check for sequences that could be used for traversal when normalized 333 if strings.Contains(name, "./") || strings.Contains(name, "../") || 334 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 335 return fmt.Errorf("Repository name contains invalid path sequence") 336 } 337 338 // then continue with character validation 339 for _, char := range name { 340 if !((char >= 'a' && char <= 'z') || 341 (char >= 'A' && char <= 'Z') || 342 (char >= '0' && char <= '9') || 343 char == '-' || char == '_' || char == '.') { 344 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 345 } 346 } 347 348 // additional check to prevent multiple sequential dots 349 if strings.Contains(name, "..") { 350 return fmt.Errorf("Repository name cannot contain sequential dots") 351 } 352 353 // if all checks pass 354 return nil 355} 356 357func stripGitExt(name string) string { 358 return strings.TrimSuffix(name, ".git") 359} 360 361func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 362 switch r.Method { 363 case http.MethodGet: 364 user := s.oauth.GetMultiAccountUser(r) 365 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 366 if err != nil { 367 s.pages.Notice(w, "repo", "Invalid user account.") 368 return 369 } 370 371 s.pages.NewRepo(w, pages.NewRepoParams{ 372 LoggedInUser: user, 373 Knots: knots, 374 }) 375 376 case http.MethodPost: 377 l := s.logger.With("handler", "NewRepo") 378 379 user := s.oauth.GetMultiAccountUser(r) 380 l = l.With("did", user.Active.Did) 381 382 // form validation 383 domain := r.FormValue("domain") 384 if domain == "" { 385 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 386 return 387 } 388 l = l.With("knot", domain) 389 390 repoName := r.FormValue("name") 391 if repoName == "" { 392 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 393 return 394 } 395 396 if err := validateRepoName(repoName); err != nil { 397 s.pages.Notice(w, "repo", err.Error()) 398 return 399 } 400 repoName = stripGitExt(repoName) 401 l = l.With("repoName", repoName) 402 403 defaultBranch := r.FormValue("branch") 404 if defaultBranch == "" { 405 defaultBranch = "main" 406 } 407 l = l.With("defaultBranch", defaultBranch) 408 409 description := r.FormValue("description") 410 if len([]rune(description)) > 140 { 411 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 412 return 413 } 414 415 // ACL validation 416 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 417 if err != nil || !ok { 418 l.Info("unauthorized") 419 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 420 return 421 } 422 423 // Check for existing repos 424 existingRepo, err := db.GetRepo( 425 s.db, 426 orm.FilterEq("did", user.Active.Did), 427 orm.FilterEq("name", repoName), 428 ) 429 if err == nil && existingRepo != nil { 430 l.Info("repo exists") 431 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 432 return 433 } 434 435 rkey := tid.TID() 436 437 client, err := s.oauth.ServiceClient( 438 r, 439 oauth.WithService(domain), 440 oauth.WithLxm(tangled.RepoCreateNSID), 441 oauth.WithDev(s.config.Core.Dev), 442 ) 443 if err != nil { 444 l.Error("service auth failed", "err", err) 445 s.pages.Notice(w, "repo", "Failed to reach knot server.") 446 return 447 } 448 449 input := &tangled.RepoCreate_Input{ 450 Rkey: rkey, 451 Name: repoName, 452 DefaultBranch: &defaultBranch, 453 } 454 if rd := strings.TrimSpace(r.FormValue("repo_did")); rd != "" { 455 input.RepoDid = &rd 456 } 457 458 createResp, xe := tangled.RepoCreate( 459 r.Context(), 460 client, 461 input, 462 ) 463 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 464 l.Error("xrpc error", "xe", xe) 465 s.pages.Notice(w, "repo", err.Error()) 466 return 467 } 468 469 var repoDid string 470 if createResp != nil && createResp.RepoDid != nil { 471 repoDid = *createResp.RepoDid 472 } 473 if repoDid == "" { 474 l.Error("knot returned empty repo DID") 475 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 476 return 477 } 478 479 repo := &models.Repo{ 480 Did: user.Active.Did, 481 Name: repoName, 482 Knot: domain, 483 Rkey: rkey, 484 Description: description, 485 Created: time.Now(), 486 Labels: s.config.Label.DefaultLabelDefs, 487 RepoDid: repoDid, 488 } 489 record := repo.AsRecord() 490 491 cleanupKnot := func() { 492 go func() { 493 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 494 for attempt, delay := range delays { 495 time.Sleep(delay) 496 deleteClient, dErr := s.oauth.ServiceClient( 497 r, 498 oauth.WithService(domain), 499 oauth.WithLxm(tangled.RepoDeleteNSID), 500 oauth.WithDev(s.config.Core.Dev), 501 ) 502 if dErr != nil { 503 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 504 continue 505 } 506 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 507 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 508 Did: user.Active.Did, 509 Name: repoName, 510 Rkey: rkey, 511 }); dErr != nil { 512 cancel() 513 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 514 continue 515 } 516 cancel() 517 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 518 return 519 } 520 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 521 "did", user.Active.Did, "repo", repoName, "knot", domain) 522 }() 523 } 524 525 atpClient, err := s.oauth.AuthorizedClient(r) 526 if err != nil { 527 l.Info("PDS write failed", "err", err) 528 cleanupKnot() 529 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 530 return 531 } 532 533 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 534 Collection: tangled.RepoNSID, 535 Repo: user.Active.Did, 536 Rkey: rkey, 537 Record: &lexutil.LexiconTypeDecoder{ 538 Val: &record, 539 }, 540 }) 541 if err != nil { 542 l.Info("PDS write failed", "err", err) 543 cleanupKnot() 544 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 545 return 546 } 547 548 aturi := atresp.Uri 549 l = l.With("aturi", aturi) 550 l.Info("wrote to PDS") 551 552 tx, err := s.db.BeginTx(r.Context(), nil) 553 if err != nil { 554 l.Info("txn failed", "err", err) 555 s.pages.Notice(w, "repo", "Failed to save repository information.") 556 return 557 } 558 559 rollback := func() { 560 err1 := tx.Rollback() 561 err2 := s.enforcer.E.LoadPolicy() 562 err3 := rollbackRecord(context.Background(), aturi, atpClient) 563 564 if errors.Is(err1, sql.ErrTxDone) { 565 err1 = nil 566 } 567 568 if errs := errors.Join(err1, err2, err3); errs != nil { 569 l.Error("failed to rollback changes", "errs", errs) 570 } 571 572 if aturi != "" { 573 cleanupKnot() 574 } 575 } 576 defer rollback() 577 578 err = db.AddRepo(tx, repo) 579 if err != nil { 580 l.Error("db write failed", "err", err) 581 s.pages.Notice(w, "repo", "Failed to save repository information.") 582 return 583 } 584 585 rbacPath := repo.RepoIdentifier() 586 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath) 587 if err != nil { 588 l.Error("acl setup failed", "err", err) 589 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 590 return 591 } 592 593 err = tx.Commit() 594 if err != nil { 595 l.Error("txn commit failed", "err", err) 596 http.Error(w, err.Error(), http.StatusInternalServerError) 597 return 598 } 599 600 err = s.enforcer.E.SavePolicy() 601 if err != nil { 602 l.Error("acl save failed", "err", err) 603 http.Error(w, err.Error(), http.StatusInternalServerError) 604 return 605 } 606 607 aturi = "" 608 609 s.notifier.NewRepo(r.Context(), repo) 610 if repoDid != "" { 611 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 612 } else { 613 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 614 } 615 } 616} 617 618// this is used to rollback changes made to the PDS 619// 620// it is a no-op if the provided ATURI is empty 621func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 622 if aturi == "" { 623 return nil 624 } 625 626 parsed := syntax.ATURI(aturi) 627 628 collection := parsed.Collection().String() 629 repo := parsed.Authority().String() 630 rkey := parsed.RecordKey().String() 631 632 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 633 Collection: collection, 634 Repo: repo, 635 Rkey: rkey, 636 }) 637 return err 638} 639 640func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 641 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 642 if err != nil { 643 return err 644 } 645 // already present 646 if len(defaultLabels) == len(defaults) { 647 return nil 648 } 649 650 labelDefs, err := models.FetchLabelDefs(r, defaults) 651 if err != nil { 652 return err 653 } 654 655 // Insert each label definition to the database 656 for _, labelDef := range labelDefs { 657 _, err = db.AddLabelDefinition(e, &labelDef) 658 if err != nil { 659 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 660 } 661 } 662 663 return nil 664} 665 666func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 667 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 668 if err != nil { 669 logger.Error("failed to resolve tangled.org DID", "err", err) 670 return 671 } 672 673 pdsEndpoint := resolved.PDSEndpoint() 674 if pdsEndpoint == "" { 675 logger.Error("no PDS endpoint found for tangled.sh DID") 676 return 677 } 678 679 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, config.Core.RateLimitBypass, logger) 680 if err != nil { 681 logger.Error("failed to create appassword session... skipping fetch", "err", err) 682 return 683 } 684 685 client := xrpc.Client{ 686 Auth: &xrpc.AuthInfo{ 687 AccessJwt: session.AccessJwt, 688 Did: session.Did, 689 }, 690 Host: session.PdsEndpoint, 691 } 692 693 l := log.SubLogger(logger, "bluesky") 694 695 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 696 defer ticker.Stop() 697 698 for { 699 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 700 if err != nil { 701 l.Error("failed to fetch bluesky posts", "err", err) 702 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 703 l.Error("failed to insert bluesky posts", "err", err) 704 } else { 705 l.Info("inserted bluesky posts", "count", len(posts)) 706 } 707 708 select { 709 case <-ticker.C: 710 case <-ctx.Done(): 711 l.Info("stopping bluesky updater") 712 return 713 } 714 } 715}