Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "maps"
11 "slices"
12
13 "time"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "github.com/go-git/go-git/v5/plumbing"
18 "github.com/ipfs/go-cid"
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/appview/config"
21 "tangled.org/core/appview/db"
22 "tangled.org/core/appview/models"
23 "tangled.org/core/appview/serververify"
24 "tangled.org/core/appview/validator"
25 "tangled.org/core/idresolver"
26 "tangled.org/core/orm"
27 "tangled.org/core/rbac"
28)
29
30type Ingester struct {
31 Db db.DbWrapper
32 Enforcer *rbac.Enforcer
33 IdResolver *idresolver.Resolver
34 Config *config.Config
35 Logger *slog.Logger
36 Validator *validator.Validator
37}
38
39type processFunc func(ctx context.Context, e *jmodels.Event) error
40
41func (i *Ingester) Ingest() processFunc {
42 return func(ctx context.Context, e *jmodels.Event) error {
43 var err error
44 defer func() {
45 eventTime := e.TimeUS
46 lastTimeUs := eventTime + 1
47 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
48 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
49 }
50 }()
51
52 l := i.Logger.With("kind", e.Kind)
53 switch e.Kind {
54 case jmodels.EventKindAccount:
55 if !e.Account.Active && *e.Account.Status == "deactivated" {
56 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
57 }
58 case jmodels.EventKindIdentity:
59 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
60 case jmodels.EventKindCommit:
61 switch e.Commit.Collection {
62 case tangled.GraphFollowNSID:
63 err = i.ingestFollow(e)
64 case tangled.FeedStarNSID:
65 err = i.ingestStar(e)
66 case tangled.PublicKeyNSID:
67 err = i.ingestPublicKey(e)
68 case tangled.RepoArtifactNSID:
69 err = i.ingestArtifact(e)
70 case tangled.ActorProfileNSID:
71 err = i.ingestProfile(e)
72 case tangled.SpindleMemberNSID:
73 err = i.ingestSpindleMember(ctx, e)
74 case tangled.SpindleNSID:
75 err = i.ingestSpindle(ctx, e)
76 case tangled.KnotMemberNSID:
77 err = i.ingestKnotMember(e)
78 case tangled.KnotNSID:
79 err = i.ingestKnot(e)
80 case tangled.StringNSID:
81 err = i.ingestString(e)
82 case tangled.RepoIssueNSID:
83 err = i.ingestIssue(ctx, e)
84 case tangled.RepoIssueCommentNSID:
85 err = i.ingestIssueComment(e)
86 case tangled.LabelDefinitionNSID:
87 err = i.ingestLabelDefinition(e)
88 case tangled.LabelOpNSID:
89 err = i.ingestLabelOp(e)
90 }
91 l = i.Logger.With("nsid", e.Commit.Collection)
92 }
93
94 if err != nil {
95 l.Warn("refused to ingest record", "err", err)
96 }
97
98 return nil
99 }
100}
101
102func (i *Ingester) ingestStar(e *jmodels.Event) error {
103 var err error
104 did := e.Did
105
106 l := i.Logger.With("handler", "ingestStar")
107 l = l.With("nsid", e.Commit.Collection)
108
109 switch e.Commit.Operation {
110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
111 var subjectUri syntax.ATURI
112
113 raw := json.RawMessage(e.Commit.Record)
114 record := tangled.FeedStar{}
115 err := json.Unmarshal(raw, &record)
116 if err != nil {
117 l.Error("invalid record", "err", err)
118 return err
119 }
120
121 star := &models.Star{
122 Did: did,
123 Rkey: e.Commit.RKey,
124 }
125
126 switch {
127 case record.SubjectDid != nil:
128 star.SubjectDid = *record.SubjectDid
129 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
130 if repoErr == nil {
131 subjectUri = repo.RepoAt()
132 star.RepoAt = subjectUri
133 }
134 case record.Subject != nil:
135 subjectUri, err = syntax.ParseATURI(*record.Subject)
136 if err != nil {
137 l.Error("invalid record", "err", err)
138 return err
139 }
140 star.RepoAt = subjectUri
141 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
142 if repoErr == nil && repo.RepoDid != "" {
143 star.SubjectDid = repo.RepoDid
144 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
145 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
146 }
147 }
148 default:
149 l.Error("star record has neither subject nor subjectDid")
150 return fmt.Errorf("star record has neither subject nor subjectDid")
151 }
152 err = db.AddStar(i.Db, star)
153 case jmodels.CommitOperationDelete:
154 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
155 }
156
157 if err != nil {
158 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
159 }
160
161 return nil
162}
163
164func (i *Ingester) ingestFollow(e *jmodels.Event) error {
165 var err error
166 did := e.Did
167
168 l := i.Logger.With("handler", "ingestFollow")
169 l = l.With("nsid", e.Commit.Collection)
170
171 switch e.Commit.Operation {
172 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
173 raw := json.RawMessage(e.Commit.Record)
174 record := tangled.GraphFollow{}
175 err = json.Unmarshal(raw, &record)
176 if err != nil {
177 l.Error("invalid record", "err", err)
178 return err
179 }
180
181 err = db.AddFollow(i.Db, &models.Follow{
182 UserDid: did,
183 SubjectDid: record.Subject,
184 Rkey: e.Commit.RKey,
185 })
186 case jmodels.CommitOperationDelete:
187 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
188 }
189
190 if err != nil {
191 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
192 }
193
194 return nil
195}
196
197func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
198 did := e.Did
199 var err error
200
201 l := i.Logger.With("handler", "ingestPublicKey")
202 l = l.With("nsid", e.Commit.Collection)
203
204 switch e.Commit.Operation {
205 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
206 l.Debug("processing add of pubkey")
207 raw := json.RawMessage(e.Commit.Record)
208 record := tangled.PublicKey{}
209 err = json.Unmarshal(raw, &record)
210 if err != nil {
211 l.Error("invalid record", "err", err)
212 return err
213 }
214
215 name := record.Name
216 key := record.Key
217 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
218 case jmodels.CommitOperationDelete:
219 l.Debug("processing delete of pubkey")
220 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
221 }
222
223 if err != nil {
224 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
225 }
226
227 return nil
228}
229
230func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
231 did := e.Did
232 var err error
233
234 l := i.Logger.With("handler", "ingestArtifact")
235 l = l.With("nsid", e.Commit.Collection)
236
237 switch e.Commit.Operation {
238 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
239 raw := json.RawMessage(e.Commit.Record)
240 record := tangled.RepoArtifact{}
241 err = json.Unmarshal(raw, &record)
242 if err != nil {
243 l.Error("invalid record", "err", err)
244 return err
245 }
246
247 var repo *models.Repo
248 if record.RepoDid != nil && *record.RepoDid != "" {
249 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
250 if err != nil && !errors.Is(err, sql.ErrNoRows) {
251 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
252 }
253 }
254 if repo == nil && record.Repo != nil {
255 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
256 if parseErr != nil {
257 return parseErr
258 }
259 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
260 if err != nil {
261 return err
262 }
263 }
264 if repo == nil {
265 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
266 }
267
268 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
269 if err != nil || !ok {
270 return err
271 }
272
273 repoDid := repo.RepoDid
274 if repoDid == "" && record.RepoDid != nil {
275 repoDid = *record.RepoDid
276 }
277 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
278 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
279 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
280 }
281 }
282
283 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
284 if err != nil {
285 createdAt = time.Now()
286 }
287
288 artifact := models.Artifact{
289 Did: did,
290 Rkey: e.Commit.RKey,
291 RepoAt: repo.RepoAt(),
292 RepoDid: repoDid,
293 Tag: plumbing.Hash(record.Tag),
294 CreatedAt: createdAt,
295 BlobCid: cid.Cid(record.Artifact.Ref),
296 Name: record.Name,
297 Size: uint64(record.Artifact.Size),
298 MimeType: record.Artifact.MimeType,
299 }
300
301 err = db.AddArtifact(i.Db, artifact)
302 case jmodels.CommitOperationDelete:
303 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
304 }
305
306 if err != nil {
307 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
308 }
309
310 return nil
311}
312
313func (i *Ingester) ingestProfile(e *jmodels.Event) error {
314 did := e.Did
315 var err error
316
317 l := i.Logger.With("handler", "ingestProfile")
318 l = l.With("nsid", e.Commit.Collection)
319
320 if e.Commit.RKey != "self" {
321 return fmt.Errorf("ingestProfile only ingests `self` record")
322 }
323
324 switch e.Commit.Operation {
325 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
326 raw := json.RawMessage(e.Commit.Record)
327 record := tangled.ActorProfile{}
328 err = json.Unmarshal(raw, &record)
329 if err != nil {
330 l.Error("invalid record", "err", err)
331 return err
332 }
333
334 avatar := ""
335 if record.Avatar != nil {
336 avatar = record.Avatar.Ref.String()
337 }
338
339 description := ""
340 if record.Description != nil {
341 description = *record.Description
342 }
343
344 includeBluesky := record.Bluesky
345
346 pronouns := ""
347 if record.Pronouns != nil {
348 pronouns = *record.Pronouns
349 }
350
351 location := ""
352 if record.Location != nil {
353 location = *record.Location
354 }
355
356 var links [5]string
357 for i, l := range record.Links {
358 if i < 5 {
359 links[i] = l
360 }
361 }
362
363 var stats [2]models.VanityStat
364 for i, s := range record.Stats {
365 if i < 2 {
366 stats[i].Kind = models.ParseVanityStatKind(s)
367 }
368 }
369
370 var pinned [6]syntax.ATURI
371 for i, r := range record.PinnedRepositories {
372 if i < 6 {
373 pinned[i] = syntax.ATURI(r)
374 }
375 }
376
377 profile := models.Profile{
378 Did: did,
379 Avatar: avatar,
380 Description: description,
381 IncludeBluesky: includeBluesky,
382 Location: location,
383 Links: links,
384 Stats: stats,
385 PinnedRepos: pinned,
386 Pronouns: pronouns,
387 }
388
389 ddb, ok := i.Db.Execer.(*db.DB)
390 if !ok {
391 return fmt.Errorf("failed to index profile record, invalid db cast")
392 }
393
394 tx, err := ddb.Begin()
395 if err != nil {
396 return fmt.Errorf("failed to start transaction")
397 }
398
399 err = db.ValidateProfile(tx, &profile)
400 if err != nil {
401 return fmt.Errorf("invalid profile record")
402 }
403
404 err = db.UpsertProfile(tx, &profile)
405 case jmodels.CommitOperationDelete:
406 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
407 }
408
409 if err != nil {
410 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
411 }
412
413 return nil
414}
415
416func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
417 did := e.Did
418 var err error
419
420 l := i.Logger.With("handler", "ingestSpindleMember")
421 l = l.With("nsid", e.Commit.Collection)
422
423 switch e.Commit.Operation {
424 case jmodels.CommitOperationCreate:
425 raw := json.RawMessage(e.Commit.Record)
426 record := tangled.SpindleMember{}
427 err = json.Unmarshal(raw, &record)
428 if err != nil {
429 l.Error("invalid record", "err", err)
430 return err
431 }
432
433 // only spindle owner can invite to spindles
434 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
435 if err != nil || !ok {
436 return fmt.Errorf("failed to enforce permissions: %w", err)
437 }
438
439 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
440 if err != nil {
441 return err
442 }
443
444 if memberId.Handle.IsInvalidHandle() {
445 return err
446 }
447
448 ddb, ok := i.Db.Execer.(*db.DB)
449 if !ok {
450 return fmt.Errorf("invalid db cast")
451 }
452
453 err = db.AddSpindleMember(ddb, models.SpindleMember{
454 Did: syntax.DID(did),
455 Rkey: e.Commit.RKey,
456 Instance: record.Instance,
457 Subject: memberId.DID,
458 })
459 if !ok {
460 return fmt.Errorf("failed to add to db: %w", err)
461 }
462
463 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
464 if err != nil {
465 return fmt.Errorf("failed to update ACLs: %w", err)
466 }
467
468 l.Info("added spindle member")
469 case jmodels.CommitOperationDelete:
470 rkey := e.Commit.RKey
471
472 ddb, ok := i.Db.Execer.(*db.DB)
473 if !ok {
474 return fmt.Errorf("failed to index profile record, invalid db cast")
475 }
476
477 // get record from db first
478 members, err := db.GetSpindleMembers(
479 ddb,
480 orm.FilterEq("did", did),
481 orm.FilterEq("rkey", rkey),
482 )
483 if err != nil || len(members) != 1 {
484 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
485 }
486 member := members[0]
487
488 tx, err := ddb.Begin()
489 if err != nil {
490 return fmt.Errorf("failed to start txn: %w", err)
491 }
492
493 // remove record by rkey && update enforcer
494 if err = db.RemoveSpindleMember(
495 tx,
496 orm.FilterEq("did", did),
497 orm.FilterEq("rkey", rkey),
498 ); err != nil {
499 return fmt.Errorf("failed to remove from db: %w", err)
500 }
501
502 // update enforcer
503 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
504 if err != nil {
505 return fmt.Errorf("failed to update ACLs: %w", err)
506 }
507
508 if err = tx.Commit(); err != nil {
509 return fmt.Errorf("failed to commit txn: %w", err)
510 }
511
512 if err = i.Enforcer.E.SavePolicy(); err != nil {
513 return fmt.Errorf("failed to save ACLs: %w", err)
514 }
515
516 l.Info("removed spindle member")
517 }
518
519 return nil
520}
521
522func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
523 did := e.Did
524 var err error
525
526 l := i.Logger.With("handler", "ingestSpindle")
527 l = l.With("nsid", e.Commit.Collection)
528
529 switch e.Commit.Operation {
530 case jmodels.CommitOperationCreate:
531 raw := json.RawMessage(e.Commit.Record)
532 record := tangled.Spindle{}
533 err = json.Unmarshal(raw, &record)
534 if err != nil {
535 l.Error("invalid record", "err", err)
536 return err
537 }
538
539 instance := e.Commit.RKey
540
541 ddb, ok := i.Db.Execer.(*db.DB)
542 if !ok {
543 return fmt.Errorf("failed to index profile record, invalid db cast")
544 }
545
546 err := db.AddSpindle(ddb, models.Spindle{
547 Owner: syntax.DID(did),
548 Instance: instance,
549 })
550 if err != nil {
551 l.Error("failed to add spindle to db", "err", err, "instance", instance)
552 return err
553 }
554
555 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
556 if err != nil {
557 l.Error("failed to add spindle to db", "err", err, "instance", instance)
558 return err
559 }
560
561 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
562 if err != nil {
563 return fmt.Errorf("failed to mark verified: %w", err)
564 }
565
566 return nil
567
568 case jmodels.CommitOperationDelete:
569 instance := e.Commit.RKey
570
571 ddb, ok := i.Db.Execer.(*db.DB)
572 if !ok {
573 return fmt.Errorf("failed to index profile record, invalid db cast")
574 }
575
576 // get record from db first
577 spindles, err := db.GetSpindles(
578 ddb,
579 orm.FilterEq("owner", did),
580 orm.FilterEq("instance", instance),
581 )
582 if err != nil || len(spindles) != 1 {
583 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
584 }
585 spindle := spindles[0]
586
587 tx, err := ddb.Begin()
588 if err != nil {
589 return err
590 }
591 defer func() {
592 tx.Rollback()
593 i.Enforcer.E.LoadPolicy()
594 }()
595
596 // remove spindle members first
597 err = db.RemoveSpindleMember(
598 tx,
599 orm.FilterEq("owner", did),
600 orm.FilterEq("instance", instance),
601 )
602 if err != nil {
603 return err
604 }
605
606 err = db.DeleteSpindle(
607 tx,
608 orm.FilterEq("owner", did),
609 orm.FilterEq("instance", instance),
610 )
611 if err != nil {
612 return err
613 }
614
615 if spindle.Verified != nil {
616 err = i.Enforcer.RemoveSpindle(instance)
617 if err != nil {
618 return err
619 }
620 }
621
622 err = tx.Commit()
623 if err != nil {
624 return err
625 }
626
627 err = i.Enforcer.E.SavePolicy()
628 if err != nil {
629 return err
630 }
631 }
632
633 return nil
634}
635
636func (i *Ingester) ingestString(e *jmodels.Event) error {
637 did := e.Did
638 rkey := e.Commit.RKey
639
640 var err error
641
642 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
643 l.Info("ingesting record")
644
645 ddb, ok := i.Db.Execer.(*db.DB)
646 if !ok {
647 return fmt.Errorf("failed to index string record, invalid db cast")
648 }
649
650 switch e.Commit.Operation {
651 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
652 raw := json.RawMessage(e.Commit.Record)
653 record := tangled.String{}
654 err = json.Unmarshal(raw, &record)
655 if err != nil {
656 l.Error("invalid record", "err", err)
657 return err
658 }
659
660 string := models.StringFromRecord(did, rkey, record)
661
662 if err = i.Validator.ValidateString(&string); err != nil {
663 l.Error("invalid record", "err", err)
664 return err
665 }
666
667 if err = db.AddString(ddb, string); err != nil {
668 l.Error("failed to add string", "err", err)
669 return err
670 }
671
672 return nil
673
674 case jmodels.CommitOperationDelete:
675 if err := db.DeleteString(
676 ddb,
677 orm.FilterEq("did", did),
678 orm.FilterEq("rkey", rkey),
679 ); err != nil {
680 l.Error("failed to delete", "err", err)
681 return fmt.Errorf("failed to delete string record: %w", err)
682 }
683
684 return nil
685 }
686
687 return nil
688}
689
690func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
691 did := e.Did
692 var err error
693
694 l := i.Logger.With("handler", "ingestKnotMember")
695 l = l.With("nsid", e.Commit.Collection)
696
697 switch e.Commit.Operation {
698 case jmodels.CommitOperationCreate:
699 raw := json.RawMessage(e.Commit.Record)
700 record := tangled.KnotMember{}
701 err = json.Unmarshal(raw, &record)
702 if err != nil {
703 l.Error("invalid record", "err", err)
704 return err
705 }
706
707 // only knot owner can invite to knots
708 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
709 if err != nil || !ok {
710 return fmt.Errorf("failed to enforce permissions: %w", err)
711 }
712
713 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
714 if err != nil {
715 return err
716 }
717
718 if memberId.Handle.IsInvalidHandle() {
719 return err
720 }
721
722 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
723 if err != nil {
724 return fmt.Errorf("failed to update ACLs: %w", err)
725 }
726
727 l.Info("added knot member")
728 case jmodels.CommitOperationDelete:
729 // we don't store knot members in a table (like we do for spindle)
730 // and we can't remove this just yet. possibly fixed if we switch
731 // to either:
732 // 1. a knot_members table like with spindle and store the rkey
733 // 2. use the knot host as the rkey
734 //
735 // TODO: implement member deletion
736 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
737 }
738
739 return nil
740}
741
742func (i *Ingester) ingestKnot(e *jmodels.Event) error {
743 did := e.Did
744 var err error
745
746 l := i.Logger.With("handler", "ingestKnot")
747 l = l.With("nsid", e.Commit.Collection)
748
749 switch e.Commit.Operation {
750 case jmodels.CommitOperationCreate:
751 raw := json.RawMessage(e.Commit.Record)
752 record := tangled.Knot{}
753 err = json.Unmarshal(raw, &record)
754 if err != nil {
755 l.Error("invalid record", "err", err)
756 return err
757 }
758
759 domain := e.Commit.RKey
760
761 ddb, ok := i.Db.Execer.(*db.DB)
762 if !ok {
763 return fmt.Errorf("failed to index profile record, invalid db cast")
764 }
765
766 err := db.AddKnot(ddb, domain, did)
767 if err != nil {
768 l.Error("failed to add knot to db", "err", err, "domain", domain)
769 return err
770 }
771
772 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
773 if err != nil {
774 l.Error("failed to verify knot", "err", err, "domain", domain)
775 return err
776 }
777
778 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
779 if err != nil {
780 return fmt.Errorf("failed to mark verified: %w", err)
781 }
782
783 return nil
784
785 case jmodels.CommitOperationDelete:
786 domain := e.Commit.RKey
787
788 ddb, ok := i.Db.Execer.(*db.DB)
789 if !ok {
790 return fmt.Errorf("failed to index knot record, invalid db cast")
791 }
792
793 // get record from db first
794 registrations, err := db.GetRegistrations(
795 ddb,
796 orm.FilterEq("domain", domain),
797 orm.FilterEq("did", did),
798 )
799 if err != nil {
800 return fmt.Errorf("failed to get registration: %w", err)
801 }
802 if len(registrations) != 1 {
803 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
804 }
805 registration := registrations[0]
806
807 tx, err := ddb.Begin()
808 if err != nil {
809 return err
810 }
811 defer func() {
812 tx.Rollback()
813 i.Enforcer.E.LoadPolicy()
814 }()
815
816 err = db.DeleteKnot(
817 tx,
818 orm.FilterEq("did", did),
819 orm.FilterEq("domain", domain),
820 )
821 if err != nil {
822 return err
823 }
824
825 if registration.Registered != nil {
826 err = i.Enforcer.RemoveKnot(domain)
827 if err != nil {
828 return err
829 }
830 }
831
832 err = tx.Commit()
833 if err != nil {
834 return err
835 }
836
837 err = i.Enforcer.E.SavePolicy()
838 if err != nil {
839 return err
840 }
841 }
842
843 return nil
844}
845func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
846 did := e.Did
847 rkey := e.Commit.RKey
848
849 var err error
850
851 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
852 l.Info("ingesting record")
853
854 ddb, ok := i.Db.Execer.(*db.DB)
855 if !ok {
856 return fmt.Errorf("failed to index issue record, invalid db cast")
857 }
858
859 switch e.Commit.Operation {
860 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
861 raw := json.RawMessage(e.Commit.Record)
862 record := tangled.RepoIssue{}
863 err = json.Unmarshal(raw, &record)
864 if err != nil {
865 l.Error("invalid record", "err", err)
866 return err
867 }
868
869 issue := models.IssueFromRecord(did, rkey, record)
870
871 if issue.RepoDid == "" && issue.RepoAt == "" {
872 return fmt.Errorf("issue record has neither repo nor repoDid")
873 }
874
875 if err := i.Validator.ValidateIssue(&issue); err != nil {
876 return fmt.Errorf("failed to validate issue: %w", err)
877 }
878
879 if issue.RepoDid == "" && record.Repo != nil {
880 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
881 if repoErr == nil && repo.RepoDid != "" {
882 issue.RepoDid = repo.RepoDid
883 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
884 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
885 }
886 }
887 }
888
889 tx, err := ddb.BeginTx(ctx, nil)
890 if err != nil {
891 l.Error("failed to begin transaction", "err", err)
892 return err
893 }
894 defer tx.Rollback()
895
896 err = db.PutIssue(tx, &issue)
897 if err != nil {
898 l.Error("failed to create issue", "err", err)
899 return err
900 }
901
902 err = tx.Commit()
903 if err != nil {
904 l.Error("failed to commit txn", "err", err)
905 return err
906 }
907
908 return nil
909
910 case jmodels.CommitOperationDelete:
911 tx, err := ddb.BeginTx(ctx, nil)
912 if err != nil {
913 l.Error("failed to begin transaction", "err", err)
914 return err
915 }
916 defer tx.Rollback()
917
918 if err := db.DeleteIssues(
919 tx,
920 did,
921 rkey,
922 ); err != nil {
923 l.Error("failed to delete", "err", err)
924 return fmt.Errorf("failed to delete issue record: %w", err)
925 }
926 if err := tx.Commit(); err != nil {
927 l.Error("failed to commit txn", "err", err)
928 return err
929 }
930
931 return nil
932 }
933
934 return nil
935}
936
937func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
938 did := e.Did
939 rkey := e.Commit.RKey
940
941 var err error
942
943 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
944 l.Info("ingesting record")
945
946 ddb, ok := i.Db.Execer.(*db.DB)
947 if !ok {
948 return fmt.Errorf("failed to index issue comment record, invalid db cast")
949 }
950
951 switch e.Commit.Operation {
952 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
953 raw := json.RawMessage(e.Commit.Record)
954 record := tangled.RepoIssueComment{}
955 err = json.Unmarshal(raw, &record)
956 if err != nil {
957 return fmt.Errorf("invalid record: %w", err)
958 }
959
960 comment, err := models.IssueCommentFromRecord(did, rkey, record)
961 if err != nil {
962 return fmt.Errorf("failed to parse comment from record: %w", err)
963 }
964
965 if err := i.Validator.ValidateIssueComment(comment); err != nil {
966 return fmt.Errorf("failed to validate comment: %w", err)
967 }
968
969 tx, err := ddb.Begin()
970 if err != nil {
971 return fmt.Errorf("failed to start transaction: %w", err)
972 }
973 defer tx.Rollback()
974
975 _, err = db.AddIssueComment(tx, *comment)
976 if err != nil {
977 return fmt.Errorf("failed to create issue comment: %w", err)
978 }
979
980 return tx.Commit()
981
982 case jmodels.CommitOperationDelete:
983 if err := db.DeleteIssueComments(
984 ddb,
985 orm.FilterEq("did", did),
986 orm.FilterEq("rkey", rkey),
987 ); err != nil {
988 return fmt.Errorf("failed to delete issue comment record: %w", err)
989 }
990
991 return nil
992 }
993
994 return nil
995}
996
997func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
998 did := e.Did
999 rkey := e.Commit.RKey
1000
1001 var err error
1002
1003 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1004 l.Info("ingesting record")
1005
1006 ddb, ok := i.Db.Execer.(*db.DB)
1007 if !ok {
1008 return fmt.Errorf("failed to index label definition, invalid db cast")
1009 }
1010
1011 switch e.Commit.Operation {
1012 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1013 raw := json.RawMessage(e.Commit.Record)
1014 record := tangled.LabelDefinition{}
1015 err = json.Unmarshal(raw, &record)
1016 if err != nil {
1017 return fmt.Errorf("invalid record: %w", err)
1018 }
1019
1020 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1021 if err != nil {
1022 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1023 }
1024
1025 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1026 return fmt.Errorf("failed to validate labeldef: %w", err)
1027 }
1028
1029 _, err = db.AddLabelDefinition(ddb, def)
1030 if err != nil {
1031 return fmt.Errorf("failed to create labeldef: %w", err)
1032 }
1033
1034 return nil
1035
1036 case jmodels.CommitOperationDelete:
1037 if err := db.DeleteLabelDefinition(
1038 ddb,
1039 orm.FilterEq("did", did),
1040 orm.FilterEq("rkey", rkey),
1041 ); err != nil {
1042 return fmt.Errorf("failed to delete labeldef record: %w", err)
1043 }
1044
1045 return nil
1046 }
1047
1048 return nil
1049}
1050
1051func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1052 did := e.Did
1053 rkey := e.Commit.RKey
1054
1055 var err error
1056
1057 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1058 l.Info("ingesting record")
1059
1060 ddb, ok := i.Db.Execer.(*db.DB)
1061 if !ok {
1062 return fmt.Errorf("failed to index label op, invalid db cast")
1063 }
1064
1065 switch e.Commit.Operation {
1066 case jmodels.CommitOperationCreate:
1067 raw := json.RawMessage(e.Commit.Record)
1068 record := tangled.LabelOp{}
1069 err = json.Unmarshal(raw, &record)
1070 if err != nil {
1071 return fmt.Errorf("invalid record: %w", err)
1072 }
1073
1074 subject := syntax.ATURI(record.Subject)
1075 collection := subject.Collection()
1076
1077 var repo *models.Repo
1078 switch collection {
1079 case tangled.RepoIssueNSID:
1080 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1081 if err != nil || len(i) != 1 {
1082 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1083 }
1084 repo = i[0].Repo
1085 default:
1086 return fmt.Errorf("unsupport label subject: %s", collection)
1087 }
1088
1089 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1090 if err != nil {
1091 return fmt.Errorf("failed to build label application ctx: %w", err)
1092 }
1093
1094 ops := models.LabelOpsFromRecord(did, rkey, record)
1095
1096 for _, o := range ops {
1097 def, ok := actx.Defs[o.OperandKey]
1098 if !ok {
1099 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1100 }
1101 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1102 return fmt.Errorf("failed to validate labelop: %w", err)
1103 }
1104 }
1105
1106 tx, err := ddb.Begin()
1107 if err != nil {
1108 return err
1109 }
1110 defer tx.Rollback()
1111
1112 for _, o := range ops {
1113 _, err = db.AddLabelOp(tx, &o)
1114 if err != nil {
1115 return fmt.Errorf("failed to add labelop: %w", err)
1116 }
1117 }
1118
1119 if err = tx.Commit(); err != nil {
1120 return err
1121 }
1122 }
1123
1124 return nil
1125}