Monorepo for Tangled tangled.org

wip: knotmirror: introduce knotmirror

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me f50f53bc 6b739971

verified
+2235 -22
+1
.gitignore
··· 19 19 # Created if following hacking.md 20 20 genjwks.out 21 21 /nix/vm-data 22 + /mirror
+51
cmd/knotmirror/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "os" 7 + "os/signal" 8 + "syscall" 9 + 10 + "github.com/carlmjohnson/versioninfo" 11 + "github.com/urfave/cli/v3" 12 + "tangled.org/core/knotmirror" 13 + "tangled.org/core/log" 14 + ) 15 + 16 + func main() { 17 + if err := run(os.Args); err != nil { 18 + slog.Error("error running knotmirror", "err", err) 19 + os.Exit(-1) 20 + } 21 + } 22 + 23 + func run(args []string) error { 24 + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 25 + defer cancel() 26 + 27 + logger := log.New("knotmirror") 28 + slog.SetDefault(logger) 29 + ctx = log.IntoContext(ctx, logger) 30 + 31 + app := cli.Command{ 32 + Name: "knotmirror", 33 + Usage: "knot mirroring service", 34 + Version: versioninfo.Short(), 35 + } 36 + app.Flags = []cli.Flag{} 37 + app.Commands = []*cli.Command{ 38 + { 39 + Name: "serve", 40 + Usage: "run the knotmirror daemon", 41 + Action: runKnotMirror, 42 + Flags: []cli.Flag{}, 43 + }, 44 + } 45 + return app.Run(ctx, args) 46 + } 47 + 48 + func runKnotMirror(ctx context.Context, cmd *cli.Command) error { 49 + // TODO: generate Config from arguments & pass down to Run() 50 + return knotmirror.Run(ctx) 51 + }
+3 -1
flake.nix
··· 99 99 bluesky-jetstream = self.callPackage ./nix/pkgs/bluesky-jetstream.nix {}; 100 100 bluesky-relay = self.callPackage ./nix/pkgs/bluesky-relay.nix {}; 101 101 tap = self.callPackage ./nix/pkgs/tap.nix {}; 102 + knotmirror = self.callPackage ./nix/pkgs/knot-mirror.nix {}; 102 103 }); 103 104 in { 104 105 overlays.default = final: prev: { 105 - inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly did-method-plc bluesky-jetstream bluesky-relay tap; 106 + inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly did-method-plc bluesky-jetstream bluesky-relay tap knotmirror; 106 107 }; 107 108 108 109 packages = forAllSystems (system: let ··· 191 192 pkgs.coreutils # for those of us who are on systems that use busybox (alpine) 192 193 packages'.lexgen 193 194 packages'.treefmt-wrapper 195 + packages'.tap 194 196 ]; 195 197 shellHook = '' 196 198 mkdir -p appview/pages/static
+8 -7
go.mod
··· 37 37 github.com/microcosm-cc/bluemonday v1.0.27 38 38 github.com/openbao/openbao/api/v2 v2.3.0 39 39 github.com/posthog/posthog-go v1.5.5 40 + github.com/prometheus/client_golang v1.23.2 40 41 github.com/redis/go-redis/v9 v9.7.3 41 42 github.com/resend/resend-go/v2 v2.15.0 42 43 github.com/sethvargo/go-envconfig v1.1.0 43 44 github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c 44 45 github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef 45 - github.com/stretchr/testify v1.10.0 46 + github.com/stretchr/testify v1.11.1 46 47 github.com/urfave/cli/v3 v3.4.1 47 48 github.com/whyrusleeping/cbor-gen v0.3.1 48 49 github.com/yuin/goldmark v1.7.13 49 50 github.com/yuin/goldmark-emoji v1.0.6 50 51 github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc 51 52 gitlab.com/staticnoise/goldmark-callout v0.0.0-20240609120641-6366b799e4ab 52 - golang.org/x/crypto v0.40.0 53 + golang.org/x/crypto v0.41.0 53 54 golang.org/x/image v0.31.0 54 - golang.org/x/net v0.42.0 55 + golang.org/x/net v0.43.0 55 56 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 56 57 gopkg.in/yaml.v3 v3.0.1 57 58 ) ··· 199 200 github.com/pkg/errors v0.9.1 // indirect 200 201 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect 201 202 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 202 - github.com/prometheus/client_golang v1.22.0 // indirect 203 203 github.com/prometheus/client_model v0.6.2 // indirect 204 - github.com/prometheus/common v0.64.0 // indirect 204 + github.com/prometheus/common v0.66.1 // indirect 205 205 github.com/prometheus/procfs v0.16.1 // indirect 206 206 github.com/puzpuzpuz/xsync/v4 v4.2.0 // indirect 207 207 github.com/rivo/uniseg v0.4.7 // indirect ··· 227 227 go.uber.org/atomic v1.11.0 // indirect 228 228 go.uber.org/multierr v1.11.0 // indirect 229 229 go.uber.org/zap v1.27.0 // indirect 230 + go.yaml.in/yaml/v2 v2.4.2 // indirect 230 231 golang.org/x/exp 
v0.0.0-20250620022241-b7579e27df2b // indirect 231 232 golang.org/x/sync v0.17.0 // indirect 232 - golang.org/x/sys v0.34.0 // indirect 233 + golang.org/x/sys v0.35.0 // indirect 233 234 golang.org/x/text v0.29.0 // indirect 234 235 golang.org/x/time v0.12.0 // indirect 235 236 google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect 236 237 google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect 237 238 google.golang.org/grpc v1.73.0 // indirect 238 - google.golang.org/protobuf v1.36.6 // indirect 239 + google.golang.org/protobuf v1.36.8 // indirect 239 240 gopkg.in/fsnotify.v1 v1.4.7 // indirect 240 241 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect 241 242 gopkg.in/warnings.v0 v0.1.2 // indirect
+126
knotmirror/adminpage.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "database/sql" 5 + "embed" 6 + "html/template" 7 + "log/slog" 8 + "net/http" 9 + "strconv" 10 + "time" 11 + 12 + "github.com/go-chi/chi/v5" 13 + "tangled.org/core/appview/pagination" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/models" 16 + "tangled.org/core/orm" 17 + ) 18 + 19 + //go:embed templates/*.html 20 + var templateFS embed.FS 21 + 22 + const repoPageSize = 20 23 + 24 + type AdminServer struct { 25 + db *sql.DB 26 + } 27 + 28 + func NewAdminServer(database *sql.DB) *AdminServer { 29 + return &AdminServer{db: database} 30 + } 31 + 32 + func (s *AdminServer) Router() http.Handler { 33 + r := chi.NewRouter() 34 + r.Get("/repos", s.handleRepos()) 35 + r.Get("/hosts", s.handleHosts()) 36 + return r 37 + } 38 + 39 + func funcmap() template.FuncMap { 40 + return template.FuncMap{ 41 + "add": func(a, b int) int { return a + b }, 42 + "sub": func(a, b int) int { return a - b }, 43 + "readt": func(ts int64) string { 44 + if ts == 0 { 45 + return "n/a" 46 + } 47 + return time.Unix(ts, 0).Format("2006-01-02 15:04") 48 + }, 49 + } 50 + } 51 + 52 + func (s *AdminServer) handleRepos() http.HandlerFunc { 53 + // TODO: prepare template 54 + tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/repos.html")) 55 + return func(w http.ResponseWriter, r *http.Request) { 56 + pageNum, _ := strconv.Atoi(r.URL.Query().Get("page")) 57 + if pageNum < 1 { 58 + pageNum = 1 59 + } 60 + var ( 61 + did = r.URL.Query().Get("did") 62 + knot = r.URL.Query().Get("knot") 63 + state = r.URL.Query().Get("state") 64 + ) 65 + 66 + page := pagination.Page{ 67 + Offset: (pageNum - 1) * repoPageSize, 68 + Limit: repoPageSize, 69 + } 70 + var filters []orm.Filter 71 + 72 + if did != "" { 73 + filters = append(filters, orm.FilterEq("did", did)) 74 + } 75 + if knot != "" { 76 + filters = append(filters, orm.FilterEq("knot_domain", knot)) 77 + } 78 + if state != "" { 79 + 
filters = append(filters, orm.FilterEq("state", state)) 80 + } 81 + 82 + repos, err := db.ListRepos(r.Context(), s.db, page, filters...) 83 + if err != nil { 84 + http.Error(w, err.Error(), http.StatusInternalServerError) 85 + } 86 + counts, err := db.GetRepoCountsByState(r.Context(), s.db) 87 + if err != nil { 88 + http.Error(w, err.Error(), http.StatusInternalServerError) 89 + } 90 + err = tpl.ExecuteTemplate(w, "base", map[string]any{ 91 + "Repos": repos, 92 + "RepoCounts": counts, 93 + "Page": pageNum, 94 + "FilterByDid": did, 95 + "FilterByKnot": knot, 96 + "FilterByState": models.RepoState(state), 97 + }) 98 + if err != nil { 99 + slog.Error("failed to render", "err", err) 100 + } 101 + } 102 + } 103 + 104 + func (s *AdminServer) handleHosts() http.HandlerFunc { 105 + tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/hosts.html")) 106 + return func(w http.ResponseWriter, r *http.Request) { 107 + var status = r.URL.Query().Get("status") 108 + 109 + var filters []orm.Filter 110 + if status != "" { 111 + filters = append(filters, orm.FilterEq("status", status)) 112 + } 113 + 114 + hosts, err := db.ListHosts(r.Context(), s.db, filters...) 115 + if err != nil { 116 + http.Error(w, err.Error(), http.StatusInternalServerError) 117 + } 118 + err = tpl.ExecuteTemplate(w, "base", map[string]any{ 119 + "Hosts": hosts, 120 + "FilterByStatus": models.HostStatus(status), 121 + }) 122 + if err != nil { 123 + slog.Error("failed to render", "err", err) 124 + } 125 + } 126 + }
+34
knotmirror/config/config.go
··· 1 + package config 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "github.com/sethvargo/go-envconfig" 8 + ) 9 + 10 + type Config struct { 11 + TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 + DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 13 + KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not scheme is not specified 14 + KnotSSRF bool `env:"MIRROR_KNOT_SSRF, default=false"` 15 + GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"` 16 + GitRepoFetchTimeout time.Duration `env:"MIRROR_GIT_FETCH_TIMEOUT, default=600s"` 17 + ResyncParallelism int `env:"MIRROR_RESYNC_PARALLELISM, default=5"` 18 + Slurper SlurperConfig `env:",prefix=MIRROR_SLURPER_"` 19 + MetricsListen string `env:"MIRROR_METRICS_LISTEN, default=:7100"` 20 + AdminListen string `env:"MIRROR_ADMIN_LISTEN, default=:7200"` 21 + } 22 + 23 + type SlurperConfig struct { 24 + PersistCursorPeriod time.Duration `env:"PERSIST_CURSOR_PERIOD, default=4s"` 25 + ConcurrencyPerHost int `env:"CONCURRENCY, default=4"` 26 + } 27 + 28 + func Load(ctx context.Context) (*Config, error) { 29 + var cfg Config 30 + if err := envconfig.Process(ctx, &cfg); err != nil { 31 + return nil, err 32 + } 33 + return &cfg, nil 34 + }
+25
knotmirror/crawler.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "log/slog" 7 + 8 + "tangled.org/core/log" 9 + ) 10 + 11 + type Crawler struct { 12 + logger *slog.Logger 13 + db *sql.DB 14 + } 15 + 16 + func NewCrawler(l *slog.Logger, db *sql.DB) *Crawler { 17 + return &Crawler{ 18 + logger: log.SubLogger(l, "crawler"), 19 + db: db, 20 + } 21 + } 22 + 23 + func (c *Crawler) Start(ctx context.Context) { 24 + // TODO: repository crawler 25 + }
+68
knotmirror/db/db.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "strings" 8 + _ "github.com/mattn/go-sqlite3" 9 + ) 10 + 11 + func Make(ctx context.Context, dbPath string) (*sql.DB, error) { 12 + // https://github.com/mattn/go-sqlite3#connection-string 13 + opts := []string{ 14 + "_foreign_keys=1", 15 + "_journal_mode=WAL", 16 + "_synchronous=NORMAL", 17 + "_auto_vacuum=incremental", 18 + } 19 + 20 + db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 21 + if err != nil { 22 + return nil, err 23 + } 24 + 25 + conn, err := db.Conn(ctx) 26 + if err != nil { 27 + return nil, err 28 + } 29 + defer conn.Close() 30 + 31 + _, err = conn.ExecContext(ctx, ` 32 + create table if not exists repos ( 33 + did text not null, 34 + rkey text not null, 35 + at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored, 36 + cid text not null, 37 + 38 + -- record content 39 + name text not null, 40 + knot_domain text not null, 41 + 42 + -- sync data 43 + git_rev text not null, 44 + repo_sha text not null, 45 + state text not null default 'pending', 46 + error_msg text, 47 + retry_count integer not null default 0, 48 + retry_after integer not null default 0, 49 + 50 + unique(did, rkey) 51 + ); 52 + 53 + -- knot hosts 54 + create table if not exists hosts ( 55 + hostname text not null, 56 + no_ssl integer not null default 0, 57 + status text not null default 'active', 58 + last_seq integer not null default -1, 59 + 60 + unique(hostname) 61 + ); 62 + `) 63 + if err != nil { 64 + return nil, fmt.Errorf("initializing db schema: %w", err) 65 + } 66 + 67 + return db, nil 68 + }
+116
knotmirror/db/hosts.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log" 9 + "strings" 10 + 11 + "tangled.org/core/knotmirror/models" 12 + "tangled.org/core/orm" 13 + ) 14 + 15 + func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error { 16 + if _, err := e.ExecContext(ctx, 17 + `insert into hosts (hostname, no_ssl, status, last_seq) 18 + values (?, ?, ?, ?) 19 + on conflict(hostname) do update set 20 + no_ssl = excluded.no_ssl, 21 + status = excluded.status, 22 + last_seq = excluded.last_seq 23 + `, 24 + host.Hostname, 25 + host.NoSSL, 26 + host.Status, 27 + host.LastSeq, 28 + ); err != nil { 29 + return fmt.Errorf("upserting host: %w", err) 30 + }; 31 + return nil 32 + } 33 + 34 + func GetHost(ctx context.Context, e *sql.DB, hostname string) (*models.Host, error) { 35 + var host models.Host 36 + if err := e.QueryRowContext(ctx, 37 + `select hostname, no_ssl, status, last_seq 38 + from hosts where hostname = ?`, 39 + hostname, 40 + ).Scan( 41 + &host.Hostname, 42 + &host.NoSSL, 43 + &host.Status, 44 + &host.LastSeq, 45 + ); err != nil { 46 + if errors.Is(err, sql.ErrNoRows) { 47 + return nil, nil 48 + } 49 + return nil, err 50 + }; 51 + return &host, nil 52 + } 53 + 54 + func StoreCursors(ctx context.Context, e *sql.DB, cursors []models.HostCursor) error { 55 + tx, err := e.BeginTx(ctx, nil) 56 + if err != nil { 57 + return fmt.Errorf("starting transaction: %w", err) 58 + } 59 + defer tx.Rollback() 60 + for _, cur := range cursors { 61 + if cur.LastSeq <= 0 { 62 + continue 63 + } 64 + if _, err := tx.ExecContext(ctx, 65 + `update hosts set last_seq = ? 
where hostname = ?`, 66 + cur.LastSeq, 67 + cur.Hostname, 68 + ); err != nil { 69 + log.Println("failed to persist host cursor", "host:", cur.Hostname, "lastSeq", cur.LastSeq) 70 + } 71 + } 72 + return tx.Commit() 73 + } 74 + 75 + func ListHosts(ctx context.Context, e *sql.DB, filters ...orm.Filter) ([]models.Host, error) { 76 + var conditions []string 77 + var args []any 78 + 79 + for _, filter := range filters { 80 + conditions = append(conditions, filter.Condition()) 81 + args = append(args, filter.Arg()...) 82 + } 83 + 84 + whereClause := "" 85 + if len(conditions) > 0 { 86 + whereClause = " where " + strings.Join(conditions, " and ") 87 + } 88 + 89 + rows, err := e.QueryContext(ctx, 90 + `select hostname, no_ssl, status, last_seq 91 + from hosts` + whereClause, 92 + args..., 93 + ) 94 + if err != nil { 95 + return nil, fmt.Errorf("querying hosts: %w", err) 96 + } 97 + defer rows.Close() 98 + 99 + var hosts []models.Host 100 + for rows.Next() { 101 + var host models.Host 102 + if err := rows.Scan( 103 + &host.Hostname, 104 + &host.NoSSL, 105 + &host.Status, 106 + &host.LastSeq, 107 + ); err != nil { 108 + return nil, fmt.Errorf("scanning row: %w", err) 109 + } 110 + hosts = append(hosts, host) 111 + } 112 + if err := rows.Err(); err != nil { 113 + return nil, fmt.Errorf("scanning rows: %w ", err) 114 + } 115 + return hosts, nil 116 + }
+268
knotmirror/db/repos.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/appview/pagination" 11 + "tangled.org/core/knotmirror/models" 12 + "tangled.org/core/orm" 13 + ) 14 + 15 + func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 16 + if _, err := e.ExecContext(ctx, 17 + `insert into repos (did, rkey, cid, name, knot_domain) 18 + values (?, ?, ?, ?, ?)`, 19 + did, rkey, cid, name, knot, 20 + ); err != nil { 21 + return fmt.Errorf("inserting repo: %w", err) 22 + } 23 + return nil 24 + } 25 + 26 + func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 27 + if _, err := e.ExecContext(ctx, 28 + `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 29 + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 30 + on conflict(did, rkey) do update set 31 + cid = excluded.cid, 32 + name = excluded.name, 33 + knot_domain = excluded.knot_domain, 34 + git_rev = excluded.git_rev, 35 + repo_sha = excluded.repo_sha, 36 + state = excluded.state, 37 + error_msg = excluded.error_msg, 38 + retry_count = excluded.retry_count, 39 + retry_after = excluded.retry_after`, 40 + // where repos.cid != excluded.cid`, 41 + repo.Did, 42 + repo.Rkey, 43 + repo.Cid, 44 + repo.Name, 45 + repo.KnotDomain, 46 + repo.GitRev, 47 + repo.RepoSha, 48 + repo.State, 49 + repo.ErrorMsg, 50 + repo.RetryCount, 51 + repo.RetryAfter, 52 + ); err != nil { 53 + return fmt.Errorf("upserting repo: %w", err) 54 + } 55 + return nil 56 + } 57 + 58 + func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 59 + if _, err := e.ExecContext(ctx, 60 + `update repos 61 + set state = ? 62 + where did = ? 
and rkey = ?`, 63 + state, 64 + did, rkey, 65 + ); err != nil { 66 + return fmt.Errorf("updating repo: %w", err) 67 + } 68 + return nil 69 + } 70 + 71 + func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 72 + if _, err := e.ExecContext(ctx, 73 + `delete from repos where did = ? and rkey = ?`, 74 + did, 75 + rkey, 76 + ); err != nil { 77 + return fmt.Errorf("deleting repo: %w", err) 78 + } 79 + return nil 80 + } 81 + 82 + func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) { 83 + var repo models.Repo 84 + if err := e.QueryRowContext(ctx, 85 + `select 86 + did, 87 + rkey, 88 + cid, 89 + name, 90 + knot_domain, 91 + git_rev, 92 + repo_sha, 93 + state, 94 + error_msg, 95 + retry_count, 96 + retry_after 97 + from repos 98 + where did = ? and name = ?`, 99 + did, 100 + name, 101 + ).Scan( 102 + &repo.Did, 103 + &repo.Rkey, 104 + &repo.Cid, 105 + &repo.Name, 106 + &repo.KnotDomain, 107 + &repo.GitRev, 108 + &repo.RepoSha, 109 + &repo.State, 110 + &repo.ErrorMsg, 111 + &repo.RetryCount, 112 + &repo.RetryAfter, 113 + ); err != nil { 114 + if errors.Is(err, sql.ErrNoRows) { 115 + return nil, nil 116 + } 117 + return nil, fmt.Errorf("querying repo: %w", err) 118 + } 119 + return &repo, nil 120 + } 121 + 122 + func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) { 123 + var repo models.Repo 124 + if err := e.QueryRowContext(ctx, 125 + `select 126 + did, 127 + rkey, 128 + cid, 129 + name, 130 + knot_domain, 131 + git_rev, 132 + repo_sha, 133 + state, 134 + error_msg, 135 + retry_count, 136 + retry_after 137 + from repos 138 + where at_uri = ?`, 139 + aturi, 140 + ).Scan( 141 + &repo.Did, 142 + &repo.Rkey, 143 + &repo.Cid, 144 + &repo.Name, 145 + &repo.KnotDomain, 146 + &repo.GitRev, 147 + &repo.RepoSha, 148 + &repo.State, 149 + &repo.ErrorMsg, 150 + &repo.RetryCount, 151 + &repo.RetryAfter, 152 + ); err != nil { 153 + if errors.Is(err, 
sql.ErrNoRows) { 154 + return nil, nil 155 + } 156 + return nil, fmt.Errorf("querying repo: %w", err) 157 + } 158 + return &repo, nil 159 + } 160 + 161 + func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) { 162 + var conditions []string 163 + var args []any 164 + 165 + for _, filter := range filters { 166 + conditions = append(conditions, filter.Condition()) 167 + args = append(args, filter.Arg()...) 168 + } 169 + 170 + whereClause := "" 171 + if len(conditions) > 0 { 172 + whereClause = "WHERE " + conditions[0] 173 + for _, condition := range conditions[1:] { 174 + whereClause += " AND " + condition 175 + } 176 + } 177 + pageClause := "" 178 + if page.Limit > 0 { 179 + pageClause = " limit ? offset ? " 180 + args = append(args, page.Limit, page.Offset) 181 + } 182 + 183 + query := ` 184 + select 185 + did, 186 + rkey, 187 + cid, 188 + name, 189 + knot_domain, 190 + git_rev, 191 + repo_sha, 192 + state, 193 + error_msg, 194 + retry_count, 195 + retry_after 196 + from repos 197 + ` + whereClause + pageClause 198 + rows, err := e.QueryContext(ctx, query, args...) 
199 + if err != nil { 200 + return nil, err 201 + } 202 + defer rows.Close() 203 + 204 + var repos []models.Repo 205 + for rows.Next() { 206 + var repo models.Repo 207 + if err := rows.Scan( 208 + &repo.Did, 209 + &repo.Rkey, 210 + &repo.Cid, 211 + &repo.Name, 212 + &repo.KnotDomain, 213 + &repo.GitRev, 214 + &repo.RepoSha, 215 + &repo.State, 216 + &repo.ErrorMsg, 217 + &repo.RetryCount, 218 + &repo.RetryAfter, 219 + ); err != nil { 220 + return nil, fmt.Errorf("scanning row: %w", err) 221 + } 222 + repos = append(repos, repo) 223 + } 224 + if err := rows.Err(); err != nil { 225 + return nil, fmt.Errorf("scanning rows: %w ", err) 226 + } 227 + 228 + return repos, nil 229 + } 230 + 231 + func GetRepoCountsByState(ctx context.Context, e *sql.DB) (map[models.RepoState]int64, error) { 232 + const q = ` 233 + SELECT state, COUNT(*) 234 + FROM repos 235 + GROUP BY state 236 + ` 237 + 238 + rows, err := e.QueryContext(ctx, q) 239 + if err != nil { 240 + return nil, err 241 + } 242 + defer rows.Close() 243 + 244 + counts := make(map[models.RepoState]int64) 245 + 246 + for rows.Next() { 247 + var state string 248 + var count int64 249 + 250 + if err := rows.Scan(&state, &count); err != nil { 251 + return nil, err 252 + } 253 + 254 + counts[models.RepoState(state)] = count 255 + } 256 + 257 + if err := rows.Err(); err != nil { 258 + return nil, err 259 + } 260 + 261 + for _, s := range (models.RepoState("")).AllStates() { 262 + if _, ok := counts[s]; !ok { 263 + counts[s] = 0 264 + } 265 + } 266 + 267 + return counts, nil 268 + }
+105
knotmirror/git.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "os/exec" 8 + "regexp" 9 + "strings" 10 + 11 + "github.com/go-git/go-git/v5" 12 + gitconfig "github.com/go-git/go-git/v5/config" 13 + "github.com/go-git/go-git/v5/plumbing/transport" 14 + ) 15 + 16 + type GitMirrorClient interface { 17 + Clone(ctx context.Context, path, url string) error 18 + Fetch(ctx context.Context, path, url string) error 19 + } 20 + 21 + type CliGitMirrorClient struct{} 22 + 23 + var _ GitMirrorClient = new(CliGitMirrorClient) 24 + 25 + func (c *CliGitMirrorClient) Clone(ctx context.Context, path, url string) error { 26 + cmd := exec.CommandContext(ctx, "git", "clone", "--mirror", url, path) 27 + if out, err := cmd.CombinedOutput(); err != nil { 28 + if ctx.Err() != nil { 29 + return ctx.Err() 30 + } 31 + msg := string(out) 32 + if classification := classifyError(msg); classification != nil { 33 + return classification 34 + } 35 + return fmt.Errorf("cloning repo: %w\n%s", err, msg) 36 + } 37 + return nil 38 + } 39 + 40 + func (c *CliGitMirrorClient) Fetch(ctx context.Context, path, url string) error { 41 + cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin") 42 + if out, err := cmd.CombinedOutput(); err != nil { 43 + if ctx.Err() != nil { 44 + return ctx.Err() 45 + } 46 + return fmt.Errorf("fetching repo: %w\n%s", err, string(out)) 47 + } 48 + return nil 49 + } 50 + 51 + var ( 52 + ErrDNSFailure = errors.New("git: dns failure (could not resolve host)") 53 + ErrCertExpired = errors.New("git: certificate has expired") 54 + ErrRepoNotFound = errors.New("git: repository not found") 55 + ) 56 + 57 + var ( 58 + reDNS = regexp.MustCompile(`Could not resolve host:`) 59 + reCertExpired = regexp.MustCompile(`SSL certificate OpenSSL verify result: certificate has expired`) 60 + reRepoNotFound = regexp.MustCompile(`repository '.*' not found`) 61 + ) 62 + 63 + func classifyError(stderr string) error { 64 + msg := strings.TrimSpace(stderr) 65 + 
switch { 66 + case reDNS.MatchString(msg): 67 + return ErrDNSFailure 68 + case reCertExpired.MatchString(msg): 69 + return ErrCertExpired 70 + case reRepoNotFound.MatchString(msg): 71 + return ErrRepoNotFound 72 + } 73 + return nil 74 + } 75 + 76 + type GoGitMirrorClient struct{} 77 + 78 + var _ GitMirrorClient = new(GoGitMirrorClient) 79 + 80 + func (c *GoGitMirrorClient) Clone(ctx context.Context, path string, url string) error { 81 + _, err := git.PlainCloneContext(ctx, path, true, &git.CloneOptions{ 82 + URL: url, 83 + Mirror: true, 84 + }) 85 + if err != nil && !errors.Is(err, transport.ErrEmptyRemoteRepository) { 86 + return fmt.Errorf("cloning repo: %w", err) 87 + } 88 + return nil 89 + } 90 + 91 + func (c *GoGitMirrorClient) Fetch(ctx context.Context, path string, url string) error { 92 + gr, err := git.PlainOpen(path) 93 + if err != nil { 94 + return fmt.Errorf("opening local repo: %w", err) 95 + } 96 + if err := gr.FetchContext(ctx, &git.FetchOptions{ 97 + RemoteURL: url, 98 + RefSpecs: []gitconfig.RefSpec{gitconfig.RefSpec("+refs/*:refs/*")}, 99 + Force: true, 100 + Prune: true, 101 + }); err != nil { 102 + return fmt.Errorf("fetching reppo: %w", err) 103 + } 104 + return nil 105 + }
+120
knotmirror/knotmirror.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + _ "net/http/pprof" 8 + "time" 9 + 10 + "github.com/prometheus/client_golang/prometheus/promhttp" 11 + "tangled.org/core/knotmirror/config" 12 + "tangled.org/core/knotmirror/db" 13 + "tangled.org/core/knotmirror/knotstream" 14 + "tangled.org/core/knotmirror/models" 15 + "tangled.org/core/log" 16 + ) 17 + 18 + func Run(ctx context.Context) error { 19 + // make sure every services are cleaned up on fast return 20 + ctx, cancel := context.WithCancel(ctx) 21 + defer cancel() 22 + 23 + logger := log.FromContext(ctx) 24 + cfg, err := config.Load(ctx) 25 + if err != nil { 26 + return fmt.Errorf("loading config: %w", err) 27 + } 28 + 29 + logger.Debug("config loaded:", "config", cfg) 30 + 31 + db, err := db.Make(ctx, cfg.DbPath) 32 + if err != nil { 33 + return fmt.Errorf("initializing db: %w", err) 34 + } 35 + 36 + res, err := db.ExecContext(ctx, 37 + `update repos set state = ? where state = ?`, 38 + models.RepoStateDesynchronized, 39 + models.RepoStateResyncing, 40 + ) 41 + if err != nil { 42 + return fmt.Errorf("clearing resyning states: %w", err) 43 + } 44 + rows, err := res.RowsAffected() 45 + if err != nil { 46 + return fmt.Errorf("getting affected rows: %w", err) 47 + } 48 + logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 49 + 50 + knotstream := knotstream.NewKnotStream(logger, db, cfg) 51 + crawler := NewCrawler(logger, db) 52 + resyncer := NewResyncer(logger, db, cfg) 53 + adminpage := NewAdminServer(db) 54 + 55 + // maintain repository list with tap 56 + // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 
57 + tap := NewTapClient(logger, cfg, db, knotstream) 58 + 59 + // start metrics endpoint 60 + go func() { 61 + metricsAddr := cfg.MetricsListen 62 + logger.Info("starting metrics server", "addr", metricsAddr) 63 + http.Handle("/metrics", promhttp.Handler()) 64 + if err := http.ListenAndServe(metricsAddr, nil); err != nil { 65 + logger.Error("metrics server failed", "error", err) 66 + } 67 + }() 68 + 69 + // start admin page endpoint 70 + go func() { 71 + logger.Info("starting admin server", "addr", cfg.AdminListen) 72 + if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil { 73 + logger.Error("admin server failed", "error", err) 74 + } 75 + }() 76 + 77 + tap.Start(ctx) 78 + 79 + resyncer.Start(ctx) 80 + 81 + // periodically crawl the entire network to mirror the repositories 82 + crawler.Start(ctx) 83 + 84 + // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 85 + knotstream.Start(ctx) 86 + 87 + svcErr := make(chan error, 1) 88 + if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 89 + svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 90 + } 91 + 92 + logger.Info("startup complete") 93 + select { 94 + case <-ctx.Done(): 95 + logger.Info("received shutdown signal", "reason", ctx.Err()) 96 + case err := <-svcErr: 97 + if err != nil { 98 + logger.Error("service error", "error", err) 99 + } 100 + cancel() 101 + } 102 + 103 + logger.Info("shutting down knotmirror") 104 + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 105 + defer shutdownCancel() 106 + 107 + var errs []error 108 + if err := knotstream.Shutdown(shutdownCtx); err != nil { 109 + errs = append(errs, err) 110 + } 111 + if err := db.Close(); err != nil { 112 + errs = append(errs, err) 113 + } 114 + for _, err := range errs { 115 + logger.Error("error during shutdown", "err", err) 116 + } 117 + 118 + logger.Info("shutdown complete") 119 + return nil 120 + }
+89
knotmirror/knotstream/knotstream.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "log/slog" 8 + "time" 9 + 10 + "tangled.org/core/knotmirror/config" 11 + "tangled.org/core/knotmirror/db" 12 + "tangled.org/core/knotmirror/models" 13 + "tangled.org/core/log" 14 + "tangled.org/core/orm" 15 + ) 16 + 17 + type KnotStream struct { 18 + logger *slog.Logger 19 + db *sql.DB 20 + slurper *KnotSlurper 21 + } 22 + 23 + func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream { 24 + l = log.SubLogger(l, "knotstream") 25 + return &KnotStream{ 26 + logger: l, 27 + db: db, 28 + slurper: NewKnotSlurper(l, db, cfg.Slurper), 29 + } 30 + } 31 + 32 + func (s *KnotStream) Start(ctx context.Context) { 33 + go s.slurper.Run(ctx) 34 + } 35 + 36 + func (s *KnotStream) Shutdown(ctx context.Context) error { 37 + return s.slurper.Shutdown(ctx) 38 + } 39 + 40 + func (s *KnotStream) CheckIfSubscribed(hostname string) bool { 41 + return s.slurper.CheckIfSubscribed(hostname) 42 + } 43 + 44 + func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 45 + l := s.logger.With("hostname", hostname, "nossl", noSSL) 46 + l.Debug("subscribe") 47 + host, err := db.GetHost(ctx, s.db, hostname) 48 + if err != nil { 49 + return fmt.Errorf("loading host from db: %w", err) 50 + } 51 + 52 + if host == nil { 53 + host = &models.Host{ 54 + Hostname: hostname, 55 + NoSSL: noSSL, 56 + Status: models.HostStatusActive, 57 + LastSeq: 0, 58 + } 59 + 60 + if err := db.UpsertHost(ctx, s.db, host); err != nil { 61 + return fmt.Errorf("adding host to db: %w", err) 62 + } 63 + 64 + l.Info("adding new host subscription") 65 + } 66 + 67 + if host.Status == models.HostStatusBanned { 68 + return fmt.Errorf("cannot subscribe to banned knot") 69 + } 70 + return s.slurper.Subscribe(ctx, *host) 71 + } 72 + 73 + func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 74 + hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active")) 75 + if err 
!= nil { 76 + return fmt.Errorf("listing hosts: %w", err) 77 + } 78 + 79 + for _, host := range hosts { 80 + l := s.logger.With("hostname", host.Hostname) 81 + l.Info("re-subscribing to active host") 82 + if err := s.slurper.Subscribe(ctx, host); err != nil { 83 + l.Warn("failed to re-subscribe to host", "err", err) 84 + } 85 + // sleep for a very short period, so we don't open tons of sockets at the same time 86 + time.Sleep(1 * time.Millisecond) 87 + } 88 + return nil 89 + }
+28
knotmirror/knotstream/metrics.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // KnotStream metrics 9 + var ( 10 + knotstreamEventsReceived = promauto.NewCounter(prometheus.CounterOpts{ 11 + Name: "knotmirror_knotstream_events_received_total", 12 + Help: "Total number of events received from knotstream", 13 + }) 14 + knotstreamEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{ 15 + Name: "knotmirror_knotstream_events_processed_total", 16 + Help: "Total number of events successfully processed", 17 + }) 18 + knotstreamEventsSkipped = promauto.NewCounter(prometheus.CounterOpts{ 19 + Name: "knotmirror_knotstream_events_skipped_total", 20 + Help: "Total number of events skipped (not tracked)", 21 + }) 22 + ) 23 + 24 + // slurper metrics 25 + var connectedInbound = promauto.NewGauge(prometheus.GaugeOpts{ 26 + Name: "knotmirror_connected_inbound", 27 + Help: "Number of inbound knotstream we are consuming", 28 + })
+102
knotmirror/knotstream/scheduler.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "sync" 7 + "sync/atomic" 8 + "time" 9 + 10 + "tangled.org/core/log" 11 + ) 12 + 13 + type ParallelScheduler struct { 14 + concurrency int 15 + 16 + do func(ctx context.Context, task *Task) error 17 + 18 + feeder chan *Task 19 + lk sync.Mutex 20 + scheduled map[string][]*Task 21 + lastSeq atomic.Int64 22 + 23 + logger *slog.Logger 24 + } 25 + 26 + type Task struct { 27 + key string 28 + message []byte 29 + } 30 + 31 + func NewParallelScheduler(maxC int, ident string, do func(context.Context, *Task) error) *ParallelScheduler { 32 + return &ParallelScheduler{ 33 + concurrency: maxC, 34 + do: do, 35 + feeder: make(chan *Task), 36 + scheduled: make(map[string][]*Task), 37 + logger: log.New("parallel-scheduler"), 38 + } 39 + } 40 + 41 + func (s *ParallelScheduler) Start(ctx context.Context) { 42 + for range s.concurrency { 43 + go s.ForEach(ctx, s.do) 44 + } 45 + } 46 + 47 + func (s *ParallelScheduler) AddTask(ctx context.Context, task *Task) { 48 + s.lk.Lock() 49 + if st, ok := s.scheduled[task.key]; ok { 50 + // schedule task 51 + s.scheduled[task.key] = append(st, task) 52 + s.lk.Unlock() 53 + return 54 + } 55 + s.scheduled[task.key] = []*Task{} 56 + s.lk.Unlock() 57 + 58 + select { 59 + case <-ctx.Done(): 60 + return 61 + case s.feeder <- task: 62 + return 63 + } 64 + } 65 + 66 + func (s *ParallelScheduler) ForEach(ctx context.Context, fn func(context.Context, *Task) error) { 67 + for task := range s.feeder { 68 + for task != nil { 69 + select { 70 + case <-ctx.Done(): 71 + return 72 + default: 73 + } 74 + if err := fn(ctx, task); err != nil { 75 + s.logger.Error("event handler failed", "err", err) 76 + } 77 + 78 + s.lk.Lock() 79 + func() { 80 + rem, ok := s.scheduled[task.key] 81 + if !ok { 82 + s.logger.Error("should always have an 'active' entry if a worker is processing a job") 83 + } 84 + if len(rem) == 0 { 85 + delete(s.scheduled, task.key) 86 + task = nil 87 + } else { 88 + task = 
rem[0] 89 + s.scheduled[task.key] = rem[1:] 90 + } 91 + 92 + // TODO: update seq from received message 93 + s.lastSeq.Store(time.Now().UnixNano()) 94 + }() 95 + s.lk.Unlock() 96 + } 97 + } 98 + } 99 + 100 + func (s *ParallelScheduler) LastSeq() int64 { 101 + return s.lastSeq.Load() 102 + }
+334
knotmirror/knotstream/slurper.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "net/http" 11 + "sync" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/util/ssrf" 16 + "github.com/carlmjohnson/versioninfo" 17 + "github.com/gorilla/websocket" 18 + "tangled.org/core/api/tangled" 19 + "tangled.org/core/knotmirror/config" 20 + "tangled.org/core/knotmirror/db" 21 + "tangled.org/core/knotmirror/models" 22 + "tangled.org/core/log" 23 + ) 24 + 25 + type KnotSlurper struct { 26 + logger *slog.Logger 27 + db *sql.DB 28 + cfg config.SlurperConfig 29 + 30 + subsLk sync.Mutex 31 + subs map[string]*subscription 32 + } 33 + 34 + func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper { 35 + return &KnotSlurper{ 36 + logger: log.SubLogger(l, "slurper"), 37 + db: db, 38 + cfg: cfg, 39 + subs: make(map[string]*subscription), 40 + } 41 + } 42 + 43 + func (s *KnotSlurper) Run(ctx context.Context) { 44 + for { 45 + select { 46 + case <-ctx.Done(): 47 + return 48 + case <-time.After(s.cfg.PersistCursorPeriod): 49 + if err := s.persistCursors(ctx); err != nil { 50 + s.logger.Error("failed to flush cursors", "err", err) 51 + } 52 + } 53 + } 54 + } 55 + 56 + func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool { 57 + s.subsLk.Lock() 58 + defer s.subsLk.Unlock() 59 + 60 + _, ok := s.subs[hostname] 61 + return ok 62 + } 63 + 64 + func (s *KnotSlurper) Shutdown(ctx context.Context) error { 65 + s.logger.Info("starting shutdown host cursor flush") 66 + err := s.persistCursors(ctx) 67 + if err != nil { 68 + s.logger.Error("shutdown error", "err", err) 69 + } 70 + s.logger.Info("slurper shutdown complete") 71 + return err 72 + } 73 + 74 + func (s *KnotSlurper) persistCursors(ctx context.Context) error { 75 + // // gather cursor list from subscriptions and store them to DB 76 + // start := time.Now() 77 + 78 + s.subsLk.Lock() 79 + 
cursors := make([]models.HostCursor, len(s.subs)) 80 + i := 0 81 + for _, sub := range s.subs { 82 + cursors[i] = sub.HostCursor() 83 + i++ 84 + } 85 + s.subsLk.Unlock() 86 + 87 + err := db.StoreCursors(ctx, s.db, cursors) 88 + // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 89 + return err 90 + } 91 + 92 + func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error { 93 + s.subsLk.Lock() 94 + defer s.subsLk.Unlock() 95 + 96 + _, ok := s.subs[host.Hostname] 97 + if ok { 98 + return fmt.Errorf("already subscribed: %s", host.Hostname) 99 + } 100 + 101 + // TODO: include `cancel` function to kill subscription by hostname 102 + sub := &subscription{ 103 + hostname: host.Hostname, 104 + scheduler: NewParallelScheduler( 105 + s.cfg.ConcurrencyPerHost, 106 + host.Hostname, 107 + s.ProcessEvent, 108 + ), 109 + } 110 + s.subs[host.Hostname] = sub 111 + 112 + sub.scheduler.Start(ctx) 113 + go s.subscribeWithRedialer(ctx, host, sub) 114 + return nil 115 + } 116 + 117 + func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) { 118 + l := s.logger.With("host", host.Hostname) 119 + 120 + dialer := websocket.Dialer{ 121 + HandshakeTimeout: time.Second * 5, 122 + } 123 + 124 + // if this isn't a localhost / private connection, then we should enable SSRF protections 125 + if !host.NoSSL { 126 + netDialer := ssrf.PublicOnlyDialer() 127 + dialer.NetDialContext = netDialer.DialContext 128 + } 129 + 130 + cursor := host.LastSeq 131 + 132 + connectedInbound.Inc() 133 + defer connectedInbound.Dec() 134 + 135 + var backoff int 136 + for { 137 + select { 138 + case <-ctx.Done(): 139 + return 140 + default: 141 + } 142 + u := host.LegacyEventsURL(cursor) 143 + l.Debug("made url with cursor", "cursor", cursor, "url", u) 144 + 145 + // NOTE: manual backoff retry implementation to explicitly handle fails 146 + hdr := make(http.Header) 147 + 
hdr.Add("User-Agent", userAgent()) 148 + conn, resp, err := dialer.DialContext(ctx, u, hdr) 149 + if err != nil { 150 + l.Warn("dialing failed", "err", err, "backoff", backoff) 151 + time.Sleep(sleepForBackoff(backoff)) 152 + backoff++ 153 + if backoff > 15 { 154 + l.Warn("host does not appear to be online, disabling for now") 155 + host.Status = models.HostStatusOffline 156 + if err := db.UpsertHost(ctx, s.db, &host); err != nil { 157 + l.Error("failed to update host status", "err", err) 158 + } 159 + return 160 + } 161 + continue 162 + } 163 + 164 + l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u) 165 + 166 + if err := s.handleConnection(ctx, conn, sub); err != nil { 167 + // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl 168 + l.Warn("host connection failed", "err", err, "backoff", backoff) 169 + } 170 + 171 + updatedCursor := sub.LastSeq() 172 + didProgress := updatedCursor > cursor 173 + l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress) 174 + if cursor == 0 || didProgress { 175 + cursor = updatedCursor 176 + backoff = 0 177 + 178 + batch := []models.HostCursor{sub.HostCursor()} 179 + if err := db.StoreCursors(ctx, s.db, batch); err != nil { 180 + l.Error("failed to store cursors", "err", err) 181 + } 182 + } 183 + } 184 + } 185 + 186 + // handleConnection handles websocket connection. 
187 + // Schedules task from received event and return when connection is closed 188 + func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error { 189 + // ping on every 30s 190 + ctx, cancel := context.WithCancel(ctx) 191 + defer cancel() // close the background ping job on connection close 192 + go func() { 193 + t := time.NewTicker(30 * time.Second) 194 + defer t.Stop() 195 + failcount := 0 196 + 197 + for { 198 + select { 199 + case <-t.C: 200 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 201 + s.logger.Warn("failed to ping", "err", err) 202 + failcount++ 203 + if failcount >= 4 { 204 + s.logger.Error("too many ping fails", "count", failcount) 205 + _ = conn.Close() 206 + return 207 + } 208 + } else { 209 + failcount = 0 // ok ping 210 + } 211 + case <-ctx.Done(): 212 + _ = conn.Close() 213 + return 214 + } 215 + } 216 + }() 217 + 218 + conn.SetPingHandler(func(message string) error { 219 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute)) 220 + if err == websocket.ErrCloseSent { 221 + return nil 222 + } 223 + return err 224 + }) 225 + conn.SetPongHandler(func(_ string) error { 226 + if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 227 + s.logger.Error("failed to set read deadline", "err", err) 228 + } 229 + return nil 230 + }) 231 + 232 + for { 233 + select { 234 + case <-ctx.Done(): 235 + return ctx.Err() 236 + default: 237 + } 238 + msgType, msg, err := conn.ReadMessage() 239 + if err != nil { 240 + return err 241 + } 242 + 243 + if msgType != websocket.TextMessage { 244 + continue 245 + } 246 + 247 + sub.scheduler.AddTask(ctx, &Task{ 248 + key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 249 + message: msg, 250 + }) 251 + } 252 + } 253 + 254 + type LegacyGitEvent struct { 255 + Rkey string 256 + Nsid string 257 + Event tangled.GitRefUpdate 258 + } 
259 + 260 + func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { 261 + var legacyMessage LegacyGitEvent 262 + if err := json.Unmarshal(task.message, &legacyMessage); err != nil { 263 + return fmt.Errorf("unmarshaling message: %w", err) 264 + } 265 + 266 + if err := s.ProcessLegacyGitRefUpdate(ctx, &legacyMessage); err != nil { 267 + return fmt.Errorf("processing gitRefUpdate: %w", err) 268 + } 269 + return nil 270 + } 271 + 272 + func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, evt *LegacyGitEvent) error { 273 + knotstreamEventsReceived.Inc() 274 + 275 + curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(evt.Event.RepoDid), evt.Event.RepoName) 276 + if err != nil { 277 + return fmt.Errorf("failed to get repo '%s': %w", evt.Event.RepoDid+"/"+evt.Event.RepoName, err) 278 + } 279 + if curr == nil { 280 + // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 281 + // 282 + // Normally did+name is already enough to perform git-fetch as that's 283 + // what needed to fetch the repository. 284 + // But we want to store that in did/rkey in knot-mirror. 285 + // Therefore, we should ignore when the repository is unknown. 286 + // Hopefully crawler will sync it later. 
287 + s.logger.Warn("skipping event from unknown repo", "did/repo", evt.Event.RepoDid+"/"+evt.Event.RepoName) 288 + knotstreamEventsSkipped.Inc() 289 + return nil 290 + } 291 + l := s.logger.With("repoAt", curr.AtUri()) 292 + 293 + // TODO: should plan resync to resyncBuffer on RepoStateResyncing 294 + if curr.State != models.RepoStateActive { 295 + l.Debug("skipping non-active repo") 296 + knotstreamEventsSkipped.Inc() 297 + return nil 298 + } 299 + 300 + if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() { 301 + l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev) 302 + knotstreamEventsSkipped.Inc() 303 + return nil 304 + } 305 + 306 + // if curr.State == models.RepoStateResyncing { 307 + // firehoseEventsSkipped.Inc() 308 + // return fp.events.addToResyncBuffer(ctx, commit) 309 + // } 310 + 311 + // can't skip anything, update repo state 312 + if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil { 313 + return err 314 + } 315 + 316 + l.Info("event processed", "eventRev", evt.Rkey) 317 + 318 + knotstreamEventsProcessed.Inc() 319 + return nil 320 + } 321 + 322 + func userAgent() string { 323 + return fmt.Sprintf("knotmirror/%s", versioninfo.Short()) 324 + } 325 + 326 + func sleepForBackoff(b int) time.Duration { 327 + if b == 0 { 328 + return 0 329 + } 330 + if b < 10 { 331 + return (time.Duration(b) * 2) + (time.Millisecond * time.Duration(rand.Intn(1000))) 332 + } 333 + return time.Second * 30 334 + }
+22
knotmirror/knotstream/subscription.go
package knotstream

