Monorepo for Tangled
at master 673 lines 18 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cloudflare" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/indexer" 20 "tangled.org/core/appview/mentions" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 dbnotify "tangled.org/core/appview/notify/db" 24 phnotify "tangled.org/core/appview/notify/posthog" 25 "tangled.org/core/appview/oauth" 26 "tangled.org/core/appview/pages" 27 "tangled.org/core/appview/reporesolver" 28 "tangled.org/core/appview/validator" 29 xrpcclient "tangled.org/core/appview/xrpcclient" 30 "tangled.org/core/consts" 31 "tangled.org/core/eventconsumer" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/jetstream" 34 "tangled.org/core/log" 35 tlog "tangled.org/core/log" 36 "tangled.org/core/orm" 37 "tangled.org/core/rbac" 38 "tangled.org/core/tid" 39 40 comatproto "github.com/bluesky-social/indigo/api/atproto" 41 atpclient "github.com/bluesky-social/indigo/atproto/client" 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 "github.com/bluesky-social/indigo/xrpc" 45 securejoin "github.com/cyphar/filepath-securejoin" 46 "github.com/go-chi/chi/v5" 47 "github.com/posthog/posthog-go" 48) 49 50type State struct { 51 db *db.DB 52 notifier notify.Notifier 53 indexer *indexer.Indexer 54 oauth *oauth.OAuth 55 enforcer *rbac.Enforcer 56 pages *pages.Pages 57 idResolver *idresolver.Resolver 58 mentionsResolver *mentions.Resolver 59 posthog posthog.Client 60 jc *jetstream.JetstreamClient 61 config *config.Config 62 repoResolver *reporesolver.RepoResolver 63 knotstream *eventconsumer.Consumer 64 spindlestream *eventconsumer.Consumer 65 logger *slog.Logger 66 validator *validator.Validator 67 cfClient *cloudflare.Client 68} 69 70func Make(ctx context.Context, config *config.Config) (*State, error) { 71 logger := tlog.FromContext(ctx) 72 73 d, err := db.Make(ctx, config.Core.DbPath) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create db: %w", err) 76 } 77 78 indexer := indexer.New(log.SubLogger(logger, "indexer")) 79 err = indexer.Init(ctx, d) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create indexer: %w", err) 82 } 83 84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create enforcer: %w", err) 87 } 88 89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 90 if err != nil { 91 logger.Error("failed to create redis resolver", "err", err) 92 res = idresolver.DefaultResolver(config.Plc.PLCURL) 93 } 94 95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create posthog client: %w", err) 98 } 99 100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 102 if err != nil { 103 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 104 } 105 validator := validator.New(d, res, enforcer) 106 107 repoResolver := reporesolver.New(config, enforcer, d) 108 109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 110 111 wrapper := db.DbWrapper{Execer: d} 112 jc, err := jetstream.NewJetstreamClient( 113 config.Jetstream.Endpoint, 114 "appview", 115 []string{ 116 tangled.GraphFollowNSID, 117 tangled.FeedStarNSID, 118 tangled.PublicKeyNSID, 119 tangled.RepoArtifactNSID, 120 tangled.ActorProfileNSID, 121 tangled.KnotMemberNSID, 122 tangled.SpindleMemberNSID, 123 tangled.SpindleNSID, 124 tangled.StringNSID, 125 tangled.RepoIssueNSID, 126 tangled.RepoIssueCommentNSID, 127 tangled.LabelDefinitionNSID, 128 tangled.LabelOpNSID, 129 }, 130 nil, 131 tlog.SubLogger(logger, "jetstream"), 132 wrapper, 133 false, 134 135 // in-memory filter is inapplicable to appview so 136 // we'll never log dids anyway. 137 false, 138 ) 139 if err != nil { 140 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 141 } 142 143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 145 } 146 147 ingester := appview.Ingester{ 148 Db: wrapper, 149 Enforcer: enforcer, 150 IdResolver: res, 151 Config: config, 152 Logger: log.SubLogger(logger, "ingester"), 153 Validator: validator, 154 } 155 err = jc.StartJetstream(ctx, ingester.Ingest()) 156 if err != nil { 157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 158 } 159 160 var notifiers []notify.Notifier 161 162 // Always add the database notifier 163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 164 165 // Add other notifiers in production only 166 if !config.Core.Dev { 167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 168 } 169 notifiers = append(notifiers, indexer) 170 171 // Add webhook notifier 172 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 173 174 notifier := notify.NewMergedNotifier(notifiers) 175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 176 177 var cfClient *cloudflare.Client 178 if config.Cloudflare.ApiToken != "" { 179 cfClient, err = cloudflare.New(config) 180 if err != nil { 181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 182 cfClient = nil 183 } 184 } 185 186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 187 if err != nil { 188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 189 } 190 knotstream.Start(ctx) 191 192 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 193 if err != nil { 194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 195 } 196 spindlestream.Start(ctx) 197 198 state := &State{ 199 db: d, 200 notifier: notifier, 201 indexer: indexer, 202 oauth: oauth, 203 enforcer: enforcer, 204 pages: pages, 205 idResolver: res, 206 mentionsResolver: mentionsResolver, 207 posthog: posthog, 208 jc: jc, 209 config: config, 210 repoResolver: repoResolver, 211 knotstream: knotstream, 212 spindlestream: spindlestream, 213 logger: logger, 214 validator: validator, 215 cfClient: cfClient, 216 } 217 218 // fetch initial bluesky posts if configured 219 go fetchBskyPosts(ctx, res, config, d, logger) 220 221 return state, nil 222} 223 224func (s *State) Close() error { 225 // other close up logic goes here 226 return s.db.Close() 227} 228 229func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 230 w.Header().Set("Content-Type", "text/plain") 231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 232 233 robotsTxt := `# Hello, Tanglers! 234User-agent: * 235Allow: / 236Disallow: /*/*/settings 237Disallow: /settings 238Disallow: /*/*/compare 239Disallow: /*/*/fork 240 241Crawl-delay: 1 242` 243 w.Write([]byte(robotsTxt)) 244} 245 246func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 247 user := s.oauth.GetMultiAccountUser(r) 248 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 249 LoggedInUser: user, 250 }) 251} 252 253func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 254 user := s.oauth.GetMultiAccountUser(r) 255 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 256 LoggedInUser: user, 257 }) 258} 259 260func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 261 user := s.oauth.GetMultiAccountUser(r) 262 s.pages.Brand(w, pages.BrandParams{ 263 LoggedInUser: user, 264 }) 265} 266 267func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 268 user := s.oauth.GetMultiAccountUser(r) 269 if user == nil { 270 return 271 } 272 273 l := s.logger.With("handler", "UpgradeBanner") 274 l = l.With("did", user.Active.Did) 275 276 regs, err := db.GetRegistrations( 277 s.db, 278 orm.FilterEq("did", user.Active.Did), 279 orm.FilterEq("needs_upgrade", 1), 280 ) 281 if err != nil { 282 l.Error("non-fatal: failed to get registrations", "err", err) 283 } 284 285 spindles, err := db.GetSpindles( 286 s.db, 287 orm.FilterEq("owner", user.Active.Did), 288 orm.FilterEq("needs_upgrade", 1), 289 ) 290 if err != nil { 291 l.Error("non-fatal: failed to get spindles", "err", err) 292 } 293 294 if regs == nil && spindles == nil { 295 return 296 } 297 298 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 299 Registrations: regs, 300 Spindles: spindles, 301 }) 302} 303 304func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 305 user := chi.URLParam(r, "user") 306 user = strings.TrimPrefix(user, "@") 307 308 if user == "" { 309 w.WriteHeader(http.StatusBadRequest) 310 return 311 } 312 313 id, err := s.idResolver.ResolveIdent(r.Context(), user) 314 if err != nil { 315 w.WriteHeader(http.StatusInternalServerError) 316 return 317 } 318 319 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 320 if err != nil { 321 s.logger.Error("failed to get public keys", "err", err) 322 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 323 return 324 } 325 326 if len(pubKeys) == 0 { 327 w.WriteHeader(http.StatusNoContent) 328 return 329 } 330 331 for _, k := range pubKeys { 332 key := strings.TrimRight(k.Key, "\n") 333 fmt.Fprintln(w, key) 334 } 335} 336 337func validateRepoName(name string) error { 338 // check for path traversal attempts 339 if name == "." || name == ".." || 340 strings.Contains(name, "/") || strings.Contains(name, "\\") { 341 return fmt.Errorf("Repository name contains invalid path characters") 342 } 343 344 // check for sequences that could be used for traversal when normalized 345 if strings.Contains(name, "./") || strings.Contains(name, "../") || 346 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 347 return fmt.Errorf("Repository name contains invalid path sequence") 348 } 349 350 // then continue with character validation 351 for _, char := range name { 352 if !((char >= 'a' && char <= 'z') || 353 (char >= 'A' && char <= 'Z') || 354 (char >= '0' && char <= '9') || 355 char == '-' || char == '_' || char == '.') { 356 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 357 } 358 } 359 360 // additional check to prevent multiple sequential dots 361 if strings.Contains(name, "..") { 362 return fmt.Errorf("Repository name cannot contain sequential dots") 363 } 364 365 // if all checks pass 366 return nil 367} 368 369func stripGitExt(name string) string { 370 return strings.TrimSuffix(name, ".git") 371} 372 373func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 374 switch r.Method { 375 case http.MethodGet: 376 user := s.oauth.GetMultiAccountUser(r) 377 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 378 if err != nil { 379 s.pages.Notice(w, "repo", "Invalid user account.") 380 return 381 } 382 383 s.pages.NewRepo(w, pages.NewRepoParams{ 384 LoggedInUser: user, 385 Knots: knots, 386 }) 387 388 case http.MethodPost: 389 l := s.logger.With("handler", "NewRepo") 390 391 user := s.oauth.GetMultiAccountUser(r) 392 l = l.With("did", user.Active.Did) 393 394 // form validation 395 domain := r.FormValue("domain") 396 if domain == "" { 397 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 398 return 399 } 400 l = l.With("knot", domain) 401 402 repoName := r.FormValue("name") 403 if repoName == "" { 404 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 405 return 406 } 407 408 if err := validateRepoName(repoName); err != nil { 409 s.pages.Notice(w, "repo", err.Error()) 410 return 411 } 412 repoName = stripGitExt(repoName) 413 l = l.With("repoName", repoName) 414 415 defaultBranch := r.FormValue("branch") 416 if defaultBranch == "" { 417 defaultBranch = "main" 418 } 419 l = l.With("defaultBranch", defaultBranch) 420 421 description := r.FormValue("description") 422 if len([]rune(description)) > 140 { 423 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 424 return 425 } 426 427 // ACL validation 428 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 429 if err != nil || !ok { 430 l.Info("unauthorized") 431 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 432 return 433 } 434 435 // Check for existing repos 436 existingRepo, err := db.GetRepo( 437 s.db, 438 orm.FilterEq("did", user.Active.Did), 439 orm.FilterEq("name", repoName), 440 ) 441 if err == nil && existingRepo != nil { 442 l.Info("repo exists") 443 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 444 return 445 } 446 447 // create atproto record for this repo 448 rkey := tid.TID() 449 repo := &models.Repo{ 450 Did: user.Active.Did, 451 Name: repoName, 452 Knot: domain, 453 Rkey: rkey, 454 Description: description, 455 Created: time.Now(), 456 Labels: s.config.Label.DefaultLabelDefs, 457 } 458 record := repo.AsRecord() 459 460 atpClient, err := s.oauth.AuthorizedClient(r) 461 if err != nil { 462 l.Info("PDS write failed", "err", err) 463 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 464 return 465 } 466 467 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 468 Collection: tangled.RepoNSID, 469 Repo: user.Active.Did, 470 Rkey: rkey, 471 Record: &lexutil.LexiconTypeDecoder{ 472 Val: &record, 473 }, 474 }) 475 if err != nil { 476 l.Info("PDS write failed", "err", err) 477 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 478 return 479 } 480 481 aturi := atresp.Uri 482 l = l.With("aturi", aturi) 483 l.Info("wrote to PDS") 484 485 tx, err := s.db.BeginTx(r.Context(), nil) 486 if err != nil { 487 l.Info("txn failed", "err", err) 488 s.pages.Notice(w, "repo", "Failed to save repository information.") 489 return 490 } 491 492 // The rollback function reverts a few things on failure: 493 // - the pending txn 494 // - the ACLs 495 // - the atproto record created 496 rollback := func() { 497 err1 := tx.Rollback() 498 err2 := s.enforcer.E.LoadPolicy() 499 err3 := rollbackRecord(context.Background(), aturi, atpClient) 500 501 // ignore txn complete errors, this is okay 502 if errors.Is(err1, sql.ErrTxDone) { 503 err1 = nil 504 } 505 506 if errs := errors.Join(err1, err2, err3); errs != nil { 507 l.Error("failed to rollback changes", "errs", errs) 508 return 509 } 510 } 511 defer rollback() 512 513 client, err := s.oauth.ServiceClient( 514 r, 515 oauth.WithService(domain), 516 oauth.WithLxm(tangled.RepoCreateNSID), 517 oauth.WithDev(s.config.Core.Dev), 518 ) 519 if err != nil { 520 l.Error("service auth failed", "err", err) 521 s.pages.Notice(w, "repo", "Failed to reach PDS.") 522 return 523 } 524 525 xe := tangled.RepoCreate( 526 r.Context(), 527 client, 528 &tangled.RepoCreate_Input{ 529 Rkey: rkey, 530 }, 531 ) 532 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 533 l.Error("xrpc error", "xe", xe) 534 s.pages.Notice(w, "repo", err.Error()) 535 return 536 } 537 538 err = db.AddRepo(tx, repo) 539 if err != nil { 540 l.Error("db write failed", "err", err) 541 s.pages.Notice(w, "repo", "Failed to save repository information.") 542 return 543 } 544 545 // acls 546 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 547 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 548 if err != nil { 549 l.Error("acl setup failed", "err", err) 550 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 551 return 552 } 553 554 err = tx.Commit() 555 if err != nil { 556 l.Error("txn commit failed", "err", err) 557 http.Error(w, err.Error(), http.StatusInternalServerError) 558 return 559 } 560 561 err = s.enforcer.E.SavePolicy() 562 if err != nil { 563 l.Error("acl save failed", "err", err) 564 http.Error(w, err.Error(), http.StatusInternalServerError) 565 return 566 } 567 568 // reset the ATURI because the transaction completed successfully 569 aturi = "" 570 571 s.notifier.NewRepo(r.Context(), repo) 572 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 573 } 574} 575 576// this is used to rollback changes made to the PDS 577// 578// it is a no-op if the provided ATURI is empty 579func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 580 if aturi == "" { 581 return nil 582 } 583 584 parsed := syntax.ATURI(aturi) 585 586 collection := parsed.Collection().String() 587 repo := parsed.Authority().String() 588 rkey := parsed.RecordKey().String() 589 590 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 591 Collection: collection, 592 Repo: repo, 593 Rkey: rkey, 594 }) 595 return err 596} 597 598func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 599 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 600 if err != nil { 601 return err 602 } 603 // already present 604 if len(defaultLabels) == len(defaults) { 605 return nil 606 } 607 608 labelDefs, err := models.FetchLabelDefs(r, defaults) 609 if err != nil { 610 return err 611 } 612 613 // Insert each label definition to the database 614 for _, labelDef := range labelDefs { 615 _, err = db.AddLabelDefinition(e, &labelDef) 616 if err != nil { 617 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 618 } 619 } 620 621 return nil 622} 623 624func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 625 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 626 if err != nil { 627 logger.Error("failed to resolve tangled.org DID", "err", err) 628 return 629 } 630 631 pdsEndpoint := resolved.PDSEndpoint() 632 if pdsEndpoint == "" { 633 logger.Error("no PDS endpoint found for tangled.sh DID") 634 return 635 } 636 637 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 638 if err != nil { 639 logger.Error("failed to create appassword session... skipping fetch", "err", err) 640 return 641 } 642 643 client := xrpc.Client{ 644 Auth: &xrpc.AuthInfo{ 645 AccessJwt: session.AccessJwt, 646 Did: session.Did, 647 }, 648 Host: session.PdsEndpoint, 649 } 650 651 l := log.SubLogger(logger, "bluesky") 652 653 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 654 defer ticker.Stop() 655 656 for { 657 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 658 if err != nil { 659 l.Error("failed to fetch bluesky posts", "err", err) 660 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 661 l.Error("failed to insert bluesky posts", "err", err) 662 } else { 663 l.Info("inserted bluesky posts", "count", len(posts)) 664 } 665 666 select { 667 case <-ticker.C: 668 case <-ctx.Done(): 669 l.Info("stopping bluesky updater") 670 return 671 } 672 } 673}