this repo has no description
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/eventconsumer"
12 "tangled.org/core/spindle/db"
13 "tangled.org/core/spindle/git"
14 "tangled.org/core/tap"
15)
16
17func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error {
18 l := s.l.With("component", "tapIndexer")
19
20 var err error
21 switch evt.Type {
22 case tap.EvtRecord:
23 switch evt.Record.Collection.String() {
24 case tangled.SpindleMemberNSID:
25 err = s.processMember(ctx, evt)
26 case tangled.RepoNSID:
27 err = s.processRepo(ctx, evt)
28 case tangled.RepoCollaboratorNSID:
29 err = s.processCollaborator(ctx, evt)
30 case tangled.RepoPullNSID:
31 err = s.processPull(ctx, evt)
32 }
33 case tap.EvtIdentity:
34 // no-op
35 }
36
37 if err != nil {
38 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
39 return err
40 }
41 return nil
42}
43
// NOTE: the handlers below must return nil whenever a retry would be pointless (e.g. the request is forbidden or the record is unrelated to this spindle).
45
46func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error {
47 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
48
49 l.Info("processing spindle.member record")
50
51 // only listen to members
52 if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
53 l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err)
54 return nil
55 }
56
57 switch evt.Record.Action {
58 case tap.RecordCreateAction, tap.RecordUpdateAction:
59 record := tangled.SpindleMember{}
60 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
61 return fmt.Errorf("parsing record: %w", err)
62 }
63
64 domain := s.cfg.Server.Hostname
65 if record.Instance != domain {
66 l.Info("domain mismatch", "domain", record.Instance, "expected", domain)
67 return nil
68 }
69
70 created, err := time.Parse(record.CreatedAt, time.RFC3339)
71 if err != nil {
72 created = time.Now()
73 }
74 if err := db.AddSpindleMember(s.db, db.SpindleMember{
75 Did: evt.Record.Did,
76 Rkey: evt.Record.Rkey.String(),
77 Instance: record.Instance,
78 Subject: syntax.DID(record.Subject),
79 Created: created,
80 }); err != nil {
81 l.Error("failed to add member", "error", err)
82 return fmt.Errorf("adding member to db: %w", err)
83 }
84 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
85 return fmt.Errorf("adding member to rbac: %w", err)
86 }
87 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
88 return fmt.Errorf("adding did to tap", err)
89 }
90
91 l.Info("added member", "member", record.Subject)
92 return nil
93
94 case tap.RecordDeleteAction:
95 var (
96 did = evt.Record.Did.String()
97 rkey = evt.Record.Rkey.String()
98 )
99 member, err := db.GetSpindleMember(s.db, did, rkey)
100 if err != nil {
101 return fmt.Errorf("finding member: %w", err)
102 }
103
104 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
105 return fmt.Errorf("removing member from db: %w", err)
106 }
107 if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil {
108 return fmt.Errorf("removing member from rbac: %w", err)
109 }
110 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil {
111 return fmt.Errorf("removing did from tap: %w", err)
112 }
113
114 l.Info("removed member", "member", member.Subject)
115 return nil
116 }
117 return nil
118}
119
120func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error {
121 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
122
123 l.Info("processing repo.collaborator record")
124
125 // only listen to members
126 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
127 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
128 return nil
129 }
130
131 switch evt.Record.Action {
132 case tap.RecordCreateAction, tap.RecordUpdateAction:
133 record := tangled.RepoCollaborator{}
134 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
135 l.Error("invalid record", "err", err)
136 return fmt.Errorf("parsing record: %w", err)
137 }
138
139 // retry later if target repo is not ingested yet
140 if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil {
141 l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err)
142 return fmt.Errorf("target repo is unknown")
143 }
144
145 // check perms for this user
146 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil {
147 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
148 return nil
149 }
150
151 if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{
152 Did: evt.Record.Did,
153 Rkey: evt.Record.Rkey,
154 Repo: syntax.ATURI(record.Repo),
155 Subject: syntax.DID(record.Subject),
156 }); err != nil {
157 return fmt.Errorf("adding collaborator to db: %w", err)
158 }
159 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
160 return fmt.Errorf("adding collaborator to rbac: %w", err)
161 }
162 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
163 return fmt.Errorf("adding did to tap: %w", err)
164 }
165
166 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo)
167 return nil
168
169 case tap.RecordDeleteAction:
170 // get existing collaborator
171 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey)
172 if err != nil {
173 return fmt.Errorf("failed to get existing collaborator info: %w", err)
174 }
175
176 // check perms for this user
177 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil {
178 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
179 return nil
180 }
181
182 if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil {
183 return fmt.Errorf("removing collaborator from db: %w", err)
184 }
185 if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil {
186 return fmt.Errorf("removing collaborator from rbac: %w", err)
187 }
188 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil {
189 return fmt.Errorf("removing did from tap: %w", err)
190 }
191
192 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo)
193 return nil
194 }
195 return nil
196}
197
198func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error {
199 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
200
201 l.Info("processing repo record")
202
203 // only listen to members
204 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
205 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
206 return nil
207 }
208
209 switch evt.Record.Action {
210 case tap.RecordCreateAction, tap.RecordUpdateAction:
211 record := tangled.Repo{}
212 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
213 return fmt.Errorf("parsing record: %w", err)
214 }
215
216 domain := s.cfg.Server.Hostname
217 if record.Spindle == nil || *record.Spindle != domain {
218 if record.Spindle == nil {
219 l.Info("spindle isn't configured", "name", record.Name)
220 } else {
221 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
222 }
223 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
224 return fmt.Errorf("deleting repo from db: %w", err)
225 }
226 return nil
227 }
228
229 repo := &db.Repo{
230 Did: evt.Record.Did,
231 Rkey: evt.Record.Rkey,
232 Name: record.Name,
233 Knot: record.Knot,
234 }
235
236 if err := s.db.PutRepo(repo); err != nil {
237 return fmt.Errorf("adding repo to db: %w", err)
238 }
239
240 if err := s.e.AddRepo(evt.Record.AtUri()); err != nil {
241 return fmt.Errorf("adding repo to rbac")
242 }
243
244 // add this knot to the event consumer
245 src := eventconsumer.NewKnotSource(record.Knot)
246 s.ks.AddSource(context.Background(), src)
247
248 // setup sparse sync
249 repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name)
250 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
251 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil {
252 return fmt.Errorf("setting up sparse-clone git repo: %w", err)
253 }
254
255 l.Info("added repo", "repo", evt.Record.AtUri())
256 return nil
257
258 case tap.RecordDeleteAction:
259 // check perms for this user
260 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil {
261 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err)
262 return nil
263 }
264
265 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
266 return fmt.Errorf("deleting repo from db: %w", err)
267 }
268
269 if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil {
270 return fmt.Errorf("deleting repo from rbac: %w", err)
271 }
272
273 l.Info("deleted repo", "repo", evt.Record.AtUri())
274 return nil
275 }
276 return nil
277}
278
279func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error {
280 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
281
282 l.Info("processing pull record")
283
284 switch evt.Record.Action {
285 case tap.RecordCreateAction, tap.RecordUpdateAction:
286 // TODO
287 case tap.RecordDeleteAction:
288 // TODO
289 }
290 return nil
291}
292
293func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error {
294 known, err := s.db.IsKnownDid(syntax.DID(did))
295 if err != nil {
296 return fmt.Errorf("ensuring did known state: %w", err)
297 }
298 if !known {
299 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil {
300 return fmt.Errorf("removing did from tap: %w", err)
301 }
302 }
303 return nil
304}