Monorepo for Tangled
at 8f7e61bf51373c417c6f98339f4c7becb560d299 660 lines 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/indexer" 19 "tangled.org/core/appview/mentions" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/consts" 30 "tangled.org/core/eventconsumer" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/jetstream" 33 "tangled.org/core/log" 34 tlog "tangled.org/core/log" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 39 comatproto "github.com/bluesky-social/indigo/api/atproto" 40 atpclient "github.com/bluesky-social/indigo/atproto/client" 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 securejoin "github.com/cyphar/filepath-securejoin" 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47) 48 49type State struct { 50 db *db.DB 51 notifier notify.Notifier 52 indexer *indexer.Indexer 53 oauth *oauth.OAuth 54 enforcer *rbac.Enforcer 55 pages *pages.Pages 56 idResolver *idresolver.Resolver 57 mentionsResolver *mentions.Resolver 58 posthog posthog.Client 59 jc *jetstream.JetstreamClient 60 config *config.Config 61 repoResolver *reporesolver.RepoResolver 62 knotstream *eventconsumer.Consumer 63 spindlestream *eventconsumer.Consumer 64 logger *slog.Logger 65 validator *validator.Validator 66} 67 68func Make(ctx context.Context, config *config.Config) (*State, error) { 69 logger := tlog.FromContext(ctx) 70 71 d, err := db.Make(ctx, config.Core.DbPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create db: %w", err) 74 } 75 76 indexer := indexer.New(log.SubLogger(logger, "indexer")) 77 err = indexer.Init(ctx, d) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create indexer: %w", err) 80 } 81 82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create enforcer: %w", err) 85 } 86 87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 88 if err != nil { 89 logger.Error("failed to create redis resolver", "err", err) 90 res = idresolver.DefaultResolver(config.Plc.PLCURL) 91 } 92 93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create posthog client: %w", err) 96 } 97 98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 102 } 103 validator := validator.New(d, res, enforcer) 104 105 repoResolver := reporesolver.New(config, enforcer, d) 106 107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 108 109 wrapper := db.DbWrapper{Execer: d} 110 jc, err := jetstream.NewJetstreamClient( 111 config.Jetstream.Endpoint, 112 "appview", 113 []string{ 114 tangled.GraphFollowNSID, 115 tangled.FeedStarNSID, 116 tangled.PublicKeyNSID, 117 tangled.RepoArtifactNSID, 118 tangled.ActorProfileNSID, 119 tangled.SpindleMemberNSID, 120 tangled.SpindleNSID, 121 tangled.StringNSID, 122 tangled.RepoIssueNSID, 123 tangled.RepoIssueCommentNSID, 124 tangled.LabelDefinitionNSID, 125 tangled.LabelOpNSID, 126 }, 127 nil, 128 tlog.SubLogger(logger, "jetstream"), 129 wrapper, 130 false, 131 132 // in-memory filter is inapplicable to appview so 133 // we'll never log dids anyway. 134 false, 135 ) 136 if err != nil { 137 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 138 } 139 140 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 141 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 142 } 143 144 ingester := appview.Ingester{ 145 Db: wrapper, 146 Enforcer: enforcer, 147 IdResolver: res, 148 Config: config, 149 Logger: log.SubLogger(logger, "ingester"), 150 Validator: validator, 151 } 152 err = jc.StartJetstream(ctx, ingester.Ingest()) 153 if err != nil { 154 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 155 } 156 157 var notifiers []notify.Notifier 158 159 // Always add the database notifier 160 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 161 162 // Add other notifiers in production only 163 if !config.Core.Dev { 164 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 165 } 166 notifiers = append(notifiers, indexer) 167 168 // Add webhook notifier 169 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 170 171 notifier := notify.NewMergedNotifier(notifiers) 172 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 173 174 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 175 if err != nil { 176 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 177 } 178 knotstream.Start(ctx) 179 180 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 181 if err != nil { 182 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 183 } 184 spindlestream.Start(ctx) 185 186 state := &State{ 187 d, 188 notifier, 189 indexer, 190 oauth, 191 enforcer, 192 pages, 193 res, 194 mentionsResolver, 195 posthog, 196 jc, 197 config, 198 repoResolver, 199 knotstream, 200 spindlestream, 201 logger, 202 validator, 203 } 204 205 // fetch initial bluesky posts if configured 206 go fetchBskyPosts(ctx, res, config, d, logger) 207 208 return state, nil 209} 210 211func (s *State) Close() error { 212 // other close up logic goes here 213 return s.db.Close() 214} 215 216func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 217 w.Header().Set("Content-Type", "text/plain") 218 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 219 220 robotsTxt := `# Hello, Tanglers! 221User-agent: * 222Allow: / 223Disallow: /*/*/settings 224Disallow: /settings 225Disallow: /*/*/compare 226Disallow: /*/*/fork 227 228Crawl-delay: 1 229` 230 w.Write([]byte(robotsTxt)) 231} 232 233func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 234 user := s.oauth.GetMultiAccountUser(r) 235 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 236 LoggedInUser: user, 237 }) 238} 239 240func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 241 user := s.oauth.GetMultiAccountUser(r) 242 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 243 LoggedInUser: user, 244 }) 245} 246 247func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 248 user := s.oauth.GetMultiAccountUser(r) 249 s.pages.Brand(w, pages.BrandParams{ 250 LoggedInUser: user, 251 }) 252} 253 254func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 255 user := s.oauth.GetMultiAccountUser(r) 256 if user == nil { 257 return 258 } 259 260 l := s.logger.With("handler", "UpgradeBanner") 261 l = l.With("did", user.Active.Did) 262 263 regs, err := db.GetRegistrations( 264 s.db, 265 orm.FilterEq("did", user.Active.Did), 266 orm.FilterEq("needs_upgrade", 1), 267 ) 268 if err != nil { 269 l.Error("non-fatal: failed to get registrations", "err", err) 270 } 271 272 spindles, err := db.GetSpindles( 273 s.db, 274 orm.FilterEq("owner", user.Active.Did), 275 orm.FilterEq("needs_upgrade", 1), 276 ) 277 if err != nil { 278 l.Error("non-fatal: failed to get spindles", "err", err) 279 } 280 281 if regs == nil && spindles == nil { 282 return 283 } 284 285 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 286 Registrations: regs, 287 Spindles: spindles, 288 }) 289} 290 291func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 292 user := chi.URLParam(r, "user") 293 user = strings.TrimPrefix(user, "@") 294 295 if user == "" { 296 w.WriteHeader(http.StatusBadRequest) 297 return 298 } 299 300 id, err := s.idResolver.ResolveIdent(r.Context(), user) 301 if err != nil { 302 w.WriteHeader(http.StatusInternalServerError) 303 return 304 } 305 306 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 307 if err != nil { 308 s.logger.Error("failed to get public keys", "err", err) 309 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 310 return 311 } 312 313 if len(pubKeys) == 0 { 314 w.WriteHeader(http.StatusNoContent) 315 return 316 } 317 318 for _, k := range pubKeys { 319 key := strings.TrimRight(k.Key, "\n") 320 fmt.Fprintln(w, key) 321 } 322} 323 324func validateRepoName(name string) error { 325 // check for path traversal attempts 326 if name == "." || name == ".." || 327 strings.Contains(name, "/") || strings.Contains(name, "\\") { 328 return fmt.Errorf("Repository name contains invalid path characters") 329 } 330 331 // check for sequences that could be used for traversal when normalized 332 if strings.Contains(name, "./") || strings.Contains(name, "../") || 333 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 334 return fmt.Errorf("Repository name contains invalid path sequence") 335 } 336 337 // then continue with character validation 338 for _, char := range name { 339 if !((char >= 'a' && char <= 'z') || 340 (char >= 'A' && char <= 'Z') || 341 (char >= '0' && char <= '9') || 342 char == '-' || char == '_' || char == '.') { 343 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 344 } 345 } 346 347 // additional check to prevent multiple sequential dots 348 if strings.Contains(name, "..") { 349 return fmt.Errorf("Repository name cannot contain sequential dots") 350 } 351 352 // if all checks pass 353 return nil 354} 355 356func stripGitExt(name string) string { 357 return strings.TrimSuffix(name, ".git") 358} 359 360func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 361 switch r.Method { 362 case http.MethodGet: 363 user := s.oauth.GetMultiAccountUser(r) 364 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 365 if err != nil { 366 s.pages.Notice(w, "repo", "Invalid user account.") 367 return 368 } 369 370 s.pages.NewRepo(w, pages.NewRepoParams{ 371 LoggedInUser: user, 372 Knots: knots, 373 }) 374 375 case http.MethodPost: 376 l := s.logger.With("handler", "NewRepo") 377 378 user := s.oauth.GetMultiAccountUser(r) 379 l = l.With("did", user.Active.Did) 380 381 // form validation 382 domain := r.FormValue("domain") 383 if domain == "" { 384 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 385 return 386 } 387 l = l.With("knot", domain) 388 389 repoName := r.FormValue("name") 390 if repoName == "" { 391 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 392 return 393 } 394 395 if err := validateRepoName(repoName); err != nil { 396 s.pages.Notice(w, "repo", err.Error()) 397 return 398 } 399 repoName = stripGitExt(repoName) 400 l = l.With("repoName", repoName) 401 402 defaultBranch := r.FormValue("branch") 403 if defaultBranch == "" { 404 defaultBranch = "main" 405 } 406 l = l.With("defaultBranch", defaultBranch) 407 408 description := r.FormValue("description") 409 if len([]rune(description)) > 140 { 410 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 411 return 412 } 413 414 // ACL validation 415 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 416 if err != nil || !ok { 417 l.Info("unauthorized") 418 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 419 return 420 } 421 422 // Check for existing repos 423 existingRepo, err := db.GetRepo( 424 s.db, 425 orm.FilterEq("did", user.Active.Did), 426 orm.FilterEq("name", repoName), 427 ) 428 if err == nil && existingRepo != nil { 429 l.Info("repo exists") 430 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 431 return 432 } 433 434 // create atproto record for this repo 435 rkey := tid.TID() 436 repo := &models.Repo{ 437 Did: user.Active.Did, 438 Name: repoName, 439 Knot: domain, 440 Rkey: rkey, 441 Description: description, 442 Created: time.Now(), 443 Labels: s.config.Label.DefaultLabelDefs, 444 } 445 record := repo.AsRecord() 446 447 atpClient, err := s.oauth.AuthorizedClient(r) 448 if err != nil { 449 l.Info("PDS write failed", "err", err) 450 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 451 return 452 } 453 454 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 455 Collection: tangled.RepoNSID, 456 Repo: user.Active.Did, 457 Rkey: rkey, 458 Record: &lexutil.LexiconTypeDecoder{ 459 Val: &record, 460 }, 461 }) 462 if err != nil { 463 l.Info("PDS write failed", "err", err) 464 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 465 return 466 } 467 468 aturi := atresp.Uri 469 l = l.With("aturi", aturi) 470 l.Info("wrote to PDS") 471 472 tx, err := s.db.BeginTx(r.Context(), nil) 473 if err != nil { 474 l.Info("txn failed", "err", err) 475 s.pages.Notice(w, "repo", "Failed to save repository information.") 476 return 477 } 478 479 // The rollback function reverts a few things on failure: 480 // - the pending txn 481 // - the ACLs 482 // - the atproto record created 483 rollback := func() { 484 err1 := tx.Rollback() 485 err2 := s.enforcer.E.LoadPolicy() 486 err3 := rollbackRecord(context.Background(), aturi, atpClient) 487 488 // ignore txn complete errors, this is okay 489 if errors.Is(err1, sql.ErrTxDone) { 490 err1 = nil 491 } 492 493 if errs := errors.Join(err1, err2, err3); errs != nil { 494 l.Error("failed to rollback changes", "errs", errs) 495 return 496 } 497 } 498 defer rollback() 499 500 client, err := s.oauth.ServiceClient( 501 r, 502 oauth.WithService(domain), 503 oauth.WithLxm(tangled.RepoCreateNSID), 504 oauth.WithDev(s.config.Core.Dev), 505 ) 506 if err != nil { 507 l.Error("service auth failed", "err", err) 508 s.pages.Notice(w, "repo", "Failed to reach PDS.") 509 return 510 } 511 512 xe := tangled.RepoCreate( 513 r.Context(), 514 client, 515 &tangled.RepoCreate_Input{ 516 Rkey: rkey, 517 }, 518 ) 519 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 520 l.Error("xrpc error", "xe", xe) 521 s.pages.Notice(w, "repo", err.Error()) 522 return 523 } 524 525 err = db.AddRepo(tx, repo) 526 if err != nil { 527 l.Error("db write failed", "err", err) 528 s.pages.Notice(w, "repo", "Failed to save repository information.") 529 return 530 } 531 532 // acls 533 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 534 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 535 if err != nil { 536 l.Error("acl setup failed", "err", err) 537 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 538 return 539 } 540 541 err = tx.Commit() 542 if err != nil { 543 l.Error("txn commit failed", "err", err) 544 http.Error(w, err.Error(), http.StatusInternalServerError) 545 return 546 } 547 548 err = s.enforcer.E.SavePolicy() 549 if err != nil { 550 l.Error("acl save failed", "err", err) 551 http.Error(w, err.Error(), http.StatusInternalServerError) 552 return 553 } 554 555 // reset the ATURI because the transaction completed successfully 556 aturi = "" 557 558 s.notifier.NewRepo(r.Context(), repo) 559 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 560 } 561} 562 563// this is used to rollback changes made to the PDS 564// 565// it is a no-op if the provided ATURI is empty 566func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 567 if aturi == "" { 568 return nil 569 } 570 571 parsed := syntax.ATURI(aturi) 572 573 collection := parsed.Collection().String() 574 repo := parsed.Authority().String() 575 rkey := parsed.RecordKey().String() 576 577 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 578 Collection: collection, 579 Repo: repo, 580 Rkey: rkey, 581 }) 582 return err 583} 584 585func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 586 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 587 if err != nil { 588 return err 589 } 590 // already present 591 if len(defaultLabels) == len(defaults) { 592 return nil 593 } 594 595 labelDefs, err := models.FetchLabelDefs(r, defaults) 596 if err != nil { 597 return err 598 } 599 600 // Insert each label definition to the database 601 for _, labelDef := range labelDefs { 602 _, err = db.AddLabelDefinition(e, &labelDef) 603 if err != nil { 604 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 605 } 606 } 607 608 return nil 609} 610 611func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 612 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 613 if err != nil { 614 logger.Error("failed to resolve tangled.org DID", "err", err) 615 return 616 } 617 618 pdsEndpoint := resolved.PDSEndpoint() 619 if pdsEndpoint == "" { 620 logger.Error("no PDS endpoint found for tangled.sh DID") 621 return 622 } 623 624 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid) 625 if err != nil { 626 logger.Error("failed to create appassword session... skipping fetch", "err", err) 627 return 628 } 629 630 client := xrpc.Client{ 631 Auth: &xrpc.AuthInfo{ 632 AccessJwt: session.AccessJwt, 633 Did: session.Did, 634 }, 635 Host: session.PdsEndpoint, 636 } 637 638 l := log.SubLogger(logger, "bluesky") 639 640 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 641 defer ticker.Stop() 642 643 for { 644 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 645 if err != nil { 646 l.Error("failed to fetch bluesky posts", "err", err) 647 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 648 l.Error("failed to insert bluesky posts", "err", err) 649 } else { 650 l.Info("inserted bluesky posts", "count", len(posts)) 651 } 652 653 select { 654 case <-ticker.C: 655 case <-ctx.Done(): 656 l.Info("stopping bluesky updater") 657 return 658 } 659 } 660}