Monorepo for Tangled

wip: knotmirror: support xrpc

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 03cdb074 60265346

verified
+422 -33
+1
appview/config/config.go
··· 129 129 Pds PdsConfig `env:",prefix=TANGLED_PDS_"` 130 130 Cloudflare Cloudflare `env:",prefix=TANGLED_CLOUDFLARE_"` 131 131 Label LabelConfig `env:",prefix=TANGLED_LABEL_"` 132 + KnotMirror string `env:"TANGLED_KNOTMIRROR"` 132 133 } 133 134 134 135 func LoadConfig(ctx context.Context) (*Config, error) {
+27 -28
appview/db/profile.go
··· 5 5 "fmt" 6 6 "log" 7 7 "net/url" 8 - "slices" 9 8 "strings" 10 9 "time" 11 10 ··· 485 484 return err 486 485 } 487 486 488 - // ensure all pinned repos are either own repos or collaborating repos 489 - repos, err := GetRepos(e, 0, orm.FilterEq("did", profile.Did)) 490 - if err != nil { 491 - log.Printf("getting repos for %s: %s", profile.Did, err) 492 - } 493 - 494 - collaboratingRepos, err := CollaboratingIn(e, profile.Did) 495 - if err != nil { 496 - log.Printf("getting collaborating repos for %s: %s", profile.Did, err) 497 - } 498 - 499 - var validRepos []syntax.ATURI 500 - for _, r := range repos { 501 - validRepos = append(validRepos, r.RepoAt()) 502 - } 503 - for _, r := range collaboratingRepos { 504 - validRepos = append(validRepos, r.RepoAt()) 505 - } 506 - 507 - for _, pinned := range profile.PinnedRepos { 508 - if pinned == "" { 509 - continue 510 - } 511 - if !slices.Contains(validRepos, pinned) { 512 - return fmt.Errorf("Invalid pinned repo: `%s, does not belong to own or collaborating repos", pinned) 513 - } 514 - } 487 + // // ensure all pinned repos are either own repos or collaborating repos 488 + // repos, err := GetRepos(e, 0, orm.FilterEq("did", profile.Did)) 489 + // if err != nil { 490 + // log.Printf("getting repos for %s: %s", profile.Did, err) 491 + // } 492 + // 493 + // collaboratingRepos, err := CollaboratingIn(e, profile.Did) 494 + // if err != nil { 495 + // log.Printf("getting collaborating repos for %s: %s", profile.Did, err) 496 + // } 497 + // 498 + // var validRepos []syntax.ATURI 499 + // for _, r := range repos { 500 + // validRepos = append(validRepos, r.RepoAt()) 501 + // } 502 + // for _, r := range collaboratingRepos { 503 + // validRepos = append(validRepos, r.RepoAt()) 504 + // } 505 + // 506 + // for _, pinned := range profile.PinnedRepos { 507 + // if pinned == "" { 508 + // continue 509 + // } 510 + // if !slices.Contains(validRepos, pinned) { 511 + // return fmt.Errorf("Invalid pinned repo: `%s, does not belong to 
own or collaborating repos", pinned) 512 + // } 513 + // } 515 514 516 515 return nil 517 516 }
+49 -1
appview/ingester.go
··· 83 83 err = i.ingestLabelDefinition(e) 84 84 case tangled.LabelOpNSID: 85 85 err = i.ingestLabelOp(e) 86 + case tangled.RepoNSID: 87 + err = i.ingestRepo(ctx, e) 86 88 } 87 89 l = i.Logger.With("nsid", e.Commit.Collection) 88 90 } ··· 93 95 94 96 return nil 95 97 } 98 + } 99 + 100 + func (i *Ingester) ingestRepo(ctx context.Context, e *jmodels.Event) error { 101 + var err error 102 + 103 + l := i.Logger.With("handler", "ingestStar") 104 + l = l.With("nsid", e.Commit.Collection) 105 + 106 + switch e.Commit.Operation { 107 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 108 + record := tangled.Repo{} 109 + if err := json.Unmarshal(e.Commit.Record, &record); err != nil { 110 + l.Error("invalid record", "err", err) 111 + return err 112 + } 113 + 114 + repo := &models.Repo{ 115 + Did: e.Did, 116 + Rkey: e.Commit.RKey, 117 + Name: record.Name, 118 + Knot: record.Knot, 119 + Description: "", 120 + } 121 + ddb, _ := i.Db.Execer.(*db.DB) 122 + 123 + tx, err := ddb.Begin() 124 + if err != nil { 125 + return fmt.Errorf("failed to start transaction") 126 + } 127 + defer tx.Rollback() 128 + // tx, err := i.Db.SaveLastTimeUs 129 + 130 + if err := db.AddRepo(tx, repo); err != nil { 131 + return fmt.Errorf("adding repo: %w", err) 132 + } 133 + 134 + err = tx.Commit() 135 + case jmodels.CommitOperationDelete: 136 + } 137 + 138 + if err != nil { 139 + return fmt.Errorf("failed to %s repo record: %w", e.Commit.Operation, err) 140 + } 141 + l.Info("processed repo", "operation", e.Commit.Operation, "rkey", e.Commit.RKey) 142 + 143 + return nil 96 144 } 97 145 98 146 func (i *Ingester) ingestStar(e *jmodels.Event) error { ··· 361 409 362 410 err = db.ValidateProfile(tx, &profile) 363 411 if err != nil { 364 - return fmt.Errorf("invalid profile record") 412 + return fmt.Errorf("invalid profile record: %w", err) 365 413 } 366 414 367 415 err = db.UpsertProfile(tx, &profile)
+5 -3
appview/repo/index.go
··· 255 255 256 256 // buildIndexResponse creates a RepoIndexResponse by combining multiple xrpc calls in parallel 257 257 func (rp *Repo) buildIndexResponse(ctx context.Context, xrpcc *indigoxrpc.Client, repo *models.Repo, ref string) (*types.RepoIndexResponse, error) { 258 - didSlashRepo := fmt.Sprintf("%s/%s", repo.Did, repo.Name) 258 + mclient := &indigoxrpc.Client{Host: rp.config.KnotMirror} 259 259 260 260 // first get branches to determine the ref if not specified 261 - branchesBytes, err := tangled.RepoBranches(ctx, xrpcc, "", 0, didSlashRepo) 261 + branchesBytes, err := tangled.RepoTempListBranches(ctx, mclient, "", 0, repo.RepoAt().String()) 262 262 if err != nil { 263 263 return nil, fmt.Errorf("failed to call repoBranches: %w", err) 264 264 } ··· 302 302 wg.Add(1) 303 303 go func() { 304 304 defer wg.Done() 305 - tagsBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, didSlashRepo) 305 + tagsBytes, err := tangled.RepoTempListTags(ctx, mclient, "", 0, repo.RepoAt().String()) 306 306 if err != nil { 307 307 errs = errors.Join(errs, fmt.Errorf("failed to call repoTags: %w", err)) 308 308 return ··· 312 312 errs = errors.Join(errs, fmt.Errorf("failed to unmarshal repoTags: %w", err)) 313 313 } 314 314 }() 315 + 316 + didSlashRepo := fmt.Sprintf("%s/%s", repo.Did, repo.Name) 315 317 316 318 // tree/files 317 319 wg.Add(1)
+1
appview/state/state.go
··· 117 117 tangled.RepoIssueCommentNSID, 118 118 tangled.LabelDefinitionNSID, 119 119 tangled.LabelOpNSID, 120 + tangled.RepoNSID, 120 121 }, 121 122 nil, 122 123 tlog.SubLogger(logger, "jetstream"),
+2
knotmirror/config/config.go
··· 8 8 ) 9 9 10 10 type Config struct { 11 + PlcUrl string `env:"MIRROR_PLC_URL, default=https://plc.directory"` 11 12 TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 13 DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 13 14 KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not scheme is not specified ··· 16 17 GitRepoFetchTimeout time.Duration `env:"MIRROR_GIT_FETCH_TIMEOUT, default=600s"` 17 18 ResyncParallelism int `env:"MIRROR_RESYNC_PARALLELISM, default=5"` 18 19 Slurper SlurperConfig `env:",prefix=MIRROR_SLURPER_"` 20 + Listen string `env:"MIRROR_LISTEN, default=:7000"` 19 21 MetricsListen string `env:"MIRROR_METRICS_LISTEN, default=:7100"` 20 22 AdminListen string `env:"MIRROR_ADMIN_LISTEN, default=:7200"` 21 23 }
+1 -1
knotmirror/git.go
··· 38 38 } 39 39 40 40 func (c *CliGitMirrorClient) Fetch(ctx context.Context, path, url string) error { 41 - cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", url) 41 + cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin") 42 42 if out, err := cmd.CombinedOutput(); err != nil { 43 43 if ctx.Err() != nil { 44 44 return ctx.Err()
+13
knotmirror/knotmirror.go
··· 8 8 "time" 9 9 10 10 "github.com/prometheus/client_golang/prometheus/promhttp" 11 + "tangled.org/core/idresolver" 11 12 "tangled.org/core/knotmirror/config" 12 13 "tangled.org/core/knotmirror/db" 13 14 "tangled.org/core/knotmirror/knotstream" 14 15 "tangled.org/core/knotmirror/models" 16 + "tangled.org/core/knotmirror/xrpc" 15 17 "tangled.org/core/log" 16 18 ) 17 19 ··· 32 34 if err != nil { 33 35 return fmt.Errorf("initializing db: %w", err) 34 36 } 37 + 38 + resolver := idresolver.DefaultResolver(cfg.PlcUrl) 35 39 36 40 res, err := db.ExecContext(ctx, 37 41 `update repos set state = $1 where state = $2`, ··· 47 51 } 48 52 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 49 53 54 + xrpc := xrpc.New(logger, cfg, resolver) 50 55 knotstream := knotstream.NewKnotStream(logger, db, cfg) 51 56 crawler := NewCrawler(logger, db) 52 57 resyncer := NewResyncer(logger, db, cfg) ··· 55 60 // maintain repository list with tap 56 61 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 57 62 tap := NewTapClient(logger, cfg, db, knotstream) 63 + 64 + // start xrpc server 65 + go func() { 66 + logger.Info("starting xrpc server", "addr", cfg.Listen) 67 + if err := http.ListenAndServe(cfg.Listen, xrpc.Router()); err != nil { 68 + logger.Error("xrpc server failed", "error", err) 69 + } 70 + }() 58 71 59 72 // start metrics endpoint 60 73 go func() {
+1
knotmirror/readme.md
··· 9 9 - [x] clean up 'resyncing' state on shutdown (or on startup too) 10 10 - [x] better tap reconnection logic 11 11 - [ ] handle really large repos (maybe shallow-clone first?) 12 + - [ ] handle repository knot changes (change git repo origin) 12 13 13 14 idea: run separate pools of resync workers, e.g. 4 for long-running tasks and 10 for short tasks. when a short task times out, reschedule it as a long-running task 14 15
+94
knotmirror/xrpc/repo_listBranches.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "path/filepath" 8 + "strconv" 9 + 10 + atclient "github.com/bluesky-social/indigo/atproto/client" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "tangled.org/core/knotserver/git" 13 + "tangled.org/core/types" 14 + ) 15 + 16 + type ListBranches_Output struct { 17 + Cursor *string `json:"cursor,omitempty"` 18 + Branches []types.Branch `json:"branches"` 19 + } 20 + 21 + func (x *Xrpc) ListBranches(w http.ResponseWriter, r *http.Request) { 22 + var ( 23 + repoQuery = r.URL.Query().Get("repo") 24 + limitQuery = r.URL.Query().Get("limit") 25 + cursorQuery = r.URL.Query().Get("cursor") 26 + ) 27 + 28 + repo, err := syntax.ParseATURI(repoQuery) 29 + if err != nil || repo.RecordKey() == "" { 30 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("repo parameter invalid: %s", repoQuery)}) 31 + } 32 + 33 + limit := 50 34 + if limitQuery != "" { 35 + limit, err = strconv.Atoi(limitQuery) 36 + if err != nil || limit < 1 || limit > 1000 { 37 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("limit parameter invalid: %s", limitQuery)}) 38 + } 39 + } 40 + 41 + var cursor int64 42 + if cursorQuery != "" { 43 + cursor, err = strconv.ParseInt(cursorQuery, 10, 64) 44 + if err != nil || cursor < 0 { 45 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("cursor parameter invalid: %s", cursorQuery)}) 46 + } 47 + } 48 + 49 + out, err := x.listBranches(r.Context(), repo, limit, cursor) 50 + if err != nil { 51 + // TODO: better error return 52 + writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to list branches"}) 53 + } 54 + writeJson(w, http.StatusOK, out) 55 + } 56 + 57 + func (x *Xrpc) listBranches(ctx context.Context, repo syntax.ATURI, limit int, cursor int64) (*ListBranches_Output, error) { 58 
+ repoPath, err := x.makeRepoPath(ctx, repo) 59 + if err != nil { 60 + return nil, fmt.Errorf("failed to resolve repo at-uri: %w", err) 61 + } 62 + 63 + gr, err := git.PlainOpen(repoPath) 64 + if err != nil { 65 + return nil, fmt.Errorf("failed to open git repo: %w", err) 66 + } 67 + 68 + branches, err := gr.Branches(&git.BranchesOptions{ 69 + Limit: limit, 70 + Offset: int(cursor), 71 + }) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to get git branches: %w", err) 74 + } 75 + 76 + return &ListBranches_Output{ 77 + Branches: branches, 78 + // TODO: include default branch 79 + // TODO: return cursor 80 + }, nil 81 + } 82 + 83 + func (x *Xrpc) makeRepoPath(ctx context.Context, repo syntax.ATURI) (string, error) { 84 + id, err := x.resolver.ResolveIdent(ctx, repo.Authority().String()) 85 + if err != nil { 86 + return "", err 87 + } 88 + 89 + return filepath.Join( 90 + x.cfg.GitRepoBasePath, 91 + id.DID.String(), 92 + repo.RecordKey().String(), 93 + ), nil 94 + }
+82
knotmirror/xrpc/repo_listLanguages.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "math" 7 + "net/http" 8 + "time" 9 + 10 + atclient "github.com/bluesky-social/indigo/atproto/client" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "tangled.org/core/api/tangled" 13 + "tangled.org/core/knotserver/git" 14 + ) 15 + 16 + func (x *Xrpc) ListLanguages(w http.ResponseWriter, r *http.Request) { 17 + var ( 18 + repoQuery = r.URL.Query().Get("repo") 19 + ref = r.URL.Query().Get("ref") 20 + ) 21 + l := x.logger.With("repo", repoQuery, "ref", ref) 22 + 23 + repo, err := syntax.ParseATURI(repoQuery) 24 + if err != nil || repo.RecordKey() == "" { 25 + l.Error("invalid repo at-uri", "err", err) 26 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("repo parameter invalid: %s", repoQuery)}) 27 + } 28 + 29 + repoPath, err := x.makeRepoPath(r.Context(), repo) 30 + if err != nil { 31 + l.Error("invalid repo identifier", "err", err) 32 + // TODO: better error return 33 + writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "invalid repo identifier"}) 34 + } 35 + 36 + gr, err := git.Open(repoPath, ref) 37 + if err != nil { 38 + l.Error("failed to open repository", "err", err) 39 + // TODO: better error return 40 + writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to open repository"}) 41 + } 42 + 43 + ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second) 44 + defer cancel() 45 + 46 + sizes, err := gr.AnalyzeLanguages(ctx) 47 + if err != nil { 48 + l.Error("failed to open repository", "err", err) 49 + // TODO: better error return 50 + writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to analyze languages"}) 51 + } 52 + 53 + langs := sizesToLanguages(sizes) 54 + 55 + writeJson(w, http.StatusOK, &tangled.RepoTempListLanguages_Output{ 56 + Ref: ref, 57 + Languages: langs, 
58 + }) 59 + } 60 + 61 + func sizesToLanguages(sizes git.LangBreakdown) []*tangled.RepoTempListLanguages_Language { 62 + var apiLanguages []*tangled.RepoTempListLanguages_Language 63 + var totalSize int64 64 + for _, size := range sizes { 65 + totalSize += size 66 + } 67 + 68 + for name, size := range sizes { 69 + percentagef64 := float64(size) / float64(totalSize) * 100 70 + percentage := math.Round(percentagef64) 71 + 72 + lang := &tangled.RepoTempListLanguages_Language{ 73 + Name: name, 74 + Size: size, 75 + Percentage: int64(percentage), 76 + } 77 + 78 + apiLanguages = append(apiLanguages, lang) 79 + } 80 + 81 + return apiLanguages 82 + }
+96
knotmirror/xrpc/repo_listTags.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "strconv" 8 + 9 + atclient "github.com/bluesky-social/indigo/atproto/client" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/go-git/go-git/v5/plumbing" 12 + "github.com/go-git/go-git/v5/plumbing/object" 13 + "tangled.org/core/knotserver/git" 14 + "tangled.org/core/types" 15 + ) 16 + 17 + type ListTags_Output struct { 18 + Cursor *string `json:"cursor,omitempty"` 19 + Tags []*types.TagReference `json:"tags"` 20 + } 21 + 22 + func (x *Xrpc) ListTags(w http.ResponseWriter, r *http.Request) { 23 + var ( 24 + repoQuery = r.URL.Query().Get("repo") 25 + limitQuery = r.URL.Query().Get("limit") 26 + cursorQuery = r.URL.Query().Get("cursor") 27 + ) 28 + 29 + repo, err := syntax.ParseATURI(repoQuery) 30 + if err != nil || repo.RecordKey() == "" { 31 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("repo parameter invalid: %s", repoQuery)}) 32 + } 33 + 34 + limit := 50 35 + if limitQuery != "" { 36 + limit, err = strconv.Atoi(limitQuery) 37 + if err != nil || limit < 1 || limit > 1000 { 38 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("limit parameter invalid: %s", limitQuery)}) 39 + } 40 + } 41 + 42 + var cursor int64 43 + if cursorQuery != "" { 44 + cursor, err = strconv.ParseInt(cursorQuery, 10, 64) 45 + if err != nil || cursor < 0 { 46 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("cursor parameter invalid: %s", cursorQuery)}) 47 + } 48 + } 49 + 50 + out, err := x.listTags(r.Context(), repo, limit, cursor) 51 + if err != nil { 52 + // TODO: better error return 53 + writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to list tags"}) 54 + } 55 + writeJson(w, http.StatusOK, out) 56 + } 57 + 58 + func (x *Xrpc) listTags(ctx context.Context, repo syntax.ATURI, limit 
int, cursor int64) (*ListTags_Output, error) { 59 + repoPath, err := x.makeRepoPath(ctx, repo) 60 + if err != nil { 61 + return nil, fmt.Errorf("failed to resolve repo at-uri: %w", err) 62 + } 63 + 64 + gr, err := git.PlainOpen(repoPath) 65 + if err != nil { 66 + return nil, fmt.Errorf("failed to open git repo: %w", err) 67 + } 68 + 69 + tags, err := gr.Tags(&git.TagsOptions{ 70 + Limit: limit, 71 + Offset: int(cursor), 72 + }) 73 + if err != nil { 74 + return nil, fmt.Errorf("failed to get git tags: %w", err) 75 + } 76 + 77 + rtags := make([]*types.TagReference, len(tags)) 78 + for i, tag := range tags { 79 + var target *object.Tag 80 + if tag.Target != plumbing.ZeroHash { 81 + target = &tag 82 + } 83 + rtags[i] = &types.TagReference{ 84 + Reference: types.Reference{ 85 + Name: tag.Name, 86 + Hash: tag.Hash.String(), 87 + }, 88 + Tag: target, 89 + Message: tag.Message, 90 + } 91 + } 92 + 93 + return &ListTags_Output{ 94 + Tags: rtags, 95 + }, nil 96 + }
+50
knotmirror/xrpc/xrpc.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "log/slog" 6 + "net/http" 7 + 8 + "github.com/go-chi/chi/v5" 9 + "tangled.org/core/api/tangled" 10 + "tangled.org/core/idresolver" 11 + "tangled.org/core/knotmirror/config" 12 + ) 13 + 14 + type Xrpc struct { 15 + cfg *config.Config 16 + resolver *idresolver.Resolver 17 + logger *slog.Logger 18 + } 19 + 20 + func New(logger *slog.Logger, cfg *config.Config, resolver *idresolver.Resolver) *Xrpc { 21 + return &Xrpc{ 22 + cfg, 23 + resolver, 24 + logger, 25 + } 26 + } 27 + 28 + func (x *Xrpc) Router() http.Handler { 29 + r := chi.NewRouter() 30 + 31 + r.Route("/xrpc", func(r chi.Router) { 32 + // r.Get("/"+tangled.RepoTempGetBlobNSID, x.GetBlob) 33 + // r.Get("/"+tangled.RepoTempGetBranchNSID, x.GetBranch) 34 + // r.Get("/"+tangled.RepoTempGetCommitNSID, x.GetCommit) 35 + r.Get("/"+tangled.RepoTempListBranchesNSID, x.ListBranches) 36 + r.Get("/"+tangled.RepoTempListLanguagesNSID, x.ListLanguages) 37 + r.Get("/"+tangled.RepoTempListTagsNSID, x.ListTags) 38 + }) 39 + 40 + return r 41 + } 42 + 43 + func writeJson(w http.ResponseWriter, status int, response any) error { 44 + w.Header().Set("Content-Type", "application/json") 45 + w.WriteHeader(status) 46 + if err := json.NewEncoder(w).Encode(response); err != nil { 47 + return err 48 + } 49 + return nil 50 + }