this repo has no description

nix,spindle: switch to tap from jetstream

spindle-tap will collect/stream record events from:
- users dynamically added by spindle (spindle members | collaborators of
repos using spindle)
- any users with `sh.tangled.repo.pull` collection

It might be bit inefficient considering it will also stream repo
creation events from PR authors due to second rule, but at least we now
have backfill logic and Sync 1.1 based syncing.

This inefficiency can be fixed later by modifying upstream tap cli or
embedding tap into spindle.

```
+--------- all tangled users --------+
| |
| +-- users known to spindle-tap --+ |
| | (PR author / manually added) | |
| | | |
| | +----------------------------+ | |
| | | users known to spindle | | |
| | | (members / collaborators) | | |
| | +----------------------------+ | |
| +--------------------------------+ |
+------------------------------------+
```

Close: <https://tangled.org/tangled.org/core/issues/341>

Signed-off-by: Seongmin Lee <git@boltless.me>

Changed files
+478 -395
nix
spindle
+2 -8
nix/modules/spindle.nix
··· 53 description = "atproto PLC directory"; 54 }; 55 56 - jetstreamEndpoint = mkOption { 57 - type = types.str; 58 - default = "wss://jetstream1.us-west.bsky.network/subscribe"; 59 - description = "Jetstream endpoint to subscribe to"; 60 - }; 61 - 62 dev = mkOption { 63 type = types.bool; 64 default = false; ··· 149 150 systemd.services.spindle = { 151 description = "spindle service"; 152 - after = ["network.target" "docker.service"]; 153 wantedBy = ["multi-user.target"]; 154 serviceConfig = { 155 LogsDirectory = "spindle"; ··· 159 "SPINDLE_SERVER_DB_PATH=${cfg.server.dbPath}" 160 "SPINDLE_SERVER_HOSTNAME=${cfg.server.hostname}" 161 "SPINDLE_SERVER_PLC_URL=${cfg.server.plcUrl}" 162 - "SPINDLE_SERVER_JETSTREAM_ENDPOINT=${cfg.server.jetstreamEndpoint}" 163 "SPINDLE_SERVER_DEV=${lib.boolToString cfg.server.dev}" 164 "SPINDLE_SERVER_OWNER=${cfg.server.owner}" 165 "SPINDLE_SERVER_MAX_JOB_COUNT=${toString cfg.server.maxJobCount}" ··· 167 "SPINDLE_SERVER_SECRETS_PROVIDER=${cfg.server.secrets.provider}" 168 "SPINDLE_SERVER_SECRETS_OPENBAO_PROXY_ADDR=${cfg.server.secrets.openbao.proxyAddr}" 169 "SPINDLE_SERVER_SECRETS_OPENBAO_MOUNT=${cfg.server.secrets.openbao.mount}" 170 "SPINDLE_NIXERY_PIPELINES_NIXERY=${cfg.pipelines.nixery}" 171 "SPINDLE_NIXERY_PIPELINES_WORKFLOW_TIMEOUT=${cfg.pipelines.workflowTimeout}" 172 ];
··· 53 description = "atproto PLC directory"; 54 }; 55 56 dev = mkOption { 57 type = types.bool; 58 default = false; ··· 143 144 systemd.services.spindle = { 145 description = "spindle service"; 146 + after = ["network.target" "docker.service" "spindle-tap.service"]; 147 wantedBy = ["multi-user.target"]; 148 serviceConfig = { 149 LogsDirectory = "spindle"; ··· 153 "SPINDLE_SERVER_DB_PATH=${cfg.server.dbPath}" 154 "SPINDLE_SERVER_HOSTNAME=${cfg.server.hostname}" 155 "SPINDLE_SERVER_PLC_URL=${cfg.server.plcUrl}" 156 "SPINDLE_SERVER_DEV=${lib.boolToString cfg.server.dev}" 157 "SPINDLE_SERVER_OWNER=${cfg.server.owner}" 158 "SPINDLE_SERVER_MAX_JOB_COUNT=${toString cfg.server.maxJobCount}" ··· 160 "SPINDLE_SERVER_SECRETS_PROVIDER=${cfg.server.secrets.provider}" 161 "SPINDLE_SERVER_SECRETS_OPENBAO_PROXY_ADDR=${cfg.server.secrets.openbao.proxyAddr}" 162 "SPINDLE_SERVER_SECRETS_OPENBAO_MOUNT=${cfg.server.secrets.openbao.mount}" 163 + "SPINDLE_SERVER_TAP_URL=http://localhost:2480" 164 "SPINDLE_NIXERY_PIPELINES_NIXERY=${cfg.pipelines.nixery}" 165 "SPINDLE_NIXERY_PIPELINES_WORKFLOW_TIMEOUT=${cfg.pipelines.workflowTimeout}" 166 ];
+5 -1
nix/vm.nix
··· 58 host.port = 6555; 59 guest.port = 6555; 60 } 61 ]; 62 sharedDirectories = { 63 # We can't use the 9p mounts directly for most of these ··· 101 owner = envVar "TANGLED_VM_SPINDLE_OWNER"; 102 hostname = envVarOr "TANGLED_VM_SPINDLE_HOST" "localhost:6555"; 103 plcUrl = plcUrl; 104 - jetstreamEndpoint = jetstream; 105 listenAddr = "0.0.0.0:6555"; 106 dev = true; 107 queueSize = 100;
··· 58 host.port = 6555; 59 guest.port = 6555; 60 } 61 + { 62 + from = "host"; 63 + host.port = 6556; 64 + guest.port = 2480; 65 + } 66 ]; 67 sharedDirectories = { 68 # We can't use the 9p mounts directly for most of these ··· 106 owner = envVar "TANGLED_VM_SPINDLE_OWNER"; 107 hostname = envVarOr "TANGLED_VM_SPINDLE_HOST" "localhost:6555"; 108 plcUrl = plcUrl; 109 listenAddr = "0.0.0.0:6555"; 110 dev = true; 111 queueSize = 100;
+11 -11
spindle/config/config.go
··· 9 ) 10 11 type Server struct { 12 - ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"` 13 - DBPath string `env:"DB_PATH, default=spindle.db"` 14 - Hostname string `env:"HOSTNAME, required"` 15 - JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 16 - PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 17 - Dev bool `env:"DEV, default=false"` 18 - Owner syntax.DID `env:"OWNER, required"` 19 - Secrets Secrets `env:",prefix=SECRETS_"` 20 - LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 21 - QueueSize int `env:"QUEUE_SIZE, default=100"` 22 - MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 23 } 24 25 func (s Server) Did() syntax.DID {
··· 9 ) 10 11 type Server struct { 12 + ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"` 13 + DBPath string `env:"DB_PATH, default=spindle.db"` 14 + Hostname string `env:"HOSTNAME, required"` 15 + TapUrl string `env:"TAP_URL, required"` 16 + PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 17 + Dev bool `env:"DEV, default=false"` 18 + Owner syntax.DID `env:"OWNER, required"` 19 + Secrets Secrets `env:",prefix=SECRETS_"` 20 + LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 21 + QueueSize int `env:"QUEUE_SIZE, default=100"` 22 + MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 23 } 24 25 func (s Server) Did() syntax.DID {
+28 -14
spindle/db/db.go
··· 5 "database/sql" 6 "strings" 7 8 _ "github.com/mattn/go-sqlite3" 9 "tangled.org/core/log" 10 "tangled.org/core/orm" ··· 55 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 56 57 unique(owner, name) 58 ); 59 60 create table if not exists spindle_members ( ··· 120 return &DB{db}, nil 121 } 122 123 - func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 124 - _, err := d.Exec(` 125 - insert into _jetstream (id, last_time_us) 126 - values (1, ?) 127 - on conflict(id) do update set last_time_us = excluded.last_time_us 128 - `, lastTimeUs) 129 - return err 130 - } 131 - 132 - func (d *DB) GetLastTimeUs() (int64, error) { 133 - var lastTimeUs int64 134 - row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 135 - err := row.Scan(&lastTimeUs) 136 - return lastTimeUs, err 137 }
··· 5 "database/sql" 6 "strings" 7 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 _ "github.com/mattn/go-sqlite3" 10 "tangled.org/core/log" 11 "tangled.org/core/orm" ··· 56 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 57 58 unique(owner, name) 59 + ); 60 + 61 + create table if not exists repo_collaborators ( 62 + -- identifiers 63 + id integer primary key autoincrement, 64 + did text not null, 65 + rkey text not null, 66 + at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo.collaborator' || '/' || rkey) stored, 67 + 68 + repo text not null, 69 + subject text not null, 70 + 71 + addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 72 + unique(did, rkey) 73 ); 74 75 create table if not exists spindle_members ( ··· 135 return &DB{db}, nil 136 } 137 138 + func (d *DB) IsKnownDid(did syntax.DID) (bool, error) { 139 + // is spindle member / repo collaborator 140 + var exists bool 141 + err := d.QueryRow( 142 + `select exists ( 143 + select 1 from repo_collaborators where subject = ? 144 + union all 145 + select 1 from spindle_members where did = ? 146 + )`, 147 + did, 148 + did, 149 + ).Scan(&exists) 150 + return exists, err 151 }
-44
spindle/db/known_dids.go
··· 1 - package db 2 - 3 - func (d *DB) AddDid(did string) error { 4 - _, err := d.Exec(`insert or ignore into known_dids (did) values (?)`, did) 5 - return err 6 - } 7 - 8 - func (d *DB) RemoveDid(did string) error { 9 - _, err := d.Exec(`delete from known_dids where did = ?`, did) 10 - return err 11 - } 12 - 13 - func (d *DB) GetAllDids() ([]string, error) { 14 - var dids []string 15 - 16 - rows, err := d.Query(`select did from known_dids`) 17 - if err != nil { 18 - return nil, err 19 - } 20 - defer rows.Close() 21 - 22 - for rows.Next() { 23 - var did string 24 - if err := rows.Scan(&did); err != nil { 25 - return nil, err 26 - } 27 - dids = append(dids, did) 28 - } 29 - 30 - if err := rows.Err(); err != nil { 31 - return nil, err 32 - } 33 - 34 - return dids, nil 35 - } 36 - 37 - func (d *DB) HasKnownDids() bool { 38 - var count int 39 - err := d.QueryRow(`select count(*) from known_dids`).Scan(&count) 40 - if err != nil { 41 - return false 42 - } 43 - return count > 0 44 - }
···
+119 -11
spindle/db/repos.go
··· 1 package db 2 3 type Repo struct { 4 - Knot string 5 - Owner string 6 - Name string 7 } 8 9 - func (d *DB) AddRepo(knot, owner, name string) error { 10 - _, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name) 11 return err 12 } 13 ··· 34 return knots, nil 35 } 36 37 - func (d *DB) GetRepo(knot, owner, name string) (*Repo, error) { 38 var repo Repo 39 - 40 - query := "select knot, owner, name from repos where knot = ? and owner = ? and name = ?" 41 - err := d.DB.QueryRow(query, knot, owner, name). 42 - Scan(&repo.Knot, &repo.Owner, &repo.Name) 43 - 44 if err != nil { 45 return nil, err 46 } 47 48 return &repo, nil 49 }
··· 1 package db 2 3 + import "github.com/bluesky-social/indigo/atproto/syntax" 4 + 5 type Repo struct { 6 + Did syntax.DID 7 + Rkey syntax.RecordKey 8 + Name string 9 + Knot string 10 + } 11 + 12 + type RepoCollaborator struct { 13 + Did syntax.DID 14 + Rkey syntax.RecordKey 15 + Repo syntax.ATURI 16 + Subject syntax.DID 17 + } 18 + 19 + func (d *DB) PutRepo(repo *Repo) error { 20 + _, err := d.Exec( 21 + `insert or ignore into repos (did, rkey, name, knot) 22 + values (?, ?, ?, ?) 23 + on conflict(did, rkey) do update set 24 + name = excluded.name, 25 + knot = excluded.knot`, 26 + repo.Did, 27 + repo.Rkey, 28 + repo.Name, 29 + repo.Knot, 30 + ) 31 + return err 32 } 33 34 + func (d *DB) DeleteRepo(did syntax.DID, rkey syntax.RecordKey) error { 35 + _, err := d.Exec( 36 + `delete from repos where did = ? and rkey = ?`, 37 + did, 38 + rkey, 39 + ) 40 return err 41 } 42 ··· 63 return knots, nil 64 } 65 66 + func (d *DB) GetRepo(repoAt syntax.ATURI) (*Repo, error) { 67 var repo Repo 68 + err := d.DB.QueryRow( 69 + `select 70 + did, 71 + rkey, 72 + name, 73 + knot 74 + from repos where at_uri = ?`, 75 + repoAt, 76 + ).Scan( 77 + &repo.Did, 78 + &repo.Rkey, 79 + &repo.Name, 80 + &repo.Knot, 81 + ) 82 if err != nil { 83 return nil, err 84 } 85 + return &repo, nil 86 + } 87 88 + func (d *DB) GetRepoWithName(did syntax.DID, name string) (*Repo, error) { 89 + var repo Repo 90 + err := d.DB.QueryRow( 91 + `select 92 + did, 93 + rkey, 94 + name, 95 + knot 96 + from repos where did = ? and name = ?`, 97 + did, 98 + name, 99 + ).Scan( 100 + &repo.Did, 101 + &repo.Rkey, 102 + &repo.Name, 103 + &repo.Knot, 104 + ) 105 + if err != nil { 106 + return nil, err 107 + } 108 return &repo, nil 109 } 110 + 111 + func (d *DB) PutRepoCollaborator(collaborator *RepoCollaborator) error { 112 + _, err := d.Exec( 113 + `insert into repo_collaborators (did, rkey, repo, subject) 114 + values (?, ?, ?, ?) 115 + on conflict(did, rkey) do update set 116 + repo = excluded.repo, 117 + subject = excluded.subject`, 118 + collaborator.Did, 119 + collaborator.Rkey, 120 + collaborator.Repo, 121 + collaborator.Subject, 122 + ) 123 + return err 124 + } 125 + 126 + func (d *DB) RemoveRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) error { 127 + _, err := d.Exec( 128 + `delete from repo_collaborators where did = ? and rkey = ?`, 129 + did, 130 + rkey, 131 + ) 132 + return err 133 + } 134 + 135 + func (d *DB) GetRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) (*RepoCollaborator, error) { 136 + var collaborator RepoCollaborator 137 + err := d.DB.QueryRow( 138 + `select 139 + did, 140 + rkey, 141 + repo, 142 + subject 143 + from repo_collaborators 144 + where did = ? and rkey = ?`, 145 + did, 146 + rkey, 147 + ).Scan( 148 + &collaborator.Did, 149 + &collaborator.Rkey, 150 + &collaborator.Repo, 151 + &collaborator.Subject, 152 + ) 153 + if err != nil { 154 + return nil, err 155 + } 156 + return &collaborator, nil 157 + }
-276
spindle/ingester.go
··· 1 - package spindle 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "errors" 7 - "fmt" 8 - "time" 9 - 10 - "tangled.org/core/api/tangled" 11 - "tangled.org/core/eventconsumer" 12 - "tangled.org/core/spindle/db" 13 - 14 - comatproto "github.com/bluesky-social/indigo/api/atproto" 15 - "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/xrpc" 17 - "github.com/bluesky-social/jetstream/pkg/models" 18 - ) 19 - 20 - type Ingester func(ctx context.Context, e *models.Event) error 21 - 22 - func (s *Spindle) ingest() Ingester { 23 - return func(ctx context.Context, e *models.Event) error { 24 - var err error 25 - defer func() { 26 - eventTime := e.TimeUS 27 - lastTimeUs := eventTime + 1 28 - if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 29 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 30 - } 31 - }() 32 - 33 - if e.Kind != models.EventKindCommit { 34 - return nil 35 - } 36 - 37 - switch e.Commit.Collection { 38 - case tangled.SpindleMemberNSID: 39 - err = s.ingestMember(ctx, e) 40 - case tangled.RepoNSID: 41 - err = s.ingestRepo(ctx, e) 42 - case tangled.RepoCollaboratorNSID: 43 - err = s.ingestCollaborator(ctx, e) 44 - } 45 - 46 - if err != nil { 47 - s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 48 - } 49 - 50 - return nil 51 - } 52 - } 53 - 54 - func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 55 - var err error 56 - did := e.Did 57 - rkey := e.Commit.RKey 58 - 59 - l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 60 - 61 - switch e.Commit.Operation { 62 - case models.CommitOperationCreate, models.CommitOperationUpdate: 63 - raw := e.Commit.Record 64 - record := tangled.SpindleMember{} 65 - err = json.Unmarshal(raw, &record) 66 - if err != nil { 67 - l.Error("invalid record", "error", err) 68 - return err 69 - } 70 - 71 - domain := s.cfg.Server.Hostname 72 - recordInstance := record.Instance 73 - 74 - if recordInstance != domain { 75 - l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 - return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 - } 78 - 79 - ok, err := s.e.IsSpindleMemberInviteAllowed(syntax.DID(did), s.cfg.Server.Did()) 80 - if err != nil || !ok { 81 - l.Error("failed to add member", "did", did, "error", err) 82 - return fmt.Errorf("failed to enforce permissions: %w", err) 83 - } 84 - 85 - if err := db.AddSpindleMember(s.db, db.SpindleMember{ 86 - Did: syntax.DID(did), 87 - Rkey: rkey, 88 - Instance: recordInstance, 89 - Subject: syntax.DID(record.Subject), 90 - Created: time.Now(), 91 - }); err != nil { 92 - l.Error("failed to add member", "error", err) 93 - return fmt.Errorf("failed to add member: %w", err) 94 - } 95 - 96 - if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 97 - l.Error("failed to add member", "error", err) 98 - return fmt.Errorf("failed to add member: %w", err) 99 - } 100 - l.Info("added member from firehose", "member", record.Subject) 101 - 102 - if err := s.db.AddDid(record.Subject); err != nil { 103 - l.Error("failed to add did", "error", err) 104 - return fmt.Errorf("failed to add did: %w", err) 105 - } 106 - s.jc.AddDid(record.Subject) 107 - 108 - return nil 109 - 110 - case models.CommitOperationDelete: 111 - record, err := db.GetSpindleMember(s.db, did, rkey) 112 - if err != nil { 113 - l.Error("failed to find member", "error", err) 114 - return fmt.Errorf("failed to find member: %w", err) 115 - } 116 - 117 - if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 118 - l.Error("failed to remove member", "error", err) 119 - return fmt.Errorf("failed to remove member: %w", err) 120 - } 121 - 122 - if err := s.e.RemoveSpindleMember(record.Subject, s.cfg.Server.Did()); err != nil { 123 - l.Error("failed to add member", "error", err) 124 - return fmt.Errorf("failed to add member: %w", err) 125 - } 126 - l.Info("added member from firehose", "member", record.Subject) 127 - 128 - if err := s.db.RemoveDid(record.Subject.String()); err != nil { 129 - l.Error("failed to add did", "error", err) 130 - return fmt.Errorf("failed to add did: %w", err) 131 - } 132 - s.jc.RemoveDid(record.Subject.String()) 133 - 134 - } 135 - return nil 136 - } 137 - 138 - func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 139 - var err error 140 - did := e.Did 141 - 142 - l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 143 - 144 - l.Info("ingesting repo record", "did", did) 145 - 146 - switch e.Commit.Operation { 147 - case models.CommitOperationCreate, models.CommitOperationUpdate: 148 - raw := e.Commit.Record 149 - record := tangled.Repo{} 150 - err = json.Unmarshal(raw, &record) 151 - if err != nil { 152 - l.Error("invalid record", "error", err) 153 - return err 154 - } 155 - 156 - domain := s.cfg.Server.Hostname 157 - 158 - // no spindle configured for this repo 159 - if record.Spindle == nil { 160 - l.Info("no spindle configured", "name", record.Name) 161 - return nil 162 - } 163 - 164 - // this repo did not want this spindle 165 - if *record.Spindle != domain { 166 - l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 167 - return nil 168 - } 169 - 170 - // add this repo to the watch list 171 - if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 172 - l.Error("failed to add repo", "error", err) 173 - return fmt.Errorf("failed to add repo: %w", err) 174 - } 175 - 176 - repoAt := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, e.Commit.Collection, e.Commit.RKey)) 177 - 178 - // add repo to rbac 179 - if err := s.e.AddRepo(repoAt); err != nil { 180 - l.Error("failed to add repo to enforcer", "error", err) 181 - return fmt.Errorf("failed to add repo: %w", err) 182 - } 183 - 184 - // add collaborators to rbac 185 - if err := s.fetchAndAddCollaborators(ctx, repoAt); err != nil { 186 - return err 187 - } 188 - 189 - // add this knot to the event consumer 190 - src := eventconsumer.NewKnotSource(record.Knot) 191 - s.ks.AddSource(context.Background(), src) 192 - 193 - return nil 194 - 195 - } 196 - return nil 197 - } 198 - 199 - func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 200 - var err error 201 - 202 - l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 203 - 204 - l.Info("ingesting collaborator record") 205 - 206 - switch e.Commit.Operation { 207 - case models.CommitOperationCreate, models.CommitOperationUpdate: 208 - raw := e.Commit.Record 209 - record := tangled.RepoCollaborator{} 210 - err = json.Unmarshal(raw, &record) 211 - if err != nil { 212 - l.Error("invalid record", "error", err) 213 - return err 214 - } 215 - 216 - subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 217 - if err != nil || subjectId.Handle.IsInvalidHandle() { 218 - return err 219 - } 220 - 221 - repoAt, err := syntax.ParseATURI(record.Repo) 222 - if err != nil { 223 - l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 224 - return nil 225 - } 226 - 227 - // check perms for this user 228 - if ok, err := s.e.IsRepoCollaboratorInviteAllowed(syntax.DID(e.Did), repoAt); !ok || err != nil { 229 - return fmt.Errorf("insufficient permissions: %w", err) 230 - } 231 - 232 - // add collaborator to rbac 233 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), repoAt); err != nil { 234 - l.Error("failed to add repo to enforcer", "error", err) 235 - return fmt.Errorf("failed to add repo: %w", err) 236 - } 237 - 238 - return nil 239 - } 240 - return nil 241 - } 242 - 243 - func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, repo syntax.ATURI) error { 244 - l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 245 - 246 - l.Info("fetching and adding existing collaborators") 247 - 248 - ident, err := s.res.ResolveIdent(ctx, repo.Authority().String()) 249 - if err != nil || ident.Handle.IsInvalidHandle() { 250 - return fmt.Errorf("failed to resolve handle: %w", err) 251 - } 252 - 253 - xrpcc := xrpc.Client{ 254 - Host: ident.PDSEndpoint(), 255 - } 256 - 257 - resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, ident.DID.String(), false) 258 - if err != nil { 259 - return err 260 - } 261 - 262 - var errs error 263 - for _, r := range resp.Records { 264 - if r == nil { 265 - continue 266 - } 267 - record := r.Value.Val.(*tangled.RepoCollaborator) 268 - 269 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 270 - l.Error("failed to add repo to enforcer", "error", err) 271 - errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 272 - } 273 - } 274 - 275 - return errs 276 - }
···
+19 -30
spindle/server.go
··· 9 "maps" 10 "net/http" 11 12 "github.com/go-chi/chi/v5" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/eventconsumer" 15 "tangled.org/core/eventconsumer/cursor" 16 "tangled.org/core/idresolver" 17 - "tangled.org/core/jetstream" 18 "tangled.org/core/log" 19 "tangled.org/core/notifier" 20 "tangled.org/core/rbac2" ··· 26 "tangled.org/core/spindle/queue" 27 "tangled.org/core/spindle/secrets" 28 "tangled.org/core/spindle/xrpc" 29 "tangled.org/core/xrpc/serviceauth" 30 ) 31 ··· 33 var motd []byte 34 35 type Spindle struct { 36 - jc *jetstream.JetstreamClient 37 db *db.DB 38 e *rbac2.Enforcer 39 l *slog.Logger ··· 90 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 91 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 92 93 - collections := []string{ 94 - tangled.SpindleMemberNSID, 95 - tangled.RepoNSID, 96 - tangled.RepoCollaboratorNSID, 97 - } 98 - jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 99 - if err != nil { 100 - return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 101 - } 102 - jc.AddDid(cfg.Server.Owner.String()) 103 - 104 - // Check if the spindle knows about any Dids; 105 - dids, err := d.GetAllDids() 106 - if err != nil { 107 - return nil, fmt.Errorf("failed to get all dids: %w", err) 108 - } 109 - for _, d := range dids { 110 - jc.AddDid(d) 111 - } 112 113 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 114 115 spindle := &Spindle{ 116 - jc: jc, 117 e: e, 118 db: d, 119 l: logger, ··· 134 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 135 if err != nil { 136 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 137 - } 138 - 139 - err = jc.StartJetstream(ctx, spindle.ingest()) 140 - if err != nil { 141 - return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 142 } 143 144 // for each incoming sh.tangled.pipeline, we execute ··· 208 s.ks.Start(ctx) 209 }() 210 211 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 212 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 213 } ··· 288 } 289 290 // filter by repos 291 - _, err = s.db.GetRepo( 292 - tpl.TriggerMetadata.Repo.Knot, 293 - tpl.TriggerMetadata.Repo.Did, 294 tpl.TriggerMetadata.Repo.Repo, 295 ) 296 if err != nil {
··· 9 "maps" 10 "net/http" 11 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/go-chi/chi/v5" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/eventconsumer/cursor" 17 "tangled.org/core/idresolver" 18 "tangled.org/core/log" 19 "tangled.org/core/notifier" 20 "tangled.org/core/rbac2" ··· 26 "tangled.org/core/spindle/queue" 27 "tangled.org/core/spindle/secrets" 28 "tangled.org/core/spindle/xrpc" 29 + "tangled.org/core/tap" 30 "tangled.org/core/xrpc/serviceauth" 31 ) 32 ··· 34 var motd []byte 35 36 type Spindle struct { 37 + tap *tap.Client 38 db *db.DB 39 e *rbac2.Enforcer 40 l *slog.Logger ··· 91 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 92 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 93 94 + tap := tap.NewClient(cfg.Server.TapUrl, "") 95 96 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 97 98 spindle := &Spindle{ 99 + tap: &tap, 100 e: e, 101 db: d, 102 l: logger, ··· 117 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 118 if err != nil { 119 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 120 } 121 122 // for each incoming sh.tangled.pipeline, we execute ··· 186 s.ks.Start(ctx) 187 }() 188 189 + // ensure server owner is tracked 190 + if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 191 + return err 192 + } 193 + 194 + go func() { 195 + s.l.Info("starting tap stream consumer") 196 + s.tap.Connect(ctx, &tap.SimpleIndexer{ 197 + EventHandler: s.processEvent, 198 + }) 199 + }() 200 + 201 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 202 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 203 } ··· 278 } 279 280 // filter by repos 281 + _, err = s.db.GetRepoWithName( 282 + syntax.DID(tpl.TriggerMetadata.Repo.Did), 283 tpl.TriggerMetadata.Repo.Repo, 284 ) 285 if err != nil {
+294
spindle/tap.go
···
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/api/tangled" 11 + "tangled.org/core/eventconsumer" 12 + "tangled.org/core/spindle/db" 13 + "tangled.org/core/tap" 14 + ) 15 + 16 + func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 17 + l := s.l.With("component", "tapIndexer") 18 + 19 + var err error 20 + switch evt.Type { 21 + case tap.EvtRecord: 22 + switch evt.Record.Collection.String() { 23 + case tangled.SpindleMemberNSID: 24 + err = s.processMember(ctx, evt) 25 + case tangled.RepoNSID: 26 + err = s.processRepo(ctx, evt) 27 + case tangled.RepoCollaboratorNSID: 28 + err = s.processCollaborator(ctx, evt) 29 + case tangled.RepoPullNSID: 30 + err = s.processPull(ctx, evt) 31 + } 32 + case tap.EvtIdentity: 33 + // no-op 34 + } 35 + 36 + if err != nil { 37 + l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 38 + return err 39 + } 40 + return nil 41 + } 42 + 43 + // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 + 45 + func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 46 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 + 48 + l.Info("processing spindle.member record") 49 + 50 + // only listen to members 51 + if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 52 + l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err) 53 + return nil 54 + } 55 + 56 + switch evt.Record.Action { 57 + case tap.RecordCreateAction, tap.RecordUpdateAction: 58 + record := tangled.SpindleMember{} 59 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 + return fmt.Errorf("parsing record: %w", err) 61 + } 62 + 63 + domain := s.cfg.Server.Hostname 64 + if record.Instance != domain { 65 + l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 66 + return nil 67 + } 68 + 69 + created, err := time.Parse(record.CreatedAt, time.RFC3339) 70 + if err != nil { 71 + created = time.Now() 72 + } 73 + if err := db.AddSpindleMember(s.db, db.SpindleMember{ 74 + Did: evt.Record.Did, 75 + Rkey: evt.Record.Rkey.String(), 76 + Instance: record.Instance, 77 + Subject: syntax.DID(record.Subject), 78 + Created: created, 79 + }); err != nil { 80 + l.Error("failed to add member", "error", err) 81 + return fmt.Errorf("adding member to db: %w", err) 82 + } 83 + if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 84 + return fmt.Errorf("adding member to rbac: %w", err) 85 + } 86 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 + return fmt.Errorf("adding did to tap", err) 88 + } 89 + 90 + l.Info("added member", "member", record.Subject) 91 + return nil 92 + 93 + case tap.RecordDeleteAction: 94 + var ( 95 + did = evt.Record.Did.String() 96 + rkey = evt.Record.Rkey.String() 97 + ) 98 + member, err := db.GetSpindleMember(s.db, did, rkey) 99 + if err != nil { 100 + return fmt.Errorf("finding member: %w", err) 101 + } 102 + 103 + if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 104 + return fmt.Errorf("removing member from db: %w", err) 105 + } 106 + if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 107 + return fmt.Errorf("removing member from rbac: %w", err) 108 + } 109 + if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 + return fmt.Errorf("removing did from tap: %w", err) 111 + } 112 + 113 + l.Info("removed member", "member", member.Subject) 114 + return nil 115 + } 116 + return nil 117 + } 118 + 119 + func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 120 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 + 122 + l.Info("processing repo.collaborator record") 123 + 124 + // only listen to members 125 + if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 126 + l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 127 + return nil 128 + } 129 + 130 + switch evt.Record.Action { 131 + case tap.RecordCreateAction, tap.RecordUpdateAction: 132 + record := tangled.RepoCollaborator{} 133 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 134 + l.Error("invalid record", "err", err) 135 + return fmt.Errorf("parsing record: %w", err) 136 + } 137 + 138 + // retry later if target repo is not ingested yet 139 + if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil { 140 + l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err) 141 + return fmt.Errorf("target repo is unknown") 142 + } 143 + 144 + // check perms for this user 145 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 146 + l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 147 + return nil 148 + } 149 + 150 + if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 151 + Did: evt.Record.Did, 152 + Rkey: evt.Record.Rkey, 153 + Repo: syntax.ATURI(record.Repo), 154 + Subject: syntax.DID(record.Subject), 155 + }); err != nil { 156 + return fmt.Errorf("adding collaborator to db: %w", err) 157 + } 158 + if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 159 + return fmt.Errorf("adding collaborator to rbac: %w", err) 160 + } 161 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 162 + return fmt.Errorf("adding did to tap: %w", err) 163 + } 164 + 165 + l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 166 + return nil 167 + 168 + case tap.RecordDeleteAction: 169 + // get existing collaborator 170 + collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 171 + if err != nil { 172 + return fmt.Errorf("failed to get existing collaborator info: %w", err) 173 + } 174 + 175 + // check perms for this user 176 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 177 + l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 178 + return nil 179 + } 180 + 181 + if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 182 + return fmt.Errorf("removing collaborator from db: %w", err) 183 + } 184 + if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 185 + return fmt.Errorf("removing collaborator from rbac: %w", err) 186 + } 187 + if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 188 + return fmt.Errorf("removing did from tap: %w", err) 189 + } 190 + 191 + l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 192 + return nil 193 + } 194 + return nil 195 + } 196 + 197 + func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 198 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 199 + 200 + l.Info("processing repo record") 201 + 202 + // only listen to members 203 + if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 204 + l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 205 + return nil 206 + } 207 + 208 + switch evt.Record.Action { 209 + case tap.RecordCreateAction, tap.RecordUpdateAction: 210 + record := tangled.Repo{} 211 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 212 + return fmt.Errorf("parsing record: %w", err) 213 + } 214 + 215 + domain := s.cfg.Server.Hostname 216 + if record.Spindle == nil || *record.Spindle != domain { 217 + if record.Spindle == nil { 218 + l.Info("spindle isn't configured", "name", record.Name) 219 + } else { 220 + l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 221 + } 222 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 223 + return fmt.Errorf("deleting repo from db: %w", err) 224 + } 225 + return nil 226 + } 227 + 228 + if err := s.db.PutRepo(&db.Repo{ 229 + Did: evt.Record.Did, 230 + Rkey: evt.Record.Rkey, 231 + Name: record.Name, 232 + Knot: record.Knot, 233 + }); err != nil { 234 + return fmt.Errorf("adding repo to db: %w", err) 235 + } 236 + 237 + if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 238 + return fmt.Errorf("adding repo to rbac") 239 + } 240 + 241 + // add this knot to the event consumer 242 + src := eventconsumer.NewKnotSource(record.Knot) 243 + s.ks.AddSource(context.Background(), src) 244 + 245 + l.Info("added repo", "repo", evt.Record.AtUri()) 246 + return nil 247 + 248 + case tap.RecordDeleteAction: 249 + // check perms for this user 250 + if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 251 + l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) 252 + return nil 253 + } 254 + 255 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 256 + return fmt.Errorf("deleting repo from db: %w", err) 257 + } 258 + 259 + if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 260 + return fmt.Errorf("deleting repo from rbac: %w", err) 261 + } 262 + 263 + l.Info("deleted repo", "repo", evt.Record.AtUri()) 264 + return nil 265 + } 266 + return nil 267 + } 268 + 269 + func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 270 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 271 + 272 + l.Info("processing pull record") 273 + 274 + switch evt.Record.Action { 275 + case tap.RecordCreateAction, tap.RecordUpdateAction: 276 + // TODO 277 + case tap.RecordDeleteAction: 278 + // TODO 279 + } 280 + return nil 281 + } 282 + 283 + func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 284 + known, err := s.db.IsKnownDid(syntax.DID(did)) 285 + if err != nil { 286 + return fmt.Errorf("ensuring did known state: %w", err) 287 + } 288 + if !known { 289 + if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 290 + return fmt.Errorf("removing did from tap: %w", err) 291 + } 292 + } 293 + return nil 294 + }