this repo has no description
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/eventconsumer"
12 "tangled.org/core/spindle/db"
13 "tangled.org/core/tap"
14)
15
16func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error {
17 l := s.l.With("component", "tapIndexer")
18
19 var err error
20 switch evt.Type {
21 case tap.EvtRecord:
22 switch evt.Record.Collection.String() {
23 case tangled.SpindleMemberNSID:
24 err = s.processMember(ctx, evt)
25 case tangled.RepoNSID:
26 err = s.processRepo(ctx, evt)
27 case tangled.RepoCollaboratorNSID:
28 err = s.processCollaborator(ctx, evt)
29 case tangled.RepoPullNSID:
30 err = s.processPull(ctx, evt)
31 }
32 case tap.EvtIdentity:
33 // no-op
34 }
35
36 if err != nil {
37 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
38 return err
39 }
40 return nil
41}
42
43// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated)
44
45func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error {
46 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
47
48 l.Info("processing spindle.member record")
49
50 // only listen to members
51 if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
52 l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err)
53 return nil
54 }
55
56 switch evt.Record.Action {
57 case tap.RecordCreateAction, tap.RecordUpdateAction:
58 record := tangled.SpindleMember{}
59 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
60 return fmt.Errorf("parsing record: %w", err)
61 }
62
63 domain := s.cfg.Server.Hostname
64 if record.Instance != domain {
65 l.Info("domain mismatch", "domain", record.Instance, "expected", domain)
66 return nil
67 }
68
69 created, err := time.Parse(record.CreatedAt, time.RFC3339)
70 if err != nil {
71 created = time.Now()
72 }
73 if err := db.AddSpindleMember(s.db, db.SpindleMember{
74 Did: evt.Record.Did,
75 Rkey: evt.Record.Rkey.String(),
76 Instance: record.Instance,
77 Subject: syntax.DID(record.Subject),
78 Created: created,
79 }); err != nil {
80 l.Error("failed to add member", "error", err)
81 return fmt.Errorf("adding member to db: %w", err)
82 }
83 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
84 return fmt.Errorf("adding member to rbac: %w", err)
85 }
86 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
87 return fmt.Errorf("adding did to tap", err)
88 }
89
90 l.Info("added member", "member", record.Subject)
91 return nil
92
93 case tap.RecordDeleteAction:
94 var (
95 did = evt.Record.Did.String()
96 rkey = evt.Record.Rkey.String()
97 )
98 member, err := db.GetSpindleMember(s.db, did, rkey)
99 if err != nil {
100 return fmt.Errorf("finding member: %w", err)
101 }
102
103 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
104 return fmt.Errorf("removing member from db: %w", err)
105 }
106 if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil {
107 return fmt.Errorf("removing member from rbac: %w", err)
108 }
109 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil {
110 return fmt.Errorf("removing did from tap: %w", err)
111 }
112
113 l.Info("removed member", "member", member.Subject)
114 return nil
115 }
116 return nil
117}
118
119func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error {
120 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
121
122 l.Info("processing repo.collaborator record")
123
124 // only listen to members
125 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
126 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
127 return nil
128 }
129
130 switch evt.Record.Action {
131 case tap.RecordCreateAction, tap.RecordUpdateAction:
132 record := tangled.RepoCollaborator{}
133 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
134 l.Error("invalid record", "err", err)
135 return fmt.Errorf("parsing record: %w", err)
136 }
137
138 // retry later if target repo is not ingested yet
139 if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil {
140 l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err)
141 return fmt.Errorf("target repo is unknown")
142 }
143
144 // check perms for this user
145 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil {
146 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
147 return nil
148 }
149
150 if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{
151 Did: evt.Record.Did,
152 Rkey: evt.Record.Rkey,
153 Repo: syntax.ATURI(record.Repo),
154 Subject: syntax.DID(record.Subject),
155 }); err != nil {
156 return fmt.Errorf("adding collaborator to db: %w", err)
157 }
158 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
159 return fmt.Errorf("adding collaborator to rbac: %w", err)
160 }
161 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
162 return fmt.Errorf("adding did to tap: %w", err)
163 }
164
165 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo)
166 return nil
167
168 case tap.RecordDeleteAction:
169 // get existing collaborator
170 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey)
171 if err != nil {
172 return fmt.Errorf("failed to get existing collaborator info: %w", err)
173 }
174
175 // check perms for this user
176 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil {
177 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
178 return nil
179 }
180
181 if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil {
182 return fmt.Errorf("removing collaborator from db: %w", err)
183 }
184 if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil {
185 return fmt.Errorf("removing collaborator from rbac: %w", err)
186 }
187 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil {
188 return fmt.Errorf("removing did from tap: %w", err)
189 }
190
191 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo)
192 return nil
193 }
194 return nil
195}
196
197func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error {
198 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
199
200 l.Info("processing repo record")
201
202 // only listen to members
203 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
204 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
205 return nil
206 }
207
208 switch evt.Record.Action {
209 case tap.RecordCreateAction, tap.RecordUpdateAction:
210 record := tangled.Repo{}
211 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
212 return fmt.Errorf("parsing record: %w", err)
213 }
214
215 domain := s.cfg.Server.Hostname
216 if record.Spindle == nil || *record.Spindle != domain {
217 if record.Spindle == nil {
218 l.Info("spindle isn't configured", "name", record.Name)
219 } else {
220 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
221 }
222 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
223 return fmt.Errorf("deleting repo from db: %w", err)
224 }
225 return nil
226 }
227
228 if err := s.db.PutRepo(&db.Repo{
229 Did: evt.Record.Did,
230 Rkey: evt.Record.Rkey,
231 Name: record.Name,
232 Knot: record.Knot,
233 }); err != nil {
234 return fmt.Errorf("adding repo to db: %w", err)
235 }
236
237 if err := s.e.AddRepo(evt.Record.AtUri()); err != nil {
238 return fmt.Errorf("adding repo to rbac")
239 }
240
241 // add this knot to the event consumer
242 src := eventconsumer.NewKnotSource(record.Knot)
243 s.ks.AddSource(context.Background(), src)
244
245 l.Info("added repo", "repo", evt.Record.AtUri())
246 return nil
247
248 case tap.RecordDeleteAction:
249 // check perms for this user
250 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil {
251 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err)
252 return nil
253 }
254
255 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
256 return fmt.Errorf("deleting repo from db: %w", err)
257 }
258
259 if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil {
260 return fmt.Errorf("deleting repo from rbac: %w", err)
261 }
262
263 l.Info("deleted repo", "repo", evt.Record.AtUri())
264 return nil
265 }
266 return nil
267}
268
269func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error {
270 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
271
272 l.Info("processing pull record")
273
274 switch evt.Record.Action {
275 case tap.RecordCreateAction, tap.RecordUpdateAction:
276 // TODO
277 case tap.RecordDeleteAction:
278 // TODO
279 }
280 return nil
281}
282
283func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error {
284 known, err := s.db.IsKnownDid(syntax.DID(did))
285 if err != nil {
286 return fmt.Errorf("ensuring did known state: %w", err)
287 }
288 if !known {
289 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil {
290 return fmt.Errorf("removing did from tap: %w", err)
291 }
292 }
293 return nil
294}