Monorepo for Tangled tangled.org

nix,spindle: switch to tap from jetstream #982

open opened by boltless.me targeting master from sl/spindle-rewrite

spindle-tap will collect/stream record events from:

  • users dynamically added by spindle (spindle members | collaborators of repos using spindle)
  • any users with sh.tangled.repo.pull collection

It might be bit inefficient considering it will also stream repo creation events from PR authors due to second rule, but at least we now have backfill logic and Sync 1.1 based syncing.

This inefficiency can be fixed later by modifying upstream tap cli or embedding tap into spindle.

+--------- all tangled users --------+
|                                    |
| +-- users known to spindle-tap --+ |
| |  (PR author / manually added)  | |
| |                                | |
| | +----------------------------+ | |
| | |   users known to spindle   | | |
| | |  (members / collaborators) | | |
| | +----------------------------+ | |
| +--------------------------------+ |
+------------------------------------+

Close: https://tangled.org/tangled.org/core/issues/341

Signed-off-by: Seongmin Lee git@boltless.me

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mckguakeya22
+25 -25
Interdiff #3 #4
nix/modules/spindle.nix

This patch was likely rebased, as context lines do not match.

nix/vm.nix

This file has not been changed.

spindle/config/config.go

This file has not been changed.

spindle/db/db.go

This file has not been changed.

spindle/db/known_dids.go

This file has not been changed.

spindle/db/repos.go

This file has not been changed.

spindle/ingester.go

This file has not been changed.

