Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/indexer"
19 "tangled.org/core/appview/mentions"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/consts"
30 "tangled.org/core/eventconsumer"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/jetstream"
33 "tangled.org/core/log"
34 tlog "tangled.org/core/log"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38
39 comatproto "github.com/bluesky-social/indigo/api/atproto"
40 atpclient "github.com/bluesky-social/indigo/atproto/client"
41 "github.com/bluesky-social/indigo/atproto/syntax"
42 lexutil "github.com/bluesky-social/indigo/lex/util"
43 "github.com/bluesky-social/indigo/xrpc"
44 securejoin "github.com/cyphar/filepath-securejoin"
45 "github.com/go-chi/chi/v5"
46 "github.com/posthog/posthog-go"
47)
48
// State bundles the long-lived dependencies shared by the appview's HTTP
// handlers. It is built by Make and released with Close.
type State struct {
	// db is the application database handle; Close closes it.
	db *db.DB

	// notifier receives application events (for example NewRepo after a
	// repository has been created).
	notifier notify.Notifier

	// indexer is the indexer initialised against db in Make.
	indexer *indexer.Indexer

	// oauth yields the logged-in user and authorized PDS/service clients
	// for a request.
	oauth *oauth.OAuth

	// enforcer is the RBAC enforcer used for ACL checks and policy updates.
	enforcer *rbac.Enforcer

	// pages renders pages and notices to the response.
	pages *pages.Pages

	// idResolver resolves user identifiers (see Keys).
	idResolver *idresolver.Resolver

	// mentionsResolver is the mentions resolver created in Make.
	mentionsResolver *mentions.Resolver

	// posthog is the PostHog client created from the Posthog config section.
	posthog posthog.Client

	// jc is the jetstream client started in Make.
	jc *jetstream.JetstreamClient

	// config is the appview configuration passed to Make.
	config *config.Config

	// repoResolver is the repo resolver created in Make.
	repoResolver *reporesolver.RepoResolver

	// knotstream is the knot event consumer started in Make.
	knotstream *eventconsumer.Consumer

	// spindlestream is the spindle event consumer started in Make.
	spindlestream *eventconsumer.Consumer

	// logger is the base logger; handlers derive per-request loggers from it.
	logger *slog.Logger

	// validator is the validator created in Make and shared with the
	// jetstream ingester.
	validator *validator.Validator
}
67
68func Make(ctx context.Context, config *config.Config) (*State, error) {
69 logger := tlog.FromContext(ctx)
70
71 d, err := db.Make(ctx, config.Core.DbPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to create db: %w", err)
74 }
75
76 indexer := indexer.New(log.SubLogger(logger, "indexer"))
77 err = indexer.Init(ctx, d)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create indexer: %w", err)
80 }
81
82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create enforcer: %w", err)
85 }
86
87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
88 if err != nil {
89 logger.Error("failed to create redis resolver", "err", err)
90 res = idresolver.DefaultResolver(config.Plc.PLCURL)
91 }
92
93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
94 if err != nil {
95 return nil, fmt.Errorf("failed to create posthog client: %w", err)
96 }
97
98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
100 if err != nil {
101 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
102 }
103 validator := validator.New(d, res, enforcer)
104
105 repoResolver := reporesolver.New(config, enforcer, d)
106
107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
108
109 wrapper := db.DbWrapper{Execer: d}
110 jc, err := jetstream.NewJetstreamClient(
111 config.Jetstream.Endpoint,
112 "appview",
113 []string{
114 tangled.GraphFollowNSID,
115 tangled.FeedStarNSID,
116 tangled.PublicKeyNSID,
117 tangled.RepoArtifactNSID,
118 tangled.ActorProfileNSID,
119 tangled.KnotMemberNSID,
120 tangled.SpindleMemberNSID,
121 tangled.SpindleNSID,
122 tangled.StringNSID,
123 tangled.RepoIssueNSID,
124 tangled.RepoIssueCommentNSID,
125 tangled.LabelDefinitionNSID,
126 tangled.LabelOpNSID,
127 },
128 nil,
129 tlog.SubLogger(logger, "jetstream"),
130 wrapper,
131 false,
132
133 // in-memory filter is inapplicable to appview so
134 // we'll never log dids anyway.
135 false,
136 )
137 if err != nil {
138 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
139 }
140
141 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
142 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
143 }
144
145 ingester := appview.Ingester{
146 Db: wrapper,
147 Enforcer: enforcer,
148 IdResolver: res,
149 Config: config,
150 Logger: log.SubLogger(logger, "ingester"),
151 Validator: validator,
152 }
153 err = jc.StartJetstream(ctx, ingester.Ingest())
154 if err != nil {
155 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
156 }
157
158 var notifiers []notify.Notifier
159
160 // Always add the database notifier
161 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
162
163 // Add other notifiers in production only
164 if !config.Core.Dev {
165 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
166 }
167 notifiers = append(notifiers, indexer)
168
169 // Add webhook notifier
170 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
171
172 notifier := notify.NewMergedNotifier(notifiers)
173 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
174
175 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier)
176 if err != nil {
177 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
178 }
179 knotstream.Start(ctx)
180
181 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
182 if err != nil {
183 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
184 }
185 spindlestream.Start(ctx)
186
187 state := &State{
188 d,
189 notifier,
190 indexer,
191 oauth,
192 enforcer,
193 pages,
194 res,
195 mentionsResolver,
196 posthog,
197 jc,
198 config,
199 repoResolver,
200 knotstream,
201 spindlestream,
202 logger,
203 validator,
204 }
205
206 // fetch initial bluesky posts if configured
207 go fetchBskyPosts(ctx, res, config, d, logger)
208
209 return state, nil
210}
211
212func (s *State) Close() error {
213 // other close up logic goes here
214 return s.db.Close()
215}
216
217func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
218 w.Header().Set("Content-Type", "text/plain")
219 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
220
221 robotsTxt := `# Hello, Tanglers!
222User-agent: *
223Allow: /
224Disallow: /*/*/settings
225Disallow: /settings
226Disallow: /*/*/compare
227Disallow: /*/*/fork
228
229Crawl-delay: 1
230`
231 w.Write([]byte(robotsTxt))
232}
233
234func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
235 user := s.oauth.GetMultiAccountUser(r)
236 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
237 LoggedInUser: user,
238 })
239}
240
241func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
242 user := s.oauth.GetMultiAccountUser(r)
243 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
244 LoggedInUser: user,
245 })
246}
247
248func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
249 user := s.oauth.GetMultiAccountUser(r)
250 s.pages.Brand(w, pages.BrandParams{
251 LoggedInUser: user,
252 })
253}
254
255func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
256 user := s.oauth.GetMultiAccountUser(r)
257 if user == nil {
258 return
259 }
260
261 l := s.logger.With("handler", "UpgradeBanner")
262 l = l.With("did", user.Active.Did)
263
264 regs, err := db.GetRegistrations(
265 s.db,
266 orm.FilterEq("did", user.Active.Did),
267 orm.FilterEq("needs_upgrade", 1),
268 )
269 if err != nil {
270 l.Error("non-fatal: failed to get registrations", "err", err)
271 }
272
273 spindles, err := db.GetSpindles(
274 s.db,
275 orm.FilterEq("owner", user.Active.Did),
276 orm.FilterEq("needs_upgrade", 1),
277 )
278 if err != nil {
279 l.Error("non-fatal: failed to get spindles", "err", err)
280 }
281
282 if regs == nil && spindles == nil {
283 return
284 }
285
286 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
287 Registrations: regs,
288 Spindles: spindles,
289 })
290}
291
292func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
293 user := chi.URLParam(r, "user")
294 user = strings.TrimPrefix(user, "@")
295
296 if user == "" {
297 w.WriteHeader(http.StatusBadRequest)
298 return
299 }
300
301 id, err := s.idResolver.ResolveIdent(r.Context(), user)
302 if err != nil {
303 w.WriteHeader(http.StatusInternalServerError)
304 return
305 }
306
307 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
308 if err != nil {
309 s.logger.Error("failed to get public keys", "err", err)
310 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
311 return
312 }
313
314 if len(pubKeys) == 0 {
315 w.WriteHeader(http.StatusNoContent)
316 return
317 }
318
319 for _, k := range pubKeys {
320 key := strings.TrimRight(k.Key, "\n")
321 fmt.Fprintln(w, key)
322 }
323}
324
// validateRepoName checks that name is acceptable as a repository name and
// returns a descriptive error when it is not.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and contains no "..". Path
// separators ('/' and '\') are rejected up front so the name can never be
// used for path traversal.
//
// The error text is shown to the user as-is by NewRepo, which is why the
// messages are capitalised sentences rather than conventional lowercase Go
// error strings.
func validateRepoName(name string) error {
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." ||
		strings.Contains(name, "/") || strings.Contains(name, "\\") {
		return errors.New("Repository name contains invalid path characters")
	}

	// Leading or trailing dots could be used for traversal once normalized.
	// ("./" and "../" need no separate check: any '/' was rejected above.)
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z':
		case char >= 'A' && char <= 'Z':
		case char >= '0' && char <= '9':
		case char == '-', char == '_', char == '.':
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	return nil
}
356
// stripGitExt returns name without a single trailing ".git"; a name lacking
// that suffix is returned unchanged.
func stripGitExt(name string) string {
	base, _ := strings.CutSuffix(name, ".git")
	return base
}
360
361func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
362 switch r.Method {
363 case http.MethodGet:
364 user := s.oauth.GetMultiAccountUser(r)
365 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
366 if err != nil {
367 s.pages.Notice(w, "repo", "Invalid user account.")
368 return
369 }
370
371 s.pages.NewRepo(w, pages.NewRepoParams{
372 LoggedInUser: user,
373 Knots: knots,
374 })
375
376 case http.MethodPost:
377 l := s.logger.With("handler", "NewRepo")
378
379 user := s.oauth.GetMultiAccountUser(r)
380 l = l.With("did", user.Active.Did)
381
382 // form validation
383 domain := r.FormValue("domain")
384 if domain == "" {
385 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
386 return
387 }
388 l = l.With("knot", domain)
389
390 repoName := r.FormValue("name")
391 if repoName == "" {
392 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
393 return
394 }
395
396 if err := validateRepoName(repoName); err != nil {
397 s.pages.Notice(w, "repo", err.Error())
398 return
399 }
400 repoName = stripGitExt(repoName)
401 l = l.With("repoName", repoName)
402
403 defaultBranch := r.FormValue("branch")
404 if defaultBranch == "" {
405 defaultBranch = "main"
406 }
407 l = l.With("defaultBranch", defaultBranch)
408
409 description := r.FormValue("description")
410 if len([]rune(description)) > 140 {
411 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
412 return
413 }
414
415 // ACL validation
416 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
417 if err != nil || !ok {
418 l.Info("unauthorized")
419 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
420 return
421 }
422
423 // Check for existing repos
424 existingRepo, err := db.GetRepo(
425 s.db,
426 orm.FilterEq("did", user.Active.Did),
427 orm.FilterEq("name", repoName),
428 )
429 if err == nil && existingRepo != nil {
430 l.Info("repo exists")
431 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
432 return
433 }
434
435 // create atproto record for this repo
436 rkey := tid.TID()
437 repo := &models.Repo{
438 Did: user.Active.Did,
439 Name: repoName,
440 Knot: domain,
441 Rkey: rkey,
442 Description: description,
443 Created: time.Now(),
444 Labels: s.config.Label.DefaultLabelDefs,
445 }
446 record := repo.AsRecord()
447
448 atpClient, err := s.oauth.AuthorizedClient(r)
449 if err != nil {
450 l.Info("PDS write failed", "err", err)
451 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
452 return
453 }
454
455 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
456 Collection: tangled.RepoNSID,
457 Repo: user.Active.Did,
458 Rkey: rkey,
459 Record: &lexutil.LexiconTypeDecoder{
460 Val: &record,
461 },
462 })
463 if err != nil {
464 l.Info("PDS write failed", "err", err)
465 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
466 return
467 }
468
469 aturi := atresp.Uri
470 l = l.With("aturi", aturi)
471 l.Info("wrote to PDS")
472
473 tx, err := s.db.BeginTx(r.Context(), nil)
474 if err != nil {
475 l.Info("txn failed", "err", err)
476 s.pages.Notice(w, "repo", "Failed to save repository information.")
477 return
478 }
479
480 // The rollback function reverts a few things on failure:
481 // - the pending txn
482 // - the ACLs
483 // - the atproto record created
484 rollback := func() {
485 err1 := tx.Rollback()
486 err2 := s.enforcer.E.LoadPolicy()
487 err3 := rollbackRecord(context.Background(), aturi, atpClient)
488
489 // ignore txn complete errors, this is okay
490 if errors.Is(err1, sql.ErrTxDone) {
491 err1 = nil
492 }
493
494 if errs := errors.Join(err1, err2, err3); errs != nil {
495 l.Error("failed to rollback changes", "errs", errs)
496 return
497 }
498 }
499 defer rollback()
500
501 client, err := s.oauth.ServiceClient(
502 r,
503 oauth.WithService(domain),
504 oauth.WithLxm(tangled.RepoCreateNSID),
505 oauth.WithDev(s.config.Core.Dev),
506 )
507 if err != nil {
508 l.Error("service auth failed", "err", err)
509 s.pages.Notice(w, "repo", "Failed to reach PDS.")
510 return
511 }
512
513 xe := tangled.RepoCreate(
514 r.Context(),
515 client,
516 &tangled.RepoCreate_Input{
517 Rkey: rkey,
518 },
519 )
520 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
521 l.Error("xrpc error", "xe", xe)
522 s.pages.Notice(w, "repo", err.Error())
523 return
524 }
525
526 err = db.AddRepo(tx, repo)
527 if err != nil {
528 l.Error("db write failed", "err", err)
529 s.pages.Notice(w, "repo", "Failed to save repository information.")
530 return
531 }
532
533 // acls
534 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
535 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
536 if err != nil {
537 l.Error("acl setup failed", "err", err)
538 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
539 return
540 }
541
542 err = tx.Commit()
543 if err != nil {
544 l.Error("txn commit failed", "err", err)
545 http.Error(w, err.Error(), http.StatusInternalServerError)
546 return
547 }
548
549 err = s.enforcer.E.SavePolicy()
550 if err != nil {
551 l.Error("acl save failed", "err", err)
552 http.Error(w, err.Error(), http.StatusInternalServerError)
553 return
554 }
555
556 // reset the ATURI because the transaction completed successfully
557 aturi = ""
558
559 s.notifier.NewRepo(r.Context(), repo)
560 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
561 }
562}
563
564// this is used to rollback changes made to the PDS
565//
566// it is a no-op if the provided ATURI is empty
567func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
568 if aturi == "" {
569 return nil
570 }
571
572 parsed := syntax.ATURI(aturi)
573
574 collection := parsed.Collection().String()
575 repo := parsed.Authority().String()
576 rkey := parsed.RecordKey().String()
577
578 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
579 Collection: collection,
580 Repo: repo,
581 Rkey: rkey,
582 })
583 return err
584}
585
586func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
587 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
588 if err != nil {
589 return err
590 }
591 // already present
592 if len(defaultLabels) == len(defaults) {
593 return nil
594 }
595
596 labelDefs, err := models.FetchLabelDefs(r, defaults)
597 if err != nil {
598 return err
599 }
600
601 // Insert each label definition to the database
602 for _, labelDef := range labelDefs {
603 _, err = db.AddLabelDefinition(e, &labelDef)
604 if err != nil {
605 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
606 }
607 }
608
609 return nil
610}
611
612func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
613 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
614 if err != nil {
615 logger.Error("failed to resolve tangled.org DID", "err", err)
616 return
617 }
618
619 pdsEndpoint := resolved.PDSEndpoint()
620 if pdsEndpoint == "" {
621 logger.Error("no PDS endpoint found for tangled.sh DID")
622 return
623 }
624
625 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, config.Core.RateLimitBypass, logger)
626 if err != nil {
627 logger.Error("failed to create appassword session... skipping fetch", "err", err)
628 return
629 }
630
631 client := xrpc.Client{
632 Auth: &xrpc.AuthInfo{
633 AccessJwt: session.AccessJwt,
634 Did: session.Did,
635 },
636 Host: session.PdsEndpoint,
637 }
638
639 l := log.SubLogger(logger, "bluesky")
640
641 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
642 defer ticker.Stop()
643
644 for {
645 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
646 if err != nil {
647 l.Error("failed to fetch bluesky posts", "err", err)
648 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
649 l.Error("failed to insert bluesky posts", "err", err)
650 } else {
651 l.Info("inserted bluesky posts", "count", len(posts))
652 }
653
654 select {
655 case <-ticker.C:
656 case <-ctx.Done():
657 l.Info("stopping bluesky updater")
658 return
659 }
660 }
661}