Monorepo for Tangled
at push-rukyyyptkmtm 661 lines 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/indexer" 19 "tangled.org/core/appview/mentions" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/consts" 30 "tangled.org/core/eventconsumer" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/jetstream" 33 "tangled.org/core/log" 34 tlog "tangled.org/core/log" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 39 comatproto "github.com/bluesky-social/indigo/api/atproto" 40 atpclient "github.com/bluesky-social/indigo/atproto/client" 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 securejoin "github.com/cyphar/filepath-securejoin" 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47) 48 49type State struct { 50 db *db.DB 51 notifier notify.Notifier 52 indexer *indexer.Indexer 53 oauth *oauth.OAuth 54 enforcer *rbac.Enforcer 55 pages *pages.Pages 56 idResolver *idresolver.Resolver 57 mentionsResolver *mentions.Resolver 58 posthog posthog.Client 59 jc *jetstream.JetstreamClient 60 config *config.Config 61 repoResolver *reporesolver.RepoResolver 62 knotstream *eventconsumer.Consumer 63 spindlestream *eventconsumer.Consumer 64 logger *slog.Logger 65 validator *validator.Validator 66} 67 68func Make(ctx context.Context, config *config.Config) (*State, error) { 69 logger := tlog.FromContext(ctx) 70 71 d, err := db.Make(ctx, config.Core.DbPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create db: %w", err) 74 } 75 76 indexer := indexer.New(log.SubLogger(logger, "indexer")) 77 err = indexer.Init(ctx, d) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create indexer: %w", err) 80 } 81 82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create enforcer: %w", err) 85 } 86 87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 88 if err != nil { 89 logger.Error("failed to create redis resolver", "err", err) 90 res = idresolver.DefaultResolver(config.Plc.PLCURL) 91 } 92 93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create posthog client: %w", err) 96 } 97 98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 102 } 103 validator := validator.New(d, res, enforcer) 104 105 repoResolver := reporesolver.New(config, enforcer, d) 106 107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 108 109 wrapper := db.DbWrapper{Execer: d} 110 jc, err := jetstream.NewJetstreamClient( 111 config.Jetstream.Endpoint, 112 "appview", 113 []string{ 114 tangled.GraphFollowNSID, 115 tangled.FeedStarNSID, 116 tangled.PublicKeyNSID, 117 tangled.RepoArtifactNSID, 118 tangled.ActorProfileNSID, 119 tangled.KnotMemberNSID, 120 tangled.SpindleMemberNSID, 121 tangled.SpindleNSID, 122 tangled.StringNSID, 123 tangled.RepoIssueNSID, 124 tangled.RepoIssueCommentNSID, 125 tangled.LabelDefinitionNSID, 126 tangled.LabelOpNSID, 127 }, 128 nil, 129 tlog.SubLogger(logger, "jetstream"), 130 wrapper, 131 false, 132 133 // in-memory filter is inapplicable to appview so 134 // we'll never log dids anyway. 135 false, 136 ) 137 if err != nil { 138 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 139 } 140 141 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 142 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 143 } 144 145 ingester := appview.Ingester{ 146 Db: wrapper, 147 Enforcer: enforcer, 148 IdResolver: res, 149 Config: config, 150 Logger: log.SubLogger(logger, "ingester"), 151 Validator: validator, 152 } 153 err = jc.StartJetstream(ctx, ingester.Ingest()) 154 if err != nil { 155 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 156 } 157 158 var notifiers []notify.Notifier 159 160 // Always add the database notifier 161 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 162 163 // Add other notifiers in production only 164 if !config.Core.Dev { 165 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 166 } 167 notifiers = append(notifiers, indexer) 168 169 // Add webhook notifier 170 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 171 172 notifier := notify.NewMergedNotifier(notifiers) 173 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 174 175 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 176 if err != nil { 177 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 178 } 179 knotstream.Start(ctx) 180 181 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 182 if err != nil { 183 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 184 } 185 spindlestream.Start(ctx) 186 187 state := &State{ 188 d, 189 notifier, 190 indexer, 191 oauth, 192 enforcer, 193 pages, 194 res, 195 mentionsResolver, 196 posthog, 197 jc, 198 config, 199 repoResolver, 200 knotstream, 201 spindlestream, 202 logger, 203 validator, 204 } 205 206 // fetch initial bluesky posts if configured 207 go fetchBskyPosts(ctx, res, config, d, logger) 208 209 return state, nil 210} 211 212func (s *State) Close() error { 213 // other close up logic goes here 214 return s.db.Close() 215} 216 217func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 218 w.Header().Set("Content-Type", "text/plain") 219 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 220 221 robotsTxt := `# Hello, Tanglers! 222User-agent: * 223Allow: / 224Disallow: /*/*/settings 225Disallow: /settings 226Disallow: /*/*/compare 227Disallow: /*/*/fork 228 229Crawl-delay: 1 230` 231 w.Write([]byte(robotsTxt)) 232} 233 234func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 235 user := s.oauth.GetMultiAccountUser(r) 236 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 237 LoggedInUser: user, 238 }) 239} 240 241func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 242 user := s.oauth.GetMultiAccountUser(r) 243 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 244 LoggedInUser: user, 245 }) 246} 247 248func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 249 user := s.oauth.GetMultiAccountUser(r) 250 s.pages.Brand(w, pages.BrandParams{ 251 LoggedInUser: user, 252 }) 253} 254 255func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 256 user := s.oauth.GetMultiAccountUser(r) 257 if user == nil { 258 return 259 } 260 261 l := s.logger.With("handler", "UpgradeBanner") 262 l = l.With("did", user.Active.Did) 263 264 regs, err := db.GetRegistrations( 265 s.db, 266 orm.FilterEq("did", user.Active.Did), 267 orm.FilterEq("needs_upgrade", 1), 268 ) 269 if err != nil { 270 l.Error("non-fatal: failed to get registrations", "err", err) 271 } 272 273 spindles, err := db.GetSpindles( 274 s.db, 275 orm.FilterEq("owner", user.Active.Did), 276 orm.FilterEq("needs_upgrade", 1), 277 ) 278 if err != nil { 279 l.Error("non-fatal: failed to get spindles", "err", err) 280 } 281 282 if regs == nil && spindles == nil { 283 return 284 } 285 286 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 287 Registrations: regs, 288 Spindles: spindles, 289 }) 290} 291 292func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 293 user := chi.URLParam(r, "user") 294 user = strings.TrimPrefix(user, "@") 295 296 if user == "" { 297 w.WriteHeader(http.StatusBadRequest) 298 return 299 } 300 301 id, err := s.idResolver.ResolveIdent(r.Context(), user) 302 if err != nil { 303 w.WriteHeader(http.StatusInternalServerError) 304 return 305 } 306 307 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 308 if err != nil { 309 s.logger.Error("failed to get public keys", "err", err) 310 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 311 return 312 } 313 314 if len(pubKeys) == 0 { 315 w.WriteHeader(http.StatusNoContent) 316 return 317 } 318 319 for _, k := range pubKeys { 320 key := strings.TrimRight(k.Key, "\n") 321 fmt.Fprintln(w, key) 322 } 323} 324 325func validateRepoName(name string) error { 326 // check for path traversal attempts 327 if name == "." || name == ".." || 328 strings.Contains(name, "/") || strings.Contains(name, "\\") { 329 return fmt.Errorf("Repository name contains invalid path characters") 330 } 331 332 // check for sequences that could be used for traversal when normalized 333 if strings.Contains(name, "./") || strings.Contains(name, "../") || 334 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 335 return fmt.Errorf("Repository name contains invalid path sequence") 336 } 337 338 // then continue with character validation 339 for _, char := range name { 340 if !((char >= 'a' && char <= 'z') || 341 (char >= 'A' && char <= 'Z') || 342 (char >= '0' && char <= '9') || 343 char == '-' || char == '_' || char == '.') { 344 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 345 } 346 } 347 348 // additional check to prevent multiple sequential dots 349 if strings.Contains(name, "..") { 350 return fmt.Errorf("Repository name cannot contain sequential dots") 351 } 352 353 // if all checks pass 354 return nil 355} 356 357func stripGitExt(name string) string { 358 return strings.TrimSuffix(name, ".git") 359} 360 361func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 362 switch r.Method { 363 case http.MethodGet: 364 user := s.oauth.GetMultiAccountUser(r) 365 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 366 if err != nil { 367 s.pages.Notice(w, "repo", "Invalid user account.") 368 return 369 } 370 371 s.pages.NewRepo(w, pages.NewRepoParams{ 372 LoggedInUser: user, 373 Knots: knots, 374 }) 375 376 case http.MethodPost: 377 l := s.logger.With("handler", "NewRepo") 378 379 user := s.oauth.GetMultiAccountUser(r) 380 l = l.With("did", user.Active.Did) 381 382 // form validation 383 domain := r.FormValue("domain") 384 if domain == "" { 385 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 386 return 387 } 388 l = l.With("knot", domain) 389 390 repoName := r.FormValue("name") 391 if repoName == "" { 392 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 393 return 394 } 395 396 if err := validateRepoName(repoName); err != nil { 397 s.pages.Notice(w, "repo", err.Error()) 398 return 399 } 400 repoName = stripGitExt(repoName) 401 l = l.With("repoName", repoName) 402 403 defaultBranch := r.FormValue("branch") 404 if defaultBranch == "" { 405 defaultBranch = "main" 406 } 407 l = l.With("defaultBranch", defaultBranch) 408 409 description := r.FormValue("description") 410 if len([]rune(description)) > 140 { 411 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 412 return 413 } 414 415 // ACL validation 416 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 417 if err != nil || !ok { 418 l.Info("unauthorized") 419 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 420 return 421 } 422 423 // Check for existing repos 424 existingRepo, err := db.GetRepo( 425 s.db, 426 orm.FilterEq("did", user.Active.Did), 427 orm.FilterEq("name", repoName), 428 ) 429 if err == nil && existingRepo != nil { 430 l.Info("repo exists") 431 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 432 return 433 } 434 435 // create atproto record for this repo 436 rkey := tid.TID() 437 repo := &models.Repo{ 438 Did: user.Active.Did, 439 Name: repoName, 440 Knot: domain, 441 Rkey: rkey, 442 Description: description, 443 Created: time.Now(), 444 Labels: s.config.Label.DefaultLabelDefs, 445 } 446 record := repo.AsRecord() 447 448 atpClient, err := s.oauth.AuthorizedClient(r) 449 if err != nil { 450 l.Info("PDS write failed", "err", err) 451 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 452 return 453 } 454 455 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 456 Collection: tangled.RepoNSID, 457 Repo: user.Active.Did, 458 Rkey: rkey, 459 Record: &lexutil.LexiconTypeDecoder{ 460 Val: &record, 461 }, 462 }) 463 if err != nil { 464 l.Info("PDS write failed", "err", err) 465 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 466 return 467 } 468 469 aturi := atresp.Uri 470 l = l.With("aturi", aturi) 471 l.Info("wrote to PDS") 472 473 tx, err := s.db.BeginTx(r.Context(), nil) 474 if err != nil { 475 l.Info("txn failed", "err", err) 476 s.pages.Notice(w, "repo", "Failed to save repository information.") 477 return 478 } 479 480 // The rollback function reverts a few things on failure: 481 // - the pending txn 482 // - the ACLs 483 // - the atproto record created 484 rollback := func() { 485 err1 := tx.Rollback() 486 err2 := s.enforcer.E.LoadPolicy() 487 err3 := rollbackRecord(context.Background(), aturi, atpClient) 488 489 // ignore txn complete errors, this is okay 490 if errors.Is(err1, sql.ErrTxDone) { 491 err1 = nil 492 } 493 494 if errs := errors.Join(err1, err2, err3); errs != nil { 495 l.Error("failed to rollback changes", "errs", errs) 496 return 497 } 498 } 499 defer rollback() 500 501 client, err := s.oauth.ServiceClient( 502 r, 503 oauth.WithService(domain), 504 oauth.WithLxm(tangled.RepoCreateNSID), 505 oauth.WithDev(s.config.Core.Dev), 506 ) 507 if err != nil { 508 l.Error("service auth failed", "err", err) 509 s.pages.Notice(w, "repo", "Failed to reach PDS.") 510 return 511 } 512 513 xe := tangled.RepoCreate( 514 r.Context(), 515 client, 516 &tangled.RepoCreate_Input{ 517 Rkey: rkey, 518 }, 519 ) 520 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 521 l.Error("xrpc error", "xe", xe) 522 s.pages.Notice(w, "repo", err.Error()) 523 return 524 } 525 526 err = db.AddRepo(tx, repo) 527 if err != nil { 528 l.Error("db write failed", "err", err) 529 s.pages.Notice(w, "repo", "Failed to save repository information.") 530 return 531 } 532 533 // acls 534 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 535 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 536 if err != nil { 537 l.Error("acl setup failed", "err", err) 538 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 539 return 540 } 541 542 err = tx.Commit() 543 if err != nil { 544 l.Error("txn commit failed", "err", err) 545 http.Error(w, err.Error(), http.StatusInternalServerError) 546 return 547 } 548 549 err = s.enforcer.E.SavePolicy() 550 if err != nil { 551 l.Error("acl save failed", "err", err) 552 http.Error(w, err.Error(), http.StatusInternalServerError) 553 return 554 } 555 556 // reset the ATURI because the transaction completed successfully 557 aturi = "" 558 559 s.notifier.NewRepo(r.Context(), repo) 560 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 561 } 562} 563 564// this is used to rollback changes made to the PDS 565// 566// it is a no-op if the provided ATURI is empty 567func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 568 if aturi == "" { 569 return nil 570 } 571 572 parsed := syntax.ATURI(aturi) 573 574 collection := parsed.Collection().String() 575 repo := parsed.Authority().String() 576 rkey := parsed.RecordKey().String() 577 578 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 579 Collection: collection, 580 Repo: repo, 581 Rkey: rkey, 582 }) 583 return err 584} 585 586func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 587 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 588 if err != nil { 589 return err 590 } 591 // already present 592 if len(defaultLabels) == len(defaults) { 593 return nil 594 } 595 596 labelDefs, err := models.FetchLabelDefs(r, defaults) 597 if err != nil { 598 return err 599 } 600 601 // Insert each label definition to the database 602 for _, labelDef := range labelDefs { 603 _, err = db.AddLabelDefinition(e, &labelDef) 604 if err != nil { 605 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 606 } 607 } 608 609 return nil 610} 611 612func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 613 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 614 if err != nil { 615 logger.Error("failed to resolve tangled.org DID", "err", err) 616 return 617 } 618 619 pdsEndpoint := resolved.PDSEndpoint() 620 if pdsEndpoint == "" { 621 logger.Error("no PDS endpoint found for tangled.sh DID") 622 return 623 } 624 625 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, config.Core.RateLimitBypass, logger) 626 if err != nil { 627 logger.Error("failed to create appassword session... skipping fetch", "err", err) 628 return 629 } 630 631 client := xrpc.Client{ 632 Auth: &xrpc.AuthInfo{ 633 AccessJwt: session.AccessJwt, 634 Did: session.Did, 635 }, 636 Host: session.PdsEndpoint, 637 } 638 639 l := log.SubLogger(logger, "bluesky") 640 641 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 642 defer ticker.Stop() 643 644 for { 645 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 646 if err != nil { 647 l.Error("failed to fetch bluesky posts", "err", err) 648 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 649 l.Error("failed to insert bluesky posts", "err", err) 650 } else { 651 l.Info("inserted bluesky posts", "count", len(posts)) 652 } 653 654 select { 655 case <-ticker.C: 656 case <-ctx.Done(): 657 l.Info("stopping bluesky updater") 658 return 659 } 660 } 661}