Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/mentions"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/notify"
21 dbnotify "tangled.org/core/appview/notify/db"
22 phnotify "tangled.org/core/appview/notify/posthog"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/reporesolver"
26 "tangled.org/core/appview/validator"
27 xrpcclient "tangled.org/core/appview/xrpcclient"
28 "tangled.org/core/eventconsumer"
29 "tangled.org/core/idresolver"
30 "tangled.org/core/jetstream"
31 "tangled.org/core/log"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35 "tangled.org/core/tid"
36
37 comatproto "github.com/bluesky-social/indigo/api/atproto"
38 atpclient "github.com/bluesky-social/indigo/atproto/client"
39 "github.com/bluesky-social/indigo/atproto/syntax"
40 lexutil "github.com/bluesky-social/indigo/lex/util"
41 securejoin "github.com/cyphar/filepath-securejoin"
42 "github.com/go-chi/chi/v5"
43 "github.com/posthog/posthog-go"
44)
45
// State bundles the long-lived dependencies shared by the appview's HTTP
// handlers.
//
// NOTE: the field order matters to any unkeyed State{...} literal; reordering
// fields silently changes what such a literal assigns. Prefer keyed literals
// when constructing a State.
type State struct {
	// db is the application database handle; Close closes it.
	db *db.DB

	// notifier receives application events (e.g. NewRepo announces new repos).
	notifier notify.Notifier

	// indexer is the indexer instance.
	indexer *indexer.Indexer

	// oauth resolves the logged-in user for a request and hands out
	// authorized PDS / service clients.
	oauth *oauth.OAuth

	// enforcer is the RBAC enforcer consulted for ACL checks.
	enforcer *rbac.Enforcer

	// pages renders HTML pages and notices.
	pages *pages.Pages

	// idResolver resolves user identifiers (see Keys).
	idResolver *idresolver.Resolver

	// mentionsResolver is the mentions resolver.
	mentionsResolver *mentions.Resolver

	// posthog is the PostHog client.
	posthog posthog.Client

	// jc is the jetstream client.
	jc *jetstream.JetstreamClient

	// config is the appview configuration.
	config *config.Config

	// repoResolver is the repo resolver.
	repoResolver *reporesolver.RepoResolver

	// knotstream is the knot event consumer.
	knotstream *eventconsumer.Consumer

	// spindlestream is the spindle event consumer.
	spindlestream *eventconsumer.Consumer

	// logger is the base structured logger used by handlers.
	logger *slog.Logger

	// validator is the validator instance.
	validator *validator.Validator
}
64
65func Make(ctx context.Context, config *config.Config) (*State, error) {
66 logger := tlog.FromContext(ctx)
67
68 d, err := db.Make(ctx, config.Core.DbPath)
69 if err != nil {
70 return nil, fmt.Errorf("failed to create db: %w", err)
71 }
72
73 indexer := indexer.New(log.SubLogger(logger, "indexer"))
74 err = indexer.Init(ctx, d)
75 if err != nil {
76 return nil, fmt.Errorf("failed to create indexer: %w", err)
77 }
78
79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create enforcer: %w", err)
82 }
83
84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
85 if err != nil {
86 logger.Error("failed to create redis resolver", "err", err)
87 res = idresolver.DefaultResolver(config.Plc.PLCURL)
88 }
89
90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
91 if err != nil {
92 return nil, fmt.Errorf("failed to create posthog client: %w", err)
93 }
94
95 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
97 if err != nil {
98 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
99 }
100 validator := validator.New(d, res, enforcer)
101
102 repoResolver := reporesolver.New(config, enforcer, d)
103
104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
105
106 wrapper := db.DbWrapper{Execer: d}
107 jc, err := jetstream.NewJetstreamClient(
108 config.Jetstream.Endpoint,
109 "appview",
110 []string{
111 tangled.GraphFollowNSID,
112 tangled.FeedStarNSID,
113 tangled.PublicKeyNSID,
114 tangled.RepoArtifactNSID,
115 tangled.ActorProfileNSID,
116 tangled.SpindleMemberNSID,
117 tangled.SpindleNSID,
118 tangled.StringNSID,
119 tangled.RepoIssueNSID,
120 tangled.RepoIssueCommentNSID,
121 tangled.LabelDefinitionNSID,
122 tangled.LabelOpNSID,
123 },
124 nil,
125 tlog.SubLogger(logger, "jetstream"),
126 wrapper,
127 false,
128
129 // in-memory filter is inapplicable to appview so
130 // we'll never log dids anyway.
131 false,
132 )
133 if err != nil {
134 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
135 }
136
137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
139 }
140
141 ingester := appview.Ingester{
142 Db: wrapper,
143 Enforcer: enforcer,
144 IdResolver: res,
145 Config: config,
146 Logger: log.SubLogger(logger, "ingester"),
147 Validator: validator,
148 }
149 err = jc.StartJetstream(ctx, ingester.Ingest())
150 if err != nil {
151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
152 }
153
154 var notifiers []notify.Notifier
155
156 // Always add the database notifier
157 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
158
159 // Add other notifiers in production only
160 if !config.Core.Dev {
161 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
162 }
163 notifiers = append(notifiers, indexer)
164
165 // Add webhook notifier
166 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
167
168 notifier := notify.NewMergedNotifier(notifiers)
169 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
170
171 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier)
172 if err != nil {
173 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
174 }
175 knotstream.Start(ctx)
176
177 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
178 if err != nil {
179 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
180 }
181 spindlestream.Start(ctx)
182
183 state := &State{
184 d,
185 notifier,
186 indexer,
187 oauth,
188 enforcer,
189 pages,
190 res,
191 mentionsResolver,
192 posthog,
193 jc,
194 config,
195 repoResolver,
196 knotstream,
197 spindlestream,
198 logger,
199 validator,
200 }
201
202 return state, nil
203}
204
205func (s *State) Close() error {
206 // other close up logic goes here
207 return s.db.Close()
208}
209
210func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
211 w.Header().Set("Content-Type", "text/plain")
212 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
213
214 robotsTxt := `# Hello, Tanglers!
215User-agent: *
216Allow: /
217Disallow: /*/*/settings
218Disallow: /settings
219Disallow: /*/*/compare
220Disallow: /*/*/fork
221
222Crawl-delay: 1
223`
224 w.Write([]byte(robotsTxt))
225}
226
227func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
228 user := s.oauth.GetMultiAccountUser(r)
229 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
230 LoggedInUser: user,
231 })
232}
233
234func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
235 user := s.oauth.GetMultiAccountUser(r)
236 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
237 LoggedInUser: user,
238 })
239}
240
241func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
242 user := s.oauth.GetMultiAccountUser(r)
243 s.pages.Brand(w, pages.BrandParams{
244 LoggedInUser: user,
245 })
246}
247
248func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
249 if s.oauth.GetMultiAccountUser(r) != nil {
250 s.Timeline(w, r)
251 return
252 }
253 s.Home(w, r)
254}
255
256func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
257 user := s.oauth.GetMultiAccountUser(r)
258
259 // TODO: set this flag based on the UI
260 filtered := false
261
262 var userDid string
263 if user != nil && user.Active != nil {
264 userDid = user.Active.Did
265 }
266 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
267 if err != nil {
268 s.logger.Error("failed to make timeline", "err", err)
269 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
270 }
271
272 repos, err := db.GetTopStarredReposLastWeek(s.db)
273 if err != nil {
274 s.logger.Error("failed to get top starred repos", "err", err)
275 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
276 return
277 }
278
279 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
280 if err != nil {
281 // non-fatal
282 }
283
284 s.pages.Timeline(w, pages.TimelineParams{
285 LoggedInUser: user,
286 Timeline: timeline,
287 Repos: repos,
288 GfiLabel: gfiLabel,
289 })
290}
291
292func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
293 user := s.oauth.GetMultiAccountUser(r)
294 if user == nil {
295 return
296 }
297
298 l := s.logger.With("handler", "UpgradeBanner")
299 l = l.With("did", user.Active.Did)
300
301 regs, err := db.GetRegistrations(
302 s.db,
303 orm.FilterEq("did", user.Active.Did),
304 orm.FilterEq("needs_upgrade", 1),
305 )
306 if err != nil {
307 l.Error("non-fatal: failed to get registrations", "err", err)
308 }
309
310 spindles, err := db.GetSpindles(
311 s.db,
312 orm.FilterEq("owner", user.Active.Did),
313 orm.FilterEq("needs_upgrade", 1),
314 )
315 if err != nil {
316 l.Error("non-fatal: failed to get spindles", "err", err)
317 }
318
319 if regs == nil && spindles == nil {
320 return
321 }
322
323 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
324 Registrations: regs,
325 Spindles: spindles,
326 })
327}
328
329func (s *State) Home(w http.ResponseWriter, r *http.Request) {
330 // TODO: set this flag based on the UI
331 filtered := false
332
333 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
334 if err != nil {
335 s.logger.Error("failed to make timeline", "err", err)
336 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
337 return
338 }
339
340 repos, err := db.GetTopStarredReposLastWeek(s.db)
341 if err != nil {
342 s.logger.Error("failed to get top starred repos", "err", err)
343 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
344 return
345 }
346
347 s.pages.Home(w, pages.TimelineParams{
348 LoggedInUser: nil,
349 Timeline: timeline,
350 Repos: repos,
351 })
352}
353
354func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
355 user := chi.URLParam(r, "user")
356 user = strings.TrimPrefix(user, "@")
357
358 if user == "" {
359 w.WriteHeader(http.StatusBadRequest)
360 return
361 }
362
363 id, err := s.idResolver.ResolveIdent(r.Context(), user)
364 if err != nil {
365 w.WriteHeader(http.StatusInternalServerError)
366 return
367 }
368
369 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
370 if err != nil {
371 s.logger.Error("failed to get public keys", "err", err)
372 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
373 return
374 }
375
376 if len(pubKeys) == 0 {
377 w.WriteHeader(http.StatusNoContent)
378 return
379 }
380
381 for _, k := range pubKeys {
382 key := strings.TrimRight(k.Key, "\n")
383 fmt.Fprintln(w, key)
384 }
385}
386
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-',
// '_' and '.', does not start or end with '.', and contains no "..".
// Path separators ('/' and '\') are rejected outright.
//
// The returned messages are capitalized because NewRepo shows err.Error()
// to the user verbatim.
func validateRepoName(name string) error {
	if name == "" {
		return fmt.Errorf("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// leading/trailing dots could be used for traversal when normalized;
	// "./" and "../" need no separate check since '/' is rejected above
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z':
		case char >= 'A' && char <= 'Z':
		case char >= '0' && char <= '9':
		case char == '-', char == '_', char == '.':
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	// if all checks pass
	return nil
}
418
// stripGitExt removes a single trailing ".git" from name, if present.
func stripGitExt(name string) string {
	const ext = ".git"
	if strings.HasSuffix(name, ext) {
		return name[:len(name)-len(ext)]
	}
	return name
}
422
423func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
424 switch r.Method {
425 case http.MethodGet:
426 user := s.oauth.GetMultiAccountUser(r)
427 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
428 if err != nil {
429 s.pages.Notice(w, "repo", "Invalid user account.")
430 return
431 }
432
433 s.pages.NewRepo(w, pages.NewRepoParams{
434 LoggedInUser: user,
435 Knots: knots,
436 })
437
438 case http.MethodPost:
439 l := s.logger.With("handler", "NewRepo")
440
441 user := s.oauth.GetMultiAccountUser(r)
442 l = l.With("did", user.Active.Did)
443
444 // form validation
445 domain := r.FormValue("domain")
446 if domain == "" {
447 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
448 return
449 }
450 l = l.With("knot", domain)
451
452 repoName := r.FormValue("name")
453 if repoName == "" {
454 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
455 return
456 }
457
458 if err := validateRepoName(repoName); err != nil {
459 s.pages.Notice(w, "repo", err.Error())
460 return
461 }
462 repoName = stripGitExt(repoName)
463 l = l.With("repoName", repoName)
464
465 defaultBranch := r.FormValue("branch")
466 if defaultBranch == "" {
467 defaultBranch = "main"
468 }
469 l = l.With("defaultBranch", defaultBranch)
470
471 description := r.FormValue("description")
472 if len([]rune(description)) > 140 {
473 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
474 return
475 }
476
477 // ACL validation
478 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
479 if err != nil || !ok {
480 l.Info("unauthorized")
481 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
482 return
483 }
484
485 // Check for existing repos
486 existingRepo, err := db.GetRepo(
487 s.db,
488 orm.FilterEq("did", user.Active.Did),
489 orm.FilterEq("name", repoName),
490 )
491 if err == nil && existingRepo != nil {
492 l.Info("repo exists")
493 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
494 return
495 }
496
497 // create atproto record for this repo
498 rkey := tid.TID()
499 repo := &models.Repo{
500 Did: user.Active.Did,
501 Name: repoName,
502 Knot: domain,
503 Rkey: rkey,
504 Description: description,
505 Created: time.Now(),
506 Labels: s.config.Label.DefaultLabelDefs,
507 }
508 record := repo.AsRecord()
509
510 atpClient, err := s.oauth.AuthorizedClient(r)
511 if err != nil {
512 l.Info("PDS write failed", "err", err)
513 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
514 return
515 }
516
517 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
518 Collection: tangled.RepoNSID,
519 Repo: user.Active.Did,
520 Rkey: rkey,
521 Record: &lexutil.LexiconTypeDecoder{
522 Val: &record,
523 },
524 })
525 if err != nil {
526 l.Info("PDS write failed", "err", err)
527 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
528 return
529 }
530
531 aturi := atresp.Uri
532 l = l.With("aturi", aturi)
533 l.Info("wrote to PDS")
534
535 tx, err := s.db.BeginTx(r.Context(), nil)
536 if err != nil {
537 l.Info("txn failed", "err", err)
538 s.pages.Notice(w, "repo", "Failed to save repository information.")
539 return
540 }
541
542 // The rollback function reverts a few things on failure:
543 // - the pending txn
544 // - the ACLs
545 // - the atproto record created
546 rollback := func() {
547 err1 := tx.Rollback()
548 err2 := s.enforcer.E.LoadPolicy()
549 err3 := rollbackRecord(context.Background(), aturi, atpClient)
550
551 // ignore txn complete errors, this is okay
552 if errors.Is(err1, sql.ErrTxDone) {
553 err1 = nil
554 }
555
556 if errs := errors.Join(err1, err2, err3); errs != nil {
557 l.Error("failed to rollback changes", "errs", errs)
558 return
559 }
560 }
561 defer rollback()
562
563 client, err := s.oauth.ServiceClient(
564 r,
565 oauth.WithService(domain),
566 oauth.WithLxm(tangled.RepoCreateNSID),
567 oauth.WithDev(s.config.Core.Dev),
568 )
569 if err != nil {
570 l.Error("service auth failed", "err", err)
571 s.pages.Notice(w, "repo", "Failed to reach PDS.")
572 return
573 }
574
575 xe := tangled.RepoCreate(
576 r.Context(),
577 client,
578 &tangled.RepoCreate_Input{
579 Rkey: rkey,
580 },
581 )
582 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
583 l.Error("xrpc error", "xe", xe)
584 s.pages.Notice(w, "repo", err.Error())
585 return
586 }
587
588 err = db.AddRepo(tx, repo)
589 if err != nil {
590 l.Error("db write failed", "err", err)
591 s.pages.Notice(w, "repo", "Failed to save repository information.")
592 return
593 }
594
595 // acls
596 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
597 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
598 if err != nil {
599 l.Error("acl setup failed", "err", err)
600 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
601 return
602 }
603
604 err = tx.Commit()
605 if err != nil {
606 l.Error("txn commit failed", "err", err)
607 http.Error(w, err.Error(), http.StatusInternalServerError)
608 return
609 }
610
611 err = s.enforcer.E.SavePolicy()
612 if err != nil {
613 l.Error("acl save failed", "err", err)
614 http.Error(w, err.Error(), http.StatusInternalServerError)
615 return
616 }
617
618 // reset the ATURI because the transaction completed successfully
619 aturi = ""
620
621 s.notifier.NewRepo(r.Context(), repo)
622 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
623 }
624}
625
626// this is used to rollback changes made to the PDS
627//
628// it is a no-op if the provided ATURI is empty
629func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
630 if aturi == "" {
631 return nil
632 }
633
634 parsed := syntax.ATURI(aturi)
635
636 collection := parsed.Collection().String()
637 repo := parsed.Authority().String()
638 rkey := parsed.RecordKey().String()
639
640 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
641 Collection: collection,
642 Repo: repo,
643 Rkey: rkey,
644 })
645 return err
646}
647
648func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
649 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
650 if err != nil {
651 return err
652 }
653 // already present
654 if len(defaultLabels) == len(defaults) {
655 return nil
656 }
657
658 labelDefs, err := models.FetchLabelDefs(r, defaults)
659 if err != nil {
660 return err
661 }
662
663 // Insert each label definition to the database
664 for _, labelDef := range labelDefs {
665 _, err = db.AddLabelDefinition(e, &labelDef)
666 if err != nil {
667 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
668 }
669 }
670
671 return nil
672}