+1
go.mod
+1
go.mod
+2
go.sum
+2
go.sum
···
264
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4=
265
github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw=
266
github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw=
267
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
268
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
269
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
···
264
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4=
265
github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw=
266
github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw=
267
+
github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4=
268
+
github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
269
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
270
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
271
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+3
nix/gomod2nix.toml
+3
nix/gomod2nix.toml
···
304
[mod."github.com/hashicorp/go-sockaddr"]
305
version = "v1.0.7"
306
hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs="
307
+
[mod."github.com/hashicorp/go-version"]
308
+
version = "v1.8.0"
309
+
hash = "sha256-KXtqERmYrWdpqPCViWcHbe6jnuH7k16bvBIcuJuevj8="
310
[mod."github.com/hashicorp/golang-lru"]
311
version = "v1.0.2"
312
hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
+4
nix/modules/spindle.nix
+4
nix/modules/spindle.nix
···
1
{
2
config,
3
+
pkgs,
4
lib,
5
...
6
}: let
···
146
description = "spindle service";
147
after = ["network.target" "docker.service" "spindle-tap.service"];
148
wantedBy = ["multi-user.target"];
149
+
path = [
150
+
pkgs.git
151
+
];
152
serviceConfig = {
153
LogsDirectory = "spindle";
154
StateDirectory = "spindle";
+4
spindle/config/config.go
+4
spindle/config/config.go
+73
spindle/git/git.go
+73
spindle/git/git.go
···
···
1
+
package git
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"fmt"
7
+
"os"
8
+
"os/exec"
9
+
"strings"
10
+
11
+
"github.com/hashicorp/go-version"
12
+
)
13
+
14
+
func Version() (*version.Version, error) {
15
+
var buf bytes.Buffer
16
+
cmd := exec.Command("git", "version")
17
+
cmd.Stdout = &buf
18
+
cmd.Stderr = os.Stderr
19
+
err := cmd.Run()
20
+
if err != nil {
21
+
return nil, err
22
+
}
23
+
fields := strings.Fields(buf.String())
24
+
if len(fields) < 3 {
25
+
return nil, fmt.Errorf("invalid git version: %s", buf.String())
26
+
}
27
+
28
+
// version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1"
29
+
versionString := fields[2]
30
+
if pos := strings.Index(versionString, "windows"); pos >= 1 {
31
+
versionString = versionString[:pos-1]
32
+
}
33
+
return version.NewVersion(versionString)
34
+
}
35
+
36
+
const WorkflowDir = `/.tangled/workflows`
37
+
38
+
func SparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error {
39
+
exist, err := isDir(path)
40
+
if err != nil {
41
+
return err
42
+
}
43
+
if rev == "" {
44
+
rev = "HEAD"
45
+
}
46
+
if !exist {
47
+
if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil {
48
+
return fmt.Errorf("git clone: %w", err)
49
+
}
50
+
if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", WorkflowDir).Run(); err != nil {
51
+
return fmt.Errorf("git sparse-checkout set: %w", err)
52
+
}
53
+
} else {
54
+
if err := exec.Command("git", "-C", path, "fetch", "--depth=1", "--filter=tree:0", "origin", rev).Run(); err != nil {
55
+
return fmt.Errorf("git pull: %w", err)
56
+
}
57
+
}
58
+
if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil {
59
+
return fmt.Errorf("git checkout: %w", err)
60
+
}
61
+
return nil
62
+
}
63
+
64
+
func isDir(path string) (bool, error) {
65
+
info, err := os.Stat(path)
66
+
if err == nil && info.IsDir() {
67
+
return true, nil
68
+
}
69
+
if os.IsNotExist(err) {
70
+
return false, nil
71
+
}
72
+
return false, err
73
+
}
+66
-5
spindle/server.go
+66
-5
spindle/server.go
···
8
"log/slog"
9
"maps"
10
"net/http"
11
12
"github.com/bluesky-social/indigo/atproto/syntax"
13
"github.com/go-chi/chi/v5"
14
"tangled.org/core/api/tangled"
15
"tangled.org/core/eventconsumer"
16
"tangled.org/core/eventconsumer/cursor"
···
22
"tangled.org/core/spindle/db"
23
"tangled.org/core/spindle/engine"
24
"tangled.org/core/spindle/engines/nixery"
25
"tangled.org/core/spindle/models"
26
"tangled.org/core/spindle/queue"
27
"tangled.org/core/spindle/secrets"
···
51
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
52
logger := log.FromContext(ctx)
53
54
-
d, err := db.Make(ctx, cfg.Server.DBPath)
55
if err != nil {
56
return nil, fmt.Errorf("failed to setup db: %w", err)
57
}
58
59
-
e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
60
if err != nil {
61
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
62
}
···
79
}
80
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
81
case "sqlite", "":
82
-
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
83
if err != nil {
84
return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
85
}
86
-
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
87
default:
88
return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
89
}
···
114
}
115
logger.Info("owner set", "did", cfg.Server.Owner)
116
117
-
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
118
if err != nil {
119
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
120
}
···
257
}
258
259
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
260
if msg.Nsid == tangled.PipelineNSID {
261
tpl := tangled.Pipeline{}
262
err := json.Unmarshal(msg.EventJson, &tpl)
263
if err != nil {
···
358
} else {
359
s.l.Error("failed to enqueue pipeline: queue is full")
360
}
361
}
362
363
return nil
364
}
···
8
"log/slog"
9
"maps"
10
"net/http"
11
+
"path/filepath"
12
13
"github.com/bluesky-social/indigo/atproto/syntax"
14
"github.com/go-chi/chi/v5"
15
+
"github.com/hashicorp/go-version"
16
"tangled.org/core/api/tangled"
17
"tangled.org/core/eventconsumer"
18
"tangled.org/core/eventconsumer/cursor"
···
24
"tangled.org/core/spindle/db"
25
"tangled.org/core/spindle/engine"
26
"tangled.org/core/spindle/engines/nixery"
27
+
"tangled.org/core/spindle/git"
28
"tangled.org/core/spindle/models"
29
"tangled.org/core/spindle/queue"
30
"tangled.org/core/spindle/secrets"
···
54
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
55
logger := log.FromContext(ctx)
56
57
+
if err := ensureGitVersion(); err != nil {
58
+
return nil, fmt.Errorf("ensuring git version: %w", err)
59
+
}
60
+
61
+
d, err := db.Make(ctx, cfg.Server.DBPath())
62
if err != nil {
63
return nil, fmt.Errorf("failed to setup db: %w", err)
64
}
65
66
+
e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
67
if err != nil {
68
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
69
}
···
86
}
87
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
88
case "sqlite", "":
89
+
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
90
if err != nil {
91
return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
92
}
93
+
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
94
default:
95
return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
96
}
···
121
}
122
logger.Info("owner set", "did", cfg.Server.Owner)
123
124
+
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
125
if err != nil {
126
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
127
}
···
264
}
265
266
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
267
+
l := log.FromContext(ctx).With("handler", "processKnotStream")
268
+
l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
269
if msg.Nsid == tangled.PipelineNSID {
270
+
return nil
271
tpl := tangled.Pipeline{}
272
err := json.Unmarshal(msg.EventJson, &tpl)
273
if err != nil {
···
368
} else {
369
s.l.Error("failed to enqueue pipeline: queue is full")
370
}
371
+
} else if msg.Nsid == tangled.GitRefUpdateNSID {
372
+
event := tangled.GitRefUpdate{}
373
+
if err := json.Unmarshal(msg.EventJson, &event); err != nil {
374
+
l.Error("error unmarshalling", "err", err)
375
+
return err
376
+
}
377
+
l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
378
+
379
+
// resolve repo name to rkey
380
+
// TODO: git.refUpdate should respond with rkey instead of repo name
381
+
repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
382
+
if err != nil {
383
+
return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
384
+
}
385
+
386
+
// NOTE: we are blindly trusting the knot that it will return only repos it own
387
+
repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
388
+
repoPath := s.newRepoPath(repo.Did, repo.Rkey)
389
+
if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
390
+
return fmt.Errorf("sync git repo: %w", err)
391
+
}
392
+
l.Info("synced git repo")
393
+
394
+
// TODO: plan the pipeline
395
}
396
397
return nil
398
}
399
+
400
+
// newRepoPath creates a path to store repository by its did and rkey.
401
+
// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
402
+
func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
403
+
return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
404
+
}
405
+
406
+
func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
407
+
scheme := "https://"
408
+
if s.cfg.Server.Dev {
409
+
scheme = "http://"
410
+
}
411
+
return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
412
+
}
413
+
414
+
const RequiredVersion = "2.49.0"
415
+
416
+
func ensureGitVersion() error {
417
+
v, err := git.Version()
418
+
if err != nil {
419
+
return fmt.Errorf("fetching git version: %w", err)
420
+
}
421
+
if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
422
+
return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
423
+
}
424
+
return nil
425
+
}
+12
-2
spindle/tap.go
+12
-2
spindle/tap.go
···
10
"tangled.org/core/api/tangled"
11
"tangled.org/core/eventconsumer"
12
"tangled.org/core/spindle/db"
13
"tangled.org/core/tap"
14
)
15
···
225
return nil
226
}
227
228
-
if err := s.db.PutRepo(&db.Repo{
229
Did: evt.Record.Did,
230
Rkey: evt.Record.Rkey,
231
Name: record.Name,
232
Knot: record.Knot,
233
-
}); err != nil {
234
return fmt.Errorf("adding repo to db: %w", err)
235
}
236
···
241
// add this knot to the event consumer
242
src := eventconsumer.NewKnotSource(record.Knot)
243
s.ks.AddSource(context.Background(), src)
244
245
l.Info("added repo", "repo", evt.Record.AtUri())
246
return nil
···
10
"tangled.org/core/api/tangled"
11
"tangled.org/core/eventconsumer"
12
"tangled.org/core/spindle/db"
13
+
"tangled.org/core/spindle/git"
14
"tangled.org/core/tap"
15
)
16
···
226
return nil
227
}
228
229
+
repo := &db.Repo{
230
Did: evt.Record.Did,
231
Rkey: evt.Record.Rkey,
232
Name: record.Name,
233
Knot: record.Knot,
234
+
}
235
+
236
+
if err := s.db.PutRepo(repo); err != nil {
237
return fmt.Errorf("adding repo to db: %w", err)
238
}
239
···
244
// add this knot to the event consumer
245
src := eventconsumer.NewKnotSource(record.Knot)
246
s.ks.AddSource(context.Background(), src)
247
+
248
+
// setup sparse sync
249
+
repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name)
250
+
repoPath := s.newRepoPath(repo.Did, repo.Rkey)
251
+
if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil {
252
+
return fmt.Errorf("setting up sparse-clone git repo: %w", err)
253
+
}
254
255
l.Info("added repo", "repo", evt.Record.AtUri())
256
return nil