forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 phnotify "tangled.org/core/appview/notify/posthog"
25 "tangled.org/core/appview/oauth"
26 "tangled.org/core/appview/pages"
27 "tangled.org/core/appview/reporesolver"
28 "tangled.org/core/appview/validator"
29 xrpcclient "tangled.org/core/appview/xrpcclient"
30 "tangled.org/core/consts"
31 "tangled.org/core/eventconsumer"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/jetstream"
34 "tangled.org/core/log"
35 tlog "tangled.org/core/log"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38 "tangled.org/core/tid"
39
40 comatproto "github.com/bluesky-social/indigo/api/atproto"
41 atpclient "github.com/bluesky-social/indigo/atproto/client"
42 "github.com/bluesky-social/indigo/atproto/syntax"
43 lexutil "github.com/bluesky-social/indigo/lex/util"
44 "github.com/bluesky-social/indigo/xrpc"
45 securejoin "github.com/cyphar/filepath-securejoin"
46 "github.com/go-chi/chi/v5"
47 "github.com/posthog/posthog-go"
48)
49
50type State struct {
51 db *db.DB
52 notifier notify.Notifier
53 indexer *indexer.Indexer
54 oauth *oauth.OAuth
55 enforcer *rbac.Enforcer
56 pages *pages.Pages
57 idResolver *idresolver.Resolver
58 mentionsResolver *mentions.Resolver
59 posthog posthog.Client
60 jc *jetstream.JetstreamClient
61 config *config.Config
62 repoResolver *reporesolver.RepoResolver
63 knotstream *eventconsumer.Consumer
64 spindlestream *eventconsumer.Consumer
65 logger *slog.Logger
66 validator *validator.Validator
67 cfClient *cloudflare.Client
68}
69
70func Make(ctx context.Context, config *config.Config) (*State, error) {
71 logger := tlog.FromContext(ctx)
72
73 d, err := db.Make(ctx, config.Core.DbPath)
74 if err != nil {
75 return nil, fmt.Errorf("failed to create db: %w", err)
76 }
77
78 indexer := indexer.New(log.SubLogger(logger, "indexer"))
79 err = indexer.Init(ctx, d)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create indexer: %w", err)
82 }
83
84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create enforcer: %w", err)
87 }
88
89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
90 if err != nil {
91 logger.Error("failed to create redis resolver", "err", err)
92 res = idresolver.DefaultResolver(config.Plc.PLCURL)
93 }
94
95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
96 if err != nil {
97 return nil, fmt.Errorf("failed to create posthog client: %w", err)
98 }
99
100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
102 if err != nil {
103 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
104 }
105 validator := validator.New(d, res, enforcer)
106
107 repoResolver := reporesolver.New(config, enforcer, d)
108
109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
110
111 wrapper := db.DbWrapper{Execer: d}
112 jc, err := jetstream.NewJetstreamClient(
113 config.Jetstream.Endpoint,
114 "appview",
115 []string{
116 tangled.GraphFollowNSID,
117 tangled.FeedStarNSID,
118 tangled.PublicKeyNSID,
119 tangled.RepoArtifactNSID,
120 tangled.ActorProfileNSID,
121 tangled.KnotMemberNSID,
122 tangled.SpindleMemberNSID,
123 tangled.SpindleNSID,
124 tangled.StringNSID,
125 tangled.RepoIssueNSID,
126 tangled.RepoIssueCommentNSID,
127 tangled.LabelDefinitionNSID,
128 tangled.LabelOpNSID,
129 },
130 nil,
131 tlog.SubLogger(logger, "jetstream"),
132 wrapper,
133 false,
134
135 // in-memory filter is inapplicable to appview so
136 // we'll never log dids anyway.
137 false,
138 )
139 if err != nil {
140 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
141 }
142
143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
145 }
146
147 ingester := appview.Ingester{
148 Db: wrapper,
149 Enforcer: enforcer,
150 IdResolver: res,
151 Config: config,
152 Logger: log.SubLogger(logger, "ingester"),
153 Validator: validator,
154 }
155 err = jc.StartJetstream(ctx, ingester.Ingest())
156 if err != nil {
157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
158 }
159
160 var notifiers []notify.Notifier
161
162 // Always add the database notifier
163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
164
165 // Add other notifiers in production only
166 if !config.Core.Dev {
167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
168 }
169 notifiers = append(notifiers, indexer)
170
171 // Add webhook notifier
172 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
173
174 notifier := notify.NewMergedNotifier(notifiers)
175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
176
177 var cfClient *cloudflare.Client
178 if config.Cloudflare.ApiToken != "" {
179 cfClient, err = cloudflare.New(config)
180 if err != nil {
181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
182 cfClient = nil
183 }
184 }
185
186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
187 if err != nil {
188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
189 }
190 knotstream.Start(ctx)
191
192 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
193 if err != nil {
194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
195 }
196 spindlestream.Start(ctx)
197
198 state := &State{
199 db: d,
200 notifier: notifier,
201 indexer: indexer,
202 oauth: oauth,
203 enforcer: enforcer,
204 pages: pages,
205 idResolver: res,
206 mentionsResolver: mentionsResolver,
207 posthog: posthog,
208 jc: jc,
209 config: config,
210 repoResolver: repoResolver,
211 knotstream: knotstream,
212 spindlestream: spindlestream,
213 logger: logger,
214 validator: validator,
215 cfClient: cfClient,
216 }
217
218 // fetch initial bluesky posts if configured
219 go fetchBskyPosts(ctx, res, config, d, logger)
220
221 return state, nil
222}
223
224func (s *State) Close() error {
225 // other close up logic goes here
226 return s.db.Close()
227}
228
229func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
230 w.Header().Set("Content-Type", "text/plain")
231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
232
233 robotsTxt := `# Hello, Tanglers!
234User-agent: *
235Allow: /
236Disallow: /*/*/settings
237Disallow: /settings
238Disallow: /*/*/compare
239Disallow: /*/*/fork
240
241Crawl-delay: 1
242`
243 w.Write([]byte(robotsTxt))
244}
245
246func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
247 user := s.oauth.GetMultiAccountUser(r)
248 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
249 LoggedInUser: user,
250 })
251}
252
253func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
254 user := s.oauth.GetMultiAccountUser(r)
255 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
256 LoggedInUser: user,
257 })
258}
259
260func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
261 user := s.oauth.GetMultiAccountUser(r)
262 s.pages.Brand(w, pages.BrandParams{
263 LoggedInUser: user,
264 })
265}
266
267func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
268 user := s.oauth.GetMultiAccountUser(r)
269 if user == nil {
270 return
271 }
272
273 l := s.logger.With("handler", "UpgradeBanner")
274 l = l.With("did", user.Active.Did)
275
276 regs, err := db.GetRegistrations(
277 s.db,
278 orm.FilterEq("did", user.Active.Did),
279 orm.FilterEq("needs_upgrade", 1),
280 )
281 if err != nil {
282 l.Error("non-fatal: failed to get registrations", "err", err)
283 }
284
285 spindles, err := db.GetSpindles(
286 s.db,
287 orm.FilterEq("owner", user.Active.Did),
288 orm.FilterEq("needs_upgrade", 1),
289 )
290 if err != nil {
291 l.Error("non-fatal: failed to get spindles", "err", err)
292 }
293
294 if regs == nil && spindles == nil {
295 return
296 }
297
298 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
299 Registrations: regs,
300 Spindles: spindles,
301 })
302}
303
304func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
305 user := chi.URLParam(r, "user")
306 user = strings.TrimPrefix(user, "@")
307
308 if user == "" {
309 w.WriteHeader(http.StatusBadRequest)
310 return
311 }
312
313 id, err := s.idResolver.ResolveIdent(r.Context(), user)
314 if err != nil {
315 w.WriteHeader(http.StatusInternalServerError)
316 return
317 }
318
319 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
320 if err != nil {
321 s.logger.Error("failed to get public keys", "err", err)
322 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
323 return
324 }
325
326 if len(pubKeys) == 0 {
327 w.WriteHeader(http.StatusNoContent)
328 return
329 }
330
331 for _, k := range pubKeys {
332 key := strings.TrimRight(k.Key, "\n")
333 fmt.Fprintln(w, key)
334 }
335}
336
337func validateRepoName(name string) error {
338 // check for path traversal attempts
339 if name == "." || name == ".." ||
340 strings.Contains(name, "/") || strings.Contains(name, "\\") {
341 return fmt.Errorf("Repository name contains invalid path characters")
342 }
343
344 // check for sequences that could be used for traversal when normalized
345 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
346 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
347 return fmt.Errorf("Repository name contains invalid path sequence")
348 }
349
350 // then continue with character validation
351 for _, char := range name {
352 if !((char >= 'a' && char <= 'z') ||
353 (char >= 'A' && char <= 'Z') ||
354 (char >= '0' && char <= '9') ||
355 char == '-' || char == '_' || char == '.') {
356 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
357 }
358 }
359
360 // additional check to prevent multiple sequential dots
361 if strings.Contains(name, "..") {
362 return fmt.Errorf("Repository name cannot contain sequential dots")
363 }
364
365 // if all checks pass
366 return nil
367}
368
369func stripGitExt(name string) string {
370 return strings.TrimSuffix(name, ".git")
371}
372
373func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
374 switch r.Method {
375 case http.MethodGet:
376 user := s.oauth.GetMultiAccountUser(r)
377 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
378 if err != nil {
379 s.pages.Notice(w, "repo", "Invalid user account.")
380 return
381 }
382
383 s.pages.NewRepo(w, pages.NewRepoParams{
384 LoggedInUser: user,
385 Knots: knots,
386 })
387
388 case http.MethodPost:
389 l := s.logger.With("handler", "NewRepo")
390
391 user := s.oauth.GetMultiAccountUser(r)
392 l = l.With("did", user.Active.Did)
393
394 // form validation
395 domain := r.FormValue("domain")
396 if domain == "" {
397 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
398 return
399 }
400 l = l.With("knot", domain)
401
402 repoName := r.FormValue("name")
403 if repoName == "" {
404 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
405 return
406 }
407
408 if err := validateRepoName(repoName); err != nil {
409 s.pages.Notice(w, "repo", err.Error())
410 return
411 }
412 repoName = stripGitExt(repoName)
413 l = l.With("repoName", repoName)
414
415 defaultBranch := r.FormValue("branch")
416 if defaultBranch == "" {
417 defaultBranch = "main"
418 }
419 l = l.With("defaultBranch", defaultBranch)
420
421 description := r.FormValue("description")
422 if len([]rune(description)) > 140 {
423 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
424 return
425 }
426
427 // ACL validation
428 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
429 if err != nil || !ok {
430 l.Info("unauthorized")
431 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
432 return
433 }
434
435 // Check for existing repos
436 existingRepo, err := db.GetRepo(
437 s.db,
438 orm.FilterEq("did", user.Active.Did),
439 orm.FilterEq("name", repoName),
440 )
441 if err == nil && existingRepo != nil {
442 l.Info("repo exists")
443 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
444 return
445 }
446
447 // create atproto record for this repo
448 rkey := tid.TID()
449 repo := &models.Repo{
450 Did: user.Active.Did,
451 Name: repoName,
452 Knot: domain,
453 Rkey: rkey,
454 Description: description,
455 Created: time.Now(),
456 Labels: s.config.Label.DefaultLabelDefs,
457 }
458 record := repo.AsRecord()
459
460 atpClient, err := s.oauth.AuthorizedClient(r)
461 if err != nil {
462 l.Info("PDS write failed", "err", err)
463 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
464 return
465 }
466
467 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
468 Collection: tangled.RepoNSID,
469 Repo: user.Active.Did,
470 Rkey: rkey,
471 Record: &lexutil.LexiconTypeDecoder{
472 Val: &record,
473 },
474 })
475 if err != nil {
476 l.Info("PDS write failed", "err", err)
477 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
478 return
479 }
480
481 aturi := atresp.Uri
482 l = l.With("aturi", aturi)
483 l.Info("wrote to PDS")
484
485 tx, err := s.db.BeginTx(r.Context(), nil)
486 if err != nil {
487 l.Info("txn failed", "err", err)
488 s.pages.Notice(w, "repo", "Failed to save repository information.")
489 return
490 }
491
492 // The rollback function reverts a few things on failure:
493 // - the pending txn
494 // - the ACLs
495 // - the atproto record created
496 rollback := func() {
497 err1 := tx.Rollback()
498 err2 := s.enforcer.E.LoadPolicy()
499 err3 := rollbackRecord(context.Background(), aturi, atpClient)
500
501 // ignore txn complete errors, this is okay
502 if errors.Is(err1, sql.ErrTxDone) {
503 err1 = nil
504 }
505
506 if errs := errors.Join(err1, err2, err3); errs != nil {
507 l.Error("failed to rollback changes", "errs", errs)
508 return
509 }
510 }
511 defer rollback()
512
513 client, err := s.oauth.ServiceClient(
514 r,
515 oauth.WithService(domain),
516 oauth.WithLxm(tangled.RepoCreateNSID),
517 oauth.WithDev(s.config.Core.Dev),
518 )
519 if err != nil {
520 l.Error("service auth failed", "err", err)
521 s.pages.Notice(w, "repo", "Failed to reach PDS.")
522 return
523 }
524
525 xe := tangled.RepoCreate(
526 r.Context(),
527 client,
528 &tangled.RepoCreate_Input{
529 Rkey: rkey,
530 },
531 )
532 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
533 l.Error("xrpc error", "xe", xe)
534 s.pages.Notice(w, "repo", err.Error())
535 return
536 }
537
538 err = db.AddRepo(tx, repo)
539 if err != nil {
540 l.Error("db write failed", "err", err)
541 s.pages.Notice(w, "repo", "Failed to save repository information.")
542 return
543 }
544
545 // acls
546 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
547 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
548 if err != nil {
549 l.Error("acl setup failed", "err", err)
550 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
551 return
552 }
553
554 err = tx.Commit()
555 if err != nil {
556 l.Error("txn commit failed", "err", err)
557 http.Error(w, err.Error(), http.StatusInternalServerError)
558 return
559 }
560
561 err = s.enforcer.E.SavePolicy()
562 if err != nil {
563 l.Error("acl save failed", "err", err)
564 http.Error(w, err.Error(), http.StatusInternalServerError)
565 return
566 }
567
568 // reset the ATURI because the transaction completed successfully
569 aturi = ""
570
571 s.notifier.NewRepo(r.Context(), repo)
572 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
573 }
574}
575
576// this is used to rollback changes made to the PDS
577//
578// it is a no-op if the provided ATURI is empty
579func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
580 if aturi == "" {
581 return nil
582 }
583
584 parsed := syntax.ATURI(aturi)
585
586 collection := parsed.Collection().String()
587 repo := parsed.Authority().String()
588 rkey := parsed.RecordKey().String()
589
590 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
591 Collection: collection,
592 Repo: repo,
593 Rkey: rkey,
594 })
595 return err
596}
597
598func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
599 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
600 if err != nil {
601 return err
602 }
603 // already present
604 if len(defaultLabels) == len(defaults) {
605 return nil
606 }
607
608 labelDefs, err := models.FetchLabelDefs(r, defaults)
609 if err != nil {
610 return err
611 }
612
613 // Insert each label definition to the database
614 for _, labelDef := range labelDefs {
615 _, err = db.AddLabelDefinition(e, &labelDef)
616 if err != nil {
617 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
618 }
619 }
620
621 return nil
622}
623
624func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
625 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
626 if err != nil {
627 logger.Error("failed to resolve tangled.org DID", "err", err)
628 return
629 }
630
631 pdsEndpoint := resolved.PDSEndpoint()
632 if pdsEndpoint == "" {
633 logger.Error("no PDS endpoint found for tangled.sh DID")
634 return
635 }
636
637 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
638 if err != nil {
639 logger.Error("failed to create appassword session... skipping fetch", "err", err)
640 return
641 }
642
643 client := xrpc.Client{
644 Auth: &xrpc.AuthInfo{
645 AccessJwt: session.AccessJwt,
646 Did: session.Did,
647 },
648 Host: session.PdsEndpoint,
649 }
650
651 l := log.SubLogger(logger, "bluesky")
652
653 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
654 defer ticker.Stop()
655
656 for {
657 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
658 if err != nil {
659 l.Error("failed to fetch bluesky posts", "err", err)
660 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
661 l.Error("failed to insert bluesky posts", "err", err)
662 } else {
663 l.Info("inserted bluesky posts", "count", len(posts))
664 }
665
666 select {
667 case <-ticker.C:
668 case <-ctx.Done():
669 l.Info("stopping bluesky updater")
670 return
671 }
672 }
673}