this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/spindle/db" 13 "tangled.org/core/tap" 14) 15 16func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 17 l := s.l.With("component", "tapIndexer") 18 19 var err error 20 switch evt.Type { 21 case tap.EvtRecord: 22 switch evt.Record.Collection.String() { 23 case tangled.SpindleMemberNSID: 24 err = s.processMember(ctx, evt) 25 case tangled.RepoNSID: 26 err = s.processRepo(ctx, evt) 27 case tangled.RepoCollaboratorNSID: 28 err = s.processCollaborator(ctx, evt) 29 case tangled.RepoPullNSID: 30 err = s.processPull(ctx, evt) 31 } 32 case tap.EvtIdentity: 33 // no-op 34 } 35 36 if err != nil { 37 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 38 return err 39 } 40 return nil 41} 42 43// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 45func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 46 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 48 l.Info("processing spindle.member record") 49 50 // only listen to members 51 if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 52 l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err) 53 return nil 54 } 55 56 switch evt.Record.Action { 57 case tap.RecordCreateAction, tap.RecordUpdateAction: 58 record := tangled.SpindleMember{} 59 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 return fmt.Errorf("parsing record: %w", err) 61 } 62 63 domain := s.cfg.Server.Hostname 64 if record.Instance != domain { 65 l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 66 return nil 67 } 68 69 created, err := time.Parse(record.CreatedAt, time.RFC3339) 70 if err != nil { 71 created = time.Now() 72 } 73 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 74 Did: evt.Record.Did, 75 Rkey: evt.Record.Rkey.String(), 76 Instance: record.Instance, 77 Subject: syntax.DID(record.Subject), 78 Created: created, 79 }); err != nil { 80 l.Error("failed to add member", "error", err) 81 return fmt.Errorf("adding member to db: %w", err) 82 } 83 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 84 return fmt.Errorf("adding member to rbac: %w", err) 85 } 86 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 return fmt.Errorf("adding did to tap", err) 88 } 89 90 l.Info("added member", "member", record.Subject) 91 return nil 92 93 case tap.RecordDeleteAction: 94 var ( 95 did = evt.Record.Did.String() 96 rkey = evt.Record.Rkey.String() 97 ) 98 member, err := db.GetSpindleMember(s.db, did, rkey) 99 if err != nil { 100 return fmt.Errorf("finding member: %w", err) 101 } 102 103 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 104 return fmt.Errorf("removing member from db: %w", err) 105 } 106 if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 107 return fmt.Errorf("removing member from rbac: %w", err) 108 } 109 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 return fmt.Errorf("removing did from tap: %w", err) 111 } 112 113 l.Info("removed member", "member", member.Subject) 114 return nil 115 } 116 return nil 117} 118 119func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 120 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 122 l.Info("processing repo.collaborator record") 123 124 // only listen to members 125 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 126 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 127 return nil 128 } 129 130 switch evt.Record.Action { 131 case tap.RecordCreateAction, tap.RecordUpdateAction: 132 record := tangled.RepoCollaborator{} 133 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 134 l.Error("invalid record", "err", err) 135 return fmt.Errorf("parsing record: %w", err) 136 } 137 138 // retry later if target repo is not ingested yet 139 if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil { 140 l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err) 141 return fmt.Errorf("target repo is unknown") 142 } 143 144 // check perms for this user 145 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 146 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 147 return nil 148 } 149 150 if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 151 Did: evt.Record.Did, 152 Rkey: evt.Record.Rkey, 153 Repo: syntax.ATURI(record.Repo), 154 Subject: syntax.DID(record.Subject), 155 }); err != nil { 156 return fmt.Errorf("adding collaborator to db: %w", err) 157 } 158 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 159 return fmt.Errorf("adding collaborator to rbac: %w", err) 160 } 161 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 162 return fmt.Errorf("adding did to tap: %w", err) 163 } 164 165 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 166 return nil 167 168 case tap.RecordDeleteAction: 169 // get existing collaborator 170 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 171 if err != nil { 172 return fmt.Errorf("failed to get existing collaborator info: %w", err) 173 } 174 175 // check perms for this user 176 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 177 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 178 return nil 179 } 180 181 if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 182 return fmt.Errorf("removing collaborator from db: %w", err) 183 } 184 if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 185 return fmt.Errorf("removing collaborator from rbac: %w", err) 186 } 187 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 188 return fmt.Errorf("removing did from tap: %w", err) 189 } 190 191 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 192 return nil 193 } 194 return nil 195} 196 197func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 198 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 199 200 l.Info("processing repo record") 201 202 // only listen to members 203 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 204 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 205 return nil 206 } 207 208 switch evt.Record.Action { 209 case tap.RecordCreateAction, tap.RecordUpdateAction: 210 record := tangled.Repo{} 211 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 212 return fmt.Errorf("parsing record: %w", err) 213 } 214 215 domain := s.cfg.Server.Hostname 216 if record.Spindle == nil || *record.Spindle != domain { 217 if record.Spindle == nil { 218 l.Info("spindle isn't configured", "name", record.Name) 219 } else { 220 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 221 } 222 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 223 return fmt.Errorf("deleting repo from db: %w", err) 224 } 225 return nil 226 } 227 228 if err := s.db.PutRepo(&db.Repo{ 229 Did: evt.Record.Did, 230 Rkey: evt.Record.Rkey, 231 Name: record.Name, 232 Knot: record.Knot, 233 }); err != nil { 234 return fmt.Errorf("adding repo to db: %w", err) 235 } 236 237 if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 238 return fmt.Errorf("adding repo to rbac") 239 } 240 241 // add this knot to the event consumer 242 src := eventconsumer.NewKnotSource(record.Knot) 243 s.ks.AddSource(context.Background(), src) 244 245 l.Info("added repo", "repo", evt.Record.AtUri()) 246 return nil 247 248 case tap.RecordDeleteAction: 249 // check perms for this user 250 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 251 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) 252 return nil 253 } 254 255 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 256 return fmt.Errorf("deleting repo from db: %w", err) 257 } 258 259 if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 260 return fmt.Errorf("deleting repo from rbac: %w", err) 261 } 262 263 l.Info("deleted repo", "repo", evt.Record.AtUri()) 264 return nil 265 } 266 return nil 267} 268 269func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 270 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 271 272 l.Info("processing pull record") 273 274 switch evt.Record.Action { 275 case tap.RecordCreateAction, tap.RecordUpdateAction: 276 // TODO 277 case tap.RecordDeleteAction: 278 // TODO 279 } 280 return nil 281} 282 283func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 284 known, err := s.db.IsKnownDid(syntax.DID(did)) 285 if err != nil { 286 return fmt.Errorf("ensuring did known state: %w", err) 287 } 288 if !known { 289 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 290 return fmt.Errorf("removing did from tap: %w", err) 291 } 292 } 293 return nil 294}