this repo has no description
1package state
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 lexutil "github.com/bluesky-social/indigo/lex/util"
14 securejoin "github.com/cyphar/filepath-securejoin"
15 "github.com/go-chi/chi/v5"
16 "github.com/posthog/posthog-go"
17 "tangled.sh/tangled.sh/core/api/tangled"
18 "tangled.sh/tangled.sh/core/appview"
19 "tangled.sh/tangled.sh/core/appview/cache"
20 "tangled.sh/tangled.sh/core/appview/cache/session"
21 "tangled.sh/tangled.sh/core/appview/config"
22 "tangled.sh/tangled.sh/core/appview/db"
23 "tangled.sh/tangled.sh/core/appview/dns"
24 "tangled.sh/tangled.sh/core/appview/notify"
25 "tangled.sh/tangled.sh/core/appview/oauth"
26 "tangled.sh/tangled.sh/core/appview/pages"
27 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
28 "tangled.sh/tangled.sh/core/appview/reporesolver"
29 "tangled.sh/tangled.sh/core/eventconsumer"
30 "tangled.sh/tangled.sh/core/idresolver"
31 "tangled.sh/tangled.sh/core/jetstream"
32 "tangled.sh/tangled.sh/core/knotclient"
33 tlog "tangled.sh/tangled.sh/core/log"
34 "tangled.sh/tangled.sh/core/rbac"
35 "tangled.sh/tangled.sh/core/tid"
36)
37
// State bundles the long-lived dependencies shared by the appview's HTTP
// handlers. It is assembled once by Make.
type State struct {
	db            *db.DB                     // database handle created by db.Make
	notifier      notify.Notifier            // merged notifier; told about newly created repos
	oauth         *oauth.OAuth               // yields the logged-in user and authorized XRPC clients
	enforcer      *rbac.Enforcer             // RBAC enforcer consulted for knot/repo permissions
	pages         *pages.Pages               // renders pages and inline notices
	sess          *session.SessionStore      // session store built on top of the cache
	idResolver    *idresolver.Resolver       // resolves handles/DIDs to identities
	posthog       posthog.Client             // PostHog client
	jc            *jetstream.JetstreamClient // jetstream client started by Make
	config        *config.Config             // configuration handed to Make
	repoResolver  *reporesolver.RepoResolver // repo resolver built by reporesolver.New
	cf            *dns.Cloudflare            // Cloudflare client built by dns.NewCloudflare
	knotstream    *eventconsumer.Consumer    // consumer returned by Knotstream
	spindlestream *eventconsumer.Consumer    // consumer returned by Spindlestream
}
54
55func Make(ctx context.Context, config *config.Config) (*State, error) {
56 d, err := db.Make(config.Core.DbPath)
57 if err != nil {
58 return nil, fmt.Errorf("failed to create db: %w", err)
59 }
60
61 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to create enforcer: %w", err)
64 }
65
66 pgs := pages.NewPages(config)
67
68 res, err := idresolver.RedisResolver(config.Redis.ToURL())
69 if err != nil {
70 log.Printf("failed to create redis resolver: %v", err)
71 res = idresolver.DefaultResolver()
72 }
73
74 cache := cache.New(config.Redis.Addr)
75 sess := session.New(cache)
76
77 oauth := oauth.NewOAuth(config, sess)
78
79 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
80 if err != nil {
81 return nil, fmt.Errorf("failed to create posthog client: %w", err)
82 }
83
84 repoResolver := reporesolver.New(config, enforcer, res, d)
85
86 wrapper := db.DbWrapper{d}
87 jc, err := jetstream.NewJetstreamClient(
88 config.Jetstream.Endpoint,
89 "appview",
90 []string{
91 tangled.GraphFollowNSID,
92 tangled.FeedStarNSID,
93 tangled.PublicKeyNSID,
94 tangled.RepoArtifactNSID,
95 tangled.ActorProfileNSID,
96 tangled.SpindleMemberNSID,
97 tangled.SpindleNSID,
98 },
99 nil,
100 slog.Default(),
101 wrapper,
102 false,
103
104 // in-memory filter is inapplicalble to appview so
105 // we'll never log dids anyway.
106 false,
107 )
108 if err != nil {
109 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
110 }
111
112 ingester := appview.Ingester{
113 Db: wrapper,
114 Enforcer: enforcer,
115 IdResolver: res,
116 Config: config,
117 Logger: tlog.New("ingester"),
118 }
119 err = jc.StartJetstream(ctx, ingester.Ingest())
120 if err != nil {
121 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
122 }
123
124 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
125 if err != nil {
126 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
127 }
128 knotstream.Start(ctx)
129
130 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
131 if err != nil {
132 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
133 }
134 spindlestream.Start(ctx)
135
136 var notifiers []notify.Notifier
137 if !config.Core.Dev {
138 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
139 }
140 notifier := notify.NewMergedNotifier(notifiers...)
141
142 cf, err := dns.NewCloudflare(config)
143 if err != nil {
144 return nil, fmt.Errorf("failed to create Cloudflare client: %w", err)
145 }
146
147 state := &State{
148 d,
149 notifier,
150 oauth,
151 enforcer,
152 pgs,
153 sess,
154 res,
155 posthog,
156 jc,
157 config,
158 repoResolver,
159 cf,
160 knotstream,
161 spindlestream,
162 }
163
164 return state, nil
165}
166
167func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
168 user := s.oauth.GetUser(r)
169
170 timeline, err := db.MakeTimeline(s.db)
171 if err != nil {
172 log.Println(err)
173 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
174 }
175
176 var didsToResolve []string
177 for _, ev := range timeline {
178 if ev.Repo != nil {
179 didsToResolve = append(didsToResolve, ev.Repo.Did)
180 if ev.Source != nil {
181 didsToResolve = append(didsToResolve, ev.Source.Did)
182 }
183 }
184 if ev.Follow != nil {
185 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid)
186 }
187 if ev.Star != nil {
188 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did)
189 }
190 }
191
192 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve)
193 didHandleMap := make(map[string]string)
194 for _, identity := range resolvedIds {
195 if !identity.Handle.IsInvalidHandle() {
196 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String())
197 } else {
198 didHandleMap[identity.DID.String()] = identity.DID.String()
199 }
200 }
201
202 s.pages.Timeline(w, pages.TimelineParams{
203 LoggedInUser: user,
204 Timeline: timeline,
205 DidHandleMap: didHandleMap,
206 })
207
208 return
209}
210
211func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
212 user := chi.URLParam(r, "user")
213 user = strings.TrimPrefix(user, "@")
214
215 if user == "" {
216 w.WriteHeader(http.StatusBadRequest)
217 return
218 }
219
220 id, err := s.idResolver.ResolveIdent(r.Context(), user)
221 if err != nil {
222 w.WriteHeader(http.StatusInternalServerError)
223 return
224 }
225
226 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
227 if err != nil {
228 w.WriteHeader(http.StatusNotFound)
229 return
230 }
231
232 if len(pubKeys) == 0 {
233 w.WriteHeader(http.StatusNotFound)
234 return
235 }
236
237 for _, k := range pubKeys {
238 key := strings.TrimRight(k.Key, "\n")
239 w.Write([]byte(fmt.Sprintln(key)))
240 }
241}
242
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and contains no "..". It returns nil
// for a valid name and otherwise an error describing the first rule violated.
//
// The error text is shown to the user verbatim, which is why the messages are
// capitalised sentences rather than conventional lowercase Go error strings.
func validateRepoName(name string) error {
	// An empty name would otherwise pass every check below.
	if name == "" {
		return fmt.Errorf("Repository name cannot be empty")
	}

	// Path traversal attempts: the special directory names and either kind of
	// path separator.
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// Leading or trailing dots. Sequences such as "./" and "../" need no
	// separate test here: any name containing '/' has already been rejected.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// Allow-list of characters.
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z',
			char >= 'A' && char <= 'Z',
			char >= '0' && char <= '9',
			char == '-', char == '_', char == '.':
			// allowed
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// Dots are allowed individually but never back to back.
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
274
275func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
276 switch r.Method {
277 case http.MethodGet:
278 user := s.oauth.GetUser(r)
279 knots, err := s.enforcer.GetKnotsForUser(user.Did)
280 if err != nil {
281 s.pages.Notice(w, "repo", "Invalid user account.")
282 return
283 }
284
285 s.pages.NewRepo(w, pages.NewRepoParams{
286 LoggedInUser: user,
287 Knots: knots,
288 })
289
290 case http.MethodPost:
291 user := s.oauth.GetUser(r)
292
293 domain := r.FormValue("domain")
294 if domain == "" {
295 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
296 return
297 }
298
299 repoName := r.FormValue("name")
300 if repoName == "" {
301 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
302 return
303 }
304
305 if err := validateRepoName(repoName); err != nil {
306 s.pages.Notice(w, "repo", err.Error())
307 return
308 }
309
310 defaultBranch := r.FormValue("branch")
311 if defaultBranch == "" {
312 defaultBranch = "main"
313 }
314
315 description := r.FormValue("description")
316
317 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
318 if err != nil || !ok {
319 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
320 return
321 }
322
323 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
324 if err == nil && existingRepo != nil {
325 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot))
326 return
327 }
328
329 secret, err := db.GetRegistrationKey(s.db, domain)
330 if err != nil {
331 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain))
332 return
333 }
334
335 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
336 if err != nil {
337 s.pages.Notice(w, "repo", "Failed to connect to knot server.")
338 return
339 }
340
341 rkey := tid.TID()
342 repo := &db.Repo{
343 Did: user.Did,
344 Name: repoName,
345 Knot: domain,
346 Rkey: rkey,
347 Description: description,
348 }
349
350 xrpcClient, err := s.oauth.AuthorizedClient(r)
351 if err != nil {
352 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
353 return
354 }
355
356 createdAt := time.Now().Format(time.RFC3339)
357 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
358 Collection: tangled.RepoNSID,
359 Repo: user.Did,
360 Rkey: rkey,
361 Record: &lexutil.LexiconTypeDecoder{
362 Val: &tangled.Repo{
363 Knot: repo.Knot,
364 Name: repoName,
365 CreatedAt: createdAt,
366 Owner: user.Did,
367 }},
368 })
369 if err != nil {
370 log.Printf("failed to create record: %s", err)
371 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
372 return
373 }
374 log.Println("created repo record: ", atresp.Uri)
375
376 tx, err := s.db.BeginTx(r.Context(), nil)
377 if err != nil {
378 log.Println(err)
379 s.pages.Notice(w, "repo", "Failed to save repository information.")
380 return
381 }
382 defer func() {
383 tx.Rollback()
384 err = s.enforcer.E.LoadPolicy()
385 if err != nil {
386 log.Println("failed to rollback policies")
387 }
388 }()
389
390 resp, err := client.NewRepo(user.Did, repoName, defaultBranch)
391 if err != nil {
392 s.pages.Notice(w, "repo", "Failed to create repository on knot server.")
393 return
394 }
395
396 switch resp.StatusCode {
397 case http.StatusConflict:
398 s.pages.Notice(w, "repo", "A repository with that name already exists.")
399 return
400 case http.StatusInternalServerError:
401 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.")
402 case http.StatusNoContent:
403 // continue
404 }
405
406 repo.AtUri = atresp.Uri
407 err = db.AddRepo(tx, repo)
408 if err != nil {
409 log.Println(err)
410 s.pages.Notice(w, "repo", "Failed to save repository information.")
411 return
412 }
413
414 // acls
415 p, _ := securejoin.SecureJoin(user.Did, repoName)
416 err = s.enforcer.AddRepo(user.Did, domain, p)
417 if err != nil {
418 log.Println(err)
419 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
420 return
421 }
422
423 err = tx.Commit()
424 if err != nil {
425 log.Println("failed to commit changes", err)
426 http.Error(w, err.Error(), http.StatusInternalServerError)
427 return
428 }
429
430 err = s.enforcer.E.SavePolicy()
431 if err != nil {
432 log.Println("failed to update ACLs", err)
433 http.Error(w, err.Error(), http.StatusInternalServerError)
434 return
435 }
436
437 s.notifier.NewRepo(r.Context(), repo)
438
439 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
440 return
441 }
442}