this repo has no description
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/db"
16 "tangled.sh/tangled.sh/core/rbac"
17)
18
19type Ingester func(ctx context.Context, e *models.Event) error
20
21func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester {
22 return func(ctx context.Context, e *models.Event) error {
23 var err error
24 defer func() {
25 eventTime := e.TimeUS
26 lastTimeUs := eventTime + 1
27 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
28 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
29 }
30 }()
31
32 if e.Kind != models.EventKindCommit {
33 return nil
34 }
35
36 switch e.Commit.Collection {
37 case tangled.GraphFollowNSID:
38 ingestFollow(&d, e)
39 case tangled.FeedStarNSID:
40 ingestStar(&d, e)
41 case tangled.PublicKeyNSID:
42 ingestPublicKey(&d, e)
43 case tangled.RepoArtifactNSID:
44 ingestArtifact(&d, e, enforcer)
45 case tangled.ActorProfileNSID:
46 ingestProfile(&d, e)
47 case tangled.SpindleMemberNSID:
48 ingestSpindleMember(&d, e, enforcer)
49 }
50
51 return err
52 }
53}
54
55func ingestStar(d *db.DbWrapper, e *models.Event) error {
56 var err error
57 did := e.Did
58
59 switch e.Commit.Operation {
60 case models.CommitOperationCreate, models.CommitOperationUpdate:
61 var subjectUri syntax.ATURI
62
63 raw := json.RawMessage(e.Commit.Record)
64 record := tangled.FeedStar{}
65 err := json.Unmarshal(raw, &record)
66 if err != nil {
67 log.Println("invalid record")
68 return err
69 }
70
71 subjectUri, err = syntax.ParseATURI(record.Subject)
72 if err != nil {
73 log.Println("invalid record")
74 return err
75 }
76 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
77 case models.CommitOperationDelete:
78 err = db.DeleteStarByRkey(d, did, e.Commit.RKey)
79 }
80
81 if err != nil {
82 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
83 }
84
85 return nil
86}
87
88func ingestFollow(d *db.DbWrapper, e *models.Event) error {
89 var err error
90 did := e.Did
91
92 switch e.Commit.Operation {
93 case models.CommitOperationCreate, models.CommitOperationUpdate:
94 raw := json.RawMessage(e.Commit.Record)
95 record := tangled.GraphFollow{}
96 err = json.Unmarshal(raw, &record)
97 if err != nil {
98 log.Println("invalid record")
99 return err
100 }
101
102 subjectDid := record.Subject
103 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey)
104 case models.CommitOperationDelete:
105 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey)
106 }
107
108 if err != nil {
109 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
110 }
111
112 return nil
113}
114
115func ingestPublicKey(d *db.DbWrapper, e *models.Event) error {
116 did := e.Did
117 var err error
118
119 switch e.Commit.Operation {
120 case models.CommitOperationCreate, models.CommitOperationUpdate:
121 log.Println("processing add of pubkey")
122 raw := json.RawMessage(e.Commit.Record)
123 record := tangled.PublicKey{}
124 err = json.Unmarshal(raw, &record)
125 if err != nil {
126 log.Printf("invalid record: %s", err)
127 return err
128 }
129
130 name := record.Name
131 key := record.Key
132 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey)
133 case models.CommitOperationDelete:
134 log.Println("processing delete of pubkey")
135 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey)
136 }
137
138 if err != nil {
139 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
140 }
141
142 return nil
143}
144
145func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error {
146 did := e.Did
147 var err error
148
149 switch e.Commit.Operation {
150 case models.CommitOperationCreate, models.CommitOperationUpdate:
151 raw := json.RawMessage(e.Commit.Record)
152 record := tangled.RepoArtifact{}
153 err = json.Unmarshal(raw, &record)
154 if err != nil {
155 log.Printf("invalid record: %s", err)
156 return err
157 }
158
159 repoAt, err := syntax.ParseATURI(record.Repo)
160 if err != nil {
161 return err
162 }
163
164 repo, err := db.GetRepoByAtUri(d, repoAt.String())
165 if err != nil {
166 return err
167 }
168
169 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
170 if err != nil || !ok {
171 return err
172 }
173
174 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
175 if err != nil {
176 createdAt = time.Now()
177 }
178
179 artifact := db.Artifact{
180 Did: did,
181 Rkey: e.Commit.RKey,
182 RepoAt: repoAt,
183 Tag: plumbing.Hash(record.Tag),
184 CreatedAt: createdAt,
185 BlobCid: cid.Cid(record.Artifact.Ref),
186 Name: record.Name,
187 Size: uint64(record.Artifact.Size),
188 MimeType: record.Artifact.MimeType,
189 }
190
191 err = db.AddArtifact(d, artifact)
192 case models.CommitOperationDelete:
193 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
194 }
195
196 if err != nil {
197 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
198 }
199
200 return nil
201}
202
203func ingestProfile(d *db.DbWrapper, e *models.Event) error {
204 did := e.Did
205 var err error
206
207 if e.Commit.RKey != "self" {
208 return fmt.Errorf("ingestProfile only ingests `self` record")
209 }
210
211 switch e.Commit.Operation {
212 case models.CommitOperationCreate, models.CommitOperationUpdate:
213 raw := json.RawMessage(e.Commit.Record)
214 record := tangled.ActorProfile{}
215 err = json.Unmarshal(raw, &record)
216 if err != nil {
217 log.Printf("invalid record: %s", err)
218 return err
219 }
220
221 description := ""
222 if record.Description != nil {
223 description = *record.Description
224 }
225
226 includeBluesky := record.Bluesky
227
228 location := ""
229 if record.Location != nil {
230 location = *record.Location
231 }
232
233 var links [5]string
234 for i, l := range record.Links {
235 if i < 5 {
236 links[i] = l
237 }
238 }
239
240 var stats [2]db.VanityStat
241 for i, s := range record.Stats {
242 if i < 2 {
243 stats[i].Kind = db.VanityStatKind(s)
244 }
245 }
246
247 var pinned [6]syntax.ATURI
248 for i, r := range record.PinnedRepositories {
249 if i < 6 {
250 pinned[i] = syntax.ATURI(r)
251 }
252 }
253
254 profile := db.Profile{
255 Did: did,
256 Description: description,
257 IncludeBluesky: includeBluesky,
258 Location: location,
259 Links: links,
260 Stats: stats,
261 PinnedRepos: pinned,
262 }
263
264 ddb, ok := d.Execer.(*db.DB)
265 if !ok {
266 return fmt.Errorf("failed to index profile record, invalid db cast")
267 }
268
269 tx, err := ddb.Begin()
270 if err != nil {
271 return fmt.Errorf("failed to start transaction")
272 }
273
274 err = db.ValidateProfile(tx, &profile)
275 if err != nil {
276 return fmt.Errorf("invalid profile record")
277 }
278
279 err = db.UpsertProfile(tx, &profile)
280 case models.CommitOperationDelete:
281 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey))
282 }
283
284 if err != nil {
285 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
286 }
287
288 return nil
289}
290
291func ingestSpindleMember(_ *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error {
292 did := e.Did
293 var err error
294
295 switch e.Commit.Operation {
296 case models.CommitOperationCreate:
297 raw := json.RawMessage(e.Commit.Record)
298 record := tangled.SpindleMember{}
299 err = json.Unmarshal(raw, &record)
300 if err != nil {
301 log.Printf("invalid record: %s", err)
302 return err
303 }
304
305 // only spindle owner can invite to spindles
306 ok, err := enforcer.IsSpindleInviteAllowed(did, record.Instance)
307 if err != nil || !ok {
308 return fmt.Errorf("failed to enforce permissions: %w", err)
309 }
310
311 err = enforcer.AddSpindleMember(record.Instance, record.Subject)
312 if err != nil {
313 return fmt.Errorf("failed to add member: %w", err)
314 }
315 }
316
317 return nil
318}