this repo has no description
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/appview"
22 "tangled.sh/tangled.sh/core/appview/cache"
23 "tangled.sh/tangled.sh/core/appview/cache/session"
24 "tangled.sh/tangled.sh/core/appview/config"
25 "tangled.sh/tangled.sh/core/appview/db"
26 "tangled.sh/tangled.sh/core/appview/notify"
27 "tangled.sh/tangled.sh/core/appview/oauth"
28 "tangled.sh/tangled.sh/core/appview/pages"
29 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
30 "tangled.sh/tangled.sh/core/appview/reporesolver"
31 "tangled.sh/tangled.sh/core/appview/validator"
32 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient"
33 "tangled.sh/tangled.sh/core/eventconsumer"
34 "tangled.sh/tangled.sh/core/idresolver"
35 "tangled.sh/tangled.sh/core/jetstream"
36 tlog "tangled.sh/tangled.sh/core/log"
37 "tangled.sh/tangled.sh/core/rbac"
38 "tangled.sh/tangled.sh/core/tid"
39 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors"
40)
41
42type State struct {
43 db *db.DB
44 notifier notify.Notifier
45 oauth *oauth.OAuth
46 enforcer *rbac.Enforcer
47 pages *pages.Pages
48 sess *session.SessionStore
49 idResolver *idresolver.Resolver
50 posthog posthog.Client
51 jc *jetstream.JetstreamClient
52 config *config.Config
53 repoResolver *reporesolver.RepoResolver
54 knotstream *eventconsumer.Consumer
55 spindlestream *eventconsumer.Consumer
56 logger *slog.Logger
57 validator *validator.Validator
58}
59
60func Make(ctx context.Context, config *config.Config) (*State, error) {
61 d, err := db.Make(config.Core.DbPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to create db: %w", err)
64 }
65
66 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to create enforcer: %w", err)
69 }
70
71 res, err := idresolver.RedisResolver(config.Redis.ToURL())
72 if err != nil {
73 log.Printf("failed to create redis resolver: %v", err)
74 res = idresolver.DefaultResolver()
75 }
76
77 pgs := pages.NewPages(config, res)
78 cache := cache.New(config.Redis.Addr)
79 sess := session.New(cache)
80 oauth := oauth.NewOAuth(config, sess)
81 validator := validator.New(d)
82
83 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
84 if err != nil {
85 return nil, fmt.Errorf("failed to create posthog client: %w", err)
86 }
87
88 repoResolver := reporesolver.New(config, enforcer, res, d)
89
90 wrapper := db.DbWrapper{d}
91 jc, err := jetstream.NewJetstreamClient(
92 config.Jetstream.Endpoint,
93 "appview",
94 []string{
95 tangled.GraphFollowNSID,
96 tangled.FeedStarNSID,
97 tangled.PublicKeyNSID,
98 tangled.RepoArtifactNSID,
99 tangled.ActorProfileNSID,
100 tangled.SpindleMemberNSID,
101 tangled.SpindleNSID,
102 tangled.StringNSID,
103 tangled.RepoIssueNSID,
104 tangled.RepoIssueCommentNSID,
105 },
106 nil,
107 slog.Default(),
108 wrapper,
109 false,
110
111 // in-memory filter is inapplicalble to appview so
112 // we'll never log dids anyway.
113 false,
114 )
115 if err != nil {
116 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
117 }
118
119 ingester := appview.Ingester{
120 Db: wrapper,
121 Enforcer: enforcer,
122 IdResolver: res,
123 Config: config,
124 Logger: tlog.New("ingester"),
125 Validator: validator,
126 }
127 err = jc.StartJetstream(ctx, ingester.Ingest())
128 if err != nil {
129 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
130 }
131
132 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
133 if err != nil {
134 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
135 }
136 knotstream.Start(ctx)
137
138 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
139 if err != nil {
140 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
141 }
142 spindlestream.Start(ctx)
143
144 var notifiers []notify.Notifier
145 if !config.Core.Dev {
146 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
147 }
148 notifier := notify.NewMergedNotifier(notifiers...)
149
150 state := &State{
151 d,
152 notifier,
153 oauth,
154 enforcer,
155 pgs,
156 sess,
157 res,
158 posthog,
159 jc,
160 config,
161 repoResolver,
162 knotstream,
163 spindlestream,
164 slog.Default(),
165 validator,
166 }
167
168 return state, nil
169}
170
171func (s *State) Close() error {
172 // other close up logic goes here
173 return s.db.Close()
174}
175
176func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
177 w.Header().Set("Content-Type", "image/svg+xml")
178 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
179 w.Header().Set("ETag", `"favicon-svg-v1"`)
180
181 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
182 w.WriteHeader(http.StatusNotModified)
183 return
184 }
185
186 s.pages.Favicon(w)
187}
188
189func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
190 user := s.oauth.GetUser(r)
191 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
192 LoggedInUser: user,
193 })
194}
195
196func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
197 user := s.oauth.GetUser(r)
198 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
199 LoggedInUser: user,
200 })
201}
202
203func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
204 if s.oauth.GetUser(r) != nil {
205 s.Timeline(w, r)
206 return
207 }
208 s.Home(w, r)
209}
210
211func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
212 user := s.oauth.GetUser(r)
213
214 timeline, err := db.MakeTimeline(s.db, 50)
215 if err != nil {
216 log.Println(err)
217 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
218 }
219
220 repos, err := db.GetTopStarredReposLastWeek(s.db)
221 if err != nil {
222 log.Println(err)
223 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
224 return
225 }
226
227 s.pages.Timeline(w, pages.TimelineParams{
228 LoggedInUser: user,
229 Timeline: timeline,
230 Repos: repos,
231 })
232}
233
234func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
235 user := s.oauth.GetUser(r)
236 l := s.logger.With("handler", "UpgradeBanner")
237 l = l.With("did", user.Did)
238 l = l.With("handle", user.Handle)
239
240 regs, err := db.GetRegistrations(
241 s.db,
242 db.FilterEq("did", user.Did),
243 db.FilterEq("needs_upgrade", 1),
244 )
245 if err != nil {
246 l.Error("non-fatal: failed to get registrations", "err", err)
247 }
248
249 spindles, err := db.GetSpindles(
250 s.db,
251 db.FilterEq("owner", user.Did),
252 db.FilterEq("needs_upgrade", 1),
253 )
254 if err != nil {
255 l.Error("non-fatal: failed to get spindles", "err", err)
256 }
257
258 if regs == nil && spindles == nil {
259 return
260 }
261
262 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
263 Registrations: regs,
264 Spindles: spindles,
265 })
266}
267
268func (s *State) Home(w http.ResponseWriter, r *http.Request) {
269 timeline, err := db.MakeTimeline(s.db, 5)
270 if err != nil {
271 log.Println(err)
272 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
273 return
274 }
275
276 repos, err := db.GetTopStarredReposLastWeek(s.db)
277 if err != nil {
278 log.Println(err)
279 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
280 return
281 }
282
283 s.pages.Home(w, pages.TimelineParams{
284 LoggedInUser: nil,
285 Timeline: timeline,
286 Repos: repos,
287 })
288}
289
290func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
291 user := chi.URLParam(r, "user")
292 user = strings.TrimPrefix(user, "@")
293
294 if user == "" {
295 w.WriteHeader(http.StatusBadRequest)
296 return
297 }
298
299 id, err := s.idResolver.ResolveIdent(r.Context(), user)
300 if err != nil {
301 w.WriteHeader(http.StatusInternalServerError)
302 return
303 }
304
305 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
306 if err != nil {
307 w.WriteHeader(http.StatusNotFound)
308 return
309 }
310
311 if len(pubKeys) == 0 {
312 w.WriteHeader(http.StatusNotFound)
313 return
314 }
315
316 for _, k := range pubKeys {
317 key := strings.TrimRight(k.Key, "\n")
318 fmt.Fprintln(w, key)
319 }
320}
321
322func validateRepoName(name string) error {
323 // check for path traversal attempts
324 if name == "." || name == ".." ||
325 strings.Contains(name, "/") || strings.Contains(name, "\\") {
326 return fmt.Errorf("Repository name contains invalid path characters")
327 }
328
329 // check for sequences that could be used for traversal when normalized
330 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
331 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
332 return fmt.Errorf("Repository name contains invalid path sequence")
333 }
334
335 // then continue with character validation
336 for _, char := range name {
337 if !((char >= 'a' && char <= 'z') ||
338 (char >= 'A' && char <= 'Z') ||
339 (char >= '0' && char <= '9') ||
340 char == '-' || char == '_' || char == '.') {
341 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
342 }
343 }
344
345 // additional check to prevent multiple sequential dots
346 if strings.Contains(name, "..") {
347 return fmt.Errorf("Repository name cannot contain sequential dots")
348 }
349
350 // if all checks pass
351 return nil
352}
353
354func stripGitExt(name string) string {
355 return strings.TrimSuffix(name, ".git")
356}
357
358func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
359 switch r.Method {
360 case http.MethodGet:
361 user := s.oauth.GetUser(r)
362 knots, err := s.enforcer.GetKnotsForUser(user.Did)
363 if err != nil {
364 s.pages.Notice(w, "repo", "Invalid user account.")
365 return
366 }
367
368 s.pages.NewRepo(w, pages.NewRepoParams{
369 LoggedInUser: user,
370 Knots: knots,
371 })
372
373 case http.MethodPost:
374 l := s.logger.With("handler", "NewRepo")
375
376 user := s.oauth.GetUser(r)
377 l = l.With("did", user.Did)
378 l = l.With("handle", user.Handle)
379
380 // form validation
381 domain := r.FormValue("domain")
382 if domain == "" {
383 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
384 return
385 }
386 l = l.With("knot", domain)
387
388 repoName := r.FormValue("name")
389 if repoName == "" {
390 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
391 return
392 }
393
394 if err := validateRepoName(repoName); err != nil {
395 s.pages.Notice(w, "repo", err.Error())
396 return
397 }
398 repoName = stripGitExt(repoName)
399 l = l.With("repoName", repoName)
400
401 defaultBranch := r.FormValue("branch")
402 if defaultBranch == "" {
403 defaultBranch = "main"
404 }
405 l = l.With("defaultBranch", defaultBranch)
406
407 description := r.FormValue("description")
408
409 // ACL validation
410 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
411 if err != nil || !ok {
412 l.Info("unauthorized")
413 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
414 return
415 }
416
417 // Check for existing repos
418 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
419 if err == nil && existingRepo != nil {
420 l.Info("repo exists")
421 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
422 return
423 }
424
425 // create atproto record for this repo
426 rkey := tid.TID()
427 repo := &db.Repo{
428 Did: user.Did,
429 Name: repoName,
430 Knot: domain,
431 Rkey: rkey,
432 Description: description,
433 }
434
435 xrpcClient, err := s.oauth.AuthorizedClient(r)
436 if err != nil {
437 l.Info("PDS write failed", "err", err)
438 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
439 return
440 }
441
442 createdAt := time.Now().Format(time.RFC3339)
443 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
444 Collection: tangled.RepoNSID,
445 Repo: user.Did,
446 Rkey: rkey,
447 Record: &lexutil.LexiconTypeDecoder{
448 Val: &tangled.Repo{
449 Knot: repo.Knot,
450 Name: repoName,
451 CreatedAt: createdAt,
452 }},
453 })
454 if err != nil {
455 l.Info("PDS write failed", "err", err)
456 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
457 return
458 }
459
460 aturi := atresp.Uri
461 l = l.With("aturi", aturi)
462 l.Info("wrote to PDS")
463
464 tx, err := s.db.BeginTx(r.Context(), nil)
465 if err != nil {
466 l.Info("txn failed", "err", err)
467 s.pages.Notice(w, "repo", "Failed to save repository information.")
468 return
469 }
470
471 // The rollback function reverts a few things on failure:
472 // - the pending txn
473 // - the ACLs
474 // - the atproto record created
475 rollback := func() {
476 err1 := tx.Rollback()
477 err2 := s.enforcer.E.LoadPolicy()
478 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
479
480 // ignore txn complete errors, this is okay
481 if errors.Is(err1, sql.ErrTxDone) {
482 err1 = nil
483 }
484
485 if errs := errors.Join(err1, err2, err3); errs != nil {
486 l.Error("failed to rollback changes", "errs", errs)
487 return
488 }
489 }
490 defer rollback()
491
492 client, err := s.oauth.ServiceClient(
493 r,
494 oauth.WithService(domain),
495 oauth.WithLxm(tangled.RepoCreateNSID),
496 oauth.WithDev(s.config.Core.Dev),
497 )
498 if err != nil {
499 l.Error("service auth failed", "err", err)
500 s.pages.Notice(w, "repo", "Failed to reach PDS.")
501 return
502 }
503
504 xe := tangled.RepoCreate(
505 r.Context(),
506 client,
507 &tangled.RepoCreate_Input{
508 Rkey: rkey,
509 },
510 )
511 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
512 l.Error("xrpc error", "xe", xe)
513 s.pages.Notice(w, "repo", err.Error())
514 return
515 }
516
517 err = db.AddRepo(tx, repo)
518 if err != nil {
519 l.Error("db write failed", "err", err)
520 s.pages.Notice(w, "repo", "Failed to save repository information.")
521 return
522 }
523
524 // acls
525 p, _ := securejoin.SecureJoin(user.Did, repoName)
526 err = s.enforcer.AddRepo(user.Did, domain, p)
527 if err != nil {
528 l.Error("acl setup failed", "err", err)
529 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
530 return
531 }
532
533 err = tx.Commit()
534 if err != nil {
535 l.Error("txn commit failed", "err", err)
536 http.Error(w, err.Error(), http.StatusInternalServerError)
537 return
538 }
539
540 err = s.enforcer.E.SavePolicy()
541 if err != nil {
542 l.Error("acl save failed", "err", err)
543 http.Error(w, err.Error(), http.StatusInternalServerError)
544 return
545 }
546
547 // reset the ATURI because the transaction completed successfully
548 aturi = ""
549
550 s.notifier.NewRepo(r.Context(), repo)
551 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
552 }
553}
554
555// this is used to rollback changes made to the PDS
556//
557// it is a no-op if the provided ATURI is empty
558func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
559 if aturi == "" {
560 return nil
561 }
562
563 parsed := syntax.ATURI(aturi)
564
565 collection := parsed.Collection().String()
566 repo := parsed.Authority().String()
567 rkey := parsed.RecordKey().String()
568
569 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
570 Collection: collection,
571 Repo: repo,
572 Rkey: rkey,
573 })
574 return err
575}