this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/spindle/db" 13 "tangled.org/core/spindle/git" 14 "tangled.org/core/tap" 15) 16 17func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 18 l := s.l.With("component", "tapIndexer") 19 20 var err error 21 switch evt.Type { 22 case tap.EvtRecord: 23 switch evt.Record.Collection.String() { 24 case tangled.SpindleMemberNSID: 25 err = s.processMember(ctx, evt) 26 case tangled.RepoNSID: 27 err = s.processRepo(ctx, evt) 28 case tangled.RepoCollaboratorNSID: 29 err = s.processCollaborator(ctx, evt) 30 case tangled.RepoPullNSID: 31 err = s.processPull(ctx, evt) 32 } 33 case tap.EvtIdentity: 34 // no-op 35 } 36 37 if err != nil { 38 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 39 return err 40 } 41 return nil 42} 43 44// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 45 46func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 47 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 48 49 l.Info("processing spindle.member record") 50 51 // only listen to members 52 if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 53 l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err) 54 return nil 55 } 56 57 switch evt.Record.Action { 58 case tap.RecordCreateAction, tap.RecordUpdateAction: 59 record := tangled.SpindleMember{} 60 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 61 return fmt.Errorf("parsing record: %w", err) 62 } 63 64 domain := s.cfg.Server.Hostname 65 if record.Instance != domain { 66 l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 67 return nil 68 } 69 70 created, err := time.Parse(record.CreatedAt, time.RFC3339) 71 if err != nil { 72 created = time.Now() 73 } 74 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 75 Did: evt.Record.Did, 76 Rkey: evt.Record.Rkey.String(), 77 Instance: record.Instance, 78 Subject: syntax.DID(record.Subject), 79 Created: created, 80 }); err != nil { 81 l.Error("failed to add member", "error", err) 82 return fmt.Errorf("adding member to db: %w", err) 83 } 84 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 85 return fmt.Errorf("adding member to rbac: %w", err) 86 } 87 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 88 return fmt.Errorf("adding did to tap", err) 89 } 90 91 l.Info("added member", "member", record.Subject) 92 return nil 93 94 case tap.RecordDeleteAction: 95 var ( 96 did = evt.Record.Did.String() 97 rkey = evt.Record.Rkey.String() 98 ) 99 member, err := db.GetSpindleMember(s.db, did, rkey) 100 if err != nil { 101 return fmt.Errorf("finding member: %w", err) 102 } 103 104 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 105 return fmt.Errorf("removing member from db: %w", err) 106 } 107 if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 108 return fmt.Errorf("removing member from rbac: %w", err) 109 } 110 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 111 return fmt.Errorf("removing did from tap: %w", err) 112 } 113 114 l.Info("removed member", "member", member.Subject) 115 return nil 116 } 117 return nil 118} 119 120func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 121 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 122 123 l.Info("processing repo.collaborator record") 124 125 // only listen to members 126 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 127 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 128 return nil 129 } 130 131 switch evt.Record.Action { 132 case tap.RecordCreateAction, tap.RecordUpdateAction: 133 record := tangled.RepoCollaborator{} 134 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 135 l.Error("invalid record", "err", err) 136 return fmt.Errorf("parsing record: %w", err) 137 } 138 139 // retry later if target repo is not ingested yet 140 if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil { 141 l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err) 142 return fmt.Errorf("target repo is unknown") 143 } 144 145 // check perms for this user 146 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 147 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 148 return nil 149 } 150 151 if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 152 Did: evt.Record.Did, 153 Rkey: evt.Record.Rkey, 154 Repo: syntax.ATURI(record.Repo), 155 Subject: syntax.DID(record.Subject), 156 }); err != nil { 157 return fmt.Errorf("adding collaborator to db: %w", err) 158 } 159 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 160 return fmt.Errorf("adding collaborator to rbac: %w", err) 161 } 162 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 163 return fmt.Errorf("adding did to tap: %w", err) 164 } 165 166 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 167 return nil 168 169 case tap.RecordDeleteAction: 170 // get existing collaborator 171 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 172 if err != nil { 173 return fmt.Errorf("failed to get existing collaborator info: %w", err) 174 } 175 176 // check perms for this user 177 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 178 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 179 return nil 180 } 181 182 if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 183 return fmt.Errorf("removing collaborator from db: %w", err) 184 } 185 if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 186 return fmt.Errorf("removing collaborator from rbac: %w", err) 187 } 188 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 189 return fmt.Errorf("removing did from tap: %w", err) 190 } 191 192 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 193 return nil 194 } 195 return nil 196} 197 198func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 199 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 200 201 l.Info("processing repo record") 202 203 // only listen to members 204 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 205 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 206 return nil 207 } 208 209 switch evt.Record.Action { 210 case tap.RecordCreateAction, tap.RecordUpdateAction: 211 record := tangled.Repo{} 212 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 213 return fmt.Errorf("parsing record: %w", err) 214 } 215 216 domain := s.cfg.Server.Hostname 217 if record.Spindle == nil || *record.Spindle != domain { 218 if record.Spindle == nil { 219 l.Info("spindle isn't configured", "name", record.Name) 220 } else { 221 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 222 } 223 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 224 return fmt.Errorf("deleting repo from db: %w", err) 225 } 226 return nil 227 } 228 229 repo := &db.Repo{ 230 Did: evt.Record.Did, 231 Rkey: evt.Record.Rkey, 232 Name: record.Name, 233 Knot: record.Knot, 234 } 235 236 if err := s.db.PutRepo(repo); err != nil { 237 return fmt.Errorf("adding repo to db: %w", err) 238 } 239 240 if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 241 return fmt.Errorf("adding repo to rbac") 242 } 243 244 // add this knot to the event consumer 245 src := eventconsumer.NewKnotSource(record.Knot) 246 s.ks.AddSource(context.Background(), src) 247 248 // setup sparse sync 249 repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 250 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 251 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil { 252 return fmt.Errorf("setting up sparse-clone git repo: %w", err) 253 } 254 255 l.Info("added repo", "repo", evt.Record.AtUri()) 256 return nil 257 258 case tap.RecordDeleteAction: 259 // check perms for this user 260 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 261 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) 262 return nil 263 } 264 265 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 266 return fmt.Errorf("deleting repo from db: %w", err) 267 } 268 269 if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 270 return fmt.Errorf("deleting repo from rbac: %w", err) 271 } 272 273 l.Info("deleted repo", "repo", evt.Record.AtUri()) 274 return nil 275 } 276 return nil 277} 278 279func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 280 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 281 282 l.Info("processing pull record") 283 284 switch evt.Record.Action { 285 case tap.RecordCreateAction, tap.RecordUpdateAction: 286 // TODO 287 case tap.RecordDeleteAction: 288 // TODO 289 } 290 return nil 291} 292 293func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 294 known, err := s.db.IsKnownDid(syntax.DID(did)) 295 if err != nil { 296 return fmt.Errorf("ensuring did known state: %w", err) 297 } 298 if !known { 299 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 300 return fmt.Errorf("removing did from tap: %w", err) 301 } 302 } 303 return nil 304}