this repo has no description
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/appview"
22 "tangled.sh/tangled.sh/core/appview/cache"
23 "tangled.sh/tangled.sh/core/appview/cache/session"
24 "tangled.sh/tangled.sh/core/appview/config"
25 "tangled.sh/tangled.sh/core/appview/db"
26 "tangled.sh/tangled.sh/core/appview/notify"
27 "tangled.sh/tangled.sh/core/appview/oauth"
28 "tangled.sh/tangled.sh/core/appview/pages"
29 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
30 "tangled.sh/tangled.sh/core/appview/reporesolver"
31 "tangled.sh/tangled.sh/core/appview/validator"
32 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient"
33 "tangled.sh/tangled.sh/core/eventconsumer"
34 "tangled.sh/tangled.sh/core/idresolver"
35 "tangled.sh/tangled.sh/core/jetstream"
36 tlog "tangled.sh/tangled.sh/core/log"
37 "tangled.sh/tangled.sh/core/rbac"
38 "tangled.sh/tangled.sh/core/tid"
39 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors"
40)
41
// State bundles the long-lived dependencies used by the appview's HTTP
// handlers. It is constructed by Make and released with Close.
type State struct {
	// db is the application database handle; Close closes it.
	db *db.DB
	// notifier receives application events such as newly created repos.
	notifier notify.Notifier
	// oauth resolves the logged-in user for a request and hands out
	// authorized clients.
	oauth *oauth.OAuth
	// enforcer holds the RBAC policy used for permission checks.
	enforcer *rbac.Enforcer
	// pages renders HTML pages and notices to the response writer.
	pages *pages.Pages
	// sess is the session store handed to the OAuth layer.
	sess *session.SessionStore
	// idResolver resolves handles/DIDs to identities.
	idResolver *idresolver.Resolver
	// posthog is the PostHog analytics client.
	posthog posthog.Client
	// jc is the jetstream client started by Make.
	jc *jetstream.JetstreamClient
	// config is the appview configuration passed to Make.
	config *config.Config
	// repoResolver resolves repositories for incoming requests.
	repoResolver *reporesolver.RepoResolver
	// knotstream consumes events from knots.
	knotstream *eventconsumer.Consumer
	// spindlestream consumes events from spindles.
	spindlestream *eventconsumer.Consumer
	// logger is the base structured logger for handlers.
	logger *slog.Logger
	// validator validates records; it is shared with the ingester.
	validator *validator.Validator
}
59
60func Make(ctx context.Context, config *config.Config) (*State, error) {
61 d, err := db.Make(config.Core.DbPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to create db: %w", err)
64 }
65
66 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to create enforcer: %w", err)
69 }
70
71 res, err := idresolver.RedisResolver(config.Redis.ToURL())
72 if err != nil {
73 log.Printf("failed to create redis resolver: %v", err)
74 res = idresolver.DefaultResolver()
75 }
76
77 pgs := pages.NewPages(config, res)
78 cache := cache.New(config.Redis.Addr)
79 sess := session.New(cache)
80 oauth := oauth.NewOAuth(config, sess)
81 validator := validator.New(d)
82
83 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
84 if err != nil {
85 return nil, fmt.Errorf("failed to create posthog client: %w", err)
86 }
87
88 repoResolver := reporesolver.New(config, enforcer, res, d)
89
90 wrapper := db.DbWrapper{d}
91 jc, err := jetstream.NewJetstreamClient(
92 config.Jetstream.Endpoint,
93 "appview",
94 []string{
95 tangled.GraphFollowNSID,
96 tangled.FeedStarNSID,
97 tangled.PublicKeyNSID,
98 tangled.RepoArtifactNSID,
99 tangled.ActorProfileNSID,
100 tangled.SpindleMemberNSID,
101 tangled.SpindleNSID,
102 tangled.StringNSID,
103 tangled.RepoIssueNSID,
104 tangled.RepoIssueCommentNSID,
105 },
106 nil,
107 slog.Default(),
108 wrapper,
109 false,
110
111 // in-memory filter is inapplicalble to appview so
112 // we'll never log dids anyway.
113 false,
114 )
115 if err != nil {
116 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
117 }
118
119 ingester := appview.Ingester{
120 Db: wrapper,
121 Enforcer: enforcer,
122 IdResolver: res,
123 Config: config,
124 Logger: tlog.New("ingester"),
125 Validator: validator,
126 }
127 err = jc.StartJetstream(ctx, ingester.Ingest())
128 if err != nil {
129 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
130 }
131
132 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
133 if err != nil {
134 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
135 }
136 knotstream.Start(ctx)
137
138 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
139 if err != nil {
140 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
141 }
142 spindlestream.Start(ctx)
143
144 var notifiers []notify.Notifier
145 if !config.Core.Dev {
146 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
147 }
148 notifier := notify.NewMergedNotifier(notifiers...)
149
150 state := &State{
151 d,
152 notifier,
153 oauth,
154 enforcer,
155 pgs,
156 sess,
157 res,
158 posthog,
159 jc,
160 config,
161 repoResolver,
162 knotstream,
163 spindlestream,
164 slog.Default(),
165 validator,
166 }
167
168 return state, nil
169}
170
171func (s *State) Close() error {
172 // other close up logic goes here
173 return s.db.Close()
174}
175
176func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
177 w.Header().Set("Content-Type", "image/svg+xml")
178 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
179 w.Header().Set("ETag", `"favicon-svg-v1"`)
180
181 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
182 w.WriteHeader(http.StatusNotModified)
183 return
184 }
185
186 s.pages.Favicon(w)
187}
188
189func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
190 user := s.oauth.GetUser(r)
191 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
192 LoggedInUser: user,
193 })
194}
195
196func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
197 user := s.oauth.GetUser(r)
198 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
199 LoggedInUser: user,
200 })
201}
202
203func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
204 if s.oauth.GetUser(r) != nil {
205 s.Timeline(w, r)
206 return
207 }
208 s.Home(w, r)
209}
210
211func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
212 user := s.oauth.GetUser(r)
213
214 var userDid string
215 if user != nil {
216 userDid = user.Did
217 }
218 timeline, err := db.MakeTimeline(s.db, 50, userDid)
219 if err != nil {
220 log.Println(err)
221 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
222 }
223
224 repos, err := db.GetTopStarredReposLastWeek(s.db)
225 if err != nil {
226 log.Println(err)
227 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
228 return
229 }
230
231 s.pages.Timeline(w, pages.TimelineParams{
232 LoggedInUser: user,
233 Timeline: timeline,
234 Repos: repos,
235 })
236}
237
238func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
239 user := s.oauth.GetUser(r)
240 l := s.logger.With("handler", "UpgradeBanner")
241 l = l.With("did", user.Did)
242 l = l.With("handle", user.Handle)
243
244 regs, err := db.GetRegistrations(
245 s.db,
246 db.FilterEq("did", user.Did),
247 db.FilterEq("needs_upgrade", 1),
248 )
249 if err != nil {
250 l.Error("non-fatal: failed to get registrations", "err", err)
251 }
252
253 spindles, err := db.GetSpindles(
254 s.db,
255 db.FilterEq("owner", user.Did),
256 db.FilterEq("needs_upgrade", 1),
257 )
258 if err != nil {
259 l.Error("non-fatal: failed to get spindles", "err", err)
260 }
261
262 if regs == nil && spindles == nil {
263 return
264 }
265
266 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
267 Registrations: regs,
268 Spindles: spindles,
269 })
270}
271
272func (s *State) Home(w http.ResponseWriter, r *http.Request) {
273 timeline, err := db.MakeTimeline(s.db, 5, "")
274 if err != nil {
275 log.Println(err)
276 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
277 return
278 }
279
280 repos, err := db.GetTopStarredReposLastWeek(s.db)
281 if err != nil {
282 log.Println(err)
283 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
284 return
285 }
286
287 s.pages.Home(w, pages.TimelineParams{
288 LoggedInUser: nil,
289 Timeline: timeline,
290 Repos: repos,
291 })
292}
293
294func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
295 user := chi.URLParam(r, "user")
296 user = strings.TrimPrefix(user, "@")
297
298 if user == "" {
299 w.WriteHeader(http.StatusBadRequest)
300 return
301 }
302
303 id, err := s.idResolver.ResolveIdent(r.Context(), user)
304 if err != nil {
305 w.WriteHeader(http.StatusInternalServerError)
306 return
307 }
308
309 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
310 if err != nil {
311 w.WriteHeader(http.StatusNotFound)
312 return
313 }
314
315 if len(pubKeys) == 0 {
316 w.WriteHeader(http.StatusNotFound)
317 return
318 }
319
320 for _, k := range pubKeys {
321 key := strings.TrimRight(k.Key, "\n")
322 fmt.Fprintln(w, key)
323 }
324}
325
// validateRepoName checks that name is safe to use as a repository name and
// returns a user-facing error describing the first rule it violates.
//
// Rules, in the order they are checked:
//  1. the name is not "." or ".." and contains no "/" or "\";
//  2. the name neither starts nor ends with ".";
//  3. every character is an ASCII letter, digit, "-", "_" or ".";
//  4. the name contains no "..".
//
// The empty string violates none of these rules and yields nil; callers
// must reject it themselves.
func validateRepoName(name string) error {
	// Path separators and the special directory names enable traversal.
	isDotDir := name == "." || name == ".."
	if isDotDir || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// Names delimited by a dot could normalise into traversal sequences.
	// (Sequences such as "./" cannot occur here: "/" was rejected above.)
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// Only a conservative ASCII alphabet is permitted.
	allowed := func(c rune) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return true
		case c == '-', c == '_', c == '.':
			return true
		}
		return false
	}
	firstBad := strings.IndexFunc(name, func(c rune) bool { return !allowed(c) })
	if firstBad >= 0 {
		return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
	}

	// Interior runs of dots are rejected as well.
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
357
// stripGitExt returns name with a single trailing ".git" removed; names
// without that suffix are returned unchanged.
func stripGitExt(name string) string {
	base, _ := strings.CutSuffix(name, ".git")
	return base
}
361
362func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
363 switch r.Method {
364 case http.MethodGet:
365 user := s.oauth.GetUser(r)
366 knots, err := s.enforcer.GetKnotsForUser(user.Did)
367 if err != nil {
368 s.pages.Notice(w, "repo", "Invalid user account.")
369 return
370 }
371
372 s.pages.NewRepo(w, pages.NewRepoParams{
373 LoggedInUser: user,
374 Knots: knots,
375 })
376
377 case http.MethodPost:
378 l := s.logger.With("handler", "NewRepo")
379
380 user := s.oauth.GetUser(r)
381 l = l.With("did", user.Did)
382 l = l.With("handle", user.Handle)
383
384 // form validation
385 domain := r.FormValue("domain")
386 if domain == "" {
387 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
388 return
389 }
390 l = l.With("knot", domain)
391
392 repoName := r.FormValue("name")
393 if repoName == "" {
394 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
395 return
396 }
397
398 if err := validateRepoName(repoName); err != nil {
399 s.pages.Notice(w, "repo", err.Error())
400 return
401 }
402 repoName = stripGitExt(repoName)
403 l = l.With("repoName", repoName)
404
405 defaultBranch := r.FormValue("branch")
406 if defaultBranch == "" {
407 defaultBranch = "main"
408 }
409 l = l.With("defaultBranch", defaultBranch)
410
411 description := r.FormValue("description")
412
413 // ACL validation
414 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
415 if err != nil || !ok {
416 l.Info("unauthorized")
417 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
418 return
419 }
420
421 // Check for existing repos
422 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
423 if err == nil && existingRepo != nil {
424 l.Info("repo exists")
425 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
426 return
427 }
428
429 // create atproto record for this repo
430 rkey := tid.TID()
431 repo := &db.Repo{
432 Did: user.Did,
433 Name: repoName,
434 Knot: domain,
435 Rkey: rkey,
436 Description: description,
437 }
438
439 xrpcClient, err := s.oauth.AuthorizedClient(r)
440 if err != nil {
441 l.Info("PDS write failed", "err", err)
442 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
443 return
444 }
445
446 createdAt := time.Now().Format(time.RFC3339)
447 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
448 Collection: tangled.RepoNSID,
449 Repo: user.Did,
450 Rkey: rkey,
451 Record: &lexutil.LexiconTypeDecoder{
452 Val: &tangled.Repo{
453 Knot: repo.Knot,
454 Name: repoName,
455 CreatedAt: createdAt,
456 }},
457 })
458 if err != nil {
459 l.Info("PDS write failed", "err", err)
460 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
461 return
462 }
463
464 aturi := atresp.Uri
465 l = l.With("aturi", aturi)
466 l.Info("wrote to PDS")
467
468 tx, err := s.db.BeginTx(r.Context(), nil)
469 if err != nil {
470 l.Info("txn failed", "err", err)
471 s.pages.Notice(w, "repo", "Failed to save repository information.")
472 return
473 }
474
475 // The rollback function reverts a few things on failure:
476 // - the pending txn
477 // - the ACLs
478 // - the atproto record created
479 rollback := func() {
480 err1 := tx.Rollback()
481 err2 := s.enforcer.E.LoadPolicy()
482 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
483
484 // ignore txn complete errors, this is okay
485 if errors.Is(err1, sql.ErrTxDone) {
486 err1 = nil
487 }
488
489 if errs := errors.Join(err1, err2, err3); errs != nil {
490 l.Error("failed to rollback changes", "errs", errs)
491 return
492 }
493 }
494 defer rollback()
495
496 client, err := s.oauth.ServiceClient(
497 r,
498 oauth.WithService(domain),
499 oauth.WithLxm(tangled.RepoCreateNSID),
500 oauth.WithDev(s.config.Core.Dev),
501 )
502 if err != nil {
503 l.Error("service auth failed", "err", err)
504 s.pages.Notice(w, "repo", "Failed to reach PDS.")
505 return
506 }
507
508 xe := tangled.RepoCreate(
509 r.Context(),
510 client,
511 &tangled.RepoCreate_Input{
512 Rkey: rkey,
513 },
514 )
515 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
516 l.Error("xrpc error", "xe", xe)
517 s.pages.Notice(w, "repo", err.Error())
518 return
519 }
520
521 err = db.AddRepo(tx, repo)
522 if err != nil {
523 l.Error("db write failed", "err", err)
524 s.pages.Notice(w, "repo", "Failed to save repository information.")
525 return
526 }
527
528 // acls
529 p, _ := securejoin.SecureJoin(user.Did, repoName)
530 err = s.enforcer.AddRepo(user.Did, domain, p)
531 if err != nil {
532 l.Error("acl setup failed", "err", err)
533 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
534 return
535 }
536
537 err = tx.Commit()
538 if err != nil {
539 l.Error("txn commit failed", "err", err)
540 http.Error(w, err.Error(), http.StatusInternalServerError)
541 return
542 }
543
544 err = s.enforcer.E.SavePolicy()
545 if err != nil {
546 l.Error("acl save failed", "err", err)
547 http.Error(w, err.Error(), http.StatusInternalServerError)
548 return
549 }
550
551 // reset the ATURI because the transaction completed successfully
552 aturi = ""
553
554 s.notifier.NewRepo(r.Context(), repo)
555 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
556 }
557}
558
559// this is used to rollback changes made to the PDS
560//
561// it is a no-op if the provided ATURI is empty
562func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
563 if aturi == "" {
564 return nil
565 }
566
567 parsed := syntax.ATURI(aturi)
568
569 collection := parsed.Collection().String()
570 repo := parsed.Authority().String()
571 rkey := parsed.RecordKey().String()
572
573 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
574 Collection: collection,
575 Repo: repo,
576 Rkey: rkey,
577 })
578 return err
579}