Monorepo for Tangled

[WIP] appview: state, ingester, middleware, resolver updates for repo DID

+315 -131
+78 -18
appview/ingester.go
··· 2 3 import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" ··· 116 return err 117 } 118 119 - subjectUri, err = syntax.ParseATURI(record.Subject) 120 - if err != nil { 121 - l.Error("invalid record", "err", err) 122 - return err 123 } 124 - err = db.AddStar(i.Db, &models.Star{ 125 - Did: did, 126 - RepoAt: subjectUri, 127 - Rkey: e.Commit.RKey, 128 - }) 129 case jmodels.CommitOperationDelete: 130 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 131 } ··· 220 return err 221 } 222 223 - repoAt, err := syntax.ParseATURI(record.Repo) 224 - if err != nil { 225 - return err 226 } 227 - 228 - repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 229 - if err != nil { 230 - return err 231 } 232 233 - ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 234 if err != nil || !ok { 235 return err 236 } 237 238 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 239 if err != nil { 240 createdAt = time.Now() ··· 243 artifact := models.Artifact{ 244 Did: did, 245 Rkey: e.Commit.RKey, 246 - RepoAt: repoAt, 247 Tag: plumbing.Hash(record.Tag), 248 CreatedAt: createdAt, 249 BlobCid: cid.Cid(record.Artifact.Ref), ··· 822 823 issue := models.IssueFromRecord(did, rkey, record) 824 825 if err := i.Validator.ValidateIssue(&issue); err != nil { 826 return fmt.Errorf("failed to validate issue: %w", err) 827 } 828 829 tx, err := ddb.BeginTx(ctx, nil)
··· 2 3 import ( 4 "context" 5 + "database/sql" 6 "encoding/json" 7 + "errors" 8 "fmt" 9 "log/slog" 10 "maps" ··· 118 return err 119 } 120 121 + star := &models.Star{ 122 + Did: did, 123 + Rkey: e.Commit.RKey, 124 + } 125 + 126 + switch { 127 + case record.SubjectDid != nil: 128 + star.SubjectDid = *record.SubjectDid 129 + repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 130 + if repoErr == nil { 131 + subjectUri = repo.RepoAt() 132 + star.RepoAt = subjectUri 133 + } 134 + case record.Subject != nil: 135 + subjectUri, err = syntax.ParseATURI(*record.Subject) 136 + if err != nil { 137 + l.Error("invalid record", "err", err) 138 + return err 139 + } 140 + star.RepoAt = subjectUri 141 + repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 142 + if repoErr == nil && repo.RepoDid != "" { 143 + star.SubjectDid = repo.RepoDid 144 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 145 + l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 146 + } 147 + } 148 + default: 149 + l.Error("star record has neither subject nor subjectDid") 150 + return fmt.Errorf("star record has neither subject nor subjectDid") 151 } 152 + err = db.AddStar(i.Db, star) 153 case jmodels.CommitOperationDelete: 154 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 155 } ··· 244 return err 245 } 246 247 + var repo *models.Repo 248 + if record.RepoDid != nil && *record.RepoDid != "" { 249 + repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 250 + if err != nil && !errors.Is(err, sql.ErrNoRows) { 251 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 252 + } 253 } 254 + if repo == nil && record.Repo != nil { 255 + repoAt, parseErr := syntax.ParseATURI(*record.Repo) 256 + if parseErr != nil { 257 + return parseErr 258 + } 259 + repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 260 + if err != nil { 261 + return err 262 + } 263 + } 264 + if repo == nil { 265 + return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 266 } 267 268 + ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 269 if err != nil || !ok { 270 return err 271 } 272 273 + repoDid := repo.RepoDid 274 + if repoDid == "" && record.RepoDid != nil { 275 + repoDid = *record.RepoDid 276 + } 277 + if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 278 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 279 + l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 280 + } 281 + } 282 + 283 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 284 if err != nil { 285 createdAt = time.Now() ··· 288 artifact := models.Artifact{ 289 Did: did, 290 Rkey: e.Commit.RKey, 291 + RepoAt: repo.RepoAt(), 292 + RepoDid: repoDid, 293 Tag: plumbing.Hash(record.Tag), 294 CreatedAt: createdAt, 295 BlobCid: cid.Cid(record.Artifact.Ref), ··· 868 869 issue := models.IssueFromRecord(did, rkey, record) 870 871 + if issue.RepoDid == "" && issue.RepoAt == "" { 872 + return fmt.Errorf("issue record has neither repo nor repoDid") 873 + } 874 + 875 if err := i.Validator.ValidateIssue(&issue); err != nil { 876 return fmt.Errorf("failed to validate issue: %w", err) 877 + } 878 + 879 + if issue.RepoDid == "" && record.Repo != nil { 880 + repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 881 + if repoErr == nil && repo.RepoDid != "" { 882 + issue.RepoDid = repo.RepoDid 883 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 884 + l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 885 + } 886 + } 887 } 888 889 tx, err := ddb.BeginTx(ctx, nil)
+29 -5
appview/middleware/middleware.go
··· 17 "tangled.org/core/appview/pages" 18 "tangled.org/core/appview/pagination" 19 "tangled.org/core/appview/reporesolver" 20 "tangled.org/core/idresolver" 21 "tangled.org/core/orm" 22 "tangled.org/core/rbac" ··· 161 return 162 } 163 164 - ok, err := mw.enforcer.E.Enforce(actor.Active.Did, f.Knot, f.DidSlashRepo(), requiredPerm) 165 if err != nil || !ok { 166 - log.Printf("%s does not have perms of a %s in repo %s", actor.Active.Did, requiredPerm, f.DidSlashRepo()) 167 http.Error(w, "Forbiden", http.StatusUnauthorized) 168 return 169 } ··· 188 189 id, err := mw.idResolver.ResolveIdent(req.Context(), didOrHandle) 190 if err != nil { 191 - // invalid did or handle 192 log.Printf("failed to resolve did/handle '%s': %s\n", didOrHandle, err) 193 mw.pages.Error404(w) 194 return ··· 226 return 227 } 228 229 ctx := context.WithValue(req.Context(), "repo", repo) 230 next.ServeHTTP(w, req.WithContext(ctx)) 231 }) ··· 334 335 if r.Header.Get("User-Agent") == "Go-http-client/1.1" { 336 if r.URL.Query().Get("go-get") == "1" { 337 html := fmt.Sprintf( 338 `<meta name="go-import" content="tangled.sh/%s git https://tangled.sh/%s"/> 339 <meta name="go-import" content="tangled.org/%s git https://tangled.org/%s"/>`, 340 - fullName, fullName, 341 - fullName, fullName, 342 ) 343 w.Header().Set("Content-Type", "text/html") 344 w.Write([]byte(html))
··· 17 "tangled.org/core/appview/pages" 18 "tangled.org/core/appview/pagination" 19 "tangled.org/core/appview/reporesolver" 20 + "tangled.org/core/appview/state/userutil" 21 "tangled.org/core/idresolver" 22 "tangled.org/core/orm" 23 "tangled.org/core/rbac" ··· 162 return 163 } 164 165 + ok, err := mw.enforcer.E.Enforce(actor.Active.Did, f.Knot, f.RepoIdentifier(), requiredPerm) 166 if err != nil || !ok { 167 + log.Printf("%s does not have perms of a %s in repo %s", actor.Active.Did, requiredPerm, f.RepoIdentifier()) 168 http.Error(w, "Forbiden", http.StatusUnauthorized) 169 return 170 } ··· 189 190 id, err := mw.idResolver.ResolveIdent(req.Context(), didOrHandle) 191 if err != nil { 192 log.Printf("failed to resolve did/handle '%s': %s\n", didOrHandle, err) 193 mw.pages.Error404(w) 194 return ··· 226 return 227 } 228 229 + if repo.RepoDid != "" && req.Context().Value("repoDidCanonical") == nil { 230 + gitPaths := []string{"/info/refs", "/git-upload-pack", "/git-receive-pack", "/git-upload-archive"} 231 + user := chi.URLParam(req, "user") 232 + repoParam := chi.URLParam(req, "repo") 233 + remaining := strings.TrimPrefix(req.URL.Path, "/"+user+"/"+repoParam) 234 + 235 + isGitPath := slices.ContainsFunc(gitPaths, func(p string) bool { 236 + return strings.HasSuffix(remaining, p) 237 + }) 238 + 239 + if !isGitPath && req.URL.Query().Get("go-get") != "1" { 240 + target := "/" + repo.RepoDid + remaining 241 + if req.URL.RawQuery != "" { 242 + target += "?" + req.URL.RawQuery 243 + } 244 + http.Redirect(w, req, target, http.StatusFound) 245 + return 246 + } 247 + } 248 + 249 ctx := context.WithValue(req.Context(), "repo", repo) 250 next.ServeHTTP(w, req.WithContext(ctx)) 251 }) ··· 354 355 if r.Header.Get("User-Agent") == "Go-http-client/1.1" { 356 if r.URL.Query().Get("go-get") == "1" { 357 + modulePath := userutil.FlattenDid(fullName) 358 + if strings.Contains(modulePath, ":") { 359 + modulePath = userutil.FlattenDid(f.Did) + "/" + f.Name 360 + } 361 html := fmt.Sprintf( 362 `<meta name="go-import" content="tangled.sh/%s git https://tangled.sh/%s"/> 363 <meta name="go-import" content="tangled.org/%s git https://tangled.org/%s"/>`, 364 + modulePath, fullName, 365 + modulePath, fullName, 366 ) 367 w.Header().Set("Content-Type", "text/html") 368 w.Write([]byte(html))
+20 -6
appview/reporesolver/resolver.go
··· 30 31 // NOTE: this... should not even be here. the entire package will be removed in future refactor 32 func GetBaseRepoPath(r *http.Request, repo *models.Repo) string { 33 var ( 34 user = chi.URLParam(r, "user") 35 name = chi.URLParam(r, "repo") 36 ) 37 if user == "" || name == "" { 38 - return repo.DidSlashRepo() 39 } 40 return path.Join(user, name) 41 } ··· 71 roles := repoinfo.RolesInRepo{} 72 if user != nil && user.Active != nil { 73 isStarred = db.GetStarStatus(rr.execer, user.Active.Did, repoAt) 74 - roles.Roles = rr.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.DidSlashRepo()) 75 } 76 77 stats := repo.RepoStats 78 if stats == nil { 79 - starCount, err := db.GetStarCount(rr.execer, repoAt) 80 - if err != nil { 81 log.Println("failed to get star count for ", repoAt) 82 } 83 issueCount, err := db.GetIssueCount(rr.execer, repoAt) ··· 98 var sourceRepo *models.Repo 99 var err error 100 if repo.Source != "" { 101 - sourceRepo, err = db.GetRepoByAtUri(rr.execer, repo.Source) 102 if err != nil { 103 - log.Println("failed to get repo by at uri", err) 104 } 105 } 106 ··· 108 // this is basically a models.Repo 109 OwnerDid: ownerId.DID.String(), 110 OwnerHandle: ownerId.Handle.String(), 111 Name: repo.Name, 112 Rkey: repo.Rkey, 113 Description: repo.Description,
··· 30 31 // NOTE: this... should not even be here. the entire package will be removed in future refactor 32 func GetBaseRepoPath(r *http.Request, repo *models.Repo) string { 33 + if repo.RepoDid != "" { 34 + return repo.RepoDid 35 + } 36 var ( 37 user = chi.URLParam(r, "user") 38 name = chi.URLParam(r, "repo") 39 ) 40 if user == "" || name == "" { 41 + return repo.RepoIdentifier() 42 } 43 return path.Join(user, name) 44 } ··· 74 roles := repoinfo.RolesInRepo{} 75 if user != nil && user.Active != nil { 76 isStarred = db.GetStarStatus(rr.execer, user.Active.Did, repoAt) 77 + roles.Roles = rr.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.RepoIdentifier()) 78 } 79 80 stats := repo.RepoStats 81 if stats == nil { 82 + var starCount int 83 + var starErr error 84 + if repo.RepoDid != "" { 85 + starCount, starErr = db.GetStarCountByRepoDid(rr.execer, repo.RepoDid, repoAt) 86 + } else { 87 + starCount, starErr = db.GetStarCount(rr.execer, repoAt) 88 + } 89 + if starErr != nil { 90 log.Println("failed to get star count for ", repoAt) 91 } 92 issueCount, err := db.GetIssueCount(rr.execer, repoAt) ··· 107 var sourceRepo *models.Repo 108 var err error 109 if repo.Source != "" { 110 + if strings.HasPrefix(repo.Source, "did:") { 111 + sourceRepo, err = db.GetRepoByDid(rr.execer, repo.Source) 112 + } else { 113 + sourceRepo, err = db.GetRepoByAtUri(rr.execer, repo.Source) 114 + } 115 if err != nil { 116 + log.Println("failed to get source repo", err) 117 } 118 } 119 ··· 121 // this is basically a models.Repo 122 OwnerDid: ownerId.DID.String(), 123 OwnerHandle: ownerId.Handle.String(), 124 + RepoDid: repo.RepoDid, 125 Name: repo.Name, 126 Rkey: repo.Rkey, 127 Description: repo.Description,
+7 -20
appview/state/git_http.go
··· 12 ) 13 14 func (s *State) InfoRefs(w http.ResponseWriter, r *http.Request) { 15 - user := r.Context().Value("resolvedId").(identity.Identity) 16 repo := r.Context().Value("repo").(*models.Repo) 17 18 scheme := "https" ··· 20 scheme = "http" 21 } 22 23 - targetURL := fmt.Sprintf("%s://%s/%s/%s/info/refs?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 24 s.proxyRequest(w, r, targetURL) 25 26 } 27 28 func (s *State) UploadArchive(w http.ResponseWriter, r *http.Request) { 29 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 30 - if !ok { 31 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 32 - return 33 - } 34 repo := r.Context().Value("repo").(*models.Repo) 35 36 scheme := "https" ··· 38 scheme = "http" 39 } 40 41 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-upload-archive?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 42 s.proxyRequest(w, r, targetURL) 43 } 44 45 func (s *State) UploadPack(w http.ResponseWriter, r *http.Request) { 46 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 47 - if !ok { 48 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 49 - return 50 - } 51 repo := r.Context().Value("repo").(*models.Repo) 52 53 scheme := "https" ··· 55 scheme = "http" 56 } 57 58 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-upload-pack?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 59 s.proxyRequest(w, r, targetURL) 60 } 61 62 func (s *State) ReceivePack(w http.ResponseWriter, r *http.Request) { 63 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 64 - if !ok { 65 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 66 - return 67 - } 68 repo := r.Context().Value("repo").(*models.Repo) 69 70 scheme := "https" ··· 72 scheme = "http" 73 } 74 75 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-receive-pack?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 76 s.proxyRequest(w, r, targetURL) 77 } 78 ··· 90 proxyReq.Header = r.Header 91 92 repoOwnerHandle := chi.URLParam(r, "user") 93 proxyReq.Header.Add("x-tangled-repo-owner-handle", repoOwnerHandle) 94 95 // Execute request
··· 12 ) 13 14 func (s *State) InfoRefs(w http.ResponseWriter, r *http.Request) { 15 repo := r.Context().Value("repo").(*models.Repo) 16 17 scheme := "https" ··· 19 scheme = "http" 20 } 21 22 + targetURL := fmt.Sprintf("%s://%s/%s/info/refs?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 23 s.proxyRequest(w, r, targetURL) 24 25 } 26 27 func (s *State) UploadArchive(w http.ResponseWriter, r *http.Request) { 28 repo := r.Context().Value("repo").(*models.Repo) 29 30 scheme := "https" ··· 32 scheme = "http" 33 } 34 35 + targetURL := fmt.Sprintf("%s://%s/%s/git-upload-archive?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 36 s.proxyRequest(w, r, targetURL) 37 } 38 39 func (s *State) UploadPack(w http.ResponseWriter, r *http.Request) { 40 repo := r.Context().Value("repo").(*models.Repo) 41 42 scheme := "https" ··· 44 scheme = "http" 45 } 46 47 + targetURL := fmt.Sprintf("%s://%s/%s/git-upload-pack?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 48 s.proxyRequest(w, r, targetURL) 49 } 50 51 func (s *State) ReceivePack(w http.ResponseWriter, r *http.Request) { 52 repo := r.Context().Value("repo").(*models.Repo) 53 54 scheme := "https" ··· 56 scheme = "http" 57 } 58 59 + targetURL := fmt.Sprintf("%s://%s/%s/git-receive-pack?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 60 s.proxyRequest(w, r, targetURL) 61 } 62 ··· 74 proxyReq.Header = r.Header 75 76 repoOwnerHandle := chi.URLParam(r, "user") 77 + if id, ok := r.Context().Value("resolvedId").(identity.Identity); ok && !id.Handle.IsInvalidHandle() { 78 + repoOwnerHandle = id.Handle.String() 79 + } 80 proxyReq.Header.Add("x-tangled-repo-owner-handle", repoOwnerHandle) 81 82 // Execute request
+42 -36
appview/state/knotstream.go
··· 2 3 import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" ··· 86 return err 87 } 88 89 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 90 if err != nil { 91 return err ··· 95 } 96 97 logger.Info("processing gitRefUpdate event", 98 - "repo_did", record.RepoDid, 99 - "repo_name", record.RepoName, 100 "ref", record.Ref, 101 "old_sha", record.OldSha, 102 "new_sha", record.NewSha) 103 104 - // trigger webhook notifications first (before other ops that might fail) 105 var errWebhook error 106 - repos, err := db.GetRepos( 107 - d, 108 - 0, 109 - orm.FilterEq("did", record.RepoDid), 110 - orm.FilterEq("name", record.RepoName), 111 - ) 112 - if err != nil { 113 - errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 114 - } else if len(repos) == 1 { 115 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 116 } 117 ··· 167 168 func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 169 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 170 - return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 171 } 172 173 - repos, err := db.GetRepos( 174 - d, 175 - 0, 176 - orm.FilterEq("did", record.RepoDid), 177 - orm.FilterEq("name", record.RepoName), 178 - ) 179 - if err != nil { 180 - return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 181 } 182 - if len(repos) != 1 { 183 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 184 } 185 - repo := repos[0] 186 187 ref := plumbing.ReferenceName(record.Ref) 188 if !ref.IsBranch() { ··· 197 198 langs = append(langs, models.RepoLanguage{ 199 RepoAt: repo.RepoAt(), 200 Ref: ref.Short(), 201 IsDefaultRef: record.Meta.IsDefaultRef, 202 Language: l.Lang, ··· 235 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 236 } 237 238 - // does this repo have a spindle configured? 239 - repos, err := db.GetRepos( 240 - d, 241 - 0, 242 - orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 243 - orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 244 - ) 245 - if err != nil { 246 - return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 247 } 248 - if len(repos) != 1 { 249 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 250 } 251 if repos[0].Spindle == "" { 252 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 253 } ··· 280 return fmt.Errorf("failed to add trigger entry: %w", err) 281 } 282 283 pipeline := models.Pipeline{ 284 Rkey: msg.Rkey, 285 Knot: source.Key(), 286 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 287 - RepoName: record.TriggerMetadata.Repo.Repo, 288 TriggerId: int(triggerId), 289 Sha: sha, 290 }
··· 2 3 import ( 4 "context" 5 + "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" ··· 87 return err 88 } 89 90 + if record.RepoDid == nil || *record.RepoDid == "" { 91 + logger.Error("gitRefUpdate missing repoDid, skipping", "owner_did", record.OwnerDid, "repo_name", record.RepoName) 92 + return fmt.Errorf("gitRefUpdate missing repoDid") 93 + } 94 + 95 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 96 if err != nil { 97 return err ··· 101 } 102 103 logger.Info("processing gitRefUpdate event", 104 + "repo_did", *record.RepoDid, 105 "ref", record.Ref, 106 "old_sha", record.OldSha, 107 "new_sha", record.NewSha) 108 109 var errWebhook error 110 + 111 + repo, lookupErr := db.GetRepoByDid(d, *record.RepoDid) 112 + if lookupErr != nil && !errors.Is(lookupErr, sql.ErrNoRows) { 113 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, lookupErr) 114 + } 115 + 116 + var repos []models.Repo 117 + if lookupErr == nil { 118 + repos = []models.Repo{*repo} 119 + } 120 + 121 + if errWebhook == nil && len(repos) == 1 { 122 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 123 } 124 ··· 174 175 func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 176 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 177 + return fmt.Errorf("empty language data for repo: %s/%s", record.OwnerDid, record.RepoName) 178 } 179 180 + if record.RepoDid == nil || *record.RepoDid == "" { 181 + return fmt.Errorf("gitRefUpdate missing repoDid for language update") 182 } 183 + 184 + r, lookupErr := db.GetRepoByDid(d, *record.RepoDid) 185 + if lookupErr != nil { 186 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, lookupErr) 187 } 188 + repo := *r 189 190 ref := plumbing.ReferenceName(record.Ref) 191 if !ref.IsBranch() { ··· 200 201 langs = append(langs, models.RepoLanguage{ 202 RepoAt: repo.RepoAt(), 203 + RepoDid: repo.RepoDid, 204 Ref: ref.Short(), 205 IsDefaultRef: record.Meta.IsDefaultRef, 206 Language: l.Lang, ··· 239 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 240 } 241 242 + if record.TriggerMetadata.Repo.RepoDid == nil || *record.TriggerMetadata.Repo.RepoDid == "" { 243 + return fmt.Errorf("pipeline missing repoDid: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 244 } 245 + 246 + repo, lookupErr := db.GetRepoByDid(d, *record.TriggerMetadata.Repo.RepoDid) 247 + if lookupErr != nil { 248 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.TriggerMetadata.Repo.RepoDid, lookupErr) 249 } 250 + repos := []models.Repo{*repo} 251 if repos[0].Spindle == "" { 252 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 253 } ··· 280 return fmt.Errorf("failed to add trigger entry: %w", err) 281 } 282 283 + repoName := "" 284 + if record.TriggerMetadata.Repo.Repo != nil { 285 + repoName = *record.TriggerMetadata.Repo.Repo 286 + } 287 + 288 pipeline := models.Pipeline{ 289 Rkey: msg.Rkey, 290 Knot: source.Key(), 291 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 292 + RepoName: repoName, 293 + RepoDid: repos[0].RepoDid, 294 TriggerId: int(triggerId), 295 Sha: sha, 296 }
+28 -2
appview/state/router.go
··· 1 package state 2 3 import ( 4 "net/http" 5 "strings" 6 7 "github.com/go-chi/chi/v5" 8 "tangled.org/core/appview/issues" 9 "tangled.org/core/appview/knots" 10 "tangled.org/core/appview/labels" ··· 45 if len(pathParts) > 0 { 46 firstPart := pathParts[0] 47 48 - // if using a DID or handle, just continue as per usual 49 - if userutil.IsDid(firstPart) || userutil.IsHandle(firstPart) { 50 userRouter.ServeHTTP(w, r) 51 return 52 }
··· 1 package state 2 3 import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 "net/http" 8 "strings" 9 10 "github.com/go-chi/chi/v5" 11 + "tangled.org/core/appview/db" 12 "tangled.org/core/appview/issues" 13 "tangled.org/core/appview/knots" 14 "tangled.org/core/appview/labels" ··· 49 if len(pathParts) > 0 { 50 firstPart := pathParts[0] 51 52 + if userutil.IsDid(firstPart) { 53 + repo, err := db.GetRepoByDid(s.db, firstPart) 54 + switch { 55 + case err == nil: 56 + remaining := "" 57 + if len(pathParts) > 1 { 58 + remaining = "/" + pathParts[1] 59 + } 60 + rewritten := "/" + repo.Did + "/" + repo.Name + remaining 61 + r2 := r.Clone(r.Context()) 62 + r2.URL.Path = rewritten 63 + r2.URL.RawPath = rewritten 64 + ctx := context.WithValue(r2.Context(), "repoDidCanonical", true) 65 + userRouter.ServeHTTP(w, r2.WithContext(ctx)) 66 + case errors.Is(err, sql.ErrNoRows): 67 + userRouter.ServeHTTP(w, r) 68 + default: 69 + s.logger.Error("db error looking up repo DID", "repoDid", firstPart, "err", err) 70 + http.Error(w, "internal server error", http.StatusInternalServerError) 71 + } 72 + return 73 + } 74 + 75 + if userutil.IsHandle(firstPart) { 76 userRouter.ServeHTTP(w, r) 77 return 78 }
+18 -5
appview/state/star.go
··· 12 "tangled.org/core/appview/db" 13 "tangled.org/core/appview/models" 14 "tangled.org/core/appview/pages" 15 "tangled.org/core/tid" 16 ) 17 ··· 40 case http.MethodPost: 41 createdAt := time.Now().Format(time.RFC3339) 42 rkey := tid.TID() 43 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 44 Collection: tangled.FeedStarNSID, 45 Repo: currentUser.Active.Did, 46 Rkey: rkey, 47 - Record: &lexutil.LexiconTypeDecoder{ 48 - Val: &tangled.FeedStar{ 49 - Subject: subjectUri.String(), 50 - CreatedAt: createdAt, 51 - }}, 52 }) 53 if err != nil { 54 log.Println("failed to create atproto record", err) ··· 60 Did: currentUser.Active.Did, 61 RepoAt: subjectUri, 62 Rkey: rkey, 63 } 64 65 err = db.AddStar(s.db, star)
··· 12 "tangled.org/core/appview/db" 13 "tangled.org/core/appview/models" 14 "tangled.org/core/appview/pages" 15 + "tangled.org/core/orm" 16 "tangled.org/core/tid" 17 ) 18 ··· 41 case http.MethodPost: 42 createdAt := time.Now().Format(time.RFC3339) 43 rkey := tid.TID() 44 + 45 + starRecord := &tangled.FeedStar{ 46 + CreatedAt: createdAt, 47 + } 48 + repo, err := db.GetRepo(s.db, orm.FilterEq("at_uri", subjectUri.String())) 49 + repoHasDid := err == nil && repo.RepoDid != "" 50 + if repoHasDid { 51 + starRecord.SubjectDid = &repo.RepoDid 52 + } else { 53 + s := subjectUri.String() 54 + starRecord.Subject = &s 55 + } 56 + 57 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 58 Collection: tangled.FeedStarNSID, 59 Repo: currentUser.Active.Did, 60 Rkey: rkey, 61 + Record: &lexutil.LexiconTypeDecoder{Val: starRecord}, 62 }) 63 if err != nil { 64 log.Println("failed to create atproto record", err) ··· 70 Did: currentUser.Active.Did, 71 RepoAt: subjectUri, 72 Rkey: rkey, 73 + } 74 + if repoHasDid { 75 + star.SubjectDid = repo.RepoDid 76 } 77 78 err = db.AddStar(s.db, star)
+92 -38
appview/state/state.go
··· 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 - securejoin "github.com/cyphar/filepath-securejoin" 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47 ) ··· 432 return 433 } 434 435 - // create atproto record for this repo 436 rkey := tid.TID() 437 repo := &models.Repo{ 438 Did: user.Active.Did, 439 Name: repoName, ··· 442 Description: description, 443 Created: time.Now(), 444 Labels: s.config.Label.DefaultLabelDefs, 445 } 446 record := repo.AsRecord() 447 448 atpClient, err := s.oauth.AuthorizedClient(r) 449 if err != nil { 450 l.Info("PDS write failed", "err", err) 451 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 452 return 453 } ··· 462 }) 463 if err != nil { 464 l.Info("PDS write failed", "err", err) 465 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 466 return 467 } ··· 477 return 478 } 479 480 - // The rollback function reverts a few things on failure: 481 - // - the pending txn 482 - // - the ACLs 483 - // - the atproto record created 484 rollback := func() { 485 err1 := tx.Rollback() 486 err2 := s.enforcer.E.LoadPolicy() 487 err3 := rollbackRecord(context.Background(), aturi, atpClient) 488 489 - // ignore txn complete errors, this is okay 490 if errors.Is(err1, sql.ErrTxDone) { 491 err1 = nil 492 } 493 494 if errs := errors.Join(err1, err2, err3); errs != nil { 495 l.Error("failed to rollback changes", "errs", errs) 496 - return 497 } 498 - } 499 - defer rollback() 500 501 - client, err := s.oauth.ServiceClient( 502 - r, 503 - oauth.WithService(domain), 504 - oauth.WithLxm(tangled.RepoCreateNSID), 505 - oauth.WithDev(s.config.Core.Dev), 506 - ) 507 - if err != nil { 508 - l.Error("service auth failed", "err", err) 509 - s.pages.Notice(w, "repo", "Failed to reach PDS.") 510 - return 511 - } 512 - 513 - xe := tangled.RepoCreate( 514 - r.Context(), 515 - client, 516 - &tangled.RepoCreate_Input{ 517 - Rkey: rkey, 518 - }, 519 - ) 520 - if err := xrpcclient.HandleXrpcErr(xe); err != nil { 521 - l.Error("xrpc error", "xe", xe) 522 - s.pages.Notice(w, "repo", err.Error()) 523 - return 524 } 525 526 err = db.AddRepo(tx, repo) 527 if err != nil { ··· 530 return 531 } 532 533 - // acls 534 - p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 535 - err = s.enforcer.AddRepo(user.Active.Did, domain, p) 536 if err != nil { 537 l.Error("acl setup failed", "err", err) 538 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") ··· 553 return 554 } 555 556 - // reset the ATURI because the transaction completed successfully 557 aturi = "" 558 559 s.notifier.NewRepo(r.Context(), repo) 560 - s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 561 } 562 } 563
··· 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 + 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47 ) ··· 432 return 433 } 434 435 rkey := tid.TID() 436 + 437 + client, err := s.oauth.ServiceClient( 438 + r, 439 + oauth.WithService(domain), 440 + oauth.WithLxm(tangled.RepoCreateNSID), 441 + oauth.WithDev(s.config.Core.Dev), 442 + ) 443 + if err != nil { 444 + l.Error("service auth failed", "err", err) 445 + s.pages.Notice(w, "repo", "Failed to reach knot server.") 446 + return 447 + } 448 + 449 + input := &tangled.RepoCreate_Input{ 450 + Rkey: rkey, 451 + Name: repoName, 452 + DefaultBranch: &defaultBranch, 453 + } 454 + if rd := strings.TrimSpace(r.FormValue("repo_did")); rd != "" { 455 + input.RepoDid = &rd 456 + } 457 + 458 + createResp, xe := tangled.RepoCreate( 459 + r.Context(), 460 + client, 461 + input, 462 + ) 463 + if err := xrpcclient.HandleXrpcErr(xe); err != nil { 464 + l.Error("xrpc error", "xe", xe) 465 + s.pages.Notice(w, "repo", err.Error()) 466 + return 467 + } 468 + 469 + var repoDid string 470 + if createResp != nil && createResp.RepoDid != nil { 471 + repoDid = *createResp.RepoDid 472 + } 473 + if repoDid == "" { 474 + l.Error("knot returned empty repo DID") 475 + s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 476 + return 477 + } 478 + 479 repo := &models.Repo{ 480 Did: user.Active.Did, 481 Name: repoName, ··· 484 Description: description, 485 Created: time.Now(), 486 Labels: s.config.Label.DefaultLabelDefs, 487 + RepoDid: repoDid, 488 } 489 record := repo.AsRecord() 490 491 + cleanupKnot := func() { 492 + go func() { 493 + delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 494 + for attempt, delay := range delays { 495 + time.Sleep(delay) 496 + deleteClient, dErr := s.oauth.ServiceClient( 497 + r, 498 + oauth.WithService(domain), 499 + oauth.WithLxm(tangled.RepoDeleteNSID), 500 + oauth.WithDev(s.config.Core.Dev), 501 + ) 502 + if dErr != nil { 503 + l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 504 + continue 505 + } 506 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 507 + if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 508 + Did: user.Active.Did, 509 + Name: repoName, 510 + Rkey: rkey, 511 + }); dErr != nil { 512 + cancel() 513 + l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 514 + continue 515 + } 516 + cancel() 517 + l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 518 + return 519 + } 520 + l.Error("exhausted retries for knot cleanup, repo may be orphaned", 521 + "did", user.Active.Did, "repo", repoName, "knot", domain) 522 + }() 523 + } 524 + 525 atpClient, err := s.oauth.AuthorizedClient(r) 526 if err != nil { 527 l.Info("PDS write failed", "err", err) 528 + cleanupKnot() 529 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 530 return 531 } ··· 540 }) 541 if err != nil { 542 l.Info("PDS write failed", "err", err) 543 + cleanupKnot() 544 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 545 return 546 } ··· 556 return 557 } 558 559 rollback := func() { 560 err1 := tx.Rollback() 561 err2 := s.enforcer.E.LoadPolicy() 562 err3 := rollbackRecord(context.Background(), aturi, atpClient) 563 564 if errors.Is(err1, sql.ErrTxDone) { 565 err1 = nil 566 } 567 568 if errs := errors.Join(err1, err2, err3); errs != nil { 569 l.Error("failed to rollback changes", "errs", errs) 570 } 571 572 + if aturi != "" { 573 + cleanupKnot() 574 + } 575 } 576 + defer rollback() 577 578 err = db.AddRepo(tx, repo) 579 if err != nil { ··· 582 return 583 } 584 585 + rbacPath := repo.RepoIdentifier() 586 + err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath) 587 if err != nil { 588 l.Error("acl setup failed", "err", err) 589 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") ··· 604 return 605 } 606 607 aturi = "" 608 609 s.notifier.NewRepo(r.Context(), repo) 610 + if repoDid != "" { 611 + s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 612 + } else { 613 + s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 614 + } 615 } 616 } 617
+1 -1
appview/validator/label.go
··· 109 // validate permissions: only collaborators can apply labels currently 110 // 111 // TODO: introduce a repo:triage permission 112 - ok, err := v.enforcer.IsPushAllowed(labelOp.Did, repo.Knot, repo.DidSlashRepo()) 113 if err != nil { 114 return fmt.Errorf("failed to enforce permissions: %w", err) 115 }
··· 109 // validate permissions: only collaborators can apply labels currently 110 // 111 // TODO: introduce a repo:triage permission 112 + ok, err := v.enforcer.IsPushAllowed(labelOp.Did, repo.Knot, repo.RepoIdentifier()) 113 if err != nil { 114 return fmt.Errorf("failed to enforce permissions: %w", err) 115 }