this repo has no description
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/appview"
22 "tangled.sh/tangled.sh/core/appview/cache"
23 "tangled.sh/tangled.sh/core/appview/cache/session"
24 "tangled.sh/tangled.sh/core/appview/config"
25 "tangled.sh/tangled.sh/core/appview/db"
26 "tangled.sh/tangled.sh/core/appview/notify"
27 "tangled.sh/tangled.sh/core/appview/oauth"
28 "tangled.sh/tangled.sh/core/appview/pages"
29 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
30 "tangled.sh/tangled.sh/core/appview/reporesolver"
31 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient"
32 "tangled.sh/tangled.sh/core/eventconsumer"
33 "tangled.sh/tangled.sh/core/idresolver"
34 "tangled.sh/tangled.sh/core/jetstream"
35 tlog "tangled.sh/tangled.sh/core/log"
36 "tangled.sh/tangled.sh/core/rbac"
37 "tangled.sh/tangled.sh/core/tid"
38 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors"
39)
40
41type State struct {
42 db *db.DB
43 notifier notify.Notifier
44 oauth *oauth.OAuth
45 enforcer *rbac.Enforcer
46 pages *pages.Pages
47 sess *session.SessionStore
48 idResolver *idresolver.Resolver
49 posthog posthog.Client
50 jc *jetstream.JetstreamClient
51 config *config.Config
52 repoResolver *reporesolver.RepoResolver
53 knotstream *eventconsumer.Consumer
54 spindlestream *eventconsumer.Consumer
55 logger *slog.Logger
56}
57
58func Make(ctx context.Context, config *config.Config) (*State, error) {
59 d, err := db.Make(config.Core.DbPath)
60 if err != nil {
61 return nil, fmt.Errorf("failed to create db: %w", err)
62 }
63
64 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to create enforcer: %w", err)
67 }
68
69 res, err := idresolver.RedisResolver(config.Redis.ToURL())
70 if err != nil {
71 log.Printf("failed to create redis resolver: %v", err)
72 res = idresolver.DefaultResolver()
73 }
74
75 pgs := pages.NewPages(config, res)
76
77 cache := cache.New(config.Redis.Addr)
78 sess := session.New(cache)
79
80 oauth := oauth.NewOAuth(config, sess)
81
82 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
83 if err != nil {
84 return nil, fmt.Errorf("failed to create posthog client: %w", err)
85 }
86
87 repoResolver := reporesolver.New(config, enforcer, res, d)
88
89 wrapper := db.DbWrapper{d}
90 jc, err := jetstream.NewJetstreamClient(
91 config.Jetstream.Endpoint,
92 "appview",
93 []string{
94 tangled.GraphFollowNSID,
95 tangled.FeedStarNSID,
96 tangled.PublicKeyNSID,
97 tangled.RepoArtifactNSID,
98 tangled.ActorProfileNSID,
99 tangled.SpindleMemberNSID,
100 tangled.SpindleNSID,
101 tangled.StringNSID,
102 tangled.RepoIssueNSID,
103 },
104 nil,
105 slog.Default(),
106 wrapper,
107 false,
108
109 // in-memory filter is inapplicalble to appview so
110 // we'll never log dids anyway.
111 false,
112 )
113 if err != nil {
114 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
115 }
116
117 ingester := appview.Ingester{
118 Db: wrapper,
119 Enforcer: enforcer,
120 IdResolver: res,
121 Config: config,
122 Logger: tlog.New("ingester"),
123 }
124 err = jc.StartJetstream(ctx, ingester.Ingest())
125 if err != nil {
126 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
127 }
128
129 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
130 if err != nil {
131 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
132 }
133 knotstream.Start(ctx)
134
135 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
136 if err != nil {
137 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
138 }
139 spindlestream.Start(ctx)
140
141 var notifiers []notify.Notifier
142 if !config.Core.Dev {
143 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
144 }
145 notifier := notify.NewMergedNotifier(notifiers...)
146
147 state := &State{
148 d,
149 notifier,
150 oauth,
151 enforcer,
152 pgs,
153 sess,
154 res,
155 posthog,
156 jc,
157 config,
158 repoResolver,
159 knotstream,
160 spindlestream,
161 slog.Default(),
162 }
163
164 return state, nil
165}
166
167func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
168 w.Header().Set("Content-Type", "image/svg+xml")
169 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
170 w.Header().Set("ETag", `"favicon-svg-v1"`)
171
172 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
173 w.WriteHeader(http.StatusNotModified)
174 return
175 }
176
177 s.pages.Favicon(w)
178}
179
180func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
181 user := s.oauth.GetUser(r)
182 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
183 LoggedInUser: user,
184 })
185}
186
187func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
188 user := s.oauth.GetUser(r)
189 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
190 LoggedInUser: user,
191 })
192}
193
194func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
195 user := s.oauth.GetUser(r)
196
197 timeline, err := db.MakeTimeline(s.db)
198 if err != nil {
199 log.Println(err)
200 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
201 }
202
203 repos, err := db.GetTopStarredReposLastWeek(s.db)
204 if err != nil {
205 log.Println(err)
206 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
207 return
208 }
209
210 s.pages.Timeline(w, pages.TimelineParams{
211 LoggedInUser: user,
212 Timeline: timeline,
213 Repos: repos,
214 })
215}
216
217func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
218 user := chi.URLParam(r, "user")
219 user = strings.TrimPrefix(user, "@")
220
221 if user == "" {
222 w.WriteHeader(http.StatusBadRequest)
223 return
224 }
225
226 id, err := s.idResolver.ResolveIdent(r.Context(), user)
227 if err != nil {
228 w.WriteHeader(http.StatusInternalServerError)
229 return
230 }
231
232 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
233 if err != nil {
234 w.WriteHeader(http.StatusNotFound)
235 return
236 }
237
238 if len(pubKeys) == 0 {
239 w.WriteHeader(http.StatusNotFound)
240 return
241 }
242
243 for _, k := range pubKeys {
244 key := strings.TrimRight(k.Key, "\n")
245 w.Write([]byte(fmt.Sprintln(key)))
246 }
247}
248
// validateRepoName reports whether name is acceptable as a repository name.
//
// Checks are applied in this order, and the first failure is returned:
//  1. name is "." or "..", or contains "/" or "\".
//  2. name starts or ends with ".".
//  3. name contains a character outside [A-Za-z0-9._-].
//  4. name contains "..".
//
// The returned error messages are shown to users verbatim. The empty string
// passes all checks; callers are expected to reject it themselves.
func validateRepoName(name string) error {
	switch {
	case name == ".", name == "..",
		strings.Contains(name, "/"), strings.Contains(name, "\\"):
		// path traversal attempts
		return fmt.Errorf("Repository name contains invalid path characters")
	case strings.HasPrefix(name, "."), strings.HasSuffix(name, "."):
		// "./" and "../" cannot occur here since "/" was rejected above,
		// so only leading/trailing dots need checking.
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	for _, r := range name {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
		case r == '-', r == '_', r == '.':
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// dots are allowed individually, but never back to back
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
280
// stripGitExt returns name without a single trailing ".git" suffix, or name
// unchanged when it has no such suffix.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
284
285func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
286 switch r.Method {
287 case http.MethodGet:
288 user := s.oauth.GetUser(r)
289 knots, err := s.enforcer.GetKnotsForUser(user.Did)
290 if err != nil {
291 s.pages.Notice(w, "repo", "Invalid user account.")
292 return
293 }
294
295 s.pages.NewRepo(w, pages.NewRepoParams{
296 LoggedInUser: user,
297 Knots: knots,
298 })
299
300 case http.MethodPost:
301 l := s.logger.With("handler", "NewRepo")
302
303 user := s.oauth.GetUser(r)
304 l = l.With("did", user.Did)
305 l = l.With("handle", user.Handle)
306
307 // form validation
308 domain := r.FormValue("domain")
309 if domain == "" {
310 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
311 return
312 }
313 l = l.With("knot", domain)
314
315 repoName := r.FormValue("name")
316 if repoName == "" {
317 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
318 return
319 }
320
321 if err := validateRepoName(repoName); err != nil {
322 s.pages.Notice(w, "repo", err.Error())
323 return
324 }
325 repoName = stripGitExt(repoName)
326 l = l.With("repoName", repoName)
327
328 defaultBranch := r.FormValue("branch")
329 if defaultBranch == "" {
330 defaultBranch = "main"
331 }
332 l = l.With("defaultBranch", defaultBranch)
333
334 description := r.FormValue("description")
335
336 // ACL validation
337 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
338 if err != nil || !ok {
339 l.Info("unauthorized")
340 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
341 return
342 }
343
344 // Check for existing repos
345 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
346 if err == nil && existingRepo != nil {
347 l.Info("repo exists")
348 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
349 return
350 }
351
352 // create atproto record for this repo
353 rkey := tid.TID()
354 repo := &db.Repo{
355 Did: user.Did,
356 Name: repoName,
357 Knot: domain,
358 Rkey: rkey,
359 Description: description,
360 }
361
362 xrpcClient, err := s.oauth.AuthorizedClient(r)
363 if err != nil {
364 l.Info("PDS write failed", "err", err)
365 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
366 return
367 }
368
369 createdAt := time.Now().Format(time.RFC3339)
370 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
371 Collection: tangled.RepoNSID,
372 Repo: user.Did,
373 Rkey: rkey,
374 Record: &lexutil.LexiconTypeDecoder{
375 Val: &tangled.Repo{
376 Knot: repo.Knot,
377 Name: repoName,
378 CreatedAt: createdAt,
379 Owner: user.Did,
380 }},
381 })
382 if err != nil {
383 l.Info("PDS write failed", "err", err)
384 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
385 return
386 }
387
388 aturi := atresp.Uri
389 l = l.With("aturi", aturi)
390 l.Info("wrote to PDS")
391
392 tx, err := s.db.BeginTx(r.Context(), nil)
393 if err != nil {
394 l.Info("txn failed", "err", err)
395 s.pages.Notice(w, "repo", "Failed to save repository information.")
396 return
397 }
398
399 // The rollback function reverts a few things on failure:
400 // - the pending txn
401 // - the ACLs
402 // - the atproto record created
403 rollback := func() {
404 err1 := tx.Rollback()
405 err2 := s.enforcer.E.LoadPolicy()
406 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
407
408 // ignore txn complete errors, this is okay
409 if errors.Is(err1, sql.ErrTxDone) {
410 err1 = nil
411 }
412
413 if errs := errors.Join(err1, err2, err3); errs != nil {
414 l.Error("failed to rollback changes", "errs", errs)
415 return
416 }
417 }
418 defer rollback()
419
420 client, err := s.oauth.ServiceClient(
421 r,
422 oauth.WithService(domain),
423 oauth.WithLxm(tangled.RepoCreateNSID),
424 oauth.WithDev(s.config.Core.Dev),
425 )
426 if err != nil {
427 l.Error("service auth failed", "err", err)
428 s.pages.Notice(w, "repo", "Failed to reach PDS.")
429 return
430 }
431
432 xe := tangled.RepoCreate(
433 r.Context(),
434 client,
435 &tangled.RepoCreate_Input{
436 Rkey: rkey,
437 },
438 )
439 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
440 l.Error("xrpc error", "xe", xe)
441 s.pages.Notice(w, "repo", err.Error())
442 return
443 }
444
445 err = db.AddRepo(tx, repo)
446 if err != nil {
447 l.Error("db write failed", "err", err)
448 s.pages.Notice(w, "repo", "Failed to save repository information.")
449 return
450 }
451
452 // acls
453 p, _ := securejoin.SecureJoin(user.Did, repoName)
454 err = s.enforcer.AddRepo(user.Did, domain, p)
455 if err != nil {
456 l.Error("acl setup failed", "err", err)
457 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
458 return
459 }
460
461 err = tx.Commit()
462 if err != nil {
463 l.Error("txn commit failed", "err", err)
464 http.Error(w, err.Error(), http.StatusInternalServerError)
465 return
466 }
467
468 err = s.enforcer.E.SavePolicy()
469 if err != nil {
470 l.Error("acl save failed", "err", err)
471 http.Error(w, err.Error(), http.StatusInternalServerError)
472 return
473 }
474
475 // reset the ATURI because the transaction completed successfully
476 aturi = ""
477
478 s.notifier.NewRepo(r.Context(), repo)
479 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
480 }
481}
482
483// this is used to rollback changes made to the PDS
484//
485// it is a no-op if the provided ATURI is empty
486func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
487 if aturi == "" {
488 return nil
489 }
490
491 parsed := syntax.ATURI(aturi)
492
493 collection := parsed.Collection().String()
494 repo := parsed.Authority().String()
495 rkey := parsed.RecordKey().String()
496
497 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
498 Collection: collection,
499 Repo: repo,
500 Rkey: rkey,
501 })
502 return err
503}