// Forked from tangled.org/core, the monorepo for Tangled.

1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/indexer"
19 "tangled.org/core/appview/mentions"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/consts"
30 "tangled.org/core/eventconsumer"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/jetstream"
33 "tangled.org/core/log"
34 tlog "tangled.org/core/log"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38
39 comatproto "github.com/bluesky-social/indigo/api/atproto"
40 atpclient "github.com/bluesky-social/indigo/atproto/client"
41 "github.com/bluesky-social/indigo/atproto/syntax"
42 lexutil "github.com/bluesky-social/indigo/lex/util"
43 "github.com/bluesky-social/indigo/xrpc"
44
45 "github.com/go-chi/chi/v5"
46 "github.com/posthog/posthog-go"
47)
48
// State bundles the long-lived dependencies used by the appview's HTTP
// handlers. It is assembled by Make and released with Close.
type State struct {
	// db is the application database handle; Close closes it.
	db *db.DB
	// notifier receives application events such as newly created repos.
	notifier notify.Notifier
	// indexer is initialised against db in Make and also registered as a
	// notifier there.
	indexer *indexer.Indexer
	// oauth resolves the logged-in user and builds authorised clients.
	oauth *oauth.OAuth
	// enforcer answers and updates access-control (RBAC) policy.
	enforcer *rbac.Enforcer
	// pages renders pages and notices to the response.
	pages *pages.Pages
	// idResolver resolves user identifiers (see Keys).
	idResolver *idresolver.Resolver
	// mentionsResolver is built by mentions.New in Make.
	mentionsResolver *mentions.Resolver
	// posthog is the PostHog client.
	posthog posthog.Client
	// jc is the jetstream client started in Make.
	jc *jetstream.JetstreamClient
	// config is the appview configuration handed to Make.
	config *config.Config
	// repoResolver is built by reporesolver.New in Make.
	repoResolver *reporesolver.RepoResolver
	// knotstream is the knot event consumer started in Make.
	knotstream *eventconsumer.Consumer
	// spindlestream is the spindle event consumer started in Make.
	spindlestream *eventconsumer.Consumer
	// logger is the base logger taken from the context passed to Make.
	logger *slog.Logger
	// validator is built by validator.New in Make and shared with the
	// jetstream ingester.
	validator *validator.Validator
}
67
68func Make(ctx context.Context, config *config.Config) (*State, error) {
69 logger := tlog.FromContext(ctx)
70
71 d, err := db.Make(ctx, config.Core.DbPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to create db: %w", err)
74 }
75
76 indexer := indexer.New(log.SubLogger(logger, "indexer"))
77 err = indexer.Init(ctx, d)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create indexer: %w", err)
80 }
81
82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create enforcer: %w", err)
85 }
86
87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
88 if err != nil {
89 logger.Error("failed to create redis resolver", "err", err)
90 res = idresolver.DefaultResolver(config.Plc.PLCURL)
91 }
92
93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
94 if err != nil {
95 return nil, fmt.Errorf("failed to create posthog client: %w", err)
96 }
97
98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
100 if err != nil {
101 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
102 }
103 validator := validator.New(d, res, enforcer)
104
105 repoResolver := reporesolver.New(config, enforcer, d)
106
107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
108
109 wrapper := db.DbWrapper{Execer: d}
110 jc, err := jetstream.NewJetstreamClient(
111 config.Jetstream.Endpoint,
112 "appview",
113 []string{
114 tangled.GraphFollowNSID,
115 tangled.FeedStarNSID,
116 tangled.PublicKeyNSID,
117 tangled.RepoArtifactNSID,
118 tangled.ActorProfileNSID,
119 tangled.KnotMemberNSID,
120 tangled.SpindleMemberNSID,
121 tangled.SpindleNSID,
122 tangled.StringNSID,
123 tangled.RepoIssueNSID,
124 tangled.RepoIssueCommentNSID,
125 tangled.LabelDefinitionNSID,
126 tangled.LabelOpNSID,
127 },
128 nil,
129 tlog.SubLogger(logger, "jetstream"),
130 wrapper,
131 false,
132
133 // in-memory filter is inapplicable to appview so
134 // we'll never log dids anyway.
135 false,
136 )
137 if err != nil {
138 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
139 }
140
141 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
142 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
143 }
144
145 ingester := appview.Ingester{
146 Db: wrapper,
147 Enforcer: enforcer,
148 IdResolver: res,
149 Config: config,
150 Logger: log.SubLogger(logger, "ingester"),
151 Validator: validator,
152 }
153 err = jc.StartJetstream(ctx, ingester.Ingest())
154 if err != nil {
155 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
156 }
157
158 var notifiers []notify.Notifier
159
160 // Always add the database notifier
161 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
162
163 // Add other notifiers in production only
164 if !config.Core.Dev {
165 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
166 }
167 notifiers = append(notifiers, indexer)
168
169 // Add webhook notifier
170 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
171
172 notifier := notify.NewMergedNotifier(notifiers)
173 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
174
175 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier)
176 if err != nil {
177 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
178 }
179 knotstream.Start(ctx)
180
181 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
182 if err != nil {
183 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
184 }
185 spindlestream.Start(ctx)
186
187 state := &State{
188 d,
189 notifier,
190 indexer,
191 oauth,
192 enforcer,
193 pages,
194 res,
195 mentionsResolver,
196 posthog,
197 jc,
198 config,
199 repoResolver,
200 knotstream,
201 spindlestream,
202 logger,
203 validator,
204 }
205
206 // fetch initial bluesky posts if configured
207 go fetchBskyPosts(ctx, res, config, d, logger)
208
209 return state, nil
210}
211
212func (s *State) Close() error {
213 // other close up logic goes here
214 return s.db.Close()
215}
216
217func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
218 w.Header().Set("Content-Type", "text/plain")
219 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
220
221 robotsTxt := `# Hello, Tanglers!
222User-agent: *
223Allow: /
224Disallow: /*/*/settings
225Disallow: /settings
226Disallow: /*/*/compare
227Disallow: /*/*/fork
228
229Crawl-delay: 1
230`
231 w.Write([]byte(robotsTxt))
232}
233
234func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
235 user := s.oauth.GetMultiAccountUser(r)
236 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
237 LoggedInUser: user,
238 })
239}
240
241func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
242 user := s.oauth.GetMultiAccountUser(r)
243 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
244 LoggedInUser: user,
245 })
246}
247
248func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
249 user := s.oauth.GetMultiAccountUser(r)
250 s.pages.Brand(w, pages.BrandParams{
251 LoggedInUser: user,
252 })
253}
254
255func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
256 user := s.oauth.GetMultiAccountUser(r)
257 if user == nil {
258 return
259 }
260
261 l := s.logger.With("handler", "UpgradeBanner")
262 l = l.With("did", user.Active.Did)
263
264 regs, err := db.GetRegistrations(
265 s.db,
266 orm.FilterEq("did", user.Active.Did),
267 orm.FilterEq("needs_upgrade", 1),
268 )
269 if err != nil {
270 l.Error("non-fatal: failed to get registrations", "err", err)
271 }
272
273 spindles, err := db.GetSpindles(
274 s.db,
275 orm.FilterEq("owner", user.Active.Did),
276 orm.FilterEq("needs_upgrade", 1),
277 )
278 if err != nil {
279 l.Error("non-fatal: failed to get spindles", "err", err)
280 }
281
282 if regs == nil && spindles == nil {
283 return
284 }
285
286 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
287 Registrations: regs,
288 Spindles: spindles,
289 })
290}
291
292func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
293 user := chi.URLParam(r, "user")
294 user = strings.TrimPrefix(user, "@")
295
296 if user == "" {
297 w.WriteHeader(http.StatusBadRequest)
298 return
299 }
300
301 id, err := s.idResolver.ResolveIdent(r.Context(), user)
302 if err != nil {
303 w.WriteHeader(http.StatusInternalServerError)
304 return
305 }
306
307 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
308 if err != nil {
309 s.logger.Error("failed to get public keys", "err", err)
310 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
311 return
312 }
313
314 if len(pubKeys) == 0 {
315 w.WriteHeader(http.StatusNoContent)
316 return
317 }
318
319 for _, k := range pubKeys {
320 key := strings.TrimRight(k.Key, "\n")
321 fmt.Fprintln(w, key)
322 }
323}
324
// validateRepoName checks that name is safe to use as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and contains no "..". Path
// separators are rejected outright. It returns nil for a valid name and an
// error describing the first violated rule otherwise.
//
// The messages are capitalised on purpose: callers in this file show
// err.Error() directly to the user.
func validateRepoName(name string) error {
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return errors.New("Repository name contains invalid path characters")
	}

	// Leading or trailing dots. No separator can be present past the check
	// above, so "./" and "../" need no separate test here.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z':
		case char >= 'A' && char <= 'Z':
		case char >= '0' && char <= '9':
		case char == '-', char == '_', char == '.':
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	// if all checks pass
	return nil
}
356
// stripGitExt returns name without a single trailing ".git"; names that do
// not end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
360
361func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
362 switch r.Method {
363 case http.MethodGet:
364 user := s.oauth.GetMultiAccountUser(r)
365 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
366 if err != nil {
367 s.pages.Notice(w, "repo", "Invalid user account.")
368 return
369 }
370
371 s.pages.NewRepo(w, pages.NewRepoParams{
372 LoggedInUser: user,
373 Knots: knots,
374 })
375
376 case http.MethodPost:
377 l := s.logger.With("handler", "NewRepo")
378
379 user := s.oauth.GetMultiAccountUser(r)
380 l = l.With("did", user.Active.Did)
381
382 // form validation
383 domain := r.FormValue("domain")
384 if domain == "" {
385 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
386 return
387 }
388 l = l.With("knot", domain)
389
390 repoName := r.FormValue("name")
391 if repoName == "" {
392 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
393 return
394 }
395
396 if err := validateRepoName(repoName); err != nil {
397 s.pages.Notice(w, "repo", err.Error())
398 return
399 }
400 repoName = stripGitExt(repoName)
401 l = l.With("repoName", repoName)
402
403 defaultBranch := r.FormValue("branch")
404 if defaultBranch == "" {
405 defaultBranch = "main"
406 }
407 l = l.With("defaultBranch", defaultBranch)
408
409 description := r.FormValue("description")
410 if len([]rune(description)) > 140 {
411 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
412 return
413 }
414
415 // ACL validation
416 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
417 if err != nil || !ok {
418 l.Info("unauthorized")
419 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
420 return
421 }
422
423 // Check for existing repos
424 existingRepo, err := db.GetRepo(
425 s.db,
426 orm.FilterEq("did", user.Active.Did),
427 orm.FilterEq("name", repoName),
428 )
429 if err == nil && existingRepo != nil {
430 l.Info("repo exists")
431 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
432 return
433 }
434
435 rkey := tid.TID()
436
437 client, err := s.oauth.ServiceClient(
438 r,
439 oauth.WithService(domain),
440 oauth.WithLxm(tangled.RepoCreateNSID),
441 oauth.WithDev(s.config.Core.Dev),
442 )
443 if err != nil {
444 l.Error("service auth failed", "err", err)
445 s.pages.Notice(w, "repo", "Failed to reach knot server.")
446 return
447 }
448
449 input := &tangled.RepoCreate_Input{
450 Rkey: rkey,
451 Name: repoName,
452 DefaultBranch: &defaultBranch,
453 }
454 if rd := strings.TrimSpace(r.FormValue("repo_did")); rd != "" {
455 input.RepoDid = &rd
456 }
457
458 createResp, xe := tangled.RepoCreate(
459 r.Context(),
460 client,
461 input,
462 )
463 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
464 l.Error("xrpc error", "xe", xe)
465 s.pages.Notice(w, "repo", err.Error())
466 return
467 }
468
469 var repoDid string
470 if createResp != nil && createResp.RepoDid != nil {
471 repoDid = *createResp.RepoDid
472 }
473 if repoDid == "" {
474 l.Error("knot returned empty repo DID")
475 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
476 return
477 }
478
479 repo := &models.Repo{
480 Did: user.Active.Did,
481 Name: repoName,
482 Knot: domain,
483 Rkey: rkey,
484 Description: description,
485 Created: time.Now(),
486 Labels: s.config.Label.DefaultLabelDefs,
487 RepoDid: repoDid,
488 }
489 record := repo.AsRecord()
490
491 cleanupKnot := func() {
492 go func() {
493 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
494 for attempt, delay := range delays {
495 time.Sleep(delay)
496 deleteClient, dErr := s.oauth.ServiceClient(
497 r,
498 oauth.WithService(domain),
499 oauth.WithLxm(tangled.RepoDeleteNSID),
500 oauth.WithDev(s.config.Core.Dev),
501 )
502 if dErr != nil {
503 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
504 continue
505 }
506 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
507 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
508 Did: user.Active.Did,
509 Name: repoName,
510 Rkey: rkey,
511 }); dErr != nil {
512 cancel()
513 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
514 continue
515 }
516 cancel()
517 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
518 return
519 }
520 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
521 "did", user.Active.Did, "repo", repoName, "knot", domain)
522 }()
523 }
524
525 atpClient, err := s.oauth.AuthorizedClient(r)
526 if err != nil {
527 l.Info("PDS write failed", "err", err)
528 cleanupKnot()
529 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
530 return
531 }
532
533 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
534 Collection: tangled.RepoNSID,
535 Repo: user.Active.Did,
536 Rkey: rkey,
537 Record: &lexutil.LexiconTypeDecoder{
538 Val: &record,
539 },
540 })
541 if err != nil {
542 l.Info("PDS write failed", "err", err)
543 cleanupKnot()
544 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
545 return
546 }
547
548 aturi := atresp.Uri
549 l = l.With("aturi", aturi)
550 l.Info("wrote to PDS")
551
552 tx, err := s.db.BeginTx(r.Context(), nil)
553 if err != nil {
554 l.Info("txn failed", "err", err)
555 s.pages.Notice(w, "repo", "Failed to save repository information.")
556 return
557 }
558
559 rollback := func() {
560 err1 := tx.Rollback()
561 err2 := s.enforcer.E.LoadPolicy()
562 err3 := rollbackRecord(context.Background(), aturi, atpClient)
563
564 if errors.Is(err1, sql.ErrTxDone) {
565 err1 = nil
566 }
567
568 if errs := errors.Join(err1, err2, err3); errs != nil {
569 l.Error("failed to rollback changes", "errs", errs)
570 }
571
572 if aturi != "" {
573 cleanupKnot()
574 }
575 }
576 defer rollback()
577
578 err = db.AddRepo(tx, repo)
579 if err != nil {
580 l.Error("db write failed", "err", err)
581 s.pages.Notice(w, "repo", "Failed to save repository information.")
582 return
583 }
584
585 rbacPath := repo.RepoIdentifier()
586 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath)
587 if err != nil {
588 l.Error("acl setup failed", "err", err)
589 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
590 return
591 }
592
593 err = tx.Commit()
594 if err != nil {
595 l.Error("txn commit failed", "err", err)
596 http.Error(w, err.Error(), http.StatusInternalServerError)
597 return
598 }
599
600 err = s.enforcer.E.SavePolicy()
601 if err != nil {
602 l.Error("acl save failed", "err", err)
603 http.Error(w, err.Error(), http.StatusInternalServerError)
604 return
605 }
606
607 aturi = ""
608
609 s.notifier.NewRepo(r.Context(), repo)
610 if repoDid != "" {
611 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
612 } else {
613 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
614 }
615 }
616}
617
618// this is used to rollback changes made to the PDS
619//
620// it is a no-op if the provided ATURI is empty
621func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
622 if aturi == "" {
623 return nil
624 }
625
626 parsed := syntax.ATURI(aturi)
627
628 collection := parsed.Collection().String()
629 repo := parsed.Authority().String()
630 rkey := parsed.RecordKey().String()
631
632 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
633 Collection: collection,
634 Repo: repo,
635 Rkey: rkey,
636 })
637 return err
638}
639
640func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
641 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
642 if err != nil {
643 return err
644 }
645 // already present
646 if len(defaultLabels) == len(defaults) {
647 return nil
648 }
649
650 labelDefs, err := models.FetchLabelDefs(r, defaults)
651 if err != nil {
652 return err
653 }
654
655 // Insert each label definition to the database
656 for _, labelDef := range labelDefs {
657 _, err = db.AddLabelDefinition(e, &labelDef)
658 if err != nil {
659 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
660 }
661 }
662
663 return nil
664}
665
666func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
667 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
668 if err != nil {
669 logger.Error("failed to resolve tangled.org DID", "err", err)
670 return
671 }
672
673 pdsEndpoint := resolved.PDSEndpoint()
674 if pdsEndpoint == "" {
675 logger.Error("no PDS endpoint found for tangled.sh DID")
676 return
677 }
678
679 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, config.Core.RateLimitBypass, logger)
680 if err != nil {
681 logger.Error("failed to create appassword session... skipping fetch", "err", err)
682 return
683 }
684
685 client := xrpc.Client{
686 Auth: &xrpc.AuthInfo{
687 AccessJwt: session.AccessJwt,
688 Did: session.Did,
689 },
690 Host: session.PdsEndpoint,
691 }
692
693 l := log.SubLogger(logger, "bluesky")
694
695 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
696 defer ticker.Stop()
697
698 for {
699 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
700 if err != nil {
701 l.Error("failed to fetch bluesky posts", "err", err)
702 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
703 l.Error("failed to insert bluesky posts", "err", err)
704 } else {
705 l.Info("inserted bluesky posts", "count", len(posts))
706 }
707
708 select {
709 case <-ticker.C:
710 case <-ctx.Done():
711 l.Info("stopping bluesky updater")
712 return
713 }
714 }
715}