import "tangled.org/core/knotmirror/models"

// subscription represents websocket connection with that host
type subscription struct {
	// hostname identifies the host; it is copied into HostCursor.
	hostname string

	// embedded parallel job scheduler
	scheduler *ParallelScheduler
}

// LastSeq returns the scheduler's last recorded sequence value.
func (s *subscription) LastSeq() int64 {
	return s.scheduler.LastSeq()
}

// HostCursor returns the hostname paired with the current LastSeq value.
func (s *subscription) HostCursor() models.HostCursor {
	return models.HostCursor{
		Hostname: s.hostname,
		LastSeq:  s.LastSeq(),
	}
}
+29
knotmirror/metrics.go
package knotmirror

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Resync metrics
//
// All collectors below are created through promauto, i.e. they are registered
// at package initialisation time.
var (
	// TODO:
	// - working / waiting resyncer counts

	// resyncsStarted counts resync attempts.
	resyncsStarted = promauto.NewCounter(prometheus.CounterOpts{
		Name: "knotmirror_resyncs_started_total",
		Help: "Total number of repo resyncs started",
	})
	// resyncsCompleted counts resync attempts that succeeded.
	resyncsCompleted = promauto.NewCounter(prometheus.CounterOpts{
		Name: "knotmirror_resyncs_completed_total",
		Help: "Total number of repo resyncs completed",
	})
	// resyncsFailed counts resync attempts that did not succeed.
	resyncsFailed = promauto.NewCounter(prometheus.CounterOpts{
		Name: "knotmirror_resyncs_failed_total",
		Help: "Total number of repo resyncs failed",
	})
	// resyncDuration observes the duration of each attempt in seconds, using
	// 12 exponential buckets that start at 0.1s and double each step.
	resyncDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "knotmirror_resync_duration_seconds",
		Help:    "Duration of repo resync operations",
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 12),
	})
)
+110
knotmirror/models/models.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 8 + ) 9 + 10 + type Repo struct { 11 + Did syntax.DID 12 + Rkey syntax.RecordKey 13 + Cid *syntax.CID 14 + // content of tangled.Repo 15 + Name string 16 + KnotDomain string 17 + 18 + GitRev syntax.TID // last processed git.refUpdate revision 19 + RepoSha string // sha256 sum of git refs (to avoid no-op git fetch) 20 + State RepoState 21 + ErrorMsg string 22 + RetryCount int 23 + RetryAfter int64 // Unix timestamp (seconds) 24 + } 25 + 26 + func (r *Repo) AtUri() syntax.ATURI { 27 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.RepoNSID, r.Rkey)) 28 + } 29 + 30 + func (r *Repo) DidSlashRepo() string { 31 + return fmt.Sprintf("%s/%s", r.Did, r.Name) 32 + } 33 + 34 + type RepoState string 35 + 36 + const ( 37 + RepoStatePending RepoState = "pending" 38 + RepoStateDesynchronized RepoState = "desynchronized" 39 + RepoStateResyncing RepoState = "resyncing" 40 + RepoStateActive RepoState = "active" 41 + RepoStateSuspended RepoState = "suspended" 42 + RepoStateError RepoState = "error" 43 + ) 44 + 45 + func (s RepoState) AllStates() []RepoState { 46 + return []RepoState{ 47 + RepoStatePending, 48 + RepoStateDesynchronized, 49 + RepoStateResyncing, 50 + RepoStateActive, 51 + RepoStateSuspended, 52 + RepoStateError, 53 + } 54 + } 55 + 56 + type HostCursor struct { 57 + Hostname string 58 + LastSeq int64 59 + } 60 + 61 + type Host struct { 62 + Hostname string 63 + NoSSL bool 64 + Status HostStatus 65 + LastSeq int64 66 + } 67 + 68 + type HostStatus string 69 + 70 + const ( 71 + HostStatusActive HostStatus = "active" 72 + HostStatusIdle HostStatus = "idle" 73 + HostStatusOffline HostStatus = "offline" 74 + HostStatusThrottled HostStatus = "throttled" 75 + HostStatusBanned HostStatus = "banned" 76 + ) 77 + 78 + func (s HostStatus) AllStatuses() []HostStatus { 79 + return []HostStatus{ 80 + HostStatusActive, 81 + 
HostStatusIdle, 82 + HostStatusOffline, 83 + HostStatusThrottled, 84 + HostStatusBanned, 85 + } 86 + } 87 + 88 + // func (h *Host) SubscribeGitRefsURL(cursor int64) string { 89 + // scheme := "wss" 90 + // if h.NoSSL { 91 + // scheme = "ws" 92 + // } 93 + // u := fmt.Sprintf("%s://%s/xrpc/%s", scheme, h.Hostname, tangled.SubscribeGitRefsNSID) 94 + // if cursor > 0 { 95 + // u = fmt.Sprintf("%s?cursor=%d", u, h.LastSeq) 96 + // } 97 + // return u 98 + // } 99 + 100 + func (h *Host) LegacyEventsURL(cursor int64) string { 101 + scheme := "wss" 102 + if h.NoSSL { 103 + scheme = "ws" 104 + } 105 + u := fmt.Sprintf("%s://%s/events", scheme, h.Hostname) 106 + if cursor > 0 { 107 + u = fmt.Sprintf("%s?cursor=%d", u, cursor) 108 + } 109 + return u 110 + }
+17
knotmirror/readme.md
··· 1 + # KnotMirror 2 + 3 + Mirror of all known repos. Heavily inspired by [indigo/relay] and [indigo/tap]. 4 + 5 + Knot Mirror syncs the repo list using tap and subscribes to all known knots as a KnotStream. 6 + 7 + # TODO 8 + 9 + - [x] clean up 'resyncing' state on shutdown (or on startup too) 10 + - [x] better tap reconnecting logic 11 + - [ ] handle really large repos (maybe shallow-clone first?) 12 + - [ ] handle repository knot changes (change git repo origin) 13 + 14 + Idea: run several kinds of resync workers, e.g. 4 for long-running tasks and 10 for short tasks. When a short task times out, reschedule it as a long-running task. 15 + 16 + [indigo/relay]: https://github.com/bluesky-social/indigo/tree/main/cmd/relay 17 + [indigo/tap]: https://github.com/bluesky-social/indigo/tree/main/cmd/tap
+264
knotmirror/resyncer.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "net/url" 11 + "os" 12 + "path" 13 + "sync" 14 + "time" 15 + 16 + "github.com/bluesky-social/indigo/atproto/syntax" 17 + "tangled.org/core/knotmirror/config" 18 + "tangled.org/core/knotmirror/db" 19 + "tangled.org/core/knotmirror/models" 20 + "tangled.org/core/log" 21 + ) 22 + 23 + type Resyncer struct { 24 + logger *slog.Logger 25 + db *sql.DB 26 + 27 + claimJobMu sync.Mutex 28 + 29 + repoBasePath string 30 + repoFetchTimeout time.Duration 31 + knotUseSSL bool 32 + 33 + parallelism int 34 + } 35 + 36 + func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer { 37 + return &Resyncer{ 38 + logger: log.SubLogger(l, "resyncer"), 39 + db: db, 40 + repoBasePath: cfg.GitRepoBasePath, 41 + repoFetchTimeout: cfg.GitRepoFetchTimeout, 42 + knotUseSSL: cfg.KnotUseSSL, 43 + parallelism: cfg.ResyncParallelism, 44 + } 45 + } 46 + 47 + func (r *Resyncer) Start(ctx context.Context) { 48 + for i := 0; i < r.parallelism; i++ { 49 + go r.runResyncWorker(ctx, i) 50 + } 51 + } 52 + 53 + func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 54 + l := r.logger.With("worker", workerID) 55 + for { 56 + select { 57 + case <-ctx.Done(): 58 + l.Info("resync worker shutting down", "error", ctx.Err()) 59 + return 60 + default: 61 + } 62 + repoAt, found, err := r.claimResyncJob(ctx) 63 + if err != nil { 64 + l.Error("failed to claim resync job", "error", err) 65 + time.Sleep(time.Second) 66 + continue 67 + } 68 + if !found { 69 + time.Sleep(time.Second) 70 + continue 71 + } 72 + l.Info("processing resync", "aturi", repoAt) 73 + if err := r.resyncRepo(ctx, repoAt); err != nil { 74 + l.Error("resync failed", "aturi", repoAt, "error", err) 75 + } 76 + } 77 + } 78 + 79 + func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 80 + // use mutex to prevent duplicated jobs 81 + r.claimJobMu.Lock() 82 + 
defer r.claimJobMu.Unlock() 83 + 84 + var repoAt syntax.ATURI 85 + now := time.Now().Unix() 86 + if err := r.db.QueryRowContext(ctx, 87 + `update repos 88 + set state = ? 89 + where at_uri = ( 90 + select at_uri from repos 91 + where state in (?, ?, ?) 92 + and (retry_after = 0 or retry_after < ?) 93 + limit 1 94 + ) 95 + returning at_uri 96 + `, 97 + models.RepoStateResyncing, 98 + models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 99 + now, 100 + ).Scan(&repoAt); err != nil { 101 + if errors.Is(err, sql.ErrNoRows) { 102 + return "", false, nil 103 + } 104 + return "", false, err 105 + } 106 + 107 + return repoAt, true, nil 108 + } 109 + 110 + func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 111 + // ctx, span := tracer.Start(ctx, "resyncRepo") 112 + // span.SetAttributes(attribute.String("aturi", repoAt)) 113 + // defer span.End() 114 + 115 + resyncsStarted.Inc() 116 + startTime := time.Now() 117 + 118 + success, err := r.doResync(ctx, repoAt) 119 + if !success { 120 + resyncsFailed.Inc() 121 + resyncDuration.Observe(time.Since(startTime).Seconds()) 122 + return r.handleResyncFailure(ctx, repoAt, err) 123 + } 124 + 125 + resyncsCompleted.Inc() 126 + resyncDuration.Observe(time.Since(startTime).Seconds()) 127 + return nil 128 + } 129 + 130 + func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 131 + // ctx, span := tracer.Start(ctx, "doResync") 132 + // span.SetAttributes(attribute.String("aturi", repoAt)) 133 + // defer span.End() 134 + 135 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 136 + if err != nil { 137 + return false, fmt.Errorf("failed to get repo: %w", err) 138 + } 139 + if repo == nil { // untracked repo, skip 140 + return false, nil 141 + } 142 + 143 + repoPath := r.repoPath(repo) 144 + l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath) 145 + 146 + remoteUrl, err := r.repoRemoteURL(repo) 147 + if err != nil { 148 + return false, 
fmt.Errorf("parsing knot url: %w", err) 149 + } 150 + l = l.With("url", remoteUrl) 151 + 152 + ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 153 + defer cancel() 154 + 155 + // TODO: check if Knot is on backoff list. If so, return (false, nil) 156 + // TODO: use r.repoFetchTimeout on fetch 157 + // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 158 + 159 + // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 160 + gitclient := &CliGitMirrorClient{} 161 + 162 + exist, err := isDir(repoPath) 163 + if err != nil { 164 + return false, fmt.Errorf("checking repo path: %w", err) 165 + } 166 + if !exist { 167 + if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil { 168 + return false, err 169 + } 170 + } else { 171 + if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil { 172 + return false, err 173 + } 174 + } 175 + 176 + // repo.GitRev = <processed git.refUpdate revision> 177 + // repo.RepoSha = <sha256 sum of git refs> 178 + repo.State = models.RepoStateActive 179 + repo.ErrorMsg = "" 180 + repo.RetryCount = 0 181 + repo.RetryAfter = 0 182 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 183 + return false, fmt.Errorf("updating repo state to active %w", err) 184 + } 185 + return true, nil 186 + } 187 + 188 + func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 189 + r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 190 + var state models.RepoState 191 + var errMsg string 192 + if err == nil { 193 + state = models.RepoStateDesynchronized 194 + errMsg = "" 195 + } else { 196 + state = models.RepoStateError 197 + errMsg = err.Error() 198 + } 199 + 200 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 201 + if err != nil { 202 + return fmt.Errorf("failed to get repo: %w", err) 203 + } 204 + if repo == nil { 205 + return fmt.Errorf("failed to get repo. 
repo '%s' doesn't exist in db", repoAt) 206 + } 207 + 208 + var retryCount = repo.RetryCount + 1 209 + var retryAfter int64 210 + if retryCount >= 10 { 211 + state = models.RepoStateSuspended 212 + errMsg = fmt.Sprintf("too many resync fails: %s", errMsg) 213 + retryAfter = 0 214 + } else { 215 + // start a 1 min & go up to 1 hr between retries 216 + retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 217 + } 218 + 219 + repo.State = state 220 + repo.ErrorMsg = errMsg 221 + repo.RetryCount = retryCount 222 + repo.RetryAfter = retryAfter 223 + if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 224 + return dbErr 225 + } 226 + return err 227 + } 228 + 229 + func (r *Resyncer) repoPath(repo *models.Repo) string { 230 + return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 231 + } 232 + 233 + func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) { 234 + u, err := url.Parse(repo.KnotDomain) 235 + if err != nil { 236 + return "", err 237 + } 238 + if u.Scheme == "" { 239 + if r.knotUseSSL { 240 + u.Scheme = "https" 241 + } else { 242 + u.Scheme = "http" 243 + } 244 + } 245 + u = u.JoinPath(repo.DidSlashRepo()) 246 + return u.String(), nil 247 + } 248 + 249 + func backoff(retries int, max int) time.Duration { 250 + dur := min(1<<retries, max) 251 + jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 252 + return time.Second*time.Duration(dur) + jitter 253 + } 254 + 255 + func isDir(path string) (bool, error) { 256 + info, err := os.Stat(path) 257 + if err == nil && info.IsDir() { 258 + return true, nil 259 + } 260 + if os.IsNotExist(err) { 261 + return false, nil 262 + } 263 + return false, err 264 + }
+133
knotmirror/tapclient.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "net/netip" 10 + "net/url" 11 + "time" 12 + 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/knotmirror/config" 15 + "tangled.org/core/knotmirror/db" 16 + "tangled.org/core/knotmirror/knotstream" 17 + "tangled.org/core/knotmirror/models" 18 + "tangled.org/core/log" 19 + "tangled.org/core/tapc" 20 + ) 21 + 22 + type Tap struct { 23 + logger *slog.Logger 24 + cfg *config.Config 25 + tap tapc.Client 26 + db *sql.DB 27 + ks *knotstream.KnotStream 28 + } 29 + 30 + func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap { 31 + return &Tap{ 32 + logger: log.SubLogger(l, "tapclient"), 33 + cfg: cfg, 34 + tap: tapc.NewClient(cfg.TapUrl, ""), 35 + db: db, 36 + ks: ks, 37 + } 38 + } 39 + 40 + func (t *Tap) Start(ctx context.Context) { 41 + // TODO: better reconnect logic 42 + go func() { 43 + for { 44 + t.tap.Connect(ctx, &tapc.SimpleIndexer{ 45 + EventHandler: t.processEvent, 46 + }) 47 + time.Sleep(time.Second) 48 + } 49 + }() 50 + } 51 + 52 + func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 53 + l := t.logger.With("component", "tapIndexer") 54 + 55 + var err error 56 + switch evt.Type { 57 + case tapc.EvtRecord: 58 + switch evt.Record.Collection.String() { 59 + case tangled.RepoNSID: 60 + err = t.processRepo(ctx, evt.Record) 61 + } 62 + } 63 + 64 + if err != nil { 65 + l.Error("failed to process message. 
will retry later", "event.ID", evt.ID, "err", err) 66 + return err 67 + } 68 + return nil 69 + } 70 + 71 + func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 72 + switch evt.Action { 73 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 74 + record := tangled.Repo{} 75 + if err := json.Unmarshal(evt.Record, &record); err != nil { 76 + return fmt.Errorf("parsing record: %w", err) 77 + } 78 + 79 + status := models.RepoStatePending 80 + errMsg := "" 81 + u, err := url.Parse("http://"+record.Knot) // parsing with fake scheme 82 + if err != nil { 83 + status = models.RepoStateSuspended 84 + errMsg = "failed to parse knot url" 85 + } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { 86 + status = models.RepoStateSuspended 87 + errMsg = "suspending non-public knot" 88 + } 89 + 90 + if err := db.UpsertRepo(ctx, t.db, &models.Repo{ 91 + Did: evt.Did, 92 + Rkey: evt.Rkey, 93 + Cid: evt.CID, 94 + Name: record.Name, 95 + KnotDomain: record.Knot, 96 + State: status, 97 + ErrorMsg: errMsg, 98 + }); err != nil { 99 + return fmt.Errorf("upserting repo to db: %w", err) 100 + } 101 + 102 + if !t.ks.CheckIfSubscribed(record.Knot) { 103 + if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 104 + return fmt.Errorf("subscribing to knot: %w", err) 105 + } 106 + } 107 + 108 + case tapc.RecordDeleteAction: 109 + if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 110 + return fmt.Errorf("deleting repo from db: %w", err) 111 + } 112 + } 113 + return nil 114 + } 115 + 116 + // isPrivate checks if host is private network. 
It doesn't perform DNS resolution 117 + func isPrivate(host string) bool { 118 + if host == "localhost" { 119 + return true 120 + } 121 + addr, err := netip.ParseAddr(host) 122 + if err != nil { 123 + return false 124 + } 125 + return isPrivateAddr(addr) 126 + } 127 + 128 + func isPrivateAddr(addr netip.Addr) bool { 129 + return addr.IsLoopback() || 130 + addr.IsPrivate() || 131 + addr.IsLinkLocalUnicast() || 132 + addr.IsLinkLocalMulticast() 133 + }
+26
knotmirror/templates/base.html
{{define "base"}}
{{/* base is the page layout: it loads htmx, renders the navigation bar and
     embeds the "content" template into <main>. */}}
<!DOCTYPE html>
<html>
<head>
  <title>KnotMirror Admin</title>
  <script src="https://cdn.jsdelivr.net/npm/htmx.org@2.0.8/dist/htmx.min.js" integrity="sha384-/TgkGk7p307TH7EXJDuUlgG3Ce1UVolAOFopFekQkkXihi5u/6OCvVKyz1W+idaz" crossorigin="anonymous"></script>
  <style>
    nav { margin-bottom: 20px; border-bottom: 1px solid #ccc; padding: 10px 0; }
    nav a { margin-right: 15px; }
    table { width: 100%; border-collapse: collapse; }
    th, td { text-align: left; padding: 8px; border: 1px solid #ddd; }
    .pagination { margin-top: 20px; }
    .filters { background: #f4f4f4; padding: 15px; margin-bottom: 20px; }
  </style>
</head>
<body>
  <nav>
    <a href="/repos">Repositories</a>
    <a href="/hosts">Knot Hosts</a>
  </nav>
  <main id="main">
    {{template "content" .}}
  </main>
</body>
</html>
{{end}}
+45
knotmirror/templates/hosts.html
{{template "base" .}}
{{define "content"}}
{{/* Knot host list. The filter form re-requests the current URL through htmx
     (on submit and every 10s) and swaps in only the #table element. */}}
<h2>Knot Hosts</h2>

<div class="filters">
  <form
    hx-get=""
    hx-target="#table"
    hx-select="#table"
    hx-swap="outerHTML"
    hx-trigger="every 10s"
  >
    <select name="status">
      <option value="">-- Status --</option>
      {{ range .FilterByStatus.AllStatuses }}
        <option value="{{.}}" {{ if eq $.FilterByStatus . }}selected{{end}}>{{.}}</option>
      {{ end }}
    </select>
    <button type="submit">Filter</button>
  </form>
</div>

<table id="table">
  <thead>
    <tr>
      <th>Hostname</th>
      <th>SSL</th>
      <th>Status</th>
      <th>Last Seq</th>
    </tr>
  </thead>
  <tbody>
    {{range .Hosts}}
    <tr>
      <td>{{.Hostname}}</td>
      <td>{{if .NoSSL}}False{{else}}True{{end}}</td>
      <td>{{.Status}}</td>
      <td>{{.LastSeq}}</td>
    </tr>
    {{else}}
    <tr><td colspan="4">No hosts registered.</td></tr>
    {{end}}
  </tbody>
</table>
{{end}}
+71
knotmirror/templates/repos.html
{{template "base" .}}
{{define "content"}}
{{/* Repository list. The filter form re-requests the current URL through htmx
     (on submit and every 10s) and swaps in only the #table element, which
     holds the per-state counters and the repo table. */}}
<h2>Repositories</h2>

<div class="filters">
  <form
    hx-get=""
    hx-target="#table"
    hx-select="#table"
    hx-swap="outerHTML"
    hx-trigger="every 10s"
  >
    <input type="text" name="did" placeholder="DID" value="{{.FilterByDid}}">
    <input type="text" name="knot" placeholder="Knot Domain" value="{{.FilterByKnot}}">
    <select name="state">
      <option value="">-- State --</option>
      {{ range .FilterByState.AllStates }}
        <option value="{{.}}" {{ if eq $.FilterByState . }}selected{{end}}>{{.}}</option>
      {{ end }}
    </select>
    <button type="submit">Filter</button>
    <a href="/repos">Clear</a>
  </form>
</div>

<div id="table">
  {{/* one counter per repo state, looked up in .RepoCounts */}}
  <div class="repo-state-indicators">
    {{range .FilterByState.AllStates}}
      <span class="state-pill state-{{.}}">
        {{.}}: {{index $.RepoCounts .}}
      </span>
    {{end}}
  </div>
  <table>
    <thead>
      <tr>
        <th>DID</th>
        <th>Name</th>
        <th>Knot</th>
        <th>State</th>
        <th>Retry</th>
        <th>Retry After</th>
        <th>Error Message</th>
      </tr>
    </thead>
    <tbody>
      {{range .Repos}}
      <tr>
        <td><code>{{.Did}}</code></td>
        <td>{{.Name}}</td>
        <td>{{.KnotDomain}}</td>
        <td><strong>{{.State}}</strong></td>
        <td>{{.RetryCount}}</td>
        <td>{{readt .RetryAfter}}</td>
        <td>{{.ErrorMsg}}</td>
      </tr>
      {{else}}
      <tr><td colspan="99">No repositories found.</td></tr>
      {{end}}
    </tbody>
  </table>
</div>

{{/* pagination links carry the active filters along */}}
<div class="pagination">
  {{if gt .Page 1}}
    <a href="?page={{sub .Page 1}}&did={{.FilterByDid}}&knot={{.FilterByKnot}}&state={{.FilterByState}}">« Previous</a>
  {{end}}
  <span>Page {{.Page}}</span>
  <a href="?page={{add .Page 1}}&did={{.FilterByDid}}&knot={{.FilterByKnot}}&state={{.FilterByState}}">Next »</a>
</div>
{{end}}
+17 -14
nix/gomod2nix.toml
··· 534 534 version = "v1.5.5" 535 535 hash = "sha256-ouhfDUCXsfpcgaCLfJE9oYprAQHuV61OJzb/aEhT0j8=" 536 536 [mod."github.com/prometheus/client_golang"] 537 - version = "v1.22.0" 538 - hash = "sha256-OJ/9rlWG1DIPQJAZUTzjykkX0o+f+4IKLvW8YityaMQ=" 537 + version = "v1.23.2" 538 + hash = "sha256-3GD4fBFa1tJu8MS4TNP6r2re2eViUE+kWUaieIOQXCg=" 539 539 [mod."github.com/prometheus/client_model"] 540 540 version = "v0.6.2" 541 541 hash = "sha256-q6Fh6v8iNJN9ypD47LjWmx66YITa3FyRjZMRsuRTFeQ=" 542 542 [mod."github.com/prometheus/common"] 543 - version = "v0.64.0" 544 - hash = "sha256-uy3KO60F2Cvhamz3fWQALGSsy13JiTk3NfpXgRuwqtI=" 543 + version = "v0.66.1" 544 + hash = "sha256-bqHPaV9IV70itx63wqwgy2PtxMN0sn5ThVxDmiD7+Tk=" 545 545 [mod."github.com/prometheus/procfs"] 546 546 version = "v0.16.1" 547 547 hash = "sha256-OBCvKlLW2obct35p0L9Q+1ZrxZjpTmbgHMP2rng9hpo=" ··· 577 577 version = "v0.0.0-20220730225603-2ab79fcdd4ef" 578 578 hash = "sha256-/XmSE/J+f6FLWXGvljh6uBK71uoCAK3h82XQEQ1Ki68=" 579 579 [mod."github.com/stretchr/testify"] 580 - version = "v1.10.0" 581 - hash = "sha256-fJ4gnPr0vnrOhjQYQwJ3ARDKPsOtA7d4olQmQWR+wpI=" 580 + version = "v1.11.1" 581 + hash = "sha256-sWfjkuKJyDllDEtnM8sb/pdLzPQmUYWYtmeWz/5suUc=" 582 582 [mod."github.com/urfave/cli/v3"] 583 583 version = "v3.4.1" 584 584 hash = "sha256-cDMaQrIVMthUhdyI1mKXzDC5/wIK151073lzRl92RnA=" ··· 654 654 [mod."go.uber.org/zap"] 655 655 version = "v1.27.0" 656 656 hash = "sha256-8655KDrulc4Das3VRduO9MjCn8ZYD5WkULjCvruaYsU=" 657 + [mod."go.yaml.in/yaml/v2"] 658 + version = "v2.4.2" 659 + hash = "sha256-oC8RWdf1zbMYCtmR0ATy/kCkhIwPR9UqFZSMOKLVF/A=" 657 660 [mod."golang.org/x/crypto"] 658 - version = "v0.40.0" 659 - hash = "sha256-I6p2fqvz63P9MwAuoQrljI7IUbfZQvCem0ii4Q2zZng=" 661 + version = "v0.41.0" 662 + hash = "sha256-o5Di0lsFmYnXl7a5MBTqmN9vXMCRpE9ay71C1Ar8jEY=" 660 663 [mod."golang.org/x/exp"] 661 664 version = "v0.0.0-20250620022241-b7579e27df2b" 662 665 hash = "sha256-IsDTeuWLj4UkPO4NhWTvFeZ22WNtlxjoWiyAJh6zdig=" ··· 664 
667 version = "v0.31.0" 665 668 hash = "sha256-ZFTlu9+4QToPPLA8C5UcG2eq/lQylq81RoG/WtYo9rg=" 666 669 [mod."golang.org/x/net"] 667 - version = "v0.42.0" 668 - hash = "sha256-YxileisIIez+kcAI+21kY5yk0iRuEqti2YdmS8jvP2s=" 670 + version = "v0.43.0" 671 + hash = "sha256-bf3iQFrsC8BoarVaS0uSspEFAcr1zHp1uziTtBpwV34=" 669 672 [mod."golang.org/x/sync"] 670 673 version = "v0.17.0" 671 674 hash = "sha256-M85lz4hK3/fzmcUViAp/CowHSxnr3BHSO7pjHp1O6i0=" 672 675 [mod."golang.org/x/sys"] 673 - version = "v0.34.0" 674 - hash = "sha256-5rZ7p8IaGli5X1sJbfIKOcOEwY4c0yQhinJPh2EtK50=" 676 + version = "v0.35.0" 677 + hash = "sha256-ZKM8pesQE6NAFZeKQ84oPn5JMhGr8g4TSwLYAsHMGSI=" 675 678 [mod."golang.org/x/text"] 676 679 version = "v0.29.0" 677 680 hash = "sha256-2cWBtJje+Yc+AnSgCANqBlIwnOMZEGkpQ2cFI45VfLI=" ··· 691 694 version = "v1.73.0" 692 695 hash = "sha256-LfVlwip++q2DX70RU6CxoXglx1+r5l48DwlFD05G11c=" 693 696 [mod."google.golang.org/protobuf"] 694 - version = "v1.36.6" 695 - hash = "sha256-lT5qnefI5FDJnowz9PEkAGylH3+fE+A3DJDkAyy9RMc=" 697 + version = "v1.36.8" 698 + hash = "sha256-yZN8ZON0b5HjUNUSubHst7zbvnMsOzd81tDPYQRtPgM=" 696 699 [mod."gopkg.in/fsnotify.v1"] 697 700 version = "v1.4.7" 698 701 hash = "sha256-j/Ts92oXa3k1MFU7Yd8/AqafRTsFn7V2pDKCyDJLah8="
+23
nix/pkgs/knot-mirror.nix
# Package definition for the knotmirror binary.
#
# Arguments are supplied by the caller (callPackage):
#   buildGoApplication  builder function applied below
#   modules             forwarded unchanged to the builder
#   sqlite-lib          package whose include/ and lib/ directories are put
#                       on the cgo compiler and linker search paths
#   src                 source tree, forwarded unchanged to the builder
{
  buildGoApplication,
  modules,
  sqlite-lib,
  src,
}:
buildGoApplication {
  pname = "knotmirror";
  version = "0.1.0";
  inherit src modules;

  # Build only the command under cmd/knotmirror.
  subPackages = ["cmd/knotmirror"];

  # Go build tag. NOTE(review): presumably makes the sqlite driver link
  # against the sqlite-lib provided here instead of a bundled copy; confirm.
  tags = ["libsqlite3"];

  # Tests are not run as part of the package build.
  doCheck = false;

  # cgo is enabled and pointed at sqlite-lib's headers and libraries.
  CGO_ENABLED = 1;
  env = {
    CGO_CFLAGS = "-I ${sqlite-lib}/include ";
    CGO_LDFLAGS = "-L ${sqlite-lib}/lib";
  };

  meta.mainProgram = "knotmirror";
}