Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/mentions"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/notify"
21 dbnotify "tangled.org/core/appview/notify/db"
22 phnotify "tangled.org/core/appview/notify/posthog"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/reporesolver"
26 "tangled.org/core/appview/validator"
27 xrpcclient "tangled.org/core/appview/xrpcclient"
28 "tangled.org/core/eventconsumer"
29 "tangled.org/core/idresolver"
30 "tangled.org/core/jetstream"
31 "tangled.org/core/log"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35 "tangled.org/core/tid"
36
37 comatproto "github.com/bluesky-social/indigo/api/atproto"
38 atpclient "github.com/bluesky-social/indigo/atproto/client"
39 "github.com/bluesky-social/indigo/atproto/syntax"
40 lexutil "github.com/bluesky-social/indigo/lex/util"
41 securejoin "github.com/cyphar/filepath-securejoin"
42 "github.com/go-chi/chi/v5"
43 "github.com/posthog/posthog-go"
44)
45
// State bundles the long-lived dependencies that the appview's HTTP handlers
// in this package operate on. A fully wired instance is produced by Make.
type State struct {
	db               *db.DB                    // database handle; closed by Close
	notifier         notify.Notifier           // notified by handlers, e.g. when a repo is created
	indexer          *indexer.Indexer
	oauth            *oauth.OAuth              // resolves the signed-in user and builds authorized clients
	enforcer         *rbac.Enforcer            // ACL checks and policy storage
	pages            *pages.Pages              // renders pages and notices
	idResolver       *idresolver.Resolver      // resolves user identifiers
	mentionsResolver *mentions.Resolver
	posthog          posthog.Client
	jc               *jetstream.JetstreamClient
	config           *config.Config
	repoResolver     *reporesolver.RepoResolver
	knotstream       *eventconsumer.Consumer
	spindlestream    *eventconsumer.Consumer
	logger           *slog.Logger
	validator        *validator.Validator
}
64
65func Make(ctx context.Context, config *config.Config) (*State, error) {
66 logger := tlog.FromContext(ctx)
67
68 d, err := db.Make(ctx, config.Core.DbPath)
69 if err != nil {
70 return nil, fmt.Errorf("failed to create db: %w", err)
71 }
72
73 indexer := indexer.New(log.SubLogger(logger, "indexer"))
74 err = indexer.Init(ctx, d)
75 if err != nil {
76 return nil, fmt.Errorf("failed to create indexer: %w", err)
77 }
78
79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create enforcer: %w", err)
82 }
83
84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
85 if err != nil {
86 logger.Error("failed to create redis resolver", "err", err)
87 res = idresolver.DefaultResolver(config.Plc.PLCURL)
88 }
89
90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
91 if err != nil {
92 return nil, fmt.Errorf("failed to create posthog client: %w", err)
93 }
94
95 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
97 if err != nil {
98 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
99 }
100 validator := validator.New(d, res, enforcer)
101
102 repoResolver := reporesolver.New(config, enforcer, d)
103
104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
105
106 wrapper := db.DbWrapper{Execer: d}
107 jc, err := jetstream.NewJetstreamClient(
108 config.Jetstream.Endpoint,
109 "appview",
110 []string{
111 tangled.GraphFollowNSID,
112 tangled.FeedStarNSID,
113 tangled.PublicKeyNSID,
114 tangled.RepoArtifactNSID,
115 tangled.ActorProfileNSID,
116 tangled.SpindleMemberNSID,
117 tangled.SpindleNSID,
118 tangled.StringNSID,
119 tangled.RepoIssueNSID,
120 tangled.RepoIssueCommentNSID,
121 tangled.LabelDefinitionNSID,
122 tangled.LabelOpNSID,
123 },
124 nil,
125 tlog.SubLogger(logger, "jetstream"),
126 wrapper,
127 false,
128
129 // in-memory filter is inapplicalble to appview so
130 // we'll never log dids anyway.
131 false,
132 )
133 if err != nil {
134 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
135 }
136
137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
139 }
140
141 ingester := appview.Ingester{
142 Db: wrapper,
143 Enforcer: enforcer,
144 IdResolver: res,
145 Config: config,
146 Logger: log.SubLogger(logger, "ingester"),
147 Validator: validator,
148 }
149 err = jc.StartJetstream(ctx, ingester.Ingest())
150 if err != nil {
151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
152 }
153
154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
155 if err != nil {
156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
157 }
158 knotstream.Start(ctx)
159
160 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
161 if err != nil {
162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
163 }
164 spindlestream.Start(ctx)
165
166 var notifiers []notify.Notifier
167
168 // Always add the database notifier
169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
170
171 // Add other notifiers in production only
172 if !config.Core.Dev {
173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174 }
175 notifiers = append(notifiers, indexer)
176 notifier := notify.NewMergedNotifier(notifiers)
177 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
178
179 state := &State{
180 d,
181 notifier,
182 indexer,
183 oauth,
184 enforcer,
185 pages,
186 res,
187 mentionsResolver,
188 posthog,
189 jc,
190 config,
191 repoResolver,
192 knotstream,
193 spindlestream,
194 logger,
195 validator,
196 }
197
198 return state, nil
199}
200
201func (s *State) Close() error {
202 // other close up logic goes here
203 return s.db.Close()
204}
205
206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
207 w.Header().Set("Content-Type", "text/plain")
208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
209
210 robotsTxt := `User-agent: *
211Allow: /
212`
213 w.Write([]byte(robotsTxt))
214}
215
216func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
217 user := s.oauth.GetMultiAccountUser(r)
218 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
219 LoggedInUser: user,
220 })
221}
222
223func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
224 user := s.oauth.GetMultiAccountUser(r)
225 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
226 LoggedInUser: user,
227 })
228}
229
230func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
231 user := s.oauth.GetMultiAccountUser(r)
232 s.pages.Brand(w, pages.BrandParams{
233 LoggedInUser: user,
234 })
235}
236
237func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
238 if s.oauth.GetMultiAccountUser(r) != nil {
239 s.Timeline(w, r)
240 return
241 }
242 s.Home(w, r)
243}
244
245func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
246 user := s.oauth.GetMultiAccountUser(r)
247
248 // TODO: set this flag based on the UI
249 filtered := false
250
251 var userDid string
252 if user != nil && user.Active != nil {
253 userDid = user.Active.Did
254 }
255 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
256 if err != nil {
257 s.logger.Error("failed to make timeline", "err", err)
258 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
259 }
260
261 repos, err := db.GetTopStarredReposLastWeek(s.db)
262 if err != nil {
263 s.logger.Error("failed to get top starred repos", "err", err)
264 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
265 return
266 }
267
268 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
269 if err != nil {
270 // non-fatal
271 }
272
273 s.pages.Timeline(w, pages.TimelineParams{
274 LoggedInUser: user,
275 Timeline: timeline,
276 Repos: repos,
277 GfiLabel: gfiLabel,
278 })
279}
280
281func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
282 user := s.oauth.GetMultiAccountUser(r)
283 if user == nil {
284 return
285 }
286
287 l := s.logger.With("handler", "UpgradeBanner")
288 l = l.With("did", user.Active.Did)
289
290 regs, err := db.GetRegistrations(
291 s.db,
292 orm.FilterEq("did", user.Active.Did),
293 orm.FilterEq("needs_upgrade", 1),
294 )
295 if err != nil {
296 l.Error("non-fatal: failed to get registrations", "err", err)
297 }
298
299 spindles, err := db.GetSpindles(
300 s.db,
301 orm.FilterEq("owner", user.Active.Did),
302 orm.FilterEq("needs_upgrade", 1),
303 )
304 if err != nil {
305 l.Error("non-fatal: failed to get spindles", "err", err)
306 }
307
308 if regs == nil && spindles == nil {
309 return
310 }
311
312 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
313 Registrations: regs,
314 Spindles: spindles,
315 })
316}
317
318func (s *State) Home(w http.ResponseWriter, r *http.Request) {
319 // TODO: set this flag based on the UI
320 filtered := false
321
322 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
323 if err != nil {
324 s.logger.Error("failed to make timeline", "err", err)
325 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
326 return
327 }
328
329 repos, err := db.GetTopStarredReposLastWeek(s.db)
330 if err != nil {
331 s.logger.Error("failed to get top starred repos", "err", err)
332 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
333 return
334 }
335
336 s.pages.Home(w, pages.TimelineParams{
337 LoggedInUser: nil,
338 Timeline: timeline,
339 Repos: repos,
340 })
341}
342
343func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
344 user := chi.URLParam(r, "user")
345 user = strings.TrimPrefix(user, "@")
346
347 if user == "" {
348 w.WriteHeader(http.StatusBadRequest)
349 return
350 }
351
352 id, err := s.idResolver.ResolveIdent(r.Context(), user)
353 if err != nil {
354 w.WriteHeader(http.StatusInternalServerError)
355 return
356 }
357
358 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
359 if err != nil {
360 s.logger.Error("failed to get public keys", "err", err)
361 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
362 return
363 }
364
365 if len(pubKeys) == 0 {
366 w.WriteHeader(http.StatusNoContent)
367 return
368 }
369
370 for _, k := range pubKeys {
371 key := strings.TrimRight(k.Key, "\n")
372 fmt.Fprintln(w, key)
373 }
374}
375
// validateRepoName reports whether name is acceptable as a repository name.
//
// A name is rejected, in this order, when it:
//   - is "." or "..", or contains a forward or backward slash;
//   - starts or ends with a period;
//   - contains anything other than ASCII letters, digits, '-', '_' or '.';
//   - contains two consecutive periods.
//
// It returns nil for an accepted name (including the empty string, which
// callers are expected to reject themselves) and a descriptive error
// otherwise.
func validateRepoName(name string) error {
	// path traversal attempts: dot components and path separators
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// Slashes were ruled out above, so the only traversal-like shapes left
	// are names that begin or end with a period.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// character whitelist
	disallowed := func(c rune) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return false
		case c == '-', c == '_', c == '.':
			return false
		}
		return true
	}
	if strings.IndexFunc(name, disallowed) >= 0 {
		return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
	}

	// no runs of periods anywhere in the name
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
407
// stripGitExt returns name with a single trailing ".git" removed. Names
// without that suffix are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if strings.HasSuffix(name, ext) {
		return name[:len(name)-len(ext)]
	}
	return name
}
411
412func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
413 switch r.Method {
414 case http.MethodGet:
415 user := s.oauth.GetMultiAccountUser(r)
416 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
417 if err != nil {
418 s.pages.Notice(w, "repo", "Invalid user account.")
419 return
420 }
421
422 s.pages.NewRepo(w, pages.NewRepoParams{
423 LoggedInUser: user,
424 Knots: knots,
425 })
426
427 case http.MethodPost:
428 l := s.logger.With("handler", "NewRepo")
429
430 user := s.oauth.GetMultiAccountUser(r)
431 l = l.With("did", user.Active.Did)
432
433 // form validation
434 domain := r.FormValue("domain")
435 if domain == "" {
436 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
437 return
438 }
439 l = l.With("knot", domain)
440
441 repoName := r.FormValue("name")
442 if repoName == "" {
443 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
444 return
445 }
446
447 if err := validateRepoName(repoName); err != nil {
448 s.pages.Notice(w, "repo", err.Error())
449 return
450 }
451 repoName = stripGitExt(repoName)
452 l = l.With("repoName", repoName)
453
454 defaultBranch := r.FormValue("branch")
455 if defaultBranch == "" {
456 defaultBranch = "main"
457 }
458 l = l.With("defaultBranch", defaultBranch)
459
460 description := r.FormValue("description")
461
462 // ACL validation
463 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
464 if err != nil || !ok {
465 l.Info("unauthorized")
466 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
467 return
468 }
469
470 // Check for existing repos
471 existingRepo, err := db.GetRepo(
472 s.db,
473 orm.FilterEq("did", user.Active.Did),
474 orm.FilterEq("name", repoName),
475 )
476 if err == nil && existingRepo != nil {
477 l.Info("repo exists")
478 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
479 return
480 }
481
482 // create atproto record for this repo
483 rkey := tid.TID()
484 repo := &models.Repo{
485 Did: user.Active.Did,
486 Name: repoName,
487 Knot: domain,
488 Rkey: rkey,
489 Description: description,
490 Created: time.Now(),
491 Labels: s.config.Label.DefaultLabelDefs,
492 }
493 record := repo.AsRecord()
494
495 atpClient, err := s.oauth.AuthorizedClient(r)
496 if err != nil {
497 l.Info("PDS write failed", "err", err)
498 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
499 return
500 }
501
502 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
503 Collection: tangled.RepoNSID,
504 Repo: user.Active.Did,
505 Rkey: rkey,
506 Record: &lexutil.LexiconTypeDecoder{
507 Val: &record,
508 },
509 })
510 if err != nil {
511 l.Info("PDS write failed", "err", err)
512 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
513 return
514 }
515
516 aturi := atresp.Uri
517 l = l.With("aturi", aturi)
518 l.Info("wrote to PDS")
519
520 tx, err := s.db.BeginTx(r.Context(), nil)
521 if err != nil {
522 l.Info("txn failed", "err", err)
523 s.pages.Notice(w, "repo", "Failed to save repository information.")
524 return
525 }
526
527 // The rollback function reverts a few things on failure:
528 // - the pending txn
529 // - the ACLs
530 // - the atproto record created
531 rollback := func() {
532 err1 := tx.Rollback()
533 err2 := s.enforcer.E.LoadPolicy()
534 err3 := rollbackRecord(context.Background(), aturi, atpClient)
535
536 // ignore txn complete errors, this is okay
537 if errors.Is(err1, sql.ErrTxDone) {
538 err1 = nil
539 }
540
541 if errs := errors.Join(err1, err2, err3); errs != nil {
542 l.Error("failed to rollback changes", "errs", errs)
543 return
544 }
545 }
546 defer rollback()
547
548 client, err := s.oauth.ServiceClient(
549 r,
550 oauth.WithService(domain),
551 oauth.WithLxm(tangled.RepoCreateNSID),
552 oauth.WithDev(s.config.Core.Dev),
553 )
554 if err != nil {
555 l.Error("service auth failed", "err", err)
556 s.pages.Notice(w, "repo", "Failed to reach PDS.")
557 return
558 }
559
560 xe := tangled.RepoCreate(
561 r.Context(),
562 client,
563 &tangled.RepoCreate_Input{
564 Rkey: rkey,
565 },
566 )
567 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
568 l.Error("xrpc error", "xe", xe)
569 s.pages.Notice(w, "repo", err.Error())
570 return
571 }
572
573 err = db.AddRepo(tx, repo)
574 if err != nil {
575 l.Error("db write failed", "err", err)
576 s.pages.Notice(w, "repo", "Failed to save repository information.")
577 return
578 }
579
580 // acls
581 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
582 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
583 if err != nil {
584 l.Error("acl setup failed", "err", err)
585 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
586 return
587 }
588
589 err = tx.Commit()
590 if err != nil {
591 l.Error("txn commit failed", "err", err)
592 http.Error(w, err.Error(), http.StatusInternalServerError)
593 return
594 }
595
596 err = s.enforcer.E.SavePolicy()
597 if err != nil {
598 l.Error("acl save failed", "err", err)
599 http.Error(w, err.Error(), http.StatusInternalServerError)
600 return
601 }
602
603 // reset the ATURI because the transaction completed successfully
604 aturi = ""
605
606 s.notifier.NewRepo(r.Context(), repo)
607 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
608 }
609}
610
611// this is used to rollback changes made to the PDS
612//
613// it is a no-op if the provided ATURI is empty
614func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
615 if aturi == "" {
616 return nil
617 }
618
619 parsed := syntax.ATURI(aturi)
620
621 collection := parsed.Collection().String()
622 repo := parsed.Authority().String()
623 rkey := parsed.RecordKey().String()
624
625 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
626 Collection: collection,
627 Repo: repo,
628 Rkey: rkey,
629 })
630 return err
631}
632
633func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
634 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
635 if err != nil {
636 return err
637 }
638 // already present
639 if len(defaultLabels) == len(defaults) {
640 return nil
641 }
642
643 labelDefs, err := models.FetchLabelDefs(r, defaults)
644 if err != nil {
645 return err
646 }
647
648 // Insert each label definition to the database
649 for _, labelDef := range labelDefs {
650 _, err = db.AddLabelDefinition(e, &labelDef)
651 if err != nil {
652 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
653 }
654 }
655
656 return nil
657}