Monorepo for Tangled tangled.org

nix,spindle: switch to tap from jetstream #982

open opened by boltless.me targeting master from sl/spindle-rewrite

spindle-tap will collect/stream record events from:

  • users dynamically added by spindle (spindle members | collaborators of repos using spindle)
  • any users with sh.tangled.repo.pull collection

It might be bit inefficient considering it will also stream repo creation events from PR authors due to second rule, but at least we now have backfill logic and Sync 1.1 based syncing.

This inefficiency can be fixed later by modifying upstream tap cli or embedding tap into spindle.

+--------- all tangled users --------+
|                                    |
| +-- users known to spindle-tap --+ |
| |  (PR author / manually added)  | |
| |                                | |
| | +----------------------------+ | |
| | |   users known to spindle   | | |
| | |  (members / collaborators) | | |
| | +----------------------------+ | |
| +--------------------------------+ |
+------------------------------------+

Close: https://tangled.org/tangled.org/core/issues/341

Signed-off-by: Seongmin Lee git@boltless.me

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mckguakeya22
+25 -25
Interdiff #3 #4
nix/modules/spindle.nix

This patch was likely rebased, as context lines do not match.

nix/vm.nix

This file has not been changed.

spindle/config/config.go

This file has not been changed.

spindle/db/db.go

This file has not been changed.

spindle/db/known_dids.go

This file has not been changed.

spindle/db/repos.go

This file has not been changed.

spindle/ingester.go

This file has not been changed.

