Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/indexer"
19 "tangled.org/core/appview/mentions"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/consts"
30 "tangled.org/core/eventconsumer"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/jetstream"
33 "tangled.org/core/log"
34 tlog "tangled.org/core/log"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38
39 comatproto "github.com/bluesky-social/indigo/api/atproto"
40 atpclient "github.com/bluesky-social/indigo/atproto/client"
41 "github.com/bluesky-social/indigo/atproto/syntax"
42 lexutil "github.com/bluesky-social/indigo/lex/util"
43 "github.com/bluesky-social/indigo/xrpc"
44 securejoin "github.com/cyphar/filepath-securejoin"
45 "github.com/go-chi/chi/v5"
46 "github.com/posthog/posthog-go"
47)
48
49type State struct {
50 db *db.DB
51 notifier notify.Notifier
52 indexer *indexer.Indexer
53 oauth *oauth.OAuth
54 enforcer *rbac.Enforcer
55 pages *pages.Pages
56 idResolver *idresolver.Resolver
57 mentionsResolver *mentions.Resolver
58 posthog posthog.Client
59 jc *jetstream.JetstreamClient
60 config *config.Config
61 repoResolver *reporesolver.RepoResolver
62 knotstream *eventconsumer.Consumer
63 spindlestream *eventconsumer.Consumer
64 logger *slog.Logger
65 validator *validator.Validator
66}
67
68func Make(ctx context.Context, config *config.Config) (*State, error) {
69 logger := tlog.FromContext(ctx)
70
71 d, err := db.Make(ctx, config.Core.DbPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to create db: %w", err)
74 }
75
76 indexer := indexer.New(log.SubLogger(logger, "indexer"))
77 err = indexer.Init(ctx, d)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create indexer: %w", err)
80 }
81
82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create enforcer: %w", err)
85 }
86
87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
88 if err != nil {
89 logger.Error("failed to create redis resolver", "err", err)
90 res = idresolver.DefaultResolver(config.Plc.PLCURL)
91 }
92
93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
94 if err != nil {
95 return nil, fmt.Errorf("failed to create posthog client: %w", err)
96 }
97
98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
100 if err != nil {
101 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
102 }
103 validator := validator.New(d, res, enforcer)
104
105 repoResolver := reporesolver.New(config, enforcer, d)
106
107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
108
109 wrapper := db.DbWrapper{Execer: d}
110 jc, err := jetstream.NewJetstreamClient(
111 config.Jetstream.Endpoint,
112 "appview",
113 []string{
114 tangled.GraphFollowNSID,
115 tangled.FeedStarNSID,
116 tangled.PublicKeyNSID,
117 tangled.RepoArtifactNSID,
118 tangled.ActorProfileNSID,
119 tangled.SpindleMemberNSID,
120 tangled.SpindleNSID,
121 tangled.StringNSID,
122 tangled.RepoIssueNSID,
123 tangled.RepoIssueCommentNSID,
124 tangled.LabelDefinitionNSID,
125 tangled.LabelOpNSID,
126 },
127 nil,
128 tlog.SubLogger(logger, "jetstream"),
129 wrapper,
130 false,
131
132 // in-memory filter is inapplicable to appview so
133 // we'll never log dids anyway.
134 false,
135 )
136 if err != nil {
137 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
138 }
139
140 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
141 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
142 }
143
144 ingester := appview.Ingester{
145 Db: wrapper,
146 Enforcer: enforcer,
147 IdResolver: res,
148 Config: config,
149 Logger: log.SubLogger(logger, "ingester"),
150 Validator: validator,
151 }
152 err = jc.StartJetstream(ctx, ingester.Ingest())
153 if err != nil {
154 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
155 }
156
157 var notifiers []notify.Notifier
158
159 // Always add the database notifier
160 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
161
162 // Add other notifiers in production only
163 if !config.Core.Dev {
164 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
165 }
166 notifiers = append(notifiers, indexer)
167
168 // Add webhook notifier
169 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
170
171 notifier := notify.NewMergedNotifier(notifiers)
172 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
173
174 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier)
175 if err != nil {
176 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
177 }
178 knotstream.Start(ctx)
179
180 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
181 if err != nil {
182 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
183 }
184 spindlestream.Start(ctx)
185
186 state := &State{
187 d,
188 notifier,
189 indexer,
190 oauth,
191 enforcer,
192 pages,
193 res,
194 mentionsResolver,
195 posthog,
196 jc,
197 config,
198 repoResolver,
199 knotstream,
200 spindlestream,
201 logger,
202 validator,
203 }
204
205 // fetch initial bluesky posts if configured
206 go fetchBskyPosts(ctx, res, config, d, logger)
207
208 return state, nil
209}
210
211func (s *State) Close() error {
212 // other close up logic goes here
213 return s.db.Close()
214}
215
216func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
217 w.Header().Set("Content-Type", "text/plain")
218 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
219
220 robotsTxt := `# Hello, Tanglers!
221User-agent: *
222Allow: /
223Disallow: /*/*/settings
224Disallow: /settings
225Disallow: /*/*/compare
226Disallow: /*/*/fork
227
228Crawl-delay: 1
229`
230 w.Write([]byte(robotsTxt))
231}
232
233func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
234 user := s.oauth.GetMultiAccountUser(r)
235 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
236 LoggedInUser: user,
237 })
238}
239
240func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
241 user := s.oauth.GetMultiAccountUser(r)
242 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
243 LoggedInUser: user,
244 })
245}
246
247func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
248 user := s.oauth.GetMultiAccountUser(r)
249 s.pages.Brand(w, pages.BrandParams{
250 LoggedInUser: user,
251 })
252}
253
254func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
255 user := s.oauth.GetMultiAccountUser(r)
256 if user == nil {
257 return
258 }
259
260 l := s.logger.With("handler", "UpgradeBanner")
261 l = l.With("did", user.Active.Did)
262
263 regs, err := db.GetRegistrations(
264 s.db,
265 orm.FilterEq("did", user.Active.Did),
266 orm.FilterEq("needs_upgrade", 1),
267 )
268 if err != nil {
269 l.Error("non-fatal: failed to get registrations", "err", err)
270 }
271
272 spindles, err := db.GetSpindles(
273 s.db,
274 orm.FilterEq("owner", user.Active.Did),
275 orm.FilterEq("needs_upgrade", 1),
276 )
277 if err != nil {
278 l.Error("non-fatal: failed to get spindles", "err", err)
279 }
280
281 if regs == nil && spindles == nil {
282 return
283 }
284
285 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
286 Registrations: regs,
287 Spindles: spindles,
288 })
289}
290
291func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
292 user := chi.URLParam(r, "user")
293 user = strings.TrimPrefix(user, "@")
294
295 if user == "" {
296 w.WriteHeader(http.StatusBadRequest)
297 return
298 }
299
300 id, err := s.idResolver.ResolveIdent(r.Context(), user)
301 if err != nil {
302 w.WriteHeader(http.StatusInternalServerError)
303 return
304 }
305
306 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
307 if err != nil {
308 s.logger.Error("failed to get public keys", "err", err)
309 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
310 return
311 }
312
313 if len(pubKeys) == 0 {
314 w.WriteHeader(http.StatusNoContent)
315 return
316 }
317
318 for _, k := range pubKeys {
319 key := strings.TrimRight(k.Key, "\n")
320 fmt.Fprintln(w, key)
321 }
322}
323
324func validateRepoName(name string) error {
325 // check for path traversal attempts
326 if name == "." || name == ".." ||
327 strings.Contains(name, "/") || strings.Contains(name, "\\") {
328 return fmt.Errorf("Repository name contains invalid path characters")
329 }
330
331 // check for sequences that could be used for traversal when normalized
332 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
333 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
334 return fmt.Errorf("Repository name contains invalid path sequence")
335 }
336
337 // then continue with character validation
338 for _, char := range name {
339 if !((char >= 'a' && char <= 'z') ||
340 (char >= 'A' && char <= 'Z') ||
341 (char >= '0' && char <= '9') ||
342 char == '-' || char == '_' || char == '.') {
343 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
344 }
345 }
346
347 // additional check to prevent multiple sequential dots
348 if strings.Contains(name, "..") {
349 return fmt.Errorf("Repository name cannot contain sequential dots")
350 }
351
352 // if all checks pass
353 return nil
354}
355
356func stripGitExt(name string) string {
357 return strings.TrimSuffix(name, ".git")
358}
359
360func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
361 switch r.Method {
362 case http.MethodGet:
363 user := s.oauth.GetMultiAccountUser(r)
364 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
365 if err != nil {
366 s.pages.Notice(w, "repo", "Invalid user account.")
367 return
368 }
369
370 s.pages.NewRepo(w, pages.NewRepoParams{
371 LoggedInUser: user,
372 Knots: knots,
373 })
374
375 case http.MethodPost:
376 l := s.logger.With("handler", "NewRepo")
377
378 user := s.oauth.GetMultiAccountUser(r)
379 l = l.With("did", user.Active.Did)
380
381 // form validation
382 domain := r.FormValue("domain")
383 if domain == "" {
384 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
385 return
386 }
387 l = l.With("knot", domain)
388
389 repoName := r.FormValue("name")
390 if repoName == "" {
391 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
392 return
393 }
394
395 if err := validateRepoName(repoName); err != nil {
396 s.pages.Notice(w, "repo", err.Error())
397 return
398 }
399 repoName = stripGitExt(repoName)
400 l = l.With("repoName", repoName)
401
402 defaultBranch := r.FormValue("branch")
403 if defaultBranch == "" {
404 defaultBranch = "main"
405 }
406 l = l.With("defaultBranch", defaultBranch)
407
408 description := r.FormValue("description")
409 if len([]rune(description)) > 140 {
410 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
411 return
412 }
413
414 // ACL validation
415 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
416 if err != nil || !ok {
417 l.Info("unauthorized")
418 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
419 return
420 }
421
422 // Check for existing repos
423 existingRepo, err := db.GetRepo(
424 s.db,
425 orm.FilterEq("did", user.Active.Did),
426 orm.FilterEq("name", repoName),
427 )
428 if err == nil && existingRepo != nil {
429 l.Info("repo exists")
430 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
431 return
432 }
433
434 // create atproto record for this repo
435 rkey := tid.TID()
436 repo := &models.Repo{
437 Did: user.Active.Did,
438 Name: repoName,
439 Knot: domain,
440 Rkey: rkey,
441 Description: description,
442 Created: time.Now(),
443 Labels: s.config.Label.DefaultLabelDefs,
444 }
445 record := repo.AsRecord()
446
447 atpClient, err := s.oauth.AuthorizedClient(r)
448 if err != nil {
449 l.Info("PDS write failed", "err", err)
450 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
451 return
452 }
453
454 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
455 Collection: tangled.RepoNSID,
456 Repo: user.Active.Did,
457 Rkey: rkey,
458 Record: &lexutil.LexiconTypeDecoder{
459 Val: &record,
460 },
461 })
462 if err != nil {
463 l.Info("PDS write failed", "err", err)
464 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
465 return
466 }
467
468 aturi := atresp.Uri
469 l = l.With("aturi", aturi)
470 l.Info("wrote to PDS")
471
472 tx, err := s.db.BeginTx(r.Context(), nil)
473 if err != nil {
474 l.Info("txn failed", "err", err)
475 s.pages.Notice(w, "repo", "Failed to save repository information.")
476 return
477 }
478
479 // The rollback function reverts a few things on failure:
480 // - the pending txn
481 // - the ACLs
482 // - the atproto record created
483 rollback := func() {
484 err1 := tx.Rollback()
485 err2 := s.enforcer.E.LoadPolicy()
486 err3 := rollbackRecord(context.Background(), aturi, atpClient)
487
488 // ignore txn complete errors, this is okay
489 if errors.Is(err1, sql.ErrTxDone) {
490 err1 = nil
491 }
492
493 if errs := errors.Join(err1, err2, err3); errs != nil {
494 l.Error("failed to rollback changes", "errs", errs)
495 return
496 }
497 }
498 defer rollback()
499
500 client, err := s.oauth.ServiceClient(
501 r,
502 oauth.WithService(domain),
503 oauth.WithLxm(tangled.RepoCreateNSID),
504 oauth.WithDev(s.config.Core.Dev),
505 )
506 if err != nil {
507 l.Error("service auth failed", "err", err)
508 s.pages.Notice(w, "repo", "Failed to reach PDS.")
509 return
510 }
511
512 xe := tangled.RepoCreate(
513 r.Context(),
514 client,
515 &tangled.RepoCreate_Input{
516 Rkey: rkey,
517 },
518 )
519 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
520 l.Error("xrpc error", "xe", xe)
521 s.pages.Notice(w, "repo", err.Error())
522 return
523 }
524
525 err = db.AddRepo(tx, repo)
526 if err != nil {
527 l.Error("db write failed", "err", err)
528 s.pages.Notice(w, "repo", "Failed to save repository information.")
529 return
530 }
531
532 // acls
533 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
534 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
535 if err != nil {
536 l.Error("acl setup failed", "err", err)
537 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
538 return
539 }
540
541 err = tx.Commit()
542 if err != nil {
543 l.Error("txn commit failed", "err", err)
544 http.Error(w, err.Error(), http.StatusInternalServerError)
545 return
546 }
547
548 err = s.enforcer.E.SavePolicy()
549 if err != nil {
550 l.Error("acl save failed", "err", err)
551 http.Error(w, err.Error(), http.StatusInternalServerError)
552 return
553 }
554
555 // reset the ATURI because the transaction completed successfully
556 aturi = ""
557
558 s.notifier.NewRepo(r.Context(), repo)
559 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
560 }
561}
562
563// this is used to rollback changes made to the PDS
564//
565// it is a no-op if the provided ATURI is empty
566func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
567 if aturi == "" {
568 return nil
569 }
570
571 parsed := syntax.ATURI(aturi)
572
573 collection := parsed.Collection().String()
574 repo := parsed.Authority().String()
575 rkey := parsed.RecordKey().String()
576
577 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
578 Collection: collection,
579 Repo: repo,
580 Rkey: rkey,
581 })
582 return err
583}
584
585func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
586 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
587 if err != nil {
588 return err
589 }
590 // already present
591 if len(defaultLabels) == len(defaults) {
592 return nil
593 }
594
595 labelDefs, err := models.FetchLabelDefs(r, defaults)
596 if err != nil {
597 return err
598 }
599
600 // Insert each label definition to the database
601 for _, labelDef := range labelDefs {
602 _, err = db.AddLabelDefinition(e, &labelDef)
603 if err != nil {
604 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
605 }
606 }
607
608 return nil
609}
610
611func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
612 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
613 if err != nil {
614 logger.Error("failed to resolve tangled.org DID", "err", err)
615 return
616 }
617
618 pdsEndpoint := resolved.PDSEndpoint()
619 if pdsEndpoint == "" {
620 logger.Error("no PDS endpoint found for tangled.sh DID")
621 return
622 }
623
624 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid)
625 if err != nil {
626 logger.Error("failed to create appassword session... skipping fetch", "err", err)
627 return
628 }
629
630 client := xrpc.Client{
631 Auth: &xrpc.AuthInfo{
632 AccessJwt: session.AccessJwt,
633 Did: session.Did,
634 },
635 Host: session.PdsEndpoint,
636 }
637
638 l := log.SubLogger(logger, "bluesky")
639
640 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
641 defer ticker.Stop()
642
643 for {
644 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
645 if err != nil {
646 l.Error("failed to fetch bluesky posts", "err", err)
647 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
648 l.Error("failed to insert bluesky posts", "err", err)
649 } else {
650 l.Info("inserted bluesky posts", "count", len(posts))
651 }
652
653 select {
654 case <-ticker.C:
655 case <-ctx.Done():
656 l.Info("stopping bluesky updater")
657 return
658 }
659 }
660}