Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "maps"
11 "slices"
12
13 "time"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "github.com/go-git/go-git/v5/plumbing"
18 "github.com/ipfs/go-cid"
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/appview/config"
21 "tangled.org/core/appview/db"
22 "tangled.org/core/appview/models"
23 "tangled.org/core/appview/serververify"
24 "tangled.org/core/appview/validator"
25 "tangled.org/core/idresolver"
26 "tangled.org/core/orm"
27 "tangled.org/core/rbac"
28)
29
30type Ingester struct {
31 Db db.DbWrapper
32 Enforcer *rbac.Enforcer
33 IdResolver *idresolver.Resolver
34 Config *config.Config
35 Logger *slog.Logger
36 Validator *validator.Validator
37}
38
39type processFunc func(ctx context.Context, e *jmodels.Event) error
40
41func (i *Ingester) Ingest() processFunc {
42 return func(ctx context.Context, e *jmodels.Event) error {
43 var err error
44 defer func() {
45 eventTime := e.TimeUS
46 lastTimeUs := eventTime + 1
47 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
48 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
49 }
50 }()
51
52 l := i.Logger.With("kind", e.Kind)
53 switch e.Kind {
54 case jmodels.EventKindAccount:
55 if !e.Account.Active && *e.Account.Status == "deactivated" {
56 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
57 }
58 case jmodels.EventKindIdentity:
59 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
60 case jmodels.EventKindCommit:
61 switch e.Commit.Collection {
62 case tangled.GraphFollowNSID:
63 err = i.ingestFollow(e)
64 case tangled.FeedStarNSID:
65 err = i.ingestStar(e)
66 case tangled.PublicKeyNSID:
67 err = i.ingestPublicKey(e)
68 case tangled.RepoArtifactNSID:
69 err = i.ingestArtifact(e)
70 case tangled.ActorProfileNSID:
71 err = i.ingestProfile(e)
72 case tangled.SpindleMemberNSID:
73 err = i.ingestSpindleMember(ctx, e)
74 case tangled.SpindleNSID:
75 err = i.ingestSpindle(ctx, e)
76 case tangled.KnotMemberNSID:
77 err = i.ingestKnotMember(e)
78 case tangled.KnotNSID:
79 err = i.ingestKnot(e)
80 case tangled.StringNSID:
81 err = i.ingestString(e)
82 case tangled.RepoIssueNSID:
83 err = i.ingestIssue(ctx, e)
84 case tangled.RepoIssueCommentNSID:
85 err = i.ingestIssueComment(e)
86 case tangled.LabelDefinitionNSID:
87 err = i.ingestLabelDefinition(e)
88 case tangled.LabelOpNSID:
89 err = i.ingestLabelOp(e)
90 }
91 l = i.Logger.With("nsid", e.Commit.Collection)
92 }
93
94 if err != nil {
95 l.Warn("refused to ingest record", "err", err)
96 }
97
98 return nil
99 }
100}
101
102func (i *Ingester) ingestStar(e *jmodels.Event) error {
103 var err error
104 did := e.Did
105
106 l := i.Logger.With("handler", "ingestStar")
107 l = l.With("nsid", e.Commit.Collection)
108
109 switch e.Commit.Operation {
110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
111 var subjectUri syntax.ATURI
112
113 raw := json.RawMessage(e.Commit.Record)
114 record := tangled.FeedStar{}
115 err := json.Unmarshal(raw, &record)
116 if err != nil {
117 l.Error("invalid record", "err", err)
118 return err
119 }
120
121 subjectUri, err = syntax.ParseATURI(record.Subject)
122 if err != nil {
123 l.Error("invalid record", "err", err)
124 return err
125 }
126 star := &models.Star{
127 Did: did,
128 RepoAt: subjectUri,
129 Rkey: e.Commit.RKey,
130 }
131 if record.SubjectDid != nil {
132 star.SubjectDid = *record.SubjectDid
133 }
134 if star.SubjectDid == "" {
135 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
136 if repoErr == nil && repo.RepoDid != "" {
137 star.SubjectDid = repo.RepoDid
138 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, record.Subject); enqErr != nil {
139 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
140 }
141 }
142 }
143 err = db.AddStar(i.Db, star)
144 case jmodels.CommitOperationDelete:
145 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
146 }
147
148 if err != nil {
149 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
150 }
151
152 return nil
153}
154
155func (i *Ingester) ingestFollow(e *jmodels.Event) error {
156 var err error
157 did := e.Did
158
159 l := i.Logger.With("handler", "ingestFollow")
160 l = l.With("nsid", e.Commit.Collection)
161
162 switch e.Commit.Operation {
163 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
164 raw := json.RawMessage(e.Commit.Record)
165 record := tangled.GraphFollow{}
166 err = json.Unmarshal(raw, &record)
167 if err != nil {
168 l.Error("invalid record", "err", err)
169 return err
170 }
171
172 err = db.AddFollow(i.Db, &models.Follow{
173 UserDid: did,
174 SubjectDid: record.Subject,
175 Rkey: e.Commit.RKey,
176 })
177 case jmodels.CommitOperationDelete:
178 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
179 }
180
181 if err != nil {
182 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
183 }
184
185 return nil
186}
187
188func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
189 did := e.Did
190 var err error
191
192 l := i.Logger.With("handler", "ingestPublicKey")
193 l = l.With("nsid", e.Commit.Collection)
194
195 switch e.Commit.Operation {
196 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
197 l.Debug("processing add of pubkey")
198 raw := json.RawMessage(e.Commit.Record)
199 record := tangled.PublicKey{}
200 err = json.Unmarshal(raw, &record)
201 if err != nil {
202 l.Error("invalid record", "err", err)
203 return err
204 }
205
206 name := record.Name
207 key := record.Key
208 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
209 case jmodels.CommitOperationDelete:
210 l.Debug("processing delete of pubkey")
211 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
212 }
213
214 if err != nil {
215 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
216 }
217
218 return nil
219}
220
221func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
222 did := e.Did
223 var err error
224
225 l := i.Logger.With("handler", "ingestArtifact")
226 l = l.With("nsid", e.Commit.Collection)
227
228 switch e.Commit.Operation {
229 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
230 raw := json.RawMessage(e.Commit.Record)
231 record := tangled.RepoArtifact{}
232 err = json.Unmarshal(raw, &record)
233 if err != nil {
234 l.Error("invalid record", "err", err)
235 return err
236 }
237
238 repoAt, err := syntax.ParseATURI(record.Repo)
239 if err != nil {
240 return err
241 }
242
243 var repo *models.Repo
244 if record.RepoDid != nil && *record.RepoDid != "" {
245 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
246 if err != nil && !errors.Is(err, sql.ErrNoRows) {
247 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
248 }
249 }
250 if repo == nil {
251 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
252 if err != nil {
253 return err
254 }
255 }
256
257 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
258 if err != nil || !ok {
259 return err
260 }
261
262 repoDid := repo.RepoDid
263 if repoDid == "" && record.RepoDid != nil {
264 repoDid = *record.RepoDid
265 }
266 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") {
267 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, record.Repo); enqErr != nil {
268 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
269 }
270 }
271
272 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
273 if err != nil {
274 createdAt = time.Now()
275 }
276
277 artifact := models.Artifact{
278 Did: did,
279 Rkey: e.Commit.RKey,
280 RepoAt: repoAt,
281 RepoDid: repoDid,
282 Tag: plumbing.Hash(record.Tag),
283 CreatedAt: createdAt,
284 BlobCid: cid.Cid(record.Artifact.Ref),
285 Name: record.Name,
286 Size: uint64(record.Artifact.Size),
287 MimeType: record.Artifact.MimeType,
288 }
289
290 err = db.AddArtifact(i.Db, artifact)
291 case jmodels.CommitOperationDelete:
292 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
293 }
294
295 if err != nil {
296 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
297 }
298
299 return nil
300}
301
302func (i *Ingester) ingestProfile(e *jmodels.Event) error {
303 did := e.Did
304 var err error
305
306 l := i.Logger.With("handler", "ingestProfile")
307 l = l.With("nsid", e.Commit.Collection)
308
309 if e.Commit.RKey != "self" {
310 return fmt.Errorf("ingestProfile only ingests `self` record")
311 }
312
313 switch e.Commit.Operation {
314 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
315 raw := json.RawMessage(e.Commit.Record)
316 record := tangled.ActorProfile{}
317 err = json.Unmarshal(raw, &record)
318 if err != nil {
319 l.Error("invalid record", "err", err)
320 return err
321 }
322
323 avatar := ""
324 if record.Avatar != nil {
325 avatar = record.Avatar.Ref.String()
326 }
327
328 description := ""
329 if record.Description != nil {
330 description = *record.Description
331 }
332
333 includeBluesky := record.Bluesky
334
335 pronouns := ""
336 if record.Pronouns != nil {
337 pronouns = *record.Pronouns
338 }
339
340 location := ""
341 if record.Location != nil {
342 location = *record.Location
343 }
344
345 var links [5]string
346 for i, l := range record.Links {
347 if i < 5 {
348 links[i] = l
349 }
350 }
351
352 var stats [2]models.VanityStat
353 for i, s := range record.Stats {
354 if i < 2 {
355 stats[i].Kind = models.ParseVanityStatKind(s)
356 }
357 }
358
359 var pinned [6]syntax.ATURI
360 for i, r := range record.PinnedRepositories {
361 if i < 6 {
362 pinned[i] = syntax.ATURI(r)
363 }
364 }
365
366 profile := models.Profile{
367 Did: did,
368 Avatar: avatar,
369 Description: description,
370 IncludeBluesky: includeBluesky,
371 Location: location,
372 Links: links,
373 Stats: stats,
374 PinnedRepos: pinned,
375 Pronouns: pronouns,
376 }
377
378 ddb, ok := i.Db.Execer.(*db.DB)
379 if !ok {
380 return fmt.Errorf("failed to index profile record, invalid db cast")
381 }
382
383 tx, err := ddb.Begin()
384 if err != nil {
385 return fmt.Errorf("failed to start transaction")
386 }
387
388 err = db.ValidateProfile(tx, &profile)
389 if err != nil {
390 return fmt.Errorf("invalid profile record")
391 }
392
393 err = db.UpsertProfile(tx, &profile)
394 case jmodels.CommitOperationDelete:
395 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
396 }
397
398 if err != nil {
399 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
400 }
401
402 return nil
403}
404
405func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
406 did := e.Did
407 var err error
408
409 l := i.Logger.With("handler", "ingestSpindleMember")
410 l = l.With("nsid", e.Commit.Collection)
411
412 switch e.Commit.Operation {
413 case jmodels.CommitOperationCreate:
414 raw := json.RawMessage(e.Commit.Record)
415 record := tangled.SpindleMember{}
416 err = json.Unmarshal(raw, &record)
417 if err != nil {
418 l.Error("invalid record", "err", err)
419 return err
420 }
421
422 // only spindle owner can invite to spindles
423 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
424 if err != nil || !ok {
425 return fmt.Errorf("failed to enforce permissions: %w", err)
426 }
427
428 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
429 if err != nil {
430 return err
431 }
432
433 if memberId.Handle.IsInvalidHandle() {
434 return err
435 }
436
437 ddb, ok := i.Db.Execer.(*db.DB)
438 if !ok {
439 return fmt.Errorf("invalid db cast")
440 }
441
442 err = db.AddSpindleMember(ddb, models.SpindleMember{
443 Did: syntax.DID(did),
444 Rkey: e.Commit.RKey,
445 Instance: record.Instance,
446 Subject: memberId.DID,
447 })
448 if !ok {
449 return fmt.Errorf("failed to add to db: %w", err)
450 }
451
452 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
453 if err != nil {
454 return fmt.Errorf("failed to update ACLs: %w", err)
455 }
456
457 l.Info("added spindle member")
458 case jmodels.CommitOperationDelete:
459 rkey := e.Commit.RKey
460
461 ddb, ok := i.Db.Execer.(*db.DB)
462 if !ok {
463 return fmt.Errorf("failed to index profile record, invalid db cast")
464 }
465
466 // get record from db first
467 members, err := db.GetSpindleMembers(
468 ddb,
469 orm.FilterEq("did", did),
470 orm.FilterEq("rkey", rkey),
471 )
472 if err != nil || len(members) != 1 {
473 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
474 }
475 member := members[0]
476
477 tx, err := ddb.Begin()
478 if err != nil {
479 return fmt.Errorf("failed to start txn: %w", err)
480 }
481
482 // remove record by rkey && update enforcer
483 if err = db.RemoveSpindleMember(
484 tx,
485 orm.FilterEq("did", did),
486 orm.FilterEq("rkey", rkey),
487 ); err != nil {
488 return fmt.Errorf("failed to remove from db: %w", err)
489 }
490
491 // update enforcer
492 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
493 if err != nil {
494 return fmt.Errorf("failed to update ACLs: %w", err)
495 }
496
497 if err = tx.Commit(); err != nil {
498 return fmt.Errorf("failed to commit txn: %w", err)
499 }
500
501 if err = i.Enforcer.E.SavePolicy(); err != nil {
502 return fmt.Errorf("failed to save ACLs: %w", err)
503 }
504
505 l.Info("removed spindle member")
506 }
507
508 return nil
509}
510
511func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
512 did := e.Did
513 var err error
514
515 l := i.Logger.With("handler", "ingestSpindle")
516 l = l.With("nsid", e.Commit.Collection)
517
518 switch e.Commit.Operation {
519 case jmodels.CommitOperationCreate:
520 raw := json.RawMessage(e.Commit.Record)
521 record := tangled.Spindle{}
522 err = json.Unmarshal(raw, &record)
523 if err != nil {
524 l.Error("invalid record", "err", err)
525 return err
526 }
527
528 instance := e.Commit.RKey
529
530 ddb, ok := i.Db.Execer.(*db.DB)
531 if !ok {
532 return fmt.Errorf("failed to index profile record, invalid db cast")
533 }
534
535 err := db.AddSpindle(ddb, models.Spindle{
536 Owner: syntax.DID(did),
537 Instance: instance,
538 })
539 if err != nil {
540 l.Error("failed to add spindle to db", "err", err, "instance", instance)
541 return err
542 }
543
544 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
545 if err != nil {
546 l.Error("failed to add spindle to db", "err", err, "instance", instance)
547 return err
548 }
549
550 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
551 if err != nil {
552 return fmt.Errorf("failed to mark verified: %w", err)
553 }
554
555 return nil
556
557 case jmodels.CommitOperationDelete:
558 instance := e.Commit.RKey
559
560 ddb, ok := i.Db.Execer.(*db.DB)
561 if !ok {
562 return fmt.Errorf("failed to index profile record, invalid db cast")
563 }
564
565 // get record from db first
566 spindles, err := db.GetSpindles(
567 ddb,
568 orm.FilterEq("owner", did),
569 orm.FilterEq("instance", instance),
570 )
571 if err != nil || len(spindles) != 1 {
572 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
573 }
574 spindle := spindles[0]
575
576 tx, err := ddb.Begin()
577 if err != nil {
578 return err
579 }
580 defer func() {
581 tx.Rollback()
582 i.Enforcer.E.LoadPolicy()
583 }()
584
585 // remove spindle members first
586 err = db.RemoveSpindleMember(
587 tx,
588 orm.FilterEq("owner", did),
589 orm.FilterEq("instance", instance),
590 )
591 if err != nil {
592 return err
593 }
594
595 err = db.DeleteSpindle(
596 tx,
597 orm.FilterEq("owner", did),
598 orm.FilterEq("instance", instance),
599 )
600 if err != nil {
601 return err
602 }
603
604 if spindle.Verified != nil {
605 err = i.Enforcer.RemoveSpindle(instance)
606 if err != nil {
607 return err
608 }
609 }
610
611 err = tx.Commit()
612 if err != nil {
613 return err
614 }
615
616 err = i.Enforcer.E.SavePolicy()
617 if err != nil {
618 return err
619 }
620 }
621
622 return nil
623}
624
625func (i *Ingester) ingestString(e *jmodels.Event) error {
626 did := e.Did
627 rkey := e.Commit.RKey
628
629 var err error
630
631 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
632 l.Info("ingesting record")
633
634 ddb, ok := i.Db.Execer.(*db.DB)
635 if !ok {
636 return fmt.Errorf("failed to index string record, invalid db cast")
637 }
638
639 switch e.Commit.Operation {
640 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
641 raw := json.RawMessage(e.Commit.Record)
642 record := tangled.String{}
643 err = json.Unmarshal(raw, &record)
644 if err != nil {
645 l.Error("invalid record", "err", err)
646 return err
647 }
648
649 string := models.StringFromRecord(did, rkey, record)
650
651 if err = i.Validator.ValidateString(&string); err != nil {
652 l.Error("invalid record", "err", err)
653 return err
654 }
655
656 if err = db.AddString(ddb, string); err != nil {
657 l.Error("failed to add string", "err", err)
658 return err
659 }
660
661 return nil
662
663 case jmodels.CommitOperationDelete:
664 if err := db.DeleteString(
665 ddb,
666 orm.FilterEq("did", did),
667 orm.FilterEq("rkey", rkey),
668 ); err != nil {
669 l.Error("failed to delete", "err", err)
670 return fmt.Errorf("failed to delete string record: %w", err)
671 }
672
673 return nil
674 }
675
676 return nil
677}
678
679func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
680 did := e.Did
681 var err error
682
683 l := i.Logger.With("handler", "ingestKnotMember")
684 l = l.With("nsid", e.Commit.Collection)
685
686 switch e.Commit.Operation {
687 case jmodels.CommitOperationCreate:
688 raw := json.RawMessage(e.Commit.Record)
689 record := tangled.KnotMember{}
690 err = json.Unmarshal(raw, &record)
691 if err != nil {
692 l.Error("invalid record", "err", err)
693 return err
694 }
695
696 // only knot owner can invite to knots
697 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
698 if err != nil || !ok {
699 return fmt.Errorf("failed to enforce permissions: %w", err)
700 }
701
702 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
703 if err != nil {
704 return err
705 }
706
707 if memberId.Handle.IsInvalidHandle() {
708 return err
709 }
710
711 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
712 if err != nil {
713 return fmt.Errorf("failed to update ACLs: %w", err)
714 }
715
716 l.Info("added knot member")
717 case jmodels.CommitOperationDelete:
718 // we don't store knot members in a table (like we do for spindle)
719 // and we can't remove this just yet. possibly fixed if we switch
720 // to either:
721 // 1. a knot_members table like with spindle and store the rkey
722 // 2. use the knot host as the rkey
723 //
724 // TODO: implement member deletion
725 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
726 }
727
728 return nil
729}
730
731func (i *Ingester) ingestKnot(e *jmodels.Event) error {
732 did := e.Did
733 var err error
734
735 l := i.Logger.With("handler", "ingestKnot")
736 l = l.With("nsid", e.Commit.Collection)
737
738 switch e.Commit.Operation {
739 case jmodels.CommitOperationCreate:
740 raw := json.RawMessage(e.Commit.Record)
741 record := tangled.Knot{}
742 err = json.Unmarshal(raw, &record)
743 if err != nil {
744 l.Error("invalid record", "err", err)
745 return err
746 }
747
748 domain := e.Commit.RKey
749
750 ddb, ok := i.Db.Execer.(*db.DB)
751 if !ok {
752 return fmt.Errorf("failed to index profile record, invalid db cast")
753 }
754
755 err := db.AddKnot(ddb, domain, did)
756 if err != nil {
757 l.Error("failed to add knot to db", "err", err, "domain", domain)
758 return err
759 }
760
761 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
762 if err != nil {
763 l.Error("failed to verify knot", "err", err, "domain", domain)
764 return err
765 }
766
767 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
768 if err != nil {
769 return fmt.Errorf("failed to mark verified: %w", err)
770 }
771
772 return nil
773
774 case jmodels.CommitOperationDelete:
775 domain := e.Commit.RKey
776
777 ddb, ok := i.Db.Execer.(*db.DB)
778 if !ok {
779 return fmt.Errorf("failed to index knot record, invalid db cast")
780 }
781
782 // get record from db first
783 registrations, err := db.GetRegistrations(
784 ddb,
785 orm.FilterEq("domain", domain),
786 orm.FilterEq("did", did),
787 )
788 if err != nil {
789 return fmt.Errorf("failed to get registration: %w", err)
790 }
791 if len(registrations) != 1 {
792 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
793 }
794 registration := registrations[0]
795
796 tx, err := ddb.Begin()
797 if err != nil {
798 return err
799 }
800 defer func() {
801 tx.Rollback()
802 i.Enforcer.E.LoadPolicy()
803 }()
804
805 err = db.DeleteKnot(
806 tx,
807 orm.FilterEq("did", did),
808 orm.FilterEq("domain", domain),
809 )
810 if err != nil {
811 return err
812 }
813
814 if registration.Registered != nil {
815 err = i.Enforcer.RemoveKnot(domain)
816 if err != nil {
817 return err
818 }
819 }
820
821 err = tx.Commit()
822 if err != nil {
823 return err
824 }
825
826 err = i.Enforcer.E.SavePolicy()
827 if err != nil {
828 return err
829 }
830 }
831
832 return nil
833}
834func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
835 did := e.Did
836 rkey := e.Commit.RKey
837
838 var err error
839
840 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
841 l.Info("ingesting record")
842
843 ddb, ok := i.Db.Execer.(*db.DB)
844 if !ok {
845 return fmt.Errorf("failed to index issue record, invalid db cast")
846 }
847
848 switch e.Commit.Operation {
849 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
850 raw := json.RawMessage(e.Commit.Record)
851 record := tangled.RepoIssue{}
852 err = json.Unmarshal(raw, &record)
853 if err != nil {
854 l.Error("invalid record", "err", err)
855 return err
856 }
857
858 issue := models.IssueFromRecord(did, rkey, record)
859
860 if err := i.Validator.ValidateIssue(&issue); err != nil {
861 return fmt.Errorf("failed to validate issue: %w", err)
862 }
863
864 if issue.RepoDid == "" {
865 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
866 if repoErr == nil && repo.RepoDid != "" {
867 issue.RepoDid = repo.RepoDid
868 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, record.Repo); enqErr != nil {
869 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
870 }
871 }
872 }
873
874 tx, err := ddb.BeginTx(ctx, nil)
875 if err != nil {
876 l.Error("failed to begin transaction", "err", err)
877 return err
878 }
879 defer tx.Rollback()
880
881 err = db.PutIssue(tx, &issue)
882 if err != nil {
883 l.Error("failed to create issue", "err", err)
884 return err
885 }
886
887 err = tx.Commit()
888 if err != nil {
889 l.Error("failed to commit txn", "err", err)
890 return err
891 }
892
893 return nil
894
895 case jmodels.CommitOperationDelete:
896 tx, err := ddb.BeginTx(ctx, nil)
897 if err != nil {
898 l.Error("failed to begin transaction", "err", err)
899 return err
900 }
901 defer tx.Rollback()
902
903 if err := db.DeleteIssues(
904 tx,
905 did,
906 rkey,
907 ); err != nil {
908 l.Error("failed to delete", "err", err)
909 return fmt.Errorf("failed to delete issue record: %w", err)
910 }
911 if err := tx.Commit(); err != nil {
912 l.Error("failed to commit txn", "err", err)
913 return err
914 }
915
916 return nil
917 }
918
919 return nil
920}
921
922func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
923 did := e.Did
924 rkey := e.Commit.RKey
925
926 var err error
927
928 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
929 l.Info("ingesting record")
930
931 ddb, ok := i.Db.Execer.(*db.DB)
932 if !ok {
933 return fmt.Errorf("failed to index issue comment record, invalid db cast")
934 }
935
936 switch e.Commit.Operation {
937 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
938 raw := json.RawMessage(e.Commit.Record)
939 record := tangled.RepoIssueComment{}
940 err = json.Unmarshal(raw, &record)
941 if err != nil {
942 return fmt.Errorf("invalid record: %w", err)
943 }
944
945 comment, err := models.IssueCommentFromRecord(did, rkey, record)
946 if err != nil {
947 return fmt.Errorf("failed to parse comment from record: %w", err)
948 }
949
950 if err := i.Validator.ValidateIssueComment(comment); err != nil {
951 return fmt.Errorf("failed to validate comment: %w", err)
952 }
953
954 tx, err := ddb.Begin()
955 if err != nil {
956 return fmt.Errorf("failed to start transaction: %w", err)
957 }
958 defer tx.Rollback()
959
960 _, err = db.AddIssueComment(tx, *comment)
961 if err != nil {
962 return fmt.Errorf("failed to create issue comment: %w", err)
963 }
964
965 return tx.Commit()
966
967 case jmodels.CommitOperationDelete:
968 if err := db.DeleteIssueComments(
969 ddb,
970 orm.FilterEq("did", did),
971 orm.FilterEq("rkey", rkey),
972 ); err != nil {
973 return fmt.Errorf("failed to delete issue comment record: %w", err)
974 }
975
976 return nil
977 }
978
979 return nil
980}
981
982func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
983 did := e.Did
984 rkey := e.Commit.RKey
985
986 var err error
987
988 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
989 l.Info("ingesting record")
990
991 ddb, ok := i.Db.Execer.(*db.DB)
992 if !ok {
993 return fmt.Errorf("failed to index label definition, invalid db cast")
994 }
995
996 switch e.Commit.Operation {
997 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
998 raw := json.RawMessage(e.Commit.Record)
999 record := tangled.LabelDefinition{}
1000 err = json.Unmarshal(raw, &record)
1001 if err != nil {
1002 return fmt.Errorf("invalid record: %w", err)
1003 }
1004
1005 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1006 if err != nil {
1007 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1008 }
1009
1010 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1011 return fmt.Errorf("failed to validate labeldef: %w", err)
1012 }
1013
1014 _, err = db.AddLabelDefinition(ddb, def)
1015 if err != nil {
1016 return fmt.Errorf("failed to create labeldef: %w", err)
1017 }
1018
1019 return nil
1020
1021 case jmodels.CommitOperationDelete:
1022 if err := db.DeleteLabelDefinition(
1023 ddb,
1024 orm.FilterEq("did", did),
1025 orm.FilterEq("rkey", rkey),
1026 ); err != nil {
1027 return fmt.Errorf("failed to delete labeldef record: %w", err)
1028 }
1029
1030 return nil
1031 }
1032
1033 return nil
1034}
1035
1036func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1037 did := e.Did
1038 rkey := e.Commit.RKey
1039
1040 var err error
1041
1042 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1043 l.Info("ingesting record")
1044
1045 ddb, ok := i.Db.Execer.(*db.DB)
1046 if !ok {
1047 return fmt.Errorf("failed to index label op, invalid db cast")
1048 }
1049
1050 switch e.Commit.Operation {
1051 case jmodels.CommitOperationCreate:
1052 raw := json.RawMessage(e.Commit.Record)
1053 record := tangled.LabelOp{}
1054 err = json.Unmarshal(raw, &record)
1055 if err != nil {
1056 return fmt.Errorf("invalid record: %w", err)
1057 }
1058
1059 subject := syntax.ATURI(record.Subject)
1060 collection := subject.Collection()
1061
1062 var repo *models.Repo
1063 switch collection {
1064 case tangled.RepoIssueNSID:
1065 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1066 if err != nil || len(i) != 1 {
1067 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1068 }
1069 repo = i[0].Repo
1070 default:
1071 return fmt.Errorf("unsupport label subject: %s", collection)
1072 }
1073
1074 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1075 if err != nil {
1076 return fmt.Errorf("failed to build label application ctx: %w", err)
1077 }
1078
1079 ops := models.LabelOpsFromRecord(did, rkey, record)
1080
1081 for _, o := range ops {
1082 def, ok := actx.Defs[o.OperandKey]
1083 if !ok {
1084 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1085 }
1086 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1087 return fmt.Errorf("failed to validate labelop: %w", err)
1088 }
1089 }
1090
1091 tx, err := ddb.Begin()
1092 if err != nil {
1093 return err
1094 }
1095 defer tx.Rollback()
1096
1097 for _, o := range ops {
1098 _, err = db.AddLabelOp(tx, &o)
1099 if err != nil {
1100 return fmt.Errorf("failed to add labelop: %w", err)
1101 }
1102 }
1103
1104 if err = tx.Commit(); err != nil {
1105 return err
1106 }
1107 }
1108
1109 return nil
1110}