this repo has no description
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/appview" 22 "tangled.org/core/appview/cache" 23 "tangled.org/core/appview/cache/session" 24 "tangled.org/core/appview/config" 25 "tangled.org/core/appview/db" 26 "tangled.org/core/appview/models" 27 "tangled.org/core/appview/notify" 28 dbnotify "tangled.org/core/appview/notify/db" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 "tangled.org/core/appview/oauth" 31 "tangled.org/core/appview/pages" 32 "tangled.org/core/appview/reporesolver" 33 "tangled.org/core/appview/validator" 34 xrpcclient "tangled.org/core/appview/xrpcclient" 35 "tangled.org/core/eventconsumer" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/jetstream" 38 tlog "tangled.org/core/log" 39 "tangled.org/core/rbac" 40 "tangled.org/core/tid" 41) 42 43type State struct { 44 db *db.DB 45 notifier notify.Notifier 46 oauth *oauth.OAuth 47 enforcer *rbac.Enforcer 48 pages *pages.Pages 49 sess *session.SessionStore 50 idResolver *idresolver.Resolver 51 posthog posthog.Client 52 jc *jetstream.JetstreamClient 53 config *config.Config 54 repoResolver *reporesolver.RepoResolver 55 knotstream *eventconsumer.Consumer 56 spindlestream *eventconsumer.Consumer 57 logger *slog.Logger 58 validator *validator.Validator 59} 60 61func Make(ctx context.Context, config *config.Config) (*State, error) { 62 d, err := db.Make(config.Core.DbPath) 63 if err != nil { 64 return nil, fmt.Errorf("failed to create db: %w", err) 65 } 66 67 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 68 if err != nil { 69 return nil, fmt.Errorf("failed to create enforcer: %w", err) 70 } 71 72 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 73 if err != nil { 74 log.Printf("failed to create redis resolver: %v", err) 75 res = idresolver.DefaultResolver() 76 } 77 78 pgs := pages.NewPages(config, res) 79 cache := cache.New(config.Redis.Addr) 80 sess := session.New(cache) 81 oauth := oauth.NewOAuth(config, sess) 82 validator := validator.New(d, res, enforcer) 83 84 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create posthog client: %w", err) 87 } 88 89 repoResolver := reporesolver.New(config, enforcer, res, d) 90 91 wrapper := db.DbWrapper{Execer: d} 92 jc, err := jetstream.NewJetstreamClient( 93 config.Jetstream.Endpoint, 94 "appview", 95 []string{ 96 tangled.GraphFollowNSID, 97 tangled.FeedStarNSID, 98 tangled.PublicKeyNSID, 99 tangled.RepoArtifactNSID, 100 tangled.ActorProfileNSID, 101 tangled.SpindleMemberNSID, 102 tangled.SpindleNSID, 103 tangled.StringNSID, 104 tangled.RepoIssueNSID, 105 tangled.RepoIssueCommentNSID, 106 tangled.LabelDefinitionNSID, 107 tangled.LabelOpNSID, 108 }, 109 nil, 110 slog.Default(), 111 wrapper, 112 false, 113 114 // in-memory filter is inapplicalble to appview so 115 // we'll never log dids anyway. 116 false, 117 ) 118 if err != nil { 119 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 120 } 121 122 if err := BackfillDefaultDefs(d, res); err != nil { 123 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 124 } 125 126 ingester := appview.Ingester{ 127 Db: wrapper, 128 Enforcer: enforcer, 129 IdResolver: res, 130 Config: config, 131 Logger: tlog.New("ingester"), 132 Validator: validator, 133 } 134 err = jc.StartJetstream(ctx, ingester.Ingest()) 135 if err != nil { 136 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 137 } 138 139 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 140 if err != nil { 141 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 142 } 143 knotstream.Start(ctx) 144 145 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 146 if err != nil { 147 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 148 } 149 spindlestream.Start(ctx) 150 151 var notifiers []notify.Notifier 152 153 // Always add the database notifier 154 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 155 156 // Add other notifiers in production only 157 if !config.Core.Dev { 158 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 159 } 160 notifier := notify.NewMergedNotifier(notifiers...) 161 162 state := &State{ 163 d, 164 notifier, 165 oauth, 166 enforcer, 167 pgs, 168 sess, 169 res, 170 posthog, 171 jc, 172 config, 173 repoResolver, 174 knotstream, 175 spindlestream, 176 slog.Default(), 177 validator, 178 } 179 180 return state, nil 181} 182 183func (s *State) Close() error { 184 // other close up logic goes here 185 return s.db.Close() 186} 187 188func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 189 w.Header().Set("Content-Type", "image/svg+xml") 190 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 191 w.Header().Set("ETag", `"favicon-svg-v1"`) 192 193 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 194 w.WriteHeader(http.StatusNotModified) 195 return 196 } 197 198 s.pages.Favicon(w) 199} 200 201// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 202const manifestJson = `{ 203 "name": "tangled", 204 "description": "tightly-knit social coding.", 205 "icons": [ 206 { 207 "src": "/favicon.svg", 208 "sizes": "144x144" 209 } 210 ], 211 "start_url": "/", 212 "id": "org.tangled", 213 214 "display": "standalone", 215 "background_color": "#111827", 216 "theme_color": "#111827" 217}` 218 219func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 220 w.Header().Set("Content-Type", "application/json") 221 w.Write([]byte(manifestJson)) 222} 223 224func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 225 user := s.oauth.GetUser(r) 226 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 227 LoggedInUser: user, 228 }) 229} 230 231func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 232 user := s.oauth.GetUser(r) 233 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 234 LoggedInUser: user, 235 }) 236} 237 238func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 239 user := s.oauth.GetUser(r) 240 s.pages.Brand(w, pages.BrandParams{ 241 LoggedInUser: user, 242 }) 243} 244 245func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 246 if s.oauth.GetUser(r) != nil { 247 s.Timeline(w, r) 248 return 249 } 250 s.Home(w, r) 251} 252 253func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 254 user := s.oauth.GetUser(r) 255 256 var userDid string 257 if user != nil { 258 userDid = user.Did 259 } 260 timeline, err := db.MakeTimeline(s.db, 50, userDid) 261 if err != nil { 262 log.Println(err) 263 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 264 } 265 266 repos, err := db.GetTopStarredReposLastWeek(s.db) 267 if err != nil { 268 log.Println(err) 269 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 270 return 271 } 272 273 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue)) 274 if err != nil { 275 // non-fatal 276 } 277 278 fmt.Println(s.pages.Timeline(w, pages.TimelineParams{ 279 LoggedInUser: user, 280 Timeline: timeline, 281 Repos: repos, 282 GfiLabel: gfiLabel, 283 })) 284} 285 286func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 287 user := s.oauth.GetUser(r) 288 if user == nil { 289 return 290 } 291 292 l := s.logger.With("handler", "UpgradeBanner") 293 l = l.With("did", user.Did) 294 l = l.With("handle", user.Handle) 295 296 regs, err := db.GetRegistrations( 297 s.db, 298 db.FilterEq("did", user.Did), 299 db.FilterEq("needs_upgrade", 1), 300 ) 301 if err != nil { 302 l.Error("non-fatal: failed to get registrations", "err", err) 303 } 304 305 spindles, err := db.GetSpindles( 306 s.db, 307 db.FilterEq("owner", user.Did), 308 db.FilterEq("needs_upgrade", 1), 309 ) 310 if err != nil { 311 l.Error("non-fatal: failed to get spindles", "err", err) 312 } 313 314 if regs == nil && spindles == nil { 315 return 316 } 317 318 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 319 Registrations: regs, 320 Spindles: spindles, 321 }) 322} 323 324func (s *State) Home(w http.ResponseWriter, r *http.Request) { 325 timeline, err := db.MakeTimeline(s.db, 5, "") 326 if err != nil { 327 log.Println(err) 328 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 329 return 330 } 331 332 repos, err := db.GetTopStarredReposLastWeek(s.db) 333 if err != nil { 334 log.Println(err) 335 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 336 return 337 } 338 339 s.pages.Home(w, pages.TimelineParams{ 340 LoggedInUser: nil, 341 Timeline: timeline, 342 Repos: repos, 343 }) 344} 345 346func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 347 user := chi.URLParam(r, "user") 348 user = strings.TrimPrefix(user, "@") 349 350 if user == "" { 351 w.WriteHeader(http.StatusBadRequest) 352 return 353 } 354 355 id, err := s.idResolver.ResolveIdent(r.Context(), user) 356 if err != nil { 357 w.WriteHeader(http.StatusInternalServerError) 358 return 359 } 360 361 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 362 if err != nil { 363 w.WriteHeader(http.StatusNotFound) 364 return 365 } 366 367 if len(pubKeys) == 0 { 368 w.WriteHeader(http.StatusNotFound) 369 return 370 } 371 372 for _, k := range pubKeys { 373 key := strings.TrimRight(k.Key, "\n") 374 fmt.Fprintln(w, key) 375 } 376} 377 378func validateRepoName(name string) error { 379 // check for path traversal attempts 380 if name == "." || name == ".." || 381 strings.Contains(name, "/") || strings.Contains(name, "\\") { 382 return fmt.Errorf("Repository name contains invalid path characters") 383 } 384 385 // check for sequences that could be used for traversal when normalized 386 if strings.Contains(name, "./") || strings.Contains(name, "../") || 387 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 388 return fmt.Errorf("Repository name contains invalid path sequence") 389 } 390 391 // then continue with character validation 392 for _, char := range name { 393 if !((char >= 'a' && char <= 'z') || 394 (char >= 'A' && char <= 'Z') || 395 (char >= '0' && char <= '9') || 396 char == '-' || char == '_' || char == '.') { 397 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 398 } 399 } 400 401 // additional check to prevent multiple sequential dots 402 if strings.Contains(name, "..") { 403 return fmt.Errorf("Repository name cannot contain sequential dots") 404 } 405 406 // if all checks pass 407 return nil 408} 409 410func stripGitExt(name string) string { 411 return strings.TrimSuffix(name, ".git") 412} 413 414func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 415 switch r.Method { 416 case http.MethodGet: 417 user := s.oauth.GetUser(r) 418 knots, err := s.enforcer.GetKnotsForUser(user.Did) 419 if err != nil { 420 s.pages.Notice(w, "repo", "Invalid user account.") 421 return 422 } 423 424 s.pages.NewRepo(w, pages.NewRepoParams{ 425 LoggedInUser: user, 426 Knots: knots, 427 }) 428 429 case http.MethodPost: 430 l := s.logger.With("handler", "NewRepo") 431 432 user := s.oauth.GetUser(r) 433 l = l.With("did", user.Did) 434 l = l.With("handle", user.Handle) 435 436 // form validation 437 domain := r.FormValue("domain") 438 if domain == "" { 439 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 440 return 441 } 442 l = l.With("knot", domain) 443 444 repoName := r.FormValue("name") 445 if repoName == "" { 446 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 447 return 448 } 449 450 if err := validateRepoName(repoName); err != nil { 451 s.pages.Notice(w, "repo", err.Error()) 452 return 453 } 454 repoName = stripGitExt(repoName) 455 l = l.With("repoName", repoName) 456 457 defaultBranch := r.FormValue("branch") 458 if defaultBranch == "" { 459 defaultBranch = "main" 460 } 461 l = l.With("defaultBranch", defaultBranch) 462 463 description := r.FormValue("description") 464 465 // ACL validation 466 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 467 if err != nil || !ok { 468 l.Info("unauthorized") 469 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 470 return 471 } 472 473 // Check for existing repos 474 existingRepo, err := db.GetRepo( 475 s.db, 476 db.FilterEq("did", user.Did), 477 db.FilterEq("name", repoName), 478 ) 479 if err == nil && existingRepo != nil { 480 l.Info("repo exists") 481 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 482 return 483 } 484 485 // create atproto record for this repo 486 rkey := tid.TID() 487 repo := &models.Repo{ 488 Did: user.Did, 489 Name: repoName, 490 Knot: domain, 491 Rkey: rkey, 492 Description: description, 493 Created: time.Now(), 494 Labels: models.DefaultLabelDefs(), 495 } 496 record := repo.AsRecord() 497 498 xrpcClient, err := s.oauth.AuthorizedClient(r) 499 if err != nil { 500 l.Info("PDS write failed", "err", err) 501 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 502 return 503 } 504 505 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 506 Collection: tangled.RepoNSID, 507 Repo: user.Did, 508 Rkey: rkey, 509 Record: &lexutil.LexiconTypeDecoder{ 510 Val: &record, 511 }, 512 }) 513 if err != nil { 514 l.Info("PDS write failed", "err", err) 515 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 516 return 517 } 518 519 aturi := atresp.Uri 520 l = l.With("aturi", aturi) 521 l.Info("wrote to PDS") 522 523 tx, err := s.db.BeginTx(r.Context(), nil) 524 if err != nil { 525 l.Info("txn failed", "err", err) 526 s.pages.Notice(w, "repo", "Failed to save repository information.") 527 return 528 } 529 530 // The rollback function reverts a few things on failure: 531 // - the pending txn 532 // - the ACLs 533 // - the atproto record created 534 rollback := func() { 535 err1 := tx.Rollback() 536 err2 := s.enforcer.E.LoadPolicy() 537 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 538 539 // ignore txn complete errors, this is okay 540 if errors.Is(err1, sql.ErrTxDone) { 541 err1 = nil 542 } 543 544 if errs := errors.Join(err1, err2, err3); errs != nil { 545 l.Error("failed to rollback changes", "errs", errs) 546 return 547 } 548 } 549 defer rollback() 550 551 client, err := s.oauth.ServiceClient( 552 r, 553 oauth.WithService(domain), 554 oauth.WithLxm(tangled.RepoCreateNSID), 555 oauth.WithDev(s.config.Core.Dev), 556 ) 557 if err != nil { 558 l.Error("service auth failed", "err", err) 559 s.pages.Notice(w, "repo", "Failed to reach PDS.") 560 return 561 } 562 563 xe := tangled.RepoCreate( 564 r.Context(), 565 client, 566 &tangled.RepoCreate_Input{ 567 Rkey: rkey, 568 }, 569 ) 570 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 571 l.Error("xrpc error", "xe", xe) 572 s.pages.Notice(w, "repo", err.Error()) 573 return 574 } 575 576 err = db.AddRepo(tx, repo) 577 if err != nil { 578 l.Error("db write failed", "err", err) 579 s.pages.Notice(w, "repo", "Failed to save repository information.") 580 return 581 } 582 583 // acls 584 p, _ := securejoin.SecureJoin(user.Did, repoName) 585 err = s.enforcer.AddRepo(user.Did, domain, p) 586 if err != nil { 587 l.Error("acl setup failed", "err", err) 588 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 589 return 590 } 591 592 err = tx.Commit() 593 if err != nil { 594 l.Error("txn commit failed", "err", err) 595 http.Error(w, err.Error(), http.StatusInternalServerError) 596 return 597 } 598 599 err = s.enforcer.E.SavePolicy() 600 if err != nil { 601 l.Error("acl save failed", "err", err) 602 http.Error(w, err.Error(), http.StatusInternalServerError) 603 return 604 } 605 606 // reset the ATURI because the transaction completed successfully 607 aturi = "" 608 609 s.notifier.NewRepo(r.Context(), repo) 610 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 611 } 612} 613 614// this is used to rollback changes made to the PDS 615// 616// it is a no-op if the provided ATURI is empty 617func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 618 if aturi == "" { 619 return nil 620 } 621 622 parsed := syntax.ATURI(aturi) 623 624 collection := parsed.Collection().String() 625 repo := parsed.Authority().String() 626 rkey := parsed.RecordKey().String() 627 628 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 629 Collection: collection, 630 Repo: repo, 631 Rkey: rkey, 632 }) 633 return err 634} 635 636func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error { 637 defaults := models.DefaultLabelDefs() 638 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 639 if err != nil { 640 return err 641 } 642 // already present 643 if len(defaultLabels) == len(defaults) { 644 return nil 645 } 646 647 labelDefs, err := models.FetchDefaultDefs(r) 648 if err != nil { 649 return err 650 } 651 652 // Insert each label definition to the database 653 for _, labelDef := range labelDefs { 654 _, err = db.AddLabelDefinition(e, &labelDef) 655 if err != nil { 656 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 657 } 658 } 659 660 return nil 661}