+4 -4
spindle/server.go
··· 27 27 "tangled.org/core/spindle/queue" 28 28 "tangled.org/core/spindle/secrets" 29 29 "tangled.org/core/spindle/xrpc" 30 - "tangled.org/core/tap" 30 + "tangled.org/core/tapc" 31 31 "tangled.org/core/xrpc/serviceauth" 32 32 ) 33 33 ··· 35 35 var defaultMotd []byte 36 36 37 37 type Spindle struct { 38 - tap *tap.Client 38 + tap *tapc.Client 39 39 db *db.DB 40 40 e *rbac2.Enforcer 41 41 l *slog.Logger ··· 94 94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 95 95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 96 96 97 - tap := tap.NewClient(cfg.Server.TapUrl, "") 97 + tap := tapc.NewClient(cfg.Server.TapUrl, "") 98 98 99 99 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 100 100 ··· 211 211 212 212 go func() { 213 213 s.l.Info("starting tap stream consumer") 214 - s.tap.Connect(ctx, &tap.SimpleIndexer{ 214 + s.tap.Connect(ctx, &tapc.SimpleIndexer{ 215 215 EventHandler: s.processEvent, 216 216 }) 217 217 }()
+21 -21
spindle/tap.go
··· 10 10 "tangled.org/core/api/tangled" 11 11 "tangled.org/core/eventconsumer" 12 12 "tangled.org/core/spindle/db" 13 - "tangled.org/core/tap" 13 + "tangled.org/core/tapc" 14 14 ) 15 15 16 - func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 16 + func (s *Spindle) processEvent(ctx context.Context, evt tapc.Event) error { 17 17 l := s.l.With("component", "tapIndexer") 18 18 19 19 var err error 20 20 switch evt.Type { 21 - case tap.EvtRecord: 21 + case tapc.EvtRecord: 22 22 switch evt.Record.Collection.String() { 23 23 case tangled.SpindleMemberNSID: 24 24 err = s.processMember(ctx, evt) ··· 29 29 case tangled.RepoPullNSID: 30 30 err = s.processPull(ctx, evt) 31 31 } 32 - case tap.EvtIdentity: 32 + case tapc.EvtIdentity: 33 33 // no-op 34 34 } 35 35 ··· 42 42 43 43 // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 44 45 - func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 45 + func (s *Spindle) processMember(ctx context.Context, evt tapc.Event) error { 46 46 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 47 48 48 l.Info("processing spindle.member record") ··· 54 54 } 55 55 56 56 switch evt.Record.Action { 57 - case tap.RecordCreateAction, tap.RecordUpdateAction: 57 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 58 58 record := tangled.SpindleMember{} 59 59 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 60 return fmt.Errorf("parsing record: %w", err) ··· 84 84 return fmt.Errorf("adding member to rbac: %w", err) 85 85 } 86 86 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 - return fmt.Errorf("adding did to tap: %w", err) 87 + return fmt.Errorf("adding did to tapc: %w", err) 88 88 } 89 89 90 90 l.Info("added member", "member", record.Subject) 91 91 return nil 92 92 93 - case tap.RecordDeleteAction: 93 + case tapc.RecordDeleteAction: 94 94 var ( 95 95 did = evt.Record.Did.String() 96 96 rkey = evt.Record.Rkey.String() ··· 107 107 return fmt.Errorf("removing member from rbac: %w", err) 108 108 } 109 109 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 - return fmt.Errorf("removing did from tap: %w", err) 110 + return fmt.Errorf("removing did from tapc: %w", err) 111 111 } 112 112 113 113 l.Info("removed member", "member", member.Subject) ··· 116 116 return nil 117 117 } 118 118 119 - func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 119 + func (s *Spindle) processCollaborator(ctx context.Context, evt tapc.Event) error { 120 120 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 121 122 122 l.Info("processing repo.collaborator record") ··· 128 128 } 129 129 130 130 switch evt.Record.Action { 131 - case tap.RecordCreateAction, tap.RecordUpdateAction: 131 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 132 132 record := tangled.RepoCollaborator{} 133 133 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 134 134 l.Error("invalid record", "err", err) ··· 159 159 return fmt.Errorf("adding collaborator to rbac: %w", err) 160 160 } 161 161 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 162 - return fmt.Errorf("adding did to tap: %w", err) 162 + return fmt.Errorf("adding did to tapc: %w", err) 163 163 } 164 164 165 165 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 166 166 return nil 167 167 168 - case tap.RecordDeleteAction: 168 + case tapc.RecordDeleteAction: 169 169 // get existing collaborator 170 170 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 171 171 if err != nil { ··· 185 185 return fmt.Errorf("removing collaborator from rbac: %w", err) 186 186 } 187 187 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 188 - return fmt.Errorf("removing did from tap: %w", err) 188 + return fmt.Errorf("removing did from tapc: %w", err) 189 189 } 190 190 191 191 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) ··· 194 194 return nil 195 195 } 196 196 197 - func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 197 + func (s *Spindle) processRepo(ctx context.Context, evt tapc.Event) error { 198 198 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 199 199 200 200 l.Info("processing repo record") ··· 206 206 } 207 207 208 208 switch evt.Record.Action { 209 - case tap.RecordCreateAction, tap.RecordUpdateAction: 209 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 210 210 record := tangled.Repo{} 211 211 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 212 212 return fmt.Errorf("parsing record: %w", err) ··· 245 245 l.Info("added repo", "repo", evt.Record.AtUri()) 246 246 return nil 247 247 248 - case tap.RecordDeleteAction: 248 + case tapc.RecordDeleteAction: 249 249 // check perms for this user 250 250 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 251 251 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) ··· 266 266 return nil 267 267 } 268 268 269 - func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 269 + func (s *Spindle) processPull(ctx context.Context, evt tapc.Event) error { 270 270 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 271 271 272 272 l.Info("processing pull record") 273 273 274 274 switch evt.Record.Action { 275 - case tap.RecordCreateAction, tap.RecordUpdateAction: 275 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 276 276 // TODO 277 - case tap.RecordDeleteAction: 277 + case tapc.RecordDeleteAction: 278 278 // TODO 279 279 } 280 280 return nil ··· 287 287 } 288 288 if !known { 289 289 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 290 - return fmt.Errorf("removing did from tap: %w", err) 290 + return fmt.Errorf("removing did from tapc: %w", err) 291 291 } 292 292 } 293 293 return nil

History

5 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
nix,spindle: switch to tap from jetstream
3/3 success
expand
merge conflicts detected
expand
  • appview/state/state.go:35
  • go.mod:9
  • go.sum:483
  • nix/gomod2nix.toml:510
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
2/3 timeout, 1/3 success
expand
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
1/3 failed, 2/3 success
expand
expand 0 comments