+4 -4
spindle/server.go
··· 27 "tangled.org/core/spindle/queue" 28 "tangled.org/core/spindle/secrets" 29 "tangled.org/core/spindle/xrpc" 30 - "tangled.org/core/tap" 31 "tangled.org/core/xrpc/serviceauth" 32 ) 33 ··· 35 var defaultMotd []byte 36 37 type Spindle struct { 38 - tap *tap.Client 39 db *db.DB 40 e *rbac2.Enforcer 41 l *slog.Logger ··· 94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 96 97 - tap := tap.NewClient(cfg.Server.TapUrl, "") 98 99 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 100 ··· 211 212 go func() { 213 s.l.Info("starting tap stream consumer") 214 - s.tap.Connect(ctx, &tap.SimpleIndexer{ 215 EventHandler: s.processEvent, 216 }) 217 }()
··· 27 "tangled.org/core/spindle/queue" 28 "tangled.org/core/spindle/secrets" 29 "tangled.org/core/spindle/xrpc" 30 + "tangled.org/core/tapc" 31 "tangled.org/core/xrpc/serviceauth" 32 ) 33 ··· 35 var defaultMotd []byte 36 37 type Spindle struct { 38 + tap *tapc.Client 39 db *db.DB 40 e *rbac2.Enforcer 41 l *slog.Logger ··· 94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 96 97 + tap := tapc.NewClient(cfg.Server.TapUrl, "") 98 99 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 100 ··· 211 212 go func() { 213 s.l.Info("starting tap stream consumer") 214 + s.tap.Connect(ctx, &tapc.SimpleIndexer{ 215 EventHandler: s.processEvent, 216 }) 217 }()
+21 -21
spindle/tap.go
··· 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/spindle/db" 13 - "tangled.org/core/tap" 14 ) 15 16 - func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 17 l := s.l.With("component", "tapIndexer") 18 19 var err error 20 switch evt.Type { 21 - case tap.EvtRecord: 22 switch evt.Record.Collection.String() { 23 case tangled.SpindleMemberNSID: 24 err = s.processMember(ctx, evt) ··· 29 case tangled.RepoPullNSID: 30 err = s.processPull(ctx, evt) 31 } 32 - case tap.EvtIdentity: 33 // no-op 34 } 35 ··· 42 43 // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 45 - func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 46 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 48 l.Info("processing spindle.member record") ··· 54 } 55 56 switch evt.Record.Action { 57 - case tap.RecordCreateAction, tap.RecordUpdateAction: 58 record := tangled.SpindleMember{} 59 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 return fmt.Errorf("parsing record: %w", err) ··· 84 return fmt.Errorf("adding member to rbac: %w", err) 85 } 86 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 - return fmt.Errorf("adding did to tap: %w", err) 88 } 89 90 l.Info("added member", "member", record.Subject) 91 return nil 92 93 - case tap.RecordDeleteAction: 94 var ( 95 did = evt.Record.Did.String() 96 rkey = evt.Record.Rkey.String() ··· 107 return fmt.Errorf("removing member from rbac: %w", err) 108 } 109 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 - return fmt.Errorf("removing did from tap: %w", err) 111 } 112 113 l.Info("removed member", "member", member.Subject) ··· 116 return nil 117 } 118 119 - func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 120 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 122 l.Info("processing repo.collaborator record") ··· 128 } 129 130 switch evt.Record.Action { 131 - case tap.RecordCreateAction, tap.RecordUpdateAction: 132 record := tangled.RepoCollaborator{} 133 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 134 l.Error("invalid record", "err", err) ··· 159 return fmt.Errorf("adding collaborator to rbac: %w", err) 160 } 161 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 162 - return fmt.Errorf("adding did to tap: %w", err) 163 } 164 165 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 166 return nil 167 168 - case tap.RecordDeleteAction: 169 // get existing collaborator 170 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 171 if err != nil { ··· 185 return fmt.Errorf("removing collaborator from rbac: %w", err) 186 } 187 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 188 - return fmt.Errorf("removing did from tap: %w", err) 189 } 190 191 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) ··· 194 return nil 195 } 196 197 - func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 198 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 199 200 l.Info("processing repo record") ··· 206 } 207 208 switch evt.Record.Action { 209 - case tap.RecordCreateAction, tap.RecordUpdateAction: 210 record := tangled.Repo{} 211 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 212 return fmt.Errorf("parsing record: %w", err) ··· 245 l.Info("added repo", "repo", evt.Record.AtUri()) 246 return nil 247 248 - case tap.RecordDeleteAction: 249 // check perms for this user 250 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 251 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) ··· 266 return nil 267 } 268 269 - func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 270 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 271 272 l.Info("processing pull record") 273 274 switch evt.Record.Action { 275 - case tap.RecordCreateAction, tap.RecordUpdateAction: 276 // TODO 277 - case tap.RecordDeleteAction: 278 // TODO 279 } 280 return nil ··· 287 } 288 if !known { 289 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 290 - return fmt.Errorf("removing did from tap: %w", err) 291 } 292 } 293 return nil
··· 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/spindle/db" 13 + "tangled.org/core/tapc" 14 ) 15 16 + func (s *Spindle) processEvent(ctx context.Context, evt tapc.Event) error { 17 l := s.l.With("component", "tapIndexer") 18 19 var err error 20 switch evt.Type { 21 + case tapc.EvtRecord: 22 switch evt.Record.Collection.String() { 23 case tangled.SpindleMemberNSID: 24 err = s.processMember(ctx, evt) ··· 29 case tangled.RepoPullNSID: 30 err = s.processPull(ctx, evt) 31 } 32 + case tapc.EvtIdentity: 33 // no-op 34 } 35 ··· 42 43 // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 45 + func (s *Spindle) processMember(ctx context.Context, evt tapc.Event) error { 46 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 48 l.Info("processing spindle.member record") ··· 54 } 55 56 switch evt.Record.Action { 57 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 58 record := tangled.SpindleMember{} 59 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 return fmt.Errorf("parsing record: %w", err) ··· 84 return fmt.Errorf("adding member to rbac: %w", err) 85 } 86 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 + return fmt.Errorf("adding did to tapc: %w", err) 88 } 89 90 l.Info("added member", "member", record.Subject) 91 return nil 92 93 + case tapc.RecordDeleteAction: 94 var ( 95 did = evt.Record.Did.String() 96 rkey = evt.Record.Rkey.String() ··· 107 return fmt.Errorf("removing member from rbac: %w", err) 108 } 109 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 + return fmt.Errorf("removing did from tapc: %w", err) 111 } 112 113 l.Info("removed member", "member", member.Subject) ··· 116 return nil 117 } 118 119 + func (s *Spindle) processCollaborator(ctx context.Context, evt tapc.Event) error { 120 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 122 l.Info("processing repo.collaborator record") ··· 128 } 129 130 switch evt.Record.Action { 131 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 132 record := tangled.RepoCollaborator{} 133 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 134 l.Error("invalid record", "err", err) ··· 159 return fmt.Errorf("adding collaborator to rbac: %w", err) 160 } 161 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 162 + return fmt.Errorf("adding did to tapc: %w", err) 163 } 164 165 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 166 return nil 167 168 + case tapc.RecordDeleteAction: 169 // get existing collaborator 170 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 171 if err != nil { ··· 185 return fmt.Errorf("removing collaborator from rbac: %w", err) 186 } 187 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 188 + return fmt.Errorf("removing did from tapc: %w", err) 189 } 190 191 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) ··· 194 return nil 195 } 196 197 + func (s *Spindle) processRepo(ctx context.Context, evt tapc.Event) error { 198 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 199 200 l.Info("processing repo record") ··· 206 } 207 208 switch evt.Record.Action { 209 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 210 record := tangled.Repo{} 211 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 212 return fmt.Errorf("parsing record: %w", err) ··· 245 l.Info("added repo", "repo", evt.Record.AtUri()) 246 return nil 247 248 + case tapc.RecordDeleteAction: 249 // check perms for this user 250 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 251 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) ··· 266 return nil 267 } 268 269 + func (s *Spindle) processPull(ctx context.Context, evt tapc.Event) error { 270 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 271 272 l.Info("processing pull record") 273 274 switch evt.Record.Action { 275 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 276 // TODO 277 + case tapc.RecordDeleteAction: 278 // TODO 279 } 280 return nil ··· 287 } 288 if !known { 289 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 290 + return fmt.Errorf("removing did from tapc: %w", err) 291 } 292 } 293 return nil

History

5 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
nix,spindle: switch to tap from jetstream
merge conflicts detected
expand
  • appview/state/state.go:35
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
expand 0 comments
1 commit
expand
nix,spindle: switch to tap from jetstream
expand 0 comments