Monorepo for Tangled

wip

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me b0daf11e 6e7dfa98

verified
+160 -45
+3 -2
knotmirror/config/config.go
··· 10 type Config struct { 11 TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 13 - KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not schema is not specified 14 GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"` 15 GitRepoFetchTimeout time.Duration `env:"MIRROR_GIT_FETCH_TIMEOUT, default=600s"` 16 ResyncParallelism int `env:"MIRROR_RESYNC_PARALLELISM, default=5"` ··· 21 22 type SlurperConfig struct { 23 PersistCursorPeriod time.Duration `env:"PERSIST_CURSOR_PERIOD, default=4s"` 24 - ConcurrencyPerHost int `env:"CONCURRENCY, default=40"` 25 } 26 27 func Load(ctx context.Context) (*Config, error) {
··· 10 type Config struct { 11 TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 13 + KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not scheme is not specified 14 + KnotSSRF bool `env:"MIRROR_KNOT_SSRF, default=false"` 15 GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"` 16 GitRepoFetchTimeout time.Duration `env:"MIRROR_GIT_FETCH_TIMEOUT, default=600s"` 17 ResyncParallelism int `env:"MIRROR_RESYNC_PARALLELISM, default=5"` ··· 22 23 type SlurperConfig struct { 24 PersistCursorPeriod time.Duration `env:"PERSIST_CURSOR_PERIOD, default=4s"` 25 + ConcurrencyPerHost int `env:"CONCURRENCY, default=4"` 26 } 27 28 func Load(ctx context.Context) (*Config, error) {
+3
knotmirror/db/repos.go
··· 111 &repo.RetryCount, 112 &repo.RetryAfter, 113 ); err != nil { 114 return nil, fmt.Errorf("querying repo: %w", err) 115 } 116 return &repo, nil
··· 111 &repo.RetryCount, 112 &repo.RetryAfter, 113 ); err != nil { 114 + if errors.Is(err, sql.ErrNoRows) { 115 + return nil, nil 116 + } 117 return nil, fmt.Errorf("querying repo: %w", err) 118 } 119 return &repo, nil
+68
knotmirror/git.go
···
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "os/exec" 8 + 9 + "github.com/go-git/go-git/v5" 10 + gitconfig "github.com/go-git/go-git/v5/config" 11 + "github.com/go-git/go-git/v5/plumbing/transport" 12 + ) 13 + 14 + type GitMirrorClient interface { 15 + Clone(ctx context.Context, path, url string) error 16 + Fetch(ctx context.Context, path, url string) error 17 + } 18 + 19 + type CliGitMirrorClient struct{} 20 + 21 + var _ GitMirrorClient = new(CliGitMirrorClient) 22 + 23 + func (c *CliGitMirrorClient) Clone(ctx context.Context, path, url string) error { 24 + cmd := exec.Command("git", "clone", "--mirror", url, path) 25 + if out, err := cmd.CombinedOutput(); err != nil { 26 + return fmt.Errorf("cloning repo: %w\n%s", err, string(out)) 27 + } 28 + return nil 29 + } 30 + 31 + func (c *CliGitMirrorClient) Fetch(ctx context.Context, path, url string) error { 32 + cmd := exec.Command("git", "-C", path, "fetch", "--prune", url) 33 + if out, err := cmd.CombinedOutput(); err != nil { 34 + return fmt.Errorf("fetching repo: %w\n%s", err, string(out)) 35 + } 36 + return nil 37 + } 38 + 39 + type GoGitMirrorClient struct{} 40 + 41 + var _ GitMirrorClient = new(GoGitMirrorClient) 42 + 43 + func (c *GoGitMirrorClient) Clone(ctx context.Context, path string, url string) error { 44 + _, err := git.PlainCloneContext(ctx, path, true, &git.CloneOptions{ 45 + URL: url, 46 + Mirror: true, 47 + }) 48 + if err != nil && !errors.Is(err, transport.ErrEmptyRemoteRepository) { 49 + return fmt.Errorf("cloning repo: %w", err) 50 + } 51 + return nil 52 + } 53 + 54 + func (c *GoGitMirrorClient) Fetch(ctx context.Context, path string, url string) error { 55 + gr, err := git.PlainOpen(path) 56 + if err != nil { 57 + return fmt.Errorf("opening local repo: %w", err) 58 + } 59 + if err := gr.FetchContext(ctx, &git.FetchOptions{ 60 + RemoteURL: url, 61 + RefSpecs: []gitconfig.RefSpec{gitconfig.RefSpec("+refs/*:refs/*")}, 62 + Force: true, 63 + Prune: true, 64 + }); err != nil { 65 + return fmt.Errorf("fetching reppo: %w", err) 66 + } 67 + return nil 68 + }
+16
knotmirror/knotmirror.go
··· 4 "context" 5 "fmt" 6 "net/http" 7 "time" 8 9 "github.com/prometheus/client_golang/prometheus/promhttp" 10 "tangled.org/core/knotmirror/config" 11 "tangled.org/core/knotmirror/db" 12 "tangled.org/core/knotmirror/knotstream" 13 "tangled.org/core/log" 14 ) 15 ··· 30 if err != nil { 31 return fmt.Errorf("initializing db: %w", err) 32 } 33 34 knotstream := knotstream.NewKnotStream(logger, db, cfg) 35 crawler := NewCrawler(logger, db)
··· 4 "context" 5 "fmt" 6 "net/http" 7 + _ "net/http/pprof" 8 "time" 9 10 "github.com/prometheus/client_golang/prometheus/promhttp" 11 "tangled.org/core/knotmirror/config" 12 "tangled.org/core/knotmirror/db" 13 "tangled.org/core/knotmirror/knotstream" 14 + "tangled.org/core/knotmirror/models" 15 "tangled.org/core/log" 16 ) 17 ··· 32 if err != nil { 33 return fmt.Errorf("initializing db: %w", err) 34 } 35 + 36 + res, err := db.ExecContext(ctx, 37 + `update repos set state = ? where state = ?`, 38 + models.RepoStateDesynchronized, 39 + models.RepoStateResyncing, 40 + ) 41 + if err != nil { 42 + return fmt.Errorf("clearing resyning states: %w", err) 43 + } 44 + rows, err := res.RowsAffected() 45 + if err != nil { 46 + return fmt.Errorf("getting affected rows: %w", err) 47 + } 48 + logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 49 50 knotstream := knotstream.NewKnotStream(logger, db, cfg) 51 crawler := NewCrawler(logger, db)
+3 -2
knotmirror/knotstream/knotstream.go
··· 42 } 43 44 func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 45 - s.logger.Debug("subscribe", "nossl", noSSL) 46 host, err := db.GetHost(ctx, s.db, hostname) 47 if err != nil { 48 return fmt.Errorf("loading host from db: %w", err) ··· 60 return fmt.Errorf("adding host to db: %w", err) 61 } 62 63 - s.logger.Info("adding new host subscription", "hostname", hostname, "noSSL", noSSL) 64 } 65 66 if host.Status == models.HostStatusBanned {
··· 42 } 43 44 func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 45 + l := s.logger.With("hostname", hostname, "nossl", noSSL) 46 + l.Debug("subscribe") 47 host, err := db.GetHost(ctx, s.db, hostname) 48 if err != nil { 49 return fmt.Errorf("loading host from db: %w", err) ··· 61 return fmt.Errorf("adding host to db: %w", err) 62 } 63 64 + l.Info("adding new host subscription") 65 } 66 67 if host.Status == models.HostStatusBanned {
+2 -2
knotmirror/readme.md
··· 6 7 # TODO 8 9 - - [ ] cleanup 'resyncing' state on shutdown (or on startup too) 10 - - [ ] better tap reconnecting logic 11 - [ ] handle really large repos (maybe shallow-clone first?) 12 13 idea: run multiple different resync workers. 4 for long running tasks, 10 for short tasks. on timeout, schedule it for long running task
··· 6 7 # TODO 8 9 + - [x] cleanup 'resyncing' state on shutdown (or on startup too) 10 + - [x] better tap reconnecting logic 11 - [ ] handle really large repos (maybe shallow-clone first?) 12 13 idea: run multiple different resync workers. 4 for long running tasks, 10 for short tasks. on timeout, schedule it for long running task
+43 -37
knotmirror/resyncer.go
··· 8 "log/slog" 9 "math/rand" 10 "net/url" 11 "path" 12 "sync" 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/go-git/go-git/v5" 17 - gitconfig "github.com/go-git/go-git/v5/config" 18 - "github.com/go-git/go-git/v5/plumbing/transport" 19 "tangled.org/core/knotmirror/config" 20 "tangled.org/core/knotmirror/db" 21 "tangled.org/core/knotmirror/models" ··· 74 l.Info("processing resync", "aturi", repoAt) 75 if err := r.resyncRepo(ctx, repoAt); err != nil { 76 l.Error("resync failed", "aturi", repoAt, "error", err) 77 } 78 } 79 } ··· 143 } 144 145 repoPath := r.repoPath(repo) 146 - remoteUrl := r.repoRemoteURL(repo) 147 - l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath, "url", remoteUrl) 148 149 ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 150 defer cancel() ··· 152 // TODO: check if Knot is on backoff list. If so, return (false, nil) 153 // TODO: use r.repoFetchTimeout on fetch 154 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 155 - gr, err := git.PlainOpen(repoPath) 156 - if errors.Is(err, git.ErrRepositoryNotExists) { 157 - l.Debug("cloning repo") 158 - // if err := exec.Command("git", "clone", "--mirror", remoteUrl, repoPath).Run(); err != nil { 159 - // return false, fmt.Errorf("cloning repo: %w", err) 160 - // } 161 - _, err := git.PlainCloneContext(ctx, repoPath, true, &git.CloneOptions{ 162 - URL: remoteUrl, 163 - Mirror: true, 164 - }) 165 - if err != nil && !errors.Is(err, transport.ErrEmptyRemoteRepository) { 166 - return false, fmt.Errorf("cloning repo: %w", err) 167 } 168 } else { 169 - if err != nil { 170 - return false, fmt.Errorf("laoding repo: %w", err) 171 - } 172 - l.Debug("fetching repo") 173 - // if err := exec.Command("git", "-C", repoPath, "fetch", "--mirror", remoteUrl).Run(); err != nil { 174 - // return false, fmt.Errorf("fetching repo: %w", err) 175 - // } 176 - if err := gr.FetchContext(ctx, &git.FetchOptions{ 177 - RemoteURL: remoteUrl, 178 - RefSpecs: []gitconfig.RefSpec{gitconfig.RefSpec("+refs/*:refs/*")}, 179 - Force: true, 180 - Prune: true, 181 - }); err != nil { 182 - return false, fmt.Errorf("fetching reppo: %w", err) 183 } 184 } 185 ··· 209 210 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 211 if err != nil { 212 - return err 213 } 214 if repo == nil { 215 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) ··· 217 218 var retryCount = repo.RetryCount + 1 219 var retryAfter int64 220 - if retryCount >= 5 { 221 state = models.RepoStateSuspended 222 - errMsg = "too many resync fails" 223 retryAfter = 0 224 } else { 225 // start a 1 min & go up to 1 hr between retries ··· 240 return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 241 } 242 243 - func (r *Resyncer) repoRemoteURL(repo *models.Repo) string { 244 - u, _ := url.Parse(repo.KnotDomain) 245 if u.Scheme == "" { 246 if r.knotUseSSL { 247 u.Scheme = "https" ··· 250 } 251 } 252 u = u.JoinPath(repo.DidSlashRepo()) 253 - return u.String() 254 } 255 256 func backoff(retries int, max int) time.Duration { ··· 258 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 259 return time.Second*time.Duration(dur) + jitter 260 }
··· 8 "log/slog" 9 "math/rand" 10 "net/url" 11 + "os" 12 "path" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "tangled.org/core/knotmirror/config" 18 "tangled.org/core/knotmirror/db" 19 "tangled.org/core/knotmirror/models" ··· 72 l.Info("processing resync", "aturi", repoAt) 73 if err := r.resyncRepo(ctx, repoAt); err != nil { 74 l.Error("resync failed", "aturi", repoAt, "error", err) 75 + 76 + // TODO: cleanup. set resync state to desynchronized 77 } 78 } 79 } ··· 143 } 144 145 repoPath := r.repoPath(repo) 146 + l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath) 147 + 148 + remoteUrl, err := r.repoRemoteURL(repo) 149 + if err != nil { 150 + return false, fmt.Errorf("parsing knot url: %w", err) 151 + } 152 + l = l.With("url", remoteUrl) 153 154 ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 155 defer cancel() ··· 157 // TODO: check if Knot is on backoff list. If so, return (false, nil) 158 // TODO: use r.repoFetchTimeout on fetch 159 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 160 + 161 + // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 162 + gitclient := &CliGitMirrorClient{} 163 + 164 + exist, err := isDir(repoPath) 165 + if err != nil { 166 + return false, fmt.Errorf("checking repo path: %w", err) 167 + } 168 + if !exist { 169 + if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil { 170 + return false, err 171 } 172 } else { 173 + if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil { 174 + return false, err 175 } 176 } 177 ··· 201 202 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 203 if err != nil { 204 + return fmt.Errorf("failed to get repo: %w", err) 205 } 206 if repo == nil { 207 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) ··· 209 210 var retryCount = repo.RetryCount + 1 211 var retryAfter int64 212 + if retryCount >= 10 { 213 state = models.RepoStateSuspended 214 + errMsg = fmt.Sprintf("too many resync fails: %s", errMsg) 215 retryAfter = 0 216 } else { 217 // start a 1 min & go up to 1 hr between retries ··· 232 return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 233 } 234 235 + func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) { 236 + u, err := url.Parse(repo.KnotDomain) 237 + if err != nil { 238 + return "", err 239 + } 240 if u.Scheme == "" { 241 if r.knotUseSSL { 242 u.Scheme = "https" ··· 245 } 246 } 247 u = u.JoinPath(repo.DidSlashRepo()) 248 + return u.String(), nil 249 } 250 251 func backoff(retries int, max int) time.Duration { ··· 253 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 254 return time.Second*time.Duration(dur) + jitter 255 } 256 + 257 + func isDir(path string) (bool, error) { 258 + info, err := os.Stat(path) 259 + if err == nil && info.IsDir() { 260 + return true, nil 261 + } 262 + if os.IsNotExist(err) { 263 + return false, nil 264 + } 265 + return false, err 266 + }
+22 -2
knotmirror/tapclient.go
··· 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/url" 10 "time" 11 ··· 82 status = models.RepoStateSuspended 83 errMsg = "failed to parse knot url" 84 } 85 - if u.Hostname() == "localhost" { 86 status = models.RepoStateSuspended 87 - errMsg = "suspending localhost knot" 88 } 89 90 if err := db.UpsertRepo(ctx, t.db, &models.Repo{ ··· 112 } 113 return nil 114 }
··· 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 + "net/netip" 10 "net/url" 11 "time" 12 ··· 83 status = models.RepoStateSuspended 84 errMsg = "failed to parse knot url" 85 } 86 + if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { 87 status = models.RepoStateSuspended 88 + errMsg = "suspending non-public knot" 89 } 90 91 if err := db.UpsertRepo(ctx, t.db, &models.Repo{ ··· 113 } 114 return nil 115 } 116 + 117 + // isPrivate checks if host is private network. It doesn't perform DNS resolution 118 + func isPrivate(host string) bool { 119 + if host == "localhost" { 120 + return true 121 + } 122 + addr, err := netip.ParseAddr(host) 123 + if err != nil { 124 + return false 125 + } 126 + return isPrivateAddr(addr) 127 + } 128 + 129 + func isPrivateAddr(addr netip.Addr) bool { 130 + return addr.IsLoopback() || 131 + addr.IsPrivate() || 132 + addr.IsLinkLocalUnicast() || 133 + addr.IsLinkLocalMulticast() 134 + }