this repo has no description
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/appview"
22 "tangled.sh/tangled.sh/core/appview/cache"
23 "tangled.sh/tangled.sh/core/appview/cache/session"
24 "tangled.sh/tangled.sh/core/appview/config"
25 "tangled.sh/tangled.sh/core/appview/db"
26 "tangled.sh/tangled.sh/core/appview/notify"
27 "tangled.sh/tangled.sh/core/appview/oauth"
28 "tangled.sh/tangled.sh/core/appview/pages"
29 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
30 "tangled.sh/tangled.sh/core/appview/reporesolver"
31 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient"
32 "tangled.sh/tangled.sh/core/eventconsumer"
33 "tangled.sh/tangled.sh/core/idresolver"
34 "tangled.sh/tangled.sh/core/jetstream"
35 tlog "tangled.sh/tangled.sh/core/log"
36 "tangled.sh/tangled.sh/core/rbac"
37 "tangled.sh/tangled.sh/core/tid"
38 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors"
39)
40
41type State struct {
42 db *db.DB
43 notifier notify.Notifier
44 oauth *oauth.OAuth
45 enforcer *rbac.Enforcer
46 pages *pages.Pages
47 sess *session.SessionStore
48 idResolver *idresolver.Resolver
49 posthog posthog.Client
50 jc *jetstream.JetstreamClient
51 config *config.Config
52 repoResolver *reporesolver.RepoResolver
53 knotstream *eventconsumer.Consumer
54 spindlestream *eventconsumer.Consumer
55 logger *slog.Logger
56}
57
58func Make(ctx context.Context, config *config.Config) (*State, error) {
59 d, err := db.Make(config.Core.DbPath)
60 if err != nil {
61 return nil, fmt.Errorf("failed to create db: %w", err)
62 }
63
64 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to create enforcer: %w", err)
67 }
68
69 res, err := idresolver.RedisResolver(config.Redis.ToURL())
70 if err != nil {
71 log.Printf("failed to create redis resolver: %v", err)
72 res = idresolver.DefaultResolver()
73 }
74
75 pgs := pages.NewPages(config, res)
76
77 cache := cache.New(config.Redis.Addr)
78 sess := session.New(cache)
79
80 oauth := oauth.NewOAuth(config, sess)
81
82 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
83 if err != nil {
84 return nil, fmt.Errorf("failed to create posthog client: %w", err)
85 }
86
87 repoResolver := reporesolver.New(config, enforcer, res, d)
88
89 wrapper := db.DbWrapper{d}
90 jc, err := jetstream.NewJetstreamClient(
91 config.Jetstream.Endpoint,
92 "appview",
93 []string{
94 tangled.GraphFollowNSID,
95 tangled.FeedStarNSID,
96 tangled.PublicKeyNSID,
97 tangled.RepoArtifactNSID,
98 tangled.ActorProfileNSID,
99 tangled.SpindleMemberNSID,
100 tangled.SpindleNSID,
101 tangled.StringNSID,
102 tangled.RepoIssueNSID,
103 tangled.RepoIssueCommentNSID,
104 },
105 nil,
106 slog.Default(),
107 wrapper,
108 false,
109
110 // in-memory filter is inapplicalble to appview so
111 // we'll never log dids anyway.
112 false,
113 )
114 if err != nil {
115 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
116 }
117
118 ingester := appview.Ingester{
119 Db: wrapper,
120 Enforcer: enforcer,
121 IdResolver: res,
122 Config: config,
123 Logger: tlog.New("ingester"),
124 }
125 err = jc.StartJetstream(ctx, ingester.Ingest())
126 if err != nil {
127 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
128 }
129
130 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
131 if err != nil {
132 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
133 }
134 knotstream.Start(ctx)
135
136 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
137 if err != nil {
138 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
139 }
140 spindlestream.Start(ctx)
141
142 var notifiers []notify.Notifier
143 if !config.Core.Dev {
144 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
145 }
146 notifier := notify.NewMergedNotifier(notifiers...)
147
148 state := &State{
149 d,
150 notifier,
151 oauth,
152 enforcer,
153 pgs,
154 sess,
155 res,
156 posthog,
157 jc,
158 config,
159 repoResolver,
160 knotstream,
161 spindlestream,
162 slog.Default(),
163 }
164
165 return state, nil
166}
167
168func (s *State) Close() error {
169 // other close up logic goes here
170 return s.db.Close()
171}
172
173func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
174 w.Header().Set("Content-Type", "image/svg+xml")
175 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
176 w.Header().Set("ETag", `"favicon-svg-v1"`)
177
178 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
179 w.WriteHeader(http.StatusNotModified)
180 return
181 }
182
183 s.pages.Favicon(w)
184}
185
186func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
187 user := s.oauth.GetUser(r)
188 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
189 LoggedInUser: user,
190 })
191}
192
193func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
194 user := s.oauth.GetUser(r)
195 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
196 LoggedInUser: user,
197 })
198}
199
200func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
201 if s.oauth.GetUser(r) != nil {
202 s.Timeline(w, r)
203 return
204 }
205 s.Home(w, r)
206}
207
208func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
209 user := s.oauth.GetUser(r)
210
211 timeline, err := db.MakeTimeline(s.db, 50)
212 if err != nil {
213 log.Println(err)
214 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
215 }
216
217 repos, err := db.GetTopStarredReposLastWeek(s.db)
218 if err != nil {
219 log.Println(err)
220 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
221 return
222 }
223
224 s.pages.Timeline(w, pages.TimelineParams{
225 LoggedInUser: user,
226 Timeline: timeline,
227 Repos: repos,
228 })
229}
230
231func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
232 user := s.oauth.GetUser(r)
233 l := s.logger.With("handler", "UpgradeBanner")
234 l = l.With("did", user.Did)
235 l = l.With("handle", user.Handle)
236
237 regs, err := db.GetRegistrations(
238 s.db,
239 db.FilterEq("did", user.Did),
240 db.FilterEq("needs_upgrade", 1),
241 )
242 if err != nil {
243 l.Error("non-fatal: failed to get registrations")
244 return
245 }
246
247 spindles, err := db.GetSpindles(
248 s.db,
249 db.FilterEq("did", user.Did),
250 db.FilterEq("needs_upgrade", 1),
251 )
252 if err != nil {
253 l.Error("non-fatal: failed to get spindles")
254 return
255 }
256
257 if regs == nil && spindles == nil {
258 return
259 }
260
261 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
262 Registrations: regs,
263 Spindles: spindles,
264 })
265}
266
267func (s *State) Home(w http.ResponseWriter, r *http.Request) {
268 timeline, err := db.MakeTimeline(s.db, 5)
269 if err != nil {
270 log.Println(err)
271 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
272 return
273 }
274
275 repos, err := db.GetTopStarredReposLastWeek(s.db)
276 if err != nil {
277 log.Println(err)
278 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
279 return
280 }
281
282 s.pages.Home(w, pages.TimelineParams{
283 LoggedInUser: nil,
284 Timeline: timeline,
285 Repos: repos,
286 })
287}
288
289func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
290 user := chi.URLParam(r, "user")
291 user = strings.TrimPrefix(user, "@")
292
293 if user == "" {
294 w.WriteHeader(http.StatusBadRequest)
295 return
296 }
297
298 id, err := s.idResolver.ResolveIdent(r.Context(), user)
299 if err != nil {
300 w.WriteHeader(http.StatusInternalServerError)
301 return
302 }
303
304 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
305 if err != nil {
306 w.WriteHeader(http.StatusNotFound)
307 return
308 }
309
310 if len(pubKeys) == 0 {
311 w.WriteHeader(http.StatusNotFound)
312 return
313 }
314
315 for _, k := range pubKeys {
316 key := strings.TrimRight(k.Key, "\n")
317 fmt.Fprintln(w, key)
318 }
319}
320
321func validateRepoName(name string) error {
322 // check for path traversal attempts
323 if name == "." || name == ".." ||
324 strings.Contains(name, "/") || strings.Contains(name, "\\") {
325 return fmt.Errorf("Repository name contains invalid path characters")
326 }
327
328 // check for sequences that could be used for traversal when normalized
329 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
330 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
331 return fmt.Errorf("Repository name contains invalid path sequence")
332 }
333
334 // then continue with character validation
335 for _, char := range name {
336 if !((char >= 'a' && char <= 'z') ||
337 (char >= 'A' && char <= 'Z') ||
338 (char >= '0' && char <= '9') ||
339 char == '-' || char == '_' || char == '.') {
340 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
341 }
342 }
343
344 // additional check to prevent multiple sequential dots
345 if strings.Contains(name, "..") {
346 return fmt.Errorf("Repository name cannot contain sequential dots")
347 }
348
349 // if all checks pass
350 return nil
351}
352
353func stripGitExt(name string) string {
354 return strings.TrimSuffix(name, ".git")
355}
356
357func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
358 switch r.Method {
359 case http.MethodGet:
360 user := s.oauth.GetUser(r)
361 knots, err := s.enforcer.GetKnotsForUser(user.Did)
362 if err != nil {
363 s.pages.Notice(w, "repo", "Invalid user account.")
364 return
365 }
366
367 s.pages.NewRepo(w, pages.NewRepoParams{
368 LoggedInUser: user,
369 Knots: knots,
370 })
371
372 case http.MethodPost:
373 l := s.logger.With("handler", "NewRepo")
374
375 user := s.oauth.GetUser(r)
376 l = l.With("did", user.Did)
377 l = l.With("handle", user.Handle)
378
379 // form validation
380 domain := r.FormValue("domain")
381 if domain == "" {
382 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
383 return
384 }
385 l = l.With("knot", domain)
386
387 repoName := r.FormValue("name")
388 if repoName == "" {
389 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
390 return
391 }
392
393 if err := validateRepoName(repoName); err != nil {
394 s.pages.Notice(w, "repo", err.Error())
395 return
396 }
397 repoName = stripGitExt(repoName)
398 l = l.With("repoName", repoName)
399
400 defaultBranch := r.FormValue("branch")
401 if defaultBranch == "" {
402 defaultBranch = "main"
403 }
404 l = l.With("defaultBranch", defaultBranch)
405
406 description := r.FormValue("description")
407
408 // ACL validation
409 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
410 if err != nil || !ok {
411 l.Info("unauthorized")
412 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
413 return
414 }
415
416 // Check for existing repos
417 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
418 if err == nil && existingRepo != nil {
419 l.Info("repo exists")
420 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
421 return
422 }
423
424 // create atproto record for this repo
425 rkey := tid.TID()
426 repo := &db.Repo{
427 Did: user.Did,
428 Name: repoName,
429 Knot: domain,
430 Rkey: rkey,
431 Description: description,
432 }
433
434 xrpcClient, err := s.oauth.AuthorizedClient(r)
435 if err != nil {
436 l.Info("PDS write failed", "err", err)
437 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
438 return
439 }
440
441 createdAt := time.Now().Format(time.RFC3339)
442 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
443 Collection: tangled.RepoNSID,
444 Repo: user.Did,
445 Rkey: rkey,
446 Record: &lexutil.LexiconTypeDecoder{
447 Val: &tangled.Repo{
448 Knot: repo.Knot,
449 Name: repoName,
450 CreatedAt: createdAt,
451 Owner: user.Did,
452 }},
453 })
454 if err != nil {
455 l.Info("PDS write failed", "err", err)
456 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
457 return
458 }
459
460 aturi := atresp.Uri
461 l = l.With("aturi", aturi)
462 l.Info("wrote to PDS")
463
464 tx, err := s.db.BeginTx(r.Context(), nil)
465 if err != nil {
466 l.Info("txn failed", "err", err)
467 s.pages.Notice(w, "repo", "Failed to save repository information.")
468 return
469 }
470
471 // The rollback function reverts a few things on failure:
472 // - the pending txn
473 // - the ACLs
474 // - the atproto record created
475 rollback := func() {
476 err1 := tx.Rollback()
477 err2 := s.enforcer.E.LoadPolicy()
478 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
479
480 // ignore txn complete errors, this is okay
481 if errors.Is(err1, sql.ErrTxDone) {
482 err1 = nil
483 }
484
485 if errs := errors.Join(err1, err2, err3); errs != nil {
486 l.Error("failed to rollback changes", "errs", errs)
487 return
488 }
489 }
490 defer rollback()
491
492 client, err := s.oauth.ServiceClient(
493 r,
494 oauth.WithService(domain),
495 oauth.WithLxm(tangled.RepoCreateNSID),
496 oauth.WithDev(s.config.Core.Dev),
497 )
498 if err != nil {
499 l.Error("service auth failed", "err", err)
500 s.pages.Notice(w, "repo", "Failed to reach PDS.")
501 return
502 }
503
504 xe := tangled.RepoCreate(
505 r.Context(),
506 client,
507 &tangled.RepoCreate_Input{
508 Rkey: rkey,
509 },
510 )
511 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
512 l.Error("xrpc error", "xe", xe)
513 s.pages.Notice(w, "repo", err.Error())
514 return
515 }
516
517 err = db.AddRepo(tx, repo)
518 if err != nil {
519 l.Error("db write failed", "err", err)
520 s.pages.Notice(w, "repo", "Failed to save repository information.")
521 return
522 }
523
524 // acls
525 p, _ := securejoin.SecureJoin(user.Did, repoName)
526 err = s.enforcer.AddRepo(user.Did, domain, p)
527 if err != nil {
528 l.Error("acl setup failed", "err", err)
529 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
530 return
531 }
532
533 err = tx.Commit()
534 if err != nil {
535 l.Error("txn commit failed", "err", err)
536 http.Error(w, err.Error(), http.StatusInternalServerError)
537 return
538 }
539
540 err = s.enforcer.E.SavePolicy()
541 if err != nil {
542 l.Error("acl save failed", "err", err)
543 http.Error(w, err.Error(), http.StatusInternalServerError)
544 return
545 }
546
547 // reset the ATURI because the transaction completed successfully
548 aturi = ""
549
550 s.notifier.NewRepo(r.Context(), repo)
551 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
552 }
553}
554
555// this is used to rollback changes made to the PDS
556//
557// it is a no-op if the provided ATURI is empty
558func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
559 if aturi == "" {
560 return nil
561 }
562
563 parsed := syntax.ATURI(aturi)
564
565 collection := parsed.Collection().String()
566 repo := parsed.Authority().String()
567 rkey := parsed.RecordKey().String()
568
569 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
570 Collection: collection,
571 Repo: repo,
572 Rkey: rkey,
573 })
574 return err
575}