this repo has no description
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/appview" 22 "tangled.sh/tangled.sh/core/appview/cache" 23 "tangled.sh/tangled.sh/core/appview/cache/session" 24 "tangled.sh/tangled.sh/core/appview/config" 25 "tangled.sh/tangled.sh/core/appview/db" 26 "tangled.sh/tangled.sh/core/appview/notify" 27 "tangled.sh/tangled.sh/core/appview/oauth" 28 "tangled.sh/tangled.sh/core/appview/pages" 29 posthogService "tangled.sh/tangled.sh/core/appview/posthog" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient" 32 "tangled.sh/tangled.sh/core/eventconsumer" 33 "tangled.sh/tangled.sh/core/idresolver" 34 "tangled.sh/tangled.sh/core/jetstream" 35 tlog "tangled.sh/tangled.sh/core/log" 36 "tangled.sh/tangled.sh/core/rbac" 37 "tangled.sh/tangled.sh/core/tid" 38 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors" 39) 40 41type State struct { 42 db *db.DB 43 notifier notify.Notifier 44 oauth *oauth.OAuth 45 enforcer *rbac.Enforcer 46 pages *pages.Pages 47 sess *session.SessionStore 48 idResolver *idresolver.Resolver 49 posthog posthog.Client 50 jc *jetstream.JetstreamClient 51 config *config.Config 52 repoResolver *reporesolver.RepoResolver 53 knotstream *eventconsumer.Consumer 54 spindlestream *eventconsumer.Consumer 55 logger *slog.Logger 56} 57 58func Make(ctx context.Context, config *config.Config) (*State, error) { 59 d, err := db.Make(config.Core.DbPath) 60 if err != nil { 61 return nil, fmt.Errorf("failed to create db: %w", err) 62 } 63 64 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to create enforcer: %w", err) 67 } 68 69 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 70 if err != nil { 71 log.Printf("failed to create redis resolver: %v", err) 72 res = idresolver.DefaultResolver() 73 } 74 75 pgs := pages.NewPages(config, res) 76 77 cache := cache.New(config.Redis.Addr) 78 sess := session.New(cache) 79 80 oauth := oauth.NewOAuth(config, sess) 81 82 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create posthog client: %w", err) 85 } 86 87 repoResolver := reporesolver.New(config, enforcer, res, d) 88 89 wrapper := db.DbWrapper{d} 90 jc, err := jetstream.NewJetstreamClient( 91 config.Jetstream.Endpoint, 92 "appview", 93 []string{ 94 tangled.GraphFollowNSID, 95 tangled.FeedStarNSID, 96 tangled.PublicKeyNSID, 97 tangled.RepoArtifactNSID, 98 tangled.ActorProfileNSID, 99 tangled.SpindleMemberNSID, 100 tangled.SpindleNSID, 101 tangled.StringNSID, 102 tangled.RepoIssueNSID, 103 }, 104 nil, 105 slog.Default(), 106 wrapper, 107 false, 108 109 // in-memory filter is inapplicalble to appview so 110 // we'll never log dids anyway. 111 false, 112 ) 113 if err != nil { 114 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 115 } 116 117 ingester := appview.Ingester{ 118 Db: wrapper, 119 Enforcer: enforcer, 120 IdResolver: res, 121 Config: config, 122 Logger: tlog.New("ingester"), 123 } 124 err = jc.StartJetstream(ctx, ingester.Ingest()) 125 if err != nil { 126 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 127 } 128 129 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 130 if err != nil { 131 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 132 } 133 knotstream.Start(ctx) 134 135 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 136 if err != nil { 137 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 138 } 139 spindlestream.Start(ctx) 140 141 var notifiers []notify.Notifier 142 if !config.Core.Dev { 143 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 144 } 145 notifier := notify.NewMergedNotifier(notifiers...) 146 147 state := &State{ 148 d, 149 notifier, 150 oauth, 151 enforcer, 152 pgs, 153 sess, 154 res, 155 posthog, 156 jc, 157 config, 158 repoResolver, 159 knotstream, 160 spindlestream, 161 slog.Default(), 162 } 163 164 return state, nil 165} 166 167func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 168 w.Header().Set("Content-Type", "image/svg+xml") 169 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 170 w.Header().Set("ETag", `"favicon-svg-v1"`) 171 172 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 173 w.WriteHeader(http.StatusNotModified) 174 return 175 } 176 177 s.pages.Favicon(w) 178} 179 180func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 181 user := s.oauth.GetUser(r) 182 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 183 LoggedInUser: user, 184 }) 185} 186 187func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 188 user := s.oauth.GetUser(r) 189 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 190 LoggedInUser: user, 191 }) 192} 193 194func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 195 user := s.oauth.GetUser(r) 196 197 timeline, err := db.MakeTimeline(s.db) 198 if err != nil { 199 log.Println(err) 200 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 201 } 202 203 repos, err := db.GetTopStarredReposLastWeek(s.db) 204 if err != nil { 205 log.Println(err) 206 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 207 return 208 } 209 210 s.pages.Timeline(w, pages.TimelineParams{ 211 LoggedInUser: user, 212 Timeline: timeline, 213 Repos: repos, 214 }) 215} 216 217func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 218 user := chi.URLParam(r, "user") 219 user = strings.TrimPrefix(user, "@") 220 221 if user == "" { 222 w.WriteHeader(http.StatusBadRequest) 223 return 224 } 225 226 id, err := s.idResolver.ResolveIdent(r.Context(), user) 227 if err != nil { 228 w.WriteHeader(http.StatusInternalServerError) 229 return 230 } 231 232 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 233 if err != nil { 234 w.WriteHeader(http.StatusNotFound) 235 return 236 } 237 238 if len(pubKeys) == 0 { 239 w.WriteHeader(http.StatusNotFound) 240 return 241 } 242 243 for _, k := range pubKeys { 244 key := strings.TrimRight(k.Key, "\n") 245 w.Write([]byte(fmt.Sprintln(key))) 246 } 247} 248 249func validateRepoName(name string) error { 250 // check for path traversal attempts 251 if name == "." || name == ".." || 252 strings.Contains(name, "/") || strings.Contains(name, "\\") { 253 return fmt.Errorf("Repository name contains invalid path characters") 254 } 255 256 // check for sequences that could be used for traversal when normalized 257 if strings.Contains(name, "./") || strings.Contains(name, "../") || 258 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 259 return fmt.Errorf("Repository name contains invalid path sequence") 260 } 261 262 // then continue with character validation 263 for _, char := range name { 264 if !((char >= 'a' && char <= 'z') || 265 (char >= 'A' && char <= 'Z') || 266 (char >= '0' && char <= '9') || 267 char == '-' || char == '_' || char == '.') { 268 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 269 } 270 } 271 272 // additional check to prevent multiple sequential dots 273 if strings.Contains(name, "..") { 274 return fmt.Errorf("Repository name cannot contain sequential dots") 275 } 276 277 // if all checks pass 278 return nil 279} 280 281func stripGitExt(name string) string { 282 return strings.TrimSuffix(name, ".git") 283} 284 285func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 286 switch r.Method { 287 case http.MethodGet: 288 user := s.oauth.GetUser(r) 289 knots, err := s.enforcer.GetKnotsForUser(user.Did) 290 if err != nil { 291 s.pages.Notice(w, "repo", "Invalid user account.") 292 return 293 } 294 295 s.pages.NewRepo(w, pages.NewRepoParams{ 296 LoggedInUser: user, 297 Knots: knots, 298 }) 299 300 case http.MethodPost: 301 l := s.logger.With("handler", "NewRepo") 302 303 user := s.oauth.GetUser(r) 304 l = l.With("did", user.Did) 305 l = l.With("handle", user.Handle) 306 307 // form validation 308 domain := r.FormValue("domain") 309 if domain == "" { 310 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 311 return 312 } 313 l = l.With("knot", domain) 314 315 repoName := r.FormValue("name") 316 if repoName == "" { 317 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 318 return 319 } 320 321 if err := validateRepoName(repoName); err != nil { 322 s.pages.Notice(w, "repo", err.Error()) 323 return 324 } 325 repoName = stripGitExt(repoName) 326 l = l.With("repoName", repoName) 327 328 defaultBranch := r.FormValue("branch") 329 if defaultBranch == "" { 330 defaultBranch = "main" 331 } 332 l = l.With("defaultBranch", defaultBranch) 333 334 description := r.FormValue("description") 335 336 // ACL validation 337 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 338 if err != nil || !ok { 339 l.Info("unauthorized") 340 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 341 return 342 } 343 344 // Check for existing repos 345 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 346 if err == nil && existingRepo != nil { 347 l.Info("repo exists") 348 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 349 return 350 } 351 352 // create atproto record for this repo 353 rkey := tid.TID() 354 repo := &db.Repo{ 355 Did: user.Did, 356 Name: repoName, 357 Knot: domain, 358 Rkey: rkey, 359 Description: description, 360 } 361 362 xrpcClient, err := s.oauth.AuthorizedClient(r) 363 if err != nil { 364 l.Info("PDS write failed", "err", err) 365 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 366 return 367 } 368 369 createdAt := time.Now().Format(time.RFC3339) 370 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 371 Collection: tangled.RepoNSID, 372 Repo: user.Did, 373 Rkey: rkey, 374 Record: &lexutil.LexiconTypeDecoder{ 375 Val: &tangled.Repo{ 376 Knot: repo.Knot, 377 Name: repoName, 378 CreatedAt: createdAt, 379 Owner: user.Did, 380 }}, 381 }) 382 if err != nil { 383 l.Info("PDS write failed", "err", err) 384 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 385 return 386 } 387 388 aturi := atresp.Uri 389 l = l.With("aturi", aturi) 390 l.Info("wrote to PDS") 391 392 tx, err := s.db.BeginTx(r.Context(), nil) 393 if err != nil { 394 l.Info("txn failed", "err", err) 395 s.pages.Notice(w, "repo", "Failed to save repository information.") 396 return 397 } 398 399 // The rollback function reverts a few things on failure: 400 // - the pending txn 401 // - the ACLs 402 // - the atproto record created 403 rollback := func() { 404 err1 := tx.Rollback() 405 err2 := s.enforcer.E.LoadPolicy() 406 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 407 408 // ignore txn complete errors, this is okay 409 if errors.Is(err1, sql.ErrTxDone) { 410 err1 = nil 411 } 412 413 if errs := errors.Join(err1, err2, err3); errs != nil { 414 l.Error("failed to rollback changes", "errs", errs) 415 return 416 } 417 } 418 defer rollback() 419 420 client, err := s.oauth.ServiceClient( 421 r, 422 oauth.WithService(domain), 423 oauth.WithLxm(tangled.RepoCreateNSID), 424 oauth.WithDev(s.config.Core.Dev), 425 ) 426 if err != nil { 427 l.Error("service auth failed", "err", err) 428 s.pages.Notice(w, "repo", "Failed to reach PDS.") 429 return 430 } 431 432 xe := tangled.RepoCreate( 433 r.Context(), 434 client, 435 &tangled.RepoCreate_Input{ 436 Rkey: rkey, 437 }, 438 ) 439 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 440 l.Error("xrpc error", "xe", xe) 441 s.pages.Notice(w, "repo", err.Error()) 442 return 443 } 444 445 err = db.AddRepo(tx, repo) 446 if err != nil { 447 l.Error("db write failed", "err", err) 448 s.pages.Notice(w, "repo", "Failed to save repository information.") 449 return 450 } 451 452 // acls 453 p, _ := securejoin.SecureJoin(user.Did, repoName) 454 err = s.enforcer.AddRepo(user.Did, domain, p) 455 if err != nil { 456 l.Error("acl setup failed", "err", err) 457 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 458 return 459 } 460 461 err = tx.Commit() 462 if err != nil { 463 l.Error("txn commit failed", "err", err) 464 http.Error(w, err.Error(), http.StatusInternalServerError) 465 return 466 } 467 468 err = s.enforcer.E.SavePolicy() 469 if err != nil { 470 l.Error("acl save failed", "err", err) 471 http.Error(w, err.Error(), http.StatusInternalServerError) 472 return 473 } 474 475 // reset the ATURI because the transaction completed successfully 476 aturi = "" 477 478 s.notifier.NewRepo(r.Context(), repo) 479 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 480 } 481} 482 483// this is used to rollback changes made to the PDS 484// 485// it is a no-op if the provided ATURI is empty 486func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 487 if aturi == "" { 488 return nil 489 } 490 491 parsed := syntax.ATURI(aturi) 492 493 collection := parsed.Collection().String() 494 repo := parsed.Authority().String() 495 rkey := parsed.RecordKey().String() 496 497 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 498 Collection: collection, 499 Repo: repo, 500 Rkey: rkey, 501 }) 502 return err 503}