Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/mentions"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/notify"
21 dbnotify "tangled.org/core/appview/notify/db"
22 phnotify "tangled.org/core/appview/notify/posthog"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/reporesolver"
26 "tangled.org/core/appview/validator"
27 xrpcclient "tangled.org/core/appview/xrpcclient"
28 "tangled.org/core/eventconsumer"
29 "tangled.org/core/idresolver"
30 "tangled.org/core/jetstream"
31 "tangled.org/core/log"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35 "tangled.org/core/tid"
36
37 comatproto "github.com/bluesky-social/indigo/api/atproto"
38 atpclient "github.com/bluesky-social/indigo/atproto/client"
39 "github.com/bluesky-social/indigo/atproto/syntax"
40 lexutil "github.com/bluesky-social/indigo/lex/util"
41 securejoin "github.com/cyphar/filepath-securejoin"
42 "github.com/go-chi/chi/v5"
43 "github.com/posthog/posthog-go"
44)
45
46type State struct {
47 db *db.DB
48 notifier notify.Notifier
49 indexer *indexer.Indexer
50 oauth *oauth.OAuth
51 enforcer *rbac.Enforcer
52 pages *pages.Pages
53 idResolver *idresolver.Resolver
54 mentionsResolver *mentions.Resolver
55 posthog posthog.Client
56 jc *jetstream.JetstreamClient
57 config *config.Config
58 repoResolver *reporesolver.RepoResolver
59 knotstream *eventconsumer.Consumer
60 spindlestream *eventconsumer.Consumer
61 logger *slog.Logger
62 validator *validator.Validator
63}
64
65func Make(ctx context.Context, config *config.Config) (*State, error) {
66 logger := tlog.FromContext(ctx)
67
68 d, err := db.Make(ctx, config.Core.DbPath)
69 if err != nil {
70 return nil, fmt.Errorf("failed to create db: %w", err)
71 }
72
73 indexer := indexer.New(log.SubLogger(logger, "indexer"))
74 err = indexer.Init(ctx, d)
75 if err != nil {
76 return nil, fmt.Errorf("failed to create indexer: %w", err)
77 }
78
79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create enforcer: %w", err)
82 }
83
84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
85 if err != nil {
86 logger.Error("failed to create redis resolver", "err", err)
87 res = idresolver.DefaultResolver(config.Plc.PLCURL)
88 }
89
90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
91 if err != nil {
92 return nil, fmt.Errorf("failed to create posthog client: %w", err)
93 }
94
95 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
97 if err != nil {
98 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
99 }
100 validator := validator.New(d, res, enforcer)
101
102 repoResolver := reporesolver.New(config, enforcer, d)
103
104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
105
106 wrapper := db.DbWrapper{Execer: d}
107 jc, err := jetstream.NewJetstreamClient(
108 config.Jetstream.Endpoint,
109 "appview",
110 []string{
111 tangled.GraphFollowNSID,
112 tangled.FeedStarNSID,
113 tangled.PublicKeyNSID,
114 tangled.RepoArtifactNSID,
115 tangled.ActorProfileNSID,
116 tangled.SpindleMemberNSID,
117 tangled.SpindleNSID,
118 tangled.StringNSID,
119 tangled.RepoIssueNSID,
120 tangled.RepoIssueCommentNSID,
121 tangled.LabelDefinitionNSID,
122 tangled.LabelOpNSID,
123 },
124 nil,
125 tlog.SubLogger(logger, "jetstream"),
126 wrapper,
127 false,
128
129 // in-memory filter is inapplicable to appview so
130 // we'll never log dids anyway.
131 false,
132 )
133 if err != nil {
134 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
135 }
136
137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
139 }
140
141 ingester := appview.Ingester{
142 Db: wrapper,
143 Enforcer: enforcer,
144 IdResolver: res,
145 Config: config,
146 Logger: log.SubLogger(logger, "ingester"),
147 Validator: validator,
148 }
149 err = jc.StartJetstream(ctx, ingester.Ingest())
150 if err != nil {
151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
152 }
153
154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
155 if err != nil {
156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
157 }
158 knotstream.Start(ctx)
159
160 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
161 if err != nil {
162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
163 }
164 spindlestream.Start(ctx)
165
166 var notifiers []notify.Notifier
167
168 // Always add the database notifier
169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
170
171 // Add other notifiers in production only
172 if !config.Core.Dev {
173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174 }
175 notifiers = append(notifiers, indexer)
176 notifier := notify.NewMergedNotifier(notifiers)
177 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
178
179 state := &State{
180 d,
181 notifier,
182 indexer,
183 oauth,
184 enforcer,
185 pages,
186 res,
187 mentionsResolver,
188 posthog,
189 jc,
190 config,
191 repoResolver,
192 knotstream,
193 spindlestream,
194 logger,
195 validator,
196 }
197
198 return state, nil
199}
200
201func (s *State) Close() error {
202 // other close up logic goes here
203 return s.db.Close()
204}
205
206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
207 w.Header().Set("Content-Type", "text/plain")
208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
209
210 robotsTxt := `# Hello, Tanglers!
211User-agent: *
212Allow: /
213Disallow: /*/*/settings
214Disallow: /settings
215Disallow: /*/*/compare
216Disallow: /*/*/fork
217
218Crawl-delay: 1
219`
220 w.Write([]byte(robotsTxt))
221}
222
223func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
224 user := s.oauth.GetMultiAccountUser(r)
225 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
226 LoggedInUser: user,
227 })
228}
229
230func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
231 user := s.oauth.GetMultiAccountUser(r)
232 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
233 LoggedInUser: user,
234 })
235}
236
237func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
238 user := s.oauth.GetMultiAccountUser(r)
239 s.pages.Brand(w, pages.BrandParams{
240 LoggedInUser: user,
241 })
242}
243
244func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
245 if s.oauth.GetMultiAccountUser(r) != nil {
246 s.Timeline(w, r)
247 return
248 }
249 s.Home(w, r)
250}
251
252func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
253 user := s.oauth.GetMultiAccountUser(r)
254
255 // TODO: set this flag based on the UI
256 filtered := false
257
258 var userDid string
259 if user != nil && user.Active != nil {
260 userDid = user.Active.Did
261 }
262 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
263 if err != nil {
264 s.logger.Error("failed to make timeline", "err", err)
265 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
266 }
267
268 repos, err := db.GetTopStarredReposLastWeek(s.db)
269 if err != nil {
270 s.logger.Error("failed to get top starred repos", "err", err)
271 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
272 return
273 }
274
275 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
276 if err != nil {
277 // non-fatal
278 }
279
280 s.pages.Timeline(w, pages.TimelineParams{
281 LoggedInUser: user,
282 Timeline: timeline,
283 Repos: repos,
284 GfiLabel: gfiLabel,
285 })
286}
287
288func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
289 user := s.oauth.GetMultiAccountUser(r)
290 if user == nil {
291 return
292 }
293
294 l := s.logger.With("handler", "UpgradeBanner")
295 l = l.With("did", user.Active.Did)
296
297 regs, err := db.GetRegistrations(
298 s.db,
299 orm.FilterEq("did", user.Active.Did),
300 orm.FilterEq("needs_upgrade", 1),
301 )
302 if err != nil {
303 l.Error("non-fatal: failed to get registrations", "err", err)
304 }
305
306 spindles, err := db.GetSpindles(
307 s.db,
308 orm.FilterEq("owner", user.Active.Did),
309 orm.FilterEq("needs_upgrade", 1),
310 )
311 if err != nil {
312 l.Error("non-fatal: failed to get spindles", "err", err)
313 }
314
315 if regs == nil && spindles == nil {
316 return
317 }
318
319 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
320 Registrations: regs,
321 Spindles: spindles,
322 })
323}
324
325func (s *State) Home(w http.ResponseWriter, r *http.Request) {
326 // TODO: set this flag based on the UI
327 filtered := false
328
329 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
330 if err != nil {
331 s.logger.Error("failed to make timeline", "err", err)
332 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
333 return
334 }
335
336 repos, err := db.GetTopStarredReposLastWeek(s.db)
337 if err != nil {
338 s.logger.Error("failed to get top starred repos", "err", err)
339 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
340 return
341 }
342
343 s.pages.Home(w, pages.TimelineParams{
344 LoggedInUser: nil,
345 Timeline: timeline,
346 Repos: repos,
347 })
348}
349
350func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
351 user := chi.URLParam(r, "user")
352 user = strings.TrimPrefix(user, "@")
353
354 if user == "" {
355 w.WriteHeader(http.StatusBadRequest)
356 return
357 }
358
359 id, err := s.idResolver.ResolveIdent(r.Context(), user)
360 if err != nil {
361 w.WriteHeader(http.StatusInternalServerError)
362 return
363 }
364
365 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
366 if err != nil {
367 s.logger.Error("failed to get public keys", "err", err)
368 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
369 return
370 }
371
372 if len(pubKeys) == 0 {
373 w.WriteHeader(http.StatusNoContent)
374 return
375 }
376
377 for _, k := range pubKeys {
378 key := strings.TrimRight(k.Key, "\n")
379 fmt.Fprintln(w, key)
380 }
381}
382
383func validateRepoName(name string) error {
384 // check for path traversal attempts
385 if name == "." || name == ".." ||
386 strings.Contains(name, "/") || strings.Contains(name, "\\") {
387 return fmt.Errorf("Repository name contains invalid path characters")
388 }
389
390 // check for sequences that could be used for traversal when normalized
391 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
392 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
393 return fmt.Errorf("Repository name contains invalid path sequence")
394 }
395
396 // then continue with character validation
397 for _, char := range name {
398 if !((char >= 'a' && char <= 'z') ||
399 (char >= 'A' && char <= 'Z') ||
400 (char >= '0' && char <= '9') ||
401 char == '-' || char == '_' || char == '.') {
402 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
403 }
404 }
405
406 // additional check to prevent multiple sequential dots
407 if strings.Contains(name, "..") {
408 return fmt.Errorf("Repository name cannot contain sequential dots")
409 }
410
411 // if all checks pass
412 return nil
413}
414
415func stripGitExt(name string) string {
416 return strings.TrimSuffix(name, ".git")
417}
418
419func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
420 switch r.Method {
421 case http.MethodGet:
422 user := s.oauth.GetMultiAccountUser(r)
423 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
424 if err != nil {
425 s.pages.Notice(w, "repo", "Invalid user account.")
426 return
427 }
428
429 s.pages.NewRepo(w, pages.NewRepoParams{
430 LoggedInUser: user,
431 Knots: knots,
432 })
433
434 case http.MethodPost:
435 l := s.logger.With("handler", "NewRepo")
436
437 user := s.oauth.GetMultiAccountUser(r)
438 l = l.With("did", user.Active.Did)
439
440 // form validation
441 domain := r.FormValue("domain")
442 if domain == "" {
443 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
444 return
445 }
446 l = l.With("knot", domain)
447
448 repoName := r.FormValue("name")
449 if repoName == "" {
450 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
451 return
452 }
453
454 if err := validateRepoName(repoName); err != nil {
455 s.pages.Notice(w, "repo", err.Error())
456 return
457 }
458 repoName = stripGitExt(repoName)
459 l = l.With("repoName", repoName)
460
461 defaultBranch := r.FormValue("branch")
462 if defaultBranch == "" {
463 defaultBranch = "main"
464 }
465 l = l.With("defaultBranch", defaultBranch)
466
467 description := r.FormValue("description")
468
469 // ACL validation
470 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
471 if err != nil || !ok {
472 l.Info("unauthorized")
473 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
474 return
475 }
476
477 // Check for existing repos
478 existingRepo, err := db.GetRepo(
479 s.db,
480 orm.FilterEq("did", user.Active.Did),
481 orm.FilterEq("name", repoName),
482 )
483 if err == nil && existingRepo != nil {
484 l.Info("repo exists")
485 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
486 return
487 }
488
489 // create atproto record for this repo
490 rkey := tid.TID()
491 repo := &models.Repo{
492 Did: user.Active.Did,
493 Name: repoName,
494 Knot: domain,
495 Rkey: rkey,
496 Description: description,
497 Created: time.Now(),
498 Labels: s.config.Label.DefaultLabelDefs,
499 }
500 record := repo.AsRecord()
501
502 atpClient, err := s.oauth.AuthorizedClient(r)
503 if err != nil {
504 l.Info("PDS write failed", "err", err)
505 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
506 return
507 }
508
509 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
510 Collection: tangled.RepoNSID,
511 Repo: user.Active.Did,
512 Rkey: rkey,
513 Record: &lexutil.LexiconTypeDecoder{
514 Val: &record,
515 },
516 })
517 if err != nil {
518 l.Info("PDS write failed", "err", err)
519 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
520 return
521 }
522
523 aturi := atresp.Uri
524 l = l.With("aturi", aturi)
525 l.Info("wrote to PDS")
526
527 tx, err := s.db.BeginTx(r.Context(), nil)
528 if err != nil {
529 l.Info("txn failed", "err", err)
530 s.pages.Notice(w, "repo", "Failed to save repository information.")
531 return
532 }
533
534 // The rollback function reverts a few things on failure:
535 // - the pending txn
536 // - the ACLs
537 // - the atproto record created
538 rollback := func() {
539 err1 := tx.Rollback()
540 err2 := s.enforcer.E.LoadPolicy()
541 err3 := rollbackRecord(context.Background(), aturi, atpClient)
542
543 // ignore txn complete errors, this is okay
544 if errors.Is(err1, sql.ErrTxDone) {
545 err1 = nil
546 }
547
548 if errs := errors.Join(err1, err2, err3); errs != nil {
549 l.Error("failed to rollback changes", "errs", errs)
550 return
551 }
552 }
553 defer rollback()
554
555 client, err := s.oauth.ServiceClient(
556 r,
557 oauth.WithService(domain),
558 oauth.WithLxm(tangled.RepoCreateNSID),
559 oauth.WithDev(s.config.Core.Dev),
560 )
561 if err != nil {
562 l.Error("service auth failed", "err", err)
563 s.pages.Notice(w, "repo", "Failed to reach PDS.")
564 return
565 }
566
567 xe := tangled.RepoCreate(
568 r.Context(),
569 client,
570 &tangled.RepoCreate_Input{
571 Rkey: rkey,
572 },
573 )
574 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
575 l.Error("xrpc error", "xe", xe)
576 s.pages.Notice(w, "repo", err.Error())
577 return
578 }
579
580 err = db.AddRepo(tx, repo)
581 if err != nil {
582 l.Error("db write failed", "err", err)
583 s.pages.Notice(w, "repo", "Failed to save repository information.")
584 return
585 }
586
587 // acls
588 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
589 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
590 if err != nil {
591 l.Error("acl setup failed", "err", err)
592 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
593 return
594 }
595
596 err = tx.Commit()
597 if err != nil {
598 l.Error("txn commit failed", "err", err)
599 http.Error(w, err.Error(), http.StatusInternalServerError)
600 return
601 }
602
603 err = s.enforcer.E.SavePolicy()
604 if err != nil {
605 l.Error("acl save failed", "err", err)
606 http.Error(w, err.Error(), http.StatusInternalServerError)
607 return
608 }
609
610 // reset the ATURI because the transaction completed successfully
611 aturi = ""
612
613 s.notifier.NewRepo(r.Context(), repo)
614 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
615 }
616}
617
618// this is used to rollback changes made to the PDS
619//
620// it is a no-op if the provided ATURI is empty
621func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
622 if aturi == "" {
623 return nil
624 }
625
626 parsed := syntax.ATURI(aturi)
627
628 collection := parsed.Collection().String()
629 repo := parsed.Authority().String()
630 rkey := parsed.RecordKey().String()
631
632 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
633 Collection: collection,
634 Repo: repo,
635 Rkey: rkey,
636 })
637 return err
638}
639
640func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
641 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
642 if err != nil {
643 return err
644 }
645 // already present
646 if len(defaultLabels) == len(defaults) {
647 return nil
648 }
649
650 labelDefs, err := models.FetchLabelDefs(r, defaults)
651 if err != nil {
652 return err
653 }
654
655 // Insert each label definition to the database
656 for _, labelDef := range labelDefs {
657 _, err = db.AddLabelDefinition(e, &labelDef)
658 if err != nil {
659 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
660 }
661 }
662
663 return nil
664}