this repo has no description
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cache/session"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/eventconsumer"
30 "tangled.org/core/idresolver"
31 "tangled.org/core/jetstream"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/rbac"
34 "tangled.org/core/tid"
35
36 comatproto "github.com/bluesky-social/indigo/api/atproto"
37 atpclient "github.com/bluesky-social/indigo/atproto/client"
38 "github.com/bluesky-social/indigo/atproto/syntax"
39 lexutil "github.com/bluesky-social/indigo/lex/util"
40 securejoin "github.com/cyphar/filepath-securejoin"
41 "github.com/go-chi/chi/v5"
42 "github.com/posthog/posthog-go"
43)
44
45type State struct {
46 db *db.DB
47 notifier notify.Notifier
48 oauth *oauth.OAuth
49 enforcer *rbac.Enforcer
50 pages *pages.Pages
51 sess *session.SessionStore
52 idResolver *idresolver.Resolver
53 posthog posthog.Client
54 jc *jetstream.JetstreamClient
55 config *config.Config
56 repoResolver *reporesolver.RepoResolver
57 knotstream *eventconsumer.Consumer
58 spindlestream *eventconsumer.Consumer
59 logger *slog.Logger
60 validator *validator.Validator
61}
62
63func Make(ctx context.Context, config *config.Config) (*State, error) {
64 d, err := db.Make(config.Core.DbPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to create db: %w", err)
67 }
68
69 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
70 if err != nil {
71 return nil, fmt.Errorf("failed to create enforcer: %w", err)
72 }
73
74 res, err := idresolver.RedisResolver(config.Redis.ToURL())
75 if err != nil {
76 log.Printf("failed to create redis resolver: %v", err)
77 res = idresolver.DefaultResolver()
78 }
79
80 pages := pages.NewPages(config, res)
81 cache := cache.New(config.Redis.Addr)
82 sess := session.New(cache)
83 oauth2, err := oauth.New(config)
84 if err != nil {
85 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
86 }
87 validator := validator.New(d, res, enforcer)
88
89 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
90 if err != nil {
91 return nil, fmt.Errorf("failed to create posthog client: %w", err)
92 }
93
94 repoResolver := reporesolver.New(config, enforcer, res, d)
95
96 wrapper := db.DbWrapper{Execer: d}
97 jc, err := jetstream.NewJetstreamClient(
98 config.Jetstream.Endpoint,
99 "appview",
100 []string{
101 tangled.GraphFollowNSID,
102 tangled.FeedStarNSID,
103 tangled.PublicKeyNSID,
104 tangled.RepoArtifactNSID,
105 tangled.ActorProfileNSID,
106 tangled.SpindleMemberNSID,
107 tangled.SpindleNSID,
108 tangled.StringNSID,
109 tangled.RepoIssueNSID,
110 tangled.RepoIssueCommentNSID,
111 tangled.LabelDefinitionNSID,
112 tangled.LabelOpNSID,
113 },
114 nil,
115 slog.Default(),
116 wrapper,
117 false,
118
119 // in-memory filter is inapplicalble to appview so
120 // we'll never log dids anyway.
121 false,
122 )
123 if err != nil {
124 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
125 }
126
127 if err := BackfillDefaultDefs(d, res); err != nil {
128 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
129 }
130
131 ingester := appview.Ingester{
132 Db: wrapper,
133 Enforcer: enforcer,
134 IdResolver: res,
135 Config: config,
136 Logger: tlog.New("ingester"),
137 Validator: validator,
138 }
139 err = jc.StartJetstream(ctx, ingester.Ingest())
140 if err != nil {
141 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
142 }
143
144 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
145 if err != nil {
146 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
147 }
148 knotstream.Start(ctx)
149
150 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
151 if err != nil {
152 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
153 }
154 spindlestream.Start(ctx)
155
156 var notifiers []notify.Notifier
157
158 // Always add the database notifier
159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
160
161 // Add other notifiers in production only
162 if !config.Core.Dev {
163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
164 }
165 notifier := notify.NewMergedNotifier(notifiers...)
166
167 state := &State{
168 d,
169 notifier,
170 oauth2,
171 enforcer,
172 pages,
173 sess,
174 res,
175 posthog,
176 jc,
177 config,
178 repoResolver,
179 knotstream,
180 spindlestream,
181 slog.Default(),
182 validator,
183 }
184
185 return state, nil
186}
187
188func (s *State) Close() error {
189 // other close up logic goes here
190 return s.db.Close()
191}
192
193func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
194 w.Header().Set("Content-Type", "image/svg+xml")
195 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
196 w.Header().Set("ETag", `"favicon-svg-v1"`)
197
198 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
199 w.WriteHeader(http.StatusNotModified)
200 return
201 }
202
203 s.pages.Favicon(w)
204}
205
206// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
207const manifestJson = `{
208 "name": "tangled",
209 "description": "tightly-knit social coding.",
210 "icons": [
211 {
212 "src": "/favicon.svg",
213 "sizes": "144x144"
214 }
215 ],
216 "start_url": "/",
217 "id": "org.tangled",
218
219 "display": "standalone",
220 "background_color": "#111827",
221 "theme_color": "#111827"
222}`
223
224func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
225 w.Header().Set("Content-Type", "application/json")
226 w.Write([]byte(manifestJson))
227}
228
229func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
230 user := s.oauth.GetUser(r)
231 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
232 LoggedInUser: user,
233 })
234}
235
236func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
237 user := s.oauth.GetUser(r)
238 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
239 LoggedInUser: user,
240 })
241}
242
243func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
244 user := s.oauth.GetUser(r)
245 s.pages.Brand(w, pages.BrandParams{
246 LoggedInUser: user,
247 })
248}
249
250func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
251 if s.oauth.GetUser(r) != nil {
252 s.Timeline(w, r)
253 return
254 }
255 s.Home(w, r)
256}
257
258func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
259 user := s.oauth.GetUser(r)
260
261 // TODO: set this flag based on the UI
262 filtered := false
263
264 var userDid string
265 if user != nil {
266 userDid = user.Did
267 }
268 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
269 if err != nil {
270 log.Println(err)
271 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
272 }
273
274 repos, err := db.GetTopStarredReposLastWeek(s.db)
275 if err != nil {
276 log.Println(err)
277 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
278 return
279 }
280
281 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue))
282 if err != nil {
283 // non-fatal
284 }
285
286 s.pages.Timeline(w, pages.TimelineParams{
287 LoggedInUser: user,
288 Timeline: timeline,
289 Repos: repos,
290 GfiLabel: gfiLabel,
291 })
292}
293
294func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
295 user := s.oauth.GetUser(r)
296 if user == nil {
297 return
298 }
299
300 l := s.logger.With("handler", "UpgradeBanner")
301 l = l.With("did", user.Did)
302
303 regs, err := db.GetRegistrations(
304 s.db,
305 db.FilterEq("did", user.Did),
306 db.FilterEq("needs_upgrade", 1),
307 )
308 if err != nil {
309 l.Error("non-fatal: failed to get registrations", "err", err)
310 }
311
312 spindles, err := db.GetSpindles(
313 s.db,
314 db.FilterEq("owner", user.Did),
315 db.FilterEq("needs_upgrade", 1),
316 )
317 if err != nil {
318 l.Error("non-fatal: failed to get spindles", "err", err)
319 }
320
321 if regs == nil && spindles == nil {
322 return
323 }
324
325 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
326 Registrations: regs,
327 Spindles: spindles,
328 })
329}
330
331func (s *State) Home(w http.ResponseWriter, r *http.Request) {
332 // TODO: set this flag based on the UI
333 filtered := false
334
335 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
336 if err != nil {
337 log.Println(err)
338 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
339 return
340 }
341
342 repos, err := db.GetTopStarredReposLastWeek(s.db)
343 if err != nil {
344 log.Println(err)
345 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
346 return
347 }
348
349 s.pages.Home(w, pages.TimelineParams{
350 LoggedInUser: nil,
351 Timeline: timeline,
352 Repos: repos,
353 })
354}
355
356func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
357 user := chi.URLParam(r, "user")
358 user = strings.TrimPrefix(user, "@")
359
360 if user == "" {
361 w.WriteHeader(http.StatusBadRequest)
362 return
363 }
364
365 id, err := s.idResolver.ResolveIdent(r.Context(), user)
366 if err != nil {
367 w.WriteHeader(http.StatusInternalServerError)
368 return
369 }
370
371 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
372 if err != nil {
373 w.WriteHeader(http.StatusNotFound)
374 return
375 }
376
377 if len(pubKeys) == 0 {
378 w.WriteHeader(http.StatusNotFound)
379 return
380 }
381
382 for _, k := range pubKeys {
383 key := strings.TrimRight(k.Key, "\n")
384 fmt.Fprintln(w, key)
385 }
386}
387
388func validateRepoName(name string) error {
389 // check for path traversal attempts
390 if name == "." || name == ".." ||
391 strings.Contains(name, "/") || strings.Contains(name, "\\") {
392 return fmt.Errorf("Repository name contains invalid path characters")
393 }
394
395 // check for sequences that could be used for traversal when normalized
396 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
397 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
398 return fmt.Errorf("Repository name contains invalid path sequence")
399 }
400
401 // then continue with character validation
402 for _, char := range name {
403 if !((char >= 'a' && char <= 'z') ||
404 (char >= 'A' && char <= 'Z') ||
405 (char >= '0' && char <= '9') ||
406 char == '-' || char == '_' || char == '.') {
407 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
408 }
409 }
410
411 // additional check to prevent multiple sequential dots
412 if strings.Contains(name, "..") {
413 return fmt.Errorf("Repository name cannot contain sequential dots")
414 }
415
416 // if all checks pass
417 return nil
418}
419
420func stripGitExt(name string) string {
421 return strings.TrimSuffix(name, ".git")
422}
423
424func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
425 switch r.Method {
426 case http.MethodGet:
427 user := s.oauth.GetUser(r)
428 knots, err := s.enforcer.GetKnotsForUser(user.Did)
429 if err != nil {
430 s.pages.Notice(w, "repo", "Invalid user account.")
431 return
432 }
433
434 s.pages.NewRepo(w, pages.NewRepoParams{
435 LoggedInUser: user,
436 Knots: knots,
437 })
438
439 case http.MethodPost:
440 l := s.logger.With("handler", "NewRepo")
441
442 user := s.oauth.GetUser(r)
443 l = l.With("did", user.Did)
444
445 // form validation
446 domain := r.FormValue("domain")
447 if domain == "" {
448 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
449 return
450 }
451 l = l.With("knot", domain)
452
453 repoName := r.FormValue("name")
454 if repoName == "" {
455 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
456 return
457 }
458
459 if err := validateRepoName(repoName); err != nil {
460 s.pages.Notice(w, "repo", err.Error())
461 return
462 }
463 repoName = stripGitExt(repoName)
464 l = l.With("repoName", repoName)
465
466 defaultBranch := r.FormValue("branch")
467 if defaultBranch == "" {
468 defaultBranch = "main"
469 }
470 l = l.With("defaultBranch", defaultBranch)
471
472 description := r.FormValue("description")
473
474 // ACL validation
475 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
476 if err != nil || !ok {
477 l.Info("unauthorized")
478 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
479 return
480 }
481
482 // Check for existing repos
483 existingRepo, err := db.GetRepo(
484 s.db,
485 db.FilterEq("did", user.Did),
486 db.FilterEq("name", repoName),
487 )
488 if err == nil && existingRepo != nil {
489 l.Info("repo exists")
490 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
491 return
492 }
493
494 // create atproto record for this repo
495 rkey := tid.TID()
496 repo := &models.Repo{
497 Did: user.Did,
498 Name: repoName,
499 Knot: domain,
500 Rkey: rkey,
501 Description: description,
502 Created: time.Now(),
503 Labels: models.DefaultLabelDefs(),
504 }
505 record := repo.AsRecord()
506
507 atpClient, err := s.oauth.AuthorizedClient(r)
508 if err != nil {
509 l.Info("PDS write failed", "err", err)
510 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
511 return
512 }
513
514 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
515 Collection: tangled.RepoNSID,
516 Repo: user.Did,
517 Rkey: rkey,
518 Record: &lexutil.LexiconTypeDecoder{
519 Val: &record,
520 },
521 })
522 if err != nil {
523 l.Info("PDS write failed", "err", err)
524 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
525 return
526 }
527
528 aturi := atresp.Uri
529 l = l.With("aturi", aturi)
530 l.Info("wrote to PDS")
531
532 tx, err := s.db.BeginTx(r.Context(), nil)
533 if err != nil {
534 l.Info("txn failed", "err", err)
535 s.pages.Notice(w, "repo", "Failed to save repository information.")
536 return
537 }
538
539 // The rollback function reverts a few things on failure:
540 // - the pending txn
541 // - the ACLs
542 // - the atproto record created
543 rollback := func() {
544 err1 := tx.Rollback()
545 err2 := s.enforcer.E.LoadPolicy()
546 err3 := rollbackRecord(context.Background(), aturi, atpClient)
547
548 // ignore txn complete errors, this is okay
549 if errors.Is(err1, sql.ErrTxDone) {
550 err1 = nil
551 }
552
553 if errs := errors.Join(err1, err2, err3); errs != nil {
554 l.Error("failed to rollback changes", "errs", errs)
555 return
556 }
557 }
558 defer rollback()
559
560 client, err := s.oauth.ServiceClient(
561 r,
562 oauth.WithService(domain),
563 oauth.WithLxm(tangled.RepoCreateNSID),
564 oauth.WithDev(s.config.Core.Dev),
565 )
566 if err != nil {
567 l.Error("service auth failed", "err", err)
568 s.pages.Notice(w, "repo", "Failed to reach PDS.")
569 return
570 }
571
572 xe := tangled.RepoCreate(
573 r.Context(),
574 client,
575 &tangled.RepoCreate_Input{
576 Rkey: rkey,
577 },
578 )
579 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
580 l.Error("xrpc error", "xe", xe)
581 s.pages.Notice(w, "repo", err.Error())
582 return
583 }
584
585 err = db.AddRepo(tx, repo)
586 if err != nil {
587 l.Error("db write failed", "err", err)
588 s.pages.Notice(w, "repo", "Failed to save repository information.")
589 return
590 }
591
592 // acls
593 p, _ := securejoin.SecureJoin(user.Did, repoName)
594 err = s.enforcer.AddRepo(user.Did, domain, p)
595 if err != nil {
596 l.Error("acl setup failed", "err", err)
597 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
598 return
599 }
600
601 err = tx.Commit()
602 if err != nil {
603 l.Error("txn commit failed", "err", err)
604 http.Error(w, err.Error(), http.StatusInternalServerError)
605 return
606 }
607
608 err = s.enforcer.E.SavePolicy()
609 if err != nil {
610 l.Error("acl save failed", "err", err)
611 http.Error(w, err.Error(), http.StatusInternalServerError)
612 return
613 }
614
615 // reset the ATURI because the transaction completed successfully
616 aturi = ""
617
618 s.notifier.NewRepo(r.Context(), repo)
619 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName))
620 }
621}
622
623// this is used to rollback changes made to the PDS
624//
625// it is a no-op if the provided ATURI is empty
626func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
627 if aturi == "" {
628 return nil
629 }
630
631 parsed := syntax.ATURI(aturi)
632
633 collection := parsed.Collection().String()
634 repo := parsed.Authority().String()
635 rkey := parsed.RecordKey().String()
636
637 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
638 Collection: collection,
639 Repo: repo,
640 Rkey: rkey,
641 })
642 return err
643}
644
645func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error {
646 defaults := models.DefaultLabelDefs()
647 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
648 if err != nil {
649 return err
650 }
651 // already present
652 if len(defaultLabels) == len(defaults) {
653 return nil
654 }
655
656 labelDefs, err := models.FetchDefaultDefs(r)
657 if err != nil {
658 return err
659 }
660
661 // Insert each label definition to the database
662 for _, labelDef := range labelDefs {
663 _, err = db.AddLabelDefinition(e, &labelDef)
664 if err != nil {
665 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
666 }
667 }
668
669 return nil
670}