this repo has no description
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/appview"
22 "tangled.org/core/appview/cache"
23 "tangled.org/core/appview/cache/session"
24 "tangled.org/core/appview/config"
25 "tangled.org/core/appview/db"
26 "tangled.org/core/appview/models"
27 "tangled.org/core/appview/notify"
28 dbnotify "tangled.org/core/appview/notify/db"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 "tangled.org/core/appview/oauth"
31 "tangled.org/core/appview/pages"
32 "tangled.org/core/appview/reporesolver"
33 "tangled.org/core/appview/validator"
34 xrpcclient "tangled.org/core/appview/xrpcclient"
35 "tangled.org/core/eventconsumer"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/jetstream"
38 tlog "tangled.org/core/log"
39 "tangled.org/core/rbac"
40 "tangled.org/core/tid"
41)
42
43type State struct {
44 db *db.DB
45 notifier notify.Notifier
46 oauth *oauth.OAuth
47 enforcer *rbac.Enforcer
48 pages *pages.Pages
49 sess *session.SessionStore
50 idResolver *idresolver.Resolver
51 posthog posthog.Client
52 jc *jetstream.JetstreamClient
53 config *config.Config
54 repoResolver *reporesolver.RepoResolver
55 knotstream *eventconsumer.Consumer
56 spindlestream *eventconsumer.Consumer
57 logger *slog.Logger
58 validator *validator.Validator
59}
60
61func Make(ctx context.Context, config *config.Config) (*State, error) {
62 d, err := db.Make(config.Core.DbPath)
63 if err != nil {
64 return nil, fmt.Errorf("failed to create db: %w", err)
65 }
66
67 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
68 if err != nil {
69 return nil, fmt.Errorf("failed to create enforcer: %w", err)
70 }
71
72 res, err := idresolver.RedisResolver(config.Redis.ToURL())
73 if err != nil {
74 log.Printf("failed to create redis resolver: %v", err)
75 res = idresolver.DefaultResolver()
76 }
77
78 pgs := pages.NewPages(config, res)
79 cache := cache.New(config.Redis.Addr)
80 sess := session.New(cache)
81 oauth := oauth.NewOAuth(config, sess)
82 validator := validator.New(d, res, enforcer)
83
84 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
85 if err != nil {
86 return nil, fmt.Errorf("failed to create posthog client: %w", err)
87 }
88
89 repoResolver := reporesolver.New(config, enforcer, res, d)
90
91 wrapper := db.DbWrapper{Execer: d}
92 jc, err := jetstream.NewJetstreamClient(
93 config.Jetstream.Endpoint,
94 "appview",
95 []string{
96 tangled.GraphFollowNSID,
97 tangled.FeedStarNSID,
98 tangled.PublicKeyNSID,
99 tangled.RepoArtifactNSID,
100 tangled.ActorProfileNSID,
101 tangled.SpindleMemberNSID,
102 tangled.SpindleNSID,
103 tangled.StringNSID,
104 tangled.RepoIssueNSID,
105 tangled.RepoIssueCommentNSID,
106 tangled.LabelDefinitionNSID,
107 tangled.LabelOpNSID,
108 },
109 nil,
110 slog.Default(),
111 wrapper,
112 false,
113
114 // in-memory filter is inapplicalble to appview so
115 // we'll never log dids anyway.
116 false,
117 )
118 if err != nil {
119 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
120 }
121
122 if err := BackfillDefaultDefs(d, res); err != nil {
123 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
124 }
125
126 ingester := appview.Ingester{
127 Db: wrapper,
128 Enforcer: enforcer,
129 IdResolver: res,
130 Config: config,
131 Logger: tlog.New("ingester"),
132 Validator: validator,
133 }
134 err = jc.StartJetstream(ctx, ingester.Ingest())
135 if err != nil {
136 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
137 }
138
139 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
140 if err != nil {
141 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
142 }
143 knotstream.Start(ctx)
144
145 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
146 if err != nil {
147 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
148 }
149 spindlestream.Start(ctx)
150
151 var notifiers []notify.Notifier
152
153 // Always add the database notifier
154 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
155
156 // Add other notifiers in production only
157 if !config.Core.Dev {
158 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
159 }
160 notifier := notify.NewMergedNotifier(notifiers...)
161
162 state := &State{
163 d,
164 notifier,
165 oauth,
166 enforcer,
167 pgs,
168 sess,
169 res,
170 posthog,
171 jc,
172 config,
173 repoResolver,
174 knotstream,
175 spindlestream,
176 slog.Default(),
177 validator,
178 }
179
180 return state, nil
181}
182
183func (s *State) Close() error {
184 // other close up logic goes here
185 return s.db.Close()
186}
187
188func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
189 w.Header().Set("Content-Type", "image/svg+xml")
190 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
191 w.Header().Set("ETag", `"favicon-svg-v1"`)
192
193 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
194 w.WriteHeader(http.StatusNotModified)
195 return
196 }
197
198 s.pages.Favicon(w)
199}
200
// manifestJson is the web app manifest document that PWAManifest writes as
// its response body.
//
// Format reference:
// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
const manifestJson = `{
 "name": "tangled",
 "description": "tightly-knit social coding.",
 "icons": [
 {
 "src": "/favicon.svg",
 "sizes": "144x144"
 }
 ],
 "start_url": "/",
 "id": "org.tangled",

 "display": "standalone",
 "background_color": "#111827",
 "theme_color": "#111827"
}`
218
219func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
220 w.Header().Set("Content-Type", "application/json")
221 w.Write([]byte(manifestJson))
222}
223
224func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
225 user := s.oauth.GetUser(r)
226 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
227 LoggedInUser: user,
228 })
229}
230
231func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
232 user := s.oauth.GetUser(r)
233 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
234 LoggedInUser: user,
235 })
236}
237
238func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
239 user := s.oauth.GetUser(r)
240 s.pages.Brand(w, pages.BrandParams{
241 LoggedInUser: user,
242 })
243}
244
245func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
246 if s.oauth.GetUser(r) != nil {
247 s.Timeline(w, r)
248 return
249 }
250 s.Home(w, r)
251}
252
253func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
254 user := s.oauth.GetUser(r)
255
256 var userDid string
257 if user != nil {
258 userDid = user.Did
259 }
260 timeline, err := db.MakeTimeline(s.db, 50, userDid)
261 if err != nil {
262 log.Println(err)
263 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
264 }
265
266 repos, err := db.GetTopStarredReposLastWeek(s.db)
267 if err != nil {
268 log.Println(err)
269 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
270 return
271 }
272
273 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue))
274 if err != nil {
275 // non-fatal
276 }
277
278 fmt.Println(s.pages.Timeline(w, pages.TimelineParams{
279 LoggedInUser: user,
280 Timeline: timeline,
281 Repos: repos,
282 GfiLabel: gfiLabel,
283 }))
284}
285
286func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
287 user := s.oauth.GetUser(r)
288 if user == nil {
289 return
290 }
291
292 l := s.logger.With("handler", "UpgradeBanner")
293 l = l.With("did", user.Did)
294 l = l.With("handle", user.Handle)
295
296 regs, err := db.GetRegistrations(
297 s.db,
298 db.FilterEq("did", user.Did),
299 db.FilterEq("needs_upgrade", 1),
300 )
301 if err != nil {
302 l.Error("non-fatal: failed to get registrations", "err", err)
303 }
304
305 spindles, err := db.GetSpindles(
306 s.db,
307 db.FilterEq("owner", user.Did),
308 db.FilterEq("needs_upgrade", 1),
309 )
310 if err != nil {
311 l.Error("non-fatal: failed to get spindles", "err", err)
312 }
313
314 if regs == nil && spindles == nil {
315 return
316 }
317
318 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
319 Registrations: regs,
320 Spindles: spindles,
321 })
322}
323
324func (s *State) Home(w http.ResponseWriter, r *http.Request) {
325 timeline, err := db.MakeTimeline(s.db, 5, "")
326 if err != nil {
327 log.Println(err)
328 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
329 return
330 }
331
332 repos, err := db.GetTopStarredReposLastWeek(s.db)
333 if err != nil {
334 log.Println(err)
335 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
336 return
337 }
338
339 s.pages.Home(w, pages.TimelineParams{
340 LoggedInUser: nil,
341 Timeline: timeline,
342 Repos: repos,
343 })
344}
345
346func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
347 user := chi.URLParam(r, "user")
348 user = strings.TrimPrefix(user, "@")
349
350 if user == "" {
351 w.WriteHeader(http.StatusBadRequest)
352 return
353 }
354
355 id, err := s.idResolver.ResolveIdent(r.Context(), user)
356 if err != nil {
357 w.WriteHeader(http.StatusInternalServerError)
358 return
359 }
360
361 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
362 if err != nil {
363 w.WriteHeader(http.StatusNotFound)
364 return
365 }
366
367 if len(pubKeys) == 0 {
368 w.WriteHeader(http.StatusNotFound)
369 return
370 }
371
372 for _, k := range pubKeys {
373 key := strings.TrimRight(k.Key, "\n")
374 fmt.Fprintln(w, key)
375 }
376}
377
// validateRepoName checks that name is safe to use as a repository name and
// returns a user-facing error describing the first rule it breaks, or nil.
//
// Rules, in the order they are checked:
//  1. the name is not "." or ".." and has no "/" or "\" in it
//  2. the name neither starts nor ends with "."
//  3. every character is an ASCII letter, digit, "-", "_" or "."
//  4. the name has no ".." in it
//
// The empty string breaks none of these rules and yields nil.
func validateRepoName(name string) error {
	// path traversal: bare dot entries and path separators
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// a separator can no longer occur here, so only edge dots remain
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	disallowed := func(c rune) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return false
		case c == '-', c == '_', c == '.':
			return false
		}
		return true
	}
	if strings.IndexFunc(name, disallowed) >= 0 {
		return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
	}

	// interior runs of dots
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
409
// stripGitExt returns name without one trailing ".git"; names that do not
// end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if strings.HasSuffix(name, ext) {
		return name[:len(name)-len(ext)]
	}
	return name
}
413
414func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
415 switch r.Method {
416 case http.MethodGet:
417 user := s.oauth.GetUser(r)
418 knots, err := s.enforcer.GetKnotsForUser(user.Did)
419 if err != nil {
420 s.pages.Notice(w, "repo", "Invalid user account.")
421 return
422 }
423
424 s.pages.NewRepo(w, pages.NewRepoParams{
425 LoggedInUser: user,
426 Knots: knots,
427 })
428
429 case http.MethodPost:
430 l := s.logger.With("handler", "NewRepo")
431
432 user := s.oauth.GetUser(r)
433 l = l.With("did", user.Did)
434 l = l.With("handle", user.Handle)
435
436 // form validation
437 domain := r.FormValue("domain")
438 if domain == "" {
439 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
440 return
441 }
442 l = l.With("knot", domain)
443
444 repoName := r.FormValue("name")
445 if repoName == "" {
446 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
447 return
448 }
449
450 if err := validateRepoName(repoName); err != nil {
451 s.pages.Notice(w, "repo", err.Error())
452 return
453 }
454 repoName = stripGitExt(repoName)
455 l = l.With("repoName", repoName)
456
457 defaultBranch := r.FormValue("branch")
458 if defaultBranch == "" {
459 defaultBranch = "main"
460 }
461 l = l.With("defaultBranch", defaultBranch)
462
463 description := r.FormValue("description")
464
465 // ACL validation
466 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
467 if err != nil || !ok {
468 l.Info("unauthorized")
469 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
470 return
471 }
472
473 // Check for existing repos
474 existingRepo, err := db.GetRepo(
475 s.db,
476 db.FilterEq("did", user.Did),
477 db.FilterEq("name", repoName),
478 )
479 if err == nil && existingRepo != nil {
480 l.Info("repo exists")
481 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
482 return
483 }
484
485 // create atproto record for this repo
486 rkey := tid.TID()
487 repo := &models.Repo{
488 Did: user.Did,
489 Name: repoName,
490 Knot: domain,
491 Rkey: rkey,
492 Description: description,
493 Created: time.Now(),
494 Labels: models.DefaultLabelDefs(),
495 }
496 record := repo.AsRecord()
497
498 xrpcClient, err := s.oauth.AuthorizedClient(r)
499 if err != nil {
500 l.Info("PDS write failed", "err", err)
501 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
502 return
503 }
504
505 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
506 Collection: tangled.RepoNSID,
507 Repo: user.Did,
508 Rkey: rkey,
509 Record: &lexutil.LexiconTypeDecoder{
510 Val: &record,
511 },
512 })
513 if err != nil {
514 l.Info("PDS write failed", "err", err)
515 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
516 return
517 }
518
519 aturi := atresp.Uri
520 l = l.With("aturi", aturi)
521 l.Info("wrote to PDS")
522
523 tx, err := s.db.BeginTx(r.Context(), nil)
524 if err != nil {
525 l.Info("txn failed", "err", err)
526 s.pages.Notice(w, "repo", "Failed to save repository information.")
527 return
528 }
529
530 // The rollback function reverts a few things on failure:
531 // - the pending txn
532 // - the ACLs
533 // - the atproto record created
534 rollback := func() {
535 err1 := tx.Rollback()
536 err2 := s.enforcer.E.LoadPolicy()
537 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
538
539 // ignore txn complete errors, this is okay
540 if errors.Is(err1, sql.ErrTxDone) {
541 err1 = nil
542 }
543
544 if errs := errors.Join(err1, err2, err3); errs != nil {
545 l.Error("failed to rollback changes", "errs", errs)
546 return
547 }
548 }
549 defer rollback()
550
551 client, err := s.oauth.ServiceClient(
552 r,
553 oauth.WithService(domain),
554 oauth.WithLxm(tangled.RepoCreateNSID),
555 oauth.WithDev(s.config.Core.Dev),
556 )
557 if err != nil {
558 l.Error("service auth failed", "err", err)
559 s.pages.Notice(w, "repo", "Failed to reach PDS.")
560 return
561 }
562
563 xe := tangled.RepoCreate(
564 r.Context(),
565 client,
566 &tangled.RepoCreate_Input{
567 Rkey: rkey,
568 },
569 )
570 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
571 l.Error("xrpc error", "xe", xe)
572 s.pages.Notice(w, "repo", err.Error())
573 return
574 }
575
576 err = db.AddRepo(tx, repo)
577 if err != nil {
578 l.Error("db write failed", "err", err)
579 s.pages.Notice(w, "repo", "Failed to save repository information.")
580 return
581 }
582
583 // acls
584 p, _ := securejoin.SecureJoin(user.Did, repoName)
585 err = s.enforcer.AddRepo(user.Did, domain, p)
586 if err != nil {
587 l.Error("acl setup failed", "err", err)
588 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
589 return
590 }
591
592 err = tx.Commit()
593 if err != nil {
594 l.Error("txn commit failed", "err", err)
595 http.Error(w, err.Error(), http.StatusInternalServerError)
596 return
597 }
598
599 err = s.enforcer.E.SavePolicy()
600 if err != nil {
601 l.Error("acl save failed", "err", err)
602 http.Error(w, err.Error(), http.StatusInternalServerError)
603 return
604 }
605
606 // reset the ATURI because the transaction completed successfully
607 aturi = ""
608
609 s.notifier.NewRepo(r.Context(), repo)
610 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
611 }
612}
613
614// this is used to rollback changes made to the PDS
615//
616// it is a no-op if the provided ATURI is empty
617func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
618 if aturi == "" {
619 return nil
620 }
621
622 parsed := syntax.ATURI(aturi)
623
624 collection := parsed.Collection().String()
625 repo := parsed.Authority().String()
626 rkey := parsed.RecordKey().String()
627
628 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
629 Collection: collection,
630 Repo: repo,
631 Rkey: rkey,
632 })
633 return err
634}
635
636func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error {
637 defaults := models.DefaultLabelDefs()
638 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
639 if err != nil {
640 return err
641 }
642 // already present
643 if len(defaultLabels) == len(defaults) {
644 return nil
645 }
646
647 labelDefs, err := models.FetchDefaultDefs(r)
648 if err != nil {
649 return err
650 }
651
652 // Insert each label definition to the database
653 for _, labelDef := range labelDefs {
654 _, err = db.AddLabelDefinition(e, &labelDef)
655 if err != nil {
656 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
657 }
658 }
659
660 return nil
661}