this repo has no description
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/appview" 22 "tangled.sh/tangled.sh/core/appview/cache" 23 "tangled.sh/tangled.sh/core/appview/cache/session" 24 "tangled.sh/tangled.sh/core/appview/config" 25 "tangled.sh/tangled.sh/core/appview/db" 26 "tangled.sh/tangled.sh/core/appview/notify" 27 "tangled.sh/tangled.sh/core/appview/oauth" 28 "tangled.sh/tangled.sh/core/appview/pages" 29 posthogService "tangled.sh/tangled.sh/core/appview/posthog" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 "tangled.sh/tangled.sh/core/appview/validator" 32 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient" 33 "tangled.sh/tangled.sh/core/eventconsumer" 34 "tangled.sh/tangled.sh/core/idresolver" 35 "tangled.sh/tangled.sh/core/jetstream" 36 tlog "tangled.sh/tangled.sh/core/log" 37 "tangled.sh/tangled.sh/core/rbac" 38 "tangled.sh/tangled.sh/core/tid" 39 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors" 40) 41 42type State struct { 43 db *db.DB 44 notifier notify.Notifier 45 oauth *oauth.OAuth 46 enforcer *rbac.Enforcer 47 pages *pages.Pages 48 sess *session.SessionStore 49 idResolver *idresolver.Resolver 50 posthog posthog.Client 51 jc *jetstream.JetstreamClient 52 config *config.Config 53 repoResolver *reporesolver.RepoResolver 54 knotstream *eventconsumer.Consumer 55 spindlestream *eventconsumer.Consumer 56 logger *slog.Logger 57 validator *validator.Validator 58} 59 60func Make(ctx context.Context, config *config.Config) (*State, error) { 61 d, err := db.Make(config.Core.DbPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to create db: %w", err) 64 } 65 66 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to create enforcer: %w", err) 69 } 70 71 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 72 if err != nil { 73 log.Printf("failed to create redis resolver: %v", err) 74 res = idresolver.DefaultResolver() 75 } 76 77 pgs := pages.NewPages(config, res) 78 cache := cache.New(config.Redis.Addr) 79 sess := session.New(cache) 80 oauth := oauth.NewOAuth(config, sess) 81 validator := validator.New(d) 82 83 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 84 if err != nil { 85 return nil, fmt.Errorf("failed to create posthog client: %w", err) 86 } 87 88 repoResolver := reporesolver.New(config, enforcer, res, d) 89 90 wrapper := db.DbWrapper{d} 91 jc, err := jetstream.NewJetstreamClient( 92 config.Jetstream.Endpoint, 93 "appview", 94 []string{ 95 tangled.GraphFollowNSID, 96 tangled.FeedStarNSID, 97 tangled.PublicKeyNSID, 98 tangled.RepoArtifactNSID, 99 tangled.ActorProfileNSID, 100 tangled.SpindleMemberNSID, 101 tangled.SpindleNSID, 102 tangled.StringNSID, 103 tangled.RepoIssueNSID, 104 tangled.RepoIssueCommentNSID, 105 }, 106 nil, 107 slog.Default(), 108 wrapper, 109 false, 110 111 // in-memory filter is inapplicalble to appview so 112 // we'll never log dids anyway. 113 false, 114 ) 115 if err != nil { 116 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 117 } 118 119 ingester := appview.Ingester{ 120 Db: wrapper, 121 Enforcer: enforcer, 122 IdResolver: res, 123 Config: config, 124 Logger: tlog.New("ingester"), 125 Validator: validator, 126 } 127 err = jc.StartJetstream(ctx, ingester.Ingest()) 128 if err != nil { 129 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 130 } 131 132 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 133 if err != nil { 134 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 135 } 136 knotstream.Start(ctx) 137 138 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 139 if err != nil { 140 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 141 } 142 spindlestream.Start(ctx) 143 144 var notifiers []notify.Notifier 145 if !config.Core.Dev { 146 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 147 } 148 notifier := notify.NewMergedNotifier(notifiers...) 149 150 state := &State{ 151 d, 152 notifier, 153 oauth, 154 enforcer, 155 pgs, 156 sess, 157 res, 158 posthog, 159 jc, 160 config, 161 repoResolver, 162 knotstream, 163 spindlestream, 164 slog.Default(), 165 validator, 166 } 167 168 return state, nil 169} 170 171func (s *State) Close() error { 172 // other close up logic goes here 173 return s.db.Close() 174} 175 176func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 177 w.Header().Set("Content-Type", "image/svg+xml") 178 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 179 w.Header().Set("ETag", `"favicon-svg-v1"`) 180 181 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 182 w.WriteHeader(http.StatusNotModified) 183 return 184 } 185 186 s.pages.Favicon(w) 187} 188 189func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 190 user := s.oauth.GetUser(r) 191 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 192 LoggedInUser: user, 193 }) 194} 195 196func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 197 user := s.oauth.GetUser(r) 198 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 199 LoggedInUser: user, 200 }) 201} 202 203func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 204 if s.oauth.GetUser(r) != nil { 205 s.Timeline(w, r) 206 return 207 } 208 s.Home(w, r) 209} 210 211func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 212 user := s.oauth.GetUser(r) 213 214 var userDid string 215 if user != nil { 216 userDid = user.Did 217 } 218 timeline, err := db.MakeTimeline(s.db, 50, userDid) 219 if err != nil { 220 log.Println(err) 221 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 222 } 223 224 repos, err := db.GetTopStarredReposLastWeek(s.db) 225 if err != nil { 226 log.Println(err) 227 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 228 return 229 } 230 231 s.pages.Timeline(w, pages.TimelineParams{ 232 LoggedInUser: user, 233 Timeline: timeline, 234 Repos: repos, 235 }) 236} 237 238func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 239 user := s.oauth.GetUser(r) 240 l := s.logger.With("handler", "UpgradeBanner") 241 l = l.With("did", user.Did) 242 l = l.With("handle", user.Handle) 243 244 regs, err := db.GetRegistrations( 245 s.db, 246 db.FilterEq("did", user.Did), 247 db.FilterEq("needs_upgrade", 1), 248 ) 249 if err != nil { 250 l.Error("non-fatal: failed to get registrations", "err", err) 251 } 252 253 spindles, err := db.GetSpindles( 254 s.db, 255 db.FilterEq("owner", user.Did), 256 db.FilterEq("needs_upgrade", 1), 257 ) 258 if err != nil { 259 l.Error("non-fatal: failed to get spindles", "err", err) 260 } 261 262 if regs == nil && spindles == nil { 263 return 264 } 265 266 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 267 Registrations: regs, 268 Spindles: spindles, 269 }) 270} 271 272func (s *State) Home(w http.ResponseWriter, r *http.Request) { 273 timeline, err := db.MakeTimeline(s.db, 5, "") 274 if err != nil { 275 log.Println(err) 276 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 277 return 278 } 279 280 repos, err := db.GetTopStarredReposLastWeek(s.db) 281 if err != nil { 282 log.Println(err) 283 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 284 return 285 } 286 287 s.pages.Home(w, pages.TimelineParams{ 288 LoggedInUser: nil, 289 Timeline: timeline, 290 Repos: repos, 291 }) 292} 293 294func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 295 user := chi.URLParam(r, "user") 296 user = strings.TrimPrefix(user, "@") 297 298 if user == "" { 299 w.WriteHeader(http.StatusBadRequest) 300 return 301 } 302 303 id, err := s.idResolver.ResolveIdent(r.Context(), user) 304 if err != nil { 305 w.WriteHeader(http.StatusInternalServerError) 306 return 307 } 308 309 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 310 if err != nil { 311 w.WriteHeader(http.StatusNotFound) 312 return 313 } 314 315 if len(pubKeys) == 0 { 316 w.WriteHeader(http.StatusNotFound) 317 return 318 } 319 320 for _, k := range pubKeys { 321 key := strings.TrimRight(k.Key, "\n") 322 fmt.Fprintln(w, key) 323 } 324} 325 326func validateRepoName(name string) error { 327 // check for path traversal attempts 328 if name == "." || name == ".." || 329 strings.Contains(name, "/") || strings.Contains(name, "\\") { 330 return fmt.Errorf("Repository name contains invalid path characters") 331 } 332 333 // check for sequences that could be used for traversal when normalized 334 if strings.Contains(name, "./") || strings.Contains(name, "../") || 335 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 336 return fmt.Errorf("Repository name contains invalid path sequence") 337 } 338 339 // then continue with character validation 340 for _, char := range name { 341 if !((char >= 'a' && char <= 'z') || 342 (char >= 'A' && char <= 'Z') || 343 (char >= '0' && char <= '9') || 344 char == '-' || char == '_' || char == '.') { 345 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 346 } 347 } 348 349 // additional check to prevent multiple sequential dots 350 if strings.Contains(name, "..") { 351 return fmt.Errorf("Repository name cannot contain sequential dots") 352 } 353 354 // if all checks pass 355 return nil 356} 357 358func stripGitExt(name string) string { 359 return strings.TrimSuffix(name, ".git") 360} 361 362func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 363 switch r.Method { 364 case http.MethodGet: 365 user := s.oauth.GetUser(r) 366 knots, err := s.enforcer.GetKnotsForUser(user.Did) 367 if err != nil { 368 s.pages.Notice(w, "repo", "Invalid user account.") 369 return 370 } 371 372 s.pages.NewRepo(w, pages.NewRepoParams{ 373 LoggedInUser: user, 374 Knots: knots, 375 }) 376 377 case http.MethodPost: 378 l := s.logger.With("handler", "NewRepo") 379 380 user := s.oauth.GetUser(r) 381 l = l.With("did", user.Did) 382 l = l.With("handle", user.Handle) 383 384 // form validation 385 domain := r.FormValue("domain") 386 if domain == "" { 387 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 388 return 389 } 390 l = l.With("knot", domain) 391 392 repoName := r.FormValue("name") 393 if repoName == "" { 394 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 395 return 396 } 397 398 if err := validateRepoName(repoName); err != nil { 399 s.pages.Notice(w, "repo", err.Error()) 400 return 401 } 402 repoName = stripGitExt(repoName) 403 l = l.With("repoName", repoName) 404 405 defaultBranch := r.FormValue("branch") 406 if defaultBranch == "" { 407 defaultBranch = "main" 408 } 409 l = l.With("defaultBranch", defaultBranch) 410 411 description := r.FormValue("description") 412 413 // ACL validation 414 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 415 if err != nil || !ok { 416 l.Info("unauthorized") 417 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 418 return 419 } 420 421 // Check for existing repos 422 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 423 if err == nil && existingRepo != nil { 424 l.Info("repo exists") 425 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 426 return 427 } 428 429 // create atproto record for this repo 430 rkey := tid.TID() 431 repo := &db.Repo{ 432 Did: user.Did, 433 Name: repoName, 434 Knot: domain, 435 Rkey: rkey, 436 Description: description, 437 } 438 439 xrpcClient, err := s.oauth.AuthorizedClient(r) 440 if err != nil { 441 l.Info("PDS write failed", "err", err) 442 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 443 return 444 } 445 446 createdAt := time.Now().Format(time.RFC3339) 447 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 448 Collection: tangled.RepoNSID, 449 Repo: user.Did, 450 Rkey: rkey, 451 Record: &lexutil.LexiconTypeDecoder{ 452 Val: &tangled.Repo{ 453 Knot: repo.Knot, 454 Name: repoName, 455 CreatedAt: createdAt, 456 Owner: user.Did, 457 }}, 458 }) 459 if err != nil { 460 l.Info("PDS write failed", "err", err) 461 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 462 return 463 } 464 465 aturi := atresp.Uri 466 l = l.With("aturi", aturi) 467 l.Info("wrote to PDS") 468 469 tx, err := s.db.BeginTx(r.Context(), nil) 470 if err != nil { 471 l.Info("txn failed", "err", err) 472 s.pages.Notice(w, "repo", "Failed to save repository information.") 473 return 474 } 475 476 // The rollback function reverts a few things on failure: 477 // - the pending txn 478 // - the ACLs 479 // - the atproto record created 480 rollback := func() { 481 err1 := tx.Rollback() 482 err2 := s.enforcer.E.LoadPolicy() 483 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 484 485 // ignore txn complete errors, this is okay 486 if errors.Is(err1, sql.ErrTxDone) { 487 err1 = nil 488 } 489 490 if errs := errors.Join(err1, err2, err3); errs != nil { 491 l.Error("failed to rollback changes", "errs", errs) 492 return 493 } 494 } 495 defer rollback() 496 497 client, err := s.oauth.ServiceClient( 498 r, 499 oauth.WithService(domain), 500 oauth.WithLxm(tangled.RepoCreateNSID), 501 oauth.WithDev(s.config.Core.Dev), 502 ) 503 if err != nil { 504 l.Error("service auth failed", "err", err) 505 s.pages.Notice(w, "repo", "Failed to reach PDS.") 506 return 507 } 508 509 xe := tangled.RepoCreate( 510 r.Context(), 511 client, 512 &tangled.RepoCreate_Input{ 513 Rkey: rkey, 514 }, 515 ) 516 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 517 l.Error("xrpc error", "xe", xe) 518 s.pages.Notice(w, "repo", err.Error()) 519 return 520 } 521 522 err = db.AddRepo(tx, repo) 523 if err != nil { 524 l.Error("db write failed", "err", err) 525 s.pages.Notice(w, "repo", "Failed to save repository information.") 526 return 527 } 528 529 // acls 530 p, _ := securejoin.SecureJoin(user.Did, repoName) 531 err = s.enforcer.AddRepo(user.Did, domain, p) 532 if err != nil { 533 l.Error("acl setup failed", "err", err) 534 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 535 return 536 } 537 538 err = tx.Commit() 539 if err != nil { 540 l.Error("txn commit failed", "err", err) 541 http.Error(w, err.Error(), http.StatusInternalServerError) 542 return 543 } 544 545 err = s.enforcer.E.SavePolicy() 546 if err != nil { 547 l.Error("acl save failed", "err", err) 548 http.Error(w, err.Error(), http.StatusInternalServerError) 549 return 550 } 551 552 // reset the ATURI because the transaction completed successfully 553 aturi = "" 554 555 s.notifier.NewRepo(r.Context(), repo) 556 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 557 } 558} 559 560// this is used to rollback changes made to the PDS 561// 562// it is a no-op if the provided ATURI is empty 563func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 564 if aturi == "" { 565 return nil 566 } 567 568 parsed := syntax.ATURI(aturi) 569 570 collection := parsed.Collection().String() 571 repo := parsed.Authority().String() 572 rkey := parsed.RecordKey().String() 573 574 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 575 Collection: collection, 576 Repo: repo, 577 Rkey: rkey, 578 }) 579 return err 580}