Monorepo for Tangled tangled.org

nix,spindle: sync workflow files on sh.tangled.git.refUpdate #984

open opened by boltless.me targeting master from sl/spindle-rewrite
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mckguakfc422
+165 -7
Diff #4
+1
go.mod
··· 29 29 github.com/gorilla/feeds v1.2.0 30 30 github.com/gorilla/sessions v1.4.0 31 31 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 32 + github.com/hashicorp/go-version v1.8.0 32 33 github.com/hiddeco/sshsig v0.2.0 33 34 github.com/hpcloud/tail v1.0.0 34 35 github.com/ipfs/go-cid v0.5.0
+2
go.sum
··· 296 296 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= 297 297 github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= 298 298 github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= 299 + github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= 300 + github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= 299 301 github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= 300 302 github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 301 303 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+3
nix/gomod2nix.toml
··· 314 314 [mod."github.com/hashicorp/go-sockaddr"] 315 315 version = "v1.0.7" 316 316 hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs=" 317 + [mod."github.com/hashicorp/go-version"] 318 + version = "v1.8.0" 319 + hash = "sha256-KXtqERmYrWdpqPCViWcHbe6jnuH7k16bvBIcuJuevj8=" 317 320 [mod."github.com/hashicorp/golang-lru"] 318 321 version = "v1.0.2" 319 322 hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
+4
nix/modules/spindle.nix
··· 1 1 { 2 2 config, 3 + pkgs, 3 4 lib, 4 5 ... 5 6 }: let ··· 132 133 description = "spindle service"; 133 134 after = ["network.target" "docker.service"]; 134 135 wantedBy = ["multi-user.target"]; 136 + path = [ 137 + pkgs.git 138 + ]; 135 139 serviceConfig = { 136 140 LogsDirectory = "spindle"; 137 141 StateDirectory = "spindle";
+4
spindle/config/config.go
··· 29 29 return syntax.DID(fmt.Sprintf("did:web:%s", s.Hostname)) 30 30 } 31 31 32 + func (s Server) RepoDir() string { 33 + return filepath.Join(s.DataDir, "repos") 34 + } 35 + 32 36 func (s Server) DBPath() string { 33 37 return filepath.Join(s.DataDir, "spindle.db") 34 38 }
+73
spindle/git/git.go
··· 1 + package git 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "os" 8 + "os/exec" 9 + "strings" 10 + 11 + "github.com/hashicorp/go-version" 12 + ) 13 + 14 + func Version() (*version.Version, error) { 15 + var buf bytes.Buffer 16 + cmd := exec.Command("git", "version") 17 + cmd.Stdout = &buf 18 + cmd.Stderr = os.Stderr 19 + err := cmd.Run() 20 + if err != nil { 21 + return nil, err 22 + } 23 + fields := strings.Fields(buf.String()) 24 + if len(fields) < 3 { 25 + return nil, fmt.Errorf("invalid git version: %s", buf.String()) 26 + } 27 + 28 + // version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1" 29 + versionString := fields[2] 30 + if pos := strings.Index(versionString, "windows"); pos >= 1 { 31 + versionString = versionString[:pos-1] 32 + } 33 + return version.NewVersion(versionString) 34 + } 35 + 36 + const WorkflowDir = `/.tangled/workflows` 37 + 38 + func SparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error { 39 + exist, err := isDir(path) 40 + if err != nil { 41 + return err 42 + } 43 + if rev == "" { 44 + rev = "HEAD" 45 + } 46 + if !exist { 47 + if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil { 48 + return fmt.Errorf("git clone: %w", err) 49 + } 50 + if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", WorkflowDir).Run(); err != nil { 51 + return fmt.Errorf("git sparse-checkout set: %w", err) 52 + } 53 + } else { 54 + if err := exec.Command("git", "-C", path, "fetch", "--depth=1", "--filter=tree:0", "origin", rev).Run(); err != nil { 55 + return fmt.Errorf("git pull: %w", err) 56 + } 57 + } 58 + if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil { 59 + return fmt.Errorf("git checkout: %w", err) 60 + } 61 + return nil 62 + } 63 + 64 + func isDir(path string) (bool, error) { 65 + info, err := os.Stat(path) 66 + if err == nil && info.IsDir() { 67 + return true, nil 68 + } 69 + if os.IsNotExist(err) { 70 + return false, nil 71 + } 72 + return false, err 73 + }
+66 -5
spindle/server.go
··· 8 8 "log/slog" 9 9 "maps" 10 10 "net/http" 11 + "path/filepath" 11 12 "sync" 12 13 "time" 13 14 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 15 16 "github.com/bluesky-social/indigo/service/tap" 16 17 "github.com/go-chi/chi/v5" 18 + "github.com/hashicorp/go-version" 17 19 "tangled.org/core/api/tangled" 18 20 "tangled.org/core/eventconsumer" 19 21 "tangled.org/core/eventconsumer/cursor" ··· 25 27 "tangled.org/core/spindle/db" 26 28 "tangled.org/core/spindle/engine" 27 29 "tangled.org/core/spindle/engines/nixery" 30 + "tangled.org/core/spindle/git" 28 31 "tangled.org/core/spindle/models" 29 32 "tangled.org/core/spindle/queue" 30 33 "tangled.org/core/spindle/secrets" ··· 56 59 func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 57 60 logger := log.FromContext(ctx) 58 61 59 - d, err := db.Make(ctx, cfg.Server.DBPath) 62 + if err := ensureGitVersion(); err != nil { 63 + return nil, fmt.Errorf("ensuring git version: %w", err) 64 + } 65 + 66 + d, err := db.Make(ctx, cfg.Server.DBPath()) 60 67 if err != nil { 61 68 return nil, fmt.Errorf("failed to setup db: %w", err) 62 69 } 63 70 64 - e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 71 + e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 65 72 if err != nil { 66 73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 67 74 } ··· 84 91 } 85 92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 86 93 case "sqlite", "": 87 - vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 94 + vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 88 95 if err != nil { 89 96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 90 97 } 91 - logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 98 + logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 92 99 default: 93 100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 94 101 } ··· 120 127 } 121 128 logger.Info("owner set", "did", cfg.Server.Owner) 122 129 123 - cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 130 + cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 124 131 if err != nil { 125 132 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 126 133 } ··· 310 317 } 311 318 312 319 func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 320 + l := log.FromContext(ctx).With("handler", "processKnotStream") 321 + l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 313 322 if msg.Nsid == tangled.PipelineNSID { 323 + return nil 314 324 tpl := tangled.Pipeline{} 315 325 err := json.Unmarshal(msg.EventJson, &tpl) 316 326 if err != nil { ··· 411 421 } else { 412 422 s.l.Error("failed to enqueue pipeline: queue is full") 413 423 } 424 + } else if msg.Nsid == tangled.GitRefUpdateNSID { 425 + event := tangled.GitRefUpdate{} 426 + if err := json.Unmarshal(msg.EventJson, &event); err != nil { 427 + l.Error("error unmarshalling", "err", err) 428 + return err 429 + } 430 + l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 431 + 432 + // resolve repo name to rkey 433 + // TODO: git.refUpdate should respond with rkey instead of repo name 434 + repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 435 + if err != nil { 436 + return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 437 + } 438 + 439 + // NOTE: we are blindly trusting the knot that it will return only repos it own 440 + repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 441 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 442 + if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 443 + return fmt.Errorf("sync git repo: %w", err) 444 + } 445 + l.Info("synced git repo") 446 + 447 + // TODO: plan the pipeline 448 + } 449 + 450 + return nil 451 + } 452 + 453 + // newRepoPath creates a path to store repository by its did and rkey. 454 + // The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 455 + func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 456 + return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 457 + } 458 + 459 + func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 460 + scheme := "https://" 461 + if s.cfg.Server.Dev { 462 + scheme = "http://" 414 463 } 464 + return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 465 + } 466 + 467 + const RequiredVersion = "2.49.0" 415 468 469 + func ensureGitVersion() error { 470 + v, err := git.Version() 471 + if err != nil { 472 + return fmt.Errorf("fetching git version: %w", err) 473 + } 474 + if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 475 + return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 476 + } 416 477 return nil 417 478 }
+12 -2
spindle/tap.go
··· 10 10 "tangled.org/core/api/tangled" 11 11 "tangled.org/core/eventconsumer" 12 12 "tangled.org/core/spindle/db" 13 + "tangled.org/core/spindle/git" 13 14 "tangled.org/core/tapc" 14 15 ) 15 16 ··· 225 226 return nil 226 227 } 227 228 228 - if err := s.db.PutRepo(&db.Repo{ 229 + repo := &db.Repo{ 229 230 Did: evt.Record.Did, 230 231 Rkey: evt.Record.Rkey, 231 232 Name: record.Name, 232 233 Knot: record.Knot, 233 - }); err != nil { 234 + } 235 + 236 + if err := s.db.PutRepo(repo); err != nil { 234 237 return fmt.Errorf("adding repo to db: %w", err) 235 238 } 236 239 ··· 242 245 src := eventconsumer.NewKnotSource(record.Knot) 243 246 s.ks.AddSource(context.Background(), src) 244 247 248 + // setup sparse sync 249 + repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 250 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 251 + if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil { 252 + return fmt.Errorf("setting up sparse-clone git repo: %w", err) 253 + } 254 + 245 255 l.Info("added repo", "repo", evt.Record.AtUri()) 246 256 return nil 247 257

History

5 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
nix,spindle: sync workflow files on sh.tangled.git.refUpdate
1/3 failed, 2/3 success
expand
merge conflicts detected
expand
  • appview/state/state.go:35
  • go.mod:9
  • go.sum:483
expand 0 comments
1 commit
expand
nix,spindle: sync workflow files on sh.tangled.git.refUpdate
2/3 failed, 1/3 success
expand
expand 0 comments
1 commit
expand
nix,spindle: sync workflow files on sh.tangled.git.refUpdate
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
nix,spindle: sync workflow files on sh.tangled.git.refUpdate
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
nix,spindle: sync workflow files on sh.tangled.git.refUpdate
1/3 failed, 2/3 success
expand
expand 0